Skip to content

Commit

Permalink
Implement Actor.invoke to return Fut
Browse files Browse the repository at this point in the history
  • Loading branch information
sghpjuikit committed Jan 14, 2025
1 parent 2e6a405 commit 800b188
Showing 1 changed file with 19 additions and 2 deletions.
21 changes: 19 additions & 2 deletions src/util/main/sp/it/util/async/actor/Actor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package sp.it.util.async.actor

import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.BlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import org.jetbrains.annotations.Blocking
import sp.it.util.async.future.Fut
import sp.it.util.async.sleep
import sp.it.util.async.threadFactory
import sp.it.util.dev.ThreadSafe
import sp.it.util.functional.Option.Some
import sp.it.util.functional.runTry
import sp.it.util.functional.toUnit
Expand Down Expand Up @@ -51,6 +54,7 @@ class ActorVt<T>(

/** Close actor. New events are ignored. Queued events will be processed. Returns after all events are processed. */
@Blocking
@ThreadSafe
fun closeAndWait(): Unit {
isRunning = false
while (queue.isNotEmpty()) sleep(1)
Expand All @@ -71,15 +75,28 @@ class ActorSe<T>(
private val executor = ThreadPoolExecutor(0, 1, 10, TimeUnit.SECONDS, LinkedBlockingQueue(), threadFactory(name, false))

/** Send event for processing. Returns immediatelly. */
operator fun invoke(message: T): Unit =
executor.execute { actionSafe(message) }
operator fun invoke(message: T): Fut<Unit> =
Fut<Unit>(
CompletableFuture<Unit>().also { future ->
executor.execute {
runTry {
actionSafe(message)
}.ifOk {
future.complete(Unit)
}.ifError {
future.completeExceptionally(it)
}
}
}
)

/** Close actor. New events are ignored. Queued events will be processed. Returns immediatelly. */
fun close(): Unit =
executor.shutdown()

/** Close actor. New events are ignored. Queued events will be processed. Returns after all events are processed. */
@Blocking
@ThreadSafe
fun closeAndWait(): Unit =
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS).toUnit()

Expand Down

0 comments on commit 800b188

Please sign in to comment.