Skip to content

Commit

Permalink
#72 Refactor timeout on flow
Browse files Browse the repository at this point in the history
  • Loading branch information
vityaman committed Jun 13, 2024
1 parent 705dceb commit b4333b3
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package ru.vityaman.lms.botalka.app.spring.task

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.timeout
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.app.spring.storage.MainR2dbcConfig
import ru.vityaman.lms.botalka.commons.takeDuring
import ru.vityaman.lms.botalka.core.logging.Slf4jLog
import ru.vityaman.lms.botalka.core.publication.PublicationConsumer
import ru.vityaman.lms.botalka.core.publication.PublicationSupplier
Expand Down Expand Up @@ -51,7 +50,6 @@ class SpringPublicationTask(

suspend fun start(): Unit = logic.run()

@OptIn(FlowPreview::class)
@Scheduled(
fixedRateString = "\${task.scheduled.publication.precision-seconds}",
initialDelayString = "\${task.scheduled.publication.precision-seconds}",
Expand All @@ -60,15 +58,10 @@ class SpringPublicationTask(
fun consume(): Unit = runBlocking {
val events = supplier
.events()
.timeout(5.seconds)
.catch {
if (it !is TimeoutCancellationException) {
throw it
}
}
.toList(mutableListOf())
.takeDuring(5.seconds)
.onEach { it.acknowledge() }
.map { it.payload.title.text }
.toList(mutableListOf())
println("Consumed $events")
}
}
16 changes: 16 additions & 0 deletions botalka/src/main/kotlin/ru/vityaman/lms/botalka/commons/Flow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ru.vityaman.lms.botalka.commons

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.timeout
import kotlin.time.Duration

@OptIn(FlowPreview::class)
fun <V> Flow<V>.takeDuring(duration: Duration): Flow<V> =
this.timeout(duration).catch {
if (it !is TimeoutCancellationException) {
throw it
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package ru.vityaman.lms.botalka.app.spring.logic
import io.kotest.common.runBlocking
import io.kotest.matchers.collections.shouldContainExactly
import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.timeout
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Timeout
Expand All @@ -17,6 +15,7 @@ import ru.vityaman.lms.botalka.app.spring.BotalkaTestSuite
import ru.vityaman.lms.botalka.app.spring.env.FakeClock
import ru.vityaman.lms.botalka.app.spring.task.SpringPublicationSupplier
import ru.vityaman.lms.botalka.app.spring.task.SpringPublicationTask
import ru.vityaman.lms.botalka.commons.takeDuring
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.core.storage.HomeworkStorage
import java.time.OffsetDateTime
Expand Down Expand Up @@ -82,19 +81,13 @@ class PublicationTest(
}
}

@OptIn(FlowPreview::class)
private suspend fun published(): List<String> {
task.start()
val list = mutableListOf<String>()
publications.events().timeout(2.seconds).catch {
if (it !is TimeoutCancellationException) {
throw it
}
}
return publications.events()
.takeDuring(2.seconds)
.onEach { it.acknowledge() }
.onEach { list.add(it.payload.title.text) }
.collect { }
return list
.map { it.payload.title.text }
.toList(mutableListOf())
}

@Suppress("LongParameterList")
Expand Down

0 comments on commit b4333b3

Please sign in to comment.