From b19029c9e2c8f4e2cf9d645dde4b6cca8cc62eac Mon Sep 17 00:00:00 2001 From: Kamil-Lontkowski Date: Wed, 15 Jan 2025 14:52:13 +0100 Subject: [PATCH] Bulkhead API --- .../main/scala/ox/resilience/Bulkhead.scala | 17 +++ .../scala/ox/resilience/BulkheadTest.scala | 111 ++++++++++++++++++ doc/utils/bulkhead.md | 50 ++++++++ 3 files changed, 178 insertions(+) create mode 100644 core/src/main/scala/ox/resilience/Bulkhead.scala create mode 100644 core/src/test/scala/ox/resilience/BulkheadTest.scala create mode 100644 doc/utils/bulkhead.md diff --git a/core/src/main/scala/ox/resilience/Bulkhead.scala b/core/src/main/scala/ox/resilience/Bulkhead.scala new file mode 100644 index 00000000..5b6c7896 --- /dev/null +++ b/core/src/main/scala/ox/resilience/Bulkhead.scala @@ -0,0 +1,17 @@ +package ox.resilience + +import java.util.concurrent.Semaphore +import scala.concurrent.duration.* + +case class Bulkhead(maxConcurrentCalls: Int): + private val semaphore = Semaphore(maxConcurrentCalls) + + def runOrDrop[T](operation: => T): Option[T] = + if semaphore.tryAcquire() then + try Some(operation) + finally semaphore.release() + else None + + def runOrDropWithTimeout[T](timeoutDuration: FiniteDuration)(operation: => T): Option[T] = + Option.when(semaphore.tryAcquire(timeoutDuration.toMillis, MILLISECONDS))(operation) +end Bulkhead diff --git a/core/src/test/scala/ox/resilience/BulkheadTest.scala b/core/src/test/scala/ox/resilience/BulkheadTest.scala new file mode 100644 index 00000000..63a7e5f3 --- /dev/null +++ b/core/src/test/scala/ox/resilience/BulkheadTest.scala @@ -0,0 +1,111 @@ +package ox.resilience + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import scala.concurrent.duration.* +import ox.* +import ox.util.ElapsedTime +import org.scalatest.OptionValues + +class BulkheadTest extends AnyFlatSpec with Matchers with OptionValues with ElapsedTime: + + behavior of "Bulkhead operation run" + + it should "drop operation above maxConcurrentCalls" in { + val bulkHead = Bulkhead(2) + def f() = + sleep(1000.millis) + "result" + + var result1: Option[String] = None + var result2: Option[String] = None + var result3: Option[String] = None + supervised: + forkUserDiscard: + result1 = bulkHead.runOrDrop(f()) + forkUserDiscard: + result2 = bulkHead.runOrDrop(f()) + forkUserDiscard: + sleep(500.millis) + result3 = bulkHead.runOrDrop(f()) + + result1 shouldBe Some("result") + result2 shouldBe Some("result") + result3 shouldBe None + } + + it should "forward failure to user in case of failure" in { + val bulkHead = Bulkhead(2) + var counter = 0 + def f() = + sleep(1000.millis) + if counter > 0 then throw new RuntimeException("boom") + counter += 1 + "result" + + var result1: Option[String] = None + var result2: Option[Exception] = None + var result3: Option[String] = None + + supervised: + forkUserDiscard: + result1 = bulkHead.runOrDrop(f()) + forkUserDiscard: + sleep(200.millis) + result2 = Some(the[RuntimeException] thrownBy bulkHead.runOrDrop(f())) + forkUserDiscard: + sleep(400.millis) + result3 = bulkHead.runOrDrop(f()) + + result1 shouldBe Some("result") + result2.value.getMessage shouldBe "boom" + result3 shouldBe None + } + + behavior of "Bulkhead operation timeout" + + it should "block until acquisition is possible or timeout passes" in { + val bulkHead = Bulkhead(1) + def f() = + sleep(1000.millis) + "result" + + var duration: Option[Duration] = None + var result1: Option[String] = None + var result2: Option[String] = None + supervised: + forkUserDiscard: + result1 = bulkHead.runOrDrop(f()) + forkUserDiscard: + sleep(500.millis) + val (res, dur) = measure(bulkHead.runOrDropWithTimeout(2000.millis)(f())) + result2 = res + duration = Some(dur) + + result1 shouldBe Some("result") + duration.value.toMillis should be >= 1450L // Waiting for result1 to finish plus task time (minus 50 millis for tolerance) + result2 shouldBe Some("result") + } + + it should "respect timeout" in { + val bulkHead = Bulkhead(1) + def f() = + sleep(1000.millis) + "result" + + var duration: Option[Duration] = None + var result1: Option[String] = None + var result2: Option[String] = None + supervised: + forkUserDiscard: + result1 = bulkHead.runOrDrop(f()) + forkUserDiscard: + sleep(300.millis) + val (res, dur) = measure(bulkHead.runOrDropWithTimeout(500.millis)(f())) + result2 = res + duration = Some(dur) + + duration.value.toMillis should be >= 450L // 50 millis less for tolerance + result2 shouldBe None + } +end BulkheadTest diff --git a/doc/utils/bulkhead.md b/doc/utils/bulkhead.md new file mode 100644 index 00000000..63dccb81 --- /dev/null +++ b/doc/utils/bulkhead.md @@ -0,0 +1,50 @@ +# Bulkhead + +The bulkhead mechanism allows to constaint number of in flight operations made on one instance. For example to allow for maximum of 3 operation running at the same time. + +# API + +Bulkhead uses semaphore to determine if operation should be run, it exposes two methods: +- `def runOrDrop[T](operation: => T): Option[T]` +- `def runOrDropWithTimeout[T](timeoutDuration: FiniteDuration)(operation: => T): Option[T]` + +## Operation definition + +The `operation` can be provided directly using a by-name parameter, i.e. `f: => T`. +Since bulkhead does not need to handle errors there is no need for [ErrorMode](../basics/error-handling.md). + +## Examples + +```scala mdoc:compile-only +import scala.concurrent.duration.* +import ox.* +import ox.resilience.Bulkhead + +val bulkHead = Bulkhead(1) +def f() = + sleep(2000.millis) + "result" + +var result1: Option[String] = None +var result2: Option[String] = None +var result3: Option[String] = None +var result4: Option[String] = None + +supervised: + forkUserDiscard: + result1 = bulkHead.runOrDrop(f()) + forkUserDiscard: + sleep(500.millis) + result2 = bulkHead.runOrDrop(f()) + forkUserDiscard: + sleep(1000.millis) + result3 = bulkHead.runOrDropWithTimeout(2000.millis)(f()) + forkUserDiscard: + sleep(1000.millis) + result4 = bulkHead.runOrDropWithTimeout(500.millis)(f()) + +result1 // Some("result") - completed first +result2 // None - exceeded maxConcurrentCalls, so the operation was dropped +result3 // Some("result") - completed after waiting for 1000 millis +result4 // None - timed out +``` \ No newline at end of file