Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulkhead #269

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/src/main/scala/ox/resilience/Bulkhead.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ox.resilience

import java.util.concurrent.Semaphore
import scala.concurrent.duration.*

case class Bulkhead(maxConcurrentCalls: Int):
private val semaphore = Semaphore(maxConcurrentCalls)

def runOrDrop[T](operation: => T): Option[T] =
if semaphore.tryAcquire() then
try Some(operation)
finally semaphore.release()
else None

def runOrDropWithTimeout[T](timeoutDuration: FiniteDuration)(operation: => T): Option[T] =
Option.when(semaphore.tryAcquire(timeoutDuration.toMillis, MILLISECONDS))(operation)
end Bulkhead
111 changes: 111 additions & 0 deletions core/src/test/scala/ox/resilience/BulkheadTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package ox.resilience

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.concurrent.duration.*
import ox.*
import ox.util.ElapsedTime
import org.scalatest.OptionValues

class BulkheadTest extends AnyFlatSpec with Matchers with OptionValues with ElapsedTime:

behavior of "Bulkhead operation run"

it should "drop operation above maxConcurrentCalls" in {
val bulkHead = Bulkhead(2)
def f() =
sleep(1000.millis)
"result"

var result1: Option[String] = None
var result2: Option[String] = None
var result3: Option[String] = None
supervised:
forkUserDiscard:
result1 = bulkHead.runOrDrop(f())
forkUserDiscard:
result2 = bulkHead.runOrDrop(f())
forkUserDiscard:
sleep(500.millis)
result3 = bulkHead.runOrDrop(f())

result1 shouldBe Some("result")
result2 shouldBe Some("result")
result3 shouldBe None
}

it should "forward failure to user in case of failure" in {
val bulkHead = Bulkhead(2)
var counter = 0
def f() =
sleep(1000.millis)
if counter > 0 then throw new RuntimeException("boom")
counter += 1
"result"

var result1: Option[String] = None
var result2: Option[Exception] = None
var result3: Option[String] = None

supervised:
forkUserDiscard:
result1 = bulkHead.runOrDrop(f())
forkUserDiscard:
sleep(200.millis)
result2 = Some(the[RuntimeException] thrownBy bulkHead.runOrDrop(f()))
forkUserDiscard:
sleep(400.millis)
result3 = bulkHead.runOrDrop(f())

result1 shouldBe Some("result")
result2.value.getMessage shouldBe "boom"
result3 shouldBe None
}

behavior of "Bulkhead operation timeout"

it should "block until acquisition is possible or timeout passes" in {
val bulkHead = Bulkhead(1)
def f() =
sleep(1000.millis)
"result"

var duration: Option[Duration] = None
var result1: Option[String] = None
var result2: Option[String] = None
supervised:
forkUserDiscard:
result1 = bulkHead.runOrDrop(f())
forkUserDiscard:
sleep(500.millis)
val (res, dur) = measure(bulkHead.runOrDropWithTimeout(2000.millis)(f()))
result2 = res
duration = Some(dur)

result1 shouldBe Some("result")
duration.value.toMillis should be >= 1450L // Waiting for result1 to finish plus task time (minus 50 millis for tolerance)
result2 shouldBe Some("result")
}

it should "respect timeout" in {
val bulkHead = Bulkhead(1)
def f() =
sleep(1000.millis)
"result"

var duration: Option[Duration] = None
var result1: Option[String] = None
var result2: Option[String] = None
supervised:
forkUserDiscard:
result1 = bulkHead.runOrDrop(f())
forkUserDiscard:
sleep(300.millis)
val (res, dur) = measure(bulkHead.runOrDropWithTimeout(500.millis)(f()))
result2 = res
duration = Some(dur)

duration.value.toMillis should be >= 450L // 50 millis less for tolerance
result2 shouldBe None
}
end BulkheadTest
50 changes: 50 additions & 0 deletions doc/utils/bulkhead.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Bulkhead

The bulkhead mechanism allows to constaint number of in flight operations made on one instance. For example to allow for maximum of 3 operation running at the same time.

# API

Bulkhead uses semaphore to determine if operation should be run, it exposes two methods:
- `def runOrDrop[T](operation: => T): Option[T]`
- `def runOrDropWithTimeout[T](timeoutDuration: FiniteDuration)(operation: => T): Option[T]`

## Operation definition

The `operation` can be provided directly using a by-name parameter, i.e. `f: => T`.
Since bulkhead does not need to handle errors there is no need for [ErrorMode](../basics/error-handling.md).

## Examples

```scala mdoc:compile-only
import scala.concurrent.duration.*
import ox.*
import ox.resilience.Bulkhead

val bulkHead = Bulkhead(1)
def f() =
sleep(2000.millis)
"result"

var result1: Option[String] = None
var result2: Option[String] = None
var result3: Option[String] = None
var result4: Option[String] = None

supervised:
forkUserDiscard:
result1 = bulkHead.runOrDrop(f())
forkUserDiscard:
sleep(500.millis)
result2 = bulkHead.runOrDrop(f())
forkUserDiscard:
sleep(1000.millis)
result3 = bulkHead.runOrDropWithTimeout(2000.millis)(f())
forkUserDiscard:
sleep(1000.millis)
result4 = bulkHead.runOrDropWithTimeout(500.millis)(f())

result1 // Some("result") - completed first
result2 // None - exceeded maxConcurrentCalls, so the operation was dropped
result3 // Some("result") - completed after waiting for 1000 millis
result4 // None - timed out
```