From d31b578ab03a19d9e34b66a6062fa5d3d8e521c7 Mon Sep 17 00:00:00 2001 From: vityaman Date: Thu, 13 Jun 2024 16:18:35 +0300 Subject: [PATCH] #72 Add notification task --- .../app/spring/task/SpringNotificationTask.kt | 48 +++++++++++++++++++ .../spring/task/SpringPublicationConsumer.kt | 2 +- .../spring/task/SpringPublicationSupplier.kt | 2 +- .../app/spring/task/SpringPublicationTask.kt | 27 +---------- .../botalka/commons/ReactiveReplacement.kt | 9 ++++ .../publication/LoggingPublicationConsumer.kt | 12 ----- .../core/publication/PublicationConsumer.kt | 5 +- .../lms/botalka/core/publication/Runnable.kt | 5 -- .../{ => kafka}/KafkaPublicationConsumer.kt | 3 +- .../{ => kafka}/KafkaPublicationSupplier.kt | 3 +- .../logging/LoggingNotificationCallbacks.kt | 19 ++++++++ .../LoggingPublicationCallbacks.kt | 3 +- .../core/publication/task/NotificationTask.kt | 42 ++++++++++++++++ .../publication/{ => task}/PublicationTask.kt | 4 +- botalka/src/main/resources/application.yml | 5 +- 15 files changed, 137 insertions(+), 52 deletions(-) create mode 100644 botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringNotificationTask.kt create mode 100644 botalka/src/main/kotlin/ru/vityaman/lms/botalka/commons/ReactiveReplacement.kt delete mode 100644 botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/LoggingPublicationConsumer.kt delete mode 100644 botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/Runnable.kt rename botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/{ => kafka}/KafkaPublicationConsumer.kt (73%) rename botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/{ => kafka}/KafkaPublicationSupplier.kt (76%) create mode 100644 botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/logging/LoggingNotificationCallbacks.kt rename botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/{ => logging}/LoggingPublicationCallbacks.kt (78%) create mode 100644 botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/task/NotificationTask.kt rename botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/{ => task}/PublicationTask.kt (90%) diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringNotificationTask.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringNotificationTask.kt new file mode 100644 index 0000000..e4f5248 --- /dev/null +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringNotificationTask.kt @@ -0,0 +1,48 @@ +package ru.vityaman.lms.botalka.app.spring.task + +import kotlinx.coroutines.runBlocking +import org.springframework.beans.factory.annotation.Value +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Component +import ru.vityaman.lms.botalka.commons.Consumer +import ru.vityaman.lms.botalka.core.logging.Slf4jLog +import ru.vityaman.lms.botalka.core.model.Homework +import ru.vityaman.lms.botalka.core.publication.PublicationSupplier +import ru.vityaman.lms.botalka.core.publication.logging.loggingNotificationCallbacks +import ru.vityaman.lms.botalka.core.publication.task.NotificationTask +import java.util.concurrent.TimeUnit +import kotlin.time.Duration.Companion.seconds + +@Component +class SpringNotificationTask( + @Value("\${task.scheduled.notification.batch-duration-seconds}") + batchDurationSeconds: Int, + + supplier: PublicationSupplier, +) { + private val log = Slf4jLog("NotificationTask") + + private val logic = NotificationTask( + supplier = supplier, + consumer = object : Consumer { + override suspend fun accept(value: Homework) { + log.info("Consumed $value") + } + }, + config = NotificationTask.Config( + batchDuration = batchDurationSeconds.seconds, + ), + callbacks = loggingNotificationCallbacks(log), + ) + + @Scheduled( + fixedRateString = + "\${task.scheduled.notification.precision-seconds}", + initialDelayString = + "\${task.scheduled.notification.precision-seconds}", + timeUnit = TimeUnit.SECONDS, + ) + fun run(): Unit = runBlocking { start() } + + suspend fun start(): Unit = logic.run() +} diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationConsumer.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationConsumer.kt index baa1c47..50d6abe 100644 --- a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationConsumer.kt +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationConsumer.kt @@ -4,8 +4,8 @@ import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component import ru.vityaman.lms.botalka.core.model.Homework -import ru.vityaman.lms.botalka.core.publication.KafkaPublicationConsumer import ru.vityaman.lms.botalka.core.publication.PublicationConsumer +import ru.vityaman.lms.botalka.core.publication.kafka.KafkaPublicationConsumer import ru.vityaman.lms.botalka.storage.kafka.KafkaProducer import ru.vityaman.lms.botalka.storage.kafka.KafkaTopic diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationSupplier.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationSupplier.kt index 163ae24..35e0932 100644 --- a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationSupplier.kt +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationSupplier.kt @@ -4,8 +4,8 @@ import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component import ru.vityaman.lms.botalka.core.model.Homework -import ru.vityaman.lms.botalka.core.publication.KafkaPublicationSupplier import ru.vityaman.lms.botalka.core.publication.PublicationSupplier +import ru.vityaman.lms.botalka.core.publication.kafka.KafkaPublicationSupplier import ru.vityaman.lms.botalka.storage.kafka.KafkaConsumer import ru.vityaman.lms.botalka.storage.kafka.KafkaTopic diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationTask.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationTask.kt index e8d53e3..4169752 100644 --- a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationTask.kt +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/app/spring/task/SpringPublicationTask.kt @@ -1,24 +1,18 @@ package ru.vityaman.lms.botalka.app.spring.task -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import org.springframework.beans.factory.annotation.Qualifier import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component import ru.vityaman.lms.botalka.app.spring.storage.MainR2dbcConfig -import ru.vityaman.lms.botalka.commons.takeDuring import ru.vityaman.lms.botalka.core.logging.Slf4jLog import ru.vityaman.lms.botalka.core.publication.PublicationConsumer -import ru.vityaman.lms.botalka.core.publication.PublicationSupplier -import ru.vityaman.lms.botalka.core.publication.PublicationTask -import ru.vityaman.lms.botalka.core.publication.loggingPublicationCallbacks +import ru.vityaman.lms.botalka.core.publication.logging.loggingPublicationCallbacks +import ru.vityaman.lms.botalka.core.publication.task.PublicationTask import ru.vityaman.lms.botalka.core.storage.HomeworkStorage import ru.vityaman.lms.botalka.core.tx.TxEnv import java.time.Clock import java.util.concurrent.TimeUnit -import kotlin.time.Duration.Companion.seconds @Component class SpringPublicationTask( @@ -26,8 +20,6 @@ class SpringPublicationTask( consumer: PublicationConsumer, - private val supplier: PublicationSupplier, - @Qualifier(MainR2dbcConfig.BeanName.TX_ENV) mainTx: TxEnv, @@ -49,19 +41,4 @@ class SpringPublicationTask( fun run(): Unit = runBlocking { start() } suspend fun start(): Unit = logic.run() - - @Scheduled( - fixedRateString = "\${task.scheduled.publication.precision-seconds}", - initialDelayString = "\${task.scheduled.publication.precision-seconds}", - timeUnit = TimeUnit.SECONDS, - ) - fun consume(): Unit = runBlocking { - val events = supplier - .events() - .takeDuring(5.seconds) - .onEach { it.acknowledge() } - .map { it.payload.title.text } - .toList(mutableListOf()) - println("Consumed $events") - } } diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/commons/ReactiveReplacement.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/commons/ReactiveReplacement.kt new file mode 100644 index 0000000..020d3ff --- /dev/null +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/commons/ReactiveReplacement.kt @@ -0,0 +1,9 @@ +package ru.vityaman.lms.botalka.commons + +interface Runnable { + suspend fun run() +} + +interface Consumer { + suspend fun accept(value: V) +} diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/LoggingPublicationConsumer.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/LoggingPublicationConsumer.kt deleted file mode 100644 index 22f623d..0000000 --- a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/LoggingPublicationConsumer.kt +++ /dev/null @@ -1,12 +0,0 @@ -package ru.vityaman.lms.botalka.core.publication - -import ru.vityaman.lms.botalka.core.logging.Log -import ru.vityaman.lms.botalka.core.model.Homework - -class LoggingPublicationConsumer( - private val log: Log, -) : PublicationConsumer { - override suspend fun accept(homework: Homework) { - log.warn("Published $homework") - } -} diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/PublicationConsumer.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/PublicationConsumer.kt index 9491f0c..eb98b10 100644 --- a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/PublicationConsumer.kt +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/PublicationConsumer.kt @@ -1,7 +1,6 @@ package ru.vityaman.lms.botalka.core.publication +import ru.vityaman.lms.botalka.commons.Consumer import ru.vityaman.lms.botalka.core.model.Homework -interface PublicationConsumer { - suspend fun accept(homework: Homework) -} +interface PublicationConsumer : Consumer diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/Runnable.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/Runnable.kt deleted file mode 100644 index a5c669b..0000000 --- a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/Runnable.kt +++ /dev/null @@ -1,5 +0,0 @@ -package ru.vityaman.lms.botalka.core.publication - -interface Runnable { - suspend fun run() -} diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/KafkaPublicationConsumer.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/kafka/KafkaPublicationConsumer.kt similarity index 73% rename from botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/KafkaPublicationConsumer.kt rename to botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/kafka/KafkaPublicationConsumer.kt index b92b17a..cac6ee1 100644 --- a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/KafkaPublicationConsumer.kt +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/kafka/KafkaPublicationConsumer.kt @@ -1,6 +1,7 @@ -package ru.vityaman.lms.botalka.core.publication +package ru.vityaman.lms.botalka.core.publication.kafka import ru.vityaman.lms.botalka.core.model.Homework +import ru.vityaman.lms.botalka.core.publication.PublicationConsumer import ru.vityaman.lms.botalka.storage.kafka.KafkaProducer class KafkaPublicationConsumer( diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/KafkaPublicationSupplier.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/kafka/KafkaPublicationSupplier.kt similarity index 76% rename from botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/KafkaPublicationSupplier.kt rename to botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/kafka/KafkaPublicationSupplier.kt index 6c0e142..b8c5a08 100644 --- a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/KafkaPublicationSupplier.kt +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/kafka/KafkaPublicationSupplier.kt @@ -1,8 +1,9 @@ -package ru.vityaman.lms.botalka.core.publication +package ru.vityaman.lms.botalka.core.publication.kafka import kotlinx.coroutines.flow.Flow import ru.vityaman.lms.botalka.core.event.Event import ru.vityaman.lms.botalka.core.model.Homework +import ru.vityaman.lms.botalka.core.publication.PublicationSupplier import ru.vityaman.lms.botalka.storage.kafka.KafkaConsumer class KafkaPublicationSupplier( diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/logging/LoggingNotificationCallbacks.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/logging/LoggingNotificationCallbacks.kt new file mode 100644 index 0000000..8564c0f --- /dev/null +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/logging/LoggingNotificationCallbacks.kt @@ -0,0 +1,19 @@ +package ru.vityaman.lms.botalka.core.publication.logging + +import ru.vityaman.lms.botalka.core.logging.Log +import ru.vityaman.lms.botalka.core.publication.task.NotificationTask + +fun loggingNotificationCallbacks(log: Log) = NotificationTask.Callbacks( + onStart = { log.info("Starting...") }, + onNext = { + buildString { + append("Received homework with ") + append("id '${it.id}', ") + append("title '${it.title}', ") + append("moment '${it.publicationMoment}'") + }.let { log.info(it) } + }, + onSuccess = { log.info("Processed homework with id '${it.id}'") }, + onError = { log.warn("Task failed because '${it.message}'") }, + onFinish = { log.info("Finished.") }, +) diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/LoggingPublicationCallbacks.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/logging/LoggingPublicationCallbacks.kt similarity index 78% rename from botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/LoggingPublicationCallbacks.kt rename to botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/logging/LoggingPublicationCallbacks.kt index 73f0550..9cd38d0 100644 --- a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/LoggingPublicationCallbacks.kt +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/logging/LoggingPublicationCallbacks.kt @@ -1,6 +1,7 @@ -package ru.vityaman.lms.botalka.core.publication +package ru.vityaman.lms.botalka.core.publication.logging import ru.vityaman.lms.botalka.core.logging.Log +import ru.vityaman.lms.botalka.core.publication.task.PublicationTask fun loggingPublicationCallbacks(log: Log) = PublicationTask.Callbacks( onStart = { log.info("Starting...") }, diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/task/NotificationTask.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/task/NotificationTask.kt new file mode 100644 index 0000000..7fe1c0f --- /dev/null +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/task/NotificationTask.kt @@ -0,0 +1,42 @@ +package ru.vityaman.lms.botalka.core.publication.task + +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.onEach +import ru.vityaman.lms.botalka.commons.Consumer +import ru.vityaman.lms.botalka.commons.Runnable +import ru.vityaman.lms.botalka.commons.takeDuring +import ru.vityaman.lms.botalka.core.model.Homework +import ru.vityaman.lms.botalka.core.publication.PublicationSupplier +import kotlin.time.Duration + +class NotificationTask( + private val supplier: PublicationSupplier, + private val consumer: Consumer, + private val config: Config, + private val callbacks: Callbacks = Callbacks(), +) : Runnable { + override suspend fun run() { + callbacks.onStart() + supplier.events() + .takeDuring(config.batchDuration) + .onEach { callbacks.onNext(it.payload) } + .onEach { consumer.accept(it.payload) } + .onEach { callbacks.onSuccess(it.payload) } + .catch { callbacks.onError(it) } + .onEach { it.acknowledge() } + .collect {} + callbacks.onFinish() + } + + data class Config( + val batchDuration: Duration, + ) + + data class Callbacks( + val onStart: suspend () -> Unit = {}, + val onNext: suspend (Homework) -> Unit = {}, + val onSuccess: suspend (Homework) -> Unit = {}, + val onError: suspend (Throwable) -> Unit = {}, + val onFinish: suspend () -> Unit = {}, + ) +} diff --git a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/PublicationTask.kt b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/task/PublicationTask.kt similarity index 90% rename from botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/PublicationTask.kt rename to botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/task/PublicationTask.kt index 7dc1a9d..584d1dd 100644 --- a/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/PublicationTask.kt +++ b/botalka/src/main/kotlin/ru/vityaman/lms/botalka/core/publication/task/PublicationTask.kt @@ -1,7 +1,9 @@ -package ru.vityaman.lms.botalka.core.publication +package ru.vityaman.lms.botalka.core.publication.task import kotlinx.coroutines.flow.toCollection +import ru.vityaman.lms.botalka.commons.Runnable import ru.vityaman.lms.botalka.core.model.Homework +import ru.vityaman.lms.botalka.core.publication.PublicationConsumer import ru.vityaman.lms.botalka.core.storage.HomeworkStorage import ru.vityaman.lms.botalka.core.tx.TxEnv import java.time.Clock diff --git a/botalka/src/main/resources/application.yml b/botalka/src/main/resources/application.yml index 2ada0ab..b24a6d2 100644 --- a/botalka/src/main/resources/application.yml +++ b/botalka/src/main/resources/application.yml @@ -67,4 +67,7 @@ external: task: scheduled: publication: - precision-seconds: 60 \ No newline at end of file + precision-seconds: 60 + notification: + precision-seconds: 60 + batch-duration-seconds: 16