Skip to content

Commit

Permalink
Enrich Ktor example with stanalone kafka and and proper configuration…
Browse files Browse the repository at this point in the history
… handling (#409)
  • Loading branch information
osoykan authored Apr 29, 2024
1 parent fbd1cf5 commit b77dd12
Show file tree
Hide file tree
Showing 44 changed files with 906 additions and 428 deletions.
9 changes: 9 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import org.gradle.plugins.ide.idea.model.IdeaModel
import org.jetbrains.dokka.gradle.DokkaMultiModuleTask
import org.jetbrains.kotlin.gradle.dsl.JvmTarget

Expand All @@ -11,6 +12,7 @@ plugins {
alias(testLibs.plugins.testLogger)
alias(libs.plugins.kover)
alias(libs.plugins.detekt)
idea
java
}
group = "com.trendyol"
Expand Down Expand Up @@ -53,6 +55,7 @@ subprojects.of("lib", "spring", "examples", "ktor") {
plugin(rootProject.testLibs.plugins.testLogger.get().pluginId)
plugin(rootProject.libs.plugins.kover.get().pluginId)
plugin(rootProject.libs.plugins.detekt.get().pluginId)
plugin("idea")
}

val testImplementation by configurations
Expand Down Expand Up @@ -81,6 +84,12 @@ subprojects.of("lib", "spring", "examples", "ktor") {
ktlint().setEditorConfigPath(rootProject.layout.projectDirectory.file(".editorconfig"))
}
}
the<IdeaModel>().apply {
module {
isDownloadSources = true
isDownloadJavadoc = true
}
}

tasks {
test {
Expand Down
55 changes: 30 additions & 25 deletions examples/ktor-example/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,42 +1,47 @@
@file:Suppress("UnstableApiUsage", "DSL_SCOPE_VIOLATION")

plugins {
kotlin("jvm") version libs.versions.kotlin
application
idea
kotlin("plugin.serialization") version libs.versions.kotlin
kotlin("jvm") version libs.versions.kotlin
application
idea
kotlin("plugin.serialization") version libs.versions.kotlin
}

application {
val groupId = rootProject.group.toString()
val artifactId = project.name
mainClass.set("$groupId.$artifactId.ApplicationKt")
val groupId = rootProject.group.toString()
val artifactId = project.name
mainClass.set("$groupId.$artifactId.ApplicationKt")

val isDevelopment: Boolean = project.ext.has("development")
applicationDefaultJvmArgs = listOf("-Dio.ktor.development=$isDevelopment")
val isDevelopment: Boolean = project.ext.has("development")
applicationDefaultJvmArgs = listOf("-Dio.ktor.development=$isDevelopment")
}

dependencies {
implementation(libs.ktor.server)
implementation(libs.ktor.server.netty)
implementation(libs.ktor.serialization.kotlinx.json)
implementation(libs.ktor.server.call.logging)
implementation(libs.koin.ktor)
implementation(libs.koin.logger.slf4j)
implementation(libs.kotlinx.reactor)
implementation(libs.r2dbc.postgresql)
implementation(libs.ktor.server)
implementation(libs.ktor.server.netty)
implementation(libs.ktor.serialization.kotlinx.json)
implementation(libs.ktor.server.call.logging)
implementation(libs.koin.ktor)
implementation(libs.koin.logger.slf4j)
implementation(libs.kotlinx.reactor)
implementation(libs.r2dbc.postgresql)
implementation(libs.kafkaKotlin)
implementation(libs.hoplite.yaml)
implementation(libs.jackson.kotlin)
implementation(libs.jackson.databind)
}

dependencies {
testImplementation(testLibs.ktor.server.tests.jvm)
testImplementation(testLibs.kotest.property.jvm)
testImplementation(testLibs.kotest.runner.junit5)
testImplementation(project(":lib:stove-testing-e2e-http"))
testImplementation(project(":lib:stove-testing-e2e-wiremock"))
testImplementation(project(":lib:stove-testing-e2e-rdbms-postgres"))
testImplementation(project(":starters:ktor:stove-ktor-testing-e2e"))
testImplementation(testLibs.ktor.server.tests.jvm)
testImplementation(testLibs.kotest.property.jvm)
testImplementation(testLibs.kotest.runner.junit5)
testImplementation(projects.stove.lib.stoveTestingE2eHttp)
testImplementation(projects.stove.lib.stoveTestingE2eWiremock)
testImplementation(projects.stove.lib.stoveTestingE2eRdbmsPostgres)
testImplementation(projects.stove.lib.stoveTestingE2eKafka)
testImplementation(projects.stove.starters.ktor.stoveKtorTestingE2e)
}

repositories {
mavenCentral()
mavenCentral()
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.plugins.callloging.*
import io.ktor.server.plugins.contentnegotiation.*
import org.koin.core.module.Module
import org.koin.dsl.module
import org.koin.ktor.ext.get
import org.koin.ktor.plugin.Koin
import org.koin.logger.SLF4JLogger
import stove.ktor.example.app.*
import stove.ktor.example.application.ExampleAppConsumer

const val CONNECT_TIMEOUT_SECONDS = 10L

Expand All @@ -25,22 +26,35 @@ fun run(
shouldWait: Boolean = false,
applicationOverrides: () -> Module = { module { } }
): Application {
val applicationEngine = embeddedServer(Netty, port = 8080, host = "0.0.0.0") {
mainModule(args, applicationOverrides)
val config = loadConfiguration<AppConfiguration>(args)

val applicationEngine = embeddedServer(Netty, port = config.port, host = "localhost") {
mainModule(config, applicationOverrides)
}

applicationEngine.environment.monitor.subscribe(ApplicationStarted) {
it.get<ExampleAppConsumer<String, Any>>().start()
}

applicationEngine.environment.monitor.subscribe(ApplicationStopping) {
it.get<ExampleAppConsumer<String, Any>>().stop()
}

applicationEngine.start(wait = shouldWait)
return applicationEngine.application
}

fun Application.mainModule(args: Array<String>, applicationOverrides: () -> Module) {
fun Application.mainModule(config: AppConfiguration, applicationOverrides: () -> Module) {
install(ContentNegotiation) {
json()
}

install(Koin) {
SLF4JLogger()
modules(
postgresql(args),
module { single { config } },
kafka(),
postgresql(),
app(),
applicationOverrides()
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package stove.ktor.example.app

import com.fasterxml.jackson.databind.ObjectMapper
import org.koin.dsl.*
import stove.ktor.example.application.*
import stove.ktor.example.domain.ProductRepository

val objectMapperRef: ObjectMapper = ObjectMapper().apply {
findAndRegisterModules()
}

fun app() = module {
single { ProductRepository(get()) }
single { ProductService(get(), get()) }
single { ProductService(get(), get(), get()) }
single { MutexLockProvider() }.bind<LockProvider>()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package stove.ktor.example.app

import com.sksamuel.hoplite.*
import com.sksamuel.hoplite.env.Environment

@OptIn(ExperimentalHoplite::class)
inline fun <reified T : Any> loadConfiguration(args: Array<String> = arrayOf()): T = ConfigLoaderBuilder.default()
.addEnvironmentSource()
.addCommandLineSource(args)
.withExplicitSealedTypes()
.withEnvironment(AppEnv.toEnv())
.apply {
when (AppEnv.current()) {
AppEnv.Local -> {
addResourceSource("/application.yaml", optional = true)
}

AppEnv.Prod -> {
addResourceSource("/application-prod.yaml", optional = true)
addResourceSource("/application.yaml", optional = true)
}

else -> {
addResourceSource("/application.yaml", optional = true)
}
}
}
.build()
.loadConfigOrThrow<T>()

data class AppConfiguration(
val port: Int,
val database: DatabaseConfiguration,
val kafka: KafkaConfiguration
)

data class DatabaseConfiguration(
val host: String,
val port: Int,
val name: String,
val jdbcUrl: String,
val username: String,
val password: String
)

data class KafkaConfiguration(
val bootstrapServers: String,
val groupId: String,
val clientId: String,
val interceptorClasses: List<String>,
val topics: Map<String, TopicConfiguration>
)

data class TopicConfiguration(
val topic: String,
val retry: String,
val error: String
)

enum class AppEnv(val env: String) {
Unspecified(""),
Local(Environment.local.name),
Prod(Environment.prod.name)
;

companion object {
fun current(): AppEnv = when (System.getenv("ENVIRONMENT")) {
Unspecified.env -> Unspecified
Local.env -> Local
Prod.env -> Prod
else -> Local
}

fun toEnv(): Environment = when (current()) {
Local -> Environment.local
Prod -> Environment.prod
else -> Environment.local
}
}

fun isLocal(): Boolean {
return this === Local
}

fun isProd(): Boolean {
return this === Prod
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package stove.ktor.example.app

import io.r2dbc.postgresql.*
import org.koin.core.context.GlobalContext.get
import org.koin.dsl.module
import stove.ktor.example.CONNECT_TIMEOUT_SECONDS
import java.time.Duration

fun postgresql(args: Array<String>) = module {
val map = args.associate { it.split("=")[0] to it.split("=")[1] }
fun postgresql() = module {
single {
val config = get<AppConfiguration>()
val builder = PostgresqlConnectionConfiguration.builder().apply {
host(map["database.host"]!!)
database(map["database.databaseName"]!!)
port(map["database.port"]!!.toInt())
password(map["database.password"]!!)
username(map["database.username"]!!)
host(config.database.host)
database(config.database.name)
port(config.database.port)
password(config.database.password)
username(config.database.username)
}

PostgresqlConnectionFactory(builder.connectTimeout(Duration.ofSeconds(CONNECT_TIMEOUT_SECONDS)).build())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package stove.ktor.example.app

import com.fasterxml.jackson.module.kotlin.readValue
import io.github.nomisRev.kafka.publisher.*
import io.github.nomisRev.kafka.receiver.*
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.*
import org.koin.core.module.Module
import org.koin.dsl.module
import stove.ktor.example.application.ExampleAppConsumer
import java.util.*

fun kafka(): Module = module {
single { createReceiver<Any>(get()) }
single { createPublisher(get()) }
single { ExampleAppConsumer<String, Any>(get(), get()) }
}

private fun <V : Any> createReceiver(config: AppConfiguration): KafkaReceiver<String, V> {
val settings = ReceiverSettings(
config.kafka.bootstrapServers,
StringDeserializer(),
ExampleAppKafkaValueDeserializer<V>(),
config.kafka.groupId,
autoOffsetReset = AutoOffsetReset.Earliest,
properties = Properties().apply {
put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, config.kafka.interceptorClasses)
put(ConsumerConfig.CLIENT_ID_CONFIG, config.kafka.clientId)
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, true)
put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000")
}
)
return KafkaReceiver(settings)
}

private fun createPublisher(config: AppConfiguration): KafkaPublisher<String, Any> = PublisherSettings(
config.kafka.bootstrapServers,
StringSerializer(),
ExampleAppKafkaValueSerializer(),
properties = Properties().apply {
put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, config.kafka.interceptorClasses)
put(ProducerConfig.CLIENT_ID_CONFIG, config.kafka.clientId)
}
).let { KafkaPublisher(it) }

@Suppress("UNCHECKED_CAST")
class ExampleAppKafkaValueDeserializer<T : Any> : Deserializer<T> {
override fun deserialize(
topic: String,
data: ByteArray
): T = objectMapperRef.readValue<Any>(data) as T
}

class ExampleAppKafkaValueSerializer<T : Any> : Serializer<T> {
override fun serialize(
topic: String,
data: T
): ByteArray = objectMapperRef.writeValueAsBytes(data)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import org.koin.ktor.ext.get
import stove.ktor.example.application.ProductService
import stove.ktor.example.application.UpdateProductRequest
import stove.ktor.example.application.*

fun Application.configureRouting() {
routing {
post("/products/{id}") {
val id = call.parameters["id"]!!.toLong()
try {
val id = call.parameters["id"]!!.toInt()
val request = call.receive<UpdateProductRequest>()
call.get<ProductService>().update(id, request)
call.respond(HttpStatusCode.OK)
Expand Down
Loading

0 comments on commit b77dd12

Please sign in to comment.