Remove Confluent dependency
hestad committed Jan 25, 2024
1 parent 9fbc1f8 commit 96c7005
Showing 16 changed files with 158 additions and 62 deletions.
5 changes: 0 additions & 5 deletions .github/dependabot.yml
@@ -8,11 +8,6 @@ updates:
ignore:
# Versions follow the Postgres version and should be updated manually
- dependency-name: io.zonky.test.postgres:embedded-postgres-binaries-bom
# We want to stay on the Apache versions instead of Confluent's (dependabot does not support regex in versions yet).
- dependency-name: org.apache.kafka:kafka-clients
versions:
- "*-ce"
- "*-ccs"
- package-ecosystem: "docker"
directory: "/"
schedule:
1 change: 0 additions & 1 deletion application/src/test/resources/logbackConfig-test.xml
@@ -18,7 +18,6 @@
<logger name="org.apache.kafka" level="OFF"/>
<logger name="org.apache.zookeeper" level="OFF"/>
<logger name="org.eclipse" level="ERROR"/>
<logger name="io.confluent" level="OFF"/>
<logger name="kafka" level="OFF"/>
<logger name="state.change.logger" level="WARN"/>
<logger name="io.micrometer" level="ERROR"/>
14 changes: 0 additions & 14 deletions build.gradle.kts
@@ -16,11 +16,9 @@ subprojects {
mavenCentral()
maven("https://jitpack.io")
maven("https://oss.sonatype.org/content/repositories/releases/")
maven("https://packages.confluent.io/maven/")
}
val kotestVersion = "5.8.0"
val jacksonVersion = "2.16.1"
val confluentVersion = "7.5.3"
dependencies {
implementation(rootProject.libs.kotlin.reflect)
implementation(rootProject.libs.kotlin.script.runtime)
@@ -51,18 +49,6 @@ subprojects {
exclude("org.apache.kafka", "kafka-storage-api")
exclude("org.apache.kafka", "kafka-streams")
}
implementation("io.confluent:kafka-avro-serializer:$confluentVersion") {
exclude("org.apache.kafka", "kafka-clients")
exclude("io.confluent", "common-utils")
exclude("io.confluent", "logredactor")
exclude("org.apache.avro", "avro")
exclude("org.apache.commons", "commons-compress")
exclude("com.google.errorprone")
exclude("org.checkerframework")
exclude("com.google.j2objc")
exclude("com.google.code.findbugs")
exclude("io.swagger.core.v3")
}
implementation("org.apache.avro:avro:1.11.3")
implementation("com.github.ben-manes.caffeine:caffeine:3.1.8")
implementation("io.micrometer:micrometer-core:1.12.2")
@@ -6,6 +6,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.errors.AuthorizationException
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
@@ -17,7 +18,13 @@ import kotlin.concurrent.timer
internal class KafkaPublisherClient(
private val producerConfig: ApplicationConfig.KafkaConfig.ProducerCfg,
private val log: Logger = LoggerFactory.getLogger(KafkaPublisherClient::class.java),
private val initProducer: () -> Producer<String, String> = { KafkaProducer(producerConfig.kafkaConfig) },
private val initProducer: () -> Producer<String, String> = {
KafkaProducer(
producerConfig.kafkaConfig,
StringSerializer(),
StringSerializer(),
)
},
) : KafkaPublisher {
private var producer: Producer<String, String> = initProducer()
private val failed: Queue<ProducerRecord<String, String>> = LinkedList()
@@ -38,6 +45,7 @@ internal class KafkaPublisherClient(
producer = initProducer()
failed.add(record)
}

else -> {
log.error("Ukjent feil ved publisering av melding til topic:$topic", exception)
failed.add(record)
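With the serializer config entries gone (see ApplicationConfig below), KafkaPublisherClient now injects its String serializers directly through the KafkaProducer constructor. A minimal sketch of the pattern, with an illustrative props map and topic name rather than the application's real config:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

fun demoProducer(props: Map<String, Any>) {
    // Constructor-injected serializers fix the key/value types at compile time,
    // so the stringly-typed key.serializer/value.serializer entries are no longer needed.
    val producer = KafkaProducer(props, StringSerializer(), StringSerializer())
    producer.send(ProducerRecord("some-topic", "key", "value"))
    producer.close()
}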
1 change: 1 addition & 0 deletions common/infrastructure/kafka/README.md
@@ -0,0 +1 @@
Keeps Kafka/Avro in a module of its own, since it pulls in a number of dependencies that most other modules probably do not need.
17 changes: 17 additions & 0 deletions common/infrastructure/kafka/build.gradle.kts
@@ -0,0 +1,17 @@
dependencies {
implementation(project(":common:domain"))
implementation(project(":common:infrastructure"))

implementation("org.apache.kafka:kafka-clients:3.6.1") {
exclude("org.apache.kafka", "kafka-raft")
exclude("org.apache.kafka", "kafka-server-common")
exclude("org.apache.kafka", "kafka-storage")
exclude("org.apache.kafka", "kafka-storage-api")
exclude("org.apache.kafka", "kafka-streams")
}
implementation("org.apache.avro:avro:1.11.3")
}

tasks.named<Jar>("jar") {
archiveBaseName.set("common-infrastructure-avro")
}
@@ -0,0 +1,41 @@
package common.infrastructure.kafka

import arrow.core.Either
import arrow.core.getOrElse
import org.apache.avro.Schema
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.Deserializer

class AvroDeserializer<T : SpecificRecord>(
private val schema: Schema,
) : Deserializer<T> {
val reader = SpecificDatumReader<T>(schema)
override fun deserialize(topic: String, data: ByteArray): T {
return Either.catch {
val decoder = DecoderFactory.get().binaryDecoder(data, null)
/*
KafkaAvroSerializer prepends 5 bytes: 1 magic byte and 4 that identify which schema registry
entry was used. Since we do not want a dependency on the schema registry, we use our own
deserializer and simply skip those first 5 bytes.
https://docs.confluent.io/3.2.0/schema-registry/docs/serializer-formatter.html#wire-format
*/
// Skip the 5-byte header (1 magic byte + 4-byte schema id) when the magic byte is present
if (data[0].toInt() == 0) {
decoder.skipFixed(5)
}
reader.read(null, decoder)
}.getOrElse {
throw RuntimeException(
"Kunne ikke deserialisere avromelding fra topic: $topic. Antall bytes: ${data.size}. Første 10 bytes: ${data.take(10).joinToString(",")}. Skjema: ${schema.toString(true)}",
it,
)
}
}
}
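For reference, a sketch of the Confluent wire format that the deserializer skips past. The helper below is hypothetical and not part of the commit; it decodes the schema id purely for illustration, since the point of the module is to avoid the schema registry:

// Confluent wire format (see the doc link above):
//   byte 0      magic byte, always 0
//   bytes 1..4  schema registry id (big-endian int)
//   bytes 5..   Avro binary payload
fun splitConfluentFrame(data: ByteArray): Pair<Int, ByteArray> {
    require(data.size >= 5 && data[0].toInt() == 0) { "Not a Confluent-framed message" }
    val schemaId = ((data[1].toInt() and 0xFF) shl 24) or
        ((data[2].toInt() and 0xFF) shl 16) or
        ((data[3].toInt() and 0xFF) shl 8) or
        (data[4].toInt() and 0xFF)
    return schemaId to data.copyOfRange(5, data.size)
}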
@@ -0,0 +1,32 @@
package common.infrastructure.kafka

import arrow.core.Either
import arrow.core.getOrElse
import org.apache.avro.io.BinaryEncoder
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.Serializer
import java.io.ByteArrayOutputStream

class AvroSerializer<T : SpecificRecord> : Serializer<T> {
private val encoderFactory = EncoderFactory.get()

override fun serialize(topic: String?, data: T): ByteArray {
return Either.catch {
val writer = SpecificDatumWriter<T>(data.schema)
val bytesOut = ByteArrayOutputStream()
val encoder: BinaryEncoder = encoderFactory.binaryEncoder(bytesOut, null)
writer.write(data, encoder)
encoder.flush()
// KafkaAvroSerializer prepends five bytes: 1 magic byte and 4 identifying the schema registry entry.
// We prepend an all-zero header so the output stays wire-compatible:
// https://docs.confluent.io/3.2.0/schema-registry/docs/serializer-formatter.html#wire-format
byteArrayOf(0, 0, 0, 0, 0) + bytesOut.toByteArray()
}.getOrElse {
throw RuntimeException(
"Kunne ikke serialisere avromelding til topic: $topic. Skjema: ${data.schema.toString(true)}",
it,
)
}
}
}
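Since the serializer writes an all-zero header and the deserializer skips any header that starts with the magic byte, the two classes should round-trip cleanly. A minimal test sketch, assuming both classes above and some Avro-generated SpecificRecord instance (constructing one is elided):

import org.apache.avro.specific.SpecificRecord

fun <T : SpecificRecord> roundTrip(record: T): T {
    val bytes = AvroSerializer<T>().serialize("some-topic", record)
    check(bytes[0].toInt() == 0) // zeroed magic byte, so the deserializer takes the skip branch
    return AvroDeserializer<T>(record.schema).deserialize("some-topic", bytes)
}
// Usage: check(roundTrip(hendelse) == hendelse) for any generated Avro record.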
@@ -1,7 +1,5 @@
package no.nav.su.se.bakover.common.infrastructure.config

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import io.github.cdimascio.dotenv.dotenv
import no.nav.su.se.bakover.common.domain.config.ServiceUserConfig
import no.nav.su.se.bakover.common.domain.config.TilbakekrevingConfig
@@ -15,8 +13,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.LocalTime
@@ -382,22 +378,12 @@ data class ApplicationConfig(
producerCfg = ProducerCfg(
kafkaConfig = CommonAivenKafkaConfig().configure() + mapOf(
ProducerConfig.ACKS_CONFIG to "all",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
),
),
consumerCfg = ConsumerCfg(
CommonAivenKafkaConfig().configure() +
commonConsumerConfig(
keyDeserializer = StringDeserializer::class.java,
valueDeserializer = KafkaAvroDeserializer::class.java,
clientIdConfig = getEnvironmentVariableOrThrow("HOSTNAME"),
) +
mapOf(
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to true,
KafkaAvroDeserializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE to "USER_INFO",
KafkaAvroDeserializerConfig.USER_INFO_CONFIG to ConsumerCfg.getUserInfoConfig(),
KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG to getEnvironmentVariableOrThrow("KAFKA_SCHEMA_REGISTRY"),
),
),
)
@@ -417,6 +403,8 @@ data class ApplicationConfig(
val kafkaConfig: Map<String, Any>,
) {
companion object {
// TODO jah: Do we need this?
@Suppress("unused")
fun getUserInfoConfig() = "${
getEnvironmentVariableOrDefault(
"KAFKA_SCHEMA_REGISTRY_USER",
@@ -523,8 +511,6 @@ data class ApplicationConfig(
fun createFromEnvironmentVariables() = KabalKafkaConfig(
kafkaConfig = KafkaConfig.CommonAivenKafkaConfig().configure() +
commonConsumerConfig(
keyDeserializer = StringDeserializer::class.java,
valueDeserializer = StringDeserializer::class.java,
clientIdConfig = getEnvironmentVariableOrThrow("HOSTNAME"),
),
)
@@ -543,8 +529,6 @@ data class ApplicationConfig(
fun createFromEnvironmentVariables() = InstitusjonsoppholdKafkaConfig(
kafkaConfig = KafkaConfig.CommonAivenKafkaConfig().configure() +
commonConsumerConfig(
keyDeserializer = StringDeserializer::class.java,
valueDeserializer = StringDeserializer::class.java,
clientIdConfig = getEnvironmentVariableOrThrow("HOSTNAME"),
),
topicName = getEnvironmentVariableOrThrow("INSTITUSJONSOPPHOLD_TOPIC"),
@@ -564,16 +548,12 @@ fun commonConsumerConfig(
enableAutoCommitConfig: Boolean = false,
clientIdConfig: String,
autoOffsetResetConfig: String = "earliest",
keyDeserializer: Class<*>,
valueDeserializer: Class<*>,
): Map<String, Any> {
return mapOf(
ConsumerConfig.GROUP_ID_CONFIG to groupIdConfig,
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to maxPollRecordsConfig,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to enableAutoCommitConfig.toString(),
ConsumerConfig.CLIENT_ID_CONFIG to clientIdConfig,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to autoOffsetResetConfig,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to keyDeserializer,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to valueDeserializer,
)
}
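The key.deserializer/value.deserializer entries disappear from commonConsumerConfig because each consumer now hands deserializer instances to the KafkaConsumer constructor instead (see InstitusjonsoppholdConsumer and JobberOgConsumers further down). A minimal sketch of the new call pattern, with an illustrative props map:

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

fun demoConsumer(props: Map<String, Any>): KafkaConsumer<String, String> {
    // Constructor-injected deserializers fix the key/value types at compile time,
    // so no deserializer class names are needed in the config map.
    return KafkaConsumer(props, StringDeserializer(), StringDeserializer())
}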
@@ -1,15 +1,12 @@
package no.nav.su.se.bakover.common.infrastructure.config

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.kotest.extensions.system.withEnvironment
import io.kotest.matchers.shouldBe
import no.nav.su.se.bakover.common.SU_SE_BAKOVER_CONSUMER_ID
import no.nav.su.se.bakover.common.domain.config.ServiceUserConfig
import no.nav.su.se.bakover.common.domain.config.TilbakekrevingConfig
import no.nav.su.se.bakover.common.infrastructure.brukerrolle.AzureGroups
import no.nav.su.se.bakover.common.infrastructure.git.GitCommit
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode
@@ -36,8 +33,6 @@ class ApplicationConfigTest {
"client.id" to "hostname",
"enable.auto.commit" to "false",
"auto.offset.reset" to "earliest",
"key.deserializer" to StringDeserializer::class.java,
"value.deserializer" to StringDeserializer::class.java,
"max.poll.records" to 100,
)

@@ -133,19 +128,11 @@
"bootstrap.servers" to "brokers",
"security.protocol" to "SSL",
"acks" to "all",
"key.serializer" to StringSerializer::class.java,
"value.serializer" to StringSerializer::class.java,
),
retryTaskInterval = Duration.ofSeconds(15),
),
consumerCfg = ApplicationConfig.KafkaConfig.ConsumerCfg(
kafkaConfig + mapOf(
"value.deserializer" to KafkaAvroDeserializer::class.java,
"specific.avro.reader" to true,
"schema.registry.url" to "some-schema-url",
"basic.auth.credentials.source" to "USER_INFO",
"basic.auth.user.info" to "usr:pwd",
),
kafkaConfig,
),
),
kabalKafkaConfig = ApplicationConfig.KabalKafkaConfig(
1 change: 1 addition & 0 deletions settings.gradle.kts
@@ -17,6 +17,7 @@ include("client")
include("common:domain")
include("common:infrastructure")
include("common:infrastructure:cxf")
include("common:infrastructure:kafka")
include("common:presentation")
include("database")
include("datapakker:soknad")
@@ -15,6 +15,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Clock
@@ -32,7 +33,7 @@ class InstitusjonsoppholdConsumer private constructor(
private val clock: Clock,
private val log: Logger = LoggerFactory.getLogger(InstitusjonsoppholdConsumer::class.java),
private val sikkerLogg: Logger = no.nav.su.se.bakover.common.sikkerLogg,
private val consumer: KafkaConsumer<String, String> = KafkaConsumer(config.kafkaConfig),
private val consumer: KafkaConsumer<String, String> = KafkaConsumer(config.kafkaConfig, StringDeserializer(), StringDeserializer()),
) {
constructor(
config: ApplicationConfig.InstitusjonsoppholdKafkaConfig,
1 change: 1 addition & 0 deletions web/build.gradle.kts
@@ -13,6 +13,7 @@ dependencies {
implementation(project(":client"))
implementation(project(":common:domain"))
implementation(project(":common:infrastructure"))
implementation(project(":common:infrastructure:kafka"))
implementation(project(":common:presentation"))
implementation(project(":database"))
implementation(project(":dokument:application"))
15 changes: 13 additions & 2 deletions web/src/main/kotlin/no/nav/su/se/bakover/web/JobberOgConsumers.kt
@@ -1,5 +1,7 @@
package no.nav.su.se.bakover.web

import common.infrastructure.kafka.AvroDeserializer
import no.nav.person.pdl.leesah.Personhendelse
import no.nav.su.se.bakover.client.Clients
import no.nav.su.se.bakover.common.extensions.april
import no.nav.su.se.bakover.common.extensions.august
@@ -45,6 +47,7 @@ import no.nav.su.se.bakover.web.services.personhendelser.PersonhendelseOppgaveJo
import no.nav.su.se.bakover.web.services.tilbakekreving.LokalMottaKravgrunnlagJob
import no.nav.su.se.bakover.web.services.tilbakekreving.SendTilbakekrevingsvedtakForRevurdering
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import tilbakekreving.presentation.Tilbakekrevingskomponenter
import tilbakekreving.presentation.consumer.KravgrunnlagIbmMqConsumer
import tilbakekreving.presentation.job.Tilbakekrevingsjobber
@@ -134,11 +137,19 @@ fun startJobberOgConsumers(
kvitteringConsumer = consumers.utbetalingKvitteringConsumer,
)
PersonhendelseConsumer(
consumer = KafkaConsumer(applicationConfig.kafkaConfig.consumerCfg.kafkaConfig),
consumer = KafkaConsumer(
applicationConfig.kafkaConfig.consumerCfg.kafkaConfig,
StringDeserializer(),
AvroDeserializer(Personhendelse.getClassSchema()),
),
personhendelseService = personhendelseService,
)
KlageinstanshendelseConsumer(
consumer = KafkaConsumer(applicationConfig.kabalKafkaConfig.kafkaConfig),
consumer = KafkaConsumer(
applicationConfig.kabalKafkaConfig.kafkaConfig,
StringDeserializer(),
StringDeserializer(),
),
klageinstanshendelseService = services.klageinstanshendelseService,
clock = clock,
)