Retries (#49)
Closes #48
rucek authored Dec 4, 2023
1 parent 5b30b2a commit 01e00d2
Showing 17 changed files with 1,106 additions and 32 deletions.
5 changes: 4 additions & 1 deletion README.md
@@ -375,7 +375,6 @@ There are some helper methods which might be useful when writing forked code:

 * `forever { ... }` repeatedly evaluates the given code block forever
 * `repeatWhile { ... }` repeatedly evaluates the given code block, as long as it returns `true`
-* `retry(times, sleep) { ... }` retries the given block up to the given number of times
 * `uninterruptible { ... }` evaluates the given code block making sure it can't be interrupted

 ## Syntax
Expand Down Expand Up @@ -694,6 +693,10 @@ Please see [the respective ADR](doc/adr/0001-error-propagation-in-channels.md) f
Channels are back-pressured, as the `.send` operation is blocking until there's a receiver thread available, or if
there's enough space in the buffer. The processing space is bound by the total size of channel buffers.

## Retries

The retries mechanism allows to retry a failing operation according to a given policy (e.g. retry 3 times with a 100ms delay between attempts). See the [full docs](doc/retries.md) for details.

## Kafka sources & drains

Dependency:
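The README's example policy (retry 3 times with a 100ms delay between attempts) can be illustrated with a minimal, self-contained sketch. `retryWithFixedDelay` and `retryDemo` are hypothetical names used only for illustration; they are not the API this commit introduces, which is described in `doc/retries.md`.

```scala
import scala.concurrent.duration.*
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not the library's API: runs `operation` and, on a
// non-fatal exception, sleeps for `delay` and tries again, up to `maxRetries`
// additional times. The last error is rethrown once retries are exhausted.
def retryWithFixedDelay[T](maxRetries: Int, delay: FiniteDuration)(operation: => T): T =
  Try(operation) match
    case Success(value) => value
    case Failure(_) if maxRetries > 0 =>
      Thread.sleep(delay.toMillis)
      retryWithFixedDelay(maxRetries - 1, delay)(operation)
    case Failure(error) => throw error

// An operation that fails twice and then succeeds is retried transparently.
def retryDemo(): (String, Int) =
  var attempts = 0
  val result = retryWithFixedDelay(maxRetries = 3, delay = 100.millis) {
    attempts += 1
    if attempts < 3 then throw new RuntimeException("transient failure") else "ok"
  }
  (result, attempts) // ("ok", 3)
```

Here `maxRetries` counts retries rather than total attempts, so the operation runs at most `maxRetries + 1` times.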
21 changes: 0 additions & 21 deletions core/src/main/scala/ox/control.scala
@@ -1,7 +1,5 @@
 package ox

-import scala.concurrent.duration.FiniteDuration
-
 def forever(f: => Unit): Nothing =
   while true do f
   throw new RuntimeException("can't get here")
@@ -16,25 +14,6 @@ def repeatUntil(f: => Boolean): Unit =
   var loop = true
   while loop do loop = !f

-// TODO: retry schedules
-def retry[T](times: Int, sleep: FiniteDuration)(f: => T): T =
-  try f
-  catch
-    case e: Throwable =>
-      if times > 0
-      then
-        Thread.sleep(sleep.toMillis)
-        retry(times - 1, sleep)(f)
-      else throw e
-
-def retryEither[E, T](times: Int, sleep: FiniteDuration)(f: => Either[E, T]): Either[E, T] =
-  f match
-    case r: Right[E, T] => r
-    case Left(_) if times > 0 =>
-      Thread.sleep(sleep.toMillis)
-      retry(times - 1, sleep)(f)
-    case l: Left[E, T] => l
-
 def uninterruptible[T](f: => T): T =
   scoped {
     val t = fork(f)
25 changes: 25 additions & 0 deletions core/src/main/scala/ox/retry/Jitter.scala
@@ -0,0 +1,25 @@
package ox.retry

/** A random factor used for calculating the delay between subsequent retries when a backoff strategy is used for calculating the delay.
  *
  * The purpose of jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at
  * the same time - which can result in subsequent failures, contrary to what you would expect from retrying. By introducing randomness to
  * the delays, the retries become more evenly distributed over time.
  *
  * See the <a href="https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/">AWS Architecture Blog article on backoff and
  * jitter</a> for a more in-depth explanation.
  *
  * Depending on the algorithm, the jitter can affect the delay in different ways - see the concrete variants for more details.
  */
enum Jitter:
  /** No jitter, i.e. the delay just uses an exponential backoff with no adjustments. */
  case None

  /** Full jitter, i.e. the delay is a random value between 0 and the calculated backoff delay. */
  case Full

  /** Equal jitter, i.e. the delay is half of the calculated backoff delay plus a random value between 0 and the other half. */
  case Equal

  /** Decorrelated jitter, i.e. the delay is a random value between the initial delay and the last delay multiplied by 3. */
  case Decorrelated
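As a rough illustration of the four variants documented above, the following self-contained sketch applies each one to an already computed backoff delay. `JitterKind`, `jittered` and the inner `between` guard are stand-ins introduced for this example only; the guard against empty random ranges is an assumption of the sketch, not something the documentation above specifies.

```scala
import scala.concurrent.duration.*
import scala.util.Random

// Local stand-in for the enum above, so the sketch compiles on its own.
enum JitterKind:
  case None, Full, Equal, Decorrelated

// Applies a jitter variant to an already computed backoff delay.
// `initialDelay` and `lastDelay` are only consulted by the decorrelated variant.
def jittered(
    kind: JitterKind,
    backoff: FiniteDuration,
    initialDelay: FiniteDuration,
    lastDelay: Option[FiniteDuration]
): FiniteDuration =
  // Random.between requires min < max, so degenerate ranges fall back to `min`
  // (an assumption made by this sketch).
  def between(min: Long, max: Long): Long =
    if max > min then Random.between(min, max) else min

  kind match
    case JitterKind.None => backoff
    case JitterKind.Full => between(0L, backoff.toMillis).millis
    case JitterKind.Equal =>
      val half = backoff.toMillis / 2
      (half + between(0L, half)).millis
    case JitterKind.Decorrelated =>
      val last = lastDelay.getOrElse(initialDelay).toMillis
      between(initialDelay.toMillis, last * 3).millis
```

With a backoff of 800ms, an initial delay of 100ms and a last delay of 400ms, the results fall in the ranges: exactly 800ms (none), 0 to 800ms (full), 400 to 800ms (equal), and 100 to 1200ms (decorrelated), with upper bounds exclusive.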
37 changes: 37 additions & 0 deletions core/src/main/scala/ox/retry/ResultPolicy.scala
@@ -0,0 +1,37 @@
package ox.retry

/** A policy that allows customizing when a non-erroneous result is considered successful and when an error is worth retrying (which allows
  * for failing fast on certain errors).
  *
  * @param isSuccess
  *   A function that determines whether a non-erroneous result is considered successful. By default, every non-erroneous result is
  *   considered successful.
  * @param isWorthRetrying
  *   A function that determines whether an error is worth retrying. By default, all errors are retried.
  * @tparam E
  *   The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning
  *   an `Either[E, T]`, this can be any `E`.
  * @tparam T
  *   The successful result type for the operation.
  */
case class ResultPolicy[E, T](isSuccess: T => Boolean = (_: T) => true, isWorthRetrying: E => Boolean = (_: E) => true)

object ResultPolicy:
  /** A policy that considers every non-erroneous result successful and retries on any error. */
  def default[E, T]: ResultPolicy[E, T] = ResultPolicy()

  /** A policy that customizes when a non-erroneous result is considered successful, and retries all errors.
    *
    * @param isSuccess
    *   A predicate that indicates whether a non-erroneous result is considered successful.
    */
  def successfulWhen[E, T](isSuccess: T => Boolean): ResultPolicy[E, T] = ResultPolicy(isSuccess = isSuccess)

  /** A policy that customizes which errors are retried, and considers every non-erroneous result successful.
    *
    * @param isWorthRetrying
    *   A predicate that indicates whether an erroneous result should be retried.
    */
  def retryWhen[E, T](isWorthRetrying: E => Boolean): ResultPolicy[E, T] = ResultPolicy(isWorthRetrying = isWorthRetrying)

  /** A policy that considers every non-erroneous result successful and never retries any error, i.e. fails fast. */
  def neverRetry[E, T]: ResultPolicy[E, T] = ResultPolicy(isWorthRetrying = _ => false)
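One way to read the two predicates is as a single retry decision over an `Either` result. The sketch below uses a local copy of the case class (named `ResultPolicySketch` to avoid confusion with the real one) so that it compiles on its own. `shouldRetry` is a hypothetical helper, and the assumption that a non-erroneous but unsuccessful result triggers another attempt is inferred from the doc comments, not taken from the retry loop itself.

```scala
// Local copy of the case class above, so the sketch compiles on its own.
case class ResultPolicySketch[E, T](
    isSuccess: T => Boolean = (_: T) => true,
    isWorthRetrying: E => Boolean = (_: E) => true
)

// Hypothetical decision helper: should the operation be attempted again?
// Errors are retried only if the policy deems them worth it; non-erroneous
// results are retried only if the policy does not consider them successful.
def shouldRetry[E, T](policy: ResultPolicySketch[E, T], result: Either[E, T]): Boolean =
  result match
    case Left(error)  => policy.isWorthRetrying(error)
    case Right(value) => !policy.isSuccess(value)

// Example policy: only positive numbers count as success, and "fatal" fails fast.
val positiveOrFailFast =
  ResultPolicySketch[String, Int](isSuccess = _ > 0, isWorthRetrying = _ != "fatal")
```

Under this reading, `shouldRetry(positiveOrFailFast, Right(0))` and `shouldRetry(positiveOrFailFast, Left("timeout"))` are `true`, while `Right(5)` and `Left("fatal")` end the retrying.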
106 changes: 106 additions & 0 deletions core/src/main/scala/ox/retry/RetryPolicy.scala
@@ -0,0 +1,106 @@
package ox.retry

import scala.concurrent.duration.*

/** A policy that defines how to retry a failed operation.
  *
  * @param schedule
  *   The retry schedule which determines the maximum number of retries and the delay between subsequent attempts to execute the operation.
  *   See [[Schedule]] for more details.
  * @param resultPolicy
  *   A policy that allows customizing when a non-erroneous result is considered successful and when an error is worth retrying (which
  *   allows for failing fast on certain errors). See [[ResultPolicy]] for more details.
  * @tparam E
  *   The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning
  *   an `Either[E, T]`, this can be any `E`.
  * @tparam T
  *   The successful result type for the operation.
  */
case class RetryPolicy[E, T](schedule: Schedule, resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T])

object RetryPolicy:
  /** Creates a policy that retries up to a given number of times, with no delay between subsequent attempts, using a default
    * [[ResultPolicy]].
    *
    * This is a shorthand for {{{RetryPolicy(Schedule.Immediate(maxRetries))}}}
    *
    * @param maxRetries
    *   The maximum number of retries.
    */
  def immediate[E, T](maxRetries: Int): RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate(maxRetries))

  /** Creates a policy that retries indefinitely, with no delay between subsequent attempts, using a default [[ResultPolicy]].
    *
    * This is a shorthand for {{{RetryPolicy(Schedule.Immediate.forever)}}}
    */
  def immediateForever[E, T]: RetryPolicy[E, T] = RetryPolicy(Schedule.Immediate.forever)

  /** Creates a policy that retries up to a given number of times, with a fixed delay between subsequent attempts, using a default
    * [[ResultPolicy]].
    *
    * This is a shorthand for {{{RetryPolicy(Schedule.Delay(maxRetries, delay))}}}
    *
    * @param maxRetries
    *   The maximum number of retries.
    * @param delay
    *   The delay between subsequent attempts.
    */
  def delay[E, T](maxRetries: Int, delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay(maxRetries, delay))

  /** Creates a policy that retries indefinitely, with a fixed delay between subsequent attempts, using a default [[ResultPolicy]].
    *
    * This is a shorthand for {{{RetryPolicy(Schedule.Delay.forever(delay))}}}
    *
    * @param delay
    *   The delay between subsequent attempts.
    */
  def delayForever[E, T](delay: FiniteDuration): RetryPolicy[E, T] = RetryPolicy(Schedule.Delay.forever(delay))

  /** Creates a policy that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts, using a
    * default [[ResultPolicy]].
    *
    * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
    * and capped at the given maximum delay.
    *
    * This is a shorthand for {{{RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}}
    *
    * @param maxRetries
    *   The maximum number of retries.
    * @param initialDelay
    *   The delay before the first retry.
    * @param maxDelay
    *   The maximum delay between subsequent retries. Defaults to 1 minute.
    * @param jitter
    *   A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
    *   i.e. an exponential backoff with no adjustments.
    */
  def backoff[E, T](
      maxRetries: Int,
      initialDelay: FiniteDuration,
      maxDelay: FiniteDuration = 1.minute,
      jitter: Jitter = Jitter.None
  ): RetryPolicy[E, T] =
    RetryPolicy(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))

  /** Creates a policy that retries indefinitely, with an increasing delay (backoff) between subsequent attempts, using a default
    * [[ResultPolicy]].
    *
    * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
    * and capped at the given maximum delay.
    *
    * This is a shorthand for {{{RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}}
    *
    * @param initialDelay
    *   The delay before the first retry.
    * @param maxDelay
    *   The maximum delay between subsequent retries. Defaults to 1 minute.
    * @param jitter
    *   A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
    *   i.e. an exponential backoff with no adjustments.
    */
  def backoffForever[E, T](
      initialDelay: FiniteDuration,
      maxDelay: FiniteDuration = 1.minute,
      jitter: Jitter = Jitter.None
  ): RetryPolicy[E, T] =
    RetryPolicy(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))
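The doubling-and-capping rule described in the backoff doc comments can be worked through with a short, self-contained sketch. `backoffDelaySketch` and `exampleDelays` are illustrative names, and numbering attempts from 0 is an assumption of this example; the chosen values (100ms initial delay, 1 second cap) are arbitrary.

```scala
import scala.concurrent.duration.*

// Exponential backoff with base 2: initialDelay * 2^attempt, capped at maxDelay.
// The computation goes through milliseconds as Long so that very large attempt
// numbers saturate at the cap instead of overflowing a duration.
def backoffDelaySketch(attempt: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration): FiniteDuration =
  (initialDelay.toMillis * math.pow(2, attempt)).toLong.min(maxDelay.toMillis).millis

// Delays for attempts 0 to 5 (assuming attempts are numbered from 0):
// 100ms, 200ms, 400ms, 800ms, then 1000ms twice once the cap is reached.
val exampleDelays: Seq[Long] =
  (0 to 5).map(attempt => backoffDelaySketch(attempt, 100.millis, 1.second).toMillis)
```

Without the cap, the sixth value would already be 3200ms; the cap keeps long-running retries from waiting arbitrarily long between attempts.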
119 changes: 119 additions & 0 deletions core/src/main/scala/ox/retry/Schedule.scala
@@ -0,0 +1,119 @@
package ox.retry

import scala.concurrent.duration.*
import scala.util.Random

private[retry] sealed trait Schedule:
  def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration

object Schedule:

  private[retry] sealed trait Finite extends Schedule:
    def maxRetries: Int

  private[retry] sealed trait Infinite extends Schedule

  /** A schedule that retries up to a given number of times, with no delay between subsequent attempts.
    *
    * @param maxRetries
    *   The maximum number of retries.
    */
  case class Immediate(maxRetries: Int) extends Finite:
    override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = Duration.Zero

  object Immediate:
    /** A schedule that retries indefinitely, with no delay between subsequent attempts. */
    def forever: Infinite = ImmediateForever

  private case object ImmediateForever extends Infinite:
    override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = Duration.Zero

  /** A schedule that retries up to a given number of times, with a fixed delay between subsequent attempts.
    *
    * @param maxRetries
    *   The maximum number of retries.
    * @param delay
    *   The delay between subsequent attempts.
    */
  case class Delay(maxRetries: Int, delay: FiniteDuration) extends Finite:
    override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = delay

  object Delay:
    /** A schedule that retries indefinitely, with a fixed delay between subsequent attempts.
      *
      * @param delay
      *   The delay between subsequent attempts.
      */
    def forever(delay: FiniteDuration): Infinite = DelayForever(delay)

  case class DelayForever private[retry] (delay: FiniteDuration) extends Infinite:
    override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = delay

  /** A schedule that retries up to a given number of times, with an increasing delay (backoff) between subsequent attempts.
    *
    * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
    * and capped at the given maximum delay.
    *
    * @param maxRetries
    *   The maximum number of retries.
    * @param initialDelay
    *   The delay before the first retry.
    * @param maxDelay
    *   The maximum delay between subsequent retries.
    * @param jitter
    *   A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
    *   i.e. an exponential backoff with no adjustments.
    */
  case class Backoff(
      maxRetries: Int,
      initialDelay: FiniteDuration,
      maxDelay: FiniteDuration = 1.minute,
      jitter: Jitter = Jitter.None
  ) extends Finite:
    override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration =
      Backoff.nextDelay(attempt, initialDelay, maxDelay, jitter, lastDelay)

  object Backoff:
    private[retry] def delay(attempt: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration): FiniteDuration =
      // converting Duration <-> Long back and forth to avoid exceeding maximum duration
      (initialDelay.toMillis * Math.pow(2, attempt)).toLong.min(maxDelay.toMillis).millis

    private[retry] def nextDelay(
        attempt: Int,
        initialDelay: FiniteDuration,
        maxDelay: FiniteDuration,
        jitter: Jitter,
        lastDelay: Option[FiniteDuration]
    ): FiniteDuration =
      def backoffDelay = Backoff.delay(attempt, initialDelay, maxDelay)

      jitter match
        case Jitter.None => backoffDelay
        case Jitter.Full => Random.between(0, backoffDelay.toMillis).millis
        case Jitter.Equal =>
          val backoff = backoffDelay.toMillis
          (backoff / 2 + Random.between(0, backoff / 2)).millis
        case Jitter.Decorrelated =>
          val last = lastDelay.getOrElse(initialDelay).toMillis
          Random.between(initialDelay.toMillis, last * 3).millis

    /** A schedule that retries indefinitely, with an increasing delay (backoff) between subsequent attempts.
      *
      * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial
      * delay and capped at the given maximum delay.
      *
      * @param initialDelay
      *   The delay before the first retry.
      * @param maxDelay
      *   The maximum delay between subsequent retries.
      * @param jitter
      *   A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
      *   i.e. an exponential backoff with no adjustments.
      */
    def forever(initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None): Infinite =
      BackoffForever(initialDelay, maxDelay, jitter)

  case class BackoffForever private[retry] (initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.minute, jitter: Jitter = Jitter.None)
      extends Infinite:
    override def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration =
      Backoff.nextDelay(attempt, initialDelay, maxDelay, jitter, lastDelay)
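The `nextDelay(attempt, lastDelay)` signature suggests that a caller feeds each computed delay back in as `lastDelay` for the next attempt. The self-contained sketch below shows one way that threading could look. `DelaySchedule`, `plannedDelays` and `doublingSchedule` are hypothetical stand-ins, and the 0-based attempt numbering is an assumption; the actual retry loop is not part of the code shown here.

```scala
import scala.concurrent.duration.*

// Minimal stand-in for the schedule abstraction above.
trait DelaySchedule:
  def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration

// Collects the delays a caller would wait for across `maxRetries` retries,
// passing the most recently computed delay back in as `lastDelay`.
def plannedDelays(schedule: DelaySchedule, maxRetries: Int): List[FiniteDuration] =
  (0 until maxRetries)
    .foldLeft(List.empty[FiniteDuration]) { (delays, attempt) =>
      schedule.nextDelay(attempt, delays.headOption) :: delays
    }
    .reverse

// Example schedule that depends only on the previous delay: start at 10ms, then double.
val doublingSchedule: DelaySchedule = new DelaySchedule:
  def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration =
    lastDelay.map(_ * 2L).getOrElse(10.millis)
```

For example, `plannedDelays(doublingSchedule, 4)` yields 10ms, 20ms, 40ms and 80ms. A schedule that depends on the previous delay, such as one using decorrelated jitter, needs this kind of feedback, whereas fixed-delay and immediate schedules can ignore both arguments.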