diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 9344b4b05ea477c..f70d3e925cd8f0a 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -140,6 +140,11 @@ PipelineFragmentContext::~PipelineFragmentContext() {
         }
     }
     _tasks.clear();
+    for (auto& holder : _task_holders) {
+        auto expected = TaskState::VALID;
+        CHECK(holder->state.compare_exchange_strong(expected, TaskState::INVALID));
+    }
+    _task_holders.clear();
     for (auto& runtime_states : _task_runtime_states) {
         for (auto& runtime_state : runtime_states) {
             runtime_state.reset();
@@ -445,14 +450,16 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
             auto cur_task_id = _total_tasks++;
             task_runtime_state->set_task_id(cur_task_id);
             task_runtime_state->set_task_num(pipeline->num_tasks());
-            auto task = std::make_unique<PipelineTask>(
-                    new PipelineTask(pipeline, cur_task_id, task_runtime_state.get(), this,
-                                     pipeline_id_to_profile[pip_idx].get(),
-                                     get_local_exchange_state(pipeline), i));
+            auto task = std::make_unique<PipelineTask>(pipeline, cur_task_id,
+                                                       task_runtime_state.get(), this,
+                                                       pipeline_id_to_profile[pip_idx].get(),
+                                                       get_local_exchange_state(pipeline), i);
             pipeline->incr_created_tasks(i, task.get());
             task_runtime_state->set_task(task.get());
             pipeline_id_to_task.insert({pipeline->id(), task.get()});
             _tasks[i].emplace_back(std::move(task));
+            _task_holders.emplace_back(
+                    std::shared_ptr<TaskHolder>(new TaskHolder(_tasks[i].back().get())));
         }
     }
 
@@ -1677,17 +1684,15 @@ Status PipelineFragmentContext::submit() {
     int submit_tasks = 0;
     Status st;
     auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
-    for (auto& task : _tasks) {
-        for (auto& t : task) {
-            st = scheduler->schedule_task(t.get());
-            if (!st) {
-                cancel(Status::InternalError("submit context to executor fail"));
-                std::lock_guard<std::mutex> l(_task_mutex);
-                _total_tasks = submit_tasks;
-                break;
-            }
-            submit_tasks++;
-        }
+    for (auto& holder : _task_holders) {
+        st = scheduler->schedule_task(holder);
+        if (!st) {
+            cancel(Status::InternalError("submit context to executor fail"));
+            std::lock_guard<std::mutex> l(_task_mutex);
+            _total_tasks = submit_tasks;
+            break;
+        }
+        submit_tasks++;
     }
     if (!st.ok()) {
         std::lock_guard<std::mutex> l(_task_mutex);
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 1674afa886d520d..59f9bd18af5632b 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -227,6 +227,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
     OperatorPtr _root_op = nullptr;
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
     std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
+    std::vector<TaskHolderSPtr> _task_holders;
 
     // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
     // of it in pipeline task not the fragment_context
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index f7c5934d1d0a2df..6ed0244a21fa645 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -27,6 +27,7 @@
 #include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/pipeline.h"
+#include "pipeline/task_queue.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
 #include "vec/core/block.h"
@@ -41,8 +42,6 @@ class PipelineFragmentContext;
 
 namespace doris::pipeline {
 
-class MultiCoreTaskQueue;
-class PriorityTaskQueue;
 class Dependency;
 
 class PipelineTask {
@@ -218,6 +217,8 @@ class PipelineTask {
 
     bool wake_up_early() const { return _wake_up_early; }
 
+    void set_holder(std::shared_ptr<TaskHolder> holder) { _holder = holder; }
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
@@ -299,6 +300,7 @@ class PipelineTask {
     std::atomic<bool> _running = false;
     std::atomic<bool> _eos = false;
    std::atomic<bool> _wake_up_early = false;
+    std::weak_ptr<TaskHolder> _holder;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 57c758a3a016a79..25d5cbc04f98201 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -29,7 +29,11 @@ namespace doris::pipeline {
 
 #include "common/compile_check_begin.h"
 
-PipelineTask* SubTaskQueue::try_take(bool is_steal) {
+TaskHolder::TaskHolder(PipelineTask* task_) : task(task_), state(TaskState::VALID) {
+    task_->set_holder(std::enable_shared_from_this());
+}
+
+TaskHolderSPtr SubTaskQueue::try_take(bool is_steal) {
     if (_queue.empty()) {
         return nullptr;
     }
@@ -54,7 +58,7 @@ void PriorityTaskQueue::close() {
     _wait_task.notify_all();
 }
 
-PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
+TaskHolderSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
     if (_total_task_size == 0 || _closed) {
         return nullptr;
     }
@@ -90,13 +94,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
     return SUB_QUEUE_LEVEL - 1;
 }
 
-PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
+TaskHolderSPtr PriorityTaskQueue::try_take(bool is_steal) {
     // TODO other efficient lock? e.g. if get lock fail, return null_ptr
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     return _try_take_unprotected(is_steal);
 }
 
-PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
+TaskHolderSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     auto task = _try_take_unprotected(false);
     if (task) {
@@ -111,7 +115,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
     }
 }
 
-Status PriorityTaskQueue::push(PipelineTask* task) {
+Status PriorityTaskQueue::push(TaskHolderSPtr task) {
     if (_closed) {
         return Status::InternalError("WorkTaskQueue closed");
     }
@@ -149,8 +153,8 @@ void MultiCoreTaskQueue::close() {
                   [](auto& prio_task_queue) { prio_task_queue.close(); });
 }
 
-PipelineTask* MultiCoreTaskQueue::take(int core_id) {
-    PipelineTask* task = nullptr;
+TaskHolderSPtr MultiCoreTaskQueue::take(int core_id) {
+    TaskHolderSPtr task = nullptr;
     while (!_closed) {
         DCHECK(_prio_task_queues.size() > core_id)
                 << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
@@ -178,7 +182,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
     return task;
 }
 
-PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
+TaskHolderSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
     DCHECK(core_id < _core_size);
     int next_id = core_id;
     for (int i = 1; i < _core_size; ++i) {
@@ -196,7 +200,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
     return nullptr;
 }
 
-Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask> task) {
+Status MultiCoreTaskQueue::push_back(TaskHolderSPtr task) {
     int core_id = task->get_previous_core_id();
     if (core_id < 0) {
         core_id = _next_core.fetch_add(1) % _core_size;
@@ -204,7 +208,7 @@ Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask> task) {
     return push_back(task, core_id);
 }
 
-Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask>& task, int core_id) {
+Status MultiCoreTaskQueue::push_back(TaskHolderSPtr task, int core_id) {
     DCHECK(core_id < _core_size || task->query_context()->is_cancelled());
     task->put_in_runnable_queue();
     return _prio_task_queues[task->query_context()->is_cancelled() ? _urgent_queue_idx : core_id]
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 9afb28efcd430ad..4d7b93f0e994c1f 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -34,13 +34,27 @@ namespace doris::pipeline {
 
 #include "common/compile_check_begin.h"
 
+enum class TaskState {
+    VALID = 0,   // Valid task which is not running
+    RUNNING = 1, // Valid task which is executed by a thread
+    INVALID = 2, // Invalid task which already de-constructed.
+};
+
+struct TaskHolder : public std::enable_shared_from_this<TaskHolder> {
+    PipelineTask* task;
+    std::atomic<TaskState> state;
+    TaskHolder(PipelineTask* task_);
+};
+
+using TaskHolderSPtr = std::shared_ptr<TaskHolder>;
+
 class SubTaskQueue {
     friend class PriorityTaskQueue;
 
 public:
-    void push_back(PipelineTask* task) { _queue.emplace(task); }
+    void push_back(TaskHolderSPtr task) { _queue.emplace(task); }
 
-    PipelineTask* try_take(bool is_steal);
+    TaskHolderSPtr try_take(bool is_steal);
 
     void set_level_factor(double level_factor) { _level_factor = level_factor; }
 
@@ -58,7 +72,7 @@ class SubTaskQueue {
     bool empty() { return _queue.empty(); }
 
 private:
-    std::queue<PipelineTask*> _queue;
+    std::queue<TaskHolderSPtr> _queue;
     // depends on LEVEL_QUEUE_TIME_FACTOR
     double _level_factor = 1;
 
@@ -72,18 +86,18 @@ class PriorityTaskQueue {
 
     void close();
 
-    PipelineTask* try_take(bool is_steal);
+    TaskHolderSPtr try_take(bool is_steal);
 
-    PipelineTask* take(uint32_t timeout_ms = 0);
+    TaskHolderSPtr take(uint32_t timeout_ms = 0);
 
-    Status push(PipelineTask* task);
+    Status push(TaskHolderSPtr task);
 
     void inc_sub_queue_runtime(int level, uint64_t runtime) {
         _sub_queues[level].inc_runtime(runtime);
    }
 
 private:
-    PipelineTask* _try_take_unprotected(bool is_steal);
+    TaskHolderSPtr _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
     static constexpr size_t SUB_QUEUE_LEVEL = 6;
     SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -112,19 +126,19 @@ class MultiCoreTaskQueue {
 
     void close();
 
     // Get the task by core id.
-    PipelineTask* take(int core_id);
+    TaskHolderSPtr take(int core_id);
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
-    Status push_back(std::shared_ptr<PipelineTask> task);
+    Status push_back(TaskHolderSPtr task);
 
-    Status push_back(PipelineTask* task, int core_id);
+    Status push_back(TaskHolderSPtr, int core_id);
 
     void update_statistics(PipelineTask* task, int64_t time_spent);
 
     int num_queues() const { return cast_set<int>(_prio_task_queues.size()); }
 
 private:
-    PipelineTask* _steal_take(int core_id);
+    TaskHolderSPtr _steal_take(int core_id);
 
     std::vector<PriorityTaskQueue> _prio_task_queues;
     std::atomic<uint32_t> _next_core = 0;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 6420e453a5bd9b1..2cc3e654a95a843 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -66,18 +66,22 @@ Status TaskScheduler::start() {
     return Status::OK();
 }
 
-Status TaskScheduler::schedule_task(PipelineTask* task) {
+Status TaskScheduler::schedule_task(TaskHolderSPtr task) {
     return _task_queue.push_back(task);
 }
 
 // after _close_task, task maybe destructed.
-void _close_task(PipelineTask* task, Status exec_status) {
+void _close_task(TaskHolderSPtr holder, Status exec_status) {
     // Has to attach memory tracker here, because the close task will also release some memory.
     // Should count the memory to the query or the query's memory will not decrease when part of
     // task finished.
+    auto* task = holder->task;
     SCOPED_ATTACH_TASK(task->runtime_state());
     if (task->is_finalized()) {
-        task->set_running(false);
+        {
+            auto expected = TaskState::RUNNING;
+            CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
+        }
         return;
     }
     // close_a_pipeline may delete fragment context and will core in some defer
@@ -94,21 +98,32 @@ void _close_task(PipelineTask* task, Status exec_status) {
     }
     task->finalize();
     task->set_running(false);
+    {
+        auto expected = TaskState::RUNNING;
+        CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
+    }
     task->fragment_context()->close_a_pipeline(task->pipeline_id());
 }
 
 void TaskScheduler::_do_work(int index) {
     while (_markers[index]) {
-        auto* task = _task_queue.take(index);
-        if (!task) {
+        auto holder = _task_queue.take(index);
+        if (!holder) {
             continue;
         }
-        if (task->is_running()) {
-            static_cast<void>(_task_queue.push_back(task, index));
-            continue;
+        {
+            auto expected = TaskState::VALID;
+            if (!holder->state.compare_exchange_strong(expected, TaskState::RUNNING)) {
+                if (expected == TaskState::RUNNING) {
+                    static_cast<void>(_task_queue.push_back(holder, index));
+                } else {
+                    DCHECK(expected == TaskState::INVALID);
+                }
+                continue;
+            }
         }
+        auto* task = holder->task;
         task->log_detail_if_need();
-        task->set_running(true);
         task->set_task_queue(&_task_queue);
         auto* fragment_ctx = task->fragment_context();
         bool canceled = fragment_ctx->is_canceled();
@@ -122,7 +137,7 @@ void TaskScheduler::_do_work(int index) {
             // If pipeline is canceled, it will report after pipeline closed, and will propagate
             // errors to downstream through exchange. So, here we needn't send_report.
             // fragment_ctx->send_report(true);
-            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+            _close_task(holder, fragment_ctx->get_query_ctx()->exec_status());
             continue;
         }
 
@@ -165,7 +180,7 @@ void TaskScheduler::_do_work(int index) {
             LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
                                         print_id(task->query_context()->query_id()),
                                         status.to_string());
-            _close_task(task, status);
+            _close_task(holder, status);
             continue;
         }
         fragment_ctx->trigger_report_if_necessary();
@@ -175,15 +190,21 @@ void TaskScheduler::_do_work(int index) {
             // added to running queue when dependency is ready.
             if (task->is_pending_finish()) {
                 // Only meet eos, should set task to PENDING_FINISH state
-                task->set_running(false);
+                {
+                    auto expected = TaskState::RUNNING;
+                    CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
+                }
             } else {
                 Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-                _close_task(task, exec_status);
+                _close_task(holder, exec_status);
             }
             continue;
         }
 
-        task->set_running(false);
+        {
+            auto expected = TaskState::RUNNING;
+            CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
+        }
     }
 }
 
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 3c1b08063dfa61e..84bba2ae101948a 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -51,7 +51,7 @@ class TaskScheduler {
 
     ~TaskScheduler();
 
-    Status schedule_task(PipelineTask* task);
+    Status schedule_task(TaskHolderSPtr task);
 
     Status start();
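
The patch above only dereferences a task pointer after the worker wins a `VALID -> RUNNING` compare-and-swap on its holder, and the owning fragment context retires every holder with a `VALID -> INVALID` swap before the tasks themselves are freed, so a holder that is still sitting in a queue after its task is destroyed is skipped rather than dereferenced. The code below is a minimal, standalone sketch of that state machine for readers following the scheduler changes; the `Task`, `Holder`, `try_run`, and `retire` names are invented for the example and are not part of the Doris code.

```cpp
#include <atomic>
#include <cstdlib>
#include <iostream>
#include <memory>

// Three-state lifecycle, mirroring the TaskState introduced in task_queue.h.
enum class State { VALID, RUNNING, INVALID };

struct Task {
    int id;
    void run() const { std::cout << "run task " << id << "\n"; }
};

// Queue entries hold this wrapper instead of a raw Task*.
struct Holder {
    Task* task; // not owned; only dereferenced while state is RUNNING
    std::atomic<State> state{State::VALID};
    explicit Holder(Task* t) : task(t) {}
};

// Scheduler side: only the thread that wins the VALID -> RUNNING CAS may touch the task.
bool try_run(const std::shared_ptr<Holder>& holder) {
    State expected = State::VALID;
    if (!holder->state.compare_exchange_strong(expected, State::RUNNING)) {
        // expected == RUNNING: another worker owns it (the patch re-queues the holder).
        // expected == INVALID: the task is gone; the stale queue entry is simply dropped.
        return false;
    }
    holder->task->run(); // safe: INVALID cannot be set while we hold RUNNING
    expected = State::RUNNING;
    if (!holder->state.compare_exchange_strong(expected, State::VALID)) {
        std::abort(); // only this thread could have set RUNNING
    }
    return true;
}

// Owner side: before freeing the task, flip VALID -> INVALID so a late pop sees a dead entry.
// The patch CHECKs this CAS because the fragment context is destroyed only after every
// worker has returned its holder to VALID.
void retire(const std::shared_ptr<Holder>& holder) {
    State expected = State::VALID;
    if (!holder->state.compare_exchange_strong(expected, State::INVALID)) {
        std::abort();
    }
}

int main() {
    auto task = std::make_unique<Task>(Task {42});
    auto holder = std::make_shared<Holder>(task.get());

    try_run(holder); // VALID -> RUNNING -> VALID, task executes
    retire(holder);  // VALID -> INVALID
    task.reset();    // the raw pointer in the holder now dangles, but is never read again
    try_run(holder); // CAS fails with INVALID, the dangling pointer is not dereferenced
}
```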