Skip to content

Commit

Permalink
spring-kafka: publish method supports partition
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan committed May 2, 2024
1 parent 318c02c commit 6b332ff
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.trendyol.stove.testing.e2e.kafka

import arrow.core.Option
import com.trendyol.stove.testing.e2e.messaging.MessageMetadata
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
Expand All @@ -19,37 +20,40 @@ internal fun <K, V : Any> ConsumerRecord<K, V>.toMetadata(): MessageMetadata = M
data class StoveConsumedMessage(
val topic: String,
val value: String,
val offset: Long,
val partition: Int,
val key: String,
val metadata: MessageMetadata,
val timeStamp: Long
val offset: Long?,
val partition: Int?,
val key: String?,
val timeStamp: Long?
)

data class StovePublishedMessage(
val topic: String,
val value: String,
val partition: Int,
val key: String,
val metadata: MessageMetadata,
val timeStamp: Long
val partition: Int?,
val key: String?,
val timeStamp: Long?
)

internal fun <K, V : Any> ConsumerRecord<K, V>.toStoveConsumedMessage(): StoveConsumedMessage = StoveConsumedMessage(
this.topic(),
this.value().toString(),
this.toMetadata(),
this.offset(),
this.partition(),
this.key().toString(),
this.toMetadata(),
this.timestamp()
)

internal fun <K, V : Any> ProducerRecord<K, V>.toStovePublishedMessage(): StovePublishedMessage = StovePublishedMessage(
this.topic(),
this.value().toString(),
this.toMetadata(),
this.partition(),
this.key().toString(),
this.toMetadata(),
this.timestamp()
)

/**
 * Adds the "testCase" header entry from [testCase] when it is present and the
 * map does not already carry one; always returns the same (mutated) map.
 */
internal fun (MutableMap<String, String>).addTestCase(testCase: Option<String>): MutableMap<String, String> {
  // An existing "testCase" entry always wins; the Option is only consulted otherwise.
  if (!this.containsKey("testCase")) {
    testCase.map { this["testCase"] = it }
  }
  return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import org.slf4j.*
import org.springframework.beans.factory.getBean
import org.springframework.context.ApplicationContext
import org.springframework.kafka.core.KafkaTemplate
import kotlin.collections.set
import kotlin.reflect.KClass
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
Expand All @@ -27,8 +26,11 @@ class KafkaSystem(
private lateinit var kafkaTemplate: KafkaTemplate<String, Any>
private lateinit var exposedConfiguration: KafkaExposedConfiguration
val getInterceptor: () -> TestSystemKafkaInterceptor = { applicationContext.getBean() }
private val state: StateOfSystem<KafkaSystem, KafkaExposedConfiguration> =
StateOfSystem(testSystem.options, javaClass.kotlin, KafkaExposedConfiguration::class)
private val state: StateOfSystem<KafkaSystem, KafkaExposedConfiguration> = StateOfSystem(
testSystem.options,
javaClass.kotlin,
KafkaExposedConfiguration::class
)

override suspend fun beforeRun() = Unit

Expand All @@ -46,32 +48,19 @@ class KafkaSystem(
kafkaTemplate.setProducerListener(getInterceptor())
}

override fun configuration(): List<String> = context.options.configureExposedConfiguration(exposedConfiguration)

override suspend fun stop(): Unit = context.container.stop()

override fun close(): Unit =
runBlocking {
Try {
kafkaTemplate.destroy()
executeWithReuseCheck { stop() }
}.recover {
logger.warn("got an error while closing KafkaSystem", it)
}
}

@KafkaDsl
suspend fun publish(
topic: String,
message: Any,
key: Option<String> = None,
partition: Option<Int> = None,
headers: Map<String, String> = mapOf(),
testCase: Option<String> = None
): KafkaSystem {
val record = ProducerRecord<String, Any>(
topic,
0,
key.getOrElse { "" },
partition.getOrNull(),
key.getOrNull(),
context.objectMapper.writeValueAsString(message),
headers.toMutableMap().addTestCase(testCase).map { RecordHeader(it.key, it.value.toByteArray()) }
)
Expand Down Expand Up @@ -138,7 +127,17 @@ class KafkaSystem(
atLeastIn: Duration,
condition: (message: ParsedMessage<T>) -> Boolean
): Unit = coroutineScope { getInterceptor().waitUntilPublished(atLeastIn, clazz, condition) }
}

private fun (MutableMap<String, String>).addTestCase(testCase: Option<String>): MutableMap<String, String> =
if (this.containsKey("testCase")) this else testCase.map { this["testCase"] = it }.let { this }
override fun configuration(): List<String> = context.options.configureExposedConfiguration(exposedConfiguration)

override suspend fun stop(): Unit = context.container.stop()

// Shuts the system down: destroys the KafkaTemplate (closing its producer) and
// then stops the container via executeWithReuseCheck — presumably a no-op when
// the test-system "reuse" option is active (TODO confirm against StateOfSystem).
// Failures are logged and swallowed so close() never throws to the caller.
override fun close(): Unit = runBlocking {
Try {
kafkaTemplate.destroy()
executeWithReuseCheck { stop() }
}.recover {
// Best-effort shutdown: log and continue rather than failing teardown.
logger.warn("got an error while closing KafkaSystem", it)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ open class KafkaTestSpringBotApplication {
@Suppress("TooGenericExceptionThrown")
fun listen_failed(message: String) {
logger.info("Received Message in failed consumer: $message")
throw RuntimeException("Failed to consume message")
throw StoveBusinessException("This exception is thrown intentionally for testing purposes.")
}

@KafkaListener(topics = ["topic-failed.DLT"], groupId = "group_id")
Expand All @@ -118,6 +118,8 @@ open class KafkaTestSpringBotApplication {
}
}

class StoveBusinessException(message: String) : Exception(message)

class Setup : AbstractProjectConfig() {
override suspend fun beforeProject(): Unit =
TestSystem()
Expand Down Expand Up @@ -177,7 +179,7 @@ class KafkaSystemTests : ShouldSpec({
actual == message && this.metadata.headers["x-user-id"] == "1" && this.metadata.topic == "topic-failed"
}
shouldBeFailed<Any> {
actual == message && this.metadata.headers["x-user-id"] == "1" && this.metadata.topic == "topic-failed" && reason is RuntimeException
actual == message && this.metadata.headers["x-user-id"] == "1" && this.metadata.topic == "topic-failed" && reason is StoveBusinessException
}
}
}
Expand Down

0 comments on commit 6b332ff

Please sign in to comment.