From 785481c95efd1874fcb7abb9cc3455a878a6f3bd Mon Sep 17 00:00:00 2001
From: Vladimir Fokin <115186975+scobca@users.noreply.github.com>
Date: Fri, 15 Aug 2025 11:28:15 +0300
Subject: [PATCH 1/7] feat(security): created security config with password
encoder bean
---
.../csmailservice/security/SecurityConfig.kt | 15 +++++++++++++++
1 file changed, 15 insertions(+)
create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/security/SecurityConfig.kt
diff --git a/src/main/kotlin/org/careerseekers/csmailservice/security/SecurityConfig.kt b/src/main/kotlin/org/careerseekers/csmailservice/security/SecurityConfig.kt
new file mode 100644
index 0000000..09a8398
--- /dev/null
+++ b/src/main/kotlin/org/careerseekers/csmailservice/security/SecurityConfig.kt
@@ -0,0 +1,15 @@
+package org.careerseekers.csmailservice.security
+
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder
+import org.springframework.security.crypto.password.PasswordEncoder
+
+@Configuration
+class SecurityConfig {
+
+ @Bean
+ fun passwordEncoder(): PasswordEncoder {
+ return BCryptPasswordEncoder()
+ }
+}
\ No newline at end of file
From 765c4b2a2d7a902bff70068cceaa4b73e59457e7 Mon Sep 17 00:00:00 2001
From: Vladimir Fokin <115186975+scobca@users.noreply.github.com>
Date: Fri, 15 Aug 2025 11:29:13 +0300
Subject: [PATCH 2/7] feat(kafka): created kafka enums
---
.../org/careerseekers/csmailservice/enums/KafkaTopics.kt | 5 +++++
.../org/careerseekers/csmailservice/enums/MailEventTypes.kt | 6 ++++++
2 files changed, 11 insertions(+)
create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/enums/KafkaTopics.kt
create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/enums/MailEventTypes.kt
diff --git a/src/main/kotlin/org/careerseekers/csmailservice/enums/KafkaTopics.kt b/src/main/kotlin/org/careerseekers/csmailservice/enums/KafkaTopics.kt
new file mode 100644
index 0000000..b2263bb
--- /dev/null
+++ b/src/main/kotlin/org/careerseekers/csmailservice/enums/KafkaTopics.kt
@@ -0,0 +1,5 @@
+package org.careerseekers.csmailservice.enums
+
+enum class KafkaTopics {
+ EMAIL_SENDING_TASKS
+}
\ No newline at end of file
diff --git a/src/main/kotlin/org/careerseekers/csmailservice/enums/MailEventTypes.kt b/src/main/kotlin/org/careerseekers/csmailservice/enums/MailEventTypes.kt
new file mode 100644
index 0000000..3089809
--- /dev/null
+++ b/src/main/kotlin/org/careerseekers/csmailservice/enums/MailEventTypes.kt
@@ -0,0 +1,6 @@
+package org.careerseekers.csmailservice.enums
+
+enum class MailEventTypes {
+ REGISTRATION,
+ PASSWORD_RESET
+}
\ No newline at end of file
From 63c740e2d771b574c2c2e9b553cf8f6dac104d63 Mon Sep 17 00:00:00 2001
From: Vladimir Fokin <115186975+scobca@users.noreply.github.com>
Date: Fri, 15 Aug 2025 11:30:52 +0300
Subject: [PATCH 3/7] build(core): added kafka properties to application.yaml
---
src/main/resources/application.yaml | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml
index 2b4a85e..19c1129 100644
--- a/src/main/resources/application.yaml
+++ b/src/main/resources/application.yaml
@@ -11,6 +11,18 @@ spring:
username: ${MAIL_USERNAME}
password: ${MAIL_PASSWORD}
debug: ${MAIL_DEBUG}
+ kafka:
+ bootstrap-servers: ${KAFKA1_BOOTSTRAP_SERVICE_URL},${KAFKA2_BOOTSTRAP_SERVICE_URL},${KAFKA3_BOOTSTRAP_SERVICE_URL}
+ consumer:
+ properties:
+ session:
+ timeout:
+ ms: 60000
+ reconnect:
+ backoff:
+ ms: 1000
+ max:
+ ms: 20000
server:
port: ${SERVER_INTERNAL_PORT}
From 1ad8eb51366fd6a839c36ff6d7c6e0a3b8909de1 Mon Sep 17 00:00:00 2001
From: Vladimir Fokin <115186975+scobca@users.noreply.github.com>
Date: Fri, 15 Aug 2025 11:34:20 +0300
Subject: [PATCH 4/7] build(kafka): created KafkaConfig.kt
---
.../csmailservice/config/KafkaConfig.kt | 20 +++++++++++++++++++
1 file changed, 20 insertions(+)
create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/config/KafkaConfig.kt
diff --git a/src/main/kotlin/org/careerseekers/csmailservice/config/KafkaConfig.kt b/src/main/kotlin/org/careerseekers/csmailservice/config/KafkaConfig.kt
new file mode 100644
index 0000000..ba11fea
--- /dev/null
+++ b/src/main/kotlin/org/careerseekers/csmailservice/config/KafkaConfig.kt
@@ -0,0 +1,20 @@
+package org.careerseekers.csmailservice.config
+
+import org.apache.kafka.clients.admin.NewTopic
+import org.careerseekers.csmailservice.enums.KafkaTopics
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.config.TopicBuilder
+
+@Configuration
+class KafkaConfig {
+
+ @Bean
+ fun emailSendingTasksTopic(): NewTopic {
+ return TopicBuilder
+ .name(KafkaTopics.EMAIL_SENDING_TASKS.name)
+ .partitions(12)
+ .replicas(3)
+ .build()
+ }
+}
\ No newline at end of file
From 0ee784caa87a4161bdbcac27a5f27ced3bedbb13 Mon Sep 17 00:00:00 2001
From: Vladimir Fokin <115186975+scobca@users.noreply.github.com>
Date: Fri, 15 Aug 2025 11:35:39 +0300
Subject: [PATCH 5/7] feat(serialization): created custom kafka serializer
---
.../csmailservice/dto/KafkaMessagesDto.kt | 17 ++++++++++
.../serializers/CustomSerializerModule.kt | 5 +++
.../serializers/PolymorphicKafkaSerializer.kt | 32 +++++++++++++++++++
3 files changed, 54 insertions(+)
create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/dto/KafkaMessagesDto.kt
create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/serializers/PolymorphicKafkaSerializer.kt
diff --git a/src/main/kotlin/org/careerseekers/csmailservice/dto/KafkaMessagesDto.kt b/src/main/kotlin/org/careerseekers/csmailservice/dto/KafkaMessagesDto.kt
new file mode 100644
index 0000000..2ab3f8f
--- /dev/null
+++ b/src/main/kotlin/org/careerseekers/csmailservice/dto/KafkaMessagesDto.kt
@@ -0,0 +1,17 @@
+package org.careerseekers.csmailservice.dto
+
+import kotlinx.serialization.Polymorphic
+import kotlinx.serialization.SerialName
+import kotlinx.serialization.Serializable
+import org.careerseekers.csmailservice.enums.MailEventTypes
+
+@Serializable
+@Polymorphic
+sealed class KafkaMessagesDto : DtoClass
+
+@Serializable
+@SerialName("email_sending_task")
+class EmailSendingTaskDto(
+ val token: String,
+ val eventType: MailEventTypes,
+) : KafkaMessagesDto()
\ No newline at end of file
diff --git a/src/main/kotlin/org/careerseekers/csmailservice/serializers/CustomSerializerModule.kt b/src/main/kotlin/org/careerseekers/csmailservice/serializers/CustomSerializerModule.kt
index d6a75ff..4014741 100644
--- a/src/main/kotlin/org/careerseekers/csmailservice/serializers/CustomSerializerModule.kt
+++ b/src/main/kotlin/org/careerseekers/csmailservice/serializers/CustomSerializerModule.kt
@@ -4,6 +4,8 @@ import kotlinx.serialization.json.Json
import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.modules.polymorphic
import org.careerseekers.csmailservice.dto.CachesDto
+import org.careerseekers.csmailservice.dto.EmailSendingTaskDto
+import org.careerseekers.csmailservice.dto.KafkaMessagesDto
import org.careerseekers.csmailservice.dto.UsersCacheDto
object CustomSerializerModule {
@@ -11,6 +13,9 @@ object CustomSerializerModule {
polymorphic(CachesDto::class) {
subclass(UsersCacheDto::class, UsersCacheDto.serializer())
}
+ polymorphic(KafkaMessagesDto::class) {
+ subclass(EmailSendingTaskDto::class, EmailSendingTaskDto.serializer())
+ }
}
val json = Json {
diff --git a/src/main/kotlin/org/careerseekers/csmailservice/serializers/PolymorphicKafkaSerializer.kt b/src/main/kotlin/org/careerseekers/csmailservice/serializers/PolymorphicKafkaSerializer.kt
new file mode 100644
index 0000000..ba3dc7e
--- /dev/null
+++ b/src/main/kotlin/org/careerseekers/csmailservice/serializers/PolymorphicKafkaSerializer.kt
@@ -0,0 +1,32 @@
+package org.careerseekers.csmailservice.serializers
+
+import kotlinx.serialization.KSerializer
+import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.Serializer
+import org.careerseekers.csmailservice.dto.KafkaMessagesDto
+import org.springframework.stereotype.Component
+import kotlin.collections.isEmpty
+import kotlin.collections.toString
+import kotlin.text.toByteArray
+
+@Suppress("UNCHECKED_CAST")
+@Component
+class PolymorphicKafkaSerializer : Serializer, Deserializer {
+
+ private val json = CustomSerializerModule.json
+ private val baseSerializer = KafkaMessagesDto.serializer() as KSerializer
+
+ override fun configure(configs: MutableMap?, isKey: Boolean) {}
+
+ override fun close() {}
+
+ override fun deserialize(topic: String, data: ByteArray?): Base? {
+ if (data == null || data.isEmpty()) return null
+ return json.decodeFromString(baseSerializer, data.toString(Charsets.UTF_8))
+ }
+
+ override fun serialize(topic: String, data: Base?): ByteArray? {
+ if (data == null) return null
+ return json.encodeToString(baseSerializer, data).toByteArray(Charsets.UTF_8)
+ }
+}
From c5201e2b0dbbd574353798e8a371a461fd57f97d Mon Sep 17 00:00:00 2001
From: Vladimir Fokin <115186975+scobca@users.noreply.github.com>
Date: Fri, 15 Aug 2025 13:38:38 +0300
Subject: [PATCH 6/7] feat(kafka): created email tasks consumer factory
---
.../consumers/EmailSendingConsumerConfig.kt | 84 +++++++++++++++++++
1 file changed, 84 insertions(+)
create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/config/kafka/consumers/EmailSendingConsumerConfig.kt
diff --git a/src/main/kotlin/org/careerseekers/csmailservice/config/kafka/consumers/EmailSendingConsumerConfig.kt b/src/main/kotlin/org/careerseekers/csmailservice/config/kafka/consumers/EmailSendingConsumerConfig.kt
new file mode 100644
index 0000000..3360ec5
--- /dev/null
+++ b/src/main/kotlin/org/careerseekers/csmailservice/config/kafka/consumers/EmailSendingConsumerConfig.kt
@@ -0,0 +1,84 @@
+package org.careerseekers.csmailservice.config.kafka.consumers
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.careerseekers.csmailservice.dto.EmailSendingTaskDto
+import org.careerseekers.csmailservice.serializers.PolymorphicKafkaSerializer
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.listener.ContainerProperties
+
+@Configuration
+@EnableKafka
+class EmailSendingConsumerConfig {
+ @Value("\${spring.kafka.bootstrap-servers}")
+ private lateinit var kafkaUrl: String
+
+ @Bean
+ fun emailSendingConsumerFactory(): ConsumerFactory {
+ val configProps = mapOf(
+ /**
+ * Kafka cluster connection settings
+ * Connecting to Kafka and serializing keys and values
+ */
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaUrl,
+ ConsumerConfig.GROUP_ID_CONFIG to "email_sending_tasks_consumer",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to PolymorphicKafkaSerializer::class.java,
+
+ /**
+ * Auto commit configuration
+ */
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
+ ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to 1000,
+
+ /**
+ * Session and heartbeat settings
+ */
+ ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to 20000,
+ ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to 1000,
+
+ /**
+ * Fetch settings
+ */
+ ConsumerConfig.FETCH_MIN_BYTES_CONFIG to 1,
+ ConsumerConfig.FETCH_MAX_BYTES_CONFIG to 52428800, // 50MB
+ ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG to 1048576, // 1MB
+
+ /**
+ * Offset reset policy
+ */
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
+
+ /**
+ * Max poll records
+ */
+ ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 500,
+
+ /**
+ * Isolation level for transactions
+ */
+ ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed"
+ )
+
+
+ return DefaultKafkaConsumerFactory(configProps)
+ }
+
+ @Bean
+ fun kafkaListenerContainerFactory(
+ consumerFactory: ConsumerFactory
+ ): ConcurrentKafkaListenerContainerFactory {
+ val factory = ConcurrentKafkaListenerContainerFactory()
+
+ factory.consumerFactory = consumerFactory
+ factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
+
+ return factory
+ }
+}
From 0fb72b7bee43d9786bc4b1951fa9867460aff008 Mon Sep 17 00:00:00 2001
From: Vladimir Fokin <115186975+scobca@users.noreply.github.com>
Date: Fri, 15 Aug 2025 13:46:50 +0300
Subject: [PATCH 7/7] feat(kafka): created email tasks consumer
Signed-off-by: Vladimir Fokin <115186975+scobca@users.noreply.github.com>
---
.../kafka/consumers/CustomKafkaConsumer.kt | 8 ++++++
.../consumers/KafkaEmailSendingConsumer.kt | 25 +++++++++++++++++++
2 files changed, 33 insertions(+)
create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/CustomKafkaConsumer.kt
create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/KafkaEmailSendingConsumer.kt
diff --git a/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/CustomKafkaConsumer.kt b/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/CustomKafkaConsumer.kt
new file mode 100644
index 0000000..a9fbb1a
--- /dev/null
+++ b/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/CustomKafkaConsumer.kt
@@ -0,0 +1,8 @@
+package org.careerseekers.csmailservice.services.kafka.consumers
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.springframework.kafka.support.Acknowledgment
+
+interface CustomKafkaConsumer {
+ fun receiveMessage(consumerRecord: ConsumerRecord, acknowledgment: Acknowledgment): Any
+}
\ No newline at end of file
diff --git a/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/KafkaEmailSendingConsumer.kt b/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/KafkaEmailSendingConsumer.kt
new file mode 100644
index 0000000..4253fd5
--- /dev/null
+++ b/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/KafkaEmailSendingConsumer.kt
@@ -0,0 +1,25 @@
+package org.careerseekers.csmailservice.services.kafka.consumers
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.careerseekers.csmailservice.dto.EmailSendingTaskDto
+import org.springframework.kafka.annotation.KafkaListener
+import org.springframework.kafka.support.Acknowledgment
+import org.springframework.stereotype.Service
+
+@Service
+class KafkaEmailSendingConsumer : CustomKafkaConsumer {
+
+ @KafkaListener(
+ topics = ["EMAIL_SENDING_TASKS"],
+ groupId = "email_sending_tasks_consumer"
+ )
+ override fun receiveMessage(
+ consumerRecord: ConsumerRecord,
+ acknowledgment: Acknowledgment
+ ) {
+ println(consumerRecord.value())
+
+ acknowledgment.acknowledge()
+ }
+
+}
\ No newline at end of file