diff --git a/README.md b/README.md index ec915893..e84111bf 100644 --- a/README.md +++ b/README.md @@ -375,7 +375,6 @@ There are some helper methods which might be useful when writing forked code: * `forever { ... }` repeatedly evaluates the given code block forever * `repeatWhile { ... }` repeatedly evaluates the given code block, as long as it returns `true` -* `retry(times, sleep) { ... }` retries the given block up to the given number of times * `uninterruptible { ... }` evaluates the given code block making sure it can't be interrupted ## Syntax @@ -694,6 +693,10 @@ Please see [the respective ADR](doc/adr/0001-error-propagation-in-channels.md) f Channels are back-pressured, as the `.send` operation is blocking until there's a receiver thread available, or if there's enough space in the buffer. The processing space is bound by the total size of channel buffers. +## Retries + +The retry mechanism allows retrying a failing operation according to a given policy (e.g. retry 3 times with a 100ms delay between attempts). See the [full docs](doc/retries.md) for details. 
+ ## Kafka sources & drains Dependency: diff --git a/core/src/main/scala/ox/control.scala b/core/src/main/scala/ox/control.scala index f00517a5..1ef73b3a 100644 --- a/core/src/main/scala/ox/control.scala +++ b/core/src/main/scala/ox/control.scala @@ -1,7 +1,5 @@ package ox -import scala.concurrent.duration.FiniteDuration - def forever(f: => Unit): Nothing = while true do f throw new RuntimeException("can't get here") @@ -16,25 +14,6 @@ def repeatUntil(f: => Boolean): Unit = var loop = true while loop do loop = !f -// TODO: retry schedules -def retry[T](times: Int, sleep: FiniteDuration)(f: => T): T = - try f - catch - case e: Throwable => - if times > 0 - then - Thread.sleep(sleep.toMillis) - retry(times - 1, sleep)(f) - else throw e - -def retryEither[E, T](times: Int, sleep: FiniteDuration)(f: => Either[E, T]): Either[E, T] = - f match - case r: Right[E, T] => r - case Left(_) if times > 0 => - Thread.sleep(sleep.toMillis) - retry(times - 1, sleep)(f) - case l: Left[E, T] => l - def uninterruptible[T](f: => T): T = scoped { val t = fork(f) diff --git a/core/src/main/scala/ox/retry/Jitter.scala b/core/src/main/scala/ox/retry/Jitter.scala new file mode 100644 index 00000000..58d5732c --- /dev/null +++ b/core/src/main/scala/ox/retry/Jitter.scala @@ -0,0 +1,25 @@ +package ox.retry + +/** A random factor used for calculating the delay between subsequent retries when a backoff strategy is used for calculating the delay. + * + * The purpose of jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at + * the same time - which can result in subsequent failures, contrary to what you would expect from retrying. By introducing randomness to + * the delays, the retries become more evenly distributed over time. + * + * See the AWS Architecture Blog article on backoff and + * jitter for a more in-depth explanation. 
+ * + * Depending on the algorithm, the jitter can affect the delay in different ways - see the concrete variants for more details. + */ +enum Jitter: + /** No jitter, i.e. the delay just uses an exponential backoff with no adjustments. */ + case None + + /** Full jitter, i.e. the delay is a random value between 0 and the calculated backoff delay. */ + case Full + + /** Equal jitter, i.e. the delay is half of the calculated backoff delay plus a random value between 0 and the other half. */ + case Equal + + /** Decorrelated jitter, i.e. the delay is a random value between the initial delay and the last delay multiplied by 3. */ + case Decorrelated diff --git a/core/src/main/scala/ox/retry/ResultPolicy.scala b/core/src/main/scala/ox/retry/ResultPolicy.scala new file mode 100644 index 00000000..ac91c2ae --- /dev/null +++ b/core/src/main/scala/ox/retry/ResultPolicy.scala @@ -0,0 +1,37 @@ +package ox.retry + +/** A policy that allows to customize when a non-erroneous result is considered successful and when an error is worth retrying (which allows + * for failing fast on certain errors). + * + * @param isSuccess + * A function that determines whether a non-erroneous result is considered successful. By default, every non-erroneous result is + * considered successful. + * @param isWorthRetrying + * A function that determines whether an error is worth retrying. By default, all errors are retried. + * @tparam E + * The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning + * an `Either[E, T]`, this can be any `E`. + * @tparam T + * The successful result type for the operation. + */ +case class ResultPolicy[E, T](isSuccess: T => Boolean = (_: T) => true, isWorthRetrying: E => Boolean = (_: E) => true) + +object ResultPolicy: + /** A policy that considers every non-erroneous result successful and retries on any error. 
*/ + def default[E, T]: ResultPolicy[E, T] = ResultPolicy() + + /** A policy that customizes when a non-erroneous result is considered successful, and retries all errors. + * + * @param isSuccess + * A predicate that indicates whether a non-erroneous result is considered successful. + */ + def successfulWhen[E, T](isSuccess: T => Boolean): ResultPolicy[E, T] = ResultPolicy(isSuccess = isSuccess) + + /** A policy that customizes which errors are retried, and considers every non-erroneous result successful. + * @param isWorthRetrying + * A predicate that indicates whether an erroneous result should be retried. + */ + def retryWhen[E, T](isWorthRetrying: E => Boolean): ResultPolicy[E, T] = ResultPolicy(isWorthRetrying = isWorthRetrying) + + /** A policy that considers every non-erroneous result successful and never retries any error, i.e. fails fast */ + def neverRetry[E, T]: ResultPolicy[E, T] = ResultPolicy(isWorthRetrying = _ => false) diff --git a/core/src/main/scala/ox/retry/RetryPolicy.scala b/core/src/main/scala/ox/retry/RetryPolicy.scala new file mode 100644 index 00000000..3ba75601 --- /dev/null +++ b/core/src/main/scala/ox/retry/RetryPolicy.scala @@ -0,0 +1,106 @@ +package ox.retry + +import scala.concurrent.duration.* + +/** A policy that defines how to retry a failed operation. + * + * @param schedule + * The retry schedule which determines the maximum number of retries and the delay between subsequent attempts to execute the operation. + * See [[Schedule]] for more details. + * @param resultPolicy + * A policy that allows to customize when a non-erroneous result is considered successful and when an error is worth retrying (which + * allows for failing fast on certain errors). See [[ResultPolicy]] for more details. + * @tparam E + * The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning + * an `Either[E, T]`, this can be any `E`. 
+ * @tparam T + * The successful result type for the operation. + */ +case class RetryPolicy[E, T](schedule: Schedule, resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T]) + +object RetryPolicy: + /** Creates a policy that retries up to a given number of times, with no delay between subsequent attempts, using a default + * [[ResultPolicy]]. + * + * This is a shorthand for {{{RetryPolicy(Schedule.Immediate(maxRetries))}}} + * + * @param maxRetries + * The maximum number of retries. + */ + def immediate[E, T](maxRetries: Int): RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate(maxRetries)) + + /** Creates a policy that retries indefinitely, with no delay between subsequent attempts, using a default [[ResultPolicy]]. + * + * This is a shorthand for {{{RetryPolicy(Schedule.Immediate.forever)}}} + */ + def immediateForever[E, T]: RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate.forever) + + /** Creates a policy that retries up to a given number of times, with a fixed delay between subsequent attempts, using a default + * [[ResultPolicy]]. + * + * This is a shorthand for {{{RetryPolicy(Schedule.Delay(maxRetries, delay))}}} + * + * @param maxRetries + * The maximum number of retries. + * @param delay + * The delay between subsequent attempts. + */ + def delay[E, T](maxRetries: Int, delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay(maxRetries, delay)) + + /** Creates a policy that retries indefinitely, with a fixed delay between subsequent attempts, using a default [[ResultPolicy]]. + * + * This is a shorthand for {{{RetryPolicy(Schedule.Delay.forever(delay))}}} + * + * @param delay + * The delay between subsequent attempts. + */ + def delayForever[E, T](delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay.forever(delay)) + + /** Creates a policy that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts, using a + * default [[ResultPolicy]]. 
+ * + * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay + * and capped at the given maximum delay. + * + * This is a shorthand for {{{RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}} + * + * @param maxRetries + * The maximum number of retries. + * @param initialDelay + * The delay before the first retry. + * @param maxDelay + * The maximum delay between subsequent retries. Defaults to 1 minute. + * @param jitter + * A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter, + * i.e. an exponential backoff with no adjustments. + */ + def backoff[E, T]( + maxRetries: Int, + initialDelay: FiniteDuration, + maxDelay: FiniteDuration = 1.minute, + jitter: Jitter = Jitter.None + ): RetryPolicy[E, T] = + RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter)) + + /** Creates a policy that retries indefinitely, with an increasing delay (backoff) between subsequent attempts, using a default + * [[ResultPolicy]]. + * + * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay + * and capped at the given maximum delay. + * + * This is a shorthand for {{{RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}} + * + * @param initialDelay + * The delay before the first retry. + * @param maxDelay + * The maximum delay between subsequent retries. Defaults to 1 minute. + * @param jitter + * A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter, + * i.e. an exponential backoff with no adjustments. 
+ */ + def backoffForever[E, T]( + initialDelay: FiniteDuration, + maxDelay: FiniteDuration = 1.minute, + jitter: Jitter = Jitter.None + ): RetryPolicy[E, T] = + RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter)) diff --git a/core/src/main/scala/ox/retry/Schedule.scala b/core/src/main/scala/ox/retry/Schedule.scala new file mode 100644 index 00000000..24a18361 --- /dev/null +++ b/core/src/main/scala/ox/retry/Schedule.scala @@ -0,0 +1,119 @@ +package ox.retry + +import scala.concurrent.duration.* +import scala.util.Random + +private[retry] sealed trait Schedule: + def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration + +object Schedule: + + private[retry] sealed trait Finite extends Schedule: + def maxRetries: Int + + private[retry] sealed trait Infinite extends Schedule + + /** A schedule that retries up to a given number of times, with no delay between subsequent attempts. + * + * @param maxRetries + * The maximum number of retries. + */ + case class Immediate(maxRetries: Int) extends Finite: + override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = Duration.Zero + + object Immediate: + /** A schedule that retries indefinitely, with no delay between subsequent attempts. */ + def forever: Infinite = ImmediateForever + + private case object ImmediateForever extends Infinite: + override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = Duration.Zero + + /** A schedule that retries up to a given number of times, with a fixed delay between subsequent attempts. + * + * @param maxRetries + * The maximum number of retries. + * @param delay + * The delay between subsequent attempts. + */ + case class Delay(maxRetries: Int, delay: FiniteDuration) extends Finite: + override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = delay + + object Delay: + /** A schedule that retries indefinitely, with a fixed delay between subsequent attempts. 
+ * + * @param delay + * The delay between subsequent attempts. + */ + def forever(delay: FiniteDuration): Infinite = DelayForever(delay) + + case class DelayForever private[retry] (delay: FiniteDuration) extends Infinite: + override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = delay + + /** A schedule that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts. + * + * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay + * and capped at the given maximum delay. + * + * @param maxRetries + * The maximum number of retries. + * @param initialDelay + * The delay before the first retry. + * @param maxDelay + * The maximum delay between subsequent retries. + * @param jitter + * A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter, + * i.e. an exponential backoff with no adjustments. 
+ */ + case class Backoff( + maxRetries: Int, + initialDelay: FiniteDuration, + maxDelay: FiniteDuration = 1.minute, + jitter: Jitter = Jitter.None + ) extends Finite: + override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = + Backoff.nextDelay(attempt, initialDelay, maxDelay, jitter, lastDelay) + + object Backoff: + private[retry] def delay(attempt: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration): FiniteDuration = + // converting Duration <-> Long back and forth to avoid exceeding maximum duration + (initialDelay.toMillis * Math.pow(2, attempt)).toLong.min(maxDelay.toMillis).millis + + private[retry] def nextDelay( + attempt: Int, + initialDelay: FiniteDuration, + maxDelay: FiniteDuration, + jitter: Jitter, + lastDelay: Option[FiniteDuration] + ): FiniteDuration = + def backoffDelay = Backoff.delay(attempt, initialDelay, maxDelay) + + jitter match + case Jitter.None => backoffDelay + case Jitter.Full => Random.between(0, backoffDelay.toMillis).millis + case Jitter.Equal => + val backoff = backoffDelay.toMillis + (backoff / 2 + Random.between(0, backoff / 2)).millis + case Jitter.Decorrelated => + val last = lastDelay.getOrElse(initialDelay).toMillis + Random.between(initialDelay.toMillis, last * 3).millis + + /** A schedule that retries indefinitely, with an increasing delay (backoff) between subsequent attempts. + * + * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial + * delay and capped at the given maximum delay. + * + * @param initialDelay + * The delay before the first retry. + * @param maxDelay + * The maximum delay between subsequent retries. + * @param jitter + * A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter, + * i.e. an exponential backoff with no adjustments. 
+ */ + def forever(initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None): Infinite = + BackoffForever(initialDelay, maxDelay, jitter) + + case class BackoffForever private[retry] (initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None) + extends Infinite: + override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = + Backoff.nextDelay(attempt, initialDelay, maxDelay, jitter, lastDelay) diff --git a/core/src/main/scala/ox/retry/retry.scala b/core/src/main/scala/ox/retry/retry.scala new file mode 100644 index 00000000..b72baafe --- /dev/null +++ b/core/src/main/scala/ox/retry/retry.scala @@ -0,0 +1,67 @@ +package ox.retry + +import scala.annotation.tailrec +import scala.concurrent.duration.* +import scala.util.Try + +/** Retries an operation returning a direct result until it succeeds or the policy decides to stop. + * + * @param operation + * The operation to retry. + * @param policy + * The retry policy - see [[RetryPolicy]]. + * @return + * The result of the function if it eventually succeeds. + * @throws anything + * The exception thrown by the last attempt if the policy decides to stop. + */ +def retry[T](operation: => T)(policy: RetryPolicy[Throwable, T]): T = + retry(Try(operation))(policy).get + +/** Retries an operation returning a [[scala.util.Try]] until it succeeds or the policy decides to stop. + * + * @param operation + * The operation to retry. + * @param policy + * The retry policy - see [[RetryPolicy]]. + * @return + * A [[scala.util.Success]] if the function eventually succeeds, or, otherwise, a [[scala.util.Failure]] with the error from the last + * attempt. + */ +def retry[T](operation: => Try[T])(policy: RetryPolicy[Throwable, T]): Try[T] = + retry(operation.toEither)(policy).toTry + +/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the policy decides to stop. 
+ * + * @param operation + * The operation to retry. + * @param policy + * The retry policy - see [[RetryPolicy]]. + * @return + * A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last attempt. + */ +def retry[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Either[E, T] = + @tailrec + def loop(attempt: Int, remainingAttempts: Option[Int], lastDelay: Option[FiniteDuration]): Either[E, T] = + def sleepIfNeeded = + val delay = policy.schedule.nextDelay(attempt + 1, lastDelay).toMillis + if (delay > 0) Thread.sleep(delay) + delay + + operation match + case left @ Left(error) => + if policy.resultPolicy.isWorthRetrying(error) && remainingAttempts.forall(_ > 0) then + val delay = sleepIfNeeded + loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay.millis)) + else left + case right @ Right(result) => + if !policy.resultPolicy.isSuccess(result) && remainingAttempts.forall(_ > 0) then + val delay = sleepIfNeeded + loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay.millis)) + else right + + val remainingAttempts = policy.schedule match + case finiteSchedule: Schedule.Finite => Some(finiteSchedule.maxRetries) + case _ => None + + loop(0, remainingAttempts, None) diff --git a/core/src/main/scala/ox/syntax.scala b/core/src/main/scala/ox/syntax.scala index 1bd203a7..e8ef624d 100644 --- a/core/src/main/scala/ox/syntax.scala +++ b/core/src/main/scala/ox/syntax.scala @@ -1,11 +1,16 @@ package ox +import ox.retry.RetryPolicy + import scala.concurrent.duration.FiniteDuration +import scala.util.Try object syntax: - extension [T](f: => T) - def forever: Fork[Nothing] = ox.forever(f) - def retry(times: Int, sleep: FiniteDuration): T = ox.retry(times, sleep)(f) + extension [T](f: => T) def forever: Fork[Nothing] = ox.forever(f) + + extension [T](f: => T) def retry(policy: RetryPolicy[Throwable, T]): T = ox.retry.retry(f)(policy) + extension [T](f: => Try[T]) def retry(policy: 
RetryPolicy[Throwable, T]): Try[T] = ox.retry.retry(f)(policy) + extension [E, T](f: => Either[E, T]) def retry(policy: RetryPolicy[E, T]): Either[E, T] = ox.retry.retry(f)(policy) extension [T](f: => T)(using Ox) def fork: Fork[T] = ox.fork(f) diff --git a/core/src/test/scala/ox/ElapsedTime.scala b/core/src/test/scala/ox/ElapsedTime.scala new file mode 100644 index 00000000..fc6c8ab7 --- /dev/null +++ b/core/src/test/scala/ox/ElapsedTime.scala @@ -0,0 +1,12 @@ +package ox + +import scala.concurrent.duration.* + +trait ElapsedTime { + + def measure[T](f: => T): (T, Duration) = + val before = System.nanoTime() + val result = f + val after = System.nanoTime() + (result, (after - before).nanos) +} diff --git a/core/src/test/scala/ox/channels/SourceOpsThrottleTest.scala b/core/src/test/scala/ox/channels/SourceOpsThrottleTest.scala index 6e7d8cd3..f2eb95f1 100644 --- a/core/src/test/scala/ox/channels/SourceOpsThrottleTest.scala +++ b/core/src/test/scala/ox/channels/SourceOpsThrottleTest.scala @@ -5,8 +5,9 @@ import org.scalatest.matchers.should.Matchers import ox.* import scala.concurrent.duration.* +import ox.ElapsedTime -class SourceOpsThrottleTest extends AnyFlatSpec with Matchers { +class SourceOpsThrottleTest extends AnyFlatSpec with Matchers with ElapsedTime { behavior of "Source.throttle" it should "not throttle the empty source" in supervised { @@ -36,10 +37,4 @@ class SourceOpsThrottleTest extends AnyFlatSpec with Matchers { s.throttle(1, 50.nanos) } should have message "requirement failed: per time must be >= 1 ms" } - - private def measure[T](f: => T): (T, Duration) = - val before = System.currentTimeMillis() - val result = f - val after = System.currentTimeMillis(); - (result, (after - before).millis) } diff --git a/core/src/test/scala/ox/retry/BackoffRetryTest.scala b/core/src/test/scala/ox/retry/BackoffRetryTest.scala new file mode 100644 index 00000000..d7034d40 --- /dev/null +++ b/core/src/test/scala/ox/retry/BackoffRetryTest.scala @@ -0,0 +1,135 @@ 
+package ox.retry + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{EitherValues, TryValues} +import ox.ElapsedTime +import ox.retry.* + +import scala.concurrent.duration.* +import scala.util.Failure + +class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryValues with ElapsedTime: + + behavior of "Backoff retry" + + it should "retry a function" in { + // given + val maxRetries = 3 + val initialDelay = 100.millis + var counter = 0 + def f = + counter += 1 + if true then throw new RuntimeException("boom") + + // when + val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(f)(RetryPolicy.backoff(maxRetries, initialDelay))) + + // then + result should have message "boom" + elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay) + counter shouldBe 4 + } + + it should "retry a failing function forever" in { + // given + var counter = 0 + val initialDelay = 100.millis + val retriesUntilSuccess = 1_000 + val successfulResult = 42 + + def f = + counter += 1 + if counter <= retriesUntilSuccess then throw new RuntimeException("boom") else successfulResult + + // when + val result = retry(f)(RetryPolicy.backoffForever(initialDelay, maxDelay = 2.millis)) + + // then + result shouldBe successfulResult + counter shouldBe retriesUntilSuccess + 1 + } + + it should "respect maximum delay" in { + // given + val maxRetries = 3 + val initialDelay = 100.millis + val maxDelay = 200.millis + var counter = 0 + def f = + counter += 1 + if true then throw new RuntimeException("boom") + + // when + val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(f)(RetryPolicy.backoff(maxRetries, initialDelay, maxDelay))) + + // then + result should have message "boom" + elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay, maxDelay) + elapsedTime.toMillis should be < initialDelay.toMillis + maxRetries 
* maxDelay.toMillis + counter shouldBe 4 + } + + it should "use jitter" in { + // given + val maxRetries = 3 + val initialDelay = 100.millis + val maxDelay = 200.millis + var counter = 0 + def f = + counter += 1 + if true then throw new RuntimeException("boom") + + // when + val (result, elapsedTime) = + measure(the[RuntimeException] thrownBy retry(f)(RetryPolicy.backoff(maxRetries, initialDelay, maxDelay, Jitter.Equal))) + + // then + result should have message "boom" + elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay, maxDelay) / 2 + elapsedTime.toMillis should be < initialDelay.toMillis + maxRetries * maxDelay.toMillis + counter shouldBe 4 + } + + it should "retry an Either" in { + // given + val maxRetries = 3 + val initialDelay = 100.millis + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + Left(errorMessage) + + // when + val (result, elapsedTime) = measure(retry(f)(RetryPolicy.backoff(maxRetries, initialDelay))) + + // then + result.left.value shouldBe errorMessage + elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay) + counter shouldBe 4 + } + + it should "retry a Try" in { + // given + val maxRetries = 3 + val initialDelay = 100.millis + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + Failure(new RuntimeException(errorMessage)) + + // when + val (result, elapsedTime) = measure(retry(f)(RetryPolicy.backoff(maxRetries, initialDelay))) + + // then + result.failure.exception should have message errorMessage + elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay) + counter shouldBe 4 + } + + private def expectedTotalBackoffTimeMillis(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.day): Long = + (0 until maxRetries).map(Schedule.Backoff.delay(_, initialDelay, maxDelay).toMillis).sum diff --git a/core/src/test/scala/ox/retry/DelayedRetryTest.scala 
b/core/src/test/scala/ox/retry/DelayedRetryTest.scala new file mode 100644 index 00000000..f87a30f8 --- /dev/null +++ b/core/src/test/scala/ox/retry/DelayedRetryTest.scala @@ -0,0 +1,91 @@ +package ox.retry + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{EitherValues, TryValues} +import ox.ElapsedTime +import ox.retry.* + +import scala.concurrent.duration.* +import scala.util.Failure + +class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryValues with ElapsedTime: + + behavior of "Delayed retry" + + it should "retry a function" in { + // given + val maxRetries = 3 + val sleep = 100.millis + var counter = 0 + def f = + counter += 1 + if true then throw new RuntimeException("boom") + + // when + val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(f)(RetryPolicy.delay(maxRetries, sleep))) + + // then + result should have message "boom" + elapsedTime.toMillis should be >= maxRetries * sleep.toMillis + counter shouldBe 4 + } + + it should "retry a failing function forever" in { + // given + var counter = 0 + val sleep = 2.millis + val retriesUntilSuccess = 1_000 + val successfulResult = 42 + + def f = + counter += 1 + if counter <= retriesUntilSuccess then throw new RuntimeException("boom") else successfulResult + + // when + val result = retry(f)(RetryPolicy.delayForever(sleep)) + + // then + result shouldBe successfulResult + counter shouldBe retriesUntilSuccess + 1 + } + + it should "retry an Either" in { + // given + val maxRetries = 3 + val sleep = 100.millis + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + Left(errorMessage) + + // when + val (result, elapsedTime) = measure(retry(f)(RetryPolicy.delay(maxRetries, sleep))) + + // then + result.left.value shouldBe errorMessage + elapsedTime.toMillis should be >= maxRetries * sleep.toMillis + counter shouldBe 4 + } + + it should "retry a Try" in { + // given + val maxRetries = 
3 + val sleep = 100.millis + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + Failure(new RuntimeException(errorMessage)) + + // when + val (result, elapsedTime) = measure(retry(f)(RetryPolicy.delay(maxRetries, sleep))) + + // then + result.failure.exception should have message errorMessage + elapsedTime.toMillis should be >= maxRetries * sleep.toMillis + counter shouldBe 4 + } diff --git a/core/src/test/scala/ox/retry/ImmediateRetryTest.scala b/core/src/test/scala/ox/retry/ImmediateRetryTest.scala new file mode 100644 index 00000000..829845d1 --- /dev/null +++ b/core/src/test/scala/ox/retry/ImmediateRetryTest.scala @@ -0,0 +1,234 @@ +package ox.retry + +import org.scalatest.{EitherValues, TryValues} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.retry.* + +import scala.util.{Failure, Success} + +class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues with Matchers: + + behavior of "Immediate retry" + + it should "retry a succeeding function" in { + // given + var counter = 0 + val successfulResult = 42 + + def f = + counter += 1 + successfulResult + + // when + val result = retry(f)(RetryPolicy.immediate(3)) + + // then + result shouldBe successfulResult + counter shouldBe 1 + } + + it should "fail fast when a function is not worth retrying" in { + // given + var counter = 0 + val errorMessage = "boom" + val policy = RetryPolicy[Throwable, Unit](Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMessage != errorMessage)) + + def f = + counter += 1 + if true then throw new RuntimeException(errorMessage) + + // when/then + the[RuntimeException] thrownBy retry(f)(policy) should have message errorMessage + counter shouldBe 1 + } + + it should "retry a succeeding function with a custom success condition" in { + // given + var counter = 0 + val unsuccessfulResult = -1 + val policy = RetryPolicy[Throwable, Int](Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)) + + 
def f = + counter += 1 + unsuccessfulResult + + // when + val result = retry(f)(policy) + + // then + result shouldBe unsuccessfulResult + counter shouldBe 4 + } + + it should "retry a failing function" in { + // given + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + if true then throw new RuntimeException(errorMessage) + + // when/then + the[RuntimeException] thrownBy retry(f)(RetryPolicy.immediate(3)) should have message errorMessage + counter shouldBe 4 + } + + it should "retry a failing function forever" in { + // given + var counter = 0 + val retriesUntilSuccess = 10_000 + val successfulResult = 42 + + def f = + counter += 1 + if counter <= retriesUntilSuccess then throw new RuntimeException("boom") else successfulResult + + // when + val result = retry(f)(RetryPolicy.immediateForever) + + // then + result shouldBe successfulResult + counter shouldBe retriesUntilSuccess + 1 + } + + it should "retry a succeeding Either" in { + // given + var counter = 0 + val successfulResult = 42 + + def f: Either[String, Int] = + counter += 1 + Right(successfulResult) + + // when + val result = retry(f)(RetryPolicy.immediate(3)) + + // then + result.value shouldBe successfulResult + counter shouldBe 1 + } + + it should "fail fast when an Either is not worth retrying" in { + // given + var counter = 0 + val errorMessage = "boom" + val policy: RetryPolicy[String, Int] = RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != errorMessage)) + + def f: Either[String, Int] = + counter += 1 + Left(errorMessage) + + // when + val result = retry(f)(policy) + + // then + result.left.value shouldBe errorMessage + counter shouldBe 1 + } + + it should "retry a succeeding Either with a custom success condition" in { + // given + var counter = 0 + val unsuccessfulResult = -1 + val policy: RetryPolicy[String, Int] = RetryPolicy(Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)) + + def f: Either[String, Int] = + counter += 1 + 
Right(unsuccessfulResult) + + // when + val result = retry(f)(policy) + + // then + result.value shouldBe unsuccessfulResult + counter shouldBe 4 + } + + it should "retry a failing Either" in { + // given + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + Left(errorMessage) + + // when + val result = retry(f)(RetryPolicy.immediate(3)) + + // then + result.left.value shouldBe errorMessage + counter shouldBe 4 + } + + it should "retry a succeeding Try" in { + // given + var counter = 0 + val successfulResult = 42 + + def f = + counter += 1 + Success(successfulResult) + + // when + val result = retry(f)(RetryPolicy.immediate(3)) + + // then + result.success.value shouldBe successfulResult + counter shouldBe 1 + } + + it should "fail fast when a Try is not worth retrying" in { + // given + var counter = 0 + val errorMessage = "boom" + val policy: RetryPolicy[Throwable, Int] = RetryPolicy(Schedule.Immediate(3), ResultPolicy.neverRetry) + + def f = + counter += 1 + Failure(new RuntimeException(errorMessage)) + + // when + val result = retry(f)(policy) + + // then + result.failure.exception should have message errorMessage + counter shouldBe 1 + } + + it should "retry a succeeding Try with a custom success condition" in { + // given + var counter = 0 + val unsuccessfulResult = -1 + val policy: RetryPolicy[Throwable, Int] = RetryPolicy(Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0)) + + def f = + counter += 1 + Success(unsuccessfulResult) + + // when + val result = retry(f)(policy) + + // then + result.success.value shouldBe unsuccessfulResult + counter shouldBe 4 + } + + it should "retry a failing Try" in { + // given + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + Failure(new RuntimeException(errorMessage)) + + // when + val result = retry(f)(RetryPolicy.immediate(3)) + + // then + result.failure.exception should have message errorMessage + counter shouldBe 4 + } diff --git 
a/core/src/test/scala/ox/retry/JitterTest.scala b/core/src/test/scala/ox/retry/JitterTest.scala new file mode 100644 index 00000000..a2d9dc72 --- /dev/null +++ b/core/src/test/scala/ox/retry/JitterTest.scala @@ -0,0 +1,68 @@ +package ox.retry + +import org.scalatest.Inspectors +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import scala.concurrent.duration.* + +class JitterTest extends AnyFlatSpec with Matchers { + + behavior of "Jitter" + + private val baseSchedule = Schedule.Backoff(maxRetries = 3, initialDelay = 100.millis) + + it should "use no jitter" in { + // given + val schedule = baseSchedule + + // when + val delays = (1 to 5).map(schedule.nextDelay(_, None)) + + // then + delays should contain theSameElementsInOrderAs Seq(200, 400, 800, 1600, 3200).map(_.millis) + } + + it should "use full jitter" in { + // given + val schedule = baseSchedule.copy(jitter = Jitter.Full) + + // when + val delays = (1 to 5).map(schedule.nextDelay(_, None)) + + // then + Inspectors.forEvery(delays.zipWithIndex) { case (delay, i) => + val backoffDelay = Schedule.Backoff.delay(i + 1, schedule.initialDelay, schedule.maxDelay) + delay should (be >= 0.millis and be <= backoffDelay) + } + } + + it should "use equal jitter" in { + // given + val schedule = baseSchedule.copy(jitter = Jitter.Equal) + + // when + val delays = (1 to 5).map(schedule.nextDelay(_, None)) + + // then + Inspectors.forEvery(delays.zipWithIndex) { case (delay, i) => + val backoffDelay = Schedule.Backoff.delay(i + 1, schedule.initialDelay, schedule.maxDelay) + delay should (be >= backoffDelay / 2 and be <= backoffDelay) + } + } + + it should "use decorrelated jitter" in { + // given + val schedule = baseSchedule.copy(jitter = Jitter.Decorrelated) + + // when + val delays = (1 to 5).map(schedule.nextDelay(_, None)) + + // then + Inspectors.forEvery(delays.sliding(2).toList) { + case Seq(previousDelay, delay) => + delay should (be >= schedule.initialDelay and be <= 
previousDelay * 3) + case _ => fail("should never happen") // so that the match is exhaustive + } + } +} diff --git a/core/src/test/scala/ox/retry/RetrySyntaxTest.scala b/core/src/test/scala/ox/retry/RetrySyntaxTest.scala new file mode 100644 index 00000000..214a93f6 --- /dev/null +++ b/core/src/test/scala/ox/retry/RetrySyntaxTest.scala @@ -0,0 +1,60 @@ +package ox.retry + +import org.scalatest.{EitherValues, TryValues} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.syntax.* + +import scala.util.Failure + +class RetrySyntaxTest extends AnyFlatSpec with Matchers with TryValues with EitherValues: + + behavior of "Retry syntax" + + it should "support operations that return a result directly" in { + // given + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + if true then throw new RuntimeException(errorMessage) + + // when/then + the[RuntimeException] thrownBy f.retry(RetryPolicy.immediate(3)) should have message errorMessage + counter shouldBe 4 + } + + it should "support operations that return a Try" in { + // given + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + Failure(new RuntimeException(errorMessage)) + + // when + val result = f.retry(RetryPolicy.immediate(3)) + + // then + result.failure.exception should have message errorMessage + counter shouldBe 4 + } + + it should "support operations that return an Either" in { + // given + var counter = 0 + val errorMessage = "boom" + + def f = + counter += 1 + Left(errorMessage) + + // when + val result = f.retry(RetryPolicy.immediate(3)) + + // then + result.left.value shouldBe errorMessage + counter shouldBe 4 + } diff --git a/doc/adr/0002-retries.md b/doc/adr/0002-retries.md new file mode 100644 index 00000000..108e934c --- /dev/null +++ b/doc/adr/0002-retries.md @@ -0,0 +1,22 @@ +# 2. 
Retries + +Date: 2023-11-30 + +## Context + +How should the [retries API](../retries.md) be implemented in terms of: +- developer friendliness, +- supported ways of representing the operation under retry, +- possibly infinite retries. + +## Decision + +We're using a single, unified syntax to retry an operation: +```scala +retry(operation)(policy) +``` +so that the developers don't need to wonder which variant to use. + +The operation can be a function returning a result directly, or wrapped in a `Try` or `Either`. Therefore, there are three overloaded variants of the `retry` function. + +For possibly infinite retries, we're using tail recursion to be stack safe. This comes at a cost of some code duplication in the retry logic, but is still more readable and easier to follow than a `while` loop with `var`s for passing the state. diff --git a/doc/retries.md b/doc/retries.md new file mode 100644 index 00000000..005b4f3c --- /dev/null +++ b/doc/retries.md @@ -0,0 +1,116 @@ +# Retries + +The retries mechanism allows retrying a failing operation according to a given policy (e.g. retry 3 times with a 100ms delay between attempts). + +## API +The basic syntax for retries is: +```scala +retry(operation)(policy) +``` + +or, using syntactic sugar: +```scala +import ox.syntax.* + +operation.retry(policy) +``` + +## Operation definition +The `operation` can be provided as one of: +- a direct by-name parameter, i.e. `f: => T` +- a by-name `Try[T]`, i.e. `f: => Try[T]` +- a by-name `Either[E, T]`, i.e. `f: => Either[E, T]` + +## Policies + +A retry policy consists of two parts: +- a `Schedule`, which indicates how many times and with what delay we should retry the `operation` after an initial failure, +- a `ResultPolicy`, which indicates whether: + - a non-erroneous outcome of the `operation` should be considered a success (if not, the `operation` would be retried), + - an erroneous outcome of the `operation` should be retried or fail fast. 
+ +The available schedules are defined in the `Schedule` object. Each schedule has a finite and an infinite variant. + +### Finite schedules + +Finite schedules have a common `maxRetries: Int` parameter, which determines how many times the `operation` would be retried after an initial failure. This means that the operation could be executed at most `maxRetries + 1` times. + +### Infinite schedules + +Each finite schedule has an infinite variant, whose settings are similar to those of the respective finite schedule, but without the `maxRetries` setting. Using the infinite variant can lead to a possibly infinite number of retries (unless the `operation` starts to succeed again at some point). The infinite schedules are created by calling `.forever` on the companion object of the respective finite schedule (see examples below). + +### Schedule types + +The supported schedules (specifically - their finite variants) are: +- `Immediate(maxRetries: Int)` - retries up to `maxRetries` times without any delay between subsequent attempts. +- `Delay(maxRetries: Int, delay: FiniteDuration)` - retries up to `maxRetries` times, sleeping for `delay` between subsequent attempts. +- `Backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)` - retries up to `maxRetries` times, sleeping for `initialDelay` before the first retry, increasing the sleep between subsequent attempts exponentially (with base `2`) up to an optional `maxDelay` (default: 1 minute). + + Optionally, a random factor (jitter) can be used when calculating the delay before the next attempt. The purpose of jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at the same time, which can result in subsequent failures, contrary to what you would expect from retrying. By introducing randomness to the delays, the retries become more evenly distributed over time. 
+- +- See the [AWS Architecture Blog article on backoff and jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/) for a more in-depth explanation. + + The following jitter strategies are available (defined in the `Jitter` enum): + - `None` - the default one, when no randomness is added, i.e. a pure exponential backoff is used, + - `Full` - picks a random value between `0` and the exponential backoff calculated for the current attempt, + - `Equal` - similar to `Full`, but prevents very short delays by always using a half of the original backoff and adding a random value between `0` and the other half, + - `Decorrelated` - uses the delay from the previous attempt (`lastDelay`) and picks a random value between the `initialDelay` and `3 * lastDelay`. + +### Result policies + +A result policy allows customizing how the results of the `operation` are treated. It consists of two predicates: +- `isSuccess: T => Boolean` (default: `true`) - determines whether a non-erroneous result of the `operation` should be considered a success. When it evaluates to `true` - no further attempts would be made, otherwise - we'd keep retrying. + + With finite schedules (i.e. those with `maxRetries` defined), if `isSuccess` keeps returning `false` when `maxRetries` are reached, the result is returned as-is, even though it's considered "unsuccessful", +- `isWorthRetrying: E => Boolean` (default: `true`) - determines whether another attempt would be made if the `operation` results in an error `E`. When it evaluates to `true` - we'd keep retrying, otherwise - we'd fail fast with the error. + +The `ResultPolicy[E, T]` is generic both over the error (`E`) and result (`T`) type. Note, however, that for the direct and `Try` variants of the `operation`, the error type `E` is fixed to `Throwable`, while for the `Either` variant, `E` can be an arbitrary type. + +### API shorthands + +When you don't need to customize the result policy (i.e. 
use the default one), you can use one of the following shorthands to define a retry policy with a given schedule (note that the parameters are the same as when manually creating the respective `Schedule`): +- `RetryPolicy.immediate(maxRetries: Int)`, +- `RetryPolicy.immediateForever`, +- `RetryPolicy.delay(maxRetries: Int, delay: FiniteDuration)`, +- `RetryPolicy.delayForever(delay: FiniteDuration)`, +- `RetryPolicy.backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)`, +- `RetryPolicy.backoffForever(initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)`. + +If you want to customize a part of the result policy, you can use the following shorthands: +- `ResultPolicy.default[E, T]` - uses the default settings, +- `ResultPolicy.successfulWhen[E, T](isSuccess: T => Boolean)` - uses the default `isWorthRetrying` and the provided `isSuccess`, +- `ResultPolicy.retryWhen[E, T](isWorthRetrying: E => Boolean)` - uses the default `isSuccess` and the provided `isWorthRetrying`, +- `ResultPolicy.neverRetry[E, T]` - uses the default `isSuccess` and fails fast on any error. + +## Examples +```scala +import ox.retry.* +import scala.concurrent.duration.* + +def directOperation: Int = ??? +def eitherOperation: Either[String, Int] = ??? +def tryOperation: Try[Int] = ??? 
+ +// various operation definitions - same syntax +retry(directOperation)(RetryPolicy.immediate(3)) +retry(eitherOperation)(RetryPolicy.immediate(3)) +retry(tryOperation)(RetryPolicy.immediate(3)) + +// various policies with custom schedules and default ResultPolicy +retry(directOperation)(RetryPolicy.delay(3, 100.millis)) +retry(directOperation)(RetryPolicy.backoff(3, 100.millis)) // defaults: maxDelay = 1.minute, jitter = Jitter.None +retry(directOperation)(RetryPolicy.backoff(3, 100.millis, 5.minutes, Jitter.Equal)) + +// infinite retries with a default ResultPolicy +retry(directOperation)(RetryPolicy.delayForever(100.millis)) +retry(directOperation)(RetryPolicy.backoffForever(100.millis, 5.minutes, Jitter.Full)) + +// result policies +// custom success +retry(directOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.successfulWhen(_ > 0))) +// fail fast on certain errors +retry(directOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMessage != "fatal error"))) +retry(eitherOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error"))) +``` + +See the tests in `ox.retry.*` for more.