diff --git a/lib/picos/dune b/lib/picos/dune index 9ae3539c6..ab6f13573 100644 --- a/lib/picos/dune +++ b/lib/picos/dune @@ -35,26 +35,25 @@ ;; -(library - (name picos_thread_ocaml4) - (package picos) - (modules) +(rule (enabled_if (< %{ocaml_version} 5.0.0)) - (libraries - (re_export picos.thread))) + (action + (copy intf.ocaml4.ml intf.ml))) + +(rule + (enabled_if + (<= 5.0.0 %{ocaml_version})) + (action + (copy intf.ocaml5.ml intf.ml))) + +;; (library (name picos) (public_name picos) (modules picos intf) - (libraries - backoff - (select - intf.ml - from - (picos_thread_ocaml4 -> intf.ocaml4.ml) - (-> intf.ocaml5.ml)))) + (libraries backoff picos.thread)) (mdx (package picos_meta) diff --git a/lib/picos/intf.ocaml4.ml b/lib/picos/intf.ocaml4.ml index cbe151fbc..52032439d 100644 --- a/lib/picos/intf.ocaml4.ml +++ b/lib/picos/intf.ocaml4.ml @@ -8,5 +8,6 @@ end module type Fiber = sig type t + type maybe type _ computation end diff --git a/lib/picos/intf.ocaml5.ml b/lib/picos/intf.ocaml5.ml index 08fe0e970..3989486e4 100644 --- a/lib/picos/intf.ocaml5.ml +++ b/lib/picos/intf.ocaml5.ml @@ -65,6 +65,7 @@ end module type Fiber = sig type t + type maybe type _ computation val resume : @@ -152,4 +153,14 @@ module type Fiber = sig type _ Effect.t += private | Spawn : { fiber : t; main : t -> unit } -> unit Effect.t + + module Per_thread : sig + (** *) + + type t = { mutable current : maybe (** *) } + (** *) + + val get : unit -> t + (** *) + end end diff --git a/lib/picos/picos.mli b/lib/picos/picos.mli index 57daa745e..435822c12 100644 --- a/lib/picos/picos.mli +++ b/lib/picos/picos.mli @@ -1192,7 +1192,10 @@ module Fiber : sig @raise Invalid_argument if the trigger is not in the signaled state. *) include - Intf.Fiber with type t := t with type 'a computation := 'a Computation.t + Intf.Fiber + with type t := t + with type maybe = Maybe.t + with type 'a computation := 'a Computation.t (** {2 Design rationale} diff --git a/lib/picos/picos.ocaml5.ml b/lib/picos/picos.ocaml5.ml index 1dd789989..4e9b0dbb6 100644 --- a/lib/picos/picos.ocaml5.ml +++ b/lib/picos/picos.ocaml5.ml @@ -588,6 +588,8 @@ module Fiber = struct if key < Array.length fls then Array.unsafe_set fls key unique end + type maybe = T : [< `Nothing | `Fiber ] tdt -> maybe [@@unboxed] + (* END FIBER BOOTSTRAP *) let resume t k = Effect.Deep.continue k (canceled t) @@ -603,9 +605,27 @@ module Fiber = struct | None -> Effect.Shallow.continue_with k v h | Some (exn, bt) -> Effect.Shallow.discontinue_with_backtrace k exn bt h + module Per_thread = struct + type t = { mutable current : maybe (** *) } + + let key = Picos_thread.TLS.create () + + let get () = + match Picos_thread.TLS.get_exn key with + | p -> p + | exception Picos_thread.TLS.Not_set -> + let p = { current = T Nothing } in + Picos_thread.TLS.set key p; + p + end + type _ Effect.t += Current : t Effect.t - let current () = Effect.perform Current + let current () = + let p = Per_thread.get () in + match p.current with + | T (Fiber _ as fiber) -> fiber + | T Nothing -> Effect.perform Current type _ Effect.t += Spawn : { fiber : t; main : t -> unit } -> unit Effect.t @@ -620,7 +640,7 @@ module Fiber = struct module Maybe = struct let[@inline never] not_a_fiber _ = invalid_arg "not a fiber" - type t = T : [< `Nothing | `Fiber ] tdt -> t [@@unboxed] + type t = maybe let[@inline] to_fiber_or_current = function | T Nothing -> current () diff --git a/lib/picos_mux.fifo/picos_mux_fifo.ml b/lib/picos_mux.fifo/picos_mux_fifo.ml index 0c77b191e..dabd67dfa 100644 --- a/lib/picos_mux.fifo/picos_mux_fifo.ml +++ b/lib/picos_mux.fifo/picos_mux_fifo.ml @@ -27,8 +27,8 @@ type t = { mutable return : ((unit, unit) Effect.Deep.continuation -> unit) option; mutable discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option; mutable handler : (unit, unit) Effect.Deep.handler; + per_thread : Fiber.Per_thread.t; quota : int; - mutable fiber : Fiber.Maybe.t; mutable remaining_quota : int; mutable num_alive_fibers : int; } @@ -37,7 +37,7 @@ let rec next t = match Mpscq.pop_exn t.ready with | ready -> begin t.remaining_quota <- t.quota; - t.fiber <- + t.per_thread.current <- (match ready with | Spawn (fiber, _) | Continue (fiber, _) @@ -51,7 +51,7 @@ let rec next t = | Resume (fiber, k) -> Fiber.resume fiber k end | exception Mpscq.Empty -> - t.fiber <- Fiber.Maybe.nothing; + t.per_thread.current <- Fiber.Maybe.nothing; if t.num_alive_fibers <> 0 then begin if Atomic.get t.needs_wakeup then begin Mutex.lock t.mutex; @@ -75,6 +75,8 @@ let run_fiber ?quota ?fatal_exn_handler fiber main = | None -> Int.max_int | Some quota -> if quota <= 0 then quota_non_positive quota else quota in + let per_thread = Fiber.Per_thread.get () in + per_thread.current <- Fiber.Maybe.of_fiber fiber; { ready = Mpscq.create ~padded:true (); needs_wakeup = Atomic.make false |> Multicore_magic.copy_as_padded; @@ -86,8 +88,8 @@ let run_fiber ?quota ?fatal_exn_handler fiber main = return = Obj.magic (); discontinue = Obj.magic (); handler = Obj.magic (); + per_thread; quota; - fiber = Fiber.Maybe.of_fiber fiber; remaining_quota = quota; num_alive_fibers = 1; } @@ -101,7 +103,7 @@ let run_fiber ?quota ?fatal_exn_handler fiber main = match e with | Fiber.Current -> t.current | Fiber.Spawn r -> - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in if Fiber.is_canceled fiber then t.discontinue else begin t.num_alive_fibers <- t.num_alive_fibers + 1; @@ -110,7 +112,7 @@ let run_fiber ?quota ?fatal_exn_handler fiber main = end | Fiber.Yield -> t.yield | Computation.Cancel_after r -> begin - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in if Fiber.is_canceled fiber then t.discontinue else match @@ -126,7 +128,7 @@ let run_fiber ?quota ?fatal_exn_handler fiber main = | Trigger.Await trigger -> Some (fun k -> - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in if Fiber.try_suspend fiber trigger fiber k t.resume then next t else @@ -164,12 +166,12 @@ let run_fiber ?quota ?fatal_exn_handler fiber main = t.current <- Some (fun k -> - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in Effect.Deep.continue k fiber); t.yield <- Some (fun k -> - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in Mpscq.push t.ready (Continue (fiber, k)); next t); t.return <- @@ -181,13 +183,14 @@ let run_fiber ?quota ?fatal_exn_handler fiber main = Effect.Deep.continue k () end else begin - Mpscq.push t.ready (Return (Fiber.Maybe.to_fiber t.fiber, k)); + Mpscq.push t.ready + (Return (Fiber.Maybe.to_fiber t.per_thread.current, k)); next t end); t.discontinue <- Some (fun k -> - let fiber = Fiber.Maybe.to_fiber t.fiber in + let fiber = Fiber.Maybe.to_fiber t.per_thread.current in Fiber.continue fiber k ()); Effect.Deep.match_with main fiber t.handler diff --git a/lib/picos_mux.random/picos_mux_random.ml b/lib/picos_mux.random/picos_mux_random.ml index 9948b416a..4b85c42cc 100644 --- a/lib/picos_mux.random/picos_mux_random.ml +++ b/lib/picos_mux.random/picos_mux_random.ml @@ -58,17 +58,6 @@ type t = { mutable run : bool; } -let fiber_key : Fiber.Maybe.t ref Picos_thread.TLS.t = - Picos_thread.TLS.create () - -let get () = - match Picos_thread.TLS.get_exn fiber_key with - | p -> p - | exception Picos_thread.TLS.Not_set -> - let p = ref Fiber.Maybe.nothing in - Picos_thread.TLS.set fiber_key p; - p - let[@inline] relaxed_wakeup t ~known_not_empty = if t.num_waiters_non_zero @@ -79,18 +68,16 @@ let[@inline] relaxed_wakeup t ~known_not_empty = Condition.signal t.condition end -let exec p t ready = - begin - p := - match ready with - | Spawn (fiber, _) - | Raise (fiber, _, _, _) - | Return (fiber, _) - | Current (fiber, _) - | Continue (fiber, _) - | Resume (fiber, _) -> - Fiber.Maybe.of_fiber fiber - end; +let exec (p : Fiber.Per_thread.t) t ready = + p.current <- + (match ready with + | Spawn (fiber, _) + | Raise (fiber, _, _, _) + | Return (fiber, _) + | Current (fiber, _) + | Continue (fiber, _) + | Resume (fiber, _) -> + Fiber.Maybe.of_fiber fiber); match ready with | Spawn (fiber, main) -> Effect.Deep.match_with main fiber t.handler | Raise (_, k, exn, bt) -> Effect.Deep.discontinue_with_backtrace k exn bt @@ -105,7 +92,7 @@ let rec next p t = relaxed_wakeup t ~known_not_empty:false; exec p t ready | exception Not_found -> - p := Fiber.Maybe.nothing; + p.current <- Fiber.Maybe.nothing; if Atomic.get t.num_alive_fibers <> 0 then begin Mutex.lock t.mutex; let n = !(t.num_waiters) + 1 in @@ -192,22 +179,22 @@ let context ?fatal_exn_handler () = t.current <- Some (fun k -> - let p = Picos_thread.TLS.get_exn fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Fiber.Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in Collection.push t.ready (Current (fiber, k)); next p t); t.yield <- Some (fun k -> - let p = Picos_thread.TLS.get_exn fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Fiber.Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in Collection.push t.ready (Continue (fiber, k)); next p t); t.return <- Some (fun k -> - let p = Picos_thread.TLS.get_exn fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Fiber.Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in Collection.push t.ready (Return (fiber, k)); next p t); t.handler <- @@ -215,7 +202,7 @@ let context ?fatal_exn_handler () = retc = (fun () -> Atomic.decr t.num_alive_fibers; - let p = Picos_thread.TLS.get_exn fiber_key in + let p = Fiber.Per_thread.get () in next p t); exnc; effc = @@ -224,8 +211,8 @@ let context ?fatal_exn_handler () = match e with | Fiber.Current -> t.current | Fiber.Spawn r -> - let p = Picos_thread.TLS.get_exn fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Fiber.Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in if Fiber.is_canceled fiber then t.yield else begin Atomic.incr t.num_alive_fibers; @@ -235,8 +222,8 @@ let context ?fatal_exn_handler () = end | Fiber.Yield -> t.yield | Computation.Cancel_after r -> begin - let p = Picos_thread.TLS.get_exn fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Fiber.Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in if Fiber.is_canceled fiber then t.yield else match @@ -254,8 +241,8 @@ let context ?fatal_exn_handler () = | Trigger.Await trigger -> Some (fun k -> - let p = Picos_thread.TLS.get_exn fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in + let p = Fiber.Per_thread.get () in + let fiber = Fiber.Maybe.to_fiber p.current in if Fiber.try_suspend fiber trigger fiber k t.resume then next p t else begin @@ -268,7 +255,7 @@ let context ?fatal_exn_handler () = let runner_on_this_thread t = Select.check_configured (); - next (get ()) t + next (Fiber.Per_thread.get ()) t let rec await t = if t.num_waiters_non_zero then begin @@ -296,7 +283,8 @@ let run_fiber ?context:t_opt fiber main = else begin t.run <- true; Mutex.unlock t.mutex; - get () := Fiber.Maybe.of_fiber fiber; + let p = Fiber.Per_thread.get () in + p.current <- Fiber.Maybe.of_fiber fiber; Effect.Deep.match_with main fiber t.handler; Mutex.lock t.mutex; await t diff --git a/test/dune b/test/dune index 0910e40ec..63e778ddf 100644 --- a/test/dune +++ b/test/dune @@ -87,7 +87,7 @@ (modules test_picos_dscheck picos) (build_if (>= %{ocaml_version} 5)) - (libraries backoff traced_atomic dscheck alcotest)) + (libraries picos.thread backoff traced_atomic dscheck alcotest)) ;;