Skip to content

Commit

Permalink
Proposal: returning a second function from batchedThrottle allowing t…
Browse files Browse the repository at this point in the history
…o await the next dispatch

Should debounce and bufferedThrottle follow the same API?
  • Loading branch information
h0lg committed Mar 22, 2024
1 parent 6b35d58 commit 473db05
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 40 deletions.
30 changes: 28 additions & 2 deletions src/Fabulous.Tests/CmdTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ type ``Cmd tests``() =
messageCount <- messageCount + 1
dispatched <- msg :: dispatched

let batchedThrottleCmd = Cmd.batchedThrottle 100 NewValues
let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues

batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch
batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch
Expand All @@ -186,7 +186,7 @@ type ``Cmd tests``() =
messageCount <- messageCount + 1
dispatched <- msg :: dispatched

let batchedThrottleCmd = Cmd.batchedThrottle 100 NewValues
let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues

batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch
batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch
Expand All @@ -212,3 +212,29 @@ type ``Cmd tests``() =
Assert.AreEqual(4, messageCount)
Assert.AreEqual([ NewValues[4]; NewValues[3]; NewValues[2]; NewValues[1] ], dispatched)
}

[<Test>]
member _.``Cmd.batchedThrottle factory can be awaited for completion``() =
async {
let mutable messageCount = 0
let mutable dispatched = [] // records dispatched messages latest first

let dispatch msg =
messageCount <- messageCount + 1
dispatched <- msg :: dispatched

let createCmd, awaitNextDispatch = Cmd.batchedThrottle 100 NewValues

createCmd 1 |> CmdTestsHelper.execute dispatch
createCmd 2 |> CmdTestsHelper.execute dispatch

// Only the first value should have been dispatched immediately
Assert.AreEqual(1, messageCount)
Assert.AreEqual([ NewValues[1] ], dispatched)

do! awaitNextDispatch None // only waits until next dispatch

// All values should have been dispatched after waiting
Assert.AreEqual(2, messageCount)
Assert.AreEqual([ NewValues[2]; NewValues[1] ], dispatched)
}
100 changes: 62 additions & 38 deletions src/Fabulous/Cmd.fs
Original file line number Diff line number Diff line change
Expand Up @@ -325,17 +325,23 @@ module Cmd =
/// <param name="interval">The minimum time interval between two consecutive Command executions in milliseconds.</param>
/// <param name="fn">A function that maps a list of factory input values to a message for dispatch.</param>
/// <returns>
/// A Command factory function that maps a list of input values to a Command which dispatches a message (mapped from the pending values),
/// Two methods - the first being a Command factory function that maps a list of input values to a Command
/// which dispatches a message (mapped from the pending values),
/// either immediately or after a delay respecting the interval, while remembering and dispatching all remembered values
/// when the interval has elapsed, ensuring no values are lost.
/// The second can be used for awaiting the next dispatch from the outside while adding some buffer time.
/// </returns>
let batchedThrottle (interval: int) (mapValuesToMsg: 'value list -> 'msg) : 'value -> Cmd<'msg> =
let batchedThrottle (interval: int) (mapValuesToMsg: 'value list -> 'msg) : ('value -> Cmd<'msg>) * (System.TimeSpan option -> Async<unit>) =
let rateLimit = System.TimeSpan.FromMilliseconds(float interval)
let funLock = obj() // ensures safe access to resources shared across different threads
let mutable lastDispatch = System.DateTime.MinValue
let mutable pendingValues: 'value list = []
let mutable cts: CancellationTokenSource = null // if set, allows cancelling the last issued Command

// gets the time to wait until the next allowed dispatch returning a negative timespan if the time is up
let getTimeUntilNextDispatch () =
lastDispatch.Add(rateLimit) - System.DateTime.UtcNow

// dispatches all pendingValues and resets them while updating lastDispatch
let dispatchBatch (dispatch: 'msg -> unit) =
// Dispatch in the order they were received
Expand All @@ -344,39 +350,57 @@ module Cmd =
lastDispatch <- System.DateTime.UtcNow
pendingValues <- []

// Return a factory function mapping input values to sleeping Commands dispatching all pending messages
fun (value: 'value) ->
[ fun dispatch ->
lock funLock (fun () ->
let now = System.DateTime.UtcNow
let elapsedSinceLastDispatch = now - lastDispatch
pendingValues <- value :: pendingValues

// If the interval has elapsed since the last dispatch, dispatch all pending messages
if elapsedSinceLastDispatch >= rateLimit then
dispatchBatch dispatch
else // schedule dispatch

// if the the last sleeping dispatch can still be cancelled, do so
if cts <> null then
cts.Cancel()
cts.Dispose()

// used to enable cancelling this dispatch if newer values come into the factory
cts <- new CancellationTokenSource()

Async.Start(
async {
// wait only as long as we have to before next dispatch
do! Async.Sleep(rateLimit - elapsedSinceLastDispatch)

lock funLock (fun () ->
dispatchBatch dispatch

// done; invalidate own cancellation
if cts <> null then
cts.Dispose()
cts <- null)
},
cts.Token
)) ]
// a factory function mapping input values to sleeping Commands dispatching all pending messages
let factory =
fun (value: 'value) ->
[ fun dispatch ->
lock funLock (fun () ->
let untilNextDispatch = getTimeUntilNextDispatch()
pendingValues <- value :: pendingValues

// If the interval has elapsed since the last dispatch, dispatch all pending messages
if untilNextDispatch <= System.TimeSpan.Zero then
dispatchBatch dispatch
else // schedule dispatch

// if the the last sleeping dispatch can still be cancelled, do so
if cts <> null then
cts.Cancel()
cts.Dispose()

// used to enable cancelling this dispatch if newer values come into the factory
cts <- new CancellationTokenSource()

Async.Start(
async {
// wait only as long as we have to before next dispatch
do! Async.Sleep(untilNextDispatch)

lock funLock (fun () ->
dispatchBatch dispatch

// done; invalidate own cancellation
if cts <> null then
cts.Dispose()
cts <- null)
},
cts.Token
)) ]

// a function to wait until after the next async dispatch + some buffer time to ensure the dispatch is complete
let awaitNextDispatch buffer =
lock funLock (fun () ->
async {
if not pendingValues.IsEmpty then
let untilAfterNextDispatch =
getTimeUntilNextDispatch()
+ match buffer with
| Some value -> value
| None -> System.TimeSpan.Zero

if untilAfterNextDispatch > System.TimeSpan.Zero then
do! Async.Sleep(untilAfterNextDispatch)
})

// return both the factory and the await helper
factory, awaitNextDispatch

0 comments on commit 473db05

Please sign in to comment.