Skip to content

Commit

Permalink
(recipes): kotlin ktor with kafka implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan committed May 16, 2024
1 parent 7fb80be commit 01343c1
Show file tree
Hide file tree
Showing 24 changed files with 382 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ class TestSystemConfig : AbstractProjectConfig() {
private val logger: Logger = LoggerFactory.getLogger("WireMockMonitor")

override suspend fun beforeProject(): Unit =
TestSystem(baseUrl = "http://localhost:8001")
TestSystem(baseUrl = "http://localhost:8001"){
this.enableReuseForTestContainers()
this.keepDependenciesRunning()
}
.with {
httpClient()
couchbase {
Expand Down
1 change: 1 addition & 0 deletions recipes/kotlin-recipes/ktor-recipe/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
implementation(libs.kediatr.koin)
implementation(libs.mongodb.kotlin.coroutine)
implementation(libs.mongodb.bson.kotlin)
implementation(libs.kafkaKotlin)
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package com.trendyol.stove.examples.kotlin.ktor
import com.trendyol.stove.examples.kotlin.ktor.application.RecipeAppConfig
import com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.*
import com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.http.registerHttpClient
import com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kafka.*
import com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kediatr.registerKediatR
import com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.mongo.configureMongo
import com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.serialization.*
import com.trendyol.stove.examples.kotlin.ktor.infra.components.external.registerCategoryExternalHttpApi
import com.trendyol.stove.examples.kotlin.ktor.infra.components.product.api.productApi
import com.trendyol.stove.examples.kotlin.ktor.infra.components.product.registerProductComponents
import io.github.oshai.kotlinlogging.KotlinLogging
Expand Down Expand Up @@ -40,21 +42,29 @@ fun Application.appModule(
) {
install(Koin) {
allowOverride(true)
modules(module { single { config } })
modules(
module {
single { config }
single { config.externalApis.category }
}
)
registerAppDeps()
registerHttpClient()
registerKafka(config.kafka)
modules(overrides)
}
configureRouting()
configureExceptionHandling()
configureContentNegotiation()
configureConsumerEngine()
}

fun KoinApplication.registerAppDeps() {
configureMongo()
configureJackson()
registerKediatR()
registerProductComponents()
registerCategoryExternalHttpApi()
}

fun Application.configureRouting() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package com.trendyol.stove.examples.kotlin.ktor.application

import com.trendyol.stove.examples.kotlin.ktor.application.external.CategoryApiConfiguration
import com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kafka.Topic

/**
* Represents the main configuration
*/
data class RecipeAppConfig(
val server: ServerConfig,
val kafka: KafkaConfiguration,
val mongo: MongoConfiguration
val mongo: MongoConfiguration,
val externalApis: ExternalApisConfig
)

data class ExternalApisConfig(
val category: CategoryApiConfiguration
)

/**
Expand Down Expand Up @@ -40,13 +48,5 @@ data class KafkaConfiguration(
val interceptorClasses: List<String>,
val topics: Map<String, Topic>
) {
fun flattenInterceptorClasses(): String {
return interceptorClasses.joinToString(",")
}
fun flattenInterceptorClasses(): String = interceptorClasses.joinToString(",")
}

data class Topic(
val name: String,
val retry: String,
val deadLetter: String
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@ import com.trendyol.stove.recipes.shared.application.category.CategoryApiRespons
interface CategoryHttpApi {
suspend fun getCategory(id: Int): CategoryApiResponse
}

data class CategoryApiConfiguration(
val url: String,
val timeout: Long
)
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.trendyol.stove.examples.kotlin.ktor.application.external

import com.trendyol.stove.recipes.shared.application.category.*
import com.trendyol.stove.recipes.shared.application.category.CategoryApiResponse
import io.ktor.client.*
import io.ktor.client.call.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.http.*
import kotlin.time.Duration.Companion.seconds

class CategoryHttpApiImpl(
private val httpClient: HttpClient,
Expand All @@ -14,6 +16,11 @@ class CategoryHttpApiImpl(
return httpClient
.get("${categoryApiConfiguration.url}/categories/$id") {
accept(ContentType.Application.Json)
timeout {
requestTimeoutMillis = categoryApiConfiguration.timeout.seconds.inWholeMilliseconds
connectTimeoutMillis = categoryApiConfiguration.timeout.seconds.inWholeMilliseconds
socketTimeoutMillis = categoryApiConfiguration.timeout.seconds.inWholeMilliseconds
}
}
.body()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package com.trendyol.stove.examples.kotlin.ktor.application.product.command

import com.trendyol.kediatr.*
import com.trendyol.stove.examples.domain.product.Product
import com.trendyol.stove.examples.kotlin.ktor.application.external.CategoryHttpApi
import com.trendyol.stove.examples.kotlin.ktor.domain.product.ProductRepository
import com.trendyol.stove.recipes.shared.application.BusinessException
import io.github.oshai.kotlinlogging.KotlinLogging

data class CreateProductCommand(
Expand All @@ -11,10 +13,18 @@ data class CreateProductCommand(
val categoryId: Int
) : Command

class ProductCommandHandler(private val productRepository: ProductRepository) : CommandHandler<CreateProductCommand> {
class ProductCommandHandler(
private val productRepository: ProductRepository,
private val categoryHttpApi: CategoryHttpApi
) : CommandHandler<CreateProductCommand> {
private val logger = KotlinLogging.logger { }

override suspend fun handle(command: CreateProductCommand) {
val category = categoryHttpApi.getCategory(command.categoryId)
if (!category.isActive) {
throw BusinessException("Category is not active")
}

productRepository.save(Product.create(command.name, command.price, command.categoryId))
logger.info { "Product saved: $command" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.koin.dsl.module
fun KoinApplication.registerProductCommandHandling() {
modules(
module {
single { ProductCommandHandler(get()) }
single { ProductCommandHandler(get(), get()) }
}
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ private fun createHttpClient(
install(ContentNegotiation) {
register(ContentType.Application.Json, JacksonConverter(objectMapper))
}
val logger = KotlinLogging.logger("JourneyHttpClient")
val logger = KotlinLogging.logger("StoveHttpClient")
install(HttpTimeout) {}
install(HttpRequestRetry) {
maxRetries = 1
retryOnServerErrors()
retryOnException(retryOnTimeout = true)
exponentialDelay()
modifyRequest { request ->
logger.warn(cause) { "Retrying request: ${request.url}" }
request.headers.append("x-retry-count", retryCount.toString())
request.headers.append("X-Retry-Count", retryCount.toString())
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kafka

class ConsumerEngine(
private val supervisors: List<ConsumerSupervisor<*, *>>
) {
fun start() {
supervisors.forEach { it.start() }
}

fun stop() {
supervisors.forEach { it.cancel() }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kafka

import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import org.apache.kafka.clients.consumer.ConsumerRecord
import java.time.Duration

abstract class ConsumerSupervisor<K, V>(
private val kafkaReceiver: KafkaReceiver<K, V>,
private val maxConcurrency: Int
) {
private val logger = KotlinLogging.logger("ConsumerSupervisor[${javaClass.simpleName}]")
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val recordChannel = Channel<ConsumerRecord<K, V>>(maxConcurrency)

abstract val topics: List<String>

fun start() {
scope.launch {
logger.info { "Receiving records from topics: $topics" }
subscribe()
}
logger.info { "Consuming records with concurrency: $maxConcurrency" }
}

@Suppress("TooGenericExceptionCaught")
private suspend fun subscribe() {
kafkaReceiver.withConsumer { consumer ->
consumer.subscribe(topics)
while (scope.isActive) {
val records = consumer.poll(Duration.ofMillis(100))
records.forEach { record ->
logger.debug { "Received record: $record" }
try {
consume(record)
consumer.commitAsync()
} catch (e: Exception) {
handleError(e, record)
}
}
}
}
}

abstract suspend fun consume(record: ConsumerRecord<K, V>)

protected open fun handleError(e: Exception, record: ConsumerRecord<K, V>) {
logger.error(e) { "Error while processing record: $record" }
}

fun cancel() {
logger.info { "Cancelling consumer supervisor" }
scope.cancel()
}

/**
* Offers the record to the channel. If the channel is full, it suspends until a space becomes available.
*/
@Suppress("TooGenericExceptionCaught")
private fun consume() = repeat(maxConcurrency) {
scope.launch {
for (record in recordChannel) {
try {
consume(record)
} catch (e: Exception) {
handleError(e, record)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kafka

import com.fasterxml.jackson.databind.ObjectMapper
import com.trendyol.stove.examples.domain.ddd.*
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.*

class KafkaDomainEventPublisher(
private val publisher: KafkaPublisher<String, Any>,
private val topicResolver: TopicResolver,
private val objectMapper: ObjectMapper
) : EventPublisher {
private val logger: Logger = LoggerFactory.getLogger(KafkaDomainEventPublisher::class.java)

override fun <TId> publishFor(aggregateRoot: AggregateRoot<TId>) = runBlocking {
mapEventsToProducerRecords(aggregateRoot)
.forEach { record -> publisher.publishScope { offer(record) } }
}

private fun <TId> mapEventsToProducerRecords(
aggregateRoot: AggregateRoot<TId>
): List<ProducerRecord<String, Any>> = aggregateRoot.domainEvents()
.map { event ->
val topic: Topic = topicResolver(aggregateRoot.aggregateName)
logger.info("Publishing event {} to topic {}", event, topic.name)
ProducerRecord<String, Any>(
topic.name,
aggregateRoot.idAsString,
objectMapper.writeValueAsString(event)
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kafka

import com.fasterxml.jackson.module.kotlin.readValue
import com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.serialization.JacksonConfiguration
import org.apache.kafka.common.serialization.*

private val kafkaObjectMapperRef = JacksonConfiguration.default

@Suppress("UNCHECKED_CAST")
class StoveKafkaValueDeserializer<T : Any> : Deserializer<T> {
override fun deserialize(
topic: String,
data: ByteArray
): T = kafkaObjectMapperRef.readValue<Any>(data) as T
}

class StoveKafkaValueSerializer<T : Any> : Serializer<T> {
override fun serialize(
topic: String,
data: T
): ByteArray = kafkaObjectMapperRef.writeValueAsBytes(data)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kafka

data class Topic(
val name: String,
val retry: String,
val deadLetter: String,
val maxRetry: Int = 1,
val concurrency: Int = 1
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.trendyol.stove.examples.kotlin.ktor.infra.boilerplate.kafka

import com.trendyol.stove.examples.kotlin.ktor.application.*

class TopicResolver(private val kafkaConfiguration: KafkaConfiguration) {
operator fun invoke(aggregateName: String): Topic = kafkaConfiguration.topics.getValue(aggregateName)
}
Loading

0 comments on commit 01343c1

Please sign in to comment.