From 9b136c41154a8d268e6598ab531b85bfe6f6d380 Mon Sep 17 00:00:00 2001 From: Oguzhan Soykan Date: Fri, 26 Apr 2024 15:38:50 +0200 Subject: [PATCH] feature: Standalone kafka now hosts a grpc endpoint to communicate and understand the messages --- buildSrc/build.gradle.kts | 10 +- buildSrc/settings.gradle.kts | 8 + gradle/libs.versions.toml | 15 ++ lib/stove-testing-e2e-kafka/build.gradle.kts | 61 ++++++- .../standalone/kafka/KafkaContainerOptions.kt | 19 ++ .../e2e/standalone/kafka/KafkaContext.kt | 40 +++++ .../kafka/KafkaExposedConfiguration.kt | 7 + .../e2e/standalone/kafka/KafkaSystem.kt | 170 +++++------------- .../standalone/kafka/KafkaSystemOptions.kt | 14 ++ .../e2e/standalone/kafka/SubscribeToAll.kt | 51 ------ .../kafka/intercepting/CommonOps.kt | 82 ++------- .../kafka/intercepting/ConsumingOps.kt | 103 ++++++----- .../kafka/intercepting/StoveKafkaBridge.kt | 115 ++++++++++++ .../StoveKafkaObserverGrpcServerAdapter.kt | 32 ++++ .../intercepting/TestSystemInterceptor.kt | 43 ----- .../intercepting/TestSystemMessageSink.kt | 32 ++++ .../src/main/proto/messages.proto | 38 ++++ .../standalone/kafka/setup/ProjectConfig.kt | 26 ++- .../kafka/setup/example/KafkaTestShared.kt | 19 +- .../kafka/setup/example/StoveListener.kt | 45 +++-- .../example/consumers/BacklogConsumer.kt | 5 +- .../example/consumers/ProductConsumer.kt | 5 +- .../consumers/ProductFailingConsumer.kt | 7 +- .../kafka/tests/KafkaSystemTests.kt | 17 +- 24 files changed, 575 insertions(+), 389 deletions(-) create mode 100644 buildSrc/settings.gradle.kts create mode 100644 lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaContainerOptions.kt create mode 100644 lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaContext.kt create mode 100644 lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaExposedConfiguration.kt create mode 100644 lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystemOptions.kt delete mode 100644 lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/SubscribeToAll.kt create mode 100644 lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt create mode 100644 lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaObserverGrpcServerAdapter.kt delete mode 100644 lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemInterceptor.kt create mode 100644 lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemMessageSink.kt create mode 100644 lib/stove-testing-e2e-kafka/src/main/proto/messages.proto diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index 3a351192..40636e1f 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -1,7 +1,9 @@ -plugins { `kotlin-dsl` } +plugins { + `kotlin-dsl` +} repositories { - mavenCentral() - google() - gradlePluginPortal() + mavenCentral() + google() + gradlePluginPortal() } diff --git a/buildSrc/settings.gradle.kts b/buildSrc/settings.gradle.kts new file mode 100644 index 00000000..a65935dd --- /dev/null +++ b/buildSrc/settings.gradle.kts @@ -0,0 +1,8 @@ +rootProject.name = "buildSrc" +dependencyResolutionManagement { + versionCatalogs { + create("libs").from(files("../gradle/libs.versions.toml")) + } +} + +enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS") diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a9d2b300..0e89d66b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -32,6 +32,10 @@ testcontainers = "1.19.7" r2dbc-mssql = "1.0.2.RELEASE" spotless = "6.25.0" detekt = "1.23.6" +wire = "5.0.0-alpha01" +io-grpc = "1.63.0" +io-grpc-kotlin = "1.4.1" +google-protobuf-kotlin = "4.26.1" [libraries] kotlin-stdlib-jdk8 = { module = "org.jetbrains.kotlin:kotlin-stdlib-jdk8", version.ref = "kotlin" } @@ -108,6 +112,16 @@ spring-framework-6x-context = { module = "org.springframework:spring-context", v spring-framework-6x-beans = { module = "org.springframework:spring-beans", version.ref = "spring-framework-6x" } detekt-formatting = { module = "io.gitlab.arturbosch.detekt:detekt-formatting", version.ref = "detekt" } +wire-grpc-server = { module = "com.squareup.wiregrpcserver:server", version = "1.0.0-alpha03" } +wire-grpc-server-generator = { module = "com.squareup.wiregrpcserver:server-generator", version = "1.0.0-alpha03" } +wire-grpc-client = { module = "com.squareup.wire:wire-grpc-client", version.ref = "wire" } +wire-grpc-runtime = { module = "com.squareup.wire:wire-runtime", version.ref = "wire" } +io-grpc = { module = "io.grpc:grpc-core", version.ref = "io-grpc" } +io-grpc-stub = { module = "io.grpc:grpc-stub", version.ref = "io-grpc" } +io-grpc-protobuf = { module = "io.grpc:grpc-protobuf", version.ref = "io-grpc" } +io-grpc-netty = { module = "io.grpc:grpc-netty", version.ref = "io-grpc" } +io-grpc-kotlin = { module = "io.grpc:grpc-kotlin-stub", version.ref = "io-grpc-kotlin" } +google-protobuf-kotlin = { module = "com.google.protobuf:protobuf-kotlin", version.ref = "google-protobuf-kotlin" } [plugins] spring-plugin = { id = "org.jetbrains.kotlin.plugin.spring", version.ref = "kotlin" } @@ -121,4 +135,5 @@ kover = { id = "org.jetbrains.kotlinx.kover", version.ref = "kover" } gitVersioning = { id = "com.palantir.git-version", version = "3.0.0" } spotless = { id = "com.diffplug.spotless", version.ref = "spotless" } detekt = { id = "io.gitlab.arturbosch.detekt", version.ref = "detekt" } +wire = { id = "com.squareup.wire", version.ref = "wire" } diff --git a/lib/stove-testing-e2e-kafka/build.gradle.kts b/lib/stove-testing-e2e-kafka/build.gradle.kts index ae2666eb..50348c94 100644 --- a/lib/stove-testing-e2e-kafka/build.gradle.kts +++ b/lib/stove-testing-e2e-kafka/build.gradle.kts @@ -1,13 +1,58 @@ +plugins { + alias(libs.plugins.wire) +} + dependencies { - api(projects.lib.stoveTestingE2e) - api(libs.testcontainers.kafka) - implementation(libs.kafka) - implementation(libs.kotlinx.io.reactor.extensions) - implementation(libs.kotlinx.jdk8) - implementation(libs.kotlinx.core) - implementation(libs.kafkaKotlin) + api(projects.lib.stoveTestingE2e) + api(libs.testcontainers.kafka) + implementation(libs.kafka) + implementation(libs.kotlinx.io.reactor.extensions) + implementation(libs.kotlinx.jdk8) + implementation(libs.kotlinx.core) + implementation(libs.kafkaKotlin) + + api(libs.wire.grpc.server) + api(libs.wire.grpc.client) + api(libs.wire.grpc.runtime) + api(libs.io.grpc) + api(libs.io.grpc.protobuf) + api(libs.io.grpc.stub) + api(libs.io.grpc.kotlin) + api(libs.io.grpc.netty) + api(libs.google.protobuf.kotlin) } dependencies { - testImplementation(libs.slf4j.simple) + testImplementation(libs.slf4j.simple) +} + +buildscript { + dependencies { + classpath(libs.wire.grpc.server.generator) + } +} + +wire { + sourcePath("src/main/proto") + kotlin { + rpcRole = "client" + rpcCallStyle = "suspending" + exclusive = false + javaInterop = true + } + kotlin { + custom { + schemaHandlerFactory = com.squareup.wire.kotlin.grpcserver.GrpcServerSchemaHandler.Factory() + options = mapOf( + "singleMethodServices" to "false", + "rpcCallStyle" to "suspending" + ) + } + rpcRole = "server" + rpcCallStyle = "suspending" + exclusive = false + singleMethodServices = false + javaInterop = true + includes = listOf("StoveKafkaObserverService") + } } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaContainerOptions.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaContainerOptions.kt new file mode 100644 index 00000000..a07dfa06 --- /dev/null +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaContainerOptions.kt @@ -0,0 +1,19 @@ +package com.trendyol.stove.testing.e2e.standalone.kafka + +import com.trendyol.stove.testing.e2e.containers.ContainerFn +import com.trendyol.stove.testing.e2e.containers.ContainerOptions +import com.trendyol.stove.testing.e2e.containers.DEFAULT_REGISTRY +import org.testcontainers.containers.KafkaContainer + +data class KafkaContainerOptions( + override val registry: String = DEFAULT_REGISTRY, + override val image: String = "confluentinc/cp-kafka", + override val tag: String = "latest", + val ports: List = DEFAULT_KAFKA_PORTS, + override val compatibleSubstitute: String? = null, + override val containerFn: ContainerFn = { } +) : ContainerOptions { + companion object { + val DEFAULT_KAFKA_PORTS = listOf(9092, 9093) + } +} diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaContext.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaContext.kt new file mode 100644 index 00000000..fe60abeb --- /dev/null +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaContext.kt @@ -0,0 +1,40 @@ +package com.trendyol.stove.testing.e2e.standalone.kafka + +import arrow.core.getOrElse +import com.trendyol.stove.testing.e2e.containers.withProvidedRegistry +import com.trendyol.stove.testing.e2e.system.* +import com.trendyol.stove.testing.e2e.system.abstractions.SystemNotRegisteredException +import com.trendyol.stove.testing.e2e.system.annotations.StoveDsl +import org.testcontainers.containers.KafkaContainer + +data class KafkaContext( + val container: KafkaContainer, + val options: KafkaSystemOptions +) + +internal fun TestSystem.kafka(): KafkaSystem = getOrNone().getOrElse { + throw SystemNotRegisteredException(KafkaSystem::class) +} + +internal fun TestSystem.withKafka(options: KafkaSystemOptions = KafkaSystemOptions()): TestSystem { + val kafka = withProvidedRegistry( + options.containerOptions.imageWithTag, + options.containerOptions.registry, + options.containerOptions.compatibleSubstitute + ) { + KafkaContainer(it) + .withExposedPorts(*options.containerOptions.ports.toTypedArray()) + .apply(options.containerOptions.containerFn) + .withReuse(this.options.keepDependenciesRunning) + } + getOrRegister(KafkaSystem(this, KafkaContext(kafka, options))) + return this +} + +@StoveDsl +suspend fun ValidationDsl.kafka( + validation: @StoveDsl suspend KafkaSystem.() -> Unit +): Unit = validation(this.testSystem.kafka()) + +@StoveDsl +fun WithDsl.kafka(configure: () -> KafkaSystemOptions): TestSystem = this.testSystem.withKafka(configure()) diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaExposedConfiguration.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaExposedConfiguration.kt new file mode 100644 index 00000000..16905053 --- /dev/null +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaExposedConfiguration.kt @@ -0,0 +1,7 @@ +package com.trendyol.stove.testing.e2e.standalone.kafka + +import com.trendyol.stove.testing.e2e.system.abstractions.ExposedConfiguration + +data class KafkaExposedConfiguration( + val bootstrapServers: String +) : ExposedConfiguration diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt index 714018ce..9f680826 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt @@ -6,79 +6,26 @@ import arrow.core.* import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.trendyol.stove.functional.* -import com.trendyol.stove.testing.e2e.containers.* import com.trendyol.stove.testing.e2e.serialization.StoveObjectMapper import com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.* -import com.trendyol.stove.testing.e2e.system.* +import com.trendyol.stove.testing.e2e.system.TestSystem import com.trendyol.stove.testing.e2e.system.abstractions.* +import com.trendyol.stove.testing.e2e.system.annotations.StoveDsl import io.github.nomisRev.kafka.* import io.github.nomisRev.kafka.publisher.* -import io.github.nomisRev.kafka.receiver.* +import io.grpc.ServerBuilder import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.admin.* -import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.* import org.slf4j.* -import org.testcontainers.containers.KafkaContainer import kotlin.reflect.KClass import kotlin.time.Duration -import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds -data class KafkaExposedConfiguration( - val bootstrapServers: String -) : ExposedConfiguration - -data class KafkaContainerOptions( - override val registry: String = DEFAULT_REGISTRY, - override val image: String = "confluentinc/cp-kafka", - override val tag: String = "latest", - val ports: List = DEFAULT_KAFKA_PORTS, - override val compatibleSubstitute: String? = null, - override val containerFn: ContainerFn = { } -) : ContainerOptions { - companion object { - val DEFAULT_KAFKA_PORTS = listOf(9092, 9093) - } -} - -class KafkaSystemOptions( - val containerOptions: KafkaContainerOptions = KafkaContainerOptions(), - val errorTopicSuffixes: List = listOf("error", "errorTopic", "retry", "retryTopic"), - val objectMapper: ObjectMapper = StoveObjectMapper.Default, - override val configureExposedConfiguration: (KafkaExposedConfiguration) -> List = { _ -> listOf() } -) : SystemOptions, ConfiguresExposedConfiguration - -data class KafkaContext( - val container: KafkaContainer, - val options: KafkaSystemOptions -) - -internal fun TestSystem.kafka(): KafkaSystem = getOrNone().getOrElse { - throw SystemNotRegisteredException(KafkaSystem::class) -} - -internal fun TestSystem.withKafka(options: KafkaSystemOptions = KafkaSystemOptions()): TestSystem { - val kafka = - withProvidedRegistry( - options.containerOptions.imageWithTag, - options.containerOptions.registry, - options.containerOptions.compatibleSubstitute - ) { - KafkaContainer(it) - .withExposedPorts(*options.containerOptions.ports.toTypedArray()) - .apply(options.containerOptions.containerFn) - .withReuse(this.options.keepDependenciesRunning) - } - getOrRegister(KafkaSystem(this, KafkaContext(kafka, options))) - return this -} - -suspend fun ValidationDsl.kafka(validation: suspend KafkaSystem.() -> Unit): Unit = validation(this.testSystem.kafka()) - -fun WithDsl.kafka(configure: () -> KafkaSystemOptions): TestSystem = this.testSystem.withKafka(configure()) +var stoveKafkaObjectMapperRef: ObjectMapper = StoveObjectMapper.Default +@StoveDsl class KafkaSystem( override val testSystem: TestSystem, private val context: KafkaContext @@ -86,42 +33,47 @@ class KafkaSystem( private lateinit var exposedConfiguration: KafkaExposedConfiguration private lateinit var adminClient: Admin private lateinit var kafkapublisher: KafkaPublisher - private lateinit var subscribeToAllConsumer: SubscribeToAll - private lateinit var interceptor: TestSystemKafkaInterceptor - private val assertedMessages: MutableList = mutableListOf() - private val assertedConditions: MutableList<(Any) -> Boolean> = mutableListOf() + private lateinit var sink: TestSystemMessageSink private val logger: Logger = LoggerFactory.getLogger(javaClass) - private val state: StateOfSystem = - StateOfSystem( - testSystem.options, - KafkaSystem::class, - KafkaExposedConfiguration::class - ) + private val state: StateOfSystem = StateOfSystem( + testSystem.options, + KafkaSystem::class, + KafkaExposedConfiguration::class + ) override suspend fun run() { - exposedConfiguration = - state.capture { - context.container.start() - KafkaExposedConfiguration(context.container.bootstrapServers) - } + exposedConfiguration = state.capture { + context.container.start() + KafkaExposedConfiguration(context.container.bootstrapServers) + } adminClient = createAdminClient(exposedConfiguration) kafkapublisher = createPublisher(exposedConfiguration) - } - - override suspend fun afterRun() { - interceptor = TestSystemKafkaInterceptor( + sink = TestSystemMessageSink( adminClient, - context.options.objectMapper, - InterceptionOptions(errorTopicSuffixes = context.options.errorTopicSuffixes) + StoveObjectMapper.Default, + InterceptionOptions(context.options.errorTopicSuffixes) ) - subscribeToAllConsumer = SubscribeToAll( - adminClient, - consumer(exposedConfiguration), - interceptor - ) - subscribeToAllConsumer.start() + startGrpcServer() + } + + private fun startGrpcServer() { + System.setProperty(STOVE_KAFKA_BRIDGE_PORT, context.options.bridgeGrpcServerPort.toString()) + Try { + ServerBuilder.forPort(context.options.bridgeGrpcServerPort) + .addService(StoveKafkaObserverGrpcServerAdapter(sink)) + .build() + .start() + }.recover { + logger.error("Failed to start Wire-Grpc-Server", it) + throw it + }.map { + logger.info("Wire-Grpc-Server started on port ${context.options.bridgeGrpcServerPort}") + } } + override suspend fun afterRun() = Unit + + @StoveDsl suspend fun publish( topic: String, message: Any, @@ -135,11 +87,11 @@ class KafkaSystem( return this } + @StoveDsl suspend fun shouldBeConsumed( atLeastIn: Duration = 5.seconds, message: Any - ): KafkaSystem = interceptor - .also { assertedMessages.add(message) } + ): KafkaSystem = sink .waitUntilConsumed(atLeastIn, message::class) { actual -> actual.isSome { it == message } } .let { this } @@ -152,18 +104,7 @@ class KafkaSystem( } @PublishedApi - internal suspend fun shouldBeConsumedOnCondition( - atLeastIn: Duration = 5.seconds, - condition: (T) -> Boolean, - clazz: KClass - ): KafkaSystem = - interceptor - .also { assertedConditions.add(condition as (Any) -> Boolean) } - .waitUntilConsumed(atLeastIn, clazz) { actual -> actual.isSome { condition(it) } } - .let { this } - - @PublishedApi - internal suspend fun shouldBeFailedOnCondition( + internal suspend fun shouldBeFailed( atLeastIn: Duration = 5.seconds, condition: (T, Throwable) -> Boolean, clazz: KClass @@ -171,26 +112,14 @@ class KafkaSystem( TODO("Not yet implemented") } - private fun consumer(cfg: KafkaExposedConfiguration): KafkaReceiver = - mapOf( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to cfg.bootstrapServers, - ConsumerConfig.GROUP_ID_CONFIG to "stove-kafka-subscribe-to-all", - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StoveKafkaValueDeserializer::class.java, - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, - ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to 2.milliseconds.inWholeMilliseconds.toInt() - ).let { - KafkaReceiver( - ReceiverSettings( - cfg.bootstrapServers, - StringDeserializer(), - StoveKafkaValueDeserializer(), - SUBSCRIBE_TO_ALL_GROUP_ID, - properties = it.toProperties() - ) - ) - } + @PublishedApi + internal suspend fun shouldBeConsumed( + atLeastIn: Duration = 5.seconds, + condition: (T) -> Boolean, + clazz: KClass + ): KafkaSystem = sink + .waitUntilConsumed(atLeastIn, clazz) { actual -> actual.isSome { condition(it) } } + .let { this } private fun createPublisher(exposedConfiguration: KafkaExposedConfiguration): KafkaPublisher = PublisherSettings( exposedConfiguration.bootstrapServers, @@ -215,14 +144,13 @@ class KafkaSystem( override fun close(): Unit = runBlocking { Try { - subscribeToAllConsumer.close() kafkapublisher.close() executeWithReuseCheck { stop() } } }.recover { logger.warn("got an error while stopping: ${it.message}") }.let { } companion object { - const val SUBSCRIBE_TO_ALL_GROUP_ID = "stove-kafka-subscribe-to-all" + const val STOVE_KAFKA_BRIDGE_PORT = "STOVE_KAFKA_BRIDGE_PORT" } } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystemOptions.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystemOptions.kt new file mode 100644 index 00000000..9bb7acfc --- /dev/null +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystemOptions.kt @@ -0,0 +1,14 @@ +package com.trendyol.stove.testing.e2e.standalone.kafka + +import com.fasterxml.jackson.databind.ObjectMapper +import com.trendyol.stove.testing.e2e.serialization.StoveObjectMapper +import com.trendyol.stove.testing.e2e.system.abstractions.ConfiguresExposedConfiguration +import com.trendyol.stove.testing.e2e.system.abstractions.SystemOptions + +class KafkaSystemOptions( + val containerOptions: KafkaContainerOptions = KafkaContainerOptions(), + val errorTopicSuffixes: List = listOf("error", "errorTopic", "retry", "retryTopic"), + val bridgeGrpcServerPort: Int = 50051, + val objectMapper: ObjectMapper = StoveObjectMapper.Default, + override val configureExposedConfiguration: (KafkaExposedConfiguration) -> List = { _ -> listOf() } +) : SystemOptions, ConfiguresExposedConfiguration diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/SubscribeToAll.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/SubscribeToAll.kt deleted file mode 100644 index a4827fac..00000000 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/SubscribeToAll.kt +++ /dev/null @@ -1,51 +0,0 @@ -package com.trendyol.stove.testing.e2e.standalone.kafka - -import com.trendyol.stove.functional.Try -import com.trendyol.stove.functional.recover -import com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.TestSystemKafkaInterceptor -import com.trendyol.stove.testing.e2e.system.abstractions.SystemConfigurationException -import io.github.nomisRev.kafka.receiver.KafkaReceiver -import kotlinx.coroutines.* -import org.apache.kafka.clients.admin.Admin -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -class SubscribeToAll( - private val adminClient: Admin, - private val receiver: KafkaReceiver, - private val interceptor: TestSystemKafkaInterceptor -) : AutoCloseable { - private lateinit var subscription: Job - private val logger: Logger = LoggerFactory.getLogger(javaClass) - - @OptIn(DelicateCoroutinesApi::class) - suspend fun start() = - coroutineScope { - val topics = adminClient.listTopics().names().get() - if (!topics.any()) { - throw SystemConfigurationException( - KafkaSystem::class, - "Topics are not found, please enable creating topics before running e2e tests on them. " + - "Stove depends on created topics to be able to understand what is consumed and published" - ) - } - - subscription = - GlobalScope.launch { - while (!subscription.isCancelled) { - receiver.withConsumer { consumer -> - receiver - .receive(topics) - .collect { - Try { interceptor.onMessage(it, consumer) } - .recover { logger.warn("$javaClass got an exception: $it") } - } - } - } - } - } - - override fun close() { - Try { subscription.cancel() } - } -} diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/CommonOps.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/CommonOps.kt index e3599b26..614faee2 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/CommonOps.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/CommonOps.kt @@ -1,23 +1,13 @@ package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting -import arrow.core.Option -import arrow.core.align -import arrow.core.toOption +import arrow.core.* import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.trendyol.stove.testing.e2e.standalone.kafka.KafkaSystem -import kotlinx.coroutines.TimeoutCancellationException -import kotlinx.coroutines.delay -import kotlinx.coroutines.withTimeout +import kotlinx.coroutines.* import org.apache.kafka.clients.admin.Admin -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.common.TopicPartition import java.util.* import java.util.concurrent.ConcurrentMap -import kotlin.math.abs import kotlin.reflect.KClass import kotlin.time.Duration -import kotlin.time.Duration.Companion.seconds data class KafkaAssertion( val clazz: KClass, @@ -54,68 +44,28 @@ internal interface CommonOps : RecordsAssertions { fun throwIfFailed( clazz: KClass, selector: (Option) -> Boolean - ): Unit = - exceptions - .filter { selector(readCatching(it.value.message.toString(), clazz).getOrNull().toOption()) } - .forEach { throw it.value.reason } + ): Unit = exceptions + .filter { selector(readCatching(it.value.message.toString(), clazz).getOrNull().toOption()) } + .forEach { throw it.value.reason } fun readCatching( json: Any, clazz: KClass - ): Result = - runCatching { - when (json) { - is String -> serde.readValue(json, clazz.java) - else -> jacksonObjectMapper().convertValue(json, clazz.java) + ): Result = runCatching { + when (json) { + is String -> { + val converted = serde.readValue(json, clazz.java) + converted } - } - - fun reset(): Unit = exceptions.clear() - - fun dumpMessages(): String - - private fun consumerOffset(): Map = - adminClient.listConsumerGroups().all().get() - .filterNot { it.groupId() == KafkaSystem.SUBSCRIBE_TO_ALL_GROUP_ID } - .flatMap { group -> - val offsets = adminClient.listConsumerGroupOffsets(group.groupId()).partitionsToOffsetAndMetadata().get() - offsets.map { it.key to it.value.offset() } - }.toMap() - private fun producerOffset( - consumer: Consumer, - consumerOffset: Map - ): Map { - val partitions = - consumerOffset.map { - TopicPartition(it.key.topic(), it.key.partition()) + else -> { + val converted = serde.convertValue(json, clazz.java) + converted } - return consumer.endOffsets(partitions) + } } - private fun computeLag( - consumerOffset: Map, - producerOffset: Map - ): List = - consumerOffset - .align(producerOffset) { - val lag = - it.value.fold(fa = { 0 }, fb = { 0 }) { a, b -> - abs(a - b) - } - LagByTopic(it.key, lag) - }.map { it.value } - - private suspend fun waitUntilLagsComputed( - consumer: Consumer, - forTopic: String - ): Collection { - return { computeLag(consumerOffset(), producerOffset(consumer, consumerOffset())) } - .waitUntilConditionMet(5.seconds, "computing lag") { it.topicPartition.topic() == forTopic } - } + fun reset(): Unit = exceptions.clear() - data class LagByTopic( - val topicPartition: TopicPartition, - val lag: Long - ) + fun dumpMessages(): String } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/ConsumingOps.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/ConsumingOps.kt index e4cf64be..7bc23e8b 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/ConsumingOps.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/ConsumingOps.kt @@ -1,18 +1,21 @@ package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting +import CommittedMessage +import ConsumedMessage +import PublishedMessage import arrow.core.* import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.consumer.ConsumerRecord import org.slf4j.Logger -import java.util.UUID +import java.util.* import java.util.concurrent.ConcurrentMap import kotlin.reflect.KClass import kotlin.time.Duration internal interface ConsumingOps : CommonOps { val logger: Logger - val consumedRecords: ConcurrentMap> + val consumedRecords: ConcurrentMap + val publishedMessages: ConcurrentMap + val committedMessages: ConcurrentMap suspend fun waitUntilConsumed( atLeastIn: Duration, @@ -20,63 +23,83 @@ internal interface ConsumingOps : CommonOps { condition: (Option) -> Boolean ) { assertions.putIfAbsent(UUID.randomUUID(), KafkaAssertion(clazz, condition)) - val getRecords = { consumedRecords.map { it.value.value() } } + val getRecords = { consumedRecords.map { it.value } } getRecords.waitUntilConditionMet(atLeastIn, "While CONSUMING ${clazz.java.simpleName}") { - val outcome = readCatching(it, clazz) + val outcome = readCatching(it.message, clazz) outcome.isSuccess && condition(outcome.getOrNull().toOption()) } throwIfFailed(clazz, condition) } - fun recordMessage( - record: ConsumerRecord, - consumer: Consumer - ): Unit = - runBlocking { - consumedRecords.putIfAbsent(UUID.randomUUID(), record) - logger.info( - """ - RECEIVED MESSAGE: - Consumer: ${consumer.groupMetadata().memberId()} | ${consumer.groupMetadata().groupId()} - Topic: ${record.topic()} - Record: ${record.value()} - Key: ${record.key()} - Headers: ${record.headers().map { Pair(it.key(), String(it.value())) }} - TestCase: ${record.headers().firstOrNone { it.key() == "testCase" }.map { String(it.value()) }.getOrElse { "" }} - """.trimIndent() - ) - } + fun recordConsumedMessage(record: ConsumedMessage): Unit = runBlocking { + consumedRecords.putIfAbsent(UUID.randomUUID(), record) + logger.info( + """ + RECEIVED MESSAGE: + Topic: ${record.topic} + Record: ${record.message} + Key: ${record.key} + Headers: ${record.headers.map { Pair(it.key, it.value) }} + TestCase: ${record.headers.firstNotNullOf { it.key == "testCase" }} + """.trimIndent() + ) + } - fun recordError(record: ConsumerRecord): Unit = + fun recordPublishedMessage(record: PublishedMessage): Unit = runBlocking { + publishedMessages.putIfAbsent(UUID.randomUUID(), record) + logger.info( + """ + PUBLISHED MESSAGE: + Topic: ${record.topic} + Record: ${record.message} + Key: ${record.key} + Headers: ${record.headers.map { Pair(it.key, it.value) }} + TestCase: ${record.headers.firstNotNullOf { it.key == "testCase" }} + """.trimIndent() + ) + } + + fun recordCommittedMessage(record: CommittedMessage): Unit = runBlocking { + committedMessages.putIfAbsent(UUID.randomUUID(), record) + logger.info( + """ + COMMITTED MESSAGE: + Topic: ${record.topic} + Offset: ${record.offset} + Partition: ${record.partition} + """.trimIndent() + ) + } + + fun recordError(record: ConsumedMessage): Unit = runBlocking { val exception = AssertionError(buildErrorMessage(record)) - exceptions.putIfAbsent(UUID.randomUUID(), Failure(record.topic(), record.value(), exception)) + exceptions.putIfAbsent(UUID.randomUUID(), Failure(record.topic, record.message, exception)) logger.error( """ CONSUMER GOT AN ERROR: - Topic: ${record.topic()} - Record: ${record.value()} - Key: ${record.key()} - Headers: ${record.headers().map { Pair(it.key(), String(it.value())) }} - TestCase: ${record.headers().firstOrNone { it.key() == "testCase" }.map { String(it.value()) }.getOrElse { "" }} + Topic: ${record.topic} + Record: ${record.message} + Key: ${record.key} + Headers: ${record.headers.map { Pair(it.key, it.value) }} + TestCase: ${record.headers.firstNotNullOf { it.key == "testCase" }} Exception: $exception """.trimIndent() ) } - private fun buildErrorMessage(record: ConsumerRecord): String = + private fun buildErrorMessage(record: ConsumedMessage): String = """ MESSAGE FAILED TO CONSUME: - Topic: ${record.topic()} - Record: ${record.value()} - Key: ${record.key()} - Headers: ${record.headers().map { Pair(it.key(), String(it.value())) }} + Topic: ${record.topic} + Record: ${record.message} + Key: ${record.key} + Headers: ${record.headers.map { Pair(it.key, it.value) }} """.trimIndent() - override fun dumpMessages(): String = - """ - CONSUMED MESSAGES: - ${consumedRecords.map { it.value.value() }.joinToString("\n")} + override fun dumpMessages(): String = """ + CONSUMED MESSAGES SO FAR: + ${consumedRecords.map { it.value }.joinToString("\n")} """.trimIndent() } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt new file mode 100644 index 00000000..21348a43 --- /dev/null +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt @@ -0,0 +1,115 @@ +package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting + +import CommittedMessage +import ConsumedMessage +import PublishedMessage +import StoveKafkaObserverServiceClient +import com.fasterxml.jackson.databind.ObjectMapper +import com.squareup.wire.* +import com.trendyol.stove.functional.* +import com.trendyol.stove.testing.e2e.standalone.kafka.KafkaSystem.Companion.STOVE_KAFKA_BRIDGE_PORT +import com.trendyol.stove.testing.e2e.standalone.kafka.stoveKafkaObjectMapperRef +import kotlinx.coroutines.runBlocking +import okhttp3.* +import org.apache.kafka.clients.consumer.* +import org.apache.kafka.clients.producer.* +import org.apache.kafka.common.TopicPartition +import org.slf4j.Logger + +@Suppress("UNUSED") +class StoveKafkaBridge : ConsumerInterceptor, ProducerInterceptor { + private val logger: Logger = org.slf4j.LoggerFactory.getLogger(StoveKafkaBridge::class.java) + + private val client: StoveKafkaObserverServiceClient by lazy { startGrpcClient() } + private val mapper: ObjectMapper by lazy { stoveKafkaObjectMapperRef } + + override fun onSend(record: ProducerRecord): ProducerRecord = runBlocking { + record.also { send(publishedMessage(it)) } + } + + override fun onConsume(records: ConsumerRecords): ConsumerRecords = runBlocking { + records.also { consumedMessages(it).forEach { message -> send(message) } } + } + + override fun onCommit(offsets: MutableMap) = runBlocking { + committedMessages(offsets).forEach { send(it) } + } + + override fun configure(configs: MutableMap) = Unit + + override fun close() = Unit + + override fun onAcknowledgement(metadata: RecordMetadata, exception: Exception) = Unit + + private suspend fun send(consumedMessage: ConsumedMessage) { + Try { + client.onConsumedMessage().execute(consumedMessage) + }.map { + logger.info("Consumed message sent to Stove Kafka Bridge: $consumedMessage") + }.recover { e -> + logger.error("Failed to send consumed message to Stove Kafka Bridge: $consumedMessage", e) + } + } + + private suspend fun send(committedMessage: CommittedMessage) { + Try { + client.onCommittedMessage().execute(committedMessage) + }.map { + logger.info("Committed message sent to Stove Kafka Bridge: $committedMessage") + }.recover { e -> + logger.error("Failed to send committed message to Stove Kafka Bridge: $committedMessage", e) + } + } + + private suspend fun send(publishedMessage: PublishedMessage) { + Try { + client.onPublishedMessage().execute(publishedMessage) + }.map { + logger.info("Published message sent to Stove Kafka Bridge: $publishedMessage") + }.recover { e -> + logger.error("Failed to send published message to Stove Kafka Bridge: $publishedMessage", e) + } + } + + private fun consumedMessages(records: ConsumerRecords) = records.map { + ConsumedMessage( + key = it.key().toString(), + message = mapper.writeValueAsString(it.value()), + topic = it.topic(), + headers = it.headers().associate { it.key() to it.value().toString() } + ) + } + + private fun publishedMessage(record: ProducerRecord) = ConsumedMessage( + key = record.key().toString(), + message = mapper.writeValueAsString(record.value()), + topic = record.topic(), + headers = record.headers().associate { it.key() to it.value().toString() } + ) + + private fun committedMessages( + offsets: MutableMap + ): List = offsets.map { + CommittedMessage( + topic = it.key.topic(), + partition = it.key.partition(), + offset = it.value.offset() + ) + } + + private fun startGrpcClient(): StoveKafkaObserverServiceClient { + val onPort = System.getenv(STOVE_KAFKA_BRIDGE_PORT) ?: "50051" + logger.info("Connecting to Stove Kafka Bridge on port $onPort") + return Try { createClient(onPort) } + .map { + logger.info("Stove Kafka Observer Client created on port $onPort") + it + }.getOrElse { error("failed to connect Stove Kafka observer client") } + } + + private fun createClient(onPort: String) = GrpcClient.Builder() + .client(OkHttpClient.Builder().protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE)).build()) + .baseUrl("http://localhost:$onPort".toHttpUrl()) + .build() + .create() +} diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaObserverGrpcServerAdapter.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaObserverGrpcServerAdapter.kt new file mode 100644 index 00000000..f902965e --- /dev/null +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaObserverGrpcServerAdapter.kt @@ -0,0 +1,32 @@ +package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting + +import CommittedMessage +import ConsumedMessage +import PublishedMessage +import Reply +import StoveKafkaObserverServiceWireGrpc +import org.slf4j.* + +class StoveKafkaObserverGrpcServerAdapter( + private val sink: TestSystemMessageSink +) : StoveKafkaObserverServiceWireGrpc.StoveKafkaObserverServiceImplBase() { + private val logger: Logger = LoggerFactory.getLogger(javaClass) + + override suspend fun onPublishedMessage(request: PublishedMessage): Reply { + logger.info("Received published message: $request") + sink.onMessagePublished(request) + return Reply(status = 200) + } + + override suspend fun onConsumedMessage(request: ConsumedMessage): Reply { + logger.info("Received consumed message: $request") + sink.onMessageConsumed(request) + return Reply(status = 200) + } + + override suspend fun onCommittedMessage(request: CommittedMessage): Reply { + logger.info("Received committed message: $request") + sink.onMessageCommitted(request) + return Reply(status = 200) + } +} diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemInterceptor.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemInterceptor.kt deleted file mode 100644 index 3f6e8427..00000000 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemInterceptor.kt +++ /dev/null @@ -1,43 +0,0 @@ -package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting - -import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.kafka.clients.admin.Admin -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import java.util.UUID -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ConcurrentMap - -class TestSystemKafkaInterceptor( - override val adminClient: Admin, - override val serde: ObjectMapper, - private val options: InterceptionOptions -) : ConsumingOps, CommonOps, AutoCloseable { - override val logger: Logger = LoggerFactory.getLogger(javaClass) - override val consumedRecords: ConcurrentMap> = ConcurrentHashMap() - override val exceptions: ConcurrentMap = ConcurrentHashMap() - override val assertions: ConcurrentMap> = ConcurrentHashMap() - - // TODO: Start a hook for failed kafka events - - fun onMessage( - record: ConsumerRecord, - consumer: Consumer - ): Unit = - when { - options.isErrorTopic(record.topic()) -> recordError(record) - else -> recordMessage(record, consumer) - } - - private lateinit var canceller: (message: String, cause: Throwable) -> Unit - - fun registerCancellationFunc(canceller: (message: String, cause: Throwable) -> Unit) { - this.canceller = canceller - } - - override fun close() { - TODO("Not yet implemented") - } -} diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemMessageSink.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemMessageSink.kt new file mode 100644 index 00000000..f78b762b --- /dev/null +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/TestSystemMessageSink.kt @@ -0,0 +1,32 @@ +package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting + +import CommittedMessage +import ConsumedMessage +import PublishedMessage +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.clients.admin.Admin +import org.slf4j.* +import java.util.* +import java.util.concurrent.* + +class TestSystemMessageSink( + override val adminClient: Admin, + override val serde: ObjectMapper, + private val options: InterceptionOptions +) : ConsumingOps, CommonOps { + override val logger: Logger = LoggerFactory.getLogger(javaClass) + override val consumedRecords: ConcurrentMap = ConcurrentHashMap() + override val publishedMessages: ConcurrentMap = ConcurrentHashMap() + override val committedMessages: ConcurrentMap = ConcurrentHashMap() + override val exceptions: ConcurrentMap = ConcurrentHashMap() + override val assertions: ConcurrentMap> = ConcurrentHashMap() + + fun onMessageConsumed(record: ConsumedMessage): Unit = when { + options.isErrorTopic(record.topic) -> recordError(record) + else -> recordConsumedMessage(record) + } + + fun onMessagePublished(record: PublishedMessage): Unit = recordPublishedMessage(record) + + fun onMessageCommitted(record: CommittedMessage): Unit = recordCommittedMessage(record) +} diff --git a/lib/stove-testing-e2e-kafka/src/main/proto/messages.proto b/lib/stove-testing-e2e-kafka/src/main/proto/messages.proto new file mode 100644 index 00000000..3d997fe1 --- /dev/null +++ b/lib/stove-testing-e2e-kafka/src/main/proto/messages.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +message ConsumedMessage { + string message = 1; + string topic = 2; + string partition = 3; + string offset = 4; + string key = 5; + map headers = 8; +} + +message PublishedMessage { + string message = 1; + string topic = 2; + string key = 3; + map headers = 4; +} + +message CommittedMessage { + string topic = 1; + int32 partition = 2; + int64 offset = 3; +} + +message Reply { + int32 status = 3; +} + +service StoveKafkaObserverService { + // buf:lint:ignore RPC_REQUEST_RESPONSE_UNIQUE + rpc onConsumedMessage(ConsumedMessage) returns (Reply) {} + + // buf:lint:ignore RPC_REQUEST_RESPONSE_UNIQUE + rpc onPublishedMessage(PublishedMessage) returns (Reply) {} + + // buf:lint:ignore RPC_REQUEST_RESPONSE_UNIQUE + rpc onCommittedMessage(CommittedMessage) returns (Reply) {} +} diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/ProjectConfig.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/ProjectConfig.kt index 47983da8..2b21942a 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/ProjectConfig.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/ProjectConfig.kt @@ -10,7 +10,9 @@ import io.kotest.core.config.AbstractProjectConfig import io.kotest.core.listeners.* import org.apache.kafka.clients.admin.* import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.* +import java.util.* class KafkaApplicationUnderTest : ApplicationUnderTest { private lateinit var client: AdminClient @@ -31,19 +33,25 @@ class KafkaApplicationUnderTest : ApplicationUnderTest { } private suspend fun startConsumers(bootStrapServers: String) { - val consumerSettings = - mapOf( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootStrapServers, - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StoveKafkaValueDeserializer::class.java, - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, - ConsumerConfig.GROUP_ID_CONFIG to "stove-application-consumers" - ) + val consumerSettings = mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootStrapServers, + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StoveKafkaValueDeserializer::class.java, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.GROUP_ID_CONFIG to "stove-application-consumers", + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf("com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.StoveKafkaBridge") + ) val producerSettings = PublisherSettings( bootStrapServers, StringSerializer(), - StoveKafkaValueSerializer() + StoveKafkaValueSerializer(), + properties = Properties().apply { + put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + listOf("com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.StoveKafkaBridge") + ) + } ) val listeners = KafkaTestShared.consumers(consumerSettings, producerSettings) diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/KafkaTestShared.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/KafkaTestShared.kt index f96605ed..b780f12e 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/KafkaTestShared.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/KafkaTestShared.kt @@ -15,13 +15,14 @@ object KafkaTestShared { TopicDefinition("productFailing", "productFailing.retry", "productFailing.error"), TopicDefinition("backlog", "backlog.retry", "backlog.error") ) - val consumers: - (consumerSettings: Map, producerSettings: PublisherSettings) -> List = - { a, b -> - listOf( - ProductConsumer(a, b), - BacklogConsumer(a, b), - ProductFailingConsumer(a, b) - ) - } + val consumers: ( + consumerSettings: Map, + producerSettings: PublisherSettings + ) -> List = { a, b -> + listOf( + ProductConsumer(a, b), + BacklogConsumer(a, b), + ProductFailingConsumer(a, b) + ) + } } diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt index 48a94c0a..6c833716 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt @@ -27,31 +27,30 @@ abstract class StoveListener( suspend fun start() { consumer.subscribe(listOf(topicDefinition.topic, topicDefinition.retryTopic, topicDefinition.retryTopic)) val retryStrategy = mutableMapOf() - consuming = - GlobalScope.launch { - while (!consuming.isCancelled) { - consumer - .poll(Duration.ofMillis(100)) - .asSequence() - .asFlow() - .collect { message -> - logger.info("Message RECEIVED on the application side: ${message.value()}") - Try { listen(message) } - .map { - consumer.commitSync() - logger.info("Message COMMITTED on the application side: ${message.value()}") + consuming = GlobalScope.launch { + while (!consuming.isCancelled) { + consumer + .poll(Duration.ofMillis(100)) + .asSequence() + .asFlow() + .collect { message -> + logger.info("Message RECEIVED on the application side: ${message.value()}") + Try { listen(message) } + .map { + consumer.commitSync() + logger.info("Message COMMITTED on the application side: ${message.value()}") + } + .recover { + logger.warn("CONSUMER GOT an ERROR on the application side, exception: $it") + retryStrategy[message.value()] = retryStrategy.getOrPut(message.value()) { 0 } + 1 + if (retryStrategy[message.value()]!! < 3) { + logger.warn("CONSUMER GOT an ERROR, retrying...") + publisher.publishScope { offer(ProducerRecord(topicDefinition.retryTopic, message.value())) } } - .recover { - logger.warn("CONSUMER GOT an ERROR on the application side, exception: $it") - retryStrategy[message.value()] = retryStrategy.getOrPut(message.value()) { 0 } + 1 - if (retryStrategy[message.value()]!! < 3) { - logger.warn("CONSUMER GOT an ERROR, retrying...") - publisher.publishScope { offer(ProducerRecord(topicDefinition.retryTopic, message.value())) } - } - } - } - } + } + } } + } } abstract suspend fun listen(record: ConsumerRecord) diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/BacklogConsumer.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/BacklogConsumer.kt index 0566ef0d..54bf5cb6 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/BacklogConsumer.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/BacklogConsumer.kt @@ -9,7 +9,10 @@ class BacklogConsumer( consumerSettings: Map, producerSettings: PublisherSettings ) : StoveListener(consumerSettings, producerSettings) { + private val logger = org.slf4j.LoggerFactory.getLogger(javaClass) override val topicDefinition: TopicDefinition = TopicDefinition("backlog", "backlog.retry", "backlog.error") - override suspend fun listen(record: ConsumerRecord) = Unit + override suspend fun listen(record: ConsumerRecord) { + logger.info("Backlog consumed: ${record.value()}") + } } diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductConsumer.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductConsumer.kt index ee0c6abd..166d782e 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductConsumer.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductConsumer.kt @@ -9,7 +9,10 @@ class ProductConsumer( consumerSettings: Map, producerSettings: PublisherSettings ) : StoveListener(consumerSettings, producerSettings) { + private val logger = org.slf4j.LoggerFactory.getLogger(javaClass) override val topicDefinition: TopicDefinition = TopicDefinition("product", "product.retry", "product.error") - override suspend fun listen(record: ConsumerRecord) = Unit + override suspend fun listen(record: ConsumerRecord) { + logger.info("Product consumed: ${record.value()}") + } } diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductFailingConsumer.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductFailingConsumer.kt index d7688a5e..d26264e4 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductFailingConsumer.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductFailingConsumer.kt @@ -10,8 +10,11 @@ class ProductFailingConsumer( consumerSettings: Map, producerSettings: PublisherSettings ) : StoveListener(consumerSettings, producerSettings) { - override val topicDefinition: TopicDefinition = - TopicDefinition("productFailing", "productFailing.retry", "productFailing.error") + override val topicDefinition: TopicDefinition = TopicDefinition( + "productFailing", + "productFailing.retry", + "productFailing.error" + ) override suspend fun listen(record: ConsumerRecord) { throw Exception("exception occurred on purpose") diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/tests/KafkaSystemTests.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/tests/KafkaSystemTests.kt index 84fc2875..ea943142 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/tests/KafkaSystemTests.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/tests/KafkaSystemTests.kt @@ -3,31 +3,26 @@ package com.trendyol.stove.testing.e2e.standalone.kafka.tests import com.trendyol.stove.testing.e2e.standalone.kafka.kafka import com.trendyol.stove.testing.e2e.standalone.kafka.setup.DomainEvents.ProductCreated import com.trendyol.stove.testing.e2e.standalone.kafka.setup.DomainEvents.ProductFailingCreated -import com.trendyol.stove.testing.e2e.system.TestSystem +import com.trendyol.stove.testing.e2e.system.TestSystem.Companion.validate import io.kotest.core.spec.style.FunSpec class KafkaSystemTests : FunSpec({ - xtest("When publish then it should work") { - TestSystem.validate { + test("When publish then it should work") { + validate { kafka { publish("product", ProductCreated("1")) - publish("product", ProductCreated("2")) - publish("product", ProductCreated("3")) + shouldBeConsumed(message = ProductCreated("1")) } } - - // delay(5000) } - xtest("When publish to a failing consumer should end-up throwing exception") { - TestSystem.validate { + test("When publish to a failing consumer should end-up throwing exception") { + validate { kafka { publish("productFailing", ProductFailingCreated("1")) shouldBeConsumed(message = ProductFailingCreated("1")) } } - - // delay(5000) } })