Skip to content

Commit

Permalink
Some tmp refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
DragonFrai committed Nov 5, 2023
1 parent 32a2e56 commit 94bd20c
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 114 deletions.
30 changes: 27 additions & 3 deletions src/FSharp.Control.Futures/Extensions.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ open FSharp.Control.Futures.Internals
[<RequireQualifiedAccess>]
module Future =

// TODO: move to internal internals module
type internal Sleep(duration: TimeSpan) =
let mutable _timer: Timer = nullObj
let mutable _notify: PrimaryNotify = PrimaryNotify(false)
Expand All @@ -36,7 +37,33 @@ module Future =
_timer.Dispose()
_timer <- nullObj

// [run]

/// Spawn a Future on current thread and synchronously waits for its Ready
/// The simplest implementation of the Future scheduler.
/// Equivalent to `(Scheduler.spawnOn anyScheduler).Join()`,
/// but without the cost of complex general purpose scheduler synchronization
let runSync (fut: Future<'a>) : 'a =
// The simplest implementation of the Future scheduler.
// Based on a polling cycle (polling -> waiting for awakening -> awakening -> polling -> ...)
// until the point with the result is reached
use wh = new EventWaitHandle(false, EventResetMode.AutoReset)
let mutable fut = fut
let ctx =
{ new IContext with member _.Wake() = wh.Set() |> ignore }

let rec pollWhilePending (poller: NaiveFuture<'a>) =
let mutable poller = poller
match poller.Poll(ctx) with
| NaivePoll.Ready x -> x
| NaivePoll.Pending ->
wh.WaitOne() |> ignore
pollWhilePending poller

pollWhilePending (NaiveFuture(fut))

// [runtime based]

let sleep (duration: TimeSpan) : Future<unit> =
Sleep(duration)

Expand All @@ -46,6 +73,3 @@ module Future =

let timeout (duration: TimeSpan) (fut: Future<'a>) : Future<Result<'a, TimeoutException>> =
Future.first (fut |> Future.map Ok) (sleep duration |> Future.map (fun _ -> Error (TimeoutException())))



6 changes: 4 additions & 2 deletions src/FSharp.Control.Futures/FSharp.Control.Futures.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
<Compile Include="Internals.fs" />
<Compile Include="Future.fs" />
<Compile Include="Extensions.fs" />
<Compile Include="Running.fs" />
<Compile Include="Mailbox\Reply.fs" />
<Compile Include="Mailbox\Mailbox.fs" />
<Compile Include="Sync\Namespace.fs" />
Expand All @@ -35,7 +34,10 @@
<Compile Include="Sync\Mutex.fs" />
<Compile Include="Sync\RwLock.fs" />
<Compile Include="Sync\OnceVar.fs" />
<Compile Include="Scheduling.fs" />
<Compile Include="Scheduling\RunnerScheduler.fs" />
<Compile Include="Scheduling\GlobalThreadPoolScheduler.fs" />
<Compile Include="Scheduling\SingleThreadScheduler.fs" />
<Compile Include="Scheduling\Scheduling.fs" />
<Compile Include="Transforms.fs" />
<Compile Include="IO.fs" />
</ItemGroup>
Expand Down
24 changes: 12 additions & 12 deletions src/FSharp.Control.Futures/Internals.fs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ module NaivePoll =
/// для завершения с результатом или исключением и отмены.
/// (TODO: если try без фактического исключения не абсолютно бесплатен, есть смысл убрать его отсюда)
[<Struct; NoComparison; NoEquality>]
type NaivePoller<'a> =
type NaiveFuture<'a> =
val mutable public Internal: Future<'a>
new(fut: Future<'a>) = { Internal = fut }

Expand Down Expand Up @@ -259,15 +259,15 @@ type NaivePoller<'a> =

[<Struct>]
type PrimaryMerge<'a, 'b> =
val mutable _poller1: NaivePoller<'a>
val mutable _poller2: NaivePoller<'b>
val mutable _poller1: NaiveFuture<'a>
val mutable _poller2: NaiveFuture<'b>
val mutable _result1: 'a
val mutable _result2: 'b
val mutable _resultsBits: int // bitflag: r1 = 1 | r2 = 2

new (fut1: Future<'a>, fut2: Future<'b>) =
{ _poller1 = NaivePoller(fut1)
_poller2 = NaivePoller(fut2)
{ _poller1 = NaiveFuture(fut1)
_poller2 = NaiveFuture(fut2)
_result1 = Unchecked.defaultof<_>
_result2 = Unchecked.defaultof<_>
_resultsBits = 0 }
Expand Down Expand Up @@ -576,7 +576,7 @@ module Futures =

[<Sealed>]
type Bind<'a, 'b>(source: Future<'a>, binder: 'a -> Future<'b>) =
let mutable poller = NaivePoller(source)
let mutable poller = NaiveFuture(source)
interface Future<'b> with
member this.Poll(ctx) =
match poller.Poll(ctx) with
Expand All @@ -588,7 +588,7 @@ module Futures =

[<Sealed>]
type Map<'a, 'b>(source: Future<'a>, mapping: 'a -> 'b) =
let mutable poller = NaivePoller(source)
let mutable poller = NaiveFuture(source)
interface Future<'b> with
member this.Poll(ctx) =
match poller.Poll(ctx) with
Expand All @@ -600,7 +600,7 @@ module Futures =

[<Sealed>]
type Ignore<'a>(source: Future<'a>) =
let mutable poller = NaivePoller(source)
let mutable poller = NaiveFuture(source)
interface Future<unit> with
member this.Poll(ctx) =
match poller.Poll(ctx) with
Expand All @@ -625,8 +625,8 @@ module Futures =

[<Sealed>]
type First<'a>(fut1: Future<'a>, fut2: Future<'a>) =
let mutable poller1 = NaivePoller(fut1)
let mutable poller2 = NaivePoller(fut2)
let mutable poller1 = NaiveFuture(fut1)
let mutable poller2 = NaiveFuture(fut2)

interface Future<'a> with
member _.Poll(ctx) =
Expand Down Expand Up @@ -658,7 +658,7 @@ module Futures =

[<Sealed>]
type Join<'a>(source: Future<Future<'a>>) =
let mutable poller = NaivePoller(source)
let mutable poller = NaiveFuture(source)
interface Future<'a> with
member this.Poll(ctx) =
match poller.Poll(ctx) with
Expand All @@ -682,7 +682,7 @@ module Futures =

[<Sealed>]
type TryWith<'a>(body: Future<'a>, handler: exn -> Future<'a>) =
let mutable poller = NaivePoller(body)
let mutable poller = NaiveFuture(body)
let mutable handler = handler

interface Future<'a> with
Expand Down
32 changes: 0 additions & 32 deletions src/FSharp.Control.Futures/Running.fs

This file was deleted.

23 changes: 23 additions & 0 deletions src/FSharp.Control.Futures/Scheduling/GlobalThreadPoolScheduler.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace rec FSharp.Control.Futures.Scheduling

open System.Threading
open FSharp.Control.Futures
open FSharp.Control.Futures.Scheduling.RunnerScheduler.RunnerScheduler


type GlobalThreadPoolTaskRunner() =
interface ITaskRunner with
member _.RunTask(task) =
ThreadPool.QueueUserWorkItem(fun _ -> do task.Run()) |> ignore

type GlobalThreadPoolScheduler internal () =
interface IScheduler with
member this.Spawn(fut: Future<'a>) =
let task = RunnerTask<'a>(fut, GlobalThreadPoolScheduler.globalThreadPoolTaskRunner)
task.InitialRun()
task :> IFutureTask<'a>
member _.Dispose() = ()

module GlobalThreadPoolScheduler =
let internal globalThreadPoolTaskRunner = GlobalThreadPoolTaskRunner()
let globalThreadPoolScheduler: IScheduler = upcast new GlobalThreadPoolScheduler()
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
namespace FSharp.Control.Futures.Scheduling
namespace FSharp.Control.Futures.Scheduling.RunnerScheduler

open System.Threading

open FSharp.Control.Futures
open FSharp.Control.Futures.Internals
open FSharp.Control.Futures.Sync


// -------------------
// ThreadPollScheduler

module internal rec RunnerScheduler =

// Бит сигнализирующий о наличии пробуждения
Expand All @@ -21,14 +16,13 @@ module internal rec RunnerScheduler =

type ITaskRunner =
abstract RunTask: RunnerTask<'a> -> unit
abstract Scheduler: IScheduler option

type RunnerTask<'a>(fut: Future<'a>, runner: ITaskRunner) as this =

let mutable ivar = OnceVar<'a>()
let mutable state = 0uL

let mutable fut = fut
let mutable fut = NaiveFuture(fut)

let changeState (f: uint64 -> uint64) : uint64 =
let mutable prevState = 0uL
Expand Down Expand Up @@ -58,13 +52,12 @@ module internal rec RunnerScheduler =

let mutable isComplete = false
try
PollTransiting(&fut, context
, onReady=fun x ->
match fut.Poll(context) with
| NaivePoll.Pending -> ()
| NaivePoll.Ready x ->
IVar.put x ivar
isComplete <- true
prevState <- changeState (fun x -> x ||| IsCompleteBit)
, onPending=fun () -> ()
)
with e ->
IVar.putExn e ivar

Expand All @@ -77,7 +70,7 @@ module internal rec RunnerScheduler =
changeState (fun x -> x ||| IsRunBit ||| IsWakedBit) |> ignore
runner.RunTask(this)

interface IJoinHandle<'a> with
interface IFutureTask<'a> with

member _.Await() =
ivar.Get()
Expand All @@ -87,38 +80,3 @@ module internal rec RunnerScheduler =

member _.Cancel() =
ivar |> IVar.putExn FutureTerminatedException


type GlobalThreadPoolTaskRunner() =
interface ITaskRunner with
member _.RunTask(task) =
ThreadPool.QueueUserWorkItem(fun _ -> do task.Run()) |> ignore
member _.Scheduler = Some globalThreadPoolScheduler

type GlobalThreadPoolScheduler() =
interface IScheduler with
member this.Spawn(fut: Future<'a>) =
let task = RunnerTask<'a>(fut, globalThreadPoolTaskRunner)
task.InitialRun()
task :> IJoinHandle<'a>

member _.Dispose() = ()

let globalThreadPoolTaskRunner = GlobalThreadPoolTaskRunner()
let globalThreadPoolScheduler: IScheduler = upcast new GlobalThreadPoolScheduler()


[<RequireQualifiedAccess>]
module Schedulers =
let threadPool: IScheduler = RunnerScheduler.globalThreadPoolScheduler


[<RequireQualifiedAccess>]
module Scheduler =

/// <summary> Run Future on passed scheduler </summary>
/// <returns> Return Future waited result passed Future </returns>
let spawnOn (scheduler: IScheduler) (fut: Future<'a>) = scheduler.Spawn(fut)
/// <summary> Run Future on thread pool scheduler </summary>
/// <returns> Return Future waited result passed Future </returns>
let spawnOnThreadPool fut = spawnOn Schedulers.threadPool fut
25 changes: 25 additions & 0 deletions src/FSharp.Control.Futures/Scheduling/Scheduling.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace FSharp.Control.Futures.Scheduling

open System.Threading

open FSharp.Control.Futures
open FSharp.Control.Futures.Internals
open FSharp.Control.Futures.Scheduling.GlobalThreadPoolScheduler
open FSharp.Control.Futures.Sync


[<RequireQualifiedAccess>]
module Schedulers =
let threadPool: IScheduler = GlobalThreadPoolScheduler.globalThreadPoolScheduler


[<RequireQualifiedAccess>]
module Scheduler =

/// <summary> Run Future on passed scheduler </summary>
/// <returns> Return Future waited result passed Future </returns>
let spawnOn (scheduler: IScheduler) (fut: Future<'a>) = scheduler.Spawn(fut)

/// <summary> Run Future on thread pool scheduler </summary>
/// <returns> Return Future waited result passed Future </returns>
let spawnOnThreadPool fut = spawnOn Schedulers.threadPool fut
63 changes: 63 additions & 0 deletions src/FSharp.Control.Futures/Scheduling/SingleThreadScheduler.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
namespace FSharp.Control.Futures.Scheduling

open System.Collections.Generic
open System.Linq.Expressions
open System.Runtime.CompilerServices
open System.Threading
open FSharp.Control.Futures
open FSharp.Control.Futures.Internals


module Constants =
let [<Literal>] MinimalTrimDelta : int = 1024 // 4кб для int


type ISchedulerTask =
abstract Poll: IContext -> bool
abstract Drop: unit -> unit

type SchedulerTask<'a> =
val mutable currentState: NaiveFuture<'a>
val mutable isWaiting: bool // runtime safety checks for onceCell
val mutable onceCell: PrimaryOnceCell<'a>

new (fut: Future<'a>) = {
currentState = NaiveFuture(fut)
isWaiting = false
onceCell = PrimaryOnceCell.Empty()
}

override this.Equals(other: obj): bool =
refEq this other

override this.GetHashCode(): int =
RuntimeHelpers.GetHashCode(this)

interface IFutureTask<'a> with
member this.Cancel() : unit = failwith "todo"
member this.Await() : Future<'a> = failwith "todo"
member this.WaitBlocking() : 'a = (this:> IFutureTask<'a>).Await() |> Future.runSync

interface Future<'a> with
member this.Poll(ctx: IContext) : Poll<'a> =
this.onceCell.PollGet(ctx) |> NaivePoll.toPoll

member _.Drop() : unit =
failwith "TODO"


[<Struct; NoComparison; StructuralEquality>]
type TaskId = TaskId of int
with member this.Inner() : int = let (TaskId x) = this in x

type SingleThreadScheduler() =
let syncObj = obj
let mutable disposeCancellationToken = CancellationToken()

let mutable tasks = List<ISchedulerTask>()
let mutable pollingQueue = Queue<ISchedulerTask>()



module SingleThreadScheduler =
()
Loading

0 comments on commit 94bd20c

Please sign in to comment.