Add support for optional kafka SASL #24798
Conversation
Force-pushed from 9bc1fd1 to 9cc5a3a
- Can you attribute trinodb/trino#6929 in your commit message? See the 'backport commits' section of the CONTRIBUTING doc.
- Can you update the kafka.rst file with the new SASL properties?
Force-pushed from 9cc5a3a to a004651
@aaneja - Addressed the review comments.
Thanks for the doc! Looks good. Some minor formatting suggestions, and one question for you to consider.
Force-pushed from a004651 to a26ab6a
Thanks @steveburnett for reviewing the documentation-related changes. I’ve addressed all your review comments.
Can you add a product test that uses these new configs? We currently have a singlenode-kafka test; let's make a new one called singlenode-kafka-sasl.
public class SaslKafkaConsumerManager
        implements KafkaConsumerManager
{
    private final ImmutableMap<String, Object> map;
Change type to Map<String, Object>
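A minimal sketch of the suggested change, assuming KafkaConsumerManager and KafkaSaslConfig are the project's existing types (abbreviated, omitting the delegate plumbing): the field is declared against the Map interface while the stored value stays immutable.

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class SaslKafkaConsumerManager
        implements KafkaConsumerManager
{
    // Declared as Map so callers depend on the interface,
    // not the concrete ImmutableMap implementation.
    private final Map<String, Object> map;

    public SaslKafkaConsumerManager(KafkaConsumerManager delegate, KafkaSaslConfig saslConfig)
    {
        // ImmutableMap.copyOf still guarantees immutability at runtime.
        this.map = ImmutableMap.copyOf(saslConfig.getKafkaClientProperties());
    }
}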
public class SaslKafkaProducerFactory
        implements KafkaProducerFactory
{
    private final ImmutableMap<String, Object> map;
Let's rename this to kafkaClientConfigProperties, and make it a Map<String, Object>.
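For illustration, a sketch of the renamed field (the name follows the reviewer's suggestion; the constructor is abbreviated):

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class SaslKafkaProducerFactory
        implements KafkaProducerFactory
{
    // Renamed from "map" so the field describes what it holds.
    private final Map<String, Object> kafkaClientConfigProperties;

    public SaslKafkaProducerFactory(KafkaProducerFactory delegate, KafkaSaslConfig saslConfig)
    {
        this.kafkaClientConfigProperties = ImmutableMap.copyOf(saslConfig.getKafkaClientProperties());
    }
}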
Properties properties = new Properties();
properties.putAll(delegate.configure(bootstrapServers));
properties.putAll(map);
log.info("Loading SASL Configs for Kafka...");
Remove the log - it does not add any information
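With the log line removed, the method reduces to merging the delegate's properties with the SASL overrides. A sketch, assuming (as the diff suggests) that configure takes the bootstrap servers and returns Properties:

import com.facebook.presto.spi.HostAddress;
import java.util.List;
import java.util.Properties;

@Override
public Properties configure(List<HostAddress> bootstrapServers)
{
    Properties properties = new Properties();
    // Delegate supplies the base configuration (bootstrap servers, etc.).
    properties.putAll(delegate.configure(bootstrapServers));
    // SASL properties are layered on top and win on key collisions.
    properties.putAll(map);
    return properties;
}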
public SaslKafkaProducerFactory(@ForKafkaSasl KafkaProducerFactory delegate, KafkaSaslConfig saslConfig)
{
    this.delegate = requireNonNull(delegate, "delegate is null");
    map = ImmutableMap.copyOf(saslConfig.getKafkaClientProperties());
Why don't we just store a reference to the saslConfig and use it in configure when needed?
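A sketch of the reviewer's alternative: hold the saslConfig reference and read its properties lazily inside configure, avoiding the eager copy.

import static java.util.Objects.requireNonNull;

private final KafkaProducerFactory delegate;
private final KafkaSaslConfig saslConfig;

public SaslKafkaProducerFactory(@ForKafkaSasl KafkaProducerFactory delegate, KafkaSaslConfig saslConfig)
{
    this.delegate = requireNonNull(delegate, "delegate is null");
    // Keep the config object itself instead of copying its map.
    this.saslConfig = requireNonNull(saslConfig, "saslConfig is null");
}

// Then, inside configure(...):
// properties.putAll(saslConfig.getKafkaClientProperties());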
        .build();
return new KafkaProducer<>(propertiesWithBootstrapServers, new ByteArraySerializer(), new ByteArraySerializer());
        .collect(joining(",")));
log.info("Loading Default Configs for Kafka...");
Remove the log, it does not add any value
        .put(ACKS_CONFIG, "all")
        .put(LINGER_MS_CONFIG, 5)
        .build();
configurationProperties = readProperties(kafkaConfig.getResourceConfigFiles());
Instead of creating two maps, make a single Map<String, Object> called configurationProperties and store the ACKS_CONFIG and LINGER_MS_CONFIG in it.
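One way to read this suggestion, sketched on the assumption that readProperties returns a map of string properties: seed a single builder with the defaults and layer the file-based entries on top. Note that ImmutableMap.Builder rejects duplicate keys, so if the resource files may override the defaults, buildKeepingLast() (Guava 31+) or a mutable map would be needed instead.

import com.google.common.collect.ImmutableMap;
import java.util.Map;

// Single map holding both the hard-coded defaults and the
// properties loaded from the resource config files.
Map<String, Object> configurationProperties = ImmutableMap.<String, Object>builder()
        .put(ACKS_CONFIG, "all")
        .put(LINGER_MS_CONFIG, 5)
        .putAll(readProperties(kafkaConfig.getResourceConfigFiles()))
        .build();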
{
    this.properties = ImmutableMap.<String, Object>builder()
    this.map = ImmutableMap.<String, Object>builder()
            .put(ACKS_CONFIG, "all")
Why are these non-default values used, and why are they not configurable?
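If these settings were made configurable, a hypothetical KafkaConfig addition could look like the following sketch; the property names kafka.producer.acks and kafka.producer.linger-ms are illustrative, not part of the PR.

import com.facebook.airlift.configuration.Config;

private String producerAcks = "all";
private int producerLingerMs = 5;

@Config("kafka.producer.acks")
public KafkaConfig setProducerAcks(String producerAcks)
{
    this.producerAcks = producerAcks;
    return this;
}

public String getProducerAcks()
{
    return producerAcks;
}

@Config("kafka.producer.linger-ms")
public KafkaConfig setProducerLingerMs(int producerLingerMs)
{
    this.producerLingerMs = producerLingerMs;
    return this;
}

public int getProducerLingerMs()
{
    return producerLingerMs;
}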
public SaslKafkaConsumerManager(@ForKafkaSasl KafkaConsumerManager delegate, KafkaSaslConfig saslConfig)
{
    this.delegate = requireNonNull(delegate, "delegate is null");
    map = ImmutableMap.copyOf(saslConfig.getKafkaClientProperties());
map is redundant; just use properties.putAll(saslConfig.getKafkaClientProperties()) on L50.
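The simplification, sketched (the "L50" reference is to a line in the diff, not reproduced here): drop the intermediate field and copy the SASL properties straight into the Properties object where the consumer configuration is assembled.

// Where the consumer properties are built:
Properties properties = new Properties();
// ... delegate-provided defaults are added here first ...
properties.putAll(saslConfig.getKafkaClientProperties());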
properties.put(GROUP_ID_CONFIG, threadName);
properties.put(MAX_POLL_RECORDS_CONFIG, Integer.toString(maxPollRecords));
properties.put(MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
properties.put(CLIENT_ID_CONFIG, String.format("%s-%s", threadName, hostAddress.toString()));
toString() is redundant.
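String.format converts %s arguments with String.valueOf, which calls toString itself, so the explicit call can simply be dropped:

properties.put(CLIENT_ID_CONFIG, String.format("%s-%s", threadName, hostAddress));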
    return this;
}

public Map<String, Object> getKafkaClientProperties()
Should we rename this to getKafkaSaslProperties()?
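The rename is mechanical; as a sketch, assuming the assembled properties are held in a properties field on the config class:

public Map<String, Object> getKafkaSaslProperties()
{
    // Same contents as before; the name now reflects that only
    // SASL-related client properties are returned.
    return properties;
}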
Thanks for the quick update! I found a few more nits and suggestions, again nothing major.
Force-pushed from 08fbc5e to 6903a7e
@aaneja - I have addressed all your comments regarding the Java-related changes.
Thanks again, @steveburnett, for your suggestions. I have addressed all your comments regarding the doc-related changes.
LGTM! (docs)
Pull updated branch, new local doc build, looks good. Thanks!
final Properties properties = new Properties();
properties.put(BOOTSTRAP_SERVERS_CONFIG, hostAddress.toString());
properties.put(GROUP_ID_CONFIG, threadName);
properties.put(MAX_POLL_RECORDS_CONFIG, Integer.toString(maxPollRecords));
Is the Integer.toString needed? From a quick glance at ConsumerConfig, it looks like we could use the integer directly.
@aaneja - We can pass the value either as an int or a String, but to avoid any issues during runtime, it's better to stick with the existing code pattern used in the initial implementation of the KafkaConsumerManager class, which passes it as a String. Let me know if you’d prefer using an Integer instead — happy to update it.
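Both forms are accepted because Kafka's ConfigDef parses string values into each key's declared type and also accepts values already of that type. A sketch of the two options:

// Option 1: the existing pattern, passing the value as a String.
properties.put(MAX_POLL_RECORDS_CONFIG, Integer.toString(maxPollRecords));

// Option 2: pass the int directly; max.poll.records is declared as
// Type.INT, and ConfigDef accepts values of the declared type as-is.
properties.put(MAX_POLL_RECORDS_CONFIG, maxPollRecords);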
Force-pushed from 6903a7e to 6fa2430
Please create an issue to implement the
Git issue: #24993
LGTM! (docs)
Pull updated branch, new local doc build, looks good.
Minor things, overall this is good
Force-pushed from 6fa2430 to dde1f1f
Can you also rebase this PR on the latest master branch? I noticed the git log doesn't include anything from April.
Force-pushed from dde1f1f to 51cf71e
Rebased the PR with the latest commits from the master branch and addressed all your comments.
Cherry-pick of trinodb/trino#6929
Co-authored-by: Kenzyme Le <kl@kenzymele.com>
Co-authored-by: Matthias Strobl <matthias.strobl@bmw.de>
Force-pushed from 51cf71e to 32c46eb
Thanks for the release note! Minor nit suggestions:
Done
Description
Currently, the presto-kafka connector can be configured to use SASL authentication (https://docs.confluent.io/platform/current/security/authentication/sasl/plain/overview.html#kafka-sasl-auth-plain). The corresponding config properties are made known to KafkaConfig and, when set, are used in both the producer and the consumer factory (all optional).
As of now, we have configured the following parameters for the above requirement:
At the code level, the parameters are named as below and mapped to the corresponding Kafka config parameters:
This takes reference from the trinodb Kafka SSL implementation: trinodb/trino#6929
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.