From f88a519126758905f124f144e50fffed7bb709d5 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Sat, 10 Aug 2024 03:58:01 +0300 Subject: [PATCH] Introduce `Per_thread` cache --- lib/picos/bootstrap/dune | 1 + lib/picos/bootstrap/picos_bootstrap.ml | 11 ++++++ lib/picos/ocaml5/picos_ocaml.ml | 6 +++- lib/picos/picos.common.ml | 8 +++-- lib/picos/picos.mli | 10 ++++++ lib/picos_fifos/picos_fifos.ml | 28 +++++++-------- lib/picos_randos/picos_randos.ml | 48 ++++++++++++-------------- test/dune | 2 +- 8 files changed, 71 insertions(+), 43 deletions(-) diff --git a/lib/picos/bootstrap/dune b/lib/picos/bootstrap/dune index b525fc31d..942cbece2 100644 --- a/lib/picos/bootstrap/dune +++ b/lib/picos/bootstrap/dune @@ -3,4 +3,5 @@ (package picos) (libraries (re_export picos.exn_bt) + picos.thread backoff)) diff --git a/lib/picos/bootstrap/picos_bootstrap.ml b/lib/picos/bootstrap/picos_bootstrap.ml index 870fc84d8..6ebdf5505 100644 --- a/lib/picos/bootstrap/picos_bootstrap.ml +++ b/lib/picos/bootstrap/picos_bootstrap.ml @@ -230,6 +230,10 @@ module Fiber = struct type t = [ `Fiber ] tdt + module Maybe = struct + type t = T : [< `Nothing | `Fiber ] tdt -> t [@@unboxed] + end + let create_packed ~forbid packed = Fiber { forbid; packed; fls = [||] } let create ~forbid computation = @@ -392,3 +396,10 @@ module Handler = struct await : 'c -> Trigger.t -> Exn_bt.t option; } end + +module Per_thread = struct + type t = { mutable current : Fiber.Maybe.t (** *) } + + let key = Picos_thread.TLS.new_key @@ fun () -> { current = T Nothing } + let get () = Picos_thread.TLS.get key +end diff --git a/lib/picos/ocaml5/picos_ocaml.ml b/lib/picos/ocaml5/picos_ocaml.ml index 6a8085a47..ba3e1eca2 100644 --- a/lib/picos/ocaml5/picos_ocaml.ml +++ b/lib/picos/ocaml5/picos_ocaml.ml @@ -22,7 +22,11 @@ module Fiber = struct type _ Effect.t += Current : Fiber.t Effect.t - let current () = Effect.perform Current + let current () = + let p = Per_thread.get () in + match p.current with + | T Nothing -> Effect.perform Current + | T (Fiber _ as fiber) -> fiber type _ Effect.t += | Spawn : { fiber : Fiber.t; main : Fiber.t -> unit } -> unit Effect.t diff --git a/lib/picos/picos.common.ml b/lib/picos/picos.common.ml index 52e539231..1caef169c 100644 --- a/lib/picos/picos.common.ml +++ b/lib/picos/picos.common.ml @@ -29,9 +29,9 @@ module Fiber = struct include Picos_ocaml.Fiber module Maybe = struct - let[@inline never] not_a_fiber () = invalid_arg "not a fiber" + include Maybe - type t = T : [< `Nothing | `Fiber ] tdt -> t [@@unboxed] + let[@inline never] not_a_fiber () = invalid_arg "not a fiber" let[@inline] to_fiber_or_current = function | T Nothing -> current () @@ -85,3 +85,7 @@ module Handler = struct include Picos_bootstrap.Handler include Picos_ocaml.Handler end + +module Per_thread = struct + include Picos_bootstrap.Per_thread +end diff --git a/lib/picos/picos.mli b/lib/picos/picos.mli index 4c61edf94..8a1c9ca7b 100644 --- a/lib/picos/picos.mli +++ b/lib/picos/picos.mli @@ -1152,3 +1152,13 @@ module Handler : sig implements an effect handler directly, because that is likely to perform better. *) end + +module Per_thread : sig + (** *) + + type t = { mutable current : Fiber.Maybe.t (** *) } + (** *) + + val get : unit -> t + (** *) +end diff --git a/lib/picos_fifos/picos_fifos.ml b/lib/picos_fifos/picos_fifos.ml index 1267334bd..b5a1c89c1 100644 --- a/lib/picos_fifos/picos_fifos.ml +++ b/lib/picos_fifos/picos_fifos.ml @@ -26,31 +26,31 @@ type t = { return : ((unit, unit) Effect.Deep.continuation -> unit) option; discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option; handler : (unit, unit) Effect.Deep.handler; + per_thread : Per_thread.t; quota : int; - mutable fiber : Fiber.Maybe.t; mutable remaining_quota : int; } let rec next t = match Picos_mpscq.pop_exn t.ready with | Spawn (fiber, main) -> - t.fiber <- Fiber.Maybe.of_fiber fiber; + t.per_thread.current <- Fiber.Maybe.of_fiber fiber; t.remaining_quota <- t.quota; Effect.Deep.match_with main fiber t.handler | Return (fiber, k) -> - t.fiber <- fiber; + t.per_thread.current <- fiber; t.remaining_quota <- t.quota; Effect.Deep.continue k () | Continue (fiber, k) -> - t.fiber <- Fiber.Maybe.of_fiber fiber; + t.per_thread.current <- Fiber.Maybe.of_fiber fiber; t.remaining_quota <- t.quota; Fiber.continue fiber k () | Resume (fiber, k) -> - t.fiber <- Fiber.Maybe.of_fiber fiber; + t.per_thread.current <- Fiber.Maybe.of_fiber fiber; t.remaining_quota <- t.quota; Fiber.resume fiber k | exception Picos_mpscq.Empty -> - t.fiber <- Fiber.Maybe.nothing; + t.per_thread.current <- Fiber.Maybe.nothing; if Atomic.get t.num_alive_fibers <> 0 then begin if Atomic.get t.needs_wakeup then begin Mutex.lock t.mutex; @@ -87,7 +87,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main = let rec t = { ready; - fiber = Fiber.Maybe.nothing; + per_thread = Per_thread.get (); needs_wakeup; num_alive_fibers; mutex; @@ -107,12 +107,12 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main = later. *) Some (fun k -> - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in Effect.Deep.continue k fiber) and yield = Some (fun k -> - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in Picos_mpscq.push t.ready (Continue (fiber, k)); next t) and return = @@ -124,13 +124,13 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main = Effect.Deep.continue k () end else begin - Picos_mpscq.push t.ready (Return (t.fiber, k)); + Picos_mpscq.push t.ready (Return (t.per_thread.current, k)); next t end) and discontinue = Some (fun k -> - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in Fiber.continue fiber k ()) and handler = { retc; exnc; effc } and[@alert "-handler"] effc : @@ -143,7 +143,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main = | Fiber.Spawn r -> (* We check cancelation status once and then either perform the whole operation or discontinue the fiber. *) - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in if Fiber.is_canceled fiber then t.discontinue else begin Atomic.incr t.num_alive_fibers; @@ -154,7 +154,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main = | Computation.Cancel_after r -> begin (* We check cancelation status once and then either perform the whole operation or discontinue the fiber. *) - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in if Fiber.is_canceled fiber then t.discontinue else match @@ -168,7 +168,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main = | Trigger.Await trigger -> Some (fun k -> - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in if Fiber.try_suspend fiber trigger fiber k t.resume then next t else let remaining_quota = t.remaining_quota - 1 in diff --git a/lib/picos_randos/picos_randos.ml b/lib/picos_randos/picos_randos.ml index eb8863a8b..5cff35d36 100644 --- a/lib/picos_randos/picos_randos.ml +++ b/lib/picos_randos/picos_randos.ml @@ -53,30 +53,28 @@ type t = { mutable run : bool; } -let fiber_key = Picos_thread.TLS.new_key @@ fun () -> ref Fiber.Maybe.nothing - -let rec next p t = +let rec next (p : Per_thread.t) t = match Collection.pop_exn t.ready with | Spawn (fiber, main) -> - p := Fiber.Maybe.of_fiber fiber; + p.current <- Fiber.Maybe.of_fiber fiber; Effect.Deep.match_with main fiber t.handler | Raise (fiber, k, exn_bt) -> - p := Fiber.Maybe.of_fiber fiber; + p.current <- Fiber.Maybe.of_fiber fiber; Exn_bt.discontinue k exn_bt | Return (fiber, k) -> - p := Fiber.Maybe.of_fiber fiber; + p.current <- Fiber.Maybe.of_fiber fiber; Effect.Deep.continue k () | Current (fiber, k) -> - p := Fiber.Maybe.of_fiber fiber; + p.current <- Fiber.Maybe.of_fiber fiber; Effect.Deep.continue k fiber | Continue (fiber, k) -> - p := Fiber.Maybe.of_fiber fiber; + p.current <- Fiber.Maybe.of_fiber fiber; Fiber.continue fiber k () | Resume (fiber, k) -> - p := Fiber.Maybe.of_fiber fiber; + p.current <- Fiber.Maybe.of_fiber fiber; Fiber.resume fiber k | exception Not_found -> - p := Fiber.Maybe.nothing; + p.current <- Fiber.Maybe.nothing; if Atomic.get t.num_alive_fibers <> 0 then begin Mutex.lock t.mutex; if Collection.is_empty t.ready && Atomic.get t.num_alive_fibers <> 0 @@ -158,22 +156,22 @@ let context ?fatal_exn_handler () = and current = Some (fun k -> - let p = Picos_thread.TLS.get fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in Collection.push t.ready (Current (fiber, k)); next p t) and yield = Some (fun k -> - let p = Picos_thread.TLS.get fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in Collection.push t.ready (Continue (fiber, k)); next p t) and return = Some (fun k -> - let p = Picos_thread.TLS.get fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in Collection.push t.ready (Return (fiber, k)); next p t) and handler = { retc; exnc; effc } @@ -182,8 +180,8 @@ let context ?fatal_exn_handler () = function | Fiber.Current -> t.current | Fiber.Spawn r -> - let p = Picos_thread.TLS.get fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in if Fiber.is_canceled fiber then t.yield else begin Atomic.incr t.num_alive_fibers; @@ -193,8 +191,8 @@ let context ?fatal_exn_handler () = end | Fiber.Yield -> t.yield | Computation.Cancel_after r -> begin - let p = Picos_thread.TLS.get fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in if Fiber.is_canceled fiber then t.yield else match @@ -211,8 +209,8 @@ let context ?fatal_exn_handler () = | Trigger.Await trigger -> Some (fun k -> - let p = Picos_thread.TLS.get fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in if Fiber.try_suspend fiber trigger fiber k t.resume then next p t else begin Collection.push t.ready (Resume (fiber, k)); @@ -221,14 +219,14 @@ let context ?fatal_exn_handler () = | _ -> None and retc () = Atomic.decr t.num_alive_fibers; - let p = Picos_thread.TLS.get fiber_key in + let p = Per_thread.get () in next p t in t let runner_on_this_thread t = Select.check_configured (); - next (Picos_thread.TLS.get fiber_key) t + next (Per_thread.get ()) t let rec await t = if !(t.num_waiters_non_zero) then begin @@ -257,7 +255,7 @@ let run_fiber ?context:t_opt fiber main = t.run <- true; Mutex.unlock t.mutex; Collection.push t.ready (Spawn (fiber, main)); - next (Picos_thread.TLS.get fiber_key) t; + next (Per_thread.get ()) t; Mutex.lock t.mutex; await t end diff --git a/test/dune b/test/dune index 3296bdf10..e97f788b1 100644 --- a/test/dune +++ b/test/dune @@ -66,7 +66,7 @@ (modules test_picos_dscheck picos_bootstrap) (build_if (>= %{ocaml_version} 5)) - (libraries picos.exn_bt backoff traced_atomic dscheck alcotest)) + (libraries picos.exn_bt picos.thread backoff traced_atomic dscheck alcotest)) ;;