update
Gabriel39 committed Dec 30, 2024
1 parent a6fe5b5 commit 50bb1d5
Showing 7 changed files with 100 additions and 53 deletions.
35 changes: 20 additions & 15 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -140,6 +140,11 @@ PipelineFragmentContext::~PipelineFragmentContext() {
}
}
_tasks.clear();
for (auto& holder : _task_holders) {
auto expected = TaskState::VALID;
CHECK(holder->state.compare_exchange_strong(expected, TaskState::INVALID));
}
_task_holders.clear();
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
runtime_state.reset();
@@ -445,14 +450,16 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
task_runtime_state->set_task_num(pipeline->num_tasks());
auto task = std::unique_ptr<PipelineTask>(
new PipelineTask(pipeline, cur_task_id, task_runtime_state.get(), this,
pipeline_id_to_profile[pip_idx].get(),
get_local_exchange_state(pipeline), i));
auto task = std::make_unique<PipelineTask>(pipeline, cur_task_id,
task_runtime_state.get(), this,
pipeline_id_to_profile[pip_idx].get(),
get_local_exchange_state(pipeline), i);
pipeline->incr_created_tasks(i, task.get());
task_runtime_state->set_task(task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
auto holder = std::make_shared<TaskHolder>(_tasks[i].back().get());
holder->task->set_holder(holder);
_task_holders.emplace_back(std::move(holder));
}
}

@@ -1677,17 +1684,15 @@ Status PipelineFragmentContext::submit() {
int submit_tasks = 0;
Status st;
auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
for (auto& task : _tasks) {
for (auto& t : task) {
st = scheduler->schedule_task(t.get());
if (!st) {
cancel(Status::InternalError("submit context to executor fail"));
std::lock_guard<std::mutex> l(_task_mutex);
_total_tasks = submit_tasks;
break;
}
submit_tasks++;
}
for (auto& holder : _task_holders) {
st = scheduler->schedule_task(holder);
if (!st) {
cancel(Status::InternalError("submit context to executor fail"));
std::lock_guard<std::mutex> l(_task_mutex);
_total_tasks = submit_tasks;
break;
}
submit_tasks++;
}
if (!st.ok()) {
std::lock_guard<std::mutex> l(_task_mutex);
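Taken together, the destructor and _build_pipeline_tasks hunks above establish the ownership pattern this commit introduces: the fragment context owns the PipelineTask objects outright, while schedulers only ever see shared TaskHolder wrappers that are explicitly invalidated before the tasks are destroyed. A minimal standalone sketch of that idea (Owner and Task are illustrative names, not Doris types):

#include <atomic>
#include <cassert>
#include <memory>
#include <vector>

enum class TaskState { VALID, RUNNING, INVALID };

struct Task {};

// Stand-in for TaskHolder: queues hold this wrapper, never a raw Task*.
struct Holder {
    Task* task;
    std::atomic<TaskState> state{TaskState::VALID};
    explicit Holder(Task* t) : task(t) {}
};

// Stand-in for PipelineFragmentContext.
struct Owner {
    std::vector<std::unique_ptr<Task>> tasks;
    std::vector<std::shared_ptr<Holder>> holders;

    ~Owner() {
        tasks.clear();
        // Invalidate every holder so a copy still sitting in a run queue can
        // no longer be claimed. The patch wraps this CAS in a CHECK, i.e. it
        // asserts that no task is RUNNING while its owner is destroyed.
        for (auto& h : holders) {
            auto expected = TaskState::VALID;
            bool ok = h->state.compare_exchange_strong(expected, TaskState::INVALID);
            assert(ok);
            (void)ok;
        }
        holders.clear();
    }
};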
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_fragment_context.h
@@ -227,6 +227,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
OperatorPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
std::vector<TaskHolderSPtr> _task_holders;

// TODO: remove _sink and _multi_cast_stream_sink_senders here and set both
// of them in the pipeline task, not the fragment context
6 changes: 4 additions & 2 deletions be/src/pipeline/pipeline_task.h
@@ -27,6 +27,7 @@
#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline.h"
#include "pipeline/task_queue.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"
@@ -41,8 +42,6 @@ class PipelineFragmentContext;

namespace doris::pipeline {

class MultiCoreTaskQueue;
class PriorityTaskQueue;
class Dependency;

class PipelineTask {
@@ -218,6 +217,8 @@ class PipelineTask {

bool wake_up_early() const { return _wake_up_early; }

void set_holder(std::shared_ptr<TaskHolder> holder) { _holder = holder; }

private:
friend class RuntimeFilterDependency;
bool _is_blocked();
@@ -299,6 +300,7 @@ class PipelineTask {
std::atomic<bool> _running = false;
std::atomic<bool> _eos = false;
std::atomic<bool> _wake_up_early = false;
std::weak_ptr<TaskHolder> _holder;
};

} // namespace doris::pipeline
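The task side keeps only a std::weak_ptr back to its holder, so a task never extends its holder's lifetime and no reference cycle forms with the shared_ptr copies held by the queues. A hedged sketch of the access pattern this implies; the requeue_self helper is illustrative, not part of this commit, and assumes the patched pipeline/task_queue.h is in scope:

#include <memory>

namespace doris::pipeline {

// Illustrative only: any path that hands a task back to a queue must first
// promote the weak back-reference installed via set_holder().
void requeue_self(const std::weak_ptr<TaskHolder>& holder_ref, MultiCoreTaskQueue& queue) {
    if (auto holder = holder_ref.lock()) {
        static_cast<void>(queue.push_back(holder)); // holder still alive
    }
    // If lock() fails, the fragment context already dropped the holder, so
    // the task must not be rescheduled.
}

} // namespace doris::pipeline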
24 changes: 14 additions & 10 deletions be/src/pipeline/task_queue.cpp
@@ -29,7 +29,11 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"

PipelineTask* SubTaskQueue::try_take(bool is_steal) {
TaskHolder::TaskHolder(PipelineTask* task_) : task(task_), state(TaskState::VALID) {}

TaskHolderSPtr SubTaskQueue::try_take(bool is_steal) {
if (_queue.empty()) {
return nullptr;
}
@@ -54,7 +58,7 @@ void PriorityTaskQueue::close() {
_wait_task.notify_all();
}

PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
TaskHolderSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
if (_total_task_size == 0 || _closed) {
return nullptr;
}
@@ -90,13 +94,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
return SUB_QUEUE_LEVEL - 1;
}

PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
TaskHolderSPtr PriorityTaskQueue::try_take(bool is_steal) {
// TODO: a more efficient lock? e.g. if acquiring the lock fails, return nullptr
std::unique_lock<std::mutex> lock(_work_size_mutex);
return _try_take_unprotected(is_steal);
}

PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
TaskHolderSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
std::unique_lock<std::mutex> lock(_work_size_mutex);
auto task = _try_take_unprotected(false);
if (task) {
@@ -111,7 +115,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
}
}

Status PriorityTaskQueue::push(PipelineTask* task) {
Status PriorityTaskQueue::push(TaskHolderSPtr task) {
if (_closed) {
return Status::InternalError("WorkTaskQueue closed");
}
@@ -149,8 +153,8 @@ void MultiCoreTaskQueue::close() {
[](auto& prio_task_queue) { prio_task_queue.close(); });
}

PipelineTask* MultiCoreTaskQueue::take(int core_id) {
PipelineTask* task = nullptr;
TaskHolderSPtr MultiCoreTaskQueue::take(int core_id) {
TaskHolderSPtr task = nullptr;
while (!_closed) {
DCHECK(_prio_task_queues.size() > core_id)
<< " list size: " << _prio_task_queues.size() << " core_id: " << core_id
@@ -178,7 +182,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
return task;
}

PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
TaskHolderSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(core_id < _core_size);
int next_id = core_id;
for (int i = 1; i < _core_size; ++i) {
@@ -196,15 +200,15 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
return nullptr;
}

Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask> task) {
Status MultiCoreTaskQueue::push_back(TaskHolderSPtr task) {
int core_id = task->task->get_previous_core_id();
if (core_id < 0) {
core_id = _next_core.fetch_add(1) % _core_size;
}
return push_back(task, core_id);
}

Status MultiCoreTaskQueue::push_back(std::shared_ptr<PipelineTask>& task, int core_id) {
Status MultiCoreTaskQueue::push_back(TaskHolderSPtr task, int core_id) {
auto* pipeline_task = task->task;
DCHECK(core_id < _core_size || pipeline_task->query_context()->is_cancelled());
pipeline_task->put_in_runnable_queue();
return _prio_task_queues[pipeline_task->query_context()->is_cancelled() ? _urgent_queue_idx : core_id]
.push(task);
36 changes: 25 additions & 11 deletions be/src/pipeline/task_queue.h
@@ -34,13 +34,27 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"

enum class TaskState {
VALID = 0,   // Valid task which is not currently running
RUNNING = 1, // Valid task which is being executed by a thread
INVALID = 2, // Invalid task whose underlying PipelineTask has already been destructed
};

struct TaskHolder : public std::enable_shared_from_this<TaskHolder> {
PipelineTask* task;
std::atomic<TaskState> state;
explicit TaskHolder(PipelineTask* task_);
};

using TaskHolderSPtr = std::shared_ptr<TaskHolder>;

class SubTaskQueue {
friend class PriorityTaskQueue;

public:
void push_back(PipelineTask* task) { _queue.emplace(task); }
void push_back(TaskHolderSPtr task) { _queue.emplace(task); }

PipelineTask* try_take(bool is_steal);
TaskHolderSPtr try_take(bool is_steal);

void set_level_factor(double level_factor) { _level_factor = level_factor; }

@@ -58,7 +72,7 @@ class SubTaskQueue {
bool empty() { return _queue.empty(); }

private:
std::queue<PipelineTask*> _queue;
std::queue<TaskHolderSPtr> _queue;
// depends on LEVEL_QUEUE_TIME_FACTOR
double _level_factor = 1;

@@ -72,18 +86,18 @@ class PriorityTaskQueue {

void close();

PipelineTask* try_take(bool is_steal);
TaskHolderSPtr try_take(bool is_steal);

PipelineTask* take(uint32_t timeout_ms = 0);
TaskHolderSPtr take(uint32_t timeout_ms = 0);

Status push(PipelineTask* task);
Status push(TaskHolderSPtr task);

void inc_sub_queue_runtime(int level, uint64_t runtime) {
_sub_queues[level].inc_runtime(runtime);
}

private:
PipelineTask* _try_take_unprotected(bool is_steal);
TaskHolderSPtr _try_take_unprotected(bool is_steal);
static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
static constexpr size_t SUB_QUEUE_LEVEL = 6;
SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -112,19 +126,19 @@ class MultiCoreTaskQueue {
void close();

// Get the task by core id.
PipelineTask* take(int core_id);
TaskHolderSPtr take(int core_id);

// TODO combine these methods to `push_back(task, core_id = -1)`
Status push_back(std::shared_ptr<PipelineTask> task);
Status push_back(TaskHolderSPtr task);

Status push_back(PipelineTask* task, int core_id);
Status push_back(TaskHolderSPtr task, int core_id);

void update_statistics(PipelineTask* task, int64_t time_spent);

int num_queues() const { return cast_set<int>(_prio_task_queues.size()); }

private:
PipelineTask* _steal_take(int core_id);
TaskHolderSPtr _steal_take(int core_id);

std::vector<PriorityTaskQueue> _prio_task_queues;
std::atomic<uint32_t> _next_core = 0;
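The three TaskState values form a small lock-free protocol: a worker moves a holder VALID → RUNNING before touching task, moves it back RUNNING → VALID when done, and destruction moves VALID → INVALID. A standalone sketch of the two worker-side primitives, assuming only <atomic>; try_claim and release are illustrative names, not part of the patch:

#include <atomic>

enum class TaskState { VALID = 0, RUNNING = 1, INVALID = 2 };

// Claim exclusive execution rights. Returns false if the holder is already
// running (the caller requeues it) or already invalidated (the caller drops
// it); `expected` tells the caller which case occurred.
inline bool try_claim(std::atomic<TaskState>& state, TaskState& expected) {
    expected = TaskState::VALID;
    return state.compare_exchange_strong(expected, TaskState::RUNNING);
}

// Release execution rights after running or closing the task. The patch
// wraps this CAS in a CHECK: nothing else may move a RUNNING holder.
inline void release(std::atomic<TaskState>& state) {
    auto expected = TaskState::RUNNING;
    state.compare_exchange_strong(expected, TaskState::VALID);
}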
49 changes: 35 additions & 14 deletions be/src/pipeline/task_scheduler.cpp
@@ -66,18 +66,22 @@ Status TaskScheduler::start() {
return Status::OK();
}

Status TaskScheduler::schedule_task(PipelineTask* task) {
Status TaskScheduler::schedule_task(TaskHolderSPtr task) {
return _task_queue.push_back(task);
}

// After _close_task returns, the task may already be destructed.
void _close_task(PipelineTask* task, Status exec_status) {
void _close_task(TaskHolderSPtr holder, Status exec_status) {
// Has to attach memory tracker here, because the close task will also release some memory.
// The memory should be counted against the query, or the query's memory will not decrease
// when part of the task finishes.
auto* task = holder->task;
SCOPED_ATTACH_TASK(task->runtime_state());
if (task->is_finalized()) {
task->set_running(false);
{
auto expected = TaskState::RUNNING;
CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
}
return;
}
// close_a_pipeline may delete fragment context and will core in some defer
Expand All @@ -94,21 +98,32 @@ void _close_task(PipelineTask* task, Status exec_status) {
}
task->finalize();
task->set_running(false);
{
auto expected = TaskState::RUNNING;
CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
}
task->fragment_context()->close_a_pipeline(task->pipeline_id());
}

void TaskScheduler::_do_work(int index) {
while (_markers[index]) {
auto* task = _task_queue.take(index);
if (!task) {
auto holder = _task_queue.take(index);
if (!holder) {
continue;
}
if (task->is_running()) {
static_cast<void>(_task_queue.push_back(task, index));
continue;
{
auto expected = TaskState::VALID;
if (!holder->state.compare_exchange_strong(expected, TaskState::RUNNING)) {
if (expected == TaskState::RUNNING) {
static_cast<void>(_task_queue.push_back(holder, index));
} else {
DCHECK(expected == TaskState::INVALID);
}
continue;
}
}
auto* task = holder->task;
task->log_detail_if_need();
task->set_running(true);
task->set_task_queue(&_task_queue);
auto* fragment_ctx = task->fragment_context();
bool canceled = fragment_ctx->is_canceled();
Expand All @@ -122,7 +137,7 @@ void TaskScheduler::_do_work(int index) {
// If pipeline is canceled, it will report after pipeline closed, and will propagate
// errors to downstream through exchange. So, here we needn't send_report.
// fragment_ctx->send_report(true);
_close_task(task, fragment_ctx->get_query_ctx()->exec_status());
_close_task(holder, fragment_ctx->get_query_ctx()->exec_status());
continue;
}

@@ -165,7 +180,7 @@ void TaskScheduler::_do_work(int index) {
LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
print_id(task->query_context()->query_id()),
status.to_string());
_close_task(task, status);
_close_task(holder, status);
continue;
}
fragment_ctx->trigger_report_if_necessary();
Expand All @@ -175,15 +190,21 @@ void TaskScheduler::_do_work(int index) {
// added to running queue when dependency is ready.
if (task->is_pending_finish()) {
// Only meet eos, should set task to PENDING_FINISH state
task->set_running(false);
{
auto expected = TaskState::RUNNING;
CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
}
} else {
Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
_close_task(task, exec_status);
_close_task(holder, exec_status);
}
continue;
}

task->set_running(false);
{
auto expected = TaskState::RUNNING;
CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
}
}
}

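In outline, each worker iteration of _do_work now follows the shape below: claim via CAS, execute, release via CAS, requeueing only when another thread currently runs the task. This is a simplified sketch under the patched pipeline/task_queue.h, not a drop-in replacement; error handling, profiling, and the close path are omitted:

// Simplified outline of one scheduler worker iteration after this patch.
void worker_iteration(doris::pipeline::MultiCoreTaskQueue& queue, int index) {
    using doris::pipeline::TaskState;
    auto holder = queue.take(index);
    if (!holder) {
        return; // queue closed or wait timed out
    }
    auto expected = TaskState::VALID;
    if (!holder->state.compare_exchange_strong(expected, TaskState::RUNNING)) {
        if (expected == TaskState::RUNNING) {
            // Another thread is executing this task: requeue and retry later.
            static_cast<void>(queue.push_back(holder, index));
        }
        // expected == INVALID: the fragment context is gone; drop the holder.
        return;
    }
    // Between the two CASes this thread has exclusive access to holder->task.
    // ... run the task, or close it on cancellation/error/EOS ...
    expected = TaskState::RUNNING;
    holder->state.compare_exchange_strong(expected, TaskState::VALID);
}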
2 changes: 1 addition & 1 deletion be/src/pipeline/task_scheduler.h
@@ -51,7 +51,7 @@ class TaskScheduler {

~TaskScheduler();

Status schedule_task(PipelineTask* task);
Status schedule_task(TaskHolderSPtr task);

Status start();

