
Conversation


@ShahimSharafudeen commented Mar 26, 2025

Description

Currently the presto-kafka connector can be configured to use SASL authentication (https://docs.confluent.io/platform/current/security/authentication/sasl/plain/overview.html#kafka-sasl-auth-plain). These config properties are made known to the KafkaConfig and are used in both the producer and consumer factories if set (all optional).

As of now, we have configured the following parameters for the above requirement:

  1. security.protocol
  2. sasl.mechanism
  3. sasl.jaas.config
  4. ssl.truststore.location
  5. ssl.truststore.password
  6. ssl.truststore.type

As part of the code-level configuration, the parameters are named as below and mapped to the corresponding Kafka config parameters:

  1. kafka.security-protocol=SASL_SSL
  2. kafka.sasl.mechanism=PLAIN
  3. kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=<USERNAME> password=<API KEY/PASSWORD>;
  4. kafka.truststore.path=XXX
  5. kafka.truststore.password=XXX
  6. kafka.truststore.type=XXX

This takes reference from the Trino Kafka SSL implementation: trinodb/trino#6929
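For illustration, a catalog properties file enabling SASL might look like the sketch below. The broker address, credentials, and truststore path are placeholder assumptions, not values from this PR:

```properties
connector.name=kafka
kafka.nodes=broker.example.com:9093
kafka.security-protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USERNAME>" password="<API KEY/PASSWORD>";
kafka.truststore.path=/etc/presto/kafka-truststore.jks
kafka.truststore.password=<TRUSTSTORE PASSWORD>
kafka.truststore.type=JKS
```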

Motivation and Context

Impact

Test Plan

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

Kafka Connector Changes
* Add support for optional Apache Kafka SASL.

@prestodb-ci added the from:IBM (PR from IBM) label Mar 26, 2025
@ShahimSharafudeen force-pushed the kafka_sasl_config branch 2 times, most recently from 9bc1fd1 to 9cc5a3a on March 28, 2025 12:39
@ShahimSharafudeen marked this pull request as ready for review April 7, 2025 09:32
@ShahimSharafudeen requested a review from a team as a code owner April 7, 2025 09:32
@prestodb-ci requested reviews from a team, aaneja and imjalpreet and removed request for a team April 7, 2025 09:32
@aaneja left a comment

  • Can you attribute trinodb/trino#6929 in your commit message? See the 'backport commits' section of the CONTRIBUTING doc.
  • Can you update the kafka.rst file with the new SASL properties?

@ShahimSharafudeen

@aaneja - Addressed the review comments.

@steveburnett left a comment

Thanks for the doc! Looks good. Some minor formatting suggestions, and one question for you to consider.

@ShahimSharafudeen

> Thanks for the doc! Looks good. Some minor formatting suggestions, and one question for you to consider.

Thanks @steveburnett for reviewing the documentation-related changes. I’ve addressed all your review comments.

@aaneja left a comment

Can you add a product test that uses these new configs? We currently have a singlenode-kafka test; let's make a new one called singlenode-kafka-sasl.

public class SaslKafkaConsumerManager
        implements KafkaConsumerManager
{
    private final ImmutableMap<String, Object> map;

Change type to Map<String, Object>

public class SaslKafkaProducerFactory
        implements KafkaProducerFactory
{
    private final ImmutableMap<String, Object> map;

Let's rename this to kafkaClientConfigProperties, and make it a Map<String,Object>

Properties properties = new Properties();
properties.putAll(delegate.configure(bootstrapServers));
properties.putAll(map);
log.info("Loading SASL Configs for Kafka...");

Remove the log - it does not add any information

public SaslKafkaProducerFactory(@ForKafkaSasl KafkaProducerFactory delegate, KafkaSaslConfig saslConfig)
{
    this.delegate = requireNonNull(delegate, "delegate is null");
    map = ImmutableMap.copyOf(saslConfig.getKafkaClientProperties());

Why don't we just store a reference to the saslConfig and use it in configure when needed?
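As a minimal sketch of the pattern suggested here (class and method names are stand-ins, not the actual connector code), the factory would keep the config reference and merge its properties into the delegate's configuration only when configure() is called:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch, not the connector classes: the SASL factory holds
// a reference to the SASL config and overlays its properties on the
// delegate's configuration at configure() time, instead of caching an
// ImmutableMap copy up front.
public class SaslMergeSketch {
    // Stand-in for KafkaSaslConfig#getKafkaClientProperties()
    static Map<String, Object> saslProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        return props;
    }

    // Stand-in for the delegate factory's base configuration
    static Properties delegateConfigure(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    // Merge on demand: delegate properties first, SASL overlay second
    static Properties configure(String bootstrapServers) {
        Properties properties = delegateConfigure(bootstrapServers);
        properties.putAll(saslProperties());
        return properties;
    }

    public static void main(String[] args) {
        Properties merged = configure("broker:9092");
        System.out.println(merged.get("security.protocol")); // prints SASL_SSL
    }
}
```

This keeps the constructor free of copying work and makes the merge order explicit in one place.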

.build();
return new KafkaProducer<>(propertiesWithBootstrapServers, new ByteArraySerializer(), new ByteArraySerializer());
.collect(joining(",")));
log.info("Loading Default Configs for Kafka...");

Remove the log, it does not add any value

.put(ACKS_CONFIG, "all")
.put(LINGER_MS_CONFIG, 5)
.build();
configurationProperties = readProperties(kafkaConfig.getResourceConfigFiles());

Instead of creating two maps, make a single Map<String, Object> called configurationProperties and store the ACKS_CONFIG and LINGER_MS_CONFIG in them

{
this.properties = ImmutableMap.<String, Object>builder()
this.map = ImmutableMap.<String, Object>builder()
.put(ACKS_CONFIG, "all")

Why are these non-default values used, and why are they not configurable ?

public SaslKafkaConsumerManager(@ForKafkaSasl KafkaConsumerManager delegate, KafkaSaslConfig saslConfig)
{
    this.delegate = requireNonNull(delegate, "delegate is null");
    map = ImmutableMap.copyOf(saslConfig.getKafkaClientProperties());

map is redundant, just use properties.putAll(saslConfig.getKafkaClientProperties()) on L50

properties.put(GROUP_ID_CONFIG, threadName);
properties.put(MAX_POLL_RECORDS_CONFIG, Integer.toString(maxPollRecords));
properties.put(MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
properties.put(CLIENT_ID_CONFIG, String.format("%s-%s", threadName, hostAddress.toString()));

toString() is redundant

return this;
}

public Map<String, Object> getKafkaClientProperties()

Should we rename this to getKafkaSaslProperties()

@steveburnett left a comment

Thanks for the quick update! I found a few more nits and suggestions, again nothing major.

@ShahimSharafudeen

@aaneja - I have addressed all your comments regarding the Java-related changes.

@ShahimSharafudeen

> Thanks for the quick update! I found a few more nits and suggestions, again nothing major.

Thanks again, @steveburnett, for your suggestions. I have addressed all your comments regarding the doc-related changes.

steveburnett previously approved these changes Apr 25, 2025
@steveburnett left a comment

LGTM! (docs)

Pull updated branch, new local doc build, looks good. Thanks!

aaneja previously approved these changes Apr 27, 2025
final Properties properties = new Properties();
properties.put(BOOTSTRAP_SERVERS_CONFIG, hostAddress.toString());
properties.put(GROUP_ID_CONFIG, threadName);
properties.put(MAX_POLL_RECORDS_CONFIG, Integer.toString(maxPollRecords));

Is the Integer.toString needed? From a quick glance at ConsumerConfig, it looks like we could use the integer directly.

@ShahimSharafudeen

@aaneja - We can pass the value either as an int or a String, but to avoid any issues during runtime, it's better to stick with the existing code pattern used in the initial implementation of the KafkaConsumerManager class, which passes it as a String. Let me know if you’d prefer using an Integer instead — happy to update it.
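To illustrate the exchange above, here is a minimal sketch (class and method names are hypothetical) of the two styles: java.util.Properties stores Object values, so either an Integer or its String form can be put under max.poll.records, matching the claim that both work:

```java
import java.util.Properties;

// Hypothetical sketch of the two styles discussed for max.poll.records:
// Properties accepts any Object value, so both forms compile and store
// the value; the String form follows the existing KafkaConsumerManager
// pattern, while the Integer form skips the conversion.
public class MaxPollRecordsSketch {
    static Properties withString(int maxPollRecords) {
        Properties props = new Properties();
        props.put("max.poll.records", Integer.toString(maxPollRecords)); // existing pattern
        return props;
    }

    static Properties withInteger(int maxPollRecords) {
        Properties props = new Properties();
        props.put("max.poll.records", maxPollRecords); // autoboxed Integer
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withString(500).get("max.poll.records")); // prints 500
    }
}
```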

aaneja previously approved these changes Apr 28, 2025

aaneja commented Apr 28, 2025

Please create an issue to implement the singlenode-kafka-sasl product test. This can be punted to later

@ShahimSharafudeen

> Please create an issue to implement the singlenode-kafka-sasl product test. This can be punted to later.

GitHub issue: #24993

steveburnett previously approved these changes Apr 28, 2025
@steveburnett left a comment

LGTM! (docs)

Pull updated branch, new local doc build, looks good.

@ZacBlanco left a comment

Minor things, overall this is good

@ZacBlanco left a comment

Can you also rebase this PR on the latest master branch? I noticed the git log doesn't include anything from April


ShahimSharafudeen commented Apr 30, 2025

> Can you also rebase this PR on the latest master branch? I noticed the git log doesn't include anything from April.

Rebased the PR with the latest commits from the master branch and addressed all your comments.

Cherry-pick of trinodb/trino#6929

Co-authored-by: Kenzyme Le <kl@kenzymele.com>
Co-authored-by: Matthias Strobl <matthias.strobl@bmw.de>
@steveburnett

Thanks for the release note! Minor nit suggestions:

== RELEASE NOTES ==

Kafka Connector Changes
* Add support for optional Apache Kafka SASL. 

@ShahimSharafudeen

> Thanks for the release note! Minor nit suggestions:
>
> == RELEASE NOTES ==
>
> Kafka Connector Changes
> * Add support for optional Apache Kafka SASL.

Done

@ethanyzhang merged commit 60bbd2e into prestodb:master May 1, 2025
99 checks passed
@ZacBlanco mentioned this pull request May 29, 2025