Skip to content

Commit

Permalink
use couchbase kotlin client (#321)
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan authored Feb 8, 2024
1 parent 643841a commit e010b1b
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 166 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ subprojects.of("lib", "spring", "examples", "ktor") {
}
dependsOn(formatKotlin)
useJUnitPlatform()
ignoreFailures = true
testlogger {
setTheme("mocha")
showStandardStreams = true
Expand Down
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ spring-kafka = "2.9.13"
spring-kafka-3x = "3.1.1"
couchbase-client = "3.5.3"
couchbase-client-metrics = "0.5.3"
couchbase-kotlin = "1.2.3"
jackson = "2.16.1"
arrow = "1.2.1"
io-reactor = "3.6.2"
Expand Down Expand Up @@ -44,6 +45,7 @@ kotlinx-io-reactor-extensions = { module = "io.projectreactor.kotlin:reactor-kot
arrow-core = { module = "io.arrow-kt:arrow-core", version.ref = "arrow" }
kafka = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka" }
kafkaKotlin = { module = "io.github.nomisrev:kotlin-kafka", version.ref = "kafka-kotlin" }
couchbase-kotlin = { module = "com.couchbase.client:kotlin-client", version.ref = "couchbase-kotlin" }
couchbase-client = { module = "com.couchbase.client:java-client", version.ref = "couchbase-client" }
couchbase-client-metrics = { module = "com.couchbase.client:metrics-micrometer", version.ref = "couchbase-client-metrics" }
jackson-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
Expand Down
3 changes: 1 addition & 2 deletions lib/stove-testing-e2e-couchbase/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
dependencies {
api(projects.lib.stoveTestingE2e)
implementation(libs.couchbase.kotlin)
implementation(libs.testcontainers.couchbase)
implementation(libs.couchbase.client)
implementation(libs.kotlinx.reactive)
}

dependencies {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,45 +1,37 @@
package com.trendyol.stove.testing.e2e.couchbase

import com.couchbase.client.core.error.DocumentNotFoundException
import com.couchbase.client.core.msg.kv.DurabilityLevel.PERSIST_TO_MAJORITY
import com.couchbase.client.java.*
import com.couchbase.client.java.codec.JacksonJsonSerializer
import com.couchbase.client.java.env.ClusterEnvironment
import com.couchbase.client.java.json.JsonObject
import com.couchbase.client.java.kv.InsertOptions
import com.couchbase.client.java.query.QueryScanConsistency.REQUEST_PLUS
import com.couchbase.client.kotlin.*
import com.couchbase.client.kotlin.Collection
import com.couchbase.client.kotlin.codec.*
import com.couchbase.client.kotlin.query.execute
import com.fasterxml.jackson.databind.ObjectMapper
import com.trendyol.stove.functional.*
import com.trendyol.stove.testing.e2e.couchbase.ClusterExtensions.executeQueryAs
import com.trendyol.stove.testing.e2e.system.TestSystem
import com.trendyol.stove.testing.e2e.system.abstractions.*
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.runBlocking
import org.slf4j.*
import reactor.core.publisher.Mono

@CouchbaseDsl
class CouchbaseSystem internal constructor(
override val testSystem: TestSystem,
val context: CouchbaseContext
) : PluggedSystem, RunAware, ExposesConfiguration {
@PublishedApi
internal lateinit var cluster: ReactiveCluster
internal lateinit var cluster: Cluster

@PublishedApi
internal lateinit var collection: ReactiveCollection
internal lateinit var collection: Collection

@PublishedApi
internal val objectMapper: ObjectMapper = context.options.objectMapper

private lateinit var exposedConfiguration: CouchbaseExposedConfiguration
private val logger: Logger = LoggerFactory.getLogger(javaClass)
private val state: StateOfSystem<CouchbaseSystem, CouchbaseExposedConfiguration> =
StateOfSystem(
testSystem.options,
CouchbaseSystem::class,
CouchbaseExposedConfiguration::class
)
private val state: StateOfSystem<CouchbaseSystem, CouchbaseExposedConfiguration> = StateOfSystem(
testSystem.options,
CouchbaseSystem::class,
CouchbaseExposedConfiguration::class
)

override suspend fun run() {
exposedConfiguration =
Expand Down Expand Up @@ -71,18 +63,18 @@ class CouchbaseSystem internal constructor(
"couchbase.password=${exposedConfiguration.password}"
)

@Suppress("unused")
@CouchbaseDsl
suspend inline fun <reified T : Any> shouldQuery(
query: String,
assertion: (List<T>) -> Unit
): CouchbaseSystem {
val result = cluster.executeQueryAs<Any>(query) { queryOptions -> queryOptions.scanConsistency(REQUEST_PLUS) }

val objects =
result
.map { objectMapper.writeValueAsString(it) }
.map { objectMapper.readValue(it, T::class.java) }
val result = cluster.query(
statement = query,
metrics = false
).execute().rows.map { it.contentAs<T>() }
val objects = result
.map { objectMapper.writeValueAsString(it) }
.map { objectMapper.readValue(it, T::class.java) }

assertion(objects)
return this
Expand All @@ -94,7 +86,7 @@ class CouchbaseSystem internal constructor(
assertion: (T) -> Unit
): CouchbaseSystem =
collection.get(key)
.awaitSingle().contentAs(T::class.java)
.contentAs<T>()
.let(assertion)
.let { this }

Expand All @@ -106,81 +98,56 @@ class CouchbaseSystem internal constructor(
): CouchbaseSystem =
cluster.bucket(context.bucket.name)
.collection(collection)
.get(key).awaitSingle()
.contentAs(T::class.java)
.get(key)
.contentAs<T>()
.let(assertion)
.let { this }

@CouchbaseDsl
suspend fun shouldNotExist(key: String): CouchbaseSystem =
when (
collection.get(key)
.onErrorResume { throwable ->
when (throwable) {
is DocumentNotFoundException -> Mono.empty()
else -> throw throwable
}
}.awaitFirstOrNull()
) {
null -> this
else -> throw AssertionError("The document with the given id($key) was not expected, but found!")
}
suspend fun shouldNotExist(key: String): CouchbaseSystem = when (collection.getOrNull(key)) {
null -> this
else -> throw AssertionError("The document with the given id($key) was not expected, but found!")
}

@CouchbaseDsl
suspend fun shouldNotExist(
collection: String,
key: String
): CouchbaseSystem =
when (
cluster
.bucket(context.bucket.name)
.collection(collection)
.get(key)
.onErrorResume { throwable ->
when (throwable) {
is DocumentNotFoundException -> Mono.empty()
else -> throw throwable
}
}.awaitFirstOrNull()
) {
null -> this
else -> throw AssertionError("The document with the given id($key) was not expected, but found!")
}
): CouchbaseSystem = when (
cluster
.bucket(context.bucket.name)
.collection(collection)
.getOrNull(key)
) {
null -> this
else -> throw AssertionError("The document with the given id($key) was not expected, but found!")
}

@CouchbaseDsl
suspend fun shouldDelete(key: String): CouchbaseSystem =
collection.remove(key).awaitSingle()
.let { this }
suspend fun shouldDelete(key: String): CouchbaseSystem = collection.remove(key).let { this }

@CouchbaseDsl
suspend fun shouldDelete(
collection: String,
key: String
): CouchbaseSystem =
cluster.bucket(context.bucket.name)
.collection(collection)
.remove(key)
.awaitSingle().let { this }
): CouchbaseSystem = cluster.bucket(context.bucket.name)
.collection(collection)
.remove(key)
.let { this }

/**
* Saves the [instance] with given [id] to the [collection]
* To save to the default collection use [saveToDefaultCollection]
*/
@CouchbaseDsl
suspend fun <T : Any> save(
suspend inline fun <reified T : Any> save(
collection: String,
id: String,
instance: T
): CouchbaseSystem =
cluster
.bucket(context.bucket.name)
.collection(collection)
.insert(
id,
JsonObject.fromJson(objectMapper.writeValueAsString(instance)),
InsertOptions.insertOptions().durability(PERSIST_TO_MAJORITY)
)
.awaitSingle().let { this }
): CouchbaseSystem = cluster
.bucket(context.bucket.name)
.collection(collection)
.insert(id, instance).let { this }

/**
* Saves the [instance] with given [id] to the default collection
Expand All @@ -192,40 +159,32 @@ class CouchbaseSystem internal constructor(
instance: T
): CouchbaseSystem = this.save("_default", id, instance)

override fun close(): Unit =
runBlocking {
Try {
cluster.disconnect().awaitSingle()
executeWithReuseCheck { stop() }
}.recover {
logger.warn("Disconnecting the couchbase cluster got an error: $it")
}
override fun close(): Unit = runBlocking {
Try {
cluster.disconnect()
executeWithReuseCheck { stop() }
}.recover {
logger.warn("Disconnecting the couchbase cluster got an error: $it")
}
}

private fun createCluster(exposedConfiguration: CouchbaseExposedConfiguration): ReactiveCluster =
ClusterEnvironment.builder()
.jsonSerializer(JacksonJsonSerializer.create(objectMapper))
.build()
.let {
Cluster.connect(
exposedConfiguration.hostsWithPort,
ClusterOptions
.clusterOptions(exposedConfiguration.username, exposedConfiguration.password)
.environment(it)
).reactive()
}
private fun createCluster(exposedConfiguration: CouchbaseExposedConfiguration): Cluster {
val jackson = JacksonJsonSerializer(objectMapper)
return Cluster.connect(
exposedConfiguration.hostsWithPort,
exposedConfiguration.username,
exposedConfiguration.password
) {
jsonSerializer = jackson
transcoder = JsonTranscoder(jackson)
}
}

companion object {
/**
* Exposes the [ReactiveCluster] of the [CouchbaseSystem]
*/
@CouchbaseDsl
fun CouchbaseSystem.cluster(): ReactiveCluster = this.cluster
fun CouchbaseSystem.cluster(): Cluster = this.cluster

/**
* Exposes the [ReactiveBucket] of the [CouchbaseSystem]
*/
@CouchbaseDsl
fun CouchbaseSystem.bucket(): ReactiveBucket = this.cluster.bucket(this.context.bucket.name)
fun CouchbaseSystem.bucket(): Bucket = this.cluster.bucket(this.context.bucket.name)
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
package com.trendyol.stove.testing.e2e.couchbase

import arrow.core.getOrElse
import com.couchbase.client.java.ReactiveCluster
import com.couchbase.client.java.json.JsonValueModule
import com.couchbase.client.kotlin.Cluster
import com.fasterxml.jackson.databind.ObjectMapper
import com.trendyol.stove.testing.e2e.containers.DEFAULT_REGISTRY
import com.trendyol.stove.testing.e2e.containers.withProvidedRegistry
import com.trendyol.stove.testing.e2e.database.migrations.DatabaseMigration
import com.trendyol.stove.testing.e2e.database.migrations.MigrationCollection
import com.trendyol.stove.testing.e2e.containers.*
import com.trendyol.stove.testing.e2e.database.migrations.*
import com.trendyol.stove.testing.e2e.serialization.StoveObjectMapper
import com.trendyol.stove.testing.e2e.system.TestSystem
import com.trendyol.stove.testing.e2e.system.ValidationDsl
import com.trendyol.stove.testing.e2e.system.WithDsl
import com.trendyol.stove.testing.e2e.system.*
import com.trendyol.stove.testing.e2e.system.abstractions.*
import com.trendyol.stove.testing.e2e.system.annotations.StoveDsl
import org.testcontainers.couchbase.BucketDefinition
import org.testcontainers.couchbase.CouchbaseContainer
import org.testcontainers.couchbase.*

data class CouchbaseExposedConfiguration(
val connectionString: String,
Expand All @@ -29,17 +23,17 @@ data class CouchbaseSystemOptions(
val defaultBucket: String,
val containerOptions: ContainerOptions = ContainerOptions(),
override val configureExposedConfiguration: (CouchbaseExposedConfiguration) -> List<String> = { _ -> listOf() },
val objectMapper: ObjectMapper = StoveObjectMapper.byConfiguring { registerModule(JsonValueModule()) }
val objectMapper: ObjectMapper = StoveObjectMapper.Default
) : SystemOptions, ConfiguresExposedConfiguration<CouchbaseExposedConfiguration> {
internal val migrationCollection: MigrationCollection<ReactiveCluster> = MigrationCollection()
internal val migrationCollection: MigrationCollection<Cluster> = MigrationCollection()

/**
* Helps for registering migrations before the tests run.
* @see MigrationCollection
* @see DatabaseMigration
*/
@StoveDsl
fun migrations(migration: MigrationCollection<ReactiveCluster>.() -> Unit): CouchbaseSystemOptions =
fun migrations(migration: MigrationCollection<Cluster>.() -> Unit): CouchbaseSystemOptions =
migration(
migrationCollection
).let { this }
Expand All @@ -60,11 +54,14 @@ data class ContainerOptions(

internal fun TestSystem.withCouchbase(options: CouchbaseSystemOptions): TestSystem {
val bucketDefinition = BucketDefinition(options.defaultBucket)
val couchbaseContainer =
withProvidedRegistry("couchbase/server:${options.containerOptions.imageVersion}", options.containerOptions.registry) {
CouchbaseContainer(it).withBucket(bucketDefinition)
.withReuse(this.options.keepDependenciesRunning)
}
val couchbaseContainer = withProvidedRegistry(
imageName = "couchbase/server:${options.containerOptions.imageVersion}",
registry = options.containerOptions.registry
) {
CouchbaseContainer(it)
.withBucket(bucketDefinition)
.withReuse(this.options.keepDependenciesRunning)
}
this.getOrRegister(
CouchbaseSystem(this, CouchbaseContext(bucketDefinition, couchbaseContainer, options))
)
Expand Down
Loading

0 comments on commit e010b1b

Please sign in to comment.