-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
560 additions
and
149 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
open Multicore_bench | ||
open Picos | ||
open Picos_std_sync | ||
open Picos_std_structured | ||
|
||
let run_one_domain ~budgetf () = | ||
let n_msgs = 200 * Util.iter_factor in | ||
let t = Ch.create () in | ||
let giver () = | ||
for i = 1 to n_msgs do | ||
Ch.give t i | ||
done | ||
and taker () = | ||
for _ = 1 to n_msgs do | ||
Ch.take t |> ignore | ||
done | ||
in | ||
let init _ = () in | ||
let wrap _ () = Scheduler.run in | ||
let work _ () = | ||
Run.all (if Random.bool () then [ taker; giver ] else [ giver; taker ]) | ||
in | ||
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () | ||
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" | ||
|
||
let yielder () = | ||
while true do | ||
Fiber.yield () | ||
done | ||
|
||
let run_one ~budgetf ~n_givers ~n_takers () = | ||
let n_domains = n_givers + n_takers in | ||
|
||
let n_msgs = 200 / n_domains * Util.iter_factor in | ||
|
||
let t = Ch.create ~padded:true () in | ||
|
||
let n_msgs_to_give = Atomic.make 0 |> Multicore_magic.copy_as_padded in | ||
let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in | ||
|
||
let init _ = | ||
Atomic.set n_msgs_to_give n_msgs; | ||
Atomic.set n_msgs_to_take n_msgs | ||
in | ||
let wrap _ () = Scheduler.run in | ||
let work i () = | ||
Flock.join_after ~on_return:`Terminate @@ fun () -> | ||
Flock.fork yielder; | ||
begin | ||
if i < n_givers then | ||
let rec work () = | ||
let n = Util.alloc n_msgs_to_give in | ||
if 0 < n then begin | ||
for i = 1 to n do | ||
Ch.give t i | ||
done; | ||
work () | ||
end | ||
in | ||
work () | ||
else | ||
let rec work () = | ||
let n = Util.alloc n_msgs_to_take in | ||
if 0 < n then begin | ||
for _ = 1 to n do | ||
Ch.take t |> ignore | ||
done; | ||
work () | ||
end | ||
in | ||
work () | ||
end | ||
in | ||
|
||
let config = | ||
let format role n = | ||
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") | ||
in | ||
Printf.sprintf "%s, %s" (format "giver" n_givers) (format "taker" n_takers) | ||
in | ||
Times.record ~budgetf ~n_domains ~init ~wrap ~work () | ||
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config | ||
|
||
let run_suite ~budgetf = | ||
run_one_domain ~budgetf () | ||
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ] | ||
|> List.concat_map @@ fun (n_givers, n_takers) -> | ||
if Picos_domain.recommended_domain_count () < n_givers + n_takers then [] | ||
else run_one ~budgetf ~n_givers ~n_takers ()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
open Picos_std_event | ||
open Picos | ||
module Atomic = Multicore_magic.Transparent_atomic | ||
module Tx = Computation.Tx | ||
|
||
type 'a taker = | ||
| T : { | ||
computation : (unit -> 'r) Computation.t; | ||
result : 'a -> 'r; | ||
} | ||
-> 'a taker | ||
|
||
type 'a giver = | ||
| G : { | ||
computation : (unit -> 'r) Computation.t; | ||
result : unit -> 'r; | ||
value : 'a; | ||
} | ||
-> 'a giver | ||
|
||
type 'a state = { givers : 'a giver Q.t; takers : 'a taker Q.t } | ||
|
||
let empty = { givers = T Zero; takers = T Zero } | ||
|
||
type 'a t = 'a state Atomic.t | ||
|
||
let create ?padded () = Atomic.make empty |> Multicore_magic.copy_as ?padded | ||
|
||
(* *) | ||
|
||
let[@inline never] wait computation = | ||
let trigger = Trigger.create () in | ||
if Computation.try_attach computation trigger then | ||
match Trigger.await trigger with | ||
| None -> () | ||
| Some (exn, bt) -> | ||
if Computation.try_cancel computation Exit bt then | ||
Printexc.raise_with_backtrace exn bt | ||
|
||
(* *) | ||
|
||
let rec give t value backoff = | ||
let before = Atomic.fenceless_get t in | ||
match before.takers with | ||
| Q.T Zero -> | ||
let computation = Computation.create ~mode:`LIFO () in | ||
let self = G { computation; result = Fun.id; value } in | ||
let givers = Q.add before.givers self in | ||
let after = { givers; takers = T Zero } in | ||
if Atomic.compare_and_set t before after then wait computation | ||
else give t value (Backoff.once backoff) | ||
| Q.T (One _ as takers) -> | ||
let (T { computation; result }) = Q.head takers in | ||
let got = Computation.try_return computation (fun () -> result value) in | ||
let takers = Q.tail takers in | ||
let givers = before.givers in | ||
let after = | ||
if takers == T Zero && givers == T Zero then empty | ||
else { givers; takers } | ||
in | ||
let no_contention = Atomic.compare_and_set t before after in | ||
if not got then | ||
give t value (if no_contention then backoff else Backoff.once backoff) | ||
|
||
let rec take t backoff = | ||
let before = Atomic.fenceless_get t in | ||
match before.givers with | ||
| Q.T Zero -> | ||
let computation = Computation.create ~mode:`LIFO () in | ||
let self = T { computation; result = Fun.id } in | ||
let takers = Q.add before.takers self in | ||
let after = { givers = T Zero; takers } in | ||
if Atomic.compare_and_set t before after then begin | ||
wait computation; | ||
Computation.await computation () | ||
end | ||
else take t (Backoff.once backoff) | ||
| Q.T (One _ as givers) -> | ||
let (G { computation; result; value }) = Q.head givers in | ||
let got = Computation.try_return computation result in | ||
let givers = Q.tail givers in | ||
let takers = before.takers in | ||
let after = | ||
if givers == T Zero && takers == T Zero then empty | ||
else { givers; takers } | ||
in | ||
let no_contention = Atomic.compare_and_set t before after in | ||
if got then value | ||
else take t (if no_contention then backoff else Backoff.once backoff) | ||
|
||
(* *) | ||
|
||
let rec give_as t (G gr as self) before selfs (Cons head_r as head : _ S.cons) | ||
tail = | ||
let (T tr as taker) = head_r.value in | ||
if Tx.same tr.computation gr.computation then | ||
let selfs = S.cons taker selfs in | ||
give_as_advance t self before selfs head tail | ||
else | ||
let tx = Tx.create () in | ||
let result = tr.result in | ||
let value = gr.value in | ||
if not (Tx.try_return tx tr.computation (fun () -> result value)) then | ||
give_as_advance t self before selfs head tail | ||
else if | ||
(not (Tx.try_return tx gr.computation gr.result)) | ||
|| not (Tx.try_commit tx) | ||
then | ||
if not (Computation.is_running gr.computation) then ( (* TODO *) ) | ||
else if Computation.is_running tr.computation then | ||
give_as t self before selfs head tail | ||
else give_as_advance t self before selfs head tail | ||
else | ||
let takers = | ||
if head == tail then Q.reverse_as_queue selfs | ||
else | ||
let head = S.reverse_to (S.as_cons head_r.next) selfs in | ||
Q.T (One { head; tail; cons = tail }) | ||
in | ||
let givers = before.givers in | ||
let after = | ||
if takers == Q.T Zero && givers == Q.T Zero then empty | ||
else { givers; takers } | ||
in | ||
if not (Atomic.compare_and_set t before after) then | ||
( (* TODO: avoid leak *) ) | ||
|
||
and give_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail = | ||
if head != tail then give_as t self before selfs (S.as_cons head_r.next) tail | ||
else | ||
let takers = Q.reverse_as_queue selfs in | ||
let givers = Q.add before.givers self in | ||
let after = { givers; takers } in | ||
if not (Atomic.compare_and_set t before after) then give_as_start t self | ||
|
||
and give_as_start t self = | ||
let before = Atomic.get t in | ||
match before.takers with | ||
| Q.T Zero -> | ||
let takers = Q.T Zero in | ||
let givers = Q.singleton self in | ||
let after = { givers; takers } in | ||
if not (Atomic.compare_and_set t before after) then give_as_start t self | ||
| Q.T (One r as o) -> | ||
Q.exec o; | ||
give_as t self before (T Nil) r.head r.cons | ||
|
||
let give_evt t value = | ||
let request computation result = | ||
give_as_start t (G { computation; result; value }) | ||
in | ||
Event.from_request { request } | ||
|
||
(* *) | ||
|
||
let rec take_as t (T tr as self) before selfs (Cons head_r as head : _ S.cons) | ||
tail = | ||
let (G gr as giver) = head_r.value in | ||
if Tx.same tr.computation gr.computation then | ||
let selfs = S.cons giver selfs in | ||
take_as_advance t self before selfs head tail | ||
else | ||
let tx = Tx.create () in | ||
let result = tr.result in | ||
let value = gr.value in | ||
if not (Tx.try_return tx gr.computation gr.result) then | ||
take_as_advance t self before selfs head tail | ||
else if | ||
(not (Tx.try_return tx tr.computation (fun () -> result value))) | ||
|| not (Tx.try_commit tx) | ||
then | ||
if not (Computation.is_running gr.computation) then ( (* TODO *) ) | ||
else if Computation.is_running tr.computation then | ||
take_as t self before selfs head tail | ||
else take_as_advance t self before selfs head tail | ||
else | ||
let takers = before.takers in | ||
let givers = | ||
if head == tail then Q.reverse_as_queue selfs | ||
else | ||
let head = S.reverse_to (S.as_cons head_r.next) selfs in | ||
Q.T (One { head; tail; cons = tail }) | ||
in | ||
let after = | ||
if takers == Q.T Zero && givers == Q.T Zero then empty | ||
else { givers; takers } | ||
in | ||
if not (Atomic.compare_and_set t before after) then | ||
( (* TODO: avoid leak *) ) | ||
|
||
and take_as_advance t self before selfs (Cons head_r as head : _ S.cons) tail = | ||
if head != tail then take_as t self before selfs (S.as_cons head_r.next) tail | ||
else | ||
let givers = Q.reverse_as_queue selfs in | ||
let takers = Q.add before.takers self in | ||
let after = { givers; takers } in | ||
if not (Atomic.compare_and_set t before after) then take_as_start t self | ||
|
||
and take_as_start t self = | ||
let before = Atomic.get t in | ||
match before.givers with | ||
| Q.T Zero -> | ||
let givers = Q.T Zero in | ||
let takers = Q.singleton self in | ||
let after = { givers; takers } in | ||
if not (Atomic.compare_and_set t before after) then take_as_start t self | ||
| Q.T (One r as o) -> | ||
Q.exec o; | ||
take_as t self before (T Nil) r.head r.cons | ||
|
||
let take_evt t = | ||
let request computation result = | ||
take_as_start t (T { computation; result }) | ||
in | ||
Event.from_request { request } | ||
|
||
(* *) | ||
|
||
let[@inline] take t = take t Backoff.default | ||
let[@inline] give t value = give t value Backoff.default |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.