Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retries #49

Merged
merged 29 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1fc3c1d
Extract ElapsedTime so that it can be reused
rucek Nov 16, 2023
3cc1eba
Remove old retries
rucek Nov 16, 2023
cb5792e
Basic retry for function/Try/Either
rucek Nov 16, 2023
0469fda
Delays and simple backoff
rucek Nov 16, 2023
d61e292
Initial isWorthRetrying for Either
rucek Nov 16, 2023
4c267d5
Add upper bound for delay when using backoff
rucek Nov 21, 2023
5b430e7
Add jitter
rucek Nov 22, 2023
52bd207
Remove unnecessary toList
rucek Nov 23, 2023
51fe027
Fail on unexpected match
rucek Nov 23, 2023
c364df0
Use System.nanoTime instead of System.currentTimeMillis for measuring…
rucek Nov 23, 2023
57f0b43
Fix formatting
rucek Nov 23, 2023
e7c5d7d
Add retries with unlimited attempts, make the retry logic tail-recursive
rucek Nov 27, 2023
41fedb0
Use enum for jitter
rucek Nov 27, 2023
17cde01
Add isWorthRetrying for functions and Try
rucek Nov 27, 2023
98b9d17
Move RetryPolicy to a separate file
rucek Nov 27, 2023
2991dac
Reduce maximum backoff delay to 1 minute
rucek Nov 27, 2023
24925a5
Refactor RetryPolicy to include a schedule and a result policy
rucek Nov 30, 2023
57716f9
Rename Direct to Immediate
rucek Nov 30, 2023
67566e1
Update test name
rucek Nov 30, 2023
77091ad
Add custom conditions to fail-fast tests
rucek Nov 30, 2023
d11454f
Add docs for retries
rucek Nov 30, 2023
0caa72b
Add retries to main README
rucek Nov 30, 2023
01926c6
Update docs on ResultPolicy error type
rucek Nov 30, 2023
c148380
Remove DummyImplicit's as they are no longer needed
rucek Nov 30, 2023
ebc6cdf
Fix typo in retries readme
rucek Nov 30, 2023
d671f21
Scaladocs and cleanup
rucek Nov 30, 2023
b2cf1ee
Add ADR for retries
rucek Nov 30, 2023
62e40ef
Fix naming
rucek Dec 4, 2023
5510a98
Add syntax sugar for retries
rucek Dec 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions core/src/main/scala/ox/control.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package ox

import scala.concurrent.duration.FiniteDuration

def forever(f: => Unit): Nothing =
while true do f
throw new RuntimeException("can't get here")
Expand All @@ -16,25 +14,6 @@ def repeatUntil(f: => Boolean): Unit =
var loop = true
while loop do loop = !f

// TODO: retry schedules
def retry[T](times: Int, sleep: FiniteDuration)(f: => T): T =
try f
catch
case e: Throwable =>
if times > 0
then
Thread.sleep(sleep.toMillis)
retry(times - 1, sleep)(f)
else throw e

def retryEither[E, T](times: Int, sleep: FiniteDuration)(f: => Either[E, T]): Either[E, T] =
f match
case r: Right[E, T] => r
case Left(_) if times > 0 =>
Thread.sleep(sleep.toMillis)
retry(times - 1, sleep)(f)
case l: Left[E, T] => l

def uninterruptible[T](f: => T): T =
scoped {
val t = fork(f)
Expand Down
81 changes: 81 additions & 0 deletions core/src/main/scala/ox/retry/retry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package ox.retry

import scala.concurrent.duration.*
import scala.util.{Random, Try}

sealed trait Jitter

object Jitter:
case object None extends Jitter
case object Full extends Jitter
case object Equal extends Jitter
case object Decorrelated extends Jitter
rucek marked this conversation as resolved.
Show resolved Hide resolved

trait RetryPolicy:
def maxRetries: Int
rucek marked this conversation as resolved.
Show resolved Hide resolved
def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration

object RetryPolicy:
case class Direct(maxRetries: Int) extends RetryPolicy:
def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = Duration.Zero

case class Delay(maxRetries: Int, delay: FiniteDuration) extends RetryPolicy:
def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration = delay

case class Backoff(
maxRetries: Int,
initialDelay: FiniteDuration,
maxDelay: FiniteDuration = 1.day,
jitter: Jitter = Jitter.None
) extends RetryPolicy:
def nextDelay(attempt: Int, lastDelay: Option[FiniteDuration]): FiniteDuration =
def backoffDelay = Backoff.delay(attempt, initialDelay, maxDelay)

jitter match
case Jitter.None => backoffDelay
case Jitter.Full => Random.between(0, backoffDelay.toMillis).millis
case Jitter.Equal =>
val backoff = backoffDelay.toMillis
(backoff / 2 + Random.between(0, backoff / 2)).millis
case Jitter.Decorrelated =>
val last = lastDelay.getOrElse(initialDelay).toMillis
Random.between(initialDelay.toMillis, last * 3).millis

private[retry] object Backoff:
def delay(attempt: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration): FiniteDuration =
(initialDelay * Math.pow(2, attempt).toLong).min(maxDelay)

def retry[T](f: => T)(policy: RetryPolicy): T =
retry(f, _ => true)(policy)

def retry[T](f: => T, isSuccess: T => Boolean)(policy: RetryPolicy): T =
retry(Try(f), isSuccess)(policy).get

def retry[E, T](f: => Either[E, T])(policy: RetryPolicy)(using dummy: DummyImplicit): Either[E, T] =
retry(f, _ => true)(policy)(using dummy)

def retry[E, T](f: => Either[E, T], isSuccess: T => Boolean, isWorthRetrying: E => Boolean = (_: E) => true)(policy: RetryPolicy)(using
dummy: DummyImplicit
): Either[E, T] =
def loop(remainingAttempts: Int, lastDelay: Option[FiniteDuration] = None): Either[E, T] =
rucek marked this conversation as resolved.
Show resolved Hide resolved
def nextAttemptOr(e: => Either[E, T]) =
if remainingAttempts > 0 then
val delay = policy.nextDelay(policy.maxRetries - remainingAttempts, lastDelay).toMillis
if delay > 0 then Thread.sleep(delay)
loop(remainingAttempts - 1, Some(delay.millis))
else e

f match
case left @ Left(error) =>
if isWorthRetrying(error) then nextAttemptOr(left)
else left
case right @ Right(result) if !isSuccess(result) => nextAttemptOr(right)
case right => right

loop(policy.maxRetries)

def retry[T](f: => Try[T])(policy: RetryPolicy)(using dummy1: DummyImplicit, dummy2: DummyImplicit): Try[T] =
retry(f, _ => true)(policy)(using dummy1, dummy2)

def retry[T](f: => Try[T], isSuccess: T => Boolean)(policy: RetryPolicy)(using dummy1: DummyImplicit, dummy2: DummyImplicit): Try[T] =
retry(f.toEither, isSuccess)(policy)(using dummy1).toTry
4 changes: 1 addition & 3 deletions core/src/main/scala/ox/syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package ox
import scala.concurrent.duration.FiniteDuration

object syntax:
extension [T](f: => T)
def forever: Fork[Nothing] = ox.forever(f)
def retry(times: Int, sleep: FiniteDuration): T = ox.retry(times, sleep)(f)
extension [T](f: => T) def forever: Fork[Nothing] = ox.forever(f)

extension [T](f: => T)(using Ox)
def fork: Fork[T] = ox.fork(f)
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/scala/ox/ElapsedTime.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package ox

import scala.concurrent.duration.*

trait ElapsedTime {

def measure[T](f: => T): (T, Duration) =
val before = System.currentTimeMillis()
rucek marked this conversation as resolved.
Show resolved Hide resolved
val result = f
val after = System.currentTimeMillis();
(result, (after - before).millis)
}
9 changes: 2 additions & 7 deletions core/src/test/scala/ox/channels/SourceOpsThrottleTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import org.scalatest.matchers.should.Matchers
import ox.*

import scala.concurrent.duration.*
import ox.ElapsedTime

class SourceOpsThrottleTest extends AnyFlatSpec with Matchers {
class SourceOpsThrottleTest extends AnyFlatSpec with Matchers with ElapsedTime {
behavior of "Source.throttle"

it should "not throttle the empty source" in supervised {
Expand Down Expand Up @@ -36,10 +37,4 @@ class SourceOpsThrottleTest extends AnyFlatSpec with Matchers {
s.throttle(1, 50.nanos)
} should have message "requirement failed: per time must be >= 1 ms"
}

private def measure[T](f: => T): (T, Duration) =
val before = System.currentTimeMillis()
val result = f
val after = System.currentTimeMillis();
(result, (after - before).millis)
}
116 changes: 116 additions & 0 deletions core/src/test/scala/ox/retry/BackoffRetryTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package ox.retry

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{EitherValues, TryValues}
import ox.ElapsedTime
import ox.retry.*

import scala.concurrent.duration.*
import scala.util.Failure

class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryValues with ElapsedTime:

behavior of "Backoff retry"

it should "retry a function" in {
// given
val maxRetries = 3
val initialDelay = 100.millis
var counter = 0
def f =
counter += 1
if true then throw new RuntimeException("boom")

// when
val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(f)(RetryPolicy.Backoff(maxRetries, initialDelay)))

// then
result should have message "boom"
elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay)
counter shouldBe 4
}

it should "respect maximum delay" in {
// given
val maxRetries = 3
val initialDelay = 100.millis
val maxDelay = 200.millis
var counter = 0
def f =
counter += 1
if true then throw new RuntimeException("boom")

// when
val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(f)(RetryPolicy.Backoff(maxRetries, initialDelay, maxDelay)))

// then
result should have message "boom"
elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay, maxDelay)
elapsedTime.toMillis should be < initialDelay.toMillis + maxRetries * maxDelay.toMillis
counter shouldBe 4
}

it should "use jitter" in {
// given
val maxRetries = 3
val initialDelay = 100.millis
val maxDelay = 200.millis
var counter = 0
def f =
counter += 1
if true then throw new RuntimeException("boom")

// when
val (result, elapsedTime) =
measure(the[RuntimeException] thrownBy retry(f)(RetryPolicy.Backoff(maxRetries, initialDelay, maxDelay, Jitter.Equal)))

// then
result should have message "boom"
elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay, maxDelay) / 2
elapsedTime.toMillis should be < initialDelay.toMillis + maxRetries * maxDelay.toMillis
counter shouldBe 4
}

it should "retry an Either" in {
// given
val maxRetries = 3
val initialDelay = 100.millis
var counter = 0
val errorMessage = "boom"

def f =
counter += 1
Left(errorMessage)

// when
val (result, elapsedTime) = measure(retry(f)(RetryPolicy.Backoff(maxRetries, initialDelay)))

// then
result.left.value shouldBe errorMessage
elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay)
counter shouldBe 4
}

it should "retry a Try" in {
// given
val maxRetries = 3
val initialDelay = 100.millis
var counter = 0
val errorMessage = "boom"

def f =
counter += 1
Failure(new RuntimeException(errorMessage))

// when
val (result, elapsedTime) = measure(retry(f)(RetryPolicy.Backoff(maxRetries, initialDelay)))

// then
result.failure.exception should have message errorMessage
elapsedTime.toMillis should be >= expectedTotalBackoffTimeMillis(maxRetries, initialDelay)
counter shouldBe 4
}

private def expectedTotalBackoffTimeMillis(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration = 1.day): Long =
(0 until maxRetries).map(RetryPolicy.Backoff.delay(_, initialDelay, maxDelay).toMillis).sum
72 changes: 72 additions & 0 deletions core/src/test/scala/ox/retry/DelayedRetryTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package ox.retry

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{EitherValues, TryValues}
import ox.ElapsedTime
import ox.retry.*

import scala.concurrent.duration.*
import scala.util.Failure

class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryValues with ElapsedTime:

behavior of "Delayed retry"

it should "retry a function" in {
// given
val maxRetries = 3
val sleep = 100.millis
var counter = 0
def f =
counter += 1
if true then throw new RuntimeException("boom")

// when
val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(f)(RetryPolicy.Delay(maxRetries, sleep)))

// then
result should have message "boom"
elapsedTime.toMillis should be >= maxRetries * sleep.toMillis
counter shouldBe 4
}

it should "retry an Either" in {
// given
val maxRetries = 3
val sleep = 100.millis
var counter = 0
val errorMessage = "boom"

def f =
counter += 1
Left(errorMessage)

// when
val (result, elapsedTime) = measure(retry(f)(RetryPolicy.Delay(maxRetries, sleep)))

// then
result.left.value shouldBe errorMessage
elapsedTime.toMillis should be >= maxRetries * sleep.toMillis
counter shouldBe 4
}

it should "retry a Try" in {
// given
val maxRetries = 3
val sleep = 100.millis
var counter = 0
val errorMessage = "boom"

def f =
counter += 1
Failure(new RuntimeException(errorMessage))

// when
val (result, elapsedTime) = measure(retry(f)(RetryPolicy.Delay(maxRetries, sleep)))

// then
result.failure.exception should have message errorMessage
elapsedTime.toMillis should be >= maxRetries * sleep.toMillis
counter shouldBe 4
}
Loading