From 6a655ad731f0cf3ae3b11f45c8d7fe7a05448ac4 Mon Sep 17 00:00:00 2001 From: Vladislav Podporkin Date: Thu, 12 Sep 2024 11:32:26 +0300 Subject: [PATCH] wip --- FSharp.Control.Futures.sln | 13 ++ ...p.Control.Futures.Actors.Playground.fsproj | 17 +++ .../Program.fs | 31 +++++ src/FSharp.Control.Futures.Actors/Arbiter.fs | 69 ++++++++++ .../FSharp.Control.Futures.Actors.fsproj | 19 +++ src/FSharp.Control.Futures.Actors/Handler.fs | 18 +++ src/FSharp.Control.Futures.Actors/Library.fs | 5 + src/FSharp.Control.Futures.Actors/Types.fs | 120 ++++++++++++++++++ src/FSharp.Control.Futures/Extensions.fs | 5 + 9 files changed, 297 insertions(+) create mode 100644 examples/FSharp.Control.Futures.Actors.Playground/FSharp.Control.Futures.Actors.Playground.fsproj create mode 100644 examples/FSharp.Control.Futures.Actors.Playground/Program.fs create mode 100644 src/FSharp.Control.Futures.Actors/Arbiter.fs create mode 100644 src/FSharp.Control.Futures.Actors/FSharp.Control.Futures.Actors.fsproj create mode 100644 src/FSharp.Control.Futures.Actors/Handler.fs create mode 100644 src/FSharp.Control.Futures.Actors/Library.fs create mode 100644 src/FSharp.Control.Futures.Actors/Types.fs diff --git a/FSharp.Control.Futures.sln b/FSharp.Control.Futures.sln index b4cdb74..4f68919 100644 --- a/FSharp.Control.Futures.sln +++ b/FSharp.Control.Futures.sln @@ -20,6 +20,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{F0 EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.Control.Futures.Examples", "examples\FSharp.Control.Futures.Examples\FSharp.Control.Futures.Examples.fsproj", "{00BAC644-843A-4EE7-AF0E-78DFB4AA977D}" EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.Control.Futures.Actors", "src\FSharp.Control.Futures.Actors\FSharp.Control.Futures.Actors.fsproj", "{AD517DBF-4D4D-488D-9A7C-8145C89DA2B2}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.Control.Futures.Actors.Playground", "examples\FSharp.Control.Futures.Actors.Playground\FSharp.Control.Futures.Actors.Playground.fsproj", "{5983781E-8F81-410B-B184-F1B9ADB26CA3}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -54,6 +58,14 @@ Global {00BAC644-843A-4EE7-AF0E-78DFB4AA977D}.Debug|Any CPU.Build.0 = Debug|Any CPU {00BAC644-843A-4EE7-AF0E-78DFB4AA977D}.Release|Any CPU.ActiveCfg = Release|Any CPU {00BAC644-843A-4EE7-AF0E-78DFB4AA977D}.Release|Any CPU.Build.0 = Release|Any CPU + {AD517DBF-4D4D-488D-9A7C-8145C89DA2B2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AD517DBF-4D4D-488D-9A7C-8145C89DA2B2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AD517DBF-4D4D-488D-9A7C-8145C89DA2B2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AD517DBF-4D4D-488D-9A7C-8145C89DA2B2}.Release|Any CPU.Build.0 = Release|Any CPU + {5983781E-8F81-410B-B184-F1B9ADB26CA3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5983781E-8F81-410B-B184-F1B9ADB26CA3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5983781E-8F81-410B-B184-F1B9ADB26CA3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5983781E-8F81-410B-B184-F1B9ADB26CA3}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {B0400CAB-0469-44D5-B0A5-93265136ABDB} = {44835DE2-D38A-4533-BE69-3A06B779FF5A} @@ -61,5 +73,6 @@ Global {10A6C033-D1B7-48F8-882D-FB2094BC7BD7} = {44835DE2-D38A-4533-BE69-3A06B779FF5A} {66567C90-4DA2-4A78-9428-A7C4188A0DC2} = {44835DE2-D38A-4533-BE69-3A06B779FF5A} {00BAC644-843A-4EE7-AF0E-78DFB4AA977D} = {F06738AF-F184-49F0-8C63-5CA259D062EB} + {5983781E-8F81-410B-B184-F1B9ADB26CA3} = {F06738AF-F184-49F0-8C63-5CA259D062EB} EndGlobalSection EndGlobal diff --git a/examples/FSharp.Control.Futures.Actors.Playground/FSharp.Control.Futures.Actors.Playground.fsproj b/examples/FSharp.Control.Futures.Actors.Playground/FSharp.Control.Futures.Actors.Playground.fsproj new file mode 100644 index 0000000..8c2f1a4 --- /dev/null +++ b/examples/FSharp.Control.Futures.Actors.Playground/FSharp.Control.Futures.Actors.Playground.fsproj @@ -0,0 +1,17 @@ + + + + Exe + net8.0 + + + + + + + + + + + + diff --git a/examples/FSharp.Control.Futures.Actors.Playground/Program.fs b/examples/FSharp.Control.Futures.Actors.Playground/Program.fs new file mode 100644 index 0000000..b462a13 --- /dev/null +++ b/examples/FSharp.Control.Futures.Actors.Playground/Program.fs @@ -0,0 +1,31 @@ + + + + + + +open FSharp.Control.Futures +open FSharp.Control.Futures.Actors +open FSharp.Control.Futures.Runtime + + + +type HelloActor() = + interface IActor> with + member this.Build(ctx) = + let addr = ctx.Bind(Handler.reply (fun _ctx msg -> future { + printfn $"Hello, {msg}!" + return 12 + })) + addr + member this.OnStop(var0, cancel) = failwith "todo" + member this.Start(var0) = failwith "todo" + member this.Stop(var0) = failwith "todo" + +let arb = Arbiter(fun () -> HelloActor()) + +let facade, arbFut = arb.Start() +let task = ThreadPoolRuntime.spawn arbFut + +let r = facade.Send("Bill") |> Future.runBlocking +printfn $"Reply is {r}" diff --git a/src/FSharp.Control.Futures.Actors/Arbiter.fs b/src/FSharp.Control.Futures.Actors/Arbiter.fs new file mode 100644 index 0000000..b21ccef --- /dev/null +++ b/src/FSharp.Control.Futures.Actors/Arbiter.fs @@ -0,0 +1,69 @@ +namespace FSharp.Control.Futures.Actors + +open FSharp.Control.Futures +open FSharp.Control.Futures.Runtime +open FSharp.Control.Futures.Sync + + + +type ArbiterMsgCell<'i, 'o> = + { Msg: 'i; Reply: OneShot<'o> } + +module ArbiterMsgCell = + let box (cell: ArbiterMsgCell<'i, 'o>) : obj = box cell + let unbox<'i, 'o> (boxedCell: obj) : ArbiterMsgCell<'i, 'o> = unbox boxedCell + +type ArbiterMsg = + | Msg of int * arbiterMsgCell: obj + + +type ArbiterAddress<'i, 'o>(arbiterMailbox: Mailbox, handlerIndex: int) = + interface IAddress<'i, 'o> with + member this.Send(msg: 'i): Future<'o> = future { + let os = OneShot<'o>() + let cell: ArbiterMsgCell<'i, 'o> = { Msg = msg; Reply = os } + let arbMsg = ArbiterMsg.Msg (handlerIndex, ArbiterMsgCell.box cell) + arbiterMailbox.Post(arbMsg) + return! os + } + + + + +type Arbiter<'f> = + val actorFactory: unit -> IActor<'f> + val mutable isStarted: bool + + new(actorFactory) = + { actorFactory = actorFactory + isStarted = false } + + member this.Start(): 'f * Future = + if this.isStarted then invalidOp "Double start arbiter" + this.isStarted <- true + + let mailbox = Mailbox() + let handlers = ResizeArray() + let actor = this.actorFactory () + + let bc = { + new IBuildContext<'f> with + override this.Bind<'i, 'o>(handler) = + let boxHandler (ctx: IActorContext<'f>) (cell: obj) = + let cell: ArbiterMsgCell<'i, 'o> = ArbiterMsgCell.unbox cell + handler.Handle(ctx, cell.Msg, cell.Reply.AsSender) + let idx = handlers.Count + handlers.Add(boxHandler) + let addr = ArbiterAddress(mailbox, idx) + addr + } + let facade = actor.Build(bc) + facade, future { + while true do + let! msg = mailbox.Receive() + match msg with + | Msg(idx, arbiterMsgCell) -> + let action = handlers[idx] (Unchecked.defaultof>) arbiterMsgCell + do! action + () + } diff --git a/src/FSharp.Control.Futures.Actors/FSharp.Control.Futures.Actors.fsproj b/src/FSharp.Control.Futures.Actors/FSharp.Control.Futures.Actors.fsproj new file mode 100644 index 0000000..9144a88 --- /dev/null +++ b/src/FSharp.Control.Futures.Actors/FSharp.Control.Futures.Actors.fsproj @@ -0,0 +1,19 @@ + + + + net8.0 + true + + + + + + + + + + + + + + diff --git a/src/FSharp.Control.Futures.Actors/Handler.fs b/src/FSharp.Control.Futures.Actors/Handler.fs new file mode 100644 index 0000000..2492402 --- /dev/null +++ b/src/FSharp.Control.Futures.Actors/Handler.fs @@ -0,0 +1,18 @@ +namespace FSharp.Control.Futures.Actors + +open FSharp.Control.Futures +open FSharp.Control.Futures.Sync +open FSharp.Control.Futures.Actors + + +[] +module Handler = + + let create<'i, 'o, 'f> (f: IActorContext<'f> -> 'i -> OneShotSender<'o> -> Future) : IHandler<'i, 'o, 'f> = + { new IHandler<'i, 'o, 'f> with member this.Handle(ctx, msg, reply) = f ctx msg reply } + + let reply<'i, 'o, 'f> (f: IActorContext<'f> -> 'i -> Future<'o>) : IHandler<'i, 'o, 'f> = + create (fun ctx msg reply -> future { let! r = f ctx msg in reply.Send(r) |> ignore; () }) + + let ofFunc<'i, 'o> (f: 'i -> 'o) : IHandler<'i, 'o, 'f> = + create (fun _ctx msg reply -> future { let r = f msg in reply.Send(r) |> ignore; () } ) diff --git a/src/FSharp.Control.Futures.Actors/Library.fs b/src/FSharp.Control.Futures.Actors/Library.fs new file mode 100644 index 0000000..15424fc --- /dev/null +++ b/src/FSharp.Control.Futures.Actors/Library.fs @@ -0,0 +1,5 @@ +namespace FSharp.Control.Futures.Actors + +module Say = + let hello name = + printfn "Hello %s" name diff --git a/src/FSharp.Control.Futures.Actors/Types.fs b/src/FSharp.Control.Futures.Actors/Types.fs new file mode 100644 index 0000000..af837bb --- /dev/null +++ b/src/FSharp.Control.Futures.Actors/Types.fs @@ -0,0 +1,120 @@ +namespace rec FSharp.Control.Futures.Actors + +open System.Diagnostics +open FSharp.Control.Futures +open FSharp.Control.Futures.Runtime +open FSharp.Control.Futures.Sync + +[] +type SendError = + | Closed + // | Timeout ??? + +[] +type TryPushError = + | Full + | Closed + +[] +type PushError = + | Closed + + +type IAddress<'i, 'o> = + /// + /// Отправляет сообщение актору и ожидает ответа. + /// Если получатель мертв и не обрабатывает сообщения, выкинет исключение. + /// + abstract Send: 'i -> Future<'o> + + /// + /// Отправляет сообщение актору и ожидает ответа. + /// Если получатель мертв и не обрабатывает сообщения, вернет ошибку. + /// + // abstract TrySend: 'i -> Future> + + // /// + // /// Мгновенно отправляет сообщение актору, но не может дождаться его ответа. + // /// Если очередь актора заполнена или он уже мертв, возвращает ошибку. + // /// + // abstract TryPush: 'i -> Result + // + // /// + // /// Отправляет сообщение актору игнорируя размер его очереди и другие ошибки. + // /// Возвращает ошибку если актор уже умер. + // /// + // abstract Push: 'i -> Result + +type ActorState = + | Starting + | Started + | Stopping + | Stopped + + with + member this.IsAlive: bool = + match this with + | Starting | Started -> true + | Stopping | Stopped -> false + + member this.IsDead: bool = + match this with + | Starting | Started -> false + | Stopping | Stopped -> true + +type IActorContext<'f> = + + abstract Spawn: Future<'a> -> IFutureTask<'a> + + /// + /// Self actor facade + /// + abstract Facade: 'f + + abstract State: ActorState + + /// + /// Stop receiving new messages and switch to Stopping status. + /// All already queued messages will be handled ignored if actor not restored from stopping. + /// + abstract Stop: unit -> unit + + /// + /// Stop receiving new messages and switch to Stopping status. + /// All already queued messages will be handled. + /// When all messages handled, switch to Stopped status. + /// + abstract Terminate: unit -> unit + + +/// +/// Actor handler interface for handle one msg type. +/// +/// Handlers in `FSharp.Control.Futures.Actors` typed by +/// input message type, output result type, and "facade" type. +/// +/// A "facade" is something that declares an actor's interface to itself and other world. +/// +/// +type IHandler<'i, 'o, 'f> = + abstract Handle: ctx: IActorContext<'f> * msg: 'i * reply: OneShotSender<'o> -> Future + +type IHandler<'i, 'o> = IHandler<'i, 'o, unit> + +type IBuildContext<'f> = + abstract Bind<'i, 'o> : IHandler<'i, 'o, 'f> -> IAddress<'i, 'o> + +// 'f -- facade +[] +type IActor<'f> = + abstract Build: IBuildContext<'f> -> 'f + + abstract Start: IActorContext<'f> -> unit + + /// + /// Called when actor stopping. + /// Cancel stopping, if `cancel` flag set to true. + /// + abstract OnStop: IActorContext<'f> * cancel: byref -> unit + + abstract Stop: IActorContext<'f> -> unit diff --git a/src/FSharp.Control.Futures/Extensions.fs b/src/FSharp.Control.Futures/Extensions.fs index 36f08f4..ab5bf7f 100644 --- a/src/FSharp.Control.Futures/Extensions.fs +++ b/src/FSharp.Control.Futures/Extensions.fs @@ -71,3 +71,8 @@ module Future = let timeout (duration: TimeSpan) (fut: Future<'a>) : Future> = Future.first (fut |> Future.map Ok) (sleep duration |> Future.map (fun _ -> Error (TimeoutException()))) + + +type IEnvironmentContext = + inherit IContext + abstract Environments: obj seq