Skip to content

Commit

Permalink
more spell corrections, docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamil-Lontkowski committed Jan 27, 2025
1 parent dc6d93e commit 5183768
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 38 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/ox/resilience/CircuitBreaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ private[resilience] case class Metrics(
private[resilience] case class AcquireResult(acquired: Boolean, circuitState: CircuitBreakerState)

private case class CircuitBreakerStateMachineConfig(
failureRateThreshold: Int,
slowCallThreshold: Int,
failureRateThreshold: PercentageThreshold,
slowCallThreshold: PercentageThreshold,
slowCallDurationThreshold: FiniteDuration,
minimumNumberOfCalls: Int,
numberOfCallsInHalfOpenState: Int,
Expand All @@ -47,8 +47,8 @@ private case class CircuitBreakerStateMachineConfig(
private object CircuitBreakerStateMachineConfig:
def fromConfig(c: CircuitBreakerConfig): CircuitBreakerStateMachineConfig =
CircuitBreakerStateMachineConfig(
failureRateThreshold = c.failureRateThreshold.toInt,
slowCallThreshold = c.slowCallThreshold.toInt,
failureRateThreshold = c.failureRateThreshold,
slowCallThreshold = c.slowCallThreshold,
slowCallDurationThreshold = c.slowCallDurationThreshold,
minimumNumberOfCalls = c.minimumNumberOfCalls,
numberOfCallsInHalfOpenState = c.numberOfCallsInHalfOpenState,
Expand Down
17 changes: 9 additions & 8 deletions core/src/main/scala/ox/resilience/CircuitBreakerConfig.scala
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
package ox.resilience

import scala.concurrent.duration.*
import java.util.concurrent.TimeUnit

/** Allows to configure how [[Metrics]] will be calculated
*/
enum SlidingWindow:
/** Window counting last n operations when calculating metrics.
* @param windowSize
* number of last n results recored.
* number of last n results recorded.
*/
case CountBased(windowSize: Int)

/** Window counting operations in the lapse of `duraiton` before current time.
/** Window counting operations in the lapse of `duration` before current time.
* @param duration
* span of time where results are considered for including in metrics.
* span of time in which results are included in metrics.
*/
case TimeBased(duration: FiniteDuration)
end SlidingWindow

/** Type representing percentage threshold between 0 and 100 */
opaque type PercentageThreshold = Int

extension (c: PercentageThreshold) def toInt: Int = c
extension (c: PercentageThreshold)
def toInt: Int = c
def isExceeded(by: Int): Boolean = by >= c

object PercentageThreshold:
def apply(c: Int): PercentageThreshold =
Expand Down Expand Up @@ -68,11 +69,11 @@ object CircuitBreakerConfig:
def default: CircuitBreakerConfig = CircuitBreakerConfig(
failureRateThreshold = PercentageThreshold(50),
slowCallThreshold = PercentageThreshold(50),
slowCallDurationThreshold = 60.seconds,
slowCallDurationThreshold = 10.seconds,
slidingWindow = SlidingWindow.CountBased(100),
minimumNumberOfCalls = 20,
waitDurationOpenState = FiniteDuration(10, TimeUnit.SECONDS),
halfOpenTimeoutDuration = FiniteDuration(0, TimeUnit.MILLISECONDS),
waitDurationOpenState = 10.seconds,
halfOpenTimeoutDuration = 0.millis,
numberOfCallsInHalfOpenState = 10
)
end CircuitBreakerConfig
18 changes: 11 additions & 7 deletions core/src/main/scala/ox/resilience/CircuitBreakerStateMachine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private[resilience] case class CircuitBreakerStateMachine(
updateAfter(config.waitDurationOpenState, selfRef)
case (
CircuitBreakerState.Open(_) | CircuitBreakerState.Closed(_),
CircuitBreakerState.HalfOpen(since, semaphore, completedOperations)
CircuitBreakerState.HalfOpen(_, _, _)
) =>
// schedule timeout for halfOpen state if is not 0
if config.halfOpenTimeoutDuration.toMillis != 0 then updateAfter(config.halfOpenTimeoutDuration, selfRef)
Expand Down Expand Up @@ -71,10 +71,11 @@ private[resilience] object CircuitBreakerStateMachine:

def nextState(metrics: Metrics, currentState: CircuitBreakerState, config: CircuitBreakerStateMachineConfig): CircuitBreakerState =
val currentTimestamp = metrics.timestamp
val exceededThreshold = (metrics.failureRate >= config.failureRateThreshold || metrics.slowCallsRate >= config.slowCallThreshold)
val exceededThreshold =
config.failureRateThreshold.isExceeded(metrics.failureRate) || config.slowCallThreshold.isExceeded(metrics.slowCallsRate)
val minCallsRecorder = metrics.operationsInWindow >= config.minimumNumberOfCalls
currentState match
case self @ CircuitBreakerState.Closed(since) =>
case self @ CircuitBreakerState.Closed(_) =>
if minCallsRecorder && exceededThreshold then
if config.waitDurationOpenState.toMillis == 0 then
CircuitBreakerState.HalfOpen(currentTimestamp, Semaphore(config.numberOfCallsInHalfOpenState))
Expand Down Expand Up @@ -110,6 +111,9 @@ private[resilience] sealed trait CircuitBreakerResults(using val ox: Ox):
def calculateMetrics(lastAcquisitionResult: Option[AcquireResult], timestamp: Long): Metrics

private[resilience] object CircuitBreakerResults:
private object Percentage:
def of(observed: Int, size: Int): Int = ((observed / size.toFloat) * 100).toInt

case class CountBased(windowSize: Int)(using ox: Ox) extends CircuitBreakerResults(using ox):
private val results = new collection.mutable.ArrayDeque[CircuitBreakerResult](windowSize + 1)
private var slowCalls = 0
Expand Down Expand Up @@ -141,8 +145,8 @@ private[resilience] object CircuitBreakerResults:

def calculateMetrics(lastAcquisitionResult: Option[AcquireResult], timestamp: Long): Metrics =
val numOfOperations = results.length
val failuresRate = ((failedCalls / numOfOperations.toFloat) * 100).toInt
val slowRate = ((slowCalls / numOfOperations.toFloat) * 100).toInt
val failuresRate = Percentage.of(failedCalls, numOfOperations)
val slowRate = Percentage.of(slowCalls, numOfOperations)
Metrics(
failuresRate,
slowRate,
Expand Down Expand Up @@ -176,8 +180,8 @@ private[resilience] object CircuitBreakerResults:
case CircuitBreakerResult.Slow => slowCalls -= 1
}
val numOfOperations = results.length
val failuresRate = ((failedCalls / numOfOperations.toFloat) * 100).toInt
val slowRate = ((slowCalls / numOfOperations.toFloat) * 100).toInt
val failuresRate = Percentage.of(failedCalls, numOfOperations)
val slowRate = Percentage.of(slowCalls, numOfOperations)
Metrics(
failuresRate,
slowRate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,33 @@ class CircuitBreakerStateMachineTest extends AnyFlatSpec with Matchers:
// given
val config = defaultConfig
val stateMachine = CircuitBreakerStateMachine(config)
val currentTimstamp = System.currentTimeMillis()
val currentTimestamp = System.currentTimeMillis()
val lastResult: Option[AcquireResult] = None
val metrics =
Metrics(hundredPercentSuccessRate, hundredPercentSuccessRate, config.minimumNumberOfCalls, lastResult, currentTimstamp)
Metrics(hundredPercentSuccessRate, hundredPercentSuccessRate, config.minimumNumberOfCalls, lastResult, currentTimestamp)

// when
val resultingState = CircuitBreakerStateMachine.nextState(metrics, CircuitBreakerState.Closed(currentTimstamp), stateMachine.config)
val resultingState = CircuitBreakerStateMachine.nextState(metrics, CircuitBreakerState.Closed(currentTimestamp), stateMachine.config)

resultingState shouldBe a[CircuitBreakerState.Closed]
}

it should "go to open after surpasing failure threshold" in supervised {
it should "go to open after surpassing failure threshold" in supervised {
// given
val config = defaultConfig
val stateMachine = CircuitBreakerStateMachine(config)
val currentTimstamp = System.currentTimeMillis()
val currentTimestamp = System.currentTimeMillis()
val lastResult: Option[AcquireResult] = None
val metrics = Metrics(badFailureRate, hundredPercentSuccessRate, config.minimumNumberOfCalls, lastResult, currentTimstamp)
val metrics = Metrics(badFailureRate, hundredPercentSuccessRate, config.minimumNumberOfCalls, lastResult, currentTimestamp)

// when
val resultingState = CircuitBreakerStateMachine.nextState(metrics, CircuitBreakerState.Closed(currentTimstamp), stateMachine.config)
val resultingState = CircuitBreakerStateMachine.nextState(metrics, CircuitBreakerState.Closed(currentTimestamp), stateMachine.config)

// then
resultingState shouldBe a[CircuitBreakerState.Open]
}

it should "go straight to half open after surpasing failure threshold with defined waitDurationOpenState = 0" in supervised {
it should "go straight to half open after surpassing failure threshold with defined waitDurationOpenState = 0" in supervised {
// given
val config = defaultConfig.copy(waitDurationOpenState = FiniteDuration(0, TimeUnit.MILLISECONDS))
val stateMachine = CircuitBreakerStateMachine(config)
Expand Down Expand Up @@ -78,7 +78,7 @@ class CircuitBreakerStateMachineTest extends AnyFlatSpec with Matchers:
resultingState shouldBe a[CircuitBreakerState.Open]
}

it should "update counter of completed operations in halfopen state" in supervised {
it should "update counter of completed operations in halfOpen state" in supervised {
// given
val config = defaultConfig
val stateMachine = CircuitBreakerStateMachine(config)
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/scala/ox/resilience/CircuitBreakerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit
result2.value.value shouldBe "success"
}

it should "drop operation after exceeding fauilure threshold" in supervised {
it should "drop operation after exceeding failure threshold" in supervised {
// given
val thresholdRate = PercentageThreshold(100)
val numberOfOperations = 1
Expand Down Expand Up @@ -94,7 +94,7 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit

behavior of "Circuit Breaker scheduled state changes"

it should "switch to halfopen after configured time" in supervised {
it should "switch to halfOpen after configured time" in supervised {
// given
val thresholdRate = PercentageThreshold(100)
val numberOfOperations = 1
Expand Down Expand Up @@ -256,7 +256,7 @@ class CircuitBreakerTest extends AnyFlatSpec with Matchers with OptionValues wit
sleep(500.millis)
circuitBreaker.runOrDropEither(Right("c")).discard
sleep(100.millis) // wait for state to register
// Should go back to closed, we have one succesful operation
// Should go back to closed, we have one successful operation
circuitBreaker.stateMachine.state shouldBe a[CircuitBreakerState.Closed]
}

Expand Down
53 changes: 46 additions & 7 deletions doc/utils/circuit-breaker.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Circuit Breaker

A circuit breaker is used to provide stability and prevent cascading failures in distributed systems.
These should be used with other mechanisms (such as timeouts or rate limiters) to prevent the failure of a single component from bringing down all components.
The Circuit Breaker can proactively identify unresponsive services and prevent repeated attempts.

The circuit breaker allows controlling execution of operations and stops if certain condition are met. CircuitBreaker is thread-safe and can be used in concurrent scenarios.

## API
Expand All @@ -17,6 +21,12 @@ supervised:
val operationResult: Option[T] = circuitBreaker.runOrDrop(operation)
```

The CircuitBreaker is a finite state machine with three states: `Closed`, `Open` and `HalfOpen`.
- While in `Open` state - all calls are dropped.
- In `Closed` state - calls are accepted.
- In `HalfOpen` state - only configured number of call can be started and depending on their results state can go back to `Open` or `Closed`. See [conditions for state change](#conditions-for-state-change).


## Configuration

Many config parameters relate to calculated metrics. Those metrics are percentage of calls that failed and percentage of calls that exceeded `slowCallDurationThreshold`.
Expand All @@ -31,12 +41,12 @@ There are two ways that metrics are calculated.

### Failure rate and slow call rate thresholds

The state of the CircuitBreaker changes from `Closed` to `Open` when the `failureRate` is greater or equal to configurable threshold. For example when 80% of recorded call results failed.
The state of the CircuitBreaker changes from `Closed` to `Open` when the failure rate is greater or equal to configurable threshold. For example when 80% of recorded call results failed.
Failures are counted based on provided `ErrorMode`. For example any exception that is thrown by the operation, when using the direct, "unwrapped" API or any `Left` variant when using `runOrDropEither`.

The same state change also happen when percentage of slow calls (exceeding configurable `slowCallDurationThreshold`) is equal or greater than configured threshold. For example 80% of calls took longer then 10 seconds.

Those metrics are considered only when number of recorder calls is greater or equal to `minimumNumberOfCalls`, otherwise we don't change state even if `failureRate` is 100%.
Those metrics are considered only when number of recorder calls is greater or equal to `minimumNumberOfCalls`, otherwise we don't change state even if failure rate is 100%.

### Parameters

Expand All @@ -52,18 +62,47 @@ Those metrics are considered only when number of recorder calls is greater or eq
`SlidingWindow` variants:

- `CountBased(windowSize: Int)` - This variant calculates metrics based on last n results of calls recorded. These statistics are cleared on every state change.
- `TimeBased(duration: FiniteDuration)` - This variant calculates metrics of operations in the lapse of `duraiton` before current time. These statistics are cleared on every state change.
- `TimeBased(duration: FiniteDuration)` - This variant calculates metrics of operations in the lapse of `duration` before current time. These statistics are cleared on every state change.

### Providing configuration

CircuitBreaker can be configured during instantiation by providing `CircuitBreakerConfig`.

```scala mdoc:compile-only
import ox.supervised
import ox.resilience.*
import scala.concurrent.duration.*

supervised:
// using default config
CircuitBreaker(CircuitBreakerConfig.default)

// custom config
val config = CircuitBreakerConfig(
failureRateThreshold = PercentageThreshold(50),
slowCallThreshold = PercentageThreshold(50),
slowCallDurationThreshold = 10.seconds,
slidingWindow = SlidingWindow.CountBased(100),
minimumNumberOfCalls = 20,
waitDurationOpenState = 10.seconds,
halfOpenTimeoutDuration = 0.millis,
numberOfCallsInHalfOpenState = 10
)

// providing config for CircuitBreaker instance
CircuitBreaker(config)
```

Values defined in `CircuitBreakerConfig.default`:

```
failureRateThreshold = PercentageThreshold(50)
slowCallThreshold = PercentageThreshold(50)
slowCallDurationThreshold = 60.seconds
slowCallDurationThreshold = 10.seconds
slidingWindow = SlidingWindow.CountBased(100)
minimumNumberOfCalls = 20
waitDurationOpenState = FiniteDuration(10, TimeUnit.SECONDS)
halfOpenTimeoutDuration = FiniteDuration(0, TimeUnit.MILLISECONDS)
waitDurationOpenState = 10.seconds,
halfOpenTimeoutDuration = 0.millis,
numberOfCallsInHalfOpenState = 10
```

Expand All @@ -79,7 +118,7 @@ numberOfCallsInHalfOpenState = 10


```{note}
CircuitBreaker uses actor internally and since actor executes on one thread this may be bottleneck. That means that calculating state change can be deleyad and breaker can let few more operations to complete before openning.
CircuitBreaker uses actor internally and since actor executes on one thread this may be bottleneck. That means that calculating state change can be delayed and breaker can let few more operations to complete before opening.
This can be the case with many very fast operations.
```

Expand Down

0 comments on commit 5183768

Please sign in to comment.