Skip to content

Commit

Permalink
Add a scalable Countdown counter
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Oct 11, 2024
1 parent 56caff3 commit 7ab74a1
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 49 deletions.
28 changes: 13 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,30 +110,28 @@ create a module `Bench_atomic` for our benchmarks suite on atomics:
it difficult to get useful results. *)
in
(* We store the number of operations to perform in an atomic. The idea
is that we want all the workers or domains to work at the same time
as much as possible, because we want to measure performance under
contention. So, instead of e.g. simply having each domain run a
fixed count loop, which could lead to some domains finishing well
before others, we let the number of operations performed by each domain
vary. *)
(* We store the number of operations to perform in a scalable countdown
counter. The idea is that we want all the workers or domains to work
at the same time as much as possible, because we want to measure
performance under contention. So, instead of e.g. simply having each
domain run a fixed count loop, which could lead to some domains
finishing well before others, we let the number of operations performed
by each domain vary. *)
let n_ops_to_do =
Atomic.make 0
|> Multicore_magic.copy_as_padded
(* We also explicitly pad the number of ops to avoid false sharing. *)
Countdown.create ~n_domains ()
in
(* [init] is called on each domain before [work]. The return value of
[init] is passed to [work]. *)
let init _domain_index =
(* It doesn't matter that we set the atomic multiple times. We could
also use a [before] callback to do setup before [work]. *)
Atomic.set n_ops_to_do n
(* It doesn't matter that we set the countdown counter multiple times.
We could also use a [before] callback to do setup before [work]. *)
Countdown.non_atomic_set n_ops_to_do n
in
(* [work] is called on each domain and the time it takes is recorded.
The second argument comes from [init]. *)
let work _domain_index () =
let work domain_index () =
(* Because we are benchmarking operations that take a very small amount
of time, we run our own loop to perform the operations. This has
pros and cons. One con is that the loop overhead will be part of the
Expand All @@ -142,7 +140,7 @@ create a module `Bench_atomic` for our benchmarks suite on atomics:
ways. *)
let rec work () =
(* We try to allocate some number of operations to perform. *)
let n = Util.alloc n_ops_to_do in
let n = Countdown.alloc n_ops_to_do ~domain_index ~batch:100 in
(* If we got zero, then we should stop. *)
if n <> 0 then begin
(* Otherwise we perform the operations and try again. *)
Expand Down
14 changes: 8 additions & 6 deletions bench/bench_bounded_q.ml
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,19 @@ let run_one ~budgetf ~n_adders ~n_takers ?(n_msgs = 50 * Util.iter_factor) () =

let t = Bounded_q.create () in

let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in
let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in

let init _ =
assert (Bounded_q.is_empty t);
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
Countdown.non_atomic_set n_msgs_to_take n_msgs;
Countdown.non_atomic_set n_msgs_to_add n_msgs
in
let work i () =
if i < n_adders then
let domain_index = i in
let rec work () =
let n = Util.alloc n_msgs_to_add in
let n = Countdown.alloc n_msgs_to_add ~domain_index ~batch:100 in
if 0 < n then begin
for i = 1 to n do
Bounded_q.push t i
Expand All @@ -119,8 +120,9 @@ let run_one ~budgetf ~n_adders ~n_takers ?(n_msgs = 50 * Util.iter_factor) () =
in
work ()
else
let domain_index = i - n_adders in
let rec work () =
let n = Util.alloc n_msgs_to_take in
let n = Countdown.alloc n_msgs_to_take ~domain_index ~batch:100 in
if n <> 0 then begin
for _ = 1 to n do
ignore (Bounded_q.pop t)
Expand Down
12 changes: 6 additions & 6 deletions bench/bench_hashtbl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ let run_one ~budgetf ~n_domains ~use_mutex ?(n_keys = 1000) ~percent_mem
let n_ops = (if use_mutex then 100 else 400) * Util.iter_factor in
let n_ops = (100 + percent_mem) * n_ops / 100 in

let n_ops_todo = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_ops_todo = Countdown.create ~n_domains () in

let init _ =
Atomic.set n_ops_todo n_ops;
Countdown.non_atomic_set n_ops_todo n_ops;
Random.State.make_self_init ()
in
let work_no_mutex _ state =
let work_no_mutex domain_index state =
let rec work () =
let n = Util.alloc n_ops_todo in
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:100 in
if n <> 0 then
let rec loop n =
if 0 < n then
Expand All @@ -65,9 +65,9 @@ let run_one ~budgetf ~n_domains ~use_mutex ?(n_keys = 1000) ~percent_mem
in
work ()
in
let work_mutex _ state =
let work_mutex domain_index state =
let rec work () =
let n = Util.alloc n_ops_todo in
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:100 in
if n <> 0 then
let rec loop n =
if 0 < n then
Expand Down
12 changes: 6 additions & 6 deletions bench/bench_incr.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ let run_one ~budgetf ~n_domains ~approach () =

let n_ops = 500 * Util.iter_factor / n_domains in

let n_ops_todo = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_ops_todo = Countdown.create ~n_domains () in

let init _ = Atomic.set n_ops_todo n_ops in
let work _ () =
let init _ = Countdown.non_atomic_set n_ops_todo n_ops in
let work domain_index () =
match approach with
| `Cas ->
let rec work () =
let n = Util.alloc n_ops_todo in
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:100 in
if n <> 0 then
let rec loop n =
if 0 < n then begin
Expand All @@ -27,7 +27,7 @@ let run_one ~budgetf ~n_domains ~approach () =
work ()
| `Cas_backoff ->
let rec work () =
let n = Util.alloc n_ops_todo in
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:100 in
if n <> 0 then
let rec loop backoff n =
if 0 < n then begin
Expand All @@ -43,7 +43,7 @@ let run_one ~budgetf ~n_domains ~approach () =
work ()
| `Incr ->
let rec work () =
let n = Util.alloc n_ops_todo in
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:100 in
if n <> 0 then
let rec loop n =
if 0 < n then begin
Expand Down
60 changes: 60 additions & 0 deletions lib/countdown.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
module Atomic = Multicore_magic.Transparent_atomic

(* A countdown is an array of atomic integer counters.  As built by [create],
   the first [n_domains] slots hold distinct counters and any remaining slots
   repeat those counters starting again from the first one. *)
type t = int Atomic.t array

(* [ceil_pow_2_minus_1 n] sets every bit below the highest set bit of [n].
   For [n >= 0] the result is the smallest value of the form [2^k - 1] that is
   greater than or equal to [n]; e.g. [4] gives [7] and [7] gives [7]. *)
let ceil_pow_2_minus_1 n =
  (* OR the bits with a copy of themselves shifted right by [shift]. *)
  let smear bits shift =
    Nativeint.logor bits (Nativeint.shift_right_logical bits shift)
  in
  (* Doubling the shift each step propagates the highest set bit into all
     lower positions.  The final shift by 32 is only applied when [int] is
     wider than 32 bits. *)
  let shifts =
    if Sys.int_size > 32 then [ 1; 2; 4; 8; 16; 32 ] else [ 1; 2; 4; 8; 16 ]
  in
  Nativeint.to_int (List.fold_left smear (Nativeint.of_int n) shifts)

(* [create ~n_domains ()] builds a countdown with one atomic counter per
   domain, each starting at [0].

   The resulting array has length [ceil_pow_2_minus_1 n_domains]; slots past
   the first [n_domains] refer back to the same counters, cycling from the
   first one.

   Raises [Invalid_argument] when [n_domains < 1]. *)
let create ~n_domains () =
  if n_domains < 1 then invalid_arg "n_domains < 1";
  (* One counter per domain.  NOTE(review): [make_contended] presumably pads
     each counter to avoid false sharing -- confirm against the [Atomic]
     implementation in use. *)
  let counters = Array.init n_domains (fun _ -> Atomic.make_contended 0) in
  let padded_length = ceil_pow_2_minus_1 n_domains in
  Array.init padded_length (fun slot ->
      Array.unsafe_get counters (slot mod n_domains))

(* [arity t i] scans forward from slot [i] and returns the index of the first
   slot that is physically the same counter as slot [0], or the array length
   if there is no such slot. *)
let rec arity t i =
  if Array.length t <= i then i
  else if Array.unsafe_get t i == Array.unsafe_get t 0 then i
  else arity t (i + 1)

(* [arity t] is the number of leading slots of [t] before the first repeat of
   slot [0]; for a countdown built by [create] this is its number of distinct
   counters. *)
let[@inline] arity t = arity t 1

(* [non_atomic_set t count] spreads [count] as evenly as possible over the
   distinct counters of [t]: every counter receives [count / n] and the first
   [count mod n] counters receive one extra, where [n = arity t].

   The counters are written one after another, so the update as a whole is
   not atomic.  The value stored in each slot depends only on [count], [n]
   and the slot index.

   Raises [Invalid_argument] when [count < 0]. *)
let non_atomic_set t count =
  if count < 0 then invalid_arg "count < 0";
  let n = arity t in
  let share = count / n in
  let extras = count mod n in
  for slot = 0 to n - 1 do
    let value = if slot < extras then share + 1 else share in
    Atomic.set (Array.unsafe_get t slot) value
  done

(* [get t count i] adds to [count] the non-negative part of every counter from
   slot [i] up to, but not including, the first slot that is physically the
   same counter as slot [0] (or the end of the array). *)
let rec get t count i =
  if Array.length t <= i then count
  else
    let counter = Array.unsafe_get t i in
    if counter == Array.unsafe_get t 0 then count
    else
      (* Counters can be negative after [alloc] subtracted more than was
         left, so each one is clamped at zero before being summed. *)
      let remaining = Int.max 0 (Atomic.get counter) in
      get t (count + remaining) (i + 1)

(* [get t] is the sum of the non-negative parts of the distinct counters of
   [t].  Each counter is read separately. *)
let[@inline] get t =
  let first = Int.max 0 (Atomic.get (Array.unsafe_get t 0)) in
  get t first 1

(* [alloc t ~batch i] is the fallback scan: starting at slot [i] it tries each
   slot of [t] in order and returns the amount taken from the first counter
   that still had a positive value, or [0] once every slot has been tried. *)
let rec alloc t ~batch i =
  if i < Array.length t then
    let c = Array.unsafe_get t i in
    (* Only subtract from counters that currently look positive, so that an
       exhausted counter is not decremented again on every call. *)
    if 0 < Atomic.get c then
      (* [n] is the value of the counter just before the subtraction. *)
      let n = Atomic.fetch_and_add c (-batch) in
      (* Another domain may have emptied the counter in between; in that case
         move on.  Otherwise we own [min n batch] operations. *)
      if 0 < n then Int.min n batch else alloc t ~batch (i + 1)
    else alloc t ~batch (i + 1)
  else 0

(* [alloc t ~domain_index ~batch] tries to take up to [batch] operations from
   the countdown and returns how many were taken, or [0] when nothing was
   left.

   The counter at [domain_index] is tried first; only when it is exhausted
   are the other counters scanned from slot [0].

   Raises [Invalid_argument] when [batch < 1], or when [domain_index] is
   negative or not less than the length of [t].  The index is validated
   because the counter is fetched with [Array.unsafe_get]; a non-positive
   [batch] is rejected because it would return a non-positive amount while
   leaving (or even adding to) the remaining count. *)
let[@inline] alloc t ~domain_index ~batch =
  if batch < 1 then invalid_arg "batch < 1";
  if domain_index < 0 || Array.length t <= domain_index then
    invalid_arg "domain_index out of bounds";
  let c = Array.unsafe_get t domain_index in
  if 0 < Atomic.get c then
    let n = Atomic.fetch_and_add c (-batch) in
    if 0 < n then Int.min n batch else alloc t ~batch 0
  else alloc t ~batch 0
6 changes: 6 additions & 0 deletions lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ let () =
Jbuild_plugin.V1.send
@@ {|

(rule
(enabled_if
(< %{ocaml_version} 4.13.0))
(action
(copy int.ocaml_4_12.ml int.ml)))

(library
(public_name multicore-bench)
(name multicore_bench)
Expand Down
4 changes: 4 additions & 0 deletions lib/int.ocaml_4_12.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
include Stdlib.Int

(* [min x y] is the smaller of the integers [x] and [y]. *)
let min (x : t) (y : t) = if y <= x then y else x

(* [max x y] is the larger of the integers [x] and [y]. *)
let max (x : t) (y : t) = if y <= x then x else y
1 change: 1 addition & 0 deletions lib/multicore_bench.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ module Unit_of_time = Unit_of_time
module Times = Times
module Suite = Suite
module Cmd = Cmd
module Countdown = Countdown
module Util = Util
27 changes: 27 additions & 0 deletions lib/multicore_bench.mli
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,33 @@ module Cmd : sig
the benchmark executable. *)
end

module Countdown : sig
  (** Scalable low-level countdown.

      The count is split over one counter per domain.  A domain first takes
      from the counter selected by its own index and only falls back to the
      other counters once that one is exhausted. *)

  type t
  (** Represents a countdown counter. *)

  val create : n_domains:int -> unit -> t
  (** [create ~n_domains ()] returns a new countdown counter with initial value
      of [0], to be shared by [n_domains] domains.

      @raise Invalid_argument if [n_domains < 1]. *)

  val non_atomic_set : t -> int -> unit
  (** [non_atomic_set countdown count] sets the [count] of the [countdown].
      ⚠️ This operation is not atomic.  However, it is safe to call
      [non_atomic_set] with the same [countdown] and [count] in parallel,
      because the [countdown] will be initialized deterministically.

      @raise Invalid_argument if [count < 0]. *)

  val get : t -> int
  (** [get countdown] returns the count of the [countdown].

      The per-domain counters are read one at a time, so the result is not an
      atomic snapshot while other domains are concurrently calling {!alloc}. *)

  val alloc : t -> domain_index:int -> batch:int -> int
  (** [alloc countdown ~domain_index ~batch] tries to reduce the count of the
      [countdown] by at most [batch] (which must be positive) and returns the
      number by which the count was reduced or [0] in case the count was already
      [0].

      [domain_index] selects the counter that is tried first and must satisfy
      [0 <= domain_index < n_domains], where [n_domains] is the value given to
      {!create}. *)
end

module Util : sig
(** Utilities for creating benchmarks.
Expand Down
2 changes: 1 addition & 1 deletion lib/times.ml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ let record (type a) ~budgetf ~n_domains ?(ensure_multi_domain = true)
wrap;
results =
Array.init n_domains (fun _ ->
Float.Array.create (max n_runs_min n_runs_max));
Float.Array.create (Int.max n_runs_min n_runs_max));
budget_start = Mtime_clock.elapsed ();
before;
init;
Expand Down
32 changes: 17 additions & 15 deletions lib/util.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ let rec alloc ?(batch = 1000) counter =
let n = Atomic.get counter in
if n = 0 then 0
else
let batch = min n batch in
let batch = Int.min n batch in
if Atomic.compare_and_set counter n (n - batch) then batch
else alloc ~batch counter

Expand All @@ -33,21 +33,23 @@ module Bits = struct

let length t = t.length

let rec iter fn t i =
let iter fn t =
let i = ref 0 in
let n = t.length in
if i < n then begin
let byte = Char.code (Bytes.unsafe_get t.bytes (i lsr 3)) in
let bit = ref 1 in
let bit_limit = 1 lsl if n - i < 8 then n - i else 8 in
while !bit < bit_limit do
let b = 0 <> byte land !bit in
bit := !bit + !bit;
fn b
done;
iter fn t (i + 8)
end

let iter fn t = iter fn t 0
while !i < n do
let ix = !i in
i := !i + 8;
let byte = Char.code (Bytes.unsafe_get t.bytes (ix lsr 3)) in
let n = n - ix in
fn (0 <> byte land 1);
if 1 < n then fn (0 <> byte land 2);
if 2 < n then fn (0 <> byte land 4);
if 3 < n then fn (0 <> byte land 8);
if 4 < n then fn (0 <> byte land 16);
if 5 < n then fn (0 <> byte land 32);
if 6 < n then fn (0 <> byte land 64);
if 7 < n then fn (0 <> byte land 128)
done
end

let generate_push_and_pop_sequence ?(state = Random.State.make_self_init ())
Expand Down

0 comments on commit 7ab74a1

Please sign in to comment.