Skip to content

Commit

Permalink
Introduce Per_thread cache
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Aug 19, 2024
1 parent ceaefe5 commit 850e944
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 43 deletions.
1 change: 1 addition & 0 deletions lib/picos/bootstrap/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
(package picos)
(libraries
(re_export picos.exn_bt)
picos.thread
backoff))
11 changes: 11 additions & 0 deletions lib/picos/bootstrap/picos_bootstrap.ml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ module Fiber = struct

type t = [ `Fiber ] tdt

module Maybe = struct
type t = T : [< `Nothing | `Fiber ] tdt -> t [@@unboxed]
end

let create_packed ~forbid packed = Fiber { forbid; packed; fls = [||] }

let create ~forbid computation =
Expand Down Expand Up @@ -392,3 +396,10 @@ module Handler = struct
await : 'c -> Trigger.t -> Exn_bt.t option;
}
end

module Per_thread = struct
type t = { mutable current : Fiber.Maybe.t (** *) }

let key = Picos_thread.TLS.new_key @@ fun () -> { current = T Nothing }
let get () = Picos_thread.TLS.get key
end
6 changes: 5 additions & 1 deletion lib/picos/ocaml5/picos_ocaml.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ module Fiber = struct

type _ Effect.t += Current : Fiber.t Effect.t

let current () = Effect.perform Current
let current () =
let p = Per_thread.get () in
match p.current with
| T Nothing -> Effect.perform Current
| T (Fiber _ as fiber) -> fiber

type _ Effect.t +=
| Spawn : { fiber : Fiber.t; main : Fiber.t -> unit } -> unit Effect.t
Expand Down
8 changes: 6 additions & 2 deletions lib/picos/picos.common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ module Fiber = struct
include Picos_ocaml.Fiber

module Maybe = struct
let[@inline never] not_a_fiber () = invalid_arg "not a fiber"
include Maybe

type t = T : [< `Nothing | `Fiber ] tdt -> t [@@unboxed]
let[@inline never] not_a_fiber () = invalid_arg "not a fiber"

let[@inline] to_fiber_or_current = function
| T Nothing -> current ()
Expand Down Expand Up @@ -85,3 +85,7 @@ module Handler = struct
include Picos_bootstrap.Handler
include Picos_ocaml.Handler
end

module Per_thread = struct
include Picos_bootstrap.Per_thread
end
10 changes: 10 additions & 0 deletions lib/picos/picos.mli
Original file line number Diff line number Diff line change
Expand Up @@ -1152,3 +1152,13 @@ module Handler : sig
implements an effect handler directly, because that is likely to perform
better. *)
end

module Per_thread : sig
(** *)

type t = { mutable current : Fiber.Maybe.t (** *) }
(** *)

val get : unit -> t
(** *)
end
28 changes: 14 additions & 14 deletions lib/picos_fifos/picos_fifos.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,31 @@ type t = {
return : ((unit, unit) Effect.Deep.continuation -> unit) option;
discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option;
handler : (unit, unit) Effect.Deep.handler;
per_thread : Per_thread.t;
quota : int;
mutable fiber : Fiber.Maybe.t;
mutable remaining_quota : int;
}

let rec next t =
match Picos_mpscq.pop_exn t.ready with
| Spawn (fiber, main) ->
t.fiber <- Fiber.Maybe.of_fiber fiber;
t.per_thread.current <- Fiber.Maybe.of_fiber fiber;
t.remaining_quota <- t.quota;
Effect.Deep.match_with main fiber t.handler
| Return (fiber, k) ->
t.fiber <- fiber;
t.per_thread.current <- fiber;
t.remaining_quota <- t.quota;
Effect.Deep.continue k ()
| Continue (fiber, k) ->
t.fiber <- Fiber.Maybe.of_fiber fiber;
t.per_thread.current <- Fiber.Maybe.of_fiber fiber;
t.remaining_quota <- t.quota;
Fiber.continue fiber k ()
| Resume (fiber, k) ->
t.fiber <- Fiber.Maybe.of_fiber fiber;
t.per_thread.current <- Fiber.Maybe.of_fiber fiber;
t.remaining_quota <- t.quota;
Fiber.resume fiber k
| exception Picos_mpscq.Empty ->
t.fiber <- Fiber.Maybe.nothing;
t.per_thread.current <- Fiber.Maybe.nothing;
if Atomic.get t.num_alive_fibers <> 0 then begin
if Atomic.get t.needs_wakeup then begin
Mutex.lock t.mutex;
Expand Down Expand Up @@ -87,7 +87,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
let rec t =
{
ready;
fiber = Fiber.Maybe.nothing;
per_thread = Per_thread.get ();
needs_wakeup;
num_alive_fibers;
mutex;
Expand All @@ -107,12 +107,12 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
later. *)
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Effect.Deep.continue k fiber)
and yield =
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Picos_mpscq.push t.ready (Continue (fiber, k));
next t)
and return =
Expand All @@ -124,13 +124,13 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
Effect.Deep.continue k ()
end
else begin
Picos_mpscq.push t.ready (Return (t.fiber, k));
Picos_mpscq.push t.ready (Return (t.per_thread.current, k));
next t
end)
and discontinue =
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
Fiber.continue fiber k ())
and handler = { retc; exnc; effc }
and[@alert "-handler"] effc :
Expand All @@ -143,7 +143,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
| Fiber.Spawn r ->
(* We check cancelation status once and then either perform the
whole operation or discontinue the fiber. *)
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.is_canceled fiber then t.discontinue
else begin
Atomic.incr t.num_alive_fibers;
Expand All @@ -154,7 +154,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
| Computation.Cancel_after r -> begin
(* We check cancelation status once and then either perform the
whole operation or discontinue the fiber. *)
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.is_canceled fiber then t.discontinue
else
match
Expand All @@ -168,7 +168,7 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
| Trigger.Await trigger ->
Some
(fun k ->
let fiber = Fiber.Maybe.to_fiber t.fiber in
let fiber = Fiber.Maybe.to_fiber t.per_thread.current in
if Fiber.try_suspend fiber trigger fiber k t.resume then next t
else
let remaining_quota = t.remaining_quota - 1 in
Expand Down
48 changes: 23 additions & 25 deletions lib/picos_randos/picos_randos.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,28 @@ type t = {
mutable run : bool;
}

let fiber_key = Picos_thread.TLS.new_key @@ fun () -> ref Fiber.Maybe.nothing

let rec next p t =
let rec next (p : Per_thread.t) t =
match Collection.pop_exn t.ready with
| Spawn (fiber, main) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Effect.Deep.match_with main fiber t.handler
| Raise (fiber, k, exn_bt) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Exn_bt.discontinue k exn_bt
| Return (fiber, k) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Effect.Deep.continue k ()
| Current (fiber, k) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Effect.Deep.continue k fiber
| Continue (fiber, k) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Fiber.continue fiber k ()
| Resume (fiber, k) ->
p := Fiber.Maybe.of_fiber fiber;
p.current <- Fiber.Maybe.of_fiber fiber;
Fiber.resume fiber k
| exception Not_found ->
p := Fiber.Maybe.nothing;
p.current <- Fiber.Maybe.nothing;
if Atomic.get t.num_alive_fibers <> 0 then begin
Mutex.lock t.mutex;
if Collection.is_empty t.ready && Atomic.get t.num_alive_fibers <> 0
Expand Down Expand Up @@ -158,22 +156,22 @@ let context ?fatal_exn_handler () =
and current =
Some
(fun k ->
let p = Picos_thread.TLS.get fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Current (fiber, k));
next p t)
and yield =
Some
(fun k ->
let p = Picos_thread.TLS.get fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Continue (fiber, k));
next p t)
and return =
Some
(fun k ->
let p = Picos_thread.TLS.get fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
Collection.push t.ready (Return (fiber, k));
next p t)
and handler = { retc; exnc; effc }
Expand All @@ -182,8 +180,8 @@ let context ?fatal_exn_handler () =
function
| Fiber.Current -> t.current
| Fiber.Spawn r ->
let p = Picos_thread.TLS.get fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.is_canceled fiber then t.yield
else begin
Atomic.incr t.num_alive_fibers;
Expand All @@ -193,8 +191,8 @@ let context ?fatal_exn_handler () =
end
| Fiber.Yield -> t.yield
| Computation.Cancel_after r -> begin
let p = Picos_thread.TLS.get fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.is_canceled fiber then t.yield
else
match
Expand All @@ -211,8 +209,8 @@ let context ?fatal_exn_handler () =
| Trigger.Await trigger ->
Some
(fun k ->
let p = Picos_thread.TLS.get fiber_key in
let fiber = Fiber.Maybe.to_fiber !p in
let p = Per_thread.get () in
let fiber = Fiber.Maybe.to_fiber p.current in
if Fiber.try_suspend fiber trigger fiber k t.resume then next p t
else begin
Collection.push t.ready (Resume (fiber, k));
Expand All @@ -221,14 +219,14 @@ let context ?fatal_exn_handler () =
| _ -> None
and retc () =
Atomic.decr t.num_alive_fibers;
let p = Picos_thread.TLS.get fiber_key in
let p = Per_thread.get () in
next p t
in
t

let runner_on_this_thread t =
Select.check_configured ();
next (Picos_thread.TLS.get fiber_key) t
next (Per_thread.get ()) t

let rec await t =
if !(t.num_waiters_non_zero) then begin
Expand Down Expand Up @@ -257,7 +255,7 @@ let run_fiber ?context:t_opt fiber main =
t.run <- true;
Mutex.unlock t.mutex;
Collection.push t.ready (Spawn (fiber, main));
next (Picos_thread.TLS.get fiber_key) t;
next (Per_thread.get ()) t;
Mutex.lock t.mutex;
await t
end
Expand Down
2 changes: 1 addition & 1 deletion test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
(modules test_picos_dscheck picos_bootstrap)
(build_if
(>= %{ocaml_version} 5))
(libraries picos.exn_bt backoff traced_atomic dscheck alcotest))
(libraries picos.exn_bt picos.thread backoff traced_atomic dscheck alcotest))

;;

Expand Down

0 comments on commit 850e944

Please sign in to comment.