Skip to content

Commit

Permalink
Introduce Per_thread cache
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Nov 28, 2024
1 parent 84d5428 commit c63d381
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 67 deletions.
25 changes: 12 additions & 13 deletions lib/picos/dune
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,25 @@

;;

(library
(name picos_thread_ocaml4)
(package picos)
(modules)
(rule
(enabled_if
(< %{ocaml_version} 5.0.0))
(libraries
(re_export picos.thread)))
(action
(copy intf.ocaml4.ml intf.ml)))

(rule
(enabled_if
(<= 5.0.0 %{ocaml_version}))
(action
(copy intf.ocaml5.ml intf.ml)))

;;

(library
(name picos)
(public_name picos)
(modules picos intf)
(libraries
backoff
(select
intf.ml
from
(picos_thread_ocaml4 -> intf.ocaml4.ml)
(-> intf.ocaml5.ml))))
(libraries backoff picos.thread))

(mdx
(package picos_meta)
Expand Down
1 change: 1 addition & 0 deletions lib/picos/intf.ocaml4.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ end

module type Fiber = sig
type t
type maybe
type _ computation
end
11 changes: 11 additions & 0 deletions lib/picos/intf.ocaml5.ml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ end

module type Fiber = sig
type t
type maybe
type _ computation

val resume :
Expand Down Expand Up @@ -159,4 +160,14 @@ module type Fiber = sig
type _ Effect.t +=
private
| Spawn : { fiber : t; main : t -> unit } -> unit Effect.t

module Per_thread : sig
(** *)

type t = { mutable current : maybe (** *) }
(** *)

val get : unit -> t
(** *)
end
end
5 changes: 4 additions & 1 deletion lib/picos/picos.mli
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,10 @@ module Fiber : sig
@raise Invalid_argument if the trigger is not in the signaled state. *)

include
Intf.Fiber with type t := t with type 'a computation := 'a Computation.t
Intf.Fiber
with type t := t
with type maybe = Maybe.t
with type 'a computation := 'a Computation.t

(** {2 Design rationale}
Expand Down
24 changes: 22 additions & 2 deletions lib/picos/picos.ocaml5.ml
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ module Fiber = struct
if key < Array.length fls then Array.unsafe_set fls key unique
end

type maybe = T : [< `Nothing | `Fiber ] tdt -> maybe [@@unboxed]

(* END FIBER BOOTSTRAP *)

let resume t k = Effect.Deep.continue k (canceled t)
Expand All @@ -603,9 +605,27 @@ module Fiber = struct
| None -> Effect.Shallow.continue_with k v h
| Some (exn, bt) -> Effect.Shallow.discontinue_with_backtrace k exn bt h

module Per_thread = struct
type t = { mutable current : maybe (** *) }

let key = Picos_thread.TLS.create ()

let get () =
match Picos_thread.TLS.get_exn key with
| p -> p
| exception Picos_thread.TLS.Not_set ->
let p = { current = T Nothing } in
Picos_thread.TLS.set key p;
p
end

type _ Effect.t += Current : t Effect.t

let current () = Effect.perform Current
let current () =
let p = Per_thread.get () in
match p.current with
| T (Fiber _ as fiber) -> fiber
| T Nothing -> Effect.perform Current

type _ Effect.t += Spawn : { fiber : t; main : t -> unit } -> unit Effect.t

Expand All @@ -620,7 +640,7 @@ module Fiber = struct
module Maybe = struct
let[@inline never] not_a_fiber _ = invalid_arg "not a fiber"

type t = T : [< `Nothing | `Fiber ] tdt -> t [@@unboxed]
type t = maybe

let[@inline] to_fiber_or_current = function
| T Nothing -> current ()
Expand Down
25 changes: 14 additions & 11 deletions lib/picos_mux.fifo/picos_mux_fifo.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type t = {
mutable return : ((unit, unit) Effect.Deep.continuation -> unit) option;
mutable discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option;
mutable handler : (unit, unit) Effect.Deep.handler;
per_thread : Fiber.Per_thread.t;
quota : int;
mutable fiber : Fiber.Maybe.t;
mutable remaining_quota : int;
mutable num_alive_fibers : int;
}
Expand All @@ -37,7 +37,7 @@ let rec next t =
match Mpscq.pop_exn t.ready with
| ready -> begin
t.remaining_quota <- t.quota;
t.fiber <-
t.per_thread.current <-
(match ready with
| Spawn (fiber, _)
| Continue (fiber, _)
Expand All @@ -51,7 +51,7 @@ let rec next t =
| Resume (fiber, k) -> Fiber.resume fiber k
end
| exception Mpscq.Empty ->
t.fiber <- Fiber.Maybe.nothing;
t.per_thread.current <- Fiber.Maybe.nothing;
if t.num_alive_fibers <> 0 then begin
if Atomic.get t.needs_wakeup then begin
Mutex.lock t.mutex;
Expand All @@ -75,6 +75,8 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
| None -> Int.max_int
| Some quota -> if quota <= 0 then quota_non_positive quota else quota
in
let per_thread = Fiber.Per_thread.get () in
per_thread.current <- Fiber.Maybe.of_fiber fiber;
{
ready = Mpscq.create ~padded:true ();
needs_wakeup = Atomic.make false |> Multicore_magic.copy_as_padded;
Expand All @@ -86,8 +88,8 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
return = Obj.magic ();
discontinue = Obj.magic ();
handler = Obj.magic ();
per_thread;
quota;
fiber = Fiber.Maybe.of_fiber fiber;
remaining_quota = quota;
num_alive_fibers = 1;
}
Expand All @@ -101,7 +103,7 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
match e with
| Fiber.Current -> t.current
| Fiber.Spawn r ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.is_canceled fiber then t.discontinue
else begin
t.num_alive_fibers <- t.num_alive_fibers + 1;
Expand All @@ -110,7 +112,7 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
end
| Fiber.Yield -> t.yield
| Computation.Cancel_after r -> begin
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.is_canceled fiber then t.discontinue
else
match
Expand All @@ -126,7 +128,7 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
| Trigger.Await trigger ->
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.try_suspend fiber trigger fiber k t.resume then
next t
else
Expand Down Expand Up @@ -164,12 +166,12 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
t.current <-
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Effect.Deep.continue k fiber);
t.yield <-
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Mpscq.push t.ready (Continue (fiber, k));
next t);
t.return <-
Expand All @@ -181,13 +183,14 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
Effect.Deep.continue k ()
end
else begin
Mpscq.push t.ready (Return (Fiber.Maybe.to_fiber t.fiber, k));
Mpscq.push t.ready
(Return (Fiber.Maybe.to_fiber t.per_thread.current, k));
next t
end);
t.discontinue <-
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Fiber.continue fiber k ());
Effect.Deep.match_with main fiber t.handler

Expand Down
66 changes: 27 additions & 39 deletions lib/picos_mux.random/picos_mux_random.ml
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,6 @@ type t = {
mutable run : bool;
}

let fiber_key : Fiber.Maybe.t ref Picos_thread.TLS.t =
Picos_thread.TLS.create ()

let get () =
match Picos_thread.TLS.get_exn fiber_key with
| p -> p
| exception Picos_thread.TLS.Not_set ->
let p = ref Fiber.Maybe.nothing in
Picos_thread.TLS.set fiber_key p;
p

let[@inline] relaxed_wakeup t ~known_not_empty =
if
t.num_waiters_non_zero
Expand All @@ -79,18 +68,16 @@ let[@inline] relaxed_wakeup t ~known_not_empty =
Condition.signal t.condition
end

let exec p t ready =
begin
p :=
match ready with
| Spawn (fiber, _)
| Raise (fiber, _, _, _)
| Return (fiber, _)
| Current (fiber, _)
| Continue (fiber, _)
| Resume (fiber, _) ->
Fiber.Maybe.of_fiber fiber
end;
let exec (p : Fiber.Per_thread.t) t ready =
p.current <-
(match ready with
| Spawn (fiber, _)
| Raise (fiber, _, _, _)
| Return (fiber, _)
| Current (fiber, _)
| Continue (fiber, _)
| Resume (fiber, _) ->
Fiber.Maybe.of_fiber fiber);
match ready with
| Spawn (fiber, main) -> Effect.Deep.match_with main fiber t.handler
| Raise (_, k, exn, bt) -> Effect.Deep.discontinue_with_backtrace k exn bt
Expand All @@ -105,7 +92,7 @@ let rec next p t =
relaxed_wakeup t ~known_not_empty:false;
exec p t ready
| exception Not_found ->
p := Fiber.Maybe.nothing;
p.current <- Fiber.Maybe.nothing;
if Atomic.get t.num_alive_fibers <> 0 then begin
Mutex.lock t.mutex;
let n = !(t.num_waiters) + 1 in
Expand Down Expand Up @@ -192,30 +179,30 @@ let context ?fatal_exn_handler () =
t.current <-
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Current (fiber, k));
next p t);
t.yield <-
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Continue (fiber, k));
next p t);
t.return <-
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Return (fiber, k));
next p t);
t.handler <-
{
retc =
(fun () ->
Atomic.decr t.num_alive_fibers;
let p = Picos_thread.TLS.get_exn fiber_key in
let p = Fiber.Per_thread.get () in
next p t);
exnc;
effc =
Expand All @@ -224,8 +211,8 @@ let context ?fatal_exn_handler () =
match e with
| Fiber.Current -> t.current
| Fiber.Spawn r ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.is_canceled fiber then t.yield
else begin
Atomic.incr t.num_alive_fibers;
Expand All @@ -235,8 +222,8 @@ let context ?fatal_exn_handler () =
end
| Fiber.Yield -> t.yield
| Computation.Cancel_after r -> begin
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.is_canceled fiber then t.yield
else
match
Expand All @@ -254,8 +241,8 @@ let context ?fatal_exn_handler () =
| Trigger.Await trigger ->
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.try_suspend fiber trigger fiber k t.resume then
next p t
else begin
Expand All @@ -268,7 +255,7 @@ let context ?fatal_exn_handler () =

let runner_on_this_thread t =
Select.check_configured ();
next (get ()) t
next (Fiber.Per_thread.get ()) t

let rec await t =
if t.num_waiters_non_zero then begin
Expand Down Expand Up @@ -296,7 +283,8 @@ let run_fiber ?context:t_opt fiber main =
else begin
t.run <- true;
Mutex.unlock t.mutex;
get () := Fiber.Maybe.of_fiber fiber;
let p = Fiber.Per_thread.get () in
p.current <- Fiber.Maybe.of_fiber fiber;
Effect.Deep.match_with main fiber t.handler;
Mutex.lock t.mutex;
await t
Expand Down
2 changes: 1 addition & 1 deletion test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
(modules test_picos_dscheck picos)
(build_if
(>= %{ocaml_version} 5))
(libraries backoff traced_atomic dscheck alcotest))
(libraries picos.thread backoff traced_atomic dscheck alcotest))

;;

Expand Down

0 comments on commit c63d381

Please sign in to comment.