From 785481c95efd1874fcb7abb9cc3455a878a6f3bd Mon Sep 17 00:00:00 2001 From: Vladimir Fokin <115186975+scobca@users.noreply.github.com> Date: Fri, 15 Aug 2025 11:28:15 +0300 Subject: [PATCH 1/7] feat(security): created security config with password encoder bean --- .../csmailservice/security/SecurityConfig.kt | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/security/SecurityConfig.kt diff --git a/src/main/kotlin/org/careerseekers/csmailservice/security/SecurityConfig.kt b/src/main/kotlin/org/careerseekers/csmailservice/security/SecurityConfig.kt new file mode 100644 index 0000000..09a8398 --- /dev/null +++ b/src/main/kotlin/org/careerseekers/csmailservice/security/SecurityConfig.kt @@ -0,0 +1,15 @@ +package org.careerseekers.csmailservice.security + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder +import org.springframework.security.crypto.password.PasswordEncoder + +@Configuration +class SecurityConfig { + + @Bean + fun passwordEncoder(): PasswordEncoder { + return BCryptPasswordEncoder() + } +} \ No newline at end of file From 765c4b2a2d7a902bff70068cceaa4b73e59457e7 Mon Sep 17 00:00:00 2001 From: Vladimir Fokin <115186975+scobca@users.noreply.github.com> Date: Fri, 15 Aug 2025 11:29:13 +0300 Subject: [PATCH 2/7] feat(kafka): created kafka enums --- .../org/careerseekers/csmailservice/enums/KafkaTopics.kt | 5 +++++ .../org/careerseekers/csmailservice/enums/MailEventTypes.kt | 6 ++++++ 2 files changed, 11 insertions(+) create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/enums/KafkaTopics.kt create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/enums/MailEventTypes.kt diff --git a/src/main/kotlin/org/careerseekers/csmailservice/enums/KafkaTopics.kt b/src/main/kotlin/org/careerseekers/csmailservice/enums/KafkaTopics.kt new file mode 100644 index 0000000..b2263bb --- /dev/null +++ b/src/main/kotlin/org/careerseekers/csmailservice/enums/KafkaTopics.kt @@ -0,0 +1,5 @@ +package org.careerseekers.csmailservice.enums + +enum class KafkaTopics { + EMAIL_SENDING_TASKS +} \ No newline at end of file diff --git a/src/main/kotlin/org/careerseekers/csmailservice/enums/MailEventTypes.kt b/src/main/kotlin/org/careerseekers/csmailservice/enums/MailEventTypes.kt new file mode 100644 index 0000000..3089809 --- /dev/null +++ b/src/main/kotlin/org/careerseekers/csmailservice/enums/MailEventTypes.kt @@ -0,0 +1,6 @@ +package org.careerseekers.csmailservice.enums + +enum class MailEventTypes { + REGISTRATION, + PASSWORD_RESET +} \ No newline at end of file From 63c740e2d771b574c2c2e9b553cf8f6dac104d63 Mon Sep 17 00:00:00 2001 From: Vladimir Fokin <115186975+scobca@users.noreply.github.com> Date: Fri, 15 Aug 2025 11:30:52 +0300 Subject: [PATCH 3/7] build(core): added kafka properties to application.yaml --- src/main/resources/application.yaml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 2b4a85e..19c1129 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -11,6 +11,18 @@ spring: username: ${MAIL_USERNAME} password: ${MAIL_PASSWORD} debug: ${MAIL_DEBUG} + kafka: + bootstrap-servers: ${KAFKA1_BOOTSTRAP_SERVICE_URL},${KAFKA2_BOOTSTRAP_SERVICE_URL},${KAFKA3_BOOTSTRAP_SERVICE_URL} + consumer: + properties: + session: + timeout: + ms: 60000 + reconnect: + backoff: + ms: 1000 + max: + ms: 20000 server: port: ${SERVER_INTERNAL_PORT} From 1ad8eb51366fd6a839c36ff6d7c6e0a3b8909de1 Mon Sep 17 00:00:00 2001 From: Vladimir Fokin <115186975+scobca@users.noreply.github.com> Date: Fri, 15 Aug 2025 11:34:20 +0300 Subject: [PATCH 4/7] build(kafka): created KafkaConfig.kt --- .../csmailservice/config/KafkaConfig.kt | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/config/KafkaConfig.kt diff --git a/src/main/kotlin/org/careerseekers/csmailservice/config/KafkaConfig.kt b/src/main/kotlin/org/careerseekers/csmailservice/config/KafkaConfig.kt new file mode 100644 index 0000000..ba11fea --- /dev/null +++ b/src/main/kotlin/org/careerseekers/csmailservice/config/KafkaConfig.kt @@ -0,0 +1,20 @@ +package org.careerseekers.csmailservice.config + +import org.apache.kafka.clients.admin.NewTopic +import org.careerseekers.csmailservice.enums.KafkaTopics +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.config.TopicBuilder + +@Configuration +class KafkaConfig { + + @Bean + fun emailSendingTasksTopic(): NewTopic { + return TopicBuilder + .name(KafkaTopics.EMAIL_SENDING_TASKS.name) + .partitions(12) + .replicas(3) + .build() + } +} \ No newline at end of file From 0ee784caa87a4161bdbcac27a5f27ced3bedbb13 Mon Sep 17 00:00:00 2001 From: Vladimir Fokin <115186975+scobca@users.noreply.github.com> Date: Fri, 15 Aug 2025 11:35:39 +0300 Subject: [PATCH 5/7] feat(serialization): created custom kafka serializer --- .../csmailservice/dto/KafkaMessagesDto.kt | 17 ++++++++++ .../serializers/CustomSerializerModule.kt | 5 +++ .../serializers/PolymorphicKafkaSerializer.kt | 32 +++++++++++++++++++ 3 files changed, 54 insertions(+) create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/dto/KafkaMessagesDto.kt create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/serializers/PolymorphicKafkaSerializer.kt diff --git a/src/main/kotlin/org/careerseekers/csmailservice/dto/KafkaMessagesDto.kt b/src/main/kotlin/org/careerseekers/csmailservice/dto/KafkaMessagesDto.kt new file mode 100644 index 0000000..2ab3f8f --- /dev/null +++ b/src/main/kotlin/org/careerseekers/csmailservice/dto/KafkaMessagesDto.kt @@ -0,0 +1,17 @@ +package org.careerseekers.csmailservice.dto + +import kotlinx.serialization.Polymorphic +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import org.careerseekers.csmailservice.enums.MailEventTypes + +@Serializable +@Polymorphic +sealed class KafkaMessagesDto : DtoClass + +@Serializable +@SerialName("email_sending_task") +class EmailSendingTaskDto( + val token: String, + val eventType: MailEventTypes, +) : KafkaMessagesDto() \ No newline at end of file diff --git a/src/main/kotlin/org/careerseekers/csmailservice/serializers/CustomSerializerModule.kt b/src/main/kotlin/org/careerseekers/csmailservice/serializers/CustomSerializerModule.kt index d6a75ff..4014741 100644 --- a/src/main/kotlin/org/careerseekers/csmailservice/serializers/CustomSerializerModule.kt +++ b/src/main/kotlin/org/careerseekers/csmailservice/serializers/CustomSerializerModule.kt @@ -4,6 +4,8 @@ import kotlinx.serialization.json.Json import kotlinx.serialization.modules.SerializersModule import kotlinx.serialization.modules.polymorphic import org.careerseekers.csmailservice.dto.CachesDto +import org.careerseekers.csmailservice.dto.EmailSendingTaskDto +import org.careerseekers.csmailservice.dto.KafkaMessagesDto import org.careerseekers.csmailservice.dto.UsersCacheDto object CustomSerializerModule { @@ -11,6 +13,9 @@ object CustomSerializerModule { polymorphic(CachesDto::class) { subclass(UsersCacheDto::class, UsersCacheDto.serializer()) } + polymorphic(KafkaMessagesDto::class) { + subclass(EmailSendingTaskDto::class, EmailSendingTaskDto.serializer()) + } } val json = Json { diff --git a/src/main/kotlin/org/careerseekers/csmailservice/serializers/PolymorphicKafkaSerializer.kt b/src/main/kotlin/org/careerseekers/csmailservice/serializers/PolymorphicKafkaSerializer.kt new file mode 100644 index 0000000..ba3dc7e --- /dev/null +++ b/src/main/kotlin/org/careerseekers/csmailservice/serializers/PolymorphicKafkaSerializer.kt @@ -0,0 +1,32 @@ +package org.careerseekers.csmailservice.serializers + +import kotlinx.serialization.KSerializer +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serializer +import org.careerseekers.csmailservice.dto.KafkaMessagesDto +import org.springframework.stereotype.Component +import kotlin.collections.isEmpty +import kotlin.collections.toString +import kotlin.text.toByteArray + +@Suppress("UNCHECKED_CAST") +@Component +class PolymorphicKafkaSerializer : Serializer, Deserializer { + + private val json = CustomSerializerModule.json + private val baseSerializer = KafkaMessagesDto.serializer() as KSerializer + + override fun configure(configs: MutableMap?, isKey: Boolean) {} + + override fun close() {} + + override fun deserialize(topic: String, data: ByteArray?): Base? { + if (data == null || data.isEmpty()) return null + return json.decodeFromString(baseSerializer, data.toString(Charsets.UTF_8)) + } + + override fun serialize(topic: String, data: Base?): ByteArray? { + if (data == null) return null + return json.encodeToString(baseSerializer, data).toByteArray(Charsets.UTF_8) + } +} From c5201e2b0dbbd574353798e8a371a461fd57f97d Mon Sep 17 00:00:00 2001 From: Vladimir Fokin <115186975+scobca@users.noreply.github.com> Date: Fri, 15 Aug 2025 13:38:38 +0300 Subject: [PATCH 6/7] feat(kafka): created email tasks consumer factory --- .../consumers/EmailSendingConsumerConfig.kt | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/config/kafka/consumers/EmailSendingConsumerConfig.kt diff --git a/src/main/kotlin/org/careerseekers/csmailservice/config/kafka/consumers/EmailSendingConsumerConfig.kt b/src/main/kotlin/org/careerseekers/csmailservice/config/kafka/consumers/EmailSendingConsumerConfig.kt new file mode 100644 index 0000000..3360ec5 --- /dev/null +++ b/src/main/kotlin/org/careerseekers/csmailservice/config/kafka/consumers/EmailSendingConsumerConfig.kt @@ -0,0 +1,84 @@ +package org.careerseekers.csmailservice.config.kafka.consumers + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.careerseekers.csmailservice.dto.EmailSendingTaskDto +import org.careerseekers.csmailservice.serializers.PolymorphicKafkaSerializer +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.listener.ContainerProperties + +@Configuration +@EnableKafka +class EmailSendingConsumerConfig { + @Value("\${spring.kafka.bootstrap-servers}") + private lateinit var kafkaUrl: String + + @Bean + fun emailSendingConsumerFactory(): ConsumerFactory { + val configProps = mapOf( + /** + * Kafka cluster connection settings + * Connecting to Kafka and serializing keys and values + */ + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaUrl, + ConsumerConfig.GROUP_ID_CONFIG to "email_sending_tasks_consumer", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to PolymorphicKafkaSerializer::class.java, + + /** + * Auto commit configuration + */ + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to 1000, + + /** + * Session and heartbeat settings + */ + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to 20000, + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to 1000, + + /** + * Fetch settings + */ + ConsumerConfig.FETCH_MIN_BYTES_CONFIG to 1, + ConsumerConfig.FETCH_MAX_BYTES_CONFIG to 52428800, // 50MB + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG to 1048576, // 1MB + + /** + * Offset reset policy + */ + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + + /** + * Max poll records + */ + ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 500, + + /** + * Isolation level for transactions + */ + ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed" + ) + + + return DefaultKafkaConsumerFactory(configProps) + } + + @Bean + fun kafkaListenerContainerFactory( + consumerFactory: ConsumerFactory + ): ConcurrentKafkaListenerContainerFactory { + val factory = ConcurrentKafkaListenerContainerFactory() + + factory.consumerFactory = consumerFactory + factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE + + return factory + } +} From 0fb72b7bee43d9786bc4b1951fa9867460aff008 Mon Sep 17 00:00:00 2001 From: Vladimir Fokin <115186975+scobca@users.noreply.github.com> Date: Fri, 15 Aug 2025 13:46:50 +0300 Subject: [PATCH 7/7] feat(kafka): created email tasks consumer Signed-off-by: Vladimir Fokin <115186975+scobca@users.noreply.github.com> --- .../kafka/consumers/CustomKafkaConsumer.kt | 8 ++++++ .../consumers/KafkaEmailSendingConsumer.kt | 25 +++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/CustomKafkaConsumer.kt create mode 100644 src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/KafkaEmailSendingConsumer.kt diff --git a/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/CustomKafkaConsumer.kt b/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/CustomKafkaConsumer.kt new file mode 100644 index 0000000..a9fbb1a --- /dev/null +++ b/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/CustomKafkaConsumer.kt @@ -0,0 +1,8 @@ +package org.careerseekers.csmailservice.services.kafka.consumers + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.support.Acknowledgment + +interface CustomKafkaConsumer { + fun receiveMessage(consumerRecord: ConsumerRecord, acknowledgment: Acknowledgment): Any +} \ No newline at end of file diff --git a/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/KafkaEmailSendingConsumer.kt b/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/KafkaEmailSendingConsumer.kt new file mode 100644 index 0000000..4253fd5 --- /dev/null +++ b/src/main/kotlin/org/careerseekers/csmailservice/services/kafka/consumers/KafkaEmailSendingConsumer.kt @@ -0,0 +1,25 @@ +package org.careerseekers.csmailservice.services.kafka.consumers + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.careerseekers.csmailservice.dto.EmailSendingTaskDto +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.kafka.support.Acknowledgment +import org.springframework.stereotype.Service + +@Service +class KafkaEmailSendingConsumer : CustomKafkaConsumer { + + @KafkaListener( + topics = ["EMAIL_SENDING_TASKS"], + groupId = "email_sending_tasks_consumer" + ) + override fun receiveMessage( + consumerRecord: ConsumerRecord, + acknowledgment: Acknowledgment + ) { + println(consumerRecord.value()) + + acknowledgment.acknowledge() + } + +} \ No newline at end of file