Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.careerseekers.csmailservice.config

import org.apache.kafka.clients.admin.NewTopic
import org.careerseekers.csmailservice.enums.KafkaTopics
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

@Configuration
class KafkaConfig {

@Bean
fun emailSendingTasksTopic(): NewTopic {
return TopicBuilder
.name(KafkaTopics.EMAIL_SENDING_TASKS.name)
.partitions(12)
.replicas(3)
.build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.careerseekers.csmailservice.config.kafka.consumers

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.careerseekers.csmailservice.dto.EmailSendingTaskDto
import org.careerseekers.csmailservice.serializers.PolymorphicKafkaSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ContainerProperties

@Configuration
@EnableKafka
class EmailSendingConsumerConfig {
@Value("\${spring.kafka.bootstrap-servers}")
private lateinit var kafkaUrl: String

@Bean
fun emailSendingConsumerFactory(): ConsumerFactory<String, EmailSendingTaskDto> {
val configProps = mapOf(
/**
* Kafka cluster connection settings
* Connecting to Kafka and serializing keys and values
*/
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaUrl,
ConsumerConfig.GROUP_ID_CONFIG to "email_sending_tasks_consumer",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to PolymorphicKafkaSerializer::class.java,

/**
* Auto commit configuration
*/
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to 1000,

/**
* Session and heartbeat settings
*/
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to 20000,
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to 1000,

/**
* Fetch settings
*/
ConsumerConfig.FETCH_MIN_BYTES_CONFIG to 1,
ConsumerConfig.FETCH_MAX_BYTES_CONFIG to 52428800, // 50MB
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG to 1048576, // 1MB

/**
* Offset reset policy
*/
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",

/**
* Max poll records
*/
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 500,

/**
* Isolation level for transactions
*/
ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed"
)


return DefaultKafkaConsumerFactory(configProps)
}

@Bean
fun kafkaListenerContainerFactory(
consumerFactory: ConsumerFactory<String, EmailSendingTaskDto>
): ConcurrentKafkaListenerContainerFactory<String, EmailSendingTaskDto> {
val factory = ConcurrentKafkaListenerContainerFactory<String, EmailSendingTaskDto>()

factory.consumerFactory = consumerFactory
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE

return factory
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.careerseekers.csmailservice.dto

import kotlinx.serialization.Polymorphic
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import org.careerseekers.csmailservice.enums.MailEventTypes

@Serializable
@Polymorphic
sealed class KafkaMessagesDto : DtoClass

@Serializable
@SerialName("email_sending_task")
class EmailSendingTaskDto(
val token: String,
val eventType: MailEventTypes,
) : KafkaMessagesDto()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.careerseekers.csmailservice.enums

enum class KafkaTopics {
EMAIL_SENDING_TASKS
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.careerseekers.csmailservice.enums

enum class MailEventTypes {
REGISTRATION,
PASSWORD_RESET
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.careerseekers.csmailservice.security

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder
import org.springframework.security.crypto.password.PasswordEncoder

@Configuration
class SecurityConfig {

@Bean
fun passwordEncoder(): PasswordEncoder {
return BCryptPasswordEncoder()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ import kotlinx.serialization.json.Json
import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.modules.polymorphic
import org.careerseekers.csmailservice.dto.CachesDto
import org.careerseekers.csmailservice.dto.EmailSendingTaskDto
import org.careerseekers.csmailservice.dto.KafkaMessagesDto
import org.careerseekers.csmailservice.dto.UsersCacheDto

object CustomSerializerModule {
val customSerializerModule = SerializersModule {
polymorphic(CachesDto::class) {
subclass(UsersCacheDto::class, UsersCacheDto.serializer())
}
polymorphic(KafkaMessagesDto::class) {
subclass(EmailSendingTaskDto::class, EmailSendingTaskDto.serializer())
}
}

val json = Json {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.careerseekers.csmailservice.serializers

import kotlinx.serialization.KSerializer
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serializer
import org.careerseekers.csmailservice.dto.KafkaMessagesDto
import org.springframework.stereotype.Component
import kotlin.collections.isEmpty
import kotlin.collections.toString
import kotlin.text.toByteArray
Copy link
Preview

Copilot AI Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These explicit imports from kotlin.collections and kotlin.text are unnecessary as these functions are available by default. Remove these imports to clean up the code.

Suggested change
import kotlin.text.toByteArray

Copilot uses AI. Check for mistakes.


@Suppress("UNCHECKED_CAST")
@Component
class PolymorphicKafkaSerializer<Base : KafkaMessagesDto> : Serializer<Base>, Deserializer<Base> {

private val json = CustomSerializerModule.json
private val baseSerializer = KafkaMessagesDto.serializer() as KSerializer<Base>

override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {}

override fun close() {}

override fun deserialize(topic: String, data: ByteArray?): Base? {
if (data == null || data.isEmpty()) return null
return json.decodeFromString(baseSerializer, data.toString(Charsets.UTF_8))
}

override fun serialize(topic: String, data: Base?): ByteArray? {
if (data == null) return null
return json.encodeToString(baseSerializer, data).toByteArray(Charsets.UTF_8)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.careerseekers.csmailservice.services.kafka.consumers

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.support.Acknowledgment

interface CustomKafkaConsumer<T, K> {
fun receiveMessage(consumerRecord: ConsumerRecord<T, K>, acknowledgment: Acknowledgment): Any
Copy link
Preview

Copilot AI Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return type 'Any' is too generic and provides no meaningful contract. Consider using 'Unit' if no return value is needed, or define a specific return type that represents the operation result.

Suggested change
fun receiveMessage(consumerRecord: ConsumerRecord<T, K>, acknowledgment: Acknowledgment): Any
fun receiveMessage(consumerRecord: ConsumerRecord<T, K>, acknowledgment: Acknowledgment): Unit

Copilot uses AI. Check for mistakes.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.careerseekers.csmailservice.services.kafka.consumers

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.careerseekers.csmailservice.dto.EmailSendingTaskDto
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Service

@Service
class KafkaEmailSendingConsumer : CustomKafkaConsumer<String, EmailSendingTaskDto> {

@KafkaListener(
topics = ["EMAIL_SENDING_TASKS"],
groupId = "email_sending_tasks_consumer"
)
override fun receiveMessage(
consumerRecord: ConsumerRecord<String, EmailSendingTaskDto>,
acknowledgment: Acknowledgment
) {
println(consumerRecord.value())
Copy link
Preview

Copilot AI Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using println for logging in production code is not recommended. Consider using a proper logging framework like SLF4J with a logger instance.

Suggested change
println(consumerRecord.value())
logger.info("Received message: {}", consumerRecord.value())

Copilot uses AI. Check for mistakes.


acknowledgment.acknowledge()
}

}
12 changes: 12 additions & 0 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ spring:
username: ${MAIL_USERNAME}
password: ${MAIL_PASSWORD}
debug: ${MAIL_DEBUG}
kafka:
bootstrap-servers: ${KAFKA1_BOOTSTRAP_SERVICE_URL},${KAFKA2_BOOTSTRAP_SERVICE_URL},${KAFKA3_BOOTSTRAP_SERVICE_URL}
consumer:
properties:
session:
timeout:
ms: 60000
reconnect:
backoff:
ms: 1000
max:
ms: 20000
Copy link
Preview

Copilot AI Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The session timeout configuration is inconsistent between YAML (60000ms) and Java config (20000ms in EmailSendingConsumerConfig.kt line 43). This could lead to unexpected behavior as the Java configuration will override the YAML settings.

Copilot uses AI. Check for mistakes.


server:
port: ${SERVER_INTERNAL_PORT}
Expand Down
Loading