From 129075f12d53f37fc2f821c1262760563da07c23 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Sat, 15 Jun 2024 23:59:58 +0200 Subject: [PATCH] Add CML style Ch --- bench/bench_ch.ml | 89 +++++++ bench/dune | 1 + bench/main.ml | 1 + .../picos_std_structured.mli | 7 + lib/picos_std.sync/ch.ml | 220 ++++++++++++++++++ lib/picos_std.sync/condition.ml | 14 +- lib/picos_std.sync/mutex.ml | 179 +++++++------- lib/picos_std.sync/picos_std_sync.ml | 1 + lib/picos_std.sync/picos_std_sync.mli | 29 +++ lib/picos_std.sync/q.ml | 64 +++-- lib/picos_std.sync/s.ml | 41 ++++ lib/picos_std.sync/semaphore.ml | 40 ++-- test/dune | 1 + test/test_sync.ml | 22 ++ 14 files changed, 560 insertions(+), 149 deletions(-) create mode 100644 bench/bench_ch.ml create mode 100644 lib/picos_std.sync/ch.ml create mode 100644 lib/picos_std.sync/s.ml diff --git a/bench/bench_ch.ml b/bench/bench_ch.ml new file mode 100644 index 000000000..ef7070ba6 --- /dev/null +++ b/bench/bench_ch.ml @@ -0,0 +1,89 @@ +open Multicore_bench +open Picos +open Picos_std_sync +open Picos_std_structured + +let run_one_domain ~budgetf () = + let n_msgs = 200 * Util.iter_factor in + let t = Ch.create () in + let giver () = + for i = 1 to n_msgs do + Ch.give t i + done + and taker () = + for _ = 1 to n_msgs do + Ch.take t |> ignore + done + in + let init _ = () in + let wrap _ () = Scheduler.run in + let work _ () = + Run.all (if Random.bool () then [ taker; giver ] else [ giver; taker ]) + in + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" + +let yielder () = + while true do + Fiber.yield () + done + +let run_one ~budgetf ~n_givers ~n_takers () = + let n_domains = n_givers + n_takers in + + let n_msgs = 200 / n_domains * Util.iter_factor in + + let t = Ch.create ~padded:true () in + + let n_msgs_to_give = Atomic.make 0 |> Multicore_magic.copy_as_padded in + let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in + + let init _ = + Atomic.set n_msgs_to_give n_msgs; + Atomic.set n_msgs_to_take n_msgs + in + let wrap _ () = Scheduler.run in + let work i () = + Flock.join_after ~on_return:`Terminate @@ fun () -> + Flock.fork yielder; + begin + if i < n_givers then + let rec work () = + let n = Util.alloc n_msgs_to_give in + if 0 < n then begin + for i = 1 to n do + Ch.give t i + done; + work () + end + in + work () + else + let rec work () = + let n = Util.alloc n_msgs_to_take in + if 0 < n then begin + for _ = 1 to n do + Ch.take t |> ignore + done; + work () + end + in + work () + end + in + + let config = + let format role n = + Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") + in + Printf.sprintf "%s, %s" (format "giver" n_givers) (format "taker" n_takers) + in + Times.record ~budgetf ~n_domains ~init ~wrap ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config + +let run_suite ~budgetf = + run_one_domain ~budgetf () + @ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ] + |> List.concat_map @@ fun (n_givers, n_takers) -> + if Picos_domain.recommended_domain_count () < n_givers + n_takers then [] + else run_one ~budgetf ~n_givers ~n_takers ()) diff --git a/bench/dune b/bench/dune index 3d27fe7ee..c33addfac 100644 --- a/bench/dune +++ b/bench/dune @@ -18,6 +18,7 @@ (run %{test} -brief "Picos_mpscq") (run %{test} -brief "Picos_htbl") (run %{test} -brief "Picos_stdio") + (run %{test} -brief "Picos_sync Ch") (run %{test} -brief "Picos_sync Stream") (run %{test} -brief "Fib") (run %{test} -brief "Picos binaries") diff --git a/bench/main.ml b/bench/main.ml index 423c3774a..69985e99e 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -17,6 +17,7 @@ let benchmarks = ("Picos_mpscq", Bench_mpscq.run_suite); ("Picos_htbl", Bench_htbl.run_suite); ("Picos_stdio", Bench_stdio.run_suite); + ("Picos_sync Ch", Bench_ch.run_suite); ("Picos_sync Stream", Bench_stream.run_suite); ("Fib", Bench_fib.run_suite); ("Picos binaries", Bench_binaries.run_suite); diff --git a/lib/picos_std.structured/picos_std_structured.mli b/lib/picos_std.structured/picos_std_structured.mli index 78e3e1da8..40c021de8 100644 --- a/lib/picos_std.structured/picos_std_structured.mli +++ b/lib/picos_std.structured/picos_std_structured.mli @@ -456,6 +456,11 @@ end Ivar.read ivar end; + Flock.fork begin fun () -> + let ch = Ch.create () in + Ch.give ch "never" + end; + Flock.fork begin fun () -> let stream = Stream.create () in Stream.read (Stream.tap stream) @@ -514,6 +519,8 @@ end count of the latch never reaches [0], - {{!Picos_std_sync.Ivar.read} [Ivar.read]} never returns, because the incremental variable is never filled, + - {{!Picos_std_sync.Ch.give} [Ch.give]} never returns, because no fiber + {{!Picos_std_sync.Ch.take} takes} the message, - {{!Picos_std_sync.Stream.read} [Stream.read]} never returns, because the stream is never pushed to, - {{!Picos_io.Unix.read} [Unix.read]} never returns, because the socket is diff --git a/lib/picos_std.sync/ch.ml b/lib/picos_std.sync/ch.ml new file mode 100644 index 000000000..d75a93fea --- /dev/null +++ b/lib/picos_std.sync/ch.ml @@ -0,0 +1,220 @@ +open Picos_std_event +open Picos +module Atomic = Multicore_magic.Transparent_atomic +module Tx = Computation.Tx + +type 'a taker = + | T : { + computation : (unit -> 'r) Computation.t; + result : 'a -> 'r; + } + -> 'a taker + +type 'a giver = + | G : { + computation : (unit -> 'r) Computation.t; + result : unit -> 'r; + value : 'a; + } + -> 'a giver + +type 'a state = { givers : 'a giver Q.t; takers : 'a taker Q.t } + +let empty = { givers = T Zero; takers = T Zero } + +type 'a t = 'a state Atomic.t + +let create ?padded () = Atomic.make empty |> Multicore_magic.copy_as ?padded + +(* *) + +let[@inline never] wait computation = + let trigger = Trigger.create () in + if Computation.try_attach computation trigger then + match Trigger.await trigger with + | None -> () + | Some (exn, bt) -> + if Computation.try_cancel computation Exit bt then + Printexc.raise_with_backtrace exn bt + +(* *) + +let rec give t value backoff = + let before = Atomic.fenceless_get t in + match before.takers with + | Q.T Zero -> + let computation = Computation.create ~mode:`LIFO () in + let self = G { computation; result = Fun.id; value } in + let givers = Q.add before.givers self in + let after = { givers; takers = T Zero } in + if Atomic.compare_and_set t before after then wait computation + else give t value (Backoff.once backoff) + | Q.T (One _ as takers) -> + let (T { computation; result }) = Q.head takers in + let got = Computation.try_return computation (fun () -> result value) in + let takers = Q.tail takers in + let givers = before.givers in + let after = + if takers == T Zero && givers == T Zero then empty + else { givers; takers } + in + let no_contention = Atomic.compare_and_set t before after in + if not got then + give t value (if no_contention then backoff else Backoff.once backoff) + +let rec take t backoff = + let before = Atomic.fenceless_get t in + match before.givers with + | Q.T Zero -> + let computation = Computation.create ~mode:`LIFO () in + let self = T { computation; result = Fun.id } in + let takers = Q.add before.takers self in + let after = { givers = T Zero; takers } in + if Atomic.compare_and_set t before after then begin + wait computation; + Computation.await computation () + end + else take t (Backoff.once backoff) + | Q.T (One _ as givers) -> + let (G { computation; result; value }) = Q.head givers in + let got = Computation.try_return computation result in + let givers = Q.tail givers in + let takers = before.takers in + let after = + if givers == T Zero && takers == T Zero then empty + else { givers; takers } + in + let no_contention = Atomic.compare_and_set t before after in + if got then value + else take t (if no_contention then backoff else Backoff.once backoff) + +(* *) + +let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ S.cons) + tail = + let (T tr as taker) = head_r.value in + if Tx.same tr.computation gr.computation then + let selfs = S.cons taker selfs in + give_as_advance t self before selfs head tail + else + let tx = Tx.create () in + let result = tr.result in + let value = gr.value in + if not (Tx.try_return tx tr.computation (fun () -> result value)) then + give_as_advance t self before selfs head tail + else if + (not (Tx.try_return tx gr.computation gr.result)) + || not (Tx.try_commit tx) + then + if not (Computation.is_running gr.computation) then ( (* TODO *) ) + else if Computation.is_running tr.computation then + give_as t self before selfs head tail + else give_as_advance t self before selfs head tail + else + let takers = + if head == tail then Q.reverse_as_queue selfs + else + let head = S.reverse_to (S.as_cons head_r.next) selfs in + Q.T (One { head; tail; cons = tail }) + in + let givers = before.givers in + let after = + if takers == Q.T Zero && givers == Q.T Zero then empty + else { givers; takers } + in + if not (Atomic.compare_and_set t before after) then + ( (* TODO: avoid leak *) ) + +and give_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail = + if head != tail then give_as t self before selfs (S.as_cons head_r.next) tail + else + let takers = Q.reverse_as_queue selfs in + let givers = Q.add before.givers self in + let after = { givers; takers } in + if not (Atomic.compare_and_set t before after) then give_as_start t self + +and give_as_start t self = + let before = Atomic.get t in + match before.takers with + | Q.T Zero -> + let takers = Q.T Zero in + let givers = Q.singleton self in + let after = { givers; takers } in + if not (Atomic.compare_and_set t before after) then give_as_start t self + | Q.T (One r as o) -> + Q.exec o; + give_as t self before (T Nil) r.head r.cons + +let give_evt t value = + let request computation result = + give_as_start t (G { computation; result; value }) + in + Event.from_request { request } + +(* *) + +let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ S.cons) + tail = + let (G gr as giver) = head_r.value in + if Tx.same tr.computation gr.computation then + let selfs = S.cons giver selfs in + take_as_advance t self before selfs head tail + else + let tx = Tx.create () in + let result = tr.result in + let value = gr.value in + if not (Tx.try_return tx gr.computation gr.result) then + take_as_advance t self before selfs head tail + else if + (not (Tx.try_return tx tr.computation (fun () -> result value))) + || not (Tx.try_commit tx) + then + if not (Computation.is_running gr.computation) then ( (* TODO *) ) + else if Computation.is_running tr.computation then + take_as t self before selfs head tail + else take_as_advance t self before selfs head tail + else + let takers = before.takers in + let givers = + if head == tail then Q.reverse_as_queue selfs + else + let head = S.reverse_to (S.as_cons head_r.next) selfs in + Q.T (One { head; tail; cons = tail }) + in + let after = + if takers == Q.T Zero && givers == Q.T Zero then empty + else { givers; takers } + in + if not (Atomic.compare_and_set t before after) then + ( (* TODO: avoid leak *) ) + +and take_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail = + if head != tail then take_as t self before selfs (S.as_cons head_r.next) tail + else + let givers = Q.reverse_as_queue selfs in + let takers = Q.add before.takers self in + let after = { givers; takers } in + if not (Atomic.compare_and_set t before after) then take_as_start t self + +and take_as_start t self = + let before = Atomic.get t in + match before.givers with + | Q.T Zero -> + let givers = Q.T Zero in + let takers = Q.singleton self in + let after = { givers; takers } in + if not (Atomic.compare_and_set t before after) then take_as_start t self + | Q.T (One r as o) -> + Q.exec o; + take_as t self before (T Nil) r.head r.cons + +let take_evt t = + let request computation result = + take_as_start t (T { computation; result }) + in + Event.from_request { request } + +(* *) + +let[@inline] take t = take t Backoff.default +let[@inline] give t value = give t value Backoff.default diff --git a/lib/picos_std.sync/condition.ml b/lib/picos_std.sync/condition.ml index b78ef3671..fc2232c72 100644 --- a/lib/picos_std.sync/condition.ml +++ b/lib/picos_std.sync/condition.ml @@ -9,7 +9,7 @@ let broadcast (t : t) = if Atomic.get t != T Zero then match Atomic.exchange t (T Zero) with | T Zero -> () - | T (One _ as q) -> Q.iter q Trigger.signal + | T (One _ as q) -> Q.iter Trigger.signal q (* We try to avoid starvation of signal by making it so that when, at the start of signal or wait, the head is empty, the tail is reversed into the head. @@ -38,14 +38,15 @@ let rec cleanup backoff trigger (t : t) = else if not (Atomic.compare_and_set t before after) then cleanup (Backoff.once backoff) trigger t -let rec wait (t : t) mutex trigger fiber backoff = +let rec wait (t : t) mutex cons fiber backoff = let before = Atomic.get t in - let after = Q.add before trigger in + let after = Q.add_cons before cons in if Atomic.compare_and_set t before after then begin Mutex.unlock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default; + let trigger = S.value cons in let result = Trigger.await trigger in let forbid = Fiber.exchange fiber ~forbid:true in - Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex Nothing Backoff.default; + Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex (T Nil) Backoff.default; Fiber.set fiber ~forbid; match result with | None -> () @@ -53,11 +54,12 @@ let rec wait (t : t) mutex trigger fiber backoff = cleanup Backoff.default trigger t; Printexc.raise_with_backtrace exn bt end - else wait t mutex trigger fiber (Backoff.once backoff) + else wait t mutex cons fiber (Backoff.once backoff) let wait t mutex = let fiber = Fiber.current () in let trigger = Trigger.create () in - wait t mutex trigger fiber Backoff.default + let cons = S.Cons { value = trigger; next = T Nil } in + wait t mutex cons fiber Backoff.default let[@inline] signal t = signal t Backoff.default diff --git a/lib/picos_std.sync/mutex.ml b/lib/picos_std.sync/mutex.ml index 782593222..7ae311da3 100644 --- a/lib/picos_std.sync/mutex.ml +++ b/lib/picos_std.sync/mutex.ml @@ -4,37 +4,55 @@ let[@inline never] owner () = raise (Sys_error "Mutex: owner") let[@inline never] unlocked () = raise (Sys_error "Mutex: unlocked") let[@inline never] not_owner () = raise (Sys_error "Mutex: not owner") +type entry = { trigger : Trigger.t; fiber : Fiber.Maybe.t } + type _ tdt = - | Entry : { trigger : Trigger.t; fiber : Fiber.Maybe.t } -> [> `Entry ] tdt - | Nothing : [> `Nothing ] tdt + | Locked : [> `Locked ] tdt + | Unlocked : [> `Unlocked ] tdt + | Locked_by : { fiber : Fiber.Maybe.t } -> [> `Locked_by ] tdt + | Contended : { + fiber : Fiber.Maybe.t; + head : entry S.cons; + tail : entry S.cons; + cons : entry S.cons; + } + -> [> `Contended ] tdt type state = - | Locked - | Unlocked - | Queued of { fiber : Fiber.Maybe.t; waiters : [ `Entry ] tdt Q.t } + | T : [< `Locked | `Unlocked | `Locked_by | `Contended ] tdt -> state +[@@unboxed] type t = state Atomic.t -let create ?padded () = Multicore_magic.copy_as ?padded @@ Atomic.make Unlocked +let create ?padded () = + Multicore_magic.copy_as ?padded @@ Atomic.make (T Unlocked) let rec unlock_as owner t backoff = match Atomic.get t with - | Unlocked -> unlocked () - | Locked as before -> - if not (Atomic.compare_and_set t before Unlocked) then + | T Unlocked -> unlocked () + | T Locked as before -> + if not (Atomic.compare_and_set t before (T Unlocked)) then unlock_as owner t (Backoff.once backoff) - | Queued r as before -> - if Fiber.Maybe.equal r.fiber owner then - match r.waiters with - | T Zero -> - if not (Atomic.compare_and_set t before Unlocked) then - unlock_as owner t (Backoff.once backoff) - | T (One _ as q) -> - let (Entry { trigger; fiber }) = Q.head q in - let waiters = Q.tail q in - let after = Queued { fiber; waiters } in - if Atomic.compare_and_set t before after then Trigger.signal trigger - else unlock_as owner t (Backoff.once backoff) + | T (Locked_by r) as before -> + if Fiber.Maybe.equal r.fiber owner then begin + if not (Atomic.compare_and_set t before (T Unlocked)) then + unlock_as owner t (Backoff.once backoff) + end + else not_owner () + | T (Contended r) as before -> + if Fiber.Maybe.equal r.fiber owner then begin + S.exec r.tail r.cons; + let { trigger; fiber } = S.value r.head in + let after = + if r.head != r.cons then + let head = S.next_as_cons r.head in + Contended { fiber; head; tail = r.cons; cons = r.cons } + else if fiber == Fiber.Maybe.nothing then Locked + else Locked_by { fiber } + in + if Atomic.compare_and_set t before (T after) then Trigger.signal trigger + else unlock_as owner t (Backoff.once backoff) + end else not_owner () let[@inline] unlock ?checked t = @@ -45,78 +63,76 @@ let[@inline] unlock ?checked t = sequentially consistent. The fenceless get potentially allows us to avoid performing a failed mutation attempt causing cache coherency traffic and fenceless get here performs better on ARM. *) - Multicore_magic.fenceless_get t != Locked - || not (Atomic.compare_and_set t Locked Unlocked) + Multicore_magic.fenceless_get t != T Locked + || not (Atomic.compare_and_set t (T Locked) (T Unlocked)) then unlock_as Fiber.Maybe.nothing t Backoff.default | None | Some true -> let owner = Fiber.Maybe.of_fiber (Fiber.current ()) in unlock_as owner t Backoff.default -let rec cleanup_as (Entry entry_r as entry : [ `Entry ] tdt) t backoff = +let rec cleanup_as entry t backoff = (* We have been canceled. If we are the owner, we must unlock the mutex. Otherwise we must remove our entry from the queue. *) match Atomic.get t with - | Queued r as before -> begin - match r.waiters with - | T Zero -> unlock_as entry_r.fiber t backoff - | T (One _ as q) -> - let waiters = Q.remove q entry in - if r.waiters == waiters then unlock_as entry_r.fiber t backoff - else - let after = Queued { fiber = r.fiber; waiters } in - if not (Atomic.compare_and_set t before after) then - cleanup_as entry t (Backoff.once backoff) + | T (Contended r) as before -> begin + S.exec r.tail r.cons; + match S.reject r.head entry with + | S.T Nil -> + let after = Locked_by { fiber = r.fiber } in + if not (Atomic.compare_and_set t before (T after)) then + cleanup_as entry t (Backoff.once backoff) + | S.T (Cons _ as head) -> + let tail = S.find_tail head in + let after = Contended { fiber = r.fiber; head; tail; cons = tail } in + if not (Atomic.compare_and_set t before (T after)) then + cleanup_as entry t (Backoff.once backoff) + | exception Not_found -> unlock_as entry.fiber t backoff end - | Locked -> unlock_as entry_r.fiber t backoff - | Unlocked -> unlocked () + | T Locked | T (Locked_by _) -> unlock_as entry.fiber t backoff + | T Unlocked -> unlocked () -let rec lock_as fiber t entry backoff = +let rec lock_as fiber t node backoff = match Atomic.get t with - | Unlocked as before -> + | T Unlocked as before -> let after = - if fiber == Fiber.Maybe.nothing then Locked - else Queued { fiber; waiters = T Zero } + if fiber == Fiber.Maybe.nothing then Locked else Locked_by { fiber } in - if not (Atomic.compare_and_set t before after) then - lock_as fiber t entry (Backoff.once backoff) - | Locked as before -> - let (Entry entry_r as entry : [ `Entry ] tdt) = - match entry with - | Nothing -> + if not (Atomic.compare_and_set t before (T after)) then + lock_as fiber t node (Backoff.once backoff) + | T ((Locked | Locked_by _ | Contended _) as other) -> + let cons = + match node with + | S.T Nil -> let trigger = Trigger.create () in - Entry { trigger; fiber } - | Entry _ as entry -> entry + let value = { trigger; fiber } in + S.Cons { value; next = T Nil } + | S.T (Cons _ as cons) -> cons + in + let after = + match (other : [ `Locked | `Locked_by | `Contended ] tdt) with + | Locked -> + Contended + { fiber = Fiber.Maybe.nothing; head = cons; tail = cons; cons } + | Locked_by r -> + if Fiber.Maybe.unequal r.fiber fiber then + Contended { fiber = r.fiber; head = cons; tail = cons; cons } + else owner () + | Contended r -> + if Fiber.Maybe.unequal r.fiber fiber then begin + S.exec r.tail r.cons; + Contended { r with tail = r.cons; cons } + end + else owner () in - let waiters = Q.singleton entry in - let after = Queued { fiber = Fiber.Maybe.nothing; waiters } in - if Atomic.compare_and_set t before after then begin - match Trigger.await entry_r.trigger with + if Atomic.compare_and_set t (T other) (T after) then begin + let entry = S.value cons in + match Trigger.await entry.trigger with | None -> () | Some (exn, bt) -> cleanup_as entry t Backoff.default; Printexc.raise_with_backtrace exn bt end - else lock_as fiber t entry (Backoff.once backoff) - | Queued r as before -> - if Fiber.Maybe.unequal r.fiber fiber then - let (Entry entry_r as entry : [ `Entry ] tdt) = - match entry with - | Nothing -> - let trigger = Trigger.create () in - Entry { trigger; fiber } - | Entry _ as entry -> entry - in - let waiters = Q.add r.waiters entry in - let after = Queued { fiber = r.fiber; waiters } in - if Atomic.compare_and_set t before after then begin - match Trigger.await entry_r.trigger with - | None -> () - | Some (exn, bt) -> - cleanup_as entry t Backoff.default; - Printexc.raise_with_backtrace exn bt - end - else lock_as fiber t entry (Backoff.once backoff) - else owner () + else lock_as fiber t (T cons) (Backoff.once backoff) let[@inline] lock ?checked t = match checked with @@ -126,24 +142,23 @@ let[@inline] lock ?checked t = sequentially consistent. The fenceless get potentially allows us to avoid performing a failed mutation attempt causing cache coherency traffic and fenceless get here performs better on ARM. *) - Multicore_magic.fenceless_get t != Unlocked - || not (Atomic.compare_and_set t Unlocked Locked) - then lock_as Fiber.Maybe.nothing t Nothing Backoff.default + Multicore_magic.fenceless_get t != T Unlocked + || not (Atomic.compare_and_set t (T Unlocked) (T Locked)) + then lock_as Fiber.Maybe.nothing t (T Nil) Backoff.default | None | Some true -> let fiber = Fiber.current () in Fiber.check fiber; - lock_as (Fiber.Maybe.of_fiber fiber) t Nothing Backoff.default + lock_as (Fiber.Maybe.of_fiber fiber) t (T Nil) Backoff.default let try_lock ?checked t = let fiber = Fiber.Maybe.current_and_check_if checked in - Atomic.get t == Unlocked - && Atomic.compare_and_set t Unlocked - (if fiber == Fiber.Maybe.nothing then Locked - else Queued { fiber; waiters = T Zero }) + Atomic.get t == T Unlocked + && Atomic.compare_and_set t (T Unlocked) + (T (if fiber == Fiber.Maybe.nothing then Locked else Locked_by { fiber })) let protect ?checked t body = let fiber = Fiber.Maybe.current_and_check_if checked in - lock_as fiber t Nothing Backoff.default; + lock_as fiber t (T Nil) Backoff.default; match body () with | value -> unlock_as fiber t Backoff.default; diff --git a/lib/picos_std.sync/picos_std_sync.ml b/lib/picos_std.sync/picos_std_sync.ml index 007b8cf07..aafa7d5dc 100644 --- a/lib/picos_std.sync/picos_std_sync.ml +++ b/lib/picos_std.sync/picos_std_sync.ml @@ -4,4 +4,5 @@ module Semaphore = Semaphore module Lazy = Lazy module Latch = Latch module Ivar = Ivar +module Ch = Ch module Stream = Stream diff --git a/lib/picos_std.sync/picos_std_sync.mli b/lib/picos_std.sync/picos_std_sync.mli index c4cd77d8e..468160618 100644 --- a/lib/picos_std.sync/picos_std_sync.mli +++ b/lib/picos_std.sync/picos_std_sync.mli @@ -351,6 +351,35 @@ module Ivar : sig variable has either been assigned a value or has been poisoned. *) end +module Ch : sig + (** A synchronous channel. *) + + type !'a t + (** Represents a synchronous channel for handing over messages of type + ['a]. *) + + val create : ?padded:bool -> unit -> 'a t + (** [create ()] creates a new synchronous channel. *) + + val give : 'a t -> 'a -> unit + (** [give ch value] waits until another fiber is ready to take a message on + the [ch]annel and gives the message to it. *) + + val give_evt : 'a t -> 'a -> unit Event.t + (** [give_evt ch value] returns an event that can be committed to once another + fiber is ready to take a message on the [ch]annel. Committing to the + event results in giving the message to the other fiber. *) + + val take : 'a t -> 'a + (** [take ch] waits until another fiber is ready to give a message on the + [ch]annel and takes the message from it. *) + + val take_evt : 'a t -> 'a Event.t + (** [take_evt ch] returns an event that can be committed to once another fiber + is ready to give a message on the [ch]annel. Committing to the event + results in taking the message from the other fiber. *) +end + module Stream : sig (** A lock-free, poisonable, many-to-many, stream. diff --git a/lib/picos_std.sync/q.ml b/lib/picos_std.sync/q.ml index ada4fa0c6..bef38003d 100644 --- a/lib/picos_std.sync/q.ml +++ b/lib/picos_std.sync/q.ml @@ -1,19 +1,9 @@ -type ('a, _) tdt = - | Nil : ('a, [> `Nil ]) tdt - | Cons : { value : 'a; mutable next : 'a spine } -> ('a, [> `Cons ]) tdt - -and 'a spine = S : ('a, [< `Nil | `Cons ]) tdt -> 'a spine [@@unboxed] - -type 'a cons = ('a, [ `Cons ]) tdt - -external as_cons : 'a spine -> 'a cons = "%identity" - type ('a, _) queue = | Zero : ('a, [> `Zero ]) queue | One : { - head : 'a cons; - tail : 'a cons; - cons : 'a cons; + head : 'a S.cons; + tail : 'a S.cons; + cons : 'a S.cons; } -> ('a, [> `One ]) queue @@ -21,22 +11,29 @@ type ('a, 'n) one = ('a, ([< `One ] as 'n)) queue type 'a t = T : ('a, [< `Zero | `One ]) queue -> 'a t [@@unboxed] let[@inline] singleton value = - let cons = Cons { value; next = S Nil } in + let cons = S.Cons { value; next = T Nil } in T (One { head = cons; tail = cons; cons }) let[@inline] exec (One o : (_, _) one) = if o.tail != o.cons then let (Cons tl) = o.tail in - if tl.next != S o.cons then tl.next <- S o.cons + if tl.next != T o.cons then tl.next <- T o.cons let[@inline] snoc (One o as t : (_, _) one) value = exec t; - let cons = Cons { value; next = S Nil } in + let cons = S.Cons { value; next = T Nil } in T (One { head = o.head; tail = o.cons; cons }) let[@inline] add t value = match t with T Zero -> singleton value | T (One _ as o) -> snoc o value +let[@inline] add_cons t cons = + match t with + | T Zero -> T (One { head = cons; tail = cons; cons }) + | T (One r as o) -> + exec o; + T (One { head = r.head; tail = r.cons; cons }) + let[@inline] head (One { head = Cons hd; _ } : (_, _) one) = hd.value let[@inline] tail (One o as t : (_, _) one) = @@ -44,32 +41,23 @@ let[@inline] tail (One o as t : (_, _) one) = if o.head == o.cons then T Zero else let (Cons hd) = o.head in - T (One { head = as_cons hd.next; tail = o.cons; cons = o.cons }) + T (One { head = S.as_cons hd.next; tail = o.cons; cons = o.cons }) -let rec iter (Cons cons_r : _ cons) action = - action cons_r.value; - match cons_r.next with S Nil -> () | S (Cons _ as cons) -> iter cons action - -let[@inline] iter (One o as t : (_, _) one) action = +let[@inline] iter action (One o as t : (_, _) one) = exec t; - iter o.head action - -let rec find_tail (Cons cons_r as cons : _ cons) = - match cons_r.next with S Nil -> cons | S (Cons _ as cons) -> find_tail cons - -let[@tail_mod_cons] rec reject (Cons cons_r : _ cons) value = - if cons_r.value != value then - match cons_r.next with - | S Nil -> raise_notrace Not_found - | S (Cons _ as cons) -> - S (Cons { value = cons_r.value; next = reject cons value }) - else cons_r.next + S.iter action o.head o.cons let remove (One o as t : (_, _) one) value = exec t; - match reject o.head value with - | S Nil -> T Zero - | S (Cons _ as head) -> - let tail = find_tail head in + match S.reject o.head value with + | S.T Nil -> T Zero + | S.T (Cons _ as head) -> + let tail = S.find_tail head in T (One { head; tail; cons = tail }) | exception Not_found -> T t + +let reverse_as_queue = function + | S.T Nil -> T Zero + | S.T (Cons cons_r as tail) -> + let head = S.reverse_to tail cons_r.next in + T (One { head; tail; cons = tail }) diff --git a/lib/picos_std.sync/s.ml b/lib/picos_std.sync/s.ml new file mode 100644 index 000000000..f9c490bd0 --- /dev/null +++ b/lib/picos_std.sync/s.ml @@ -0,0 +1,41 @@ +type ('a, _) tdt = + | Nil : ('a, [> `Nil ]) tdt + | Cons : { value : 'a; mutable next : 'a t } -> ('a, [> `Cons ]) tdt + +and 'a t = T : ('a, [< `Nil | `Cons ]) tdt -> 'a t [@@unboxed] + +type 'a cons = ('a, [ `Cons ]) tdt + +external as_cons : 'a t -> 'a cons = "%identity" + +let[@inline] value (Cons cons_r : _ cons) = cons_r.value +let[@inline] next_as_cons (Cons cons_r : _ cons) = as_cons cons_r.next + +let[@inline] exec (tail : _ cons) (cons : _ cons) = + if tail != cons then + let (Cons tl) = tail in + if tl.next != T cons then tl.next <- T cons + +let[@inline] cons value next = T (Cons { value; next }) + +let rec reverse_to tail = function + | T Nil -> tail + | T (Cons cons_r as next) -> + let prev = cons_r.next in + cons_r.next <- T tail; + reverse_to next prev + +let rec iter action (Cons head_r as head : _ cons) tail = + action head_r.value; + if head != tail then iter action (as_cons head_r.next) tail + +let[@tail_mod_cons] rec reject (Cons cons_r : _ cons) value = + if cons_r.value != value then + match cons_r.next with + | T Nil -> raise_notrace Not_found + | T (Cons _ as cons) -> + T (Cons { value = cons_r.value; next = reject cons value }) + else cons_r.next + +let rec find_tail (Cons cons_r as cons : _ cons) = + match cons_r.next with T Nil -> cons | T (Cons _ as cons) -> find_tail cons diff --git a/lib/picos_std.sync/semaphore.ml b/lib/picos_std.sync/semaphore.ml index d3c5fa3a0..28e5dceba 100644 --- a/lib/picos_std.sync/semaphore.ml +++ b/lib/picos_std.sync/semaphore.ml @@ -41,40 +41,34 @@ module Counting = struct else if not (Atomic.compare_and_set t (Obj.repr before) (Obj.repr after)) then cleanup t trigger (Backoff.once backoff) - let rec acquire t backoff = + let rec acquire t node backoff = (* The acquire operation will mutate the atomic location and will be sequentially consistent. The fenceless get here performs better on ARM. *) let before = Multicore_magic.fenceless_get t in - if Obj.is_int before then - let count = Obj.obj before in - if 0 < count then begin - let after = Obj.repr (count - 1) in - if not (Atomic.compare_and_set t before after) then - acquire t (Backoff.once backoff) - end - else - let trigger = Trigger.create () in - let after = Q.singleton trigger in - if Atomic.compare_and_set t before (Obj.repr after) then begin - match Trigger.await trigger with - | None -> () - | Some (exn, bt) -> - cleanup t trigger Backoff.default; - Printexc.raise_with_backtrace exn bt - end - else acquire t (Backoff.once backoff) + if Obj.is_int before && 0 < (Obj.obj before : int) then begin + let after = Obj.repr (Obj.obj before - 1) in + if not (Atomic.compare_and_set t before after) then + acquire t node (Backoff.once backoff) + end else - let trigger = Trigger.create () in - let after = Q.snoc (Obj.obj before) trigger in + let cons = + match node with + | S.T Nil -> + let trigger = Trigger.create () in + S.Cons { value = trigger; next = T Nil } + | S.T (Cons _ as cons) -> cons + in + let after = Q.add_cons (Obj.obj before) cons in if Atomic.compare_and_set t before (Obj.repr after) then begin + let trigger = S.value cons in match Trigger.await trigger with | None -> () | Some (exn, bt) -> cleanup t trigger Backoff.default; Printexc.raise_with_backtrace exn bt end - else acquire t (Backoff.once backoff) + else acquire t (T cons) (Backoff.once backoff) let rec try_acquire t backoff = let before = Atomic.get t in @@ -92,7 +86,7 @@ module Counting = struct if Obj.is_int state then Obj.obj state else 0 let[@inline] release t = release t Backoff.default - let[@inline] acquire t = acquire t Backoff.default + let[@inline] acquire t = acquire t (T Nil) Backoff.default let[@inline] try_acquire t = try_acquire t Backoff.default end diff --git a/test/dune b/test/dune index 48b81eb49..4c91a4497 100644 --- a/test/dune +++ b/test/dune @@ -55,6 +55,7 @@ (action (progn (run %{test} -- "Event" 0) + (run %{test} -- "Ch" 0) (run %{test} -- "Lazy" 0) (run %{test} -- "Lazy" 1) (run %{test} -- "Semaphore" 0) diff --git a/test/test_sync.ml b/test/test_sync.ml index 59cf6905d..c88675593 100644 --- a/test/test_sync.ml +++ b/test/test_sync.ml @@ -287,6 +287,27 @@ let test_event_basics () = | exception Exit -> () end +let test_ch () = + Test_scheduler.run ~max_domains:2 @@ fun () -> + Bundle.join_after @@ fun bundle -> + let ch = Ch.create () in + assert ( + 76 + == Event.select + [ + Event.wrap (Ch.give_evt ch 42) (fun () -> assert false); + Ch.take_evt ch; + Event.always 76; + ]); + [ (fun () -> Ch.take ch); (fun () -> Event.sync (Ch.take_evt ch)) ] + |> List.iter @@ fun take -> + [ Ch.give ch; (fun value -> Event.sync (Ch.give_evt ch value)) ] + |> List.iter @@ fun give -> + let value = Random.bits () in + let promise = Bundle.fork_as_promise bundle take in + Bundle.fork bundle (fun () -> give value); + assert (value == Promise.await promise) + let test_non_cancelable_ops () = Test_scheduler.run @@ fun () -> let ivar = Ivar.create () in @@ -328,6 +349,7 @@ let () = Alcotest.test_case "cancelation" `Quick test_lazy_cancelation; ] ); ("Event", [ Alcotest.test_case "basics" `Quick test_event_basics ]); + ("Ch", [ Alcotest.test_case "basics" `Quick test_ch ]); ( "Non-cancelable ops", [ Alcotest.test_case "are not canceled" `Quick test_non_cancelable_ops ] );