Skip to content

Commit

Permalink
renovate: (couchbase.client) fix deprecated
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan committed Nov 23, 2023
1 parent 756fe6f commit baf8a98
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 114 deletions.
15 changes: 10 additions & 5 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ root = true

[*]
insert_final_newline = true
ktlint_standard_no-wildcard-imports = disabled
ktlint_standard_package-name = disabled
ktlint_standard_filename = disabled
ktlint_standard_no-wildcard-imports = disabled
ktlint_standard_multiline-expression-wrapping = disabled
ktlint_standard_string-template-indent = disabled
ktlint_standard_function-signature = disabled

[{*.kt,*.kts}]
indent_style = space
max_line_length = 140
ij_kotlin_code_style_defaults = KOTLIN_OFFICIAL
ij_continuation_indent_size = 2
ij_kotlin_allow_trailing_comma = false
ij_kotlin_allow_trailing_comma_on_call_site = false
max_line_length = 140
ij_kotlin_name_count_to_use_star_import = 5
ij_kotlin_name_count_to_use_star_import = 2
ij_kotlin_name_count_to_use_star_import_for_members = 2

[{**/test/**.kt, **/test-e2e/**.kt, **/test-int/**.kt}]
max_line_length = 200
[{**/test/**.kt,**/test-e2e/**.kt,**/test-int/**.kt}]
max_line_length = 240
ktlint_standard_no-consecutive-comments = disabled
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.trendyol.stove.testing.e2e.couchbase

import com.couchbase.client.java.ReactiveCluster
import com.couchbase.client.java.manager.collection.CollectionSpec
import com.couchbase.client.java.manager.collection.CreateCollectionSettings
import com.trendyol.stove.testing.e2e.database.migrations.DatabaseMigration
import com.trendyol.stove.testing.e2e.database.migrations.MigrationPriority
import com.trendyol.stove.testing.e2e.system.TestSystem
Expand Down Expand Up @@ -51,7 +51,7 @@ class DefaultMigration : DatabaseMigration<ReactiveCluster> {
override suspend fun execute(connection: ReactiveCluster) {
connection
.bucket(TEST_BUCKET)
.collections().createCollection(CollectionSpec.create("another", "_default"))
.collections().createCollection("another", "_default", CreateCollectionSettings.createCollectionSettings())
.awaitFirstOrNull()

connection.bucket(TEST_BUCKET).waitUntilReady(Duration.ofSeconds(30)).awaitFirstOrNull()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
package com.trendyol.stove.testing.e2e.kafka

import arrow.core.firstOrNone
import arrow.core.getOrElse
import arrow.core.toOption
import arrow.core.*
import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.CompositeRecordInterceptor
import org.springframework.kafka.listener.ListenerExecutionFailedException
import kotlinx.coroutines.*
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.clients.producer.*
import org.slf4j.*
import org.springframework.kafka.listener.*
import org.springframework.kafka.support.ProducerListener
import org.springframework.stereotype.Component
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.*
import kotlin.reflect.KClass
import kotlin.time.Duration

Expand All @@ -36,91 +26,87 @@ class TestSystemKafkaInterceptor(private val objectMapper: ObjectMapper) :
override fun success(
record: ConsumerRecord<String, String>,
consumer: Consumer<String, String>
): Unit =
runBlocking {
consumedRecords.putIfAbsent(UUID.randomUUID(), record)
logger.info(
"""
): Unit = runBlocking {
consumedRecords.putIfAbsent(UUID.randomUUID(), record)
logger.info(
"""
SUCCESSFULLY CONSUMED:
Consumer: ${consumer.groupMetadata().memberId()}
Topic: ${record.topic()}
Record: ${record.value()}
Key: ${record.key()}
Headers: ${record.headers().map { Pair(it.key(), String(it.value())) }}
TestCase: ${record.headers().firstOrNone { it.key() == "testCase" }.map { String(it.value()) }.getOrElse { "" }}
""".trimIndent()
)
}
""".trimIndent()
)
}

override fun onSuccess(
record: ProducerRecord<String, Any>,
recordMetadata: RecordMetadata
): Unit =
runBlocking {
producedRecords.putIfAbsent(UUID.randomUUID(), record)
logger.info(
"""
): Unit = runBlocking {
producedRecords.putIfAbsent(UUID.randomUUID(), record)
logger.info(
"""
SUCCESSFULLY PUBLISHED:
Topic: ${record.topic()}
Record: ${record.value()}
Key: ${record.key()}
Headers: ${record.headers().map { Pair(it.key(), String(it.value())) }}
TestCase: ${record.headers().firstOrNone { it.key() == "testCase" }.map { String(it.value()) }.getOrElse { "" }}
""".trimIndent()
)
}
""".trimIndent()
)
}

override fun onError(
record: ProducerRecord<String, Any>,
recordMetadata: RecordMetadata?,
exception: Exception
): Unit =
runBlocking {
exceptions.putIfAbsent(
UUID.randomUUID(),
Failure(
ObservedMessage(record.value().toString(), record.toMetadata()),
extractCause(exception)
)
): Unit = runBlocking {
exceptions.putIfAbsent(
UUID.randomUUID(),
Failure(
ObservedMessage(record.value().toString(), record.toMetadata()),
extractCause(exception)
)
logger.error(
"""
)
logger.error(
"""
PRODUCER GOT AN ERROR:
Topic: ${record.topic()}
Record: ${record.value()}
Key: ${record.key()}
Headers: ${record.headers().map { Pair(it.key(), String(it.value())) }}
TestCase: ${record.headers().firstOrNone { it.key() == "testCase" }.map { String(it.value()) }.getOrElse { "" }}
Exception: $exception
""".trimIndent()
)
}
""".trimIndent()
)
}

override fun failure(
record: ConsumerRecord<String, String>,
exception: Exception,
consumer: Consumer<String, String>
): Unit =
runBlocking {
exceptions.putIfAbsent(
UUID.randomUUID(),
Failure(
ObservedMessage(record.value().toString(), record.toMetadata()),
extractCause(exception)
)
): Unit = runBlocking {
exceptions.putIfAbsent(
UUID.randomUUID(),
Failure(
ObservedMessage(record.value().toString(), record.toMetadata()),
extractCause(exception)
)
logger.error(
"""
)
logger.error(
"""
CONSUMER GOT AN ERROR:
Topic: ${record.topic()}
Record: ${record.value()}
Key: ${record.key()}
Headers: ${record.headers().map { Pair(it.key(), String(it.value())) }}
TestCase: ${record.headers().firstOrNone { it.key() == "testCase" }.map { String(it.value()) }.getOrElse { "" }}
Exception: $exception
""".trimIndent()
)
}
""".trimIndent()
)
}

internal suspend fun <T : Any> waitUntilConsumed(
atLeastIn: Duration,
Expand Down Expand Up @@ -201,55 +187,52 @@ class TestSystemKafkaInterceptor(private val objectMapper: ObjectMapper) :
private fun <T : Any> throwIfSucceeded(
clazz: KClass<T>,
selector: (FailedParsedMessage<T>) -> Boolean
): Unit =
consumedRecords
.filter { record ->
selector(
FailedParsedMessage(
ParsedMessage(readCatching(record.value.value(), clazz).getOrNull().toOption(), record.value.toMetadata()),
getExceptionFor(clazz, selector)
)
): Unit = consumedRecords
.filter { record ->
selector(
FailedParsedMessage(
ParsedMessage(readCatching(record.value.value(), clazz).getOrNull().toOption(), record.value.toMetadata()),
getExceptionFor(clazz, selector)
)
}
.forEach { throw AssertionError("Expected to fail but succeeded: $it") }
)
}
.forEach { throw AssertionError("Expected to fail but succeeded: $it") }

private fun <T : Any> getExceptionFor(
clazz: KClass<T>,
selector: (message: FailedParsedMessage<T>) -> Boolean
): Throwable =
exceptions
.map { it.value }
.first {
selector(
FailedParsedMessage(
ParsedMessage(
readCatching(it.message.actual.toString(), clazz).getOrNull().toOption(),
it.message.metadata
),
it.reason
)
): Throwable = exceptions
.map { it.value }
.first {
selector(
FailedParsedMessage(
ParsedMessage(
readCatching(it.message.actual.toString(), clazz).getOrNull().toOption(),
it.message.metadata
),
it.reason
)
}
.reason
)
}
.reason

private suspend fun <T : Any> (() -> Collection<T>).waitUntilConditionMet(
duration: Duration,
subject: String,
condition: (T) -> Boolean
): Collection<T> =
runCatching {
val collectionFunc = this
withTimeout(duration) { while (!collectionFunc().any { condition(it) }) delay(50) }
return collectionFunc().filter { condition(it) }
}.recoverCatching {
when (it) {
is TimeoutCancellationException -> throw AssertionError("GOT A TIMEOUT: $subject. ${dumpMessages()}")
is ConcurrentModificationException ->
Result.success(waitUntilConditionMet(duration, subject, condition))
): Collection<T> = runCatching {
val collectionFunc = this
withTimeout(duration) { while (!collectionFunc().any { condition(it) }) delay(50) }
return collectionFunc().filter { condition(it) }
}.recoverCatching {
when (it) {
is TimeoutCancellationException -> throw AssertionError("GOT A TIMEOUT: $subject. ${dumpMessages()}")
is ConcurrentModificationException ->
Result.success(waitUntilConditionMet(duration, subject, condition))

else -> throw it
}.getOrThrow()
else -> throw it
}.getOrThrow()
}.getOrThrow()

private fun dumpMessages(): String =
"""
Expand All @@ -262,16 +245,14 @@ class TestSystemKafkaInterceptor(private val objectMapper: ObjectMapper) :
""".trimIndent()
}

internal fun <K, V : Any> ProducerRecord<K, V>.toMetadata(): MessageMetadata =
MessageMetadata(
this.topic(),
this.key().toString(),
this.headers().associate { h -> Pair(h.key(), String(h.value())) }
)
internal fun <K, V : Any> ProducerRecord<K, V>.toMetadata(): MessageMetadata = MessageMetadata(
this.topic(),
this.key().toString(),
this.headers().associate { h -> Pair(h.key(), String(h.value())) }
)

internal fun <K, V : Any> ConsumerRecord<K, V>.toMetadata(): MessageMetadata =
MessageMetadata(
this.topic(),
this.key().toString(),
this.headers().associate { h -> Pair(h.key(), String(h.value())) }
)
internal fun <K, V : Any> ConsumerRecord<K, V>.toMetadata(): MessageMetadata = MessageMetadata(
this.topic(),
this.key().toString(),
this.headers().associate { h -> Pair(h.key(), String(h.value())) }
)

0 comments on commit baf8a98

Please sign in to comment.