Skip to content

Commit

Permalink
Fix the tick source to emit values at regular intervals, accommodatin…
Browse files Browse the repository at this point in the history
…g for slow consumers (#82)
  • Loading branch information
adamw authored Feb 26, 2024
1 parent 1654c29 commit 03f9ddb
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 30 deletions.
40 changes: 36 additions & 4 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -948,12 +948,44 @@ trait SourceCompanionOps:
}
c

def tick[T](interval: FiniteDuration, element: T = ())(using Ox, StageCapacity): Source[T] =
val c = StageCapacity.newChannel[T]
/** Creates a rendezvous channel (without a buffer, regardless of the [[StageCapacity]] in scope), to which the given value is sent
* repeatedly, at least [[interval]] apart between each two elements. The first value is sent immediately.
*
* The interval is measured between the subsequent invocations of the `send(value)` method. Hence, if there's a slow consumer, the next
* tick can be sent right after the previous one is received (if it was received later than the inter-tick interval duration). However,
* ticks don't accumulate, e.g. when the consumer is so slow that multiple intervals pass between `send` invocations.
*
* Must be run within a scope, since a child fork is created which sends the ticks, and waits until the next tick can be sent.
*
* @param interval
* The temporal spacing between subsequent ticks.
* @param value
* The value to send to the channel on every tick.
* @return
* A source to which the tick values are sent.
* @example
* {{{
* scala>
* import ox.*
* import ox.channels.Source
* import scala.concurrent.duration.DurationInt
*
* supervised {
* val s1 = Source.tick(100.millis)
* s1.receive()
* s2.receive() // this will complete at least 100 milliseconds later
* }
* }}}
*/
def tick[T](interval: FiniteDuration, value: T = ())(using Ox): Source[T] =
val c = Channel.rendezvous[T]
fork {
forever {
c.send(element)
Thread.sleep(interval.toMillis)
val start = System.nanoTime()
c.send(value)
val end = System.nanoTime()
val sleep = interval.toNanos - (end - start)
if sleep > 0 then Thread.sleep(sleep / 1_000_000, (sleep % 1_000_000).toInt)
}
}
c
Expand Down
6 changes: 2 additions & 4 deletions core/src/test/scala/ox/ForkTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ class ForkTest extends AnyFlatSpec with Matchers {
finally trail.add("f1 complete")
}

trail.add("main mid")
trail.add(s"result = ${f1.join()}")
}

trail.get shouldBe Vector("main mid", "f2 complete", "f1 complete", "result = 11")
trail.get shouldBe Vector("f2 complete", "f1 complete", "result = 11")
}

it should "allow extension method syntax" in {
Expand All @@ -60,11 +59,10 @@ class ForkTest extends AnyFlatSpec with Matchers {
finally trail.add("f1 complete")
}.fork

trail.add("main mid")
trail.add(s"result = ${f1.join()}")
}

trail.get shouldBe Vector("main mid", "f2 complete", "f1 complete", "result = 11")
trail.get shouldBe Vector("f2 complete", "f1 complete", "result = 11")
}

it should "interrupt child forks when parents complete" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class SourceOpsMapStatefulTest extends AnyFlatSpec with Matchers {

it should "propagate errors in the mapping function" in supervised {
// given
given StageCapacity = StageCapacity(0) // so that the error isn't created too early
val c = Source.fromValues("a", "b", "c")

// when
Expand Down
22 changes: 0 additions & 22 deletions core/src/test/scala/ox/channels/SourceOpsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,9 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters.*

class SourceOpsTest extends AnyFlatSpec with Matchers with Eventually {

it should "tick regularly" in {
supervised {
val c = Source.tick(100.millis)
val start = System.currentTimeMillis()
c.receive() shouldBe ()
(System.currentTimeMillis() - start) shouldBe >=(0L)
(System.currentTimeMillis() - start) shouldBe <=(50L)

c.receive() shouldBe ()
(System.currentTimeMillis() - start) shouldBe >=(100L)
(System.currentTimeMillis() - start) shouldBe <=(150L)

c.receive() shouldBe ()
(System.currentTimeMillis() - start) shouldBe >=(200L)
(System.currentTimeMillis() - start) shouldBe <=(250L)
}
}

it should "timeout" in {
supervised {
val c = Source.timeout(100.millis)
Expand Down
48 changes: 48 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsTickTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ox.channels

import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters.*

class SourceOpsTickTest extends AnyFlatSpec with Matchers with Eventually {
it should "tick regularly" in {
supervised {
val start = System.currentTimeMillis()
val c = Source.tick(100.millis)
c.receive() shouldBe ()
(System.currentTimeMillis() - start) shouldBe >=(0L)
(System.currentTimeMillis() - start) shouldBe <=(50L)

c.receive() shouldBe ()
(System.currentTimeMillis() - start) shouldBe >=(100L)
(System.currentTimeMillis() - start) shouldBe <=(150L)

c.receive() shouldBe ()
(System.currentTimeMillis() - start) shouldBe >=(200L)
(System.currentTimeMillis() - start) shouldBe <=(250L)
}
}

it should "tick immediately in case of a slow consumer, and then resume normal " in {
supervised {
val start = System.currentTimeMillis()
val c = Source.tick(100.millis)

// simulating a slow consumer
Thread.sleep(200)
c.receive() shouldBe () // a tick should be waiting
(System.currentTimeMillis() - start) shouldBe >=(200L)
(System.currentTimeMillis() - start) shouldBe <=(250L)

c.receive() shouldBe () // and immediately another, as the interval between send-s has passed
(System.currentTimeMillis() - start) shouldBe >=(200L)
(System.currentTimeMillis() - start) shouldBe <=(250L)
}
}
}

0 comments on commit 03f9ddb

Please sign in to comment.