diff --git a/web/src/main/kotlin/no/nav/su/se/bakover/web/services/klage/FattetKlagevedtakConsumer.kt b/web/src/main/kotlin/no/nav/su/se/bakover/web/services/klage/FattetKlagevedtakConsumer.kt index 91c3b6ac07..22e6dae061 100644 --- a/web/src/main/kotlin/no/nav/su/se/bakover/web/services/klage/FattetKlagevedtakConsumer.kt +++ b/web/src/main/kotlin/no/nav/su/se/bakover/web/services/klage/FattetKlagevedtakConsumer.kt @@ -73,22 +73,24 @@ internal class FattetKlagevedtakConsumer( -> { log.error("Fattet klagevedtak: $it. Key: ${message.key()}, partition: ${message.partition()}, offset: ${message.offset()}") sikkerLogg.error("Fattet klagevedtak: $it. Key: ${message.key()}, Value: ${message.value()}, partition: ${message.partition()}, offset: ${message.offset()} ") + consumer.commitSync(offsets) // Kafka tar ikke hensyn til offsetten vi comitter før det skjer en Rebalance. // Vi kan tvinge en rebalance, dersom vi ikke ønsker neste event (som kan føre til at vi comitter lengre frem enn vi faktisk er) // Hvis dette skjer vil Kafka prøve å sende meldingen på nytt mens den blokkerer nyere meldinger. // Dersom dette oppstår kan vi vurdere om vi heller bare skal forkaste meldingen eller lagre den (da flytter vi kompleksiten inn i domenet.) consumer.enforceRebalance() - consumer.commitSync(offsets) return@breakable } is KunneIkkeMappeKlagevedtak.IkkeAktuellOpplysningstype -> { log.debug("Fattet klagevedtak: Forkastet hendelse med uaktuell kilde: ${it.kilde}, key: ${message.key()}, partition: ${message.partition()}, offset: ${message.offset()}\"") sikkerLogg.debug("Fattet klagevedtak: Forkastet hendelse med uaktuell kilde: ${it.kilde}, key: ${message.key()}, value: ${message.value()} partition: ${message.partition()}, offset: ${message.offset()}\"") - return@forEach + offsets[TopicPartition(message.topic(), message.partition())] = + OffsetAndMetadata(message.offset() + 1) } } } } + consumer.commitSync(offsets) } log.debug("Fattet klagevedtak: Prosessert ferdig meldingene.") } diff --git a/web/src/test/kotlin/no/nav/su/se/bakover/web/services/klage/FattetKlagevedtakConsumerTest.kt b/web/src/test/kotlin/no/nav/su/se/bakover/web/services/klage/FattetKlagevedtakConsumerTest.kt index 568f3af3d6..44385f7b63 100644 --- a/web/src/test/kotlin/no/nav/su/se/bakover/web/services/klage/FattetKlagevedtakConsumerTest.kt +++ b/web/src/test/kotlin/no/nav/su/se/bakover/web/services/klage/FattetKlagevedtakConsumerTest.kt @@ -15,6 +15,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer @@ -42,7 +43,7 @@ internal class FattetKlagevedtakConsumerTest { private val key = UUID.randomUUID().toString() @Test - fun `Mottar alle fatta klagevedtak`() { + fun `Lagrer aktuelle og forkaster uaktuelle`() { val kafkaConsumer = kafkaConsumer(kafkaServer, "$TOPIC1-consumer-group") val klagevedtakService = mock() FattetKlagevedtakConsumer( @@ -58,15 +59,29 @@ internal class FattetKlagevedtakConsumerTest { }, ) val producer = kafkaProducer(kafkaServer) - (0..5L).map { - producer.send(genererFattetKlagevedtaksmelding(TOPIC1, it)) - }.forEach { + listOf( + producer.send(genererFattetKlagevedtaksmelding(TOPIC1, 0, "SUPSTONAD")), + producer.send(genererFattetKlagevedtaksmelding(TOPIC1, 1, "ANNENSTONAD")), + producer.send(genererFattetKlagevedtaksmelding(TOPIC1, 2, "SUPSTONAD")), + ).forEach { // Venter til alle meldingene er sendt før vi prøver consume it.get() } val hendelser = argumentCaptor() - verify(klagevedtakService, timeout(20000).times(6)).lagre(hendelser.capture()) - hendelser.allValues.size shouldBe 6 + // Kunne alternativt brukt awaitility for å vente til currentOffset ble 3 + verify(klagevedtakService, timeout(20000).times(2)).lagre(any()) + currentOffset(TOPIC1) shouldBe 3 // last offset (2) + 1 + listOf( + producer.send(genererFattetKlagevedtaksmelding(TOPIC1, 3, "ANNENSTONAD")), + producer.send(genererFattetKlagevedtaksmelding(TOPIC1, 4, "SUPSTONAD")), + producer.send(genererFattetKlagevedtaksmelding(TOPIC1, 5, "ANNENSTONAD")), + ).forEach { + // Venter til alle meldingene er sendt før vi prøver consume + it.get() + } + + verify(klagevedtakService, timeout(20000).times(3)).lagre(hendelser.capture()) + hendelser.allValues.size shouldBe 3 hendelser.allValues.forEachIndexed { index, klagevedtak -> UprosessertFattetKlagevedtak( id = klagevedtak.id, @@ -90,6 +105,7 @@ internal class FattetKlagevedtakConsumerTest { ) } verifyNoMoreInteractions(klagevedtakService) + currentOffset(TOPIC1) shouldBe 6 // last offset (5) + 1 } @Test @@ -120,17 +136,24 @@ internal class FattetKlagevedtakConsumerTest { verify(klagevedtakService, timeout(20000).times(1)).lagre(any()) Thread.sleep(2000) // Venter deretter en liten stund til for å verifisere at det ikke kommer fler kall. verifyNoMoreInteractions(klagevedtakService) + currentOffset(TOPIC2) shouldBe 1 // last offset (0) + 1 + } + + private fun currentOffset(topic: String): Long { + return kafkaServer.adminClient!!.listConsumerGroupOffsets("funKafkaConsumeGrpID") + .partitionsToOffsetAndMetadata().get()[TopicPartition(topic, PARTITION)]!!.offset() } private fun genererFattetKlagevedtaksmelding( topic: String, offset: Long, + kilde: String = "SUPSTONAD" ): ProducerRecord { val fattetKlagevedtaksmelding = """ { "eventId": "$offset", "kildeReferanse":"$offset", - "kilde":"SUPSTONAD", + "kilde":"$kilde", "utfall":"TRUKKET", "vedtaksbrevReferanse":null, "kabalReferanse":"$offset"