diff --git a/src/core/ws_pool.ml b/src/core/ws_pool.ml index 70e4bb80..5be467fa 100644 --- a/src/core/ws_pool.ml +++ b/src/core/ws_pool.ml @@ -77,7 +77,9 @@ let[@inline] try_wake_someone_ (self : state) : unit = Mutex.unlock self.mutex ) -let schedule_on_w (self : worker_state) task : unit = +(** Push into worker's local queue, open to work stealing. + precondition: this runs on the worker thread whose state is [self] *) +let schedule_on_current_worker (self : worker_state) task : unit = (* we're on this same pool, schedule in the worker's state. Otherwise we might also be on pool A but asking to schedule on pool B, so we have to check that identifiers match. *) @@ -92,7 +94,8 @@ let schedule_on_w (self : worker_state) task : unit = Mutex.unlock self.st.mutex ) -let schedule_on_main (self : state) task : unit = +(** Push into the shared queue of this pool *) +let schedule_in_main_queue (self : state) task : unit = if A.get self.active then ( (* push into the main queue *) Mutex.lock self.mutex; @@ -108,8 +111,8 @@ let schedule_from_w (self : worker_state) (task : WL.task_full) : unit = match get_current_worker_ () with | Some w when Id.equal self.st.id_ w.st.id_ -> (* use worker from the same pool *) - schedule_on_w w task - | _ -> schedule_on_main self.st task + schedule_on_current_worker w task + | _ -> schedule_in_main_queue self.st task exception Got_task of WL.task_full @@ -139,19 +142,19 @@ let[@inline] wait_for_condition_ (self : state) : unit = if self.n_waiting = 0 then self.n_waiting_nonzero <- false let rec get_next_task (self : worker_state) : WL.task_full = - if not (A.get self.st.active) then raise WL.No_more_tasks; + (* see if we can empty the local queue *) match WSQ.pop_exn self.q with | task -> try_wake_someone_ self.st; task - | exception WSQ.Empty -> try_steal_from_other_workers_ self + | exception WSQ.Empty -> try_to_steal_from_other_workers_ self -and try_steal_from_other_workers_ (self : worker_state) = +and try_to_steal_from_other_workers_ (self : worker_state) = match try_to_steal_work_once_ self with | exception Got_task task -> task - | () -> wait_on_worker self + | () -> wait_on_main_queue self -and wait_on_worker (self : worker_state) : WL.task_full = +and wait_on_main_queue (self : worker_state) : WL.task_full = Mutex.lock self.st.mutex; match Queue.pop self.st.main_q with | task -> @@ -169,7 +172,7 @@ and wait_on_worker (self : worker_state) : WL.task_full = task | exception Queue.Empty -> Mutex.unlock self.st.mutex; - try_steal_from_other_workers_ self + try_to_steal_from_other_workers_ self ) else ( (* do nothing more: no task in main queue, and we are shutting down so no new task should arrive. @@ -230,7 +233,8 @@ let shutdown_ ~wait (self : state) : unit = let as_runner_ (self : state) : t = Runner.For_runner_implementors.create ~shutdown:(fun ~wait () -> shutdown_ self ~wait) - ~run_async:(fun ~fiber f -> schedule_on_main self @@ T_start { fiber; f }) + ~run_async:(fun ~fiber f -> + schedule_in_main_queue self @@ T_start { fiber; f }) ~size:(fun () -> size_ self) ~num_tasks:(fun () -> num_tasks_ self) ()