-
Notifications
You must be signed in to change notification settings - Fork 0
Feat/kafka setup #19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/kafka setup #19
Changes from all commits
785481c
765c4b2
63c740e
1ad8eb5
0ee784c
c5201e2
0fb72b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package org.careerseekers.csmailservice.config | ||
|
||
import org.apache.kafka.clients.admin.NewTopic | ||
import org.careerseekers.csmailservice.enums.KafkaTopics | ||
import org.springframework.context.annotation.Bean | ||
import org.springframework.context.annotation.Configuration | ||
import org.springframework.kafka.config.TopicBuilder | ||
|
||
@Configuration | ||
class KafkaConfig { | ||
|
||
@Bean | ||
fun emailSendingTasksTopic(): NewTopic { | ||
return TopicBuilder | ||
.name(KafkaTopics.EMAIL_SENDING_TASKS.name) | ||
.partitions(12) | ||
.replicas(3) | ||
.build() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
package org.careerseekers.csmailservice.config.kafka.consumers | ||
|
||
import org.apache.kafka.clients.consumer.ConsumerConfig | ||
import org.apache.kafka.common.serialization.StringDeserializer | ||
import org.careerseekers.csmailservice.dto.EmailSendingTaskDto | ||
import org.careerseekers.csmailservice.serializers.PolymorphicKafkaSerializer | ||
import org.springframework.beans.factory.annotation.Value | ||
import org.springframework.context.annotation.Bean | ||
import org.springframework.context.annotation.Configuration | ||
import org.springframework.kafka.annotation.EnableKafka | ||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory | ||
import org.springframework.kafka.core.ConsumerFactory | ||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory | ||
import org.springframework.kafka.listener.ContainerProperties | ||
|
||
@Configuration | ||
@EnableKafka | ||
class EmailSendingConsumerConfig { | ||
@Value("\${spring.kafka.bootstrap-servers}") | ||
private lateinit var kafkaUrl: String | ||
|
||
@Bean | ||
fun emailSendingConsumerFactory(): ConsumerFactory<String, EmailSendingTaskDto> { | ||
val configProps = mapOf( | ||
/** | ||
* Kafka cluster connection settings | ||
* Connecting to Kafka and serializing keys and values | ||
*/ | ||
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaUrl, | ||
ConsumerConfig.GROUP_ID_CONFIG to "email_sending_tasks_consumer", | ||
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, | ||
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to PolymorphicKafkaSerializer::class.java, | ||
|
||
/** | ||
* Auto commit configuration | ||
*/ | ||
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, | ||
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to 1000, | ||
|
||
/** | ||
* Session and heartbeat settings | ||
*/ | ||
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to 20000, | ||
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to 1000, | ||
|
||
/** | ||
* Fetch settings | ||
*/ | ||
ConsumerConfig.FETCH_MIN_BYTES_CONFIG to 1, | ||
ConsumerConfig.FETCH_MAX_BYTES_CONFIG to 52428800, // 50MB | ||
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG to 1048576, // 1MB | ||
|
||
/** | ||
* Offset reset policy | ||
*/ | ||
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", | ||
|
||
/** | ||
* Max poll records | ||
*/ | ||
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 500, | ||
|
||
/** | ||
* Isolation level for transactions | ||
*/ | ||
ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed" | ||
) | ||
|
||
|
||
return DefaultKafkaConsumerFactory(configProps) | ||
} | ||
|
||
@Bean | ||
fun kafkaListenerContainerFactory( | ||
consumerFactory: ConsumerFactory<String, EmailSendingTaskDto> | ||
): ConcurrentKafkaListenerContainerFactory<String, EmailSendingTaskDto> { | ||
val factory = ConcurrentKafkaListenerContainerFactory<String, EmailSendingTaskDto>() | ||
|
||
factory.consumerFactory = consumerFactory | ||
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE | ||
|
||
return factory | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package org.careerseekers.csmailservice.dto | ||
|
||
import kotlinx.serialization.Polymorphic | ||
import kotlinx.serialization.SerialName | ||
import kotlinx.serialization.Serializable | ||
import org.careerseekers.csmailservice.enums.MailEventTypes | ||
|
||
@Serializable | ||
@Polymorphic | ||
sealed class KafkaMessagesDto : DtoClass | ||
|
||
@Serializable | ||
@SerialName("email_sending_task") | ||
class EmailSendingTaskDto( | ||
val token: String, | ||
val eventType: MailEventTypes, | ||
) : KafkaMessagesDto() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package org.careerseekers.csmailservice.enums | ||
|
||
enum class KafkaTopics { | ||
EMAIL_SENDING_TASKS | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package org.careerseekers.csmailservice.enums | ||
|
||
enum class MailEventTypes { | ||
REGISTRATION, | ||
PASSWORD_RESET | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package org.careerseekers.csmailservice.security | ||
|
||
import org.springframework.context.annotation.Bean | ||
import org.springframework.context.annotation.Configuration | ||
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder | ||
import org.springframework.security.crypto.password.PasswordEncoder | ||
|
||
@Configuration | ||
class SecurityConfig { | ||
|
||
@Bean | ||
fun passwordEncoder(): PasswordEncoder { | ||
return BCryptPasswordEncoder() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package org.careerseekers.csmailservice.serializers | ||
|
||
import kotlinx.serialization.KSerializer | ||
import org.apache.kafka.common.serialization.Deserializer | ||
import org.apache.kafka.common.serialization.Serializer | ||
import org.careerseekers.csmailservice.dto.KafkaMessagesDto | ||
import org.springframework.stereotype.Component | ||
import kotlin.collections.isEmpty | ||
import kotlin.collections.toString | ||
import kotlin.text.toByteArray | ||
|
||
@Suppress("UNCHECKED_CAST") | ||
@Component | ||
class PolymorphicKafkaSerializer<Base : KafkaMessagesDto> : Serializer<Base>, Deserializer<Base> { | ||
|
||
private val json = CustomSerializerModule.json | ||
private val baseSerializer = KafkaMessagesDto.serializer() as KSerializer<Base> | ||
|
||
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {} | ||
|
||
override fun close() {} | ||
|
||
override fun deserialize(topic: String, data: ByteArray?): Base? { | ||
if (data == null || data.isEmpty()) return null | ||
return json.decodeFromString(baseSerializer, data.toString(Charsets.UTF_8)) | ||
} | ||
|
||
override fun serialize(topic: String, data: Base?): ByteArray? { | ||
if (data == null) return null | ||
return json.encodeToString(baseSerializer, data).toByteArray(Charsets.UTF_8) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,8 @@ | ||||||
package org.careerseekers.csmailservice.services.kafka.consumers | ||||||
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord | ||||||
import org.springframework.kafka.support.Acknowledgment | ||||||
|
||||||
interface CustomKafkaConsumer<T, K> { | ||||||
fun receiveMessage(consumerRecord: ConsumerRecord<T, K>, acknowledgment: Acknowledgment): Any | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The return type 'Any' is too generic and provides no meaningful contract. Consider using 'Unit' if no return value is needed, or define a specific return type that represents the operation result.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,25 @@ | ||||||
package org.careerseekers.csmailservice.services.kafka.consumers | ||||||
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord | ||||||
import org.careerseekers.csmailservice.dto.EmailSendingTaskDto | ||||||
import org.springframework.kafka.annotation.KafkaListener | ||||||
import org.springframework.kafka.support.Acknowledgment | ||||||
import org.springframework.stereotype.Service | ||||||
|
||||||
@Service | ||||||
class KafkaEmailSendingConsumer : CustomKafkaConsumer<String, EmailSendingTaskDto> { | ||||||
|
||||||
@KafkaListener( | ||||||
topics = ["EMAIL_SENDING_TASKS"], | ||||||
groupId = "email_sending_tasks_consumer" | ||||||
) | ||||||
override fun receiveMessage( | ||||||
consumerRecord: ConsumerRecord<String, EmailSendingTaskDto>, | ||||||
acknowledgment: Acknowledgment | ||||||
) { | ||||||
println(consumerRecord.value()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using println for logging in production code is not recommended. Consider using a proper logging framework like SLF4J with a logger instance.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||
|
||||||
acknowledgment.acknowledge() | ||||||
} | ||||||
|
||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,18 @@ spring: | |
username: ${MAIL_USERNAME} | ||
password: ${MAIL_PASSWORD} | ||
debug: ${MAIL_DEBUG} | ||
kafka: | ||
bootstrap-servers: ${KAFKA1_BOOTSTRAP_SERVICE_URL},${KAFKA2_BOOTSTRAP_SERVICE_URL},${KAFKA3_BOOTSTRAP_SERVICE_URL} | ||
consumer: | ||
properties: | ||
session: | ||
timeout: | ||
ms: 60000 | ||
reconnect: | ||
backoff: | ||
ms: 1000 | ||
max: | ||
ms: 20000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The session timeout configuration is inconsistent between YAML (60000ms) and Java config (20000ms in EmailSendingConsumerConfig.kt line 43). This could lead to unexpected behavior as the Java configuration will override the YAML settings. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||
|
||
server: | ||
port: ${SERVER_INTERNAL_PORT} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These explicit imports from kotlin.collections and kotlin.text are unnecessary as these functions are available by default. Remove these imports to clean up the code.
Copilot uses AI. Check for mistakes.