Skip to content

Commit

Permalink
Add CML style Ch
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Oct 2, 2024
1 parent 849dea9 commit 34e8de5
Show file tree
Hide file tree
Showing 16 changed files with 575 additions and 149 deletions.
89 changes: 89 additions & 0 deletions bench/bench_ch.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
open Multicore_bench
open Picos
open Picos_std_sync
open Picos_std_structured

let run_one_domain ~budgetf () =
let n_msgs = 200 * Util.iter_factor in
let t = Ch.create () in
let giver () =
for i = 1 to n_msgs do
Ch.give t i
done
and taker () =
for _ = 1 to n_msgs do
Ch.take t |> ignore
done
in
let init _ = () in
let wrap _ () = Scheduler.run in
let work _ () =
Run.all (if Random.bool () then [ taker; giver ] else [ giver; taker ])
in
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let yielder () =
while true do
Fiber.yield ()
done

let run_one ~budgetf ~n_givers ~n_takers () =
let n_domains = n_givers + n_takers in

let n_msgs = 200 / n_domains * Util.iter_factor in

let t = Ch.create ~padded:true () in

let n_msgs_to_give = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
Atomic.set n_msgs_to_give n_msgs;
Atomic.set n_msgs_to_take n_msgs
in
let wrap _ () = Scheduler.run in
let work i () =
Flock.join_after ~on_return:`Terminate @@ fun () ->
Flock.fork yielder;
begin
if i < n_givers then
let rec work () =
let n = Util.alloc n_msgs_to_give in
if 0 < n then begin
for i = 1 to n do
Ch.give t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if 0 < n then begin
for _ = 1 to n do
Ch.take t |> ignore
done;
work ()
end
in
work ()
end
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s" (format "giver" n_givers) (format "taker" n_takers)
in
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
|> List.concat_map @@ fun (n_givers, n_takers) ->
if Picos_domain.recommended_domain_count () < n_givers + n_takers then []
else run_one ~budgetf ~n_givers ~n_takers ())
1 change: 1 addition & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
(run %{test} -brief "Picos_mpscq")
(run %{test} -brief "Picos_htbl")
(run %{test} -brief "Picos_stdio")
(run %{test} -brief "Picos_sync Ch")
(run %{test} -brief "Picos_sync Stream")
(run %{test} -brief "Fib")
(run %{test} -brief "Picos binaries")
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ let benchmarks =
("Picos_mpscq", Bench_mpscq.run_suite);
("Picos_htbl", Bench_htbl.run_suite);
("Picos_stdio", Bench_stdio.run_suite);
("Picos_sync Ch", Bench_ch.run_suite);
("Picos_sync Stream", Bench_stream.run_suite);
("Fib", Bench_fib.run_suite);
("Picos binaries", Bench_binaries.run_suite);
Expand Down
11 changes: 11 additions & 0 deletions lib/picos/bootstrap/picos_bootstrap.ml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,17 @@ module Computation = struct

let returned value = Atomic.make (make_returned value)
let finished = Atomic.make (make_returned ())

let exited : unit t =
Atomic.make
(S (Canceled { exn = Exit; bt = Printexc.get_callstack 0; tx = Stopped }))

let exited () =
let open struct
external unsafe_generalize : unit t -> 'a t = "%identity"
end in
unsafe_generalize exited

let try_return t value = try_terminate t (make_returned value) Backoff.default
let try_finish t = try_terminate t returned_unit Backoff.default

Expand Down
4 changes: 4 additions & 0 deletions lib/picos/picos.mli
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,10 @@ module Computation : sig
val finished : unit t
(** [finished] is a constant finished computation. *)

val exited : unit -> 'a t
(** [exited ()] returns a constant computation canceled with the {!Exit}
exception. *)

val try_return : 'a t -> 'a -> bool
(** [try_return computation value] attempts to complete the computation with
the specified [value] and returns [true] on success. Otherwise returns
Expand Down
7 changes: 7 additions & 0 deletions lib/picos_std.structured/picos_std_structured.mli
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,11 @@ end
Ivar.read ivar
end;
Flock.fork begin fun () ->
let ch = Ch.create () in
Ch.give ch "never"
end;
Flock.fork begin fun () ->
let stream = Stream.create () in
Stream.read (Stream.tap stream)
Expand Down Expand Up @@ -514,6 +519,8 @@ end
count of the latch never reaches [0],
- {{!Picos_std_sync.Ivar.read} [Ivar.read]} never returns, because the
incremental variable is never filled,
- {{!Picos_std_sync.Ch.give} [Ch.give]} never returns, because no fiber
{{!Picos_std_sync.Ch.take} takes} the message,
- {{!Picos_std_sync.Stream.read} [Stream.read]} never returns, because the
stream is never pushed to,
- {{!Picos_io.Unix.read} [Unix.read]} never returns, because the socket is
Expand Down
220 changes: 220 additions & 0 deletions lib/picos_std.sync/ch.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
open Picos_std_event
open Picos
module Atomic = Multicore_magic.Transparent_atomic
module Tx = Computation.Tx

type 'a taker =
| T : {
computation : (unit -> 'r) Computation.t;
result : 'a -> 'r;
}
-> 'a taker

type 'a giver =
| G : {
computation : (unit -> 'r) Computation.t;
result : unit -> 'r;
value : 'a;
}
-> 'a giver

type 'a state = { givers : 'a giver Q.t; takers : 'a taker Q.t }

let empty = { givers = T Zero; takers = T Zero }

type 'a t = 'a state Atomic.t

let create ?padded () = Atomic.make empty |> Multicore_magic.copy_as ?padded

(* *)

let[@inline never] wait computation =
let trigger = Trigger.create () in
if Computation.try_attach computation trigger then
match Trigger.await trigger with
| None -> ()
| Some (exn, bt) ->
if Computation.try_cancel computation Exit bt then
Printexc.raise_with_backtrace exn bt

(* *)

let rec give t value backoff =
let before = Atomic.fenceless_get t in
match before.takers with
| Q.T Zero ->
let computation = Computation.create ~mode:`LIFO () in
let self = G { computation; result = Fun.id; value } in
let givers = Q.add before.givers self in
let after = { givers; takers = T Zero } in
if Atomic.compare_and_set t before after then wait computation
else give t value (Backoff.once backoff)
| Q.T (One _ as takers) ->
let (T { computation; result }) = Q.head takers in
let got = Computation.try_return computation (fun () -> result value) in
let takers = Q.tail takers in
let givers = before.givers in
let after =
if takers == T Zero && givers == T Zero then empty
else { givers; takers }
in
let no_contention = Atomic.compare_and_set t before after in
if not got then
give t value (if no_contention then backoff else Backoff.once backoff)

let rec take t backoff =
let before = Atomic.fenceless_get t in
match before.givers with
| Q.T Zero ->
let computation = Computation.create ~mode:`LIFO () in
let self = T { computation; result = Fun.id } in
let takers = Q.add before.takers self in
let after = { givers = T Zero; takers } in
if Atomic.compare_and_set t before after then begin
wait computation;
Computation.await computation ()
end
else take t (Backoff.once backoff)
| Q.T (One _ as givers) ->
let (G { computation; result; value }) = Q.head givers in
let got = Computation.try_return computation result in
let givers = Q.tail givers in
let takers = before.takers in
let after =
if givers == T Zero && takers == T Zero then empty
else { givers; takers }
in
let no_contention = Atomic.compare_and_set t before after in
if got then value
else take t (if no_contention then backoff else Backoff.once backoff)

(* *)

let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ S.cons)
tail =
let (T tr as taker) = head_r.value in
if Tx.same tr.computation gr.computation then
let selfs = S.cons taker selfs in
give_as_advance t self before selfs head tail
else
let tx = Tx.create () in
let result = tr.result in
let value = gr.value in
if not (Tx.try_return tx tr.computation (fun () -> result value)) then
give_as_advance t self before selfs head tail
else if
(not (Tx.try_return tx gr.computation gr.result))
|| not (Tx.try_commit tx)
then
if not (Computation.is_running gr.computation) then ( (* TODO *) )
else if Computation.is_running tr.computation then
give_as t self before selfs head tail
else give_as_advance t self before selfs head tail
else
let takers =
if head == tail then Q.reverse_as_queue selfs
else
let head = S.reverse_to (S.as_cons head_r.next) selfs in
Q.T (One { head; tail; cons = tail })
in
let givers = before.givers in
let after =
if takers == Q.T Zero && givers == Q.T Zero then empty
else { givers; takers }
in
if not (Atomic.compare_and_set t before after) then
( (* TODO: avoid leak *) )

and give_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail =
if head != tail then give_as t self before selfs (S.as_cons head_r.next) tail
else
let takers = Q.reverse_as_queue selfs in
let givers = Q.add before.givers self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then give_as_start t self

and give_as_start t self =
let before = Atomic.get t in
match before.takers with
| Q.T Zero ->
let takers = Q.T Zero in
let givers = Q.singleton self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then give_as_start t self
| Q.T (One r as o) ->
Q.exec o;
give_as t self before (T Nil) r.head r.cons

let give_evt t value =
let request computation result =
give_as_start t (G { computation; result; value })
in
Event.from_request { request }

(* *)

let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ S.cons)
tail =
let (G gr as giver) = head_r.value in
if Tx.same tr.computation gr.computation then
let selfs = S.cons giver selfs in
take_as_advance t self before selfs head tail
else
let tx = Tx.create () in
let result = tr.result in
let value = gr.value in
if not (Tx.try_return tx gr.computation gr.result) then
take_as_advance t self before selfs head tail
else if
(not (Tx.try_return tx tr.computation (fun () -> result value)))
|| not (Tx.try_commit tx)
then
if not (Computation.is_running gr.computation) then ( (* TODO *) )
else if Computation.is_running tr.computation then
take_as t self before selfs head tail
else take_as_advance t self before selfs head tail
else
let takers = before.takers in
let givers =
if head == tail then Q.reverse_as_queue selfs
else
let head = S.reverse_to (S.as_cons head_r.next) selfs in
Q.T (One { head; tail; cons = tail })
in
let after =
if takers == Q.T Zero && givers == Q.T Zero then empty
else { givers; takers }
in
if not (Atomic.compare_and_set t before after) then
( (* TODO: avoid leak *) )

and take_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail =
if head != tail then take_as t self before selfs (S.as_cons head_r.next) tail
else
let givers = Q.reverse_as_queue selfs in
let takers = Q.add before.takers self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then take_as_start t self

and take_as_start t self =
let before = Atomic.get t in
match before.givers with
| Q.T Zero ->
let givers = Q.T Zero in
let takers = Q.singleton self in
let after = { givers; takers } in
if not (Atomic.compare_and_set t before after) then take_as_start t self
| Q.T (One r as o) ->
Q.exec o;
take_as t self before (T Nil) r.head r.cons

let take_evt t =
let request computation result =
take_as_start t (T { computation; result })
in
Event.from_request { request }

(* *)

let[@inline] take t = take t Backoff.default
let[@inline] give t value = give t value Backoff.default
Loading

0 comments on commit 34e8de5

Please sign in to comment.