-
Notifications
You must be signed in to change notification settings - Fork 691
Add log channel to admin client #1448
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds support for forwarding librdkafka log events to a Go channel in the AdminClient, enabling programmatic log consumption for improved observability.
- Adds a log channel setup in NewAdminClient and cleans it up in Close.
- Introduces a new unit test (TestAdminClientLog) verifying the log event forwarding functionality.
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
kafka/adminapi_test.go | Added unit test to validate log event delivery. |
kafka/adminapi.go | Added support for log channel configuration and termination. |
Comments suppressed due to low confidence (1)
kafka/adminapi.go:3733
- [nitpick] Consider using a channel of type struct{} for termination signals instead of bool to better convey that only a signal is needed.
// before we do anything with the configuration, create a copy such that
{"INIT", "librdkafka"}: false, | ||
} | ||
|
||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Consider adding an explicit termination condition in the goroutine reading from logsChan to avoid potential goroutine leaks when no more log events are sent. For example, signal closure of logsChan when the test completes.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented in acdb44b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one change, and add a CHANGELOG.md entry, keep the version of the entry as 2.12 for now
@@ -2737,6 +2738,9 @@ func (a *AdminClient) Close() { | |||
return | |||
} | |||
|
|||
if a.adminTermChan != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a Logs() method onto the AdminClient, too, which returns a.handle.logs, to be used in case "go.logs.channel.enable": true,
but "go.logs.channel": nil,
or unset.
After this, if handle.closeLogsChan
is true, then also close the logs channel here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
7245731
to
f9051be
Compare
@@ -1336,3 +1336,135 @@ func TestAdminAPIs(t *testing.T) { | |||
|
|||
a.Close() | |||
} | |||
|
|||
func TestAdminClientLog(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe something like this to reduce the Cognitive Complexity
func TestAdminClientLog(t *testing.T) { | |
func TestAdminClientLog(t *testing.T) { | |
logsChan := make(chan LogEvent, 100) | |
admin, err := NewAdminClient(&ConfigMap{ | |
"debug": "all", | |
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs | |
"go.logs.channel.enable": true, | |
"go.logs.channel": logsChan, | |
}) | |
if err != nil { | |
t.Fatalf("Failed to create AdminClient: %v", err) | |
} | |
defer func() { | |
admin.Close() | |
close(logsChan) | |
}() | |
// Verify that Logs() method returns the correct channel | |
if admin.Logs() != logsChan { | |
t.Fatalf("Expected admin.Logs() %v == logsChan %v", admin.Logs(), logsChan) | |
} | |
// Define expected logs to validate | |
expectedLogs := []struct { | |
tag string | |
message string | |
found bool | |
}{ | |
{"INIT", "librdkafka", false}, | |
} | |
go func() { | |
for log := range logsChan { | |
t.Log(log.String()) | |
// Check each expected log | |
for i := range expectedLogs { | |
if !expectedLogs[i].found && log.Tag == expectedLogs[i].tag && | |
strings.Contains(log.Message, expectedLogs[i].message) { | |
expectedLogs[i].found = true | |
} | |
} | |
} | |
}() | |
<-time.After(time.Second * 5) | |
// Validate all expected logs were found | |
for _, expected := range expectedLogs { | |
if !expected.found { | |
t.Errorf("Expected to find log with tag `%s' and message containing `%s', but didn't find any.", | |
expected.tag, expected.message) | |
} | |
} | |
} |
} | ||
} | ||
|
||
func TestAdminClientLogWithoutChannel(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the other comment
func TestAdminClientLogWithoutChannel(t *testing.T) { | |
func TestAdminClientLogWithoutChannel(t *testing.T) { | |
admin, err := NewAdminClient(&ConfigMap{ | |
"debug": "all", | |
"bootstrap.servers": "localhost:65533", // Unreachable to generate logs | |
"go.logs.channel.enable": true, | |
// Note: no "go.logs.channel" specified | |
}) | |
if err != nil { | |
t.Fatalf("Failed to create AdminClient: %v", err) | |
} | |
defer admin.Close() | |
// Verify that Logs() method returns a channel when enabled but no channel provided | |
logsChan := admin.Logs() | |
if logsChan == nil { | |
t.Fatalf("Expected admin.Logs() to return a channel, got nil") | |
} | |
// Define expected logs to validate | |
expectedLogs := []struct { | |
tag string | |
message string | |
found bool | |
}{ | |
{"INIT", "librdkafka", false}, | |
} | |
go func() { | |
for log := range logsChan { | |
t.Log(log.String()) | |
// Check each expected log | |
for i := range expectedLogs { | |
if !expectedLogs[i].found && log.Tag == expectedLogs[i].tag && | |
strings.Contains(log.Message, expectedLogs[i].message) { | |
expectedLogs[i].found = true | |
} | |
} | |
} | |
}() | |
<-time.After(time.Second * 5) | |
// Validate all expected logs were found | |
for _, expected := range expectedLogs { | |
if !expected.found { | |
t.Errorf("Expected to find log with tag `%s' and message containing `%s', but didn't find any.", | |
expected.tag, expected.message) | |
} | |
} | |
} |
Summary
This PR adds support for forwarding librdkafka log events to a Go channel in the AdminClient, similar to the existing functionality in the Producer and Consumer clients. This allows users to programmatically consume and process log events generated by the underlying librdkafka instance used by the AdminClient.
Motivation
where users requested the ability to receive log events from the AdminClient via a Go channel.
Changes
Test
Unit Test
Manual test