Skip to content

Commit

Permalink
Add CML style Ch
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Nov 28, 2024
1 parent 84d5428 commit 129075f
Show file tree
Hide file tree
Showing 14 changed files with 560 additions and 149 deletions.
89 changes: 89 additions & 0 deletions bench/bench_ch.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
open Multicore_bench
open Picos
open Picos_std_sync
open Picos_std_structured

let run_one_domain ~budgetf () =
let n_msgs = 200 * Util.iter_factor in
let t = Ch.create () in
let giver () =
for i = 1 to n_msgs do
Ch.give t i
done
and taker () =
for _ = 1 to n_msgs do
Ch.take t |> ignore
done
in
let init _ = () in
let wrap _ () = Scheduler.run in
let work _ () =
Run.all (if Random.bool () then [ taker; giver ] else [ giver; taker ])
in
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let yielder () =
while true do
Fiber.yield ()
done

let run_one ~budgetf ~n_givers ~n_takers () =
let n_domains = n_givers + n_takers in

let n_msgs = 200 / n_domains * Util.iter_factor in

let t = Ch.create ~padded:true () in

let n_msgs_to_give = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
Atomic.set n_msgs_to_give n_msgs;
Atomic.set n_msgs_to_take n_msgs
in
let wrap _ () = Scheduler.run in
let work i () =
Flock.join_after ~on_return:`Terminate @@ fun () ->
Flock.fork yielder;
begin
if i < n_givers then
let rec work () =
let n = Util.alloc n_msgs_to_give in
if 0 < n then begin
for i = 1 to n do
Ch.give t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if 0 < n then begin
for _ = 1 to n do
Ch.take t |> ignore
done;
work ()
end
in
work ()
end
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s" (format "giver" n_givers) (format "taker" n_takers)
in
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
|> List.concat_map @@ fun (n_givers, n_takers) ->
if Picos_domain.recommended_domain_count () < n_givers + n_takers then []
else run_one ~budgetf ~n_givers ~n_takers ())
1 change: 1 addition & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
(run %{test} -brief "Picos_mpscq")
(run %{test} -brief "Picos_htbl")
(run %{test} -brief "Picos_stdio")
(run %{test} -brief "Picos_sync Ch")
(run %{test} -brief "Picos_sync Stream")
(run %{test} -brief "Fib")
(run %{test} -brief "Picos binaries")
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ let benchmarks =
("Picos_mpscq", Bench_mpscq.run_suite);
("Picos_htbl", Bench_htbl.run_suite);
("Picos_stdio", Bench_stdio.run_suite);
("Picos_sync Ch", Bench_ch.run_suite);
("Picos_sync Stream", Bench_stream.run_suite);
("Fib", Bench_fib.run_suite);
("Picos binaries", Bench_binaries.run_suite);
Expand Down
7 changes: 7 additions & 0 deletions lib/picos_std.structured/picos_std_structured.mli
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,11 @@ end
Ivar.read ivar
end;
Flock.fork begin fun () ->
let ch = Ch.create () in
Ch.give ch "never"
end;
Flock.fork begin fun () ->
let stream = Stream.create () in
Stream.read (Stream.tap stream)
Expand Down Expand Up @@ -514,6 +519,8 @@ end
count of the latch never reaches [0],
- {{!Picos_std_sync.Ivar.read} [Ivar.read]} never returns, because the
incremental variable is never filled,
- {{!Picos_std_sync.Ch.give} [Ch.give]} never returns, because no fiber
{{!Picos_std_sync.Ch.take} takes} the message,
- {{!Picos_std_sync.Stream.read} [Stream.read]} never returns, because the
stream is never pushed to,
- {{!Picos_io.Unix.read} [Unix.read]} never returns, because the socket is
Expand Down
220 changes: 220 additions & 0 deletions lib/picos_std.sync/ch.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
open Picos_std_event
open Picos
module Atomic = Multicore_magic.Transparent_atomic
module Tx = Computation.Tx

type 'a taker =
| T : {
computation : (unit -> 'r) Computation.t;
result : 'a -> 'r;
}
-> 'a taker

type 'a giver =
| G : {
computation : (unit -> 'r) Computation.t;
result : unit -> 'r;
value : 'a;
}
-> 'a giver

type 'a state = { givers : 'a giver Q.t; takers : 'a taker Q.t }

let empty = { givers = T Zero; takers = T Zero }

type 'a t = 'a state Atomic.t

let create ?padded () = Atomic.make empty |> Multicore_magic.copy_as ?padded

(* *)

let[@inline never] wait computation =
let trigger = Trigger.create () in
if Computation.try_attach computation trigger then
match Trigger.await trigger with
| None -> ()
| Some (exn, bt) ->
if Computation.try_cancel computation Exit bt then
Printexc.raise_with_backtrace exn bt

(* *)

let rec give t value backoff =
let before = Atomic.fenceless_get t in
match before.takers with
| Q.T Zero ->
let computation = Computation.create ~mode:`LIFO () in
let self = G { computation; result = Fun.id; value } in
let givers = Q.add before.givers self in
let after = { givers; takers = T Zero } in
if Atomic.compare_and_set t before after then wait computation
else give t value (Backoff.once backoff)
| Q.T (One _ as takers) ->
let (T { computation; result }) = Q.head takers in
let got = Computation.try_return computation (fun () -> result value) in
let takers = Q.tail takers in
let givers = before.givers in
let after =
if takers == T Zero && givers == T Zero then empty
else { givers; takers }
in
let no_contention = Atomic.compare_and_set t before after in
if not got then
give t value (if no_contention then backoff else Backoff.once backoff)

let rec take t backoff =
let before = Atomic.fenceless_get t in
match before.givers with
| Q.T Zero ->
let computation = Computation.create ~mode:`LIFO () in
let self = T { computation; result = Fun.id } in
let takers = Q.add before.takers self in
let after = { givers = T Zero; takers } in
if Atomic.compare_and_set t before after then begin
wait computation;
Computation.await computation ()
end
else take t (Backoff.once backoff)
| Q.T (One _ as givers) ->
let (G { computation; result; value }) = Q.head givers in
let got = Computation.try_return computation result in
let givers = Q.tail givers in
let takers = before.takers in
let after =
if givers == T Zero && takers == T Zero then empty
else { givers; takers }
in
let no_contention = Atomic.compare_and_set t before after in
if got then value
else take t (if no_contention then backoff else Backoff.once backoff)

(* *)

let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ S.cons)
tail =
let (T tr as taker) = head_r.value in
if Tx.same tr.computation gr.computation then
let selfs = S.cons taker selfs in
give_as_advance t self before selfs head tail
else
let tx = Tx.create () in
let result = tr.result in
let value = gr.value in
if not (Tx.try_return tx tr.computation (fun () -> result value)) then
give_as_advance t self before selfs head tail
else if
(not (Tx.try_return tx gr.computation gr.result))
|| not (Tx.try_commit tx)
then
if not (Computation.is_running gr.computation) then ( (* TODO *) )
else if Computation.is_running tr.computation then
give_as t self before selfs head tail
else give_as_advance t self before selfs head tail
else
let takers =
if head == tail then Q.reverse_as_queue selfs
else
let head = S.reverse_to (S.as_cons head_r.next) selfs in
Q.T (One { head; tail; cons = tail })
in
let givers = before.givers in
let after =
if takers == Q.T Zero && givers == Q.T Zero then empty
else { givers; takers }
in
if not (Atomic.compare_and_set t before after) then
( (* TODO: avoid leak *) )

and give_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail =
if head != tail then give_as t self before selfs (S.as_cons head_r.next) tail
else
let takers = Q.reverse_as_queue selfs in
let givers = Q.add before.givers self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then give_as_start t self

and give_as_start t self =
let before = Atomic.get t in
match before.takers with
| Q.T Zero ->
let takers = Q.T Zero in
let givers = Q.singleton self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then give_as_start t self
| Q.T (One r as o) ->
Q.exec o;
give_as t self before (T Nil) r.head r.cons

let give_evt t value =
let request computation result =
give_as_start t (G { computation; result; value })
in
Event.from_request { request }

(* *)

let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ S.cons)
tail =
let (G gr as giver) = head_r.value in
if Tx.same tr.computation gr.computation then
let selfs = S.cons giver selfs in
take_as_advance t self before selfs head tail
else
let tx = Tx.create () in
let result = tr.result in
let value = gr.value in
if not (Tx.try_return tx gr.computation gr.result) then
take_as_advance t self before selfs head tail
else if
(not (Tx.try_return tx tr.computation (fun () -> result value)))
|| not (Tx.try_commit tx)
then
if not (Computation.is_running gr.computation) then ( (* TODO *) )
else if Computation.is_running tr.computation then
take_as t self before selfs head tail
else take_as_advance t self before selfs head tail
else
let takers = before.takers in
let givers =
if head == tail then Q.reverse_as_queue selfs
else
let head = S.reverse_to (S.as_cons head_r.next) selfs in
Q.T (One { head; tail; cons = tail })
in
let after =
if takers == Q.T Zero && givers == Q.T Zero then empty
else { givers; takers }
in
if not (Atomic.compare_and_set t before after) then
( (* TODO: avoid leak *) )

and take_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail =
if head != tail then take_as t self before selfs (S.as_cons head_r.next) tail
else
let givers = Q.reverse_as_queue selfs in
let takers = Q.add before.takers self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then take_as_start t self

and take_as_start t self =
let before = Atomic.get t in
match before.givers with
| Q.T Zero ->
let givers = Q.T Zero in
let takers = Q.singleton self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then take_as_start t self
| Q.T (One r as o) ->
Q.exec o;
take_as t self before (T Nil) r.head r.cons

let take_evt t =
let request computation result =
take_as_start t (T { computation; result })
in
Event.from_request { request }

(* *)

let[@inline] take t = take t Backoff.default
let[@inline] give t value = give t value Backoff.default
14 changes: 8 additions & 6 deletions lib/picos_std.sync/condition.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ let broadcast (t : t) =
if Atomic.get t != T Zero then
match Atomic.exchange t (T Zero) with
| T Zero -> ()
| T (One _ as q) -> Q.iter q Trigger.signal
| T (One _ as q) -> Q.iter Trigger.signal q

(* We try to avoid starvation of signal by making it so that when, at the start
of signal or wait, the head is empty, the tail is reversed into the head.
Expand Down Expand Up @@ -38,26 +38,28 @@ let rec cleanup backoff trigger (t : t) =
else if not (Atomic.compare_and_set t before after) then
cleanup (Backoff.once backoff) trigger t

let rec wait (t : t) mutex trigger fiber backoff =
let rec wait (t : t) mutex cons fiber backoff =
let before = Atomic.get t in
let after = Q.add before trigger in
let after = Q.add_cons before cons in
if Atomic.compare_and_set t before after then begin
Mutex.unlock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default;
let trigger = S.value cons in
let result = Trigger.await trigger in
let forbid = Fiber.exchange fiber ~forbid:true in
Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex Nothing Backoff.default;
Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex (T Nil) Backoff.default;
Fiber.set fiber ~forbid;
match result with
| None -> ()
| Some (exn, bt) ->
cleanup Backoff.default trigger t;
Printexc.raise_with_backtrace exn bt
end
else wait t mutex trigger fiber (Backoff.once backoff)
else wait t mutex cons fiber (Backoff.once backoff)

let wait t mutex =
let fiber = Fiber.current () in
let trigger = Trigger.create () in
wait t mutex trigger fiber Backoff.default
let cons = S.Cons { value = trigger; next = T Nil } in
wait t mutex cons fiber Backoff.default

let[@inline] signal t = signal t Backoff.default
Loading

0 comments on commit 129075f

Please sign in to comment.