Skip to content

Commit

Permalink
#72 Add notification task
Browse files Browse the repository at this point in the history
  • Loading branch information
vityaman committed Jun 13, 2024
1 parent b4333b3 commit d31b578
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ru.vityaman.lms.botalka.app.spring.task

import kotlinx.coroutines.runBlocking
import org.springframework.beans.factory.annotation.Value
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.commons.Consumer
import ru.vityaman.lms.botalka.core.logging.Slf4jLog
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.core.publication.PublicationSupplier
import ru.vityaman.lms.botalka.core.publication.logging.loggingNotificationCallbacks
import ru.vityaman.lms.botalka.core.publication.task.NotificationTask
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.seconds

@Component
class SpringNotificationTask(
@Value("\${task.scheduled.notification.batch-duration-seconds}")
batchDurationSeconds: Int,

supplier: PublicationSupplier,
) {
private val log = Slf4jLog("NotificationTask")

private val logic = NotificationTask(
supplier = supplier,
consumer = object : Consumer<Homework> {
override suspend fun accept(value: Homework) {
log.info("Consumed $value")
}
},
config = NotificationTask.Config(
batchDuration = batchDurationSeconds.seconds,
),
callbacks = loggingNotificationCallbacks(log),
)

@Scheduled(
fixedRateString =
"\${task.scheduled.notification.precision-seconds}",
initialDelayString =
"\${task.scheduled.notification.precision-seconds}",
timeUnit = TimeUnit.SECONDS,
)
fun run(): Unit = runBlocking { start() }

suspend fun start(): Unit = logic.run()
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.core.publication.KafkaPublicationConsumer
import ru.vityaman.lms.botalka.core.publication.PublicationConsumer
import ru.vityaman.lms.botalka.core.publication.kafka.KafkaPublicationConsumer
import ru.vityaman.lms.botalka.storage.kafka.KafkaProducer
import ru.vityaman.lms.botalka.storage.kafka.KafkaTopic

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.core.publication.KafkaPublicationSupplier
import ru.vityaman.lms.botalka.core.publication.PublicationSupplier
import ru.vityaman.lms.botalka.core.publication.kafka.KafkaPublicationSupplier
import ru.vityaman.lms.botalka.storage.kafka.KafkaConsumer
import ru.vityaman.lms.botalka.storage.kafka.KafkaTopic

Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,25 @@
package ru.vityaman.lms.botalka.app.spring.task

import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.app.spring.storage.MainR2dbcConfig
import ru.vityaman.lms.botalka.commons.takeDuring
import ru.vityaman.lms.botalka.core.logging.Slf4jLog
import ru.vityaman.lms.botalka.core.publication.PublicationConsumer
import ru.vityaman.lms.botalka.core.publication.PublicationSupplier
import ru.vityaman.lms.botalka.core.publication.PublicationTask
import ru.vityaman.lms.botalka.core.publication.loggingPublicationCallbacks
import ru.vityaman.lms.botalka.core.publication.logging.loggingPublicationCallbacks
import ru.vityaman.lms.botalka.core.publication.task.PublicationTask
import ru.vityaman.lms.botalka.core.storage.HomeworkStorage
import ru.vityaman.lms.botalka.core.tx.TxEnv
import java.time.Clock
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.seconds

@Component
class SpringPublicationTask(
homeworks: HomeworkStorage,

consumer: PublicationConsumer,

private val supplier: PublicationSupplier,

@Qualifier(MainR2dbcConfig.BeanName.TX_ENV)
mainTx: TxEnv,

Expand All @@ -49,19 +41,4 @@ class SpringPublicationTask(
fun run(): Unit = runBlocking { start() }

suspend fun start(): Unit = logic.run()

@Scheduled(
fixedRateString = "\${task.scheduled.publication.precision-seconds}",
initialDelayString = "\${task.scheduled.publication.precision-seconds}",
timeUnit = TimeUnit.SECONDS,
)
fun consume(): Unit = runBlocking {
val events = supplier
.events()
.takeDuring(5.seconds)
.onEach { it.acknowledge() }
.map { it.payload.title.text }
.toList(mutableListOf())
println("Consumed $events")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ru.vityaman.lms.botalka.commons

interface Runnable {
suspend fun run()
}

interface Consumer<V> {
suspend fun accept(value: V)
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ru.vityaman.lms.botalka.core.publication

import ru.vityaman.lms.botalka.commons.Consumer
import ru.vityaman.lms.botalka.core.model.Homework

interface PublicationConsumer {
suspend fun accept(homework: Homework)
}
interface PublicationConsumer : Consumer<Homework>

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ru.vityaman.lms.botalka.core.publication
package ru.vityaman.lms.botalka.core.publication.kafka

import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.core.publication.PublicationConsumer
import ru.vityaman.lms.botalka.storage.kafka.KafkaProducer

class KafkaPublicationConsumer(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package ru.vityaman.lms.botalka.core.publication
package ru.vityaman.lms.botalka.core.publication.kafka

import kotlinx.coroutines.flow.Flow
import ru.vityaman.lms.botalka.core.event.Event
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.core.publication.PublicationSupplier
import ru.vityaman.lms.botalka.storage.kafka.KafkaConsumer

class KafkaPublicationSupplier(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ru.vityaman.lms.botalka.core.publication.logging

import ru.vityaman.lms.botalka.core.logging.Log
import ru.vityaman.lms.botalka.core.publication.task.NotificationTask

fun loggingNotificationCallbacks(log: Log) = NotificationTask.Callbacks(
onStart = { log.info("Starting...") },
onNext = {
buildString {
append("Received homework with ")
append("id '${it.id}', ")
append("title '${it.title}', ")
append("moment '${it.publicationMoment}'")
}.let { log.info(it) }
},
onSuccess = { log.info("Processed homework with id '${it.id}'") },
onError = { log.warn("Task failed because '${it.message}'") },
onFinish = { log.info("Finished.") },
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ru.vityaman.lms.botalka.core.publication
package ru.vityaman.lms.botalka.core.publication.logging

import ru.vityaman.lms.botalka.core.logging.Log
import ru.vityaman.lms.botalka.core.publication.task.PublicationTask

fun loggingPublicationCallbacks(log: Log) = PublicationTask.Callbacks(
onStart = { log.info("Starting...") },
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package ru.vityaman.lms.botalka.core.publication.task

import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.onEach
import ru.vityaman.lms.botalka.commons.Consumer
import ru.vityaman.lms.botalka.commons.Runnable
import ru.vityaman.lms.botalka.commons.takeDuring
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.core.publication.PublicationSupplier
import kotlin.time.Duration

class NotificationTask(
private val supplier: PublicationSupplier,
private val consumer: Consumer<Homework>,
private val config: Config,
private val callbacks: Callbacks = Callbacks(),
) : Runnable {
override suspend fun run() {
callbacks.onStart()
supplier.events()
.takeDuring(config.batchDuration)
.onEach { callbacks.onNext(it.payload) }
.onEach { consumer.accept(it.payload) }
.onEach { callbacks.onSuccess(it.payload) }
.catch { callbacks.onError(it) }
.onEach { it.acknowledge() }
.collect {}
callbacks.onFinish()
}

data class Config(
val batchDuration: Duration,
)

data class Callbacks(
val onStart: suspend () -> Unit = {},
val onNext: suspend (Homework) -> Unit = {},
val onSuccess: suspend (Homework) -> Unit = {},
val onError: suspend (Throwable) -> Unit = {},
val onFinish: suspend () -> Unit = {},
)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package ru.vityaman.lms.botalka.core.publication
package ru.vityaman.lms.botalka.core.publication.task

import kotlinx.coroutines.flow.toCollection
import ru.vityaman.lms.botalka.commons.Runnable
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.core.publication.PublicationConsumer
import ru.vityaman.lms.botalka.core.storage.HomeworkStorage
import ru.vityaman.lms.botalka.core.tx.TxEnv
import java.time.Clock
Expand Down
5 changes: 4 additions & 1 deletion botalka/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,7 @@ external:
task:
scheduled:
publication:
precision-seconds: 60
precision-seconds: 60
notification:
precision-seconds: 60
batch-duration-seconds: 16

0 comments on commit d31b578

Please sign in to comment.