Skip to content

Commit

Permalink
[Improvement](scheduler) Use a separate eager queue to execute canceled tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Dec 30, 2024
1 parent f167700 commit 592a371
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 96 deletions.
27 changes: 16 additions & 11 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ PipelineFragmentContext::~PipelineFragmentContext() {
}
}
_tasks.clear();
for (auto& holder : _task_holders) {
auto expected = TaskState::VALID;
CHECK(holder->state.compare_exchange_strong(expected, TaskState::INVALID));
}
_task_holders.clear();
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
runtime_state.reset();
Expand Down Expand Up @@ -453,6 +458,8 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
task_runtime_state->set_task(task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
_task_holders.emplace_back(
std::shared_ptr<TaskHolder>(new TaskHolder(_tasks[i].back().get())));
}
}

Expand Down Expand Up @@ -1677,17 +1684,15 @@ Status PipelineFragmentContext::submit() {
int submit_tasks = 0;
Status st;
auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
for (auto& task : _tasks) {
for (auto& t : task) {
st = scheduler->schedule_task(t.get());
if (!st) {
cancel(Status::InternalError("submit context to executor fail"));
std::lock_guard<std::mutex> l(_task_mutex);
_total_tasks = submit_tasks;
break;
}
submit_tasks++;
}
for (auto& holder : _task_holders) {
st = scheduler->schedule_task(holder);
if (!st) {
cancel(Status::InternalError("submit context to executor fail"));
std::lock_guard<std::mutex> l(_task_mutex);
_total_tasks = submit_tasks;
break;
}
submit_tasks++;
}
if (!st.ok()) {
std::lock_guard<std::mutex> l(_task_mutex);
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
OperatorPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
std::vector<TaskHolderSPtr> _task_holders;

// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
// of it in pipeline task not the fragment_context
Expand Down
27 changes: 25 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ Status PipelineTask::execute(bool* eos) {
}
}

RETURN_IF_ERROR(get_task_queue()->push_back(this));
RETURN_IF_ERROR(get_task_queue()->push_back(_holder));
return Status::OK();
}

Expand Down Expand Up @@ -554,7 +554,30 @@ std::string PipelineTask::debug_string() {

void PipelineTask::wake_up() {
// call by dependency
static_cast<void>(get_task_queue()->push_back(this));
static_cast<void>(get_task_queue()->push_back(_holder));
}

// Force-wake every dependency this task could be blocked on, so a task that
// must stop (e.g. on cancellation) can leave its wait state and get scheduled
// again for teardown.
void PipelineTask::clear_blocking_state() {
    _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
    // Hold the lock so none of the dependencies are deconstructed while we
    // touch them.
    std::unique_lock<std::mutex> guard(_dependency_lock);
    if (_finalized) {
        return;
    }
    auto wake = [](auto* dependency) { dependency->set_always_ready(); };
    wake(_execution_dep);
    for (auto* filter_dep : _filter_dependencies) {
        wake(filter_dep);
    }
    for (auto& dep_group : _read_dependencies) {
        for (auto* read_dep : dep_group) {
            wake(read_dep);
        }
    }
    for (auto* write_dep : _write_dependencies) {
        wake(write_dep);
    }
    for (auto* finish_dep : _finish_dependencies) {
        wake(finish_dep);
    }
}

QueryContext* PipelineTask::query_context() {
Expand Down
29 changes: 5 additions & 24 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline.h"
#include "pipeline/task_queue.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"
Expand All @@ -41,8 +42,6 @@ class PipelineFragmentContext;

namespace doris::pipeline {

class MultiCoreTaskQueue;
class PriorityTaskQueue;
class Dependency;

class PipelineTask {
Expand Down Expand Up @@ -137,28 +136,7 @@ class PipelineTask {

void set_wake_up_early() { _wake_up_early = true; }

void clear_blocking_state() {
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
if (!_finalized) {
_execution_dep->set_always_ready();
for (auto* dep : _filter_dependencies) {
dep->set_always_ready();
}
for (auto& deps : _read_dependencies) {
for (auto* dep : deps) {
dep->set_always_ready();
}
}
for (auto* dep : _write_dependencies) {
dep->set_always_ready();
}
for (auto* dep : _finish_dependencies) {
dep->set_always_ready();
}
}
}
void clear_blocking_state();

void set_task_queue(MultiCoreTaskQueue* task_queue) { _task_queue = task_queue; }
MultiCoreTaskQueue* get_task_queue() { return _task_queue; }
Expand Down Expand Up @@ -239,6 +217,8 @@ class PipelineTask {

bool wake_up_early() const { return _wake_up_early; }

void set_holder(std::shared_ptr<TaskHolder> holder) { _holder = holder; }

private:
friend class RuntimeFilterDependency;
bool _is_blocked();
Expand Down Expand Up @@ -320,6 +300,7 @@ class PipelineTask {
std::atomic<bool> _running = false;
std::atomic<bool> _eos = false;
std::atomic<bool> _wake_up_early = false;
std::shared_ptr<TaskHolder> _holder;
};

} // namespace doris::pipeline
60 changes: 36 additions & 24 deletions be/src/pipeline/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"

PipelineTask* SubTaskQueue::try_take(bool is_steal) {
// Wraps a raw PipelineTask so queues can hold shared ownership of it, and
// links the task back to its holder.
// NOTE(review): calling shared_from_this() inside a constructor is invalid —
// the owning std::shared_ptr does not exist yet, so (since C++17) this throws
// std::bad_weak_ptr; before C++17 it is undefined behavior. Move the
// set_holder() call out of the constructor, e.g. into a static factory that
// first creates the shared_ptr and then links it:
//   static TaskHolderSPtr create(PipelineTask* t) {
//       auto holder = std::make_shared<TaskHolder>(t);
//       t->set_holder(holder);
//       return holder;
//   }
// and update the construction site in _build_pipeline_tasks accordingly.
TaskHolder::TaskHolder(PipelineTask* task_) : task(task_), state(TaskState::VALID) {
task_->set_holder(shared_from_this());
}

TaskHolderSPtr SubTaskQueue::try_take(bool is_steal) {
if (_queue.empty()) {
return nullptr;
}
Expand All @@ -54,7 +58,7 @@ void PriorityTaskQueue::close() {
_wait_task.notify_all();
}

PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
TaskHolderSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
if (_total_task_size == 0 || _closed) {
return nullptr;
}
Expand All @@ -75,7 +79,7 @@ PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {

auto task = _sub_queues[level].try_take(is_steal);
if (task) {
task->update_queue_level(level);
task->task->update_queue_level(level);
_total_task_size--;
}
return task;
Expand All @@ -90,13 +94,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
return SUB_QUEUE_LEVEL - 1;
}

PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
// Non-blocking dequeue: acquire the queue lock and attempt a single take.
// Returns nullptr when nothing is runnable.
// TODO other efficient lock? e.g. if get lock fail, return null_ptr
TaskHolderSPtr PriorityTaskQueue::try_take(bool is_steal) {
    std::unique_lock<std::mutex> guard(_work_size_mutex);
    return _try_take_unprotected(is_steal);
}

PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
TaskHolderSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
std::unique_lock<std::mutex> lock(_work_size_mutex);
auto task = _try_take_unprotected(false);
if (task) {
Expand All @@ -111,11 +115,11 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
}
}

Status PriorityTaskQueue::push(PipelineTask* task) {
Status PriorityTaskQueue::push(TaskHolderSPtr task) {
if (_closed) {
return Status::InternalError("WorkTaskQueue closed");
}
auto level = _compute_level(task->get_runtime_ns());
auto level = _compute_level(task->task->get_runtime_ns());
std::unique_lock<std::mutex> lock(_work_size_mutex);

// update empty queue's runtime, to avoid too high priority
Expand All @@ -132,8 +136,12 @@ Status PriorityTaskQueue::push(PipelineTask* task) {

MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;

static constexpr int NUM_EAGER_QUEUES = 1;
MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
: _prio_task_queues(core_size), _closed(false), _core_size(core_size) {}
: _prio_task_queues(core_size + NUM_EAGER_QUEUES),
_closed(false),
_core_size(core_size),
_urgent_queue_idx(core_size) {}

void MultiCoreTaskQueue::close() {
if (_closed) {
Expand All @@ -145,34 +153,36 @@ void MultiCoreTaskQueue::close() {
[](auto& prio_task_queue) { prio_task_queue.close(); });
}

PipelineTask* MultiCoreTaskQueue::take(int core_id) {
PipelineTask* task = nullptr;
TaskHolderSPtr MultiCoreTaskQueue::take(int core_id) {
TaskHolderSPtr task = nullptr;
while (!_closed) {
DCHECK(_prio_task_queues.size() > core_id)
<< " list size: " << _prio_task_queues.size() << " core_id: " << core_id
<< " _core_size: " << _core_size << " _next_core: " << _next_core.load();
task = _prio_task_queues[core_id].try_take(false);
if (task) {
task->set_core_id(core_id);
task->task->set_core_id(core_id);
break;
}
task = _steal_take(core_id);
if (task) {
break;
if (core_id != _urgent_queue_idx) {
task = _steal_take(core_id);
if (task) {
break;
}
}
task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
if (task) {
task->set_core_id(core_id);
task->task->set_core_id(core_id);
break;
}
}
if (task) {
task->pop_out_runnable_queue();
task->task->pop_out_runnable_queue();
}
return task;
}

PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
TaskHolderSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(core_id < _core_size);
int next_id = core_id;
for (int i = 1; i < _core_size; ++i) {
Expand All @@ -183,25 +193,27 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(next_id < _core_size);
auto task = _prio_task_queues[next_id].try_take(true);
if (task) {
task->set_core_id(next_id);
task->task->set_core_id(next_id);
return task;
}
}
return nullptr;
}

Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
int core_id = task->get_previous_core_id();
// Enqueue a task without an explicit core: prefer the core it last ran on
// (cache locality); fall back to round-robin for never-scheduled tasks.
Status MultiCoreTaskQueue::push_back(TaskHolderSPtr task) {
    int target_core = task->task->get_previous_core_id();
    if (target_core < 0) {
        // First schedule of this task — spread it across cores round-robin.
        target_core = _next_core.fetch_add(1) % _core_size;
    }
    return push_back(task, target_core);
}

Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
DCHECK(core_id < _core_size);
task->put_in_runnable_queue();
return _prio_task_queues[core_id].push(task);
// Enqueue a task onto a specific per-core queue, or onto the dedicated urgent
// (eager) queue when its query has been cancelled so cleanup runs promptly.
// Read the cancellation flag exactly once: the original evaluated
// is_cancelled() separately in the DCHECK and in the queue selection, so a
// concurrent cancellation between the two reads could make them disagree.
Status MultiCoreTaskQueue::push_back(TaskHolderSPtr task, int core_id) {
    const bool cancelled = task->task->query_context()->is_cancelled();
    DCHECK(core_id < _core_size || cancelled);
    task->task->put_in_runnable_queue();
    return _prio_task_queues[cancelled ? _urgent_queue_idx : core_id].push(task);
}

void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
Expand Down
Loading

0 comments on commit 592a371

Please sign in to comment.