Skip to content

Commit

Permalink
Introduce Per_thread cache
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Oct 2, 2024
1 parent 849dea9 commit 906d924
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 53 deletions.
2 changes: 1 addition & 1 deletion lib/picos/bootstrap/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name picos_bootstrap)
(package picos)
(libraries backoff))
(libraries picos.thread backoff))
18 changes: 18 additions & 0 deletions lib/picos/bootstrap/picos_bootstrap.ml
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ module Fiber = struct

type t = [ `Fiber ] tdt

module Maybe = struct
type t = T : [< `Nothing | `Fiber ] tdt -> t [@@unboxed]
end

let create_packed ~forbid packed = Fiber { forbid; packed; fls = [||] }

let create ~forbid computation =
Expand Down Expand Up @@ -526,3 +530,17 @@ module Handler = struct
await : 'c -> Trigger.t -> (exn * Printexc.raw_backtrace) option;
}
end

module Per_thread = struct
type t = { mutable current : Fiber.Maybe.t (** *) }

let key = Picos_thread.TLS.create ()

let get () =
match Picos_thread.TLS.get_exn key with
| p -> p
| exception Picos_thread.TLS.Not_set ->
let p = { current = T Nothing } in
Picos_thread.TLS.set key p;
p
end
6 changes: 5 additions & 1 deletion lib/picos/ocaml5/picos_ocaml.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ module Fiber = struct

type _ Effect.t += Current : Fiber.t Effect.t

let current () = Effect.perform Current
let current () =
let p = Per_thread.get () in
match p.current with
| T Nothing -> Effect.perform Current
| T (Fiber _ as fiber) -> fiber

type _ Effect.t +=
| Spawn : { fiber : Fiber.t; main : Fiber.t -> unit } -> unit Effect.t
Expand Down
8 changes: 6 additions & 2 deletions lib/picos/picos.common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ module Fiber = struct
include Picos_ocaml.Fiber

module Maybe = struct
let[@inline never] not_a_fiber () = invalid_arg "not a fiber"
include Maybe

type t = T : [< `Nothing | `Fiber ] tdt -> t [@@unboxed]
let[@inline never] not_a_fiber () = invalid_arg "not a fiber"

let[@inline] to_fiber_or_current = function
| T Nothing -> current ()
Expand Down Expand Up @@ -83,3 +83,7 @@ module Handler = struct
include Picos_bootstrap.Handler
include Picos_ocaml.Handler
end

module Per_thread = struct
include Picos_bootstrap.Per_thread
end
10 changes: 10 additions & 0 deletions lib/picos/picos.mli
Original file line number Diff line number Diff line change
Expand Up @@ -1254,3 +1254,13 @@ module Handler : sig
implements an effect handler directly, because that is likely to perform
better. *)
end

module Per_thread : sig
(** *)

type t = { mutable current : Fiber.Maybe.t (** *) }
(** *)

val get : unit -> t
(** *)
end
28 changes: 14 additions & 14 deletions lib/picos_mux.fifo/picos_mux_fifo.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,32 @@ type t = {
return : ((unit, unit) Effect.Deep.continuation -> unit) option;
discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option;
handler : (unit, unit) Effect.Deep.handler;
per_thread : Per_thread.t;
quota : int;
mutable fiber : Fiber.Maybe.t;
mutable remaining_quota : int;
mutable num_alive_fibers : int;
}

let rec next t =
match Mpscq.pop_exn t.ready with
| Spawn (fiber, main) ->
t.fiber <- Fiber.Maybe.of_fiber fiber;
t.per_thread.current <- Fiber.Maybe.of_fiber fiber;
t.remaining_quota <- t.quota;
Effect.Deep.match_with main fiber t.handler
| Return (fiber, k) ->
t.fiber <- fiber;
t.per_thread.current <- fiber;
t.remaining_quota <- t.quota;
Effect.Deep.continue k ()
| Continue (fiber, k) ->
t.fiber <- Fiber.Maybe.of_fiber fiber;
t.per_thread.current <- Fiber.Maybe.of_fiber fiber;
t.remaining_quota <- t.quota;
Fiber.continue fiber k ()
| Resume (fiber, k) ->
t.fiber <- Fiber.Maybe.of_fiber fiber;
t.per_thread.current <- Fiber.Maybe.of_fiber fiber;
t.remaining_quota <- t.quota;
Fiber.resume fiber k
| exception Mpscq.Empty ->
t.fiber <- Fiber.Maybe.nothing;
t.per_thread.current <- Fiber.Maybe.nothing;
if t.num_alive_fibers <> 0 then begin
if Atomic.get t.needs_wakeup then begin
Mutex.lock t.mutex;
Expand Down Expand Up @@ -84,7 +84,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
let rec t =
{
ready;
fiber = Fiber.Maybe.nothing;
per_thread = Per_thread.get ();
needs_wakeup;
mutex;
condition;
Expand All @@ -101,12 +101,12 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
and current =
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Effect.Deep.continue k fiber)
and yield =
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Mpscq.push t.ready (Continue (fiber, k));
next t)
and return =
Expand All @@ -118,21 +118,21 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
Effect.Deep.continue k ()
end
else begin
Mpscq.push t.ready (Return (t.fiber, k));
Mpscq.push t.ready (Return (t.per_thread.current, k));
next t
end)
and discontinue =
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Fiber.continue fiber k ())
and handler = { retc; exnc; effc }
and[@alert "-handler"] effc :
type a. a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option =
function
| Fiber.Current -> t.current
| Fiber.Spawn r ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.is_canceled fiber then t.discontinue
else begin
t.num_alive_fibers <- t.num_alive_fibers + 1;
Expand All @@ -141,7 +141,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
end
| Fiber.Yield -> t.yield
| Computation.Cancel_after r -> begin
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.is_canceled fiber then t.discontinue
else
match
Expand All @@ -155,7 +155,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
| Trigger.Await trigger ->
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.try_suspend fiber trigger fiber k t.resume then next t
else
let remaining_quota = t.remaining_quota - 1 in
Expand Down
57 changes: 23 additions & 34 deletions lib/picos_mux.random/picos_mux_random.ml
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,6 @@ type t = {
mutable run : bool;
}

let fiber_key : Fiber.Maybe.t ref Picos_thread.TLS.t =
Picos_thread.TLS.create ()

let get () =
match Picos_thread.TLS.get_exn fiber_key with
| p -> p
| exception Picos_thread.TLS.Not_set ->
let p = ref Fiber.Maybe.nothing in
Picos_thread.TLS.set fiber_key p;
p

let[@inline] relaxed_wakeup t ~known_not_empty =
if
t.num_waiters_non_zero
Expand All @@ -79,24 +68,24 @@ let[@inline] relaxed_wakeup t ~known_not_empty =
Condition.signal t.condition
end

let exec p t = function
let exec (p : Per_thread.t) t = function
| Spawn (fiber, main) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Effect.Deep.match_with main fiber t.handler
| Raise (fiber, k, exn, bt) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Effect.Deep.discontinue_with_backtrace k exn bt
| Return (fiber, k) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Effect.Deep.continue k ()
| Current (fiber, k) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Effect.Deep.continue k fiber
| Continue (fiber, k) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Fiber.continue fiber k ()
| Resume (fiber, k) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Fiber.resume fiber k

let rec next p t =
Expand All @@ -105,7 +94,7 @@ let rec next p t =
relaxed_wakeup t ~known_not_empty:false;
exec p t ready
| exception Not_found ->
p := Fiber.Maybe.nothing;
p.current <- Fiber.Maybe.nothing;
if Atomic.get t.num_alive_fibers <> 0 then begin
Mutex.lock t.mutex;
let n = !(t.num_waiters) + 1 in
Expand Down Expand Up @@ -190,22 +179,22 @@ let context ?fatal_exn_handler () =
and current =
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Current (fiber, k));
next p t)
and yield =
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Continue (fiber, k));
next p t)
and return =
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Return (fiber, k));
next p t)
and handler = { retc; exnc; effc }
Expand All @@ -214,8 +203,8 @@ let context ?fatal_exn_handler () =
function
| Fiber.Current -> t.current
| Fiber.Spawn r ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.is_canceled fiber then t.yield
else begin
Atomic.incr t.num_alive_fibers;
Expand All @@ -225,8 +214,8 @@ let context ?fatal_exn_handler () =
end
| Fiber.Yield -> t.yield
| Computation.Cancel_after r -> begin
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.is_canceled fiber then t.yield
else
match
Expand All @@ -243,8 +232,8 @@ let context ?fatal_exn_handler () =
| Trigger.Await trigger ->
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.try_suspend fiber trigger fiber k t.resume then next p t
else begin
Collection.push t.ready (Resume (fiber, k));
Expand All @@ -253,14 +242,14 @@ let context ?fatal_exn_handler () =
| _ -> None
and retc () =
Atomic.decr t.num_alive_fibers;
let p = Picos_thread.TLS.get_exn fiber_key in
let p = Per_thread.get () in
next p t
in
t

let runner_on_this_thread t =
Select.check_configured ();
next (get ()) t
next (Per_thread.get ()) t

let rec await t =
if t.num_waiters_non_zero then begin
Expand Down Expand Up @@ -289,7 +278,7 @@ let run_fiber ?context:t_opt fiber main =
t.run <- true;
Mutex.unlock t.mutex;
Collection.push t.ready (Spawn (fiber, main));
next (get ()) t;
next (Per_thread.get ()) t;
Mutex.lock t.mutex;
await t
end
Expand Down
2 changes: 1 addition & 1 deletion test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
(modules test_picos_dscheck picos_bootstrap)
(build_if
(>= %{ocaml_version} 5))
(libraries picos backoff traced_atomic dscheck alcotest))
(libraries picos.thread backoff traced_atomic dscheck alcotest))

;;

Expand Down

0 comments on commit 906d924

Please sign in to comment.