From fae797fed38f817bbb269a3b039d60411b047181 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Sat, 15 Jun 2024 23:59:58 +0200 Subject: [PATCH] Add full Concurrent ML support --- bench/bench_ch.ml | 95 ++++++++++++ bench/main.ml | 1 + lib/picos/bootstrap/picos_bootstrap.ml | 206 ++++++++++++++++++++----- lib/picos/picos.mli | 58 +++++++ lib/picos_sync/ch.ml | 173 +++++++++++++++++++++ lib/picos_sync/event.ml | 6 + lib/picos_sync/picos_sync.ml | 1 + lib/picos_sync/picos_sync.mli | 41 ++++- test/test_picos_dscheck.ml | 8 +- test/test_sync.ml | 29 +++- 10 files changed, 567 insertions(+), 51 deletions(-) create mode 100644 bench/bench_ch.ml create mode 100644 lib/picos_sync/ch.ml diff --git a/bench/bench_ch.ml b/bench/bench_ch.ml new file mode 100644 index 000000000..f5aed489c --- /dev/null +++ b/bench/bench_ch.ml @@ -0,0 +1,95 @@ +open Multicore_bench +open Picos +open Picos_sync +open Picos_structured + +let run_one_domain ~budgetf () = + let n_msgs = 200 * Util.iter_factor in + let t = Ch.create () in + let init _ = () in + let wrap _ () = Scheduler.run in + let work _ () = + Bundle.join_after @@ fun bundle -> + begin + Bundle.fork bundle @@ fun () -> + for i = 1 to n_msgs do + Ch.give t i + done + end; + begin + Bundle.fork bundle @@ fun () -> + for _ = 1 to n_msgs do + Ch.take t |> ignore + done + end + in + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" + +let run_one ~budgetf ~n_givers ~n_takers () = + let n_domains = n_givers + n_takers in + + let n_msgs = 200 / n_domains * Util.iter_factor in + + let t = Ch.create ~padded:true () in + + let n_msgs_to_give = Atomic.make 0 |> Multicore_magic.copy_as_padded in + let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in + + let init _ = + Atomic.set n_msgs_to_give n_msgs; + Atomic.set n_msgs_to_take n_msgs + in + let wrap _ () = Scheduler.run in + let work i () = + let computation = Computation.create () in + let yielder () = + try + while true do + Fiber.yield () + done + with Exit -> () + in + Fiber.spawn ~forbid:false computation [ yielder ]; + begin + if i < n_givers then + let rec work () = + let n = Util.alloc n_msgs_to_give in + if 0 < n then begin + for i = 1 to n do + Ch.give t i + done; + work () + end + in + work () + else + let rec work () = + let n = Util.alloc n_msgs_to_take in + if 0 < n then begin + for _ = 1 to n do + Ch.take t |> ignore + done; + work () + end + in + work () + end; + Computation.cancel computation (Exn_bt.get_callstack 0 Exit) + in + + let config = + let format role n = + Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") + in + Printf.sprintf "%s, %s" (format "giver" n_givers) (format "taker" n_takers) + in + Times.record ~budgetf ~n_domains ~init ~wrap ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config + +let run_suite ~budgetf = + run_one_domain ~budgetf () + @ (Util.cross [ 1; 2 ] [ 1; 2 ] + |> List.concat_map @@ fun (n_givers, n_takers) -> + if Picos_domain.recommended_domain_count () < n_givers + n_takers then [] + else run_one ~budgetf ~n_givers ~n_takers ()) diff --git a/bench/main.ml b/bench/main.ml index c8796e5da..6950780c5 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -15,6 +15,7 @@ let benchmarks = ("Picos_mpscq", Bench_mpscq.run_suite); ("Picos_htbl", Bench_htbl.run_suite); ("Picos_stdio", Bench_stdio.run_suite); + ("Picos_sync Ch", Bench_ch.run_suite); ("Fib", Bench_fib.run_suite); ("Picos binaries", Bench_binaries.run_suite); ] diff --git a/lib/picos/bootstrap/picos_bootstrap.ml b/lib/picos/bootstrap/picos_bootstrap.ml index 510aec76b..249210c23 100644 --- a/lib/picos/bootstrap/picos_bootstrap.ml +++ b/lib/picos/bootstrap/picos_bootstrap.ml @@ -51,18 +51,134 @@ module Computation = struct let[@inline never] error_returned () = invalid_arg "already returned" - type 'a state = - | Canceled of Exn_bt.t - | Returned of 'a - | Continue of { balance_and_mode : int; triggers : Trigger.t list } + type _ tx = + | Done : [> `Done ] tx + | Alive : { + state : [ `Unknown | `Success | `Failure ] Atomic.t; + mutable completions : completions; + } + -> [> `Alive ] tx + + and completions = + | Nil + | Completion : { + computation : 'a t; + mutable before : ('a, [ `Continue ]) st; + completions : completions; + } + -> completions + + and ('a, _) st = + | Canceled : { + exn_bt : Exn_bt.t; + mutable tx : [ `Done | `Alive ] tx; + } + -> ('a, [> `Canceled ]) st + | Returned : { + value : 'a; + mutable tx : [ `Done | `Alive ] tx; + } + -> ('a, [> `Returned ]) st + | Continue : { + balance_and_mode : int; + triggers : Trigger.t list; + } + -> ('a, [> `Continue ]) st + + and 'a state = + | S : ('a, [< `Canceled | `Returned | `Continue ]) st -> 'a state + [@@unboxed] + + and 'a t = 'a state Atomic.t + + module Tx = struct + let[@inline never] already_committed () = invalid_arg "already committed" + let[@inline] same (type a b) (x : a t) (y : b t) = x == (Obj.magic y : a t) + + type t = [ `Alive ] tx + + let[@inline] create () = + Alive { state = Atomic.make `Unknown; completions = Nil } + + let rec rollback result tx = function + | Nil -> result + | Completion r -> begin + begin + match Atomic.get r.computation with + | ( S (Canceled { tx = previous_tx; _ }) + | S (Returned { tx = previous_tx; _ }) ) as before -> + if tx == previous_tx then + Atomic.compare_and_set r.computation before (S r.before) + |> ignore + | S (Continue _) -> () + end; + rollback result tx r.completions + end - type 'a t = 'a state Atomic.t + let rec commit = function + | Nil -> true + | Completion r -> + begin + match Atomic.get r.computation with + | S (Canceled r) -> r.tx <- Done + | S (Returned r) -> r.tx <- Done + | S (Continue _) -> failwith "Impossible" + end; + let (Continue { triggers; _ }) = r.before in + List.iter Trigger.signal triggers; + commit r.completions + + let rec try_abort (Alive r as tx : [ `Alive ] tx) = + match Atomic.get r.state with + | `Unknown -> + if Atomic.compare_and_set r.state `Unknown `Failure then + rollback true tx r.completions + else try_abort tx (* state is write once so No need to backoff *) + | `Failure -> rollback true tx r.completions + | `Success -> false + + let abort tx = if not (try_abort tx) then already_committed () + + let[@inline] try_abort = function + | Done -> false + | Alive _ as tx -> try_abort tx + + let rec try_complete (Alive r as tx : [ `Alive ] tx) computation backoff + (after : (_, [< `Canceled | `Returned ]) st) = + match Atomic.get computation with + | S (Continue _ as before) -> + Atomic.get r.state == `Unknown + && + let completions = r.completions in + r.completions <- Completion { computation; before; completions }; + Atomic.compare_and_set computation (S before) (S after) + || begin + r.completions <- completions; + try_complete tx computation (Backoff.once backoff) after + end + | S (Canceled { tx = previous_tx; _ }) + | S (Returned { tx = previous_tx; _ }) -> + try_abort previous_tx && try_complete tx computation backoff after + + let[@inline] try_return (Alive r as tx : [ `Alive ] tx) computation value = + try_complete tx computation Backoff.default + (Returned { value; tx = Alive r }) + + let[@inline] try_cancel (Alive r as tx : [ `Alive ] tx) computation exn_bt = + try_complete tx computation Backoff.default + (Canceled { exn_bt; tx = Alive r }) + + let try_commit (Alive r as tx : [ `Alive ] tx) = + if Atomic.compare_and_set r.state `Unknown `Success then + commit r.completions + else rollback false tx r.completions + end let fifo_bit = 1 let one = 2 - let empty_fifo = Continue { triggers = []; balance_and_mode = fifo_bit } - and empty_lifo = Continue { triggers = []; balance_and_mode = 0 } + let empty_fifo = S (Continue { triggers = []; balance_and_mode = fifo_bit }) + and empty_lifo = S (Continue { triggers = []; balance_and_mode = 0 }) let create ?(mode : [ `FIFO | `LIFO ] = `FIFO) () = Atomic.make (if mode == `FIFO then empty_fifo else empty_lifo) @@ -70,17 +186,17 @@ module Computation = struct let with_action ?(mode : [ `FIFO | `LIFO ] = `FIFO) x y action = let balance_and_mode = one + Bool.to_int (mode == `FIFO) in let trigger = Trigger.from_action x y action in - Atomic.make (Continue { balance_and_mode; triggers = [ trigger ] }) + Atomic.make (S (Continue { balance_and_mode; triggers = [ trigger ] })) let is_canceled t = match Atomic.get t with - | Canceled _ -> true - | Returned _ | Continue _ -> false + | S (Canceled { tx; _ }) -> tx == Done + | S (Returned _) | S (Continue _) -> false let canceled t = match Atomic.get t with - | Canceled exn_bt -> Some exn_bt - | Returned _ | Continue _ -> None + | S (Canceled { exn_bt; tx }) -> if tx == Done then Some exn_bt else None + | S (Returned _) | S (Continue _) -> None (** [gc] is called when balance becomes negative by both [try_attach] and [detach]. This ensures that the [O(n)] lazy removal done by [gc] cannot @@ -95,15 +211,16 @@ module Computation = struct if balance_and_mode <= one + fifo_bit then triggers else List.rev triggers in - Continue { balance_and_mode; triggers } + S (Continue { balance_and_mode; triggers }) | r :: rs -> if Trigger.is_signaled r then gc balance_and_mode triggers rs else gc (balance_and_mode + one) (r :: triggers) rs let rec try_attach t trigger backoff = match Atomic.get t with - | Returned _ | Canceled _ -> false - | Continue r as before -> + | S (Returned { tx; _ }) | S (Canceled { tx; _ }) -> + Tx.try_abort tx && try_attach t trigger backoff + | S (Continue r) as before -> (* We check the trigger before potential allocations. *) (not (Trigger.is_signaled trigger)) && @@ -111,7 +228,7 @@ module Computation = struct if fifo_bit <= r.balance_and_mode then let balance_and_mode = r.balance_and_mode + one in let triggers = trigger :: r.triggers in - Continue { balance_and_mode; triggers } + S (Continue { balance_and_mode; triggers }) else gc (one + (r.balance_and_mode land fifo_bit)) [ trigger ] r.triggers in @@ -122,13 +239,16 @@ module Computation = struct let rec unsafe_unsuspend t backoff = match Atomic.get t with - | Returned _ -> true - | Canceled _ -> false - | Continue r as before -> + | S (Returned { tx; _ }) -> + if Tx.try_abort tx then unsafe_unsuspend t backoff else true + | S (Canceled { tx; _ }) -> + if Tx.try_abort tx then unsafe_unsuspend t backoff else false + | S (Continue r) as before -> let after = if fifo_bit <= r.balance_and_mode then - Continue - { r with balance_and_mode = r.balance_and_mode - (2 * one) } + S + (Continue + { r with balance_and_mode = r.balance_and_mode - (2 * one) }) else gc (r.balance_and_mode land fifo_bit) [] r.triggers in Atomic.compare_and_set t before after @@ -143,13 +263,14 @@ module Computation = struct let is_running t = match Atomic.get t with - | Canceled _ | Returned _ -> false - | Continue _ -> true + | S (Canceled { tx; _ }) | S (Returned { tx; _ }) -> tx != Done + | S (Continue _) -> true let rec try_terminate t after backoff = match Atomic.get t with - | Returned _ | Canceled _ -> false - | Continue r as before -> + | S (Returned { tx; _ }) | S (Canceled { tx; _ }) -> + if Tx.try_abort tx then try_terminate t after backoff else false + | S (Continue r) as before -> if Atomic.compare_and_set t before after then begin List.iter Trigger.signal (if r.balance_and_mode land fifo_bit = fifo_bit then @@ -159,16 +280,20 @@ module Computation = struct end else try_terminate t after (Backoff.once backoff) - let returned_unit = Returned (Obj.magic ()) + let returned_unit = Obj.magic (S (Returned { value = (); tx = Done })) let make_returned value = - if value == Obj.magic () then returned_unit else Returned value + if value == Obj.magic () then returned_unit + else S (Returned { value; tx = Done }) let returned value = Atomic.make (make_returned value) let finished = Atomic.make (make_returned ()) let try_return t value = try_terminate t (make_returned value) Backoff.default let try_finish t = try_terminate t returned_unit Backoff.default - let try_cancel t exn_bt = try_terminate t (Canceled exn_bt) Backoff.default + + let try_cancel t exn_bt = + try_terminate t (S (Canceled { exn_bt; tx = Done })) Backoff.default + let return t value = try_return t value |> ignore let finish t = try_finish t |> ignore let cancel t exn_bt = try_cancel t exn_bt |> ignore @@ -182,19 +307,22 @@ module Computation = struct let check t = match Atomic.get t with - | Canceled exn_bt -> Exn_bt.raise exn_bt - | Returned _ | Continue _ -> () + | S (Canceled { exn_bt; tx; _ }) -> if tx == Done then Exn_bt.raise exn_bt + | S (Returned _) | S (Continue _) -> () let peek t = match Atomic.get t with - | Canceled exn_bt -> Some (Error exn_bt) - | Returned value -> Some (Ok value) - | Continue _ -> None + | S (Canceled { exn_bt; tx; _ }) -> + if tx == Done then Some (Error exn_bt) else None + | S (Returned { value; tx; _ }) -> + if tx == Done then Some (Ok value) else None + | S (Continue _) -> None let propagate _ from into = match Atomic.get from with - | Returned _ | Continue _ -> () - | Canceled _ as after -> try_terminate into after Backoff.default |> ignore + | S (Returned _) | S (Continue _) -> () + | S (Canceled _ as after) -> + try_terminate into (S after) Backoff.default |> ignore let canceler ~from ~into = Trigger.from_action from into propagate @@ -203,9 +331,11 @@ module Computation = struct let rec get_or block t = match Atomic.get t with - | Returned value -> value - | Canceled exn_bt -> Exn_bt.raise exn_bt - | Continue _ -> get_or block (block t) + | S (Returned { value; tx; _ }) -> + if tx == Done then value else get_or block (block t) + | S (Canceled { exn_bt; tx; _ }) -> + if tx == Done then Exn_bt.raise exn_bt else get_or block (block t) + | S (Continue _) -> get_or block (block t) let attach_canceler ~from ~into = let canceler = canceler ~from ~into in diff --git a/lib/picos/picos.mli b/lib/picos/picos.mli index f7ed275c3..3623b4483 100644 --- a/lib/picos/picos.mli +++ b/lib/picos/picos.mli @@ -543,6 +543,64 @@ module Computation : sig (** [capture computation fn x] is equivalent to {{!try_capture} [try_capture computation fn x |> ignore]}. *) + module Tx : sig + (** Transactional interface for atomically completing multiple computations. + + ⚠️ The implementation of this mechanism is designed to avoid making the + single computation completing operations, + i.e. {{!Computation.try_return} [try_return]} and + {{!Computation.try_cancel} [try_cancel]}, slower and to avoid making + computations heavier. For this reason the transaction mechanism is only + {{:https://en.wikipedia.org/wiki/Non-blocking_algorithm#Obstruction-freedom} + obstruction-free}. What this means is that a transaction may be aborted + by another transaction or by a single computation manipulating + operation. *) + + type 'a computation := 'a t + (** Destructively substituted alias for {!Computation.t}. *) + + val same : _ computation -> _ computation -> bool + (** [same computation1 computation2] determines whether the two computations + are the one and the same. *) + + type t + (** Represents a transaction. *) + + val create : unit -> t + (** [create ()] returns a new empty transaction. *) + + val try_return : t -> 'a computation -> 'a -> bool + (** [try_return tx computation value] adds the completion of the + [computation] as having returned the given [value] to the transaction. + Returns [true] in case the computation had not yet been completed and + the transaction was still alive. Otherwise returns [false] which means + that either the computation had already been completed or that the + transaction has been {{!abort} aborted}. *) + + val try_cancel : t -> 'a computation -> Exn_bt.t -> bool + (** [try_cancel tx computation exn_bt] adds the completion of the + [computation] as having canceled with the given [exn_bt] to the + transaction. Returns [true] in case the computation had not yet been + completed and the transaction was still alive. Otherwise returns + [false] which means that either the computation had already been + completed or that the transaction has been {{!abort} aborted}. *) + + val try_commit : t -> bool + (** [try_commit tx] attempts to mark the transaction as committed + successfully. Returns [true] in case of success, which means that all + the completions added to the transaction have been performed atomically. + Otherwise returns [false] which means that transaction was aborted and + it is as if none of the completions succesfully added to the transaction + have taken place. *) + + val abort : t -> unit + (** [abort tx] marks the transaction as aborted and rolls back changes made + by the transaction. + + @raise Invalid_argument in case the transaction had already been + {{!try_commit} committed} successfully. *) + end + (** {2 Interface for canceling} *) (** An existential wrapper for computations. *) diff --git a/lib/picos_sync/ch.ml b/lib/picos_sync/ch.ml new file mode 100644 index 000000000..2a6a08bd5 --- /dev/null +++ b/lib/picos_sync/ch.ml @@ -0,0 +1,173 @@ +module Q = struct + (** TODO: Better non-blocking queue/bag *) + + type 'a t = 'a list Atomic.t + + let create ?(padded = false) () = + Atomic.make [] |> if padded then Multicore_magic.copy_as_padded else Fun.id + + let rec add t x backoff = + let before = Atomic.get t in + let after = x :: before in + if not (Atomic.compare_and_set t before after) then + add t x (Backoff.once backoff) + + let add t x = add t x Backoff.default + + let rec remove t x backoff = + let before = Atomic.get t in + let after = List.filter (( != ) x) before in + if not (Atomic.compare_and_set t before after) then + remove t x (Backoff.once backoff) + + let remove t x = remove t x Backoff.default + let all t = Atomic.get t + + exception Empty + + let rec take_exn t backoff = + match Atomic.get t with + | [] -> raise_notrace Empty + | x :: after as before -> + if Atomic.compare_and_set t before after then x + else take_exn t (Backoff.once backoff) + + let take_exn t = take_exn t Backoff.default +end + +open Picos +module Tx = Computation.Tx + +type 'a taker = + | T : { + computation : (unit -> 'r) Computation.t; + result : 'a -> 'r; + } + -> 'a taker + +type 'a giver = + | G : { + computation : (unit -> 'r) Computation.t; + result : unit -> 'r; + value : 'a; + } + -> 'a giver + +type 'a t = { givers : 'a giver Q.t; takers : 'a taker Q.t } + +let create ?padded () = + let givers = Q.create ?padded () and takers = Q.create ?padded () in + { givers; takers } + |> + match padded with + | Some true -> Multicore_magic.copy_as_padded + | None | Some false -> Fun.id + +(* *) + +let remover _ self queue = Q.remove queue self +let exit_exn_bt = Exn_bt.get_callstack 0 Exit + +let await computation = + if Computation.is_running computation then begin + let trigger = Trigger.create () in + if Computation.try_attach computation trigger then begin + match Trigger.await trigger with + | None -> () + | Some exn_bt -> + if Computation.try_cancel computation exit_exn_bt then + Exn_bt.raise exn_bt + end + end; + Computation.await computation () + +(* *) + +let rec give_as t (G gr as self) backoff = function + | [] -> () + | T tr :: trs as retry -> + if Tx.same tr.computation gr.computation then give_as t self backoff trs + else + let tx = Tx.create () in + let result = tr.result in + let value = gr.value in + if not (Tx.try_return tx tr.computation (fun () -> result value)) then + give_as t self backoff trs + else if + (not (Tx.try_return tx gr.computation gr.result)) + || not (Tx.try_commit tx) + then begin + Tx.abort tx; + let backoff = Backoff.once backoff in + if Computation.is_running gr.computation then + give_as t self backoff + (if Computation.is_running tr.computation then retry else trs) + end + +let give_as t (G gr as self) = + let[@alert "-handler"] remover = Trigger.from_action self t.givers remover in + if Computation.try_attach gr.computation remover then begin + Q.add t.givers self; + give_as t self Backoff.default (Q.all t.takers) + end + +let rec give t value = + match Q.take_exn t.takers with + | T tr -> + let result = tr.result in + if not (Computation.try_return tr.computation (fun () -> result value)) + then give t value (* backoff at this point seems unnecesssary *) + | exception Q.Empty -> + let computation = Computation.create () in + give_as t @@ G { computation; result = Fun.id; value }; + await computation + +let give_evt t value = + let request computation result = + give_as t (G { computation; result; value }) + in + Event.from_request { request } + +(* *) + +let rec take_as t (T tr as self) backoff = function + | [] -> () + | G gr :: grs as retry -> + if Tx.same tr.computation gr.computation then take_as t self backoff grs + else + let tx = Tx.create () in + let result = tr.result in + let value = gr.value in + if not (Tx.try_return tx gr.computation gr.result) then + take_as t self backoff grs + else if + (not (Tx.try_return tx tr.computation (fun () -> result value))) + || not (Tx.try_commit tx) + then begin + Tx.abort tx; + let backoff = Backoff.once backoff in + if Computation.is_running tr.computation then + take_as t self backoff + (if Computation.is_running gr.computation then retry else grs) + end + +let take_as t (T tr as self) = + let[@alert "-handler"] remover = Trigger.from_action self t.takers remover in + if Computation.try_attach tr.computation remover then begin + Q.add t.takers self; + take_as t self Backoff.default (Q.all t.givers) + end + +let rec take t = + match Q.take_exn t.givers with + | G r -> + if Computation.try_return r.computation r.result then r.value + else take t (* backoff at this point seems unnecesssary *) + | exception Q.Empty -> + let computation = Computation.create () in + take_as t @@ T { computation; result = Fun.id }; + await computation + +let take_evt t = + let request computation result = take_as t (T { computation; result }) in + Event.from_request { request } diff --git a/lib/picos_sync/event.ml b/lib/picos_sync/event.ml index d7c238cd7..b2e5a92e6 100644 --- a/lib/picos_sync/event.ml +++ b/lib/picos_sync/event.ml @@ -84,6 +84,12 @@ let[@alert "-handler"] from_computation source = in Request { request } +let always value = + let request computation to_result = + Computation.return computation @@ fun () -> to_result value + in + Request { request } + type 'a event = 'a t let[@inline] from_request p = Request p diff --git a/lib/picos_sync/picos_sync.ml b/lib/picos_sync/picos_sync.ml index 084882fba..ae5e74e92 100644 --- a/lib/picos_sync/picos_sync.ml +++ b/lib/picos_sync/picos_sync.ml @@ -2,3 +2,4 @@ module Mutex = Mutex module Condition = Condition module Lazy = Lazy module Event = Event +module Ch = Ch diff --git a/lib/picos_sync/picos_sync.mli b/lib/picos_sync/picos_sync.mli index a517898bc..c57dfab28 100644 --- a/lib/picos_sync/picos_sync.mli +++ b/lib/picos_sync/picos_sync.mli @@ -14,7 +14,7 @@ (** {1 Modules} *) module Mutex : sig - (** A mutex implementation for {!Picos}. + (** A mutual-exclusion lock or mutex. ℹ️ This intentionally mimics the interface of {!Stdlib.Mutex}. Unlike with the standard library mutex, blocking on this mutex potentially allows an @@ -74,7 +74,7 @@ module Mutex : sig end module Condition : sig - (** A condition implementation for {!Picos}. + (** A condition variable. ℹ️ This intentionally mimics the interface of {!Stdlib.Condition}. Unlike with the standard library condition variable, blocking on this condition @@ -105,7 +105,7 @@ module Condition : sig end module Lazy : sig - (** A lazy implementation for {!Picos}. + (** A lazy suspension. ℹ️ This intentionally mimics the interface of {!Stdlib.Lazy}. Unlike with the standard library suspensions an attempt to force a suspension from @@ -162,7 +162,7 @@ module Lazy : sig end module Event : sig - (** An implementation of first-class synchronous communication for {!Picos}. + (** A first-class synchronous communication abstraction. Events describe a thing that might happen in the future, or a concurrent offer or request that might be accepted or succeed, but is cancelable if @@ -182,6 +182,10 @@ module Event : sig {{:https://ocaml.org/manual/5.2/api/Event.html} [Event]} module signature. *) + val always : 'a -> 'a t + (** [always value] returns an event that can always be committed to resulting + in the given [value]. *) + (** {2 Composing events} *) val choose : 'a t list -> 'a t @@ -263,6 +267,35 @@ module Event : sig computation. *) end +module Ch : sig + (** A synchronous channel. *) + + type !'a t + (** Represents a synchronous channel for handing over messages of type + ['a]. *) + + val create : ?padded:bool -> unit -> 'a t + (** [create ()] creates a new synchronous channel. *) + + val give : 'a t -> 'a -> unit + (** [give ch value] waits until another fiber is ready to take a message on + the [ch]annel and gives the message to it. *) + + val give_evt : 'a t -> 'a -> unit Event.t + (** [give_evt ch value] returns an event that can be committed to once another + fiber is ready to take a message on the [ch]annel. Committing to the + event results in giving the message to the other fiber. *) + + val take : 'a t -> 'a + (** [take ch] waits until another fiber is ready to give a message on the + [ch]annel and takes the message from it. *) + + val take_evt : 'a t -> 'a Event.t + (** [take_evt ch] returns an event that can be committed to once another fiber + is ready to give a message on the [ch]annel. Committing to the event + results in taking the message from the other fiber. *) +end + (** {1 Examples} {2 A simple bounded queue} diff --git a/test/test_picos_dscheck.ml b/test/test_picos_dscheck.ml index d1716b258..cf170b1b6 100644 --- a/test/test_picos_dscheck.ml +++ b/test/test_picos_dscheck.ml @@ -111,8 +111,8 @@ let test_computation_removes_triggers () = Array.for_all Trigger.is_signaled triggers && match Atomic.get computation with - | Canceled _ | Returned _ -> false - | Continue { balance_and_mode; triggers } -> + | S (Canceled _) | S (Returned _) -> false + | S (Continue { balance_and_mode; triggers }) -> balance_and_mode <= Computation.fifo_bit && List.length triggers <= 2 && @@ -120,8 +120,8 @@ let test_computation_removes_triggers () = Computation.try_attach computation trigger && begin match Atomic.get computation with - | Canceled _ | Returned _ -> false - | Continue { balance_and_mode; triggers } -> + | S (Canceled _) | S (Returned _) -> false + | S (Continue { balance_and_mode; triggers }) -> balance_and_mode <= Computation.one + Computation.fifo_bit && triggers = [ trigger ] end diff --git a/test/test_sync.ml b/test/test_sync.ml index 1a5ea5670..5a5a2b5d9 100644 --- a/test/test_sync.ml +++ b/test/test_sync.ml @@ -209,7 +209,7 @@ let test_event_basics () = Event.select [ Event.from_computation (Computation.create ()); - Event.from_computation (Computation.returned 51) |> Event.map (( + ) 50); + Event.always 51 |> Event.map (( + ) 50); ] = 101); begin @@ -221,16 +221,34 @@ let test_event_basics () = end; begin match - [ - Event.guard (fun () -> raise Exit); - Event.from_computation (Computation.returned 42); - ] + [ Event.guard (fun () -> raise Exit); Event.always 42 ] |> Event.choose |> Event.sync with | _ -> assert false | exception Exit -> () end +let test_ch () = + Test_scheduler.run ~max_domains:2 @@ fun () -> + Bundle.join_after @@ fun bundle -> + let ch = Ch.create () in + assert ( + 76 + == Event.select + [ + Event.wrap (Ch.give_evt ch 42) (fun () -> assert false); + Ch.take_evt ch; + Event.always 76; + ]); + [ (fun () -> Ch.take ch); (fun () -> Event.sync (Ch.take_evt ch)) ] + |> List.iter @@ fun take -> + [ Ch.give ch; (fun value -> Event.sync (Ch.give_evt ch value)) ] + |> List.iter @@ fun give -> + let value = Random.bits () in + let promise = Bundle.fork_as_promise bundle take in + Bundle.fork bundle (fun () -> give value); + assert (value == Promise.await promise) + let () = [ ( "Mutex and Condition", @@ -246,5 +264,6 @@ let () = Alcotest.test_case "cancelation" `Quick test_lazy_cancelation; ] ); ("Event", [ Alcotest.test_case "basics" `Quick test_event_basics ]); + ("Ch", [ Alcotest.test_case "basics" `Quick test_ch ]); ] |> Alcotest.run "Picos_sync"