Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
DragonFrai committed Sep 12, 2024
1 parent 2cee60c commit 6a655ad
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 0 deletions.
13 changes: 13 additions & 0 deletions FSharp.Control.Futures.sln
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{F0
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.Control.Futures.Examples", "examples\FSharp.Control.Futures.Examples\FSharp.Control.Futures.Examples.fsproj", "{00BAC644-843A-4EE7-AF0E-78DFB4AA977D}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.Control.Futures.Actors", "src\FSharp.Control.Futures.Actors\FSharp.Control.Futures.Actors.fsproj", "{AD517DBF-4D4D-488D-9A7C-8145C89DA2B2}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FSharp.Control.Futures.Actors.Playground", "examples\FSharp.Control.Futures.Actors.Playground\FSharp.Control.Futures.Actors.Playground.fsproj", "{5983781E-8F81-410B-B184-F1B9ADB26CA3}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -54,12 +58,21 @@ Global
{00BAC644-843A-4EE7-AF0E-78DFB4AA977D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{00BAC644-843A-4EE7-AF0E-78DFB4AA977D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{00BAC644-843A-4EE7-AF0E-78DFB4AA977D}.Release|Any CPU.Build.0 = Release|Any CPU
{AD517DBF-4D4D-488D-9A7C-8145C89DA2B2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AD517DBF-4D4D-488D-9A7C-8145C89DA2B2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AD517DBF-4D4D-488D-9A7C-8145C89DA2B2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AD517DBF-4D4D-488D-9A7C-8145C89DA2B2}.Release|Any CPU.Build.0 = Release|Any CPU
{5983781E-8F81-410B-B184-F1B9ADB26CA3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5983781E-8F81-410B-B184-F1B9ADB26CA3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5983781E-8F81-410B-B184-F1B9ADB26CA3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5983781E-8F81-410B-B184-F1B9ADB26CA3}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{B0400CAB-0469-44D5-B0A5-93265136ABDB} = {44835DE2-D38A-4533-BE69-3A06B779FF5A}
{8942F91F-8948-4679-9F81-DBD11008EEE9} = {5C394F17-6CE6-4A62-AB3B-AD614A346ECC}
{10A6C033-D1B7-48F8-882D-FB2094BC7BD7} = {44835DE2-D38A-4533-BE69-3A06B779FF5A}
{66567C90-4DA2-4A78-9428-A7C4188A0DC2} = {44835DE2-D38A-4533-BE69-3A06B779FF5A}
{00BAC644-843A-4EE7-AF0E-78DFB4AA977D} = {F06738AF-F184-49F0-8C63-5CA259D062EB}
{5983781E-8F81-410B-B184-F1B9ADB26CA3} = {F06738AF-F184-49F0-8C63-5CA259D062EB}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<Compile Include="Program.fs"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\FSharp.Control.Futures.Actors\FSharp.Control.Futures.Actors.fsproj" />
<ProjectReference Include="..\..\src\FSharp.Control.Futures\FSharp.Control.Futures.fsproj" />
</ItemGroup>

</Project>
31 changes: 31 additions & 0 deletions examples/FSharp.Control.Futures.Actors.Playground/Program.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@






open FSharp.Control.Futures
open FSharp.Control.Futures.Actors
open FSharp.Control.Futures.Runtime



type HelloActor() =
interface IActor<IAddress<string, int>> with
member this.Build(ctx) =
let addr = ctx.Bind(Handler.reply (fun _ctx msg -> future {
printfn $"Hello, {msg}!"
return 12
}))
addr
member this.OnStop(var0, cancel) = failwith "todo"
member this.Start(var0) = failwith "todo"
member this.Stop(var0) = failwith "todo"

let arb = Arbiter(fun () -> HelloActor())

let facade, arbFut = arb.Start()
let task = ThreadPoolRuntime.spawn arbFut

let r = facade.Send("Bill") |> Future.runBlocking
printfn $"Reply is {r}"
69 changes: 69 additions & 0 deletions src/FSharp.Control.Futures.Actors/Arbiter.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
namespace FSharp.Control.Futures.Actors

open FSharp.Control.Futures
open FSharp.Control.Futures.Runtime
open FSharp.Control.Futures.Sync



type ArbiterMsgCell<'i, 'o> =
{ Msg: 'i; Reply: OneShot<'o> }

module ArbiterMsgCell =
let box (cell: ArbiterMsgCell<'i, 'o>) : obj = box cell
let unbox<'i, 'o> (boxedCell: obj) : ArbiterMsgCell<'i, 'o> = unbox boxedCell

type ArbiterMsg =
| Msg of int * arbiterMsgCell: obj


type ArbiterAddress<'i, 'o>(arbiterMailbox: Mailbox<ArbiterMsg>, handlerIndex: int) =
interface IAddress<'i, 'o> with
member this.Send(msg: 'i): Future<'o> = future {
let os = OneShot<'o>()
let cell: ArbiterMsgCell<'i, 'o> = { Msg = msg; Reply = os }
let arbMsg = ArbiterMsg.Msg (handlerIndex, ArbiterMsgCell.box cell)
arbiterMailbox.Post(arbMsg)
return! os
}




type Arbiter<'f> =
val actorFactory: unit -> IActor<'f>
val mutable isStarted: bool

new(actorFactory) =
{ actorFactory = actorFactory
isStarted = false }

member this.Start(): 'f * Future<unit> =
if this.isStarted then invalidOp "Double start arbiter"
this.isStarted <- true

let mailbox = Mailbox()
let handlers = ResizeArray()
let actor = this.actorFactory ()

let bc = {
new IBuildContext<'f> with
override this.Bind<'i, 'o>(handler) =
let boxHandler (ctx: IActorContext<'f>) (cell: obj) =
let cell: ArbiterMsgCell<'i, 'o> = ArbiterMsgCell.unbox cell
handler.Handle(ctx, cell.Msg, cell.Reply.AsSender)
let idx = handlers.Count
handlers.Add(boxHandler)
let addr = ArbiterAddress(mailbox, idx)
addr
}
let facade = actor.Build(bc)
facade, future {
while true do
let! msg = mailbox.Receive()
match msg with
| Msg(idx, arbiterMsgCell) ->
let action = handlers[idx] (Unchecked.defaultof<IActorContext<'f>>) arbiterMsgCell
do! action
()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
<Compile Include="Types.fs" />
<Compile Include="Handler.fs" />
<Compile Include="Arbiter.fs" />
<Compile Include="Library.fs"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\FSharp.Control.Futures\FSharp.Control.Futures.fsproj" />
</ItemGroup>

</Project>
18 changes: 18 additions & 0 deletions src/FSharp.Control.Futures.Actors/Handler.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace FSharp.Control.Futures.Actors

open FSharp.Control.Futures
open FSharp.Control.Futures.Sync
open FSharp.Control.Futures.Actors


[<RequireQualifiedAccess>]
module Handler =

let create<'i, 'o, 'f> (f: IActorContext<'f> -> 'i -> OneShotSender<'o> -> Future<unit>) : IHandler<'i, 'o, 'f> =
{ new IHandler<'i, 'o, 'f> with member this.Handle(ctx, msg, reply) = f ctx msg reply }

let reply<'i, 'o, 'f> (f: IActorContext<'f> -> 'i -> Future<'o>) : IHandler<'i, 'o, 'f> =
create (fun ctx msg reply -> future { let! r = f ctx msg in reply.Send(r) |> ignore; () })

let ofFunc<'i, 'o> (f: 'i -> 'o) : IHandler<'i, 'o, 'f> =
create (fun _ctx msg reply -> future { let r = f msg in reply.Send(r) |> ignore; () } )
5 changes: 5 additions & 0 deletions src/FSharp.Control.Futures.Actors/Library.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace FSharp.Control.Futures.Actors

module Say =
let hello name =
printfn "Hello %s" name
120 changes: 120 additions & 0 deletions src/FSharp.Control.Futures.Actors/Types.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
namespace rec FSharp.Control.Futures.Actors

open System.Diagnostics
open FSharp.Control.Futures
open FSharp.Control.Futures.Runtime
open FSharp.Control.Futures.Sync

[<RequireQualifiedAccess>]
type SendError =
| Closed
// | Timeout ???

[<RequireQualifiedAccess>]
type TryPushError =
| Full
| Closed

[<RequireQualifiedAccess>]
type PushError =
| Closed


type IAddress<'i, 'o> =
/// <summary>
/// Отправляет сообщение актору и ожидает ответа.
/// Если получатель мертв и не обрабатывает сообщения, выкинет исключение.
/// </summary>
abstract Send: 'i -> Future<'o>

/// <summary>
/// Отправляет сообщение актору и ожидает ответа.
/// Если получатель мертв и не обрабатывает сообщения, вернет ошибку.
/// </summary>
// abstract TrySend: 'i -> Future<Result<'o, SendError>>

// /// <summary>
// /// Мгновенно отправляет сообщение актору, но не может дождаться его ответа.
// /// Если очередь актора заполнена или он уже мертв, возвращает ошибку.
// /// </summary>
// abstract TryPush: 'i -> Result<unit, TryPushError>
//
// /// <summary>
// /// Отправляет сообщение актору игнорируя размер его очереди и другие ошибки.
// /// Возвращает ошибку если актор уже умер.
// /// </summary>
// abstract Push: 'i -> Result<unit, PushError>

type ActorState =
| Starting
| Started
| Stopping
| Stopped

with
member this.IsAlive: bool =
match this with
| Starting | Started -> true
| Stopping | Stopped -> false

member this.IsDead: bool =
match this with
| Starting | Started -> false
| Stopping | Stopped -> true

type IActorContext<'f> =

abstract Spawn: Future<'a> -> IFutureTask<'a>

/// <summary>
/// Self actor facade
/// </summary>
abstract Facade: 'f

abstract State: ActorState

/// <summary>
/// Stop receiving new messages and switch to Stopping status.
/// All already queued messages will be handled ignored if actor not restored from stopping.
/// </summary>
abstract Stop: unit -> unit

/// <summary>
/// Stop receiving new messages and switch to Stopping status.
/// All already queued messages will be handled.
/// When all messages handled, switch to Stopped status.
/// </summary>
abstract Terminate: unit -> unit


/// <summary>
/// Actor handler interface for handle one msg type.
///
/// Handlers in `FSharp.Control.Futures.Actors` typed by
/// input message type, output result type, and "facade" type.
///
/// A "facade" is something that declares an actor's interface to itself and other world.
///
/// </summary>
type IHandler<'i, 'o, 'f> =
abstract Handle: ctx: IActorContext<'f> * msg: 'i * reply: OneShotSender<'o> -> Future<unit>

type IHandler<'i, 'o> = IHandler<'i, 'o, unit>

type IBuildContext<'f> =
abstract Bind<'i, 'o> : IHandler<'i, 'o, 'f> -> IAddress<'i, 'o>

// 'f -- facade
[<Interface>]
type IActor<'f> =
abstract Build: IBuildContext<'f> -> 'f

abstract Start: IActorContext<'f> -> unit

/// <summary>
/// Called when actor stopping.
/// Cancel stopping, if `cancel` flag set to true.
/// </summary>
abstract OnStop: IActorContext<'f> * cancel: byref<bool> -> unit

abstract Stop: IActorContext<'f> -> unit
5 changes: 5 additions & 0 deletions src/FSharp.Control.Futures/Extensions.fs
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,8 @@ module Future =

let timeout (duration: TimeSpan) (fut: Future<'a>) : Future<Result<'a, TimeoutException>> =
Future.first (fut |> Future.map Ok) (sleep duration |> Future.map (fun _ -> Error (TimeoutException())))


type IEnvironmentContext =
inherit IContext
abstract Environments: obj seq

0 comments on commit 6a655ad

Please sign in to comment.