From 94bd20c6ecd63033b455fe40b44ae64179763c38 Mon Sep 17 00:00:00 2001 From: Vladislav Podporkin Date: Sun, 5 Nov 2023 09:57:37 +0300 Subject: [PATCH] Some tmp refactor --- src/FSharp.Control.Futures/Extensions.fs | 30 ++++++++- .../FSharp.Control.Futures.fsproj | 6 +- src/FSharp.Control.Futures/Internals.fs | 24 +++---- src/FSharp.Control.Futures/Running.fs | 32 ---------- .../Scheduling/GlobalThreadPoolScheduler.fs | 23 +++++++ .../RunnerScheduler.fs} | 54 ++-------------- .../Scheduling/Scheduling.fs | 25 ++++++++ .../Scheduling/SingleThreadScheduler.fs | 63 +++++++++++++++++++ src/FSharp.Control.Futures/Types.fs | 49 ++++++++++----- 9 files changed, 192 insertions(+), 114 deletions(-) delete mode 100644 src/FSharp.Control.Futures/Running.fs create mode 100644 src/FSharp.Control.Futures/Scheduling/GlobalThreadPoolScheduler.fs rename src/FSharp.Control.Futures/{Scheduling.fs => Scheduling/RunnerScheduler.fs} (64%) create mode 100644 src/FSharp.Control.Futures/Scheduling/Scheduling.fs create mode 100644 src/FSharp.Control.Futures/Scheduling/SingleThreadScheduler.fs diff --git a/src/FSharp.Control.Futures/Extensions.fs b/src/FSharp.Control.Futures/Extensions.fs index 6905a4b..cca7680 100644 --- a/src/FSharp.Control.Futures/Extensions.fs +++ b/src/FSharp.Control.Futures/Extensions.fs @@ -11,6 +11,7 @@ open FSharp.Control.Futures.Internals [] module Future = + // TODO: move to internal internals module type internal Sleep(duration: TimeSpan) = let mutable _timer: Timer = nullObj let mutable _notify: PrimaryNotify = PrimaryNotify(false) @@ -36,7 +37,33 @@ module Future = _timer.Dispose() _timer <- nullObj + // [run] + + /// Spawn a Future on current thread and synchronously waits for its Ready + /// The simplest implementation of the Future scheduler. + /// Equivalent to `(Scheduler.spawnOn anyScheduler).Join()`, + /// but without the cost of complex general purpose scheduler synchronization + let runSync (fut: Future<'a>) : 'a = + // The simplest implementation of the Future scheduler. + // Based on a polling cycle (polling -> waiting for awakening -> awakening -> polling -> ...) + // until the point with the result is reached + use wh = new EventWaitHandle(false, EventResetMode.AutoReset) + let mutable fut = fut + let ctx = + { new IContext with member _.Wake() = wh.Set() |> ignore } + + let rec pollWhilePending (poller: NaiveFuture<'a>) = + let mutable poller = poller + match poller.Poll(ctx) with + | NaivePoll.Ready x -> x + | NaivePoll.Pending -> + wh.WaitOne() |> ignore + pollWhilePending poller + + pollWhilePending (NaiveFuture(fut)) + // [runtime based] + let sleep (duration: TimeSpan) : Future = Sleep(duration) @@ -46,6 +73,3 @@ module Future = let timeout (duration: TimeSpan) (fut: Future<'a>) : Future> = Future.first (fut |> Future.map Ok) (sleep duration |> Future.map (fun _ -> Error (TimeoutException()))) - - - diff --git a/src/FSharp.Control.Futures/FSharp.Control.Futures.fsproj b/src/FSharp.Control.Futures/FSharp.Control.Futures.fsproj index 1e2f8b9..b7df76e 100644 --- a/src/FSharp.Control.Futures/FSharp.Control.Futures.fsproj +++ b/src/FSharp.Control.Futures/FSharp.Control.Futures.fsproj @@ -24,7 +24,6 @@ - @@ -35,7 +34,10 @@ - + + + + diff --git a/src/FSharp.Control.Futures/Internals.fs b/src/FSharp.Control.Futures/Internals.fs index 9c0da23..333ee1e 100644 --- a/src/FSharp.Control.Futures/Internals.fs +++ b/src/FSharp.Control.Futures/Internals.fs @@ -224,7 +224,7 @@ module NaivePoll = /// для завершения с результатом или исключением и отмены. /// (TODO: если try без фактического исключения не абсолютно бесплатен, есть смысл убрать его отсюда) [] -type NaivePoller<'a> = +type NaiveFuture<'a> = val mutable public Internal: Future<'a> new(fut: Future<'a>) = { Internal = fut } @@ -259,15 +259,15 @@ type NaivePoller<'a> = [] type PrimaryMerge<'a, 'b> = - val mutable _poller1: NaivePoller<'a> - val mutable _poller2: NaivePoller<'b> + val mutable _poller1: NaiveFuture<'a> + val mutable _poller2: NaiveFuture<'b> val mutable _result1: 'a val mutable _result2: 'b val mutable _resultsBits: int // bitflag: r1 = 1 | r2 = 2 new (fut1: Future<'a>, fut2: Future<'b>) = - { _poller1 = NaivePoller(fut1) - _poller2 = NaivePoller(fut2) + { _poller1 = NaiveFuture(fut1) + _poller2 = NaiveFuture(fut2) _result1 = Unchecked.defaultof<_> _result2 = Unchecked.defaultof<_> _resultsBits = 0 } @@ -576,7 +576,7 @@ module Futures = [] type Bind<'a, 'b>(source: Future<'a>, binder: 'a -> Future<'b>) = - let mutable poller = NaivePoller(source) + let mutable poller = NaiveFuture(source) interface Future<'b> with member this.Poll(ctx) = match poller.Poll(ctx) with @@ -588,7 +588,7 @@ module Futures = [] type Map<'a, 'b>(source: Future<'a>, mapping: 'a -> 'b) = - let mutable poller = NaivePoller(source) + let mutable poller = NaiveFuture(source) interface Future<'b> with member this.Poll(ctx) = match poller.Poll(ctx) with @@ -600,7 +600,7 @@ module Futures = [] type Ignore<'a>(source: Future<'a>) = - let mutable poller = NaivePoller(source) + let mutable poller = NaiveFuture(source) interface Future with member this.Poll(ctx) = match poller.Poll(ctx) with @@ -625,8 +625,8 @@ module Futures = [] type First<'a>(fut1: Future<'a>, fut2: Future<'a>) = - let mutable poller1 = NaivePoller(fut1) - let mutable poller2 = NaivePoller(fut2) + let mutable poller1 = NaiveFuture(fut1) + let mutable poller2 = NaiveFuture(fut2) interface Future<'a> with member _.Poll(ctx) = @@ -658,7 +658,7 @@ module Futures = [] type Join<'a>(source: Future>) = - let mutable poller = NaivePoller(source) + let mutable poller = NaiveFuture(source) interface Future<'a> with member this.Poll(ctx) = match poller.Poll(ctx) with @@ -682,7 +682,7 @@ module Futures = [] type TryWith<'a>(body: Future<'a>, handler: exn -> Future<'a>) = - let mutable poller = NaivePoller(body) + let mutable poller = NaiveFuture(body) let mutable handler = handler interface Future<'a> with diff --git a/src/FSharp.Control.Futures/Running.fs b/src/FSharp.Control.Futures/Running.fs deleted file mode 100644 index a20245c..0000000 --- a/src/FSharp.Control.Futures/Running.fs +++ /dev/null @@ -1,32 +0,0 @@ -[] -module FSharp.Control.Futures.Running - -open FSharp.Control.Futures.Internals - -module Future = - - open System.Threading - open FSharp.Control.Futures - - /// Spawn a Future on current thread and synchronously waits for its Ready - /// The simplest implementation of the Future scheduler. - /// Equivalent to `(Scheduler.spawnOn anyScheduler).Join()`, - /// but without the cost of complex general purpose scheduler synchronization - let runSync (fut: Future<'a>) : 'a = - // The simplest implementation of the Future scheduler. - // Based on a polling cycle (polling -> waiting for awakening -> awakening -> polling -> ...) - // until the point with the result is reached - use wh = new EventWaitHandle(false, EventResetMode.AutoReset) - let mutable fut = fut - let ctx = - { new IContext with member _.Wake() = wh.Set() |> ignore } - - let rec pollWhilePending (poller: NaivePoller<'a>) = - let mutable poller = poller - match poller.Poll(ctx) with - | NaivePoll.Ready x -> x - | NaivePoll.Pending -> - wh.WaitOne() |> ignore - pollWhilePending (poller) - - pollWhilePending (NaivePoller(fut)) diff --git a/src/FSharp.Control.Futures/Scheduling/GlobalThreadPoolScheduler.fs b/src/FSharp.Control.Futures/Scheduling/GlobalThreadPoolScheduler.fs new file mode 100644 index 0000000..3670de3 --- /dev/null +++ b/src/FSharp.Control.Futures/Scheduling/GlobalThreadPoolScheduler.fs @@ -0,0 +1,23 @@ +namespace rec FSharp.Control.Futures.Scheduling + +open System.Threading +open FSharp.Control.Futures +open FSharp.Control.Futures.Scheduling.RunnerScheduler.RunnerScheduler + + +type GlobalThreadPoolTaskRunner() = + interface ITaskRunner with + member _.RunTask(task) = + ThreadPool.QueueUserWorkItem(fun _ -> do task.Run()) |> ignore + +type GlobalThreadPoolScheduler internal () = + interface IScheduler with + member this.Spawn(fut: Future<'a>) = + let task = RunnerTask<'a>(fut, GlobalThreadPoolScheduler.globalThreadPoolTaskRunner) + task.InitialRun() + task :> IFutureTask<'a> + member _.Dispose() = () + +module GlobalThreadPoolScheduler = + let internal globalThreadPoolTaskRunner = GlobalThreadPoolTaskRunner() + let globalThreadPoolScheduler: IScheduler = upcast new GlobalThreadPoolScheduler() diff --git a/src/FSharp.Control.Futures/Scheduling.fs b/src/FSharp.Control.Futures/Scheduling/RunnerScheduler.fs similarity index 64% rename from src/FSharp.Control.Futures/Scheduling.fs rename to src/FSharp.Control.Futures/Scheduling/RunnerScheduler.fs index c31ccf3..b4a9be7 100644 --- a/src/FSharp.Control.Futures/Scheduling.fs +++ b/src/FSharp.Control.Futures/Scheduling/RunnerScheduler.fs @@ -1,15 +1,10 @@ -namespace FSharp.Control.Futures.Scheduling +namespace FSharp.Control.Futures.Scheduling.RunnerScheduler open System.Threading - open FSharp.Control.Futures open FSharp.Control.Futures.Internals open FSharp.Control.Futures.Sync - -// ------------------- -// ThreadPollScheduler - module internal rec RunnerScheduler = // Бит сигнализирующий о наличии пробуждения @@ -21,14 +16,13 @@ module internal rec RunnerScheduler = type ITaskRunner = abstract RunTask: RunnerTask<'a> -> unit - abstract Scheduler: IScheduler option type RunnerTask<'a>(fut: Future<'a>, runner: ITaskRunner) as this = let mutable ivar = OnceVar<'a>() let mutable state = 0uL - let mutable fut = fut + let mutable fut = NaiveFuture(fut) let changeState (f: uint64 -> uint64) : uint64 = let mutable prevState = 0uL @@ -58,13 +52,12 @@ module internal rec RunnerScheduler = let mutable isComplete = false try - PollTransiting(&fut, context - , onReady=fun x -> + match fut.Poll(context) with + | NaivePoll.Pending -> () + | NaivePoll.Ready x -> IVar.put x ivar isComplete <- true prevState <- changeState (fun x -> x ||| IsCompleteBit) - , onPending=fun () -> () - ) with e -> IVar.putExn e ivar @@ -77,7 +70,7 @@ module internal rec RunnerScheduler = changeState (fun x -> x ||| IsRunBit ||| IsWakedBit) |> ignore runner.RunTask(this) - interface IJoinHandle<'a> with + interface IFutureTask<'a> with member _.Await() = ivar.Get() @@ -87,38 +80,3 @@ module internal rec RunnerScheduler = member _.Cancel() = ivar |> IVar.putExn FutureTerminatedException - - - type GlobalThreadPoolTaskRunner() = - interface ITaskRunner with - member _.RunTask(task) = - ThreadPool.QueueUserWorkItem(fun _ -> do task.Run()) |> ignore - member _.Scheduler = Some globalThreadPoolScheduler - - type GlobalThreadPoolScheduler() = - interface IScheduler with - member this.Spawn(fut: Future<'a>) = - let task = RunnerTask<'a>(fut, globalThreadPoolTaskRunner) - task.InitialRun() - task :> IJoinHandle<'a> - - member _.Dispose() = () - - let globalThreadPoolTaskRunner = GlobalThreadPoolTaskRunner() - let globalThreadPoolScheduler: IScheduler = upcast new GlobalThreadPoolScheduler() - - -[] -module Schedulers = - let threadPool: IScheduler = RunnerScheduler.globalThreadPoolScheduler - - -[] -module Scheduler = - - /// Run Future on passed scheduler - /// Return Future waited result passed Future - let spawnOn (scheduler: IScheduler) (fut: Future<'a>) = scheduler.Spawn(fut) - /// Run Future on thread pool scheduler - /// Return Future waited result passed Future - let spawnOnThreadPool fut = spawnOn Schedulers.threadPool fut diff --git a/src/FSharp.Control.Futures/Scheduling/Scheduling.fs b/src/FSharp.Control.Futures/Scheduling/Scheduling.fs new file mode 100644 index 0000000..e8e3286 --- /dev/null +++ b/src/FSharp.Control.Futures/Scheduling/Scheduling.fs @@ -0,0 +1,25 @@ +namespace FSharp.Control.Futures.Scheduling + +open System.Threading + +open FSharp.Control.Futures +open FSharp.Control.Futures.Internals +open FSharp.Control.Futures.Scheduling.GlobalThreadPoolScheduler +open FSharp.Control.Futures.Sync + + +[] +module Schedulers = + let threadPool: IScheduler = GlobalThreadPoolScheduler.globalThreadPoolScheduler + + +[] +module Scheduler = + + /// Run Future on passed scheduler + /// Return Future waited result passed Future + let spawnOn (scheduler: IScheduler) (fut: Future<'a>) = scheduler.Spawn(fut) + + /// Run Future on thread pool scheduler + /// Return Future waited result passed Future + let spawnOnThreadPool fut = spawnOn Schedulers.threadPool fut diff --git a/src/FSharp.Control.Futures/Scheduling/SingleThreadScheduler.fs b/src/FSharp.Control.Futures/Scheduling/SingleThreadScheduler.fs new file mode 100644 index 0000000..28da7b5 --- /dev/null +++ b/src/FSharp.Control.Futures/Scheduling/SingleThreadScheduler.fs @@ -0,0 +1,63 @@ +namespace FSharp.Control.Futures.Scheduling + +open System.Collections.Generic +open System.Linq.Expressions +open System.Runtime.CompilerServices +open System.Threading +open FSharp.Control.Futures +open FSharp.Control.Futures.Internals + + +module Constants = + let [] MinimalTrimDelta : int = 1024 // 4кб для int + + +type ISchedulerTask = + abstract Poll: IContext -> bool + abstract Drop: unit -> unit + +type SchedulerTask<'a> = + val mutable currentState: NaiveFuture<'a> + val mutable isWaiting: bool // runtime safety checks for onceCell + val mutable onceCell: PrimaryOnceCell<'a> + + new (fut: Future<'a>) = { + currentState = NaiveFuture(fut) + isWaiting = false + onceCell = PrimaryOnceCell.Empty() + } + + override this.Equals(other: obj): bool = + refEq this other + + override this.GetHashCode(): int = + RuntimeHelpers.GetHashCode(this) + + interface IFutureTask<'a> with + member this.Cancel() : unit = failwith "todo" + member this.Await() : Future<'a> = failwith "todo" + member this.WaitBlocking() : 'a = (this:> IFutureTask<'a>).Await() |> Future.runSync + + interface Future<'a> with + member this.Poll(ctx: IContext) : Poll<'a> = + this.onceCell.PollGet(ctx) |> NaivePoll.toPoll + + member _.Drop() : unit = + failwith "TODO" + + +[] +type TaskId = TaskId of int + with member this.Inner() : int = let (TaskId x) = this in x + +type SingleThreadScheduler() = + let syncObj = obj + let mutable disposeCancellationToken = CancellationToken() + + let mutable tasks = List() + let mutable pollingQueue = Queue() + + + +module SingleThreadScheduler = + () diff --git a/src/FSharp.Control.Futures/Types.fs b/src/FSharp.Control.Futures/Types.fs index f47c0a9..0cec52f 100644 --- a/src/FSharp.Control.Futures/Types.fs +++ b/src/FSharp.Control.Futures/Types.fs @@ -1,9 +1,13 @@ namespace FSharp.Control.Futures +// Модуль содержит определения самых базовых типов +// с минимумом функционала, достаточно няпрямую следующими из их определения. +// Определение достаточно расплывчатое, но, к примеру, прямые вызовы функций сюда подходит +// А вот комбинаторы Future, даже самые фундаментальные, нет + open System -// ========== -// Core types +// [Core types] /// Current state of a AsyncComputation type [] @@ -42,23 +46,19 @@ and IContext = and IScheduler = inherit IDisposable - abstract Spawn: IFuture<'a> -> IJoinHandle<'a> + abstract Spawn: IFuture<'a> -> IFutureTask<'a> /// Allows to cancel and wait (asynchronously or synchronously) for a spawned Future. -and IJoinHandle<'a> = +and IFutureTask<'a> = abstract Cancel: unit -> unit - abstract Join: unit -> 'a - abstract Await: unit -> IFuture<'a> + abstract Await: unit -> IFuture<'a> // WaitAsync + abstract WaitBlocking: unit -> 'a -// Core types -// ========== -// Aliases +// [Aliases] type Future<'a> = IFuture<'a> -// Aliases -// ========== -// Exceptions +// [Exceptions] /// Exception is thrown when future is in a terminated state: /// Completed, Completed with exception, Canceled @@ -66,14 +66,16 @@ type FutureTerminatedException internal () = inherit Exception() type FutureDroppedException internal () = inherit FutureTerminatedException() exception FutureThreadingException +exception FutureTaskAbortedException + [] module Exceptions = let FutureTerminatedException : FutureTerminatedException = FutureTerminatedException() let FutureCancelledException : FutureDroppedException = FutureDroppedException() -// Exceptions -// ========== -// Poll utils + +// [Modules] +// [Modules / Poll] [] module Poll = @@ -86,5 +88,18 @@ module Poll = let inline isTransit (poll: Poll<'a>) : bool = match poll with Poll.Transit _ -> true | _ -> false -// Poll utils -// ========== +// [Modules / Future] +module Future = + // Poll и Drop не являются первостепенными функциями пользовательского пространства, + // поэтому не могут быть отражены в этом модуле. + // Рассмотрите возможность использования Internals + () + +// [Modules / Scheduler] +module Scheduler = + let spawn (fut: Future<'a>) (scheduler: IScheduler) : IFutureTask<'a> = scheduler.Spawn(fut) + +// [Modules / FutureTask] +module FutureTask = + let await (futTask: IFutureTask<'a>) : Future<'a> = futTask.Await() + let cancel (futTask: IFutureTask<'a>) : unit = futTask.Cancel()