Skip to content

Commit

Permalink
fix pool: only return No_more_tasks when local and global q empty
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Aug 29, 2024
1 parent 7a8a32c commit 10fd733
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions src/core/ws_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ let[@inline] try_wake_someone_ (self : state) : unit =
Mutex.unlock self.mutex
)

let schedule_on_w (self : worker_state) task : unit =
(** Push into worker's local queue, open to work stealing.
precondition: this runs on the worker thread whose state is [self] *)
let schedule_on_current_worker (self : worker_state) task : unit =
(* we're on this same pool, schedule in the worker's state. Otherwise
we might also be on pool A but asking to schedule on pool B,
so we have to check that identifiers match. *)
Expand All @@ -92,7 +94,8 @@ let schedule_on_w (self : worker_state) task : unit =
Mutex.unlock self.st.mutex
)

let schedule_on_main (self : state) task : unit =
(** Push into the shared queue of this pool *)
let schedule_in_main_queue (self : state) task : unit =
if A.get self.active then (
(* push into the main queue *)
Mutex.lock self.mutex;
Expand All @@ -108,8 +111,8 @@ let schedule_from_w (self : worker_state) (task : WL.task_full) : unit =
match get_current_worker_ () with
| Some w when Id.equal self.st.id_ w.st.id_ ->
(* use worker from the same pool *)
schedule_on_w w task
| _ -> schedule_on_main self.st task
schedule_on_current_worker w task
| _ -> schedule_in_main_queue self.st task

exception Got_task of WL.task_full

Expand Down Expand Up @@ -139,19 +142,19 @@ let[@inline] wait_for_condition_ (self : state) : unit =
if self.n_waiting = 0 then self.n_waiting_nonzero <- false

let rec get_next_task (self : worker_state) : WL.task_full =
if not (A.get self.st.active) then raise WL.No_more_tasks;
(* see if we can empty the local queue *)
match WSQ.pop_exn self.q with
| task ->
try_wake_someone_ self.st;
task
| exception WSQ.Empty -> try_steal_from_other_workers_ self
| exception WSQ.Empty -> try_to_steal_from_other_workers_ self

and try_steal_from_other_workers_ (self : worker_state) =
and try_to_steal_from_other_workers_ (self : worker_state) =
match try_to_steal_work_once_ self with
| exception Got_task task -> task
| () -> wait_on_worker self
| () -> wait_on_main_queue self

and wait_on_worker (self : worker_state) : WL.task_full =
and wait_on_main_queue (self : worker_state) : WL.task_full =
Mutex.lock self.st.mutex;
match Queue.pop self.st.main_q with
| task ->
Expand All @@ -169,7 +172,7 @@ and wait_on_worker (self : worker_state) : WL.task_full =
task
| exception Queue.Empty ->
Mutex.unlock self.st.mutex;
try_steal_from_other_workers_ self
try_to_steal_from_other_workers_ self
) else (
(* do nothing more: no task in main queue, and we are shutting
down so no new task should arrive.
Expand Down Expand Up @@ -230,7 +233,8 @@ let shutdown_ ~wait (self : state) : unit =
let as_runner_ (self : state) : t =
Runner.For_runner_implementors.create
~shutdown:(fun ~wait () -> shutdown_ self ~wait)
~run_async:(fun ~fiber f -> schedule_on_main self @@ T_start { fiber; f })
~run_async:(fun ~fiber f ->
schedule_in_main_queue self @@ T_start { fiber; f })
~size:(fun () -> size_ self)
~num_tasks:(fun () -> num_tasks_ self)
()
Expand Down

0 comments on commit 10fd733

Please sign in to comment.