Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Per_thread cache #214

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions lib/picos/dune
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,25 @@

;;

(library
(name picos_thread_ocaml4)
(package picos)
(modules)
(rule
(enabled_if
(< %{ocaml_version} 5.0.0))
(libraries
(re_export picos.thread)))
(action
(copy intf.ocaml4.ml intf.ml)))

(rule
(enabled_if
(<= 5.0.0 %{ocaml_version}))
(action
(copy intf.ocaml5.ml intf.ml)))

;;

(library
(name picos)
(public_name picos)
(modules picos intf)
(libraries
backoff
(select
intf.ml
from
(picos_thread_ocaml4 -> intf.ocaml4.ml)
(-> intf.ocaml5.ml))))
(libraries backoff picos.thread))

(mdx
(package picos_meta)
Expand Down
1 change: 1 addition & 0 deletions lib/picos/intf.ocaml4.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ end

module type Fiber = sig
type t
type maybe
type _ computation
end
11 changes: 11 additions & 0 deletions lib/picos/intf.ocaml5.ml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ end

module type Fiber = sig
type t
type maybe
type _ computation

val resume :
Expand Down Expand Up @@ -158,4 +159,14 @@ module type Fiber = sig
type _ Effect.t +=
private
| Spawn : { fiber : t; main : t -> unit } -> unit Effect.t

module Per_thread : sig
(** *)

type t = { mutable current : maybe (** *) }
(** *)

val get : unit -> t
(** *)
end
end
5 changes: 4 additions & 1 deletion lib/picos/picos.mli
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,10 @@ module Fiber : sig
@raise Invalid_argument if the trigger is not in the signaled state. *)

include
Intf.Fiber with type t := t with type 'a computation := 'a Computation.t
Intf.Fiber
with type t := t
with type maybe = Maybe.t
with type 'a computation := 'a Computation.t

(** {2 Design rationale}

Expand Down
24 changes: 22 additions & 2 deletions lib/picos/picos.ocaml5.ml
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ module Fiber = struct
if key < Array.length fls then Array.unsafe_set fls key unique
end

type maybe = T : [< `Nothing | `Fiber ] tdt -> maybe [@@unboxed]

(* END FIBER BOOTSTRAP *)

let resume t k = Effect.Deep.continue k (canceled t)
Expand All @@ -603,9 +605,27 @@ module Fiber = struct
| None -> Effect.Shallow.continue_with k v h
| Some (exn, bt) -> Effect.Shallow.discontinue_with_backtrace k exn bt h

module Per_thread = struct
type t = { mutable current : maybe (** *) }

let key = Picos_thread.TLS.create ()

let get () =
match Picos_thread.TLS.get_exn key with
| p -> p
| exception Picos_thread.TLS.Not_set ->
let p = { current = T Nothing } in
Picos_thread.TLS.set key p;
p
end

type _ Effect.t += Current : t Effect.t

let current () = Effect.perform Current
let current () =
let p = Per_thread.get () in
match p.current with
| T (Fiber _ as fiber) -> fiber
| T Nothing -> Effect.perform Current

type _ Effect.t += Spawn : { fiber : t; main : t -> unit } -> unit Effect.t

Expand All @@ -620,7 +640,7 @@ module Fiber = struct
module Maybe = struct
let[@inline never] not_a_fiber _ = invalid_arg "not a fiber"

type t = T : [< `Nothing | `Fiber ] tdt -> t [@@unboxed]
type t = maybe

let[@inline] to_fiber_or_current = function
| T Nothing -> current ()
Expand Down
25 changes: 14 additions & 11 deletions lib/picos_mux.fifo/picos_mux_fifo.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type t = {
mutable return : ((unit, unit) Effect.Deep.continuation -> unit) option;
mutable discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option;
mutable handler : (unit, unit) Effect.Deep.handler;
per_thread : Fiber.Per_thread.t;
quota : int;
mutable fiber : Fiber.Maybe.t;
mutable remaining_quota : int;
mutable num_alive_fibers : int;
}
Expand All @@ -37,7 +37,7 @@ let rec next t =
match Mpscq.pop_exn t.ready with
| ready -> begin
t.remaining_quota <- t.quota;
t.fiber <-
t.per_thread.current <-
(match ready with
| Spawn (fiber, _)
| Continue (fiber, _)
Expand All @@ -51,7 +51,7 @@ let rec next t =
| Resume (fiber, k) -> Fiber.resume fiber k
end
| exception Mpscq.Empty ->
t.fiber <- Fiber.Maybe.nothing;
t.per_thread.current <- Fiber.Maybe.nothing;
if t.num_alive_fibers <> 0 then begin
if Atomic.get t.needs_wakeup then begin
Mutex.lock t.mutex;
Expand All @@ -75,6 +75,8 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
| None -> Int.max_int
| Some quota -> if quota <= 0 then quota_non_positive quota else quota
in
let per_thread = Fiber.Per_thread.get () in
per_thread.current <- Fiber.Maybe.of_fiber fiber;
{
ready = Mpscq.create ~padded:true ();
needs_wakeup = Atomic.make false |> Multicore_magic.copy_as_padded;
Expand All @@ -86,8 +88,8 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
return = Obj.magic ();
discontinue = Obj.magic ();
handler = Obj.magic ();
per_thread;
quota;
fiber = Fiber.Maybe.of_fiber fiber;
remaining_quota = quota;
num_alive_fibers = 1;
}
Expand All @@ -101,7 +103,7 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
| Fiber.Current ->
(t.current : ((a, _) Effect.Deep.continuation -> _) option)
| Fiber.Spawn r ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.is_canceled fiber then t.discontinue
else begin
t.num_alive_fibers <- t.num_alive_fibers + 1;
Expand All @@ -110,7 +112,7 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
end
| Fiber.Yield -> t.yield
| Computation.Cancel_after r -> begin
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.is_canceled fiber then t.discontinue
else
match
Expand All @@ -126,7 +128,7 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
| Trigger.Await trigger ->
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.try_suspend fiber trigger fiber k t.resume then
next t
else
Expand Down Expand Up @@ -164,12 +166,12 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
t.current <-
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Effect.Deep.continue k fiber);
t.yield <-
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Mpscq.push t.ready (Continue (fiber, k));
next t);
t.return <-
Expand All @@ -181,13 +183,14 @@ let run_fiber ?quota ?fatal_exn_handler fiber main =
Effect.Deep.continue k ()
end
else begin
Mpscq.push t.ready (Return (Fiber.Maybe.to_fiber t.fiber, k));
Mpscq.push t.ready
(Return (Fiber.Maybe.to_fiber t.per_thread.current, k));
next t
end);
t.discontinue <-
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Fiber.continue fiber k ());
Effect.Deep.match_with main fiber t.handler

Expand Down
66 changes: 27 additions & 39 deletions lib/picos_mux.random/picos_mux_random.ml
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,6 @@ type t = {
mutable run : bool;
}

let fiber_key : Fiber.Maybe.t ref Picos_thread.TLS.t =
Picos_thread.TLS.create ()

let get () =
match Picos_thread.TLS.get_exn fiber_key with
| p -> p
| exception Picos_thread.TLS.Not_set ->
let p = ref Fiber.Maybe.nothing in
Picos_thread.TLS.set fiber_key p;
p

let[@inline] relaxed_wakeup t ~known_not_empty =
if
t.num_waiters_non_zero
Expand All @@ -79,18 +68,16 @@ let[@inline] relaxed_wakeup t ~known_not_empty =
Condition.signal t.condition
end

let exec p t ready =
begin
p :=
match ready with
| Spawn (fiber, _)
| Raise (fiber, _, _, _)
| Return (fiber, _)
| Current (fiber, _)
| Continue (fiber, _)
| Resume (fiber, _) ->
Fiber.Maybe.of_fiber fiber
end;
let exec (p : Fiber.Per_thread.t) t ready =
p.current <-
(match ready with
| Spawn (fiber, _)
| Raise (fiber, _, _, _)
| Return (fiber, _)
| Current (fiber, _)
| Continue (fiber, _)
| Resume (fiber, _) ->
Fiber.Maybe.of_fiber fiber);
match ready with
| Spawn (fiber, main) -> Effect.Deep.match_with main fiber t.handler
| Raise (_, k, exn, bt) -> Effect.Deep.discontinue_with_backtrace k exn bt
Expand All @@ -105,7 +92,7 @@ let rec next p t =
relaxed_wakeup t ~known_not_empty:false;
exec p t ready
| exception Not_found ->
p := Fiber.Maybe.nothing;
p.current <- Fiber.Maybe.nothing;
if Atomic.get t.num_alive_fibers <> 0 then begin
Mutex.lock t.mutex;
let n = !(t.num_waiters) + 1 in
Expand Down Expand Up @@ -192,30 +179,30 @@ let context ?fatal_exn_handler () =
t.current <-
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Current (fiber, k));
next p t);
t.yield <-
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Continue (fiber, k));
next p t);
t.return <-
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Return (fiber, k));
next p t);
t.handler <-
{
retc =
(fun () ->
Atomic.decr t.num_alive_fibers;
let p = Picos_thread.TLS.get_exn fiber_key in
let p = Fiber.Per_thread.get () in
next p t);
exnc;
effc =
Expand All @@ -224,8 +211,8 @@ let context ?fatal_exn_handler () =
| Fiber.Current ->
(t.current : ((a, _) Effect.Deep.continuation -> _) option)
| Fiber.Spawn r ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.is_canceled fiber then t.yield
else begin
Atomic.incr t.num_alive_fibers;
Expand All @@ -235,8 +222,8 @@ let context ?fatal_exn_handler () =
end
| Fiber.Yield -> t.yield
| Computation.Cancel_after r -> begin
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.is_canceled fiber then t.yield
else
match
Expand All @@ -254,8 +241,8 @@ let context ?fatal_exn_handler () =
| Trigger.Await trigger ->
Some
(fun k ->
let p = Picos_thread.TLS.get_exn fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Fiber.Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.try_suspend fiber trigger fiber k t.resume then
next p t
else begin
Expand All @@ -268,7 +255,7 @@ let context ?fatal_exn_handler () =

let runner_on_this_thread t =
Select.check_configured ();
next (get ()) t
next (Fiber.Per_thread.get ()) t

let rec await t =
if t.num_waiters_non_zero then begin
Expand Down Expand Up @@ -296,7 +283,8 @@ let run_fiber ?context:t_opt fiber main =
else begin
t.run <- true;
Mutex.unlock t.mutex;
get () := Fiber.Maybe.of_fiber fiber;
let p = Fiber.Per_thread.get () in
p.current <- Fiber.Maybe.of_fiber fiber;
Effect.Deep.match_with main fiber t.handler;
Mutex.lock t.mutex;
await t
Expand Down
2 changes: 1 addition & 1 deletion test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
(modules test_picos_dscheck picos)
(build_if
(>= %{ocaml_version} 5))
(libraries backoff traced_atomic dscheck alcotest))
(libraries picos.thread backoff traced_atomic dscheck alcotest))

;;

Expand Down
Loading