Revert "use shared ptr"
This reverts commit 42532ad.
Gabriel39 committed Dec 30, 2024 · 1 parent 8011705 · commit a6fe5b5

Showing 8 changed files with 34 additions and 32 deletions.

be/src/pipeline/pipeline_fragment_context.cpp (3 additions & 3 deletions)

@@ -445,14 +445,14 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
         auto cur_task_id = _total_tasks++;
         task_runtime_state->set_task_id(cur_task_id);
         task_runtime_state->set_task_num(pipeline->num_tasks());
-        auto task = std::shared_ptr<PipelineTask>(
+        auto task = std::make_unique<PipelineTask>(
                 new PipelineTask(pipeline, cur_task_id, task_runtime_state.get(), this,
                                  pipeline_id_to_profile[pip_idx].get(),
                                  get_local_exchange_state(pipeline), i));
         pipeline->incr_created_tasks(i, task.get());
         task_runtime_state->set_task(task.get());
         pipeline_id_to_task.insert({pipeline->id(), task.get()});
-        _tasks[i].emplace_back(task);
+        _tasks[i].emplace_back(std::move(task));
     }
 }

@@ -1679,7 +1679,7 @@ Status PipelineFragmentContext::submit() {
     auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
     for (auto& task : _tasks) {
         for (auto& t : task) {
-            st = scheduler->schedule_task(t);
+            st = scheduler->schedule_task(t.get());
             if (!st) {
                 cancel(Status::InternalError("submit context to executor fail"));
                 std::lock_guard<std::mutex> l(_task_mutex);
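
Together with the _tasks declaration change in pipeline_fragment_context.h below, this restores the ownership split: the fragment context holds each PipelineTask in a unique_ptr, and the scheduler only ever sees a non-owning raw pointer via t.get(). A minimal sketch of that pattern, using hypothetical stand-in types rather than the real Doris classes:

    #include <memory>
    #include <vector>

    struct Task {
        int id = 0;
    };

    class Owner {
    public:
        // Create a task, keep ownership, and hand back a non-owning handle,
        // mirroring how _build_pipeline_tasks fills _tasks and then passes
        // task.get() to the queue/scheduler side.
        Task* create_task(int id) {
            auto task = std::make_unique<Task>();
            task->id = id;
            _tasks.emplace_back(std::move(task));  // ownership stays with the owner
            return _tasks.back().get();            // raw pointer for the scheduler
        }

    private:
        std::vector<std::unique_ptr<Task>> _tasks;  // cf. PipelineFragmentContext::_tasks
    };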

be/src/pipeline/pipeline_fragment_context.h (1 addition & 1 deletion)

@@ -226,7 +226,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

     OperatorPtr _root_op = nullptr;
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
-    std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;
+    std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;

     // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
     // of it in pipeline task not the fragment_context

be/src/pipeline/pipeline_task.cpp (4 additions & 1 deletion)

@@ -554,6 +554,7 @@ std::string PipelineTask::debug_string() {

 void PipelineTask::wake_up() {
     // call by dependency
+<<<<<<< HEAD
     static_cast<void>(get_task_queue()->push_back(shared_from_this()));
 }

@@ -578,10 +579,12 @@ void PipelineTask::clear_blocking_state() {
         dep->set_always_ready();
     }
 }
+=======
+    static_cast<void>(get_task_queue()->push_back(this, query_context()->is_cancelled()));
+>>>>>>> parent of 42532ad7b2 (use shared ptr)
 }

 QueryContext* PipelineTask::query_context() {
     return _fragment_context->get_query_ctx();
 }

 } // namespace doris::pipeline
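
Note that this hunk commits the Git conflict markers themselves: the HEAD side keeps the shared_from_this() call, the "parent of 42532ad7b2" side carries the raw-pointer call, and a source file containing <<<<<<</=======/>>>>>>> lines will not compile. Assuming the intent of the revert is the raw-pointer side, the resolved function would presumably read:

    void PipelineTask::wake_up() {
        // call by dependency
        static_cast<void>(get_task_queue()->push_back(this, query_context()->is_cancelled()));
    }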

be/src/pipeline/pipeline_task.h (1 addition & 1 deletion)

@@ -45,7 +45,7 @@ class MultiCoreTaskQueue;
 class PriorityTaskQueue;
 class Dependency;

-class PipelineTask final : public std::enable_shared_from_this<PipelineTask> {
+class PipelineTask {
 public:
     PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
                  PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
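
Dropping the enable_shared_from_this base is what forces the signature changes in the queue and scheduler files below: without it, a task cannot mint a shared_ptr to itself from inside a member function. A minimal illustration of what the removed base class provided (stand-in types, not Doris code):

    #include <memory>
    #include <queue>

    class SelfQueueing : public std::enable_shared_from_this<SelfQueueing> {
    public:
        void wake_up(std::queue<std::shared_ptr<SelfQueueing>>& ready_queue) {
            // Valid only while *this is already managed by some shared_ptr;
            // the returned pointer shares that original control block.
            ready_queue.push(shared_from_this());
        }
    };

    // Usage sketch:
    //   auto task = std::make_shared<SelfQueueing>();
    //   std::queue<std::shared_ptr<SelfQueueing>> q;
    //   task->wake_up(q);  // q now co-owns the task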

be/src/pipeline/task_queue.cpp (8 additions & 8 deletions)

@@ -29,7 +29,7 @@
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"

-std::shared_ptr<PipelineTask> SubTaskQueue::try_take(bool is_steal) {
+PipelineTask* SubTaskQueue::try_take(bool is_steal) {
     if (_queue.empty()) {
         return nullptr;
     }
@@ -54,7 +54,7 @@ void PriorityTaskQueue::close() {
     _wait_task.notify_all();
 }

-std::shared_ptr<PipelineTask> PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
+PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
     if (_total_task_size == 0 || _closed) {
         return nullptr;
     }
@@ -90,13 +90,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
     return SUB_QUEUE_LEVEL - 1;
 }

-std::shared_ptr<PipelineTask> PriorityTaskQueue::try_take(bool is_steal) {
+PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
     // TODO other efficient lock? e.g. if get lock fail, return null_ptr
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     return _try_take_unprotected(is_steal);
 }

-std::shared_ptr<PipelineTask> PriorityTaskQueue::take(uint32_t timeout_ms) {
+PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     auto task = _try_take_unprotected(false);
     if (task) {
@@ -111,7 +111,7 @@ std::shared_ptr<PipelineTask> PriorityTaskQueue::take(uint32_t timeout_ms) {
     }
 }

-Status PriorityTaskQueue::push(std::shared_ptr<PipelineTask>& task) {
+Status PriorityTaskQueue::push(PipelineTask* task) {
     if (_closed) {
         return Status::InternalError("WorkTaskQueue closed");
     }
@@ -149,8 +149,8 @@ void MultiCoreTaskQueue::close() {
                   [](auto& prio_task_queue) { prio_task_queue.close(); });
 }

-std::shared_ptr<PipelineTask> MultiCoreTaskQueue::take(int core_id) {
-    std::shared_ptr<PipelineTask> task = nullptr;
+PipelineTask* MultiCoreTaskQueue::take(int core_id) {
+    PipelineTask* task = nullptr;
     while (!_closed) {
         DCHECK(_prio_task_queues.size() > core_id)
                 << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
@@ -178,7 +178,7 @@ std::shared_ptr<PipelineTask> MultiCoreTaskQueue::take(int core_id) {
     return task;
 }

-std::shared_ptr<PipelineTask> MultiCoreTaskQueue::_steal_take(int core_id) {
+PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
     DCHECK(core_id < _core_size);
     int next_id = core_id;
     for (int i = 1; i < _core_size; ++i) {
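
MultiCoreTaskQueue::take and _steal_take only change their return type here, but the surrounding pattern is worth spelling out: each worker polls its own PriorityTaskQueue and, when that comes up empty, probes the other cores' queues. A simplified, self-contained sketch of that take-then-steal loop (assumed semantics, with stand-in types rather than the real classes):

    #include <deque>
    #include <vector>

    struct Task { int id = 0; };

    // Stand-in for PriorityTaskQueue.
    struct CoreQueue {
        std::deque<Task*> q;
        Task* try_take() {
            if (q.empty()) return nullptr;
            Task* t = q.front();
            q.pop_front();
            return t;
        }
    };

    Task* take_with_stealing(std::vector<CoreQueue>& queues, int core_id) {
        // Fast path: this core's own queue.
        if (Task* t = queues[core_id].try_take()) return t;
        // Steal path: probe the other cores in order, like _steal_take's next_id loop.
        int n = static_cast<int>(queues.size());
        for (int i = 1; i < n; ++i) {
            if (Task* t = queues[(core_id + i) % n].try_take()) return t;
        }
        return nullptr;  // caller retries/blocks, as MultiCoreTaskQueue::take does
    }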

be/src/pipeline/task_queue.h (10 additions & 10 deletions)

@@ -38,9 +38,9 @@ class SubTaskQueue {
     friend class PriorityTaskQueue;

 public:
-    void push_back(std::shared_ptr<PipelineTask>& task) { _queue.emplace(task); }
+    void push_back(PipelineTask* task) { _queue.emplace(task); }

-    std::shared_ptr<PipelineTask> try_take(bool is_steal);
+    PipelineTask* try_take(bool is_steal);

     void set_level_factor(double level_factor) { _level_factor = level_factor; }

@@ -58,7 +58,7 @@
     bool empty() { return _queue.empty(); }

 private:
-    std::queue<std::shared_ptr<PipelineTask>> _queue;
+    std::queue<PipelineTask*> _queue;
     // depends on LEVEL_QUEUE_TIME_FACTOR
     double _level_factor = 1;

@@ -72,18 +72,18 @@ class PriorityTaskQueue {

     void close();

-    std::shared_ptr<PipelineTask> try_take(bool is_steal);
+    PipelineTask* try_take(bool is_steal);

-    std::shared_ptr<PipelineTask> take(uint32_t timeout_ms = 0);
+    PipelineTask* take(uint32_t timeout_ms = 0);

-    Status push(std::shared_ptr<PipelineTask>& task);
+    Status push(PipelineTask* task);

     void inc_sub_queue_runtime(int level, uint64_t runtime) {
         _sub_queues[level].inc_runtime(runtime);
     }

 private:
-    std::shared_ptr<PipelineTask> _try_take_unprotected(bool is_steal);
+    PipelineTask* _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
     static constexpr size_t SUB_QUEUE_LEVEL = 6;
     SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -112,19 +112,19 @@
     void close();

     // Get the task by core id.
-    std::shared_ptr<PipelineTask> take(int core_id);
+    PipelineTask* take(int core_id);

     // TODO combine these methods to `push_back(task, core_id = -1)`
     Status push_back(std::shared_ptr<PipelineTask> task);

-    Status push_back(std::shared_ptr<PipelineTask>& task, int core_id);
+    Status push_back(PipelineTask* task, int core_id);

     void update_statistics(PipelineTask* task, int64_t time_spent);

     int num_queues() const { return cast_set<int>(_prio_task_queues.size()); }

 private:
-    std::shared_ptr<PipelineTask> _steal_take(int core_id);
+    PipelineTask* _steal_take(int core_id);

     std::vector<PriorityTaskQueue> _prio_task_queues;
     std::atomic<uint32_t> _next_core = 0;
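
This header also carries the tuning constants of the multi-level feedback queue: SUB_QUEUE_LEVEL sub-queues, with each level's time span growing by LEVEL_QUEUE_TIME_FACTOR. A rough sketch of how accumulated runtime might map to a level, in the spirit of PriorityTaskQueue::_compute_level (the base time span here is an assumption, not the real value):

    #include <cstdint>

    static constexpr int SUB_QUEUE_LEVEL = 6;
    static constexpr uint64_t LEVEL_QUEUE_TIME_FACTOR = 2;
    static constexpr uint64_t BASE_SPAN_NS = 1'000'000'000;  // assumed: 1s for level 0

    // Tasks that have consumed more runtime sink to higher (lower-priority)
    // levels, so short queries keep getting scheduled promptly.
    int compute_level(uint64_t runtime_ns) {
        uint64_t limit = BASE_SPAN_NS;
        for (int level = 0; level < SUB_QUEUE_LEVEL - 1; ++level) {
            if (runtime_ns < limit) return level;
            limit *= LEVEL_QUEUE_TIME_FACTOR;
        }
        return SUB_QUEUE_LEVEL - 1;  // matches the `return SUB_QUEUE_LEVEL - 1;` context above
    }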

be/src/pipeline/task_scheduler.cpp (6 additions & 7 deletions)

@@ -66,12 +66,12 @@ Status TaskScheduler::start() {
     return Status::OK();
 }

-Status TaskScheduler::schedule_task(std::shared_ptr<PipelineTask>& task) {
+Status TaskScheduler::schedule_task(PipelineTask* task) {
     return _task_queue.push_back(task);
 }

 // after _close_task, task maybe destructed.
-void _close_task(std::shared_ptr<PipelineTask> task, Status exec_status) {
+void _close_task(PipelineTask* task, Status exec_status) {
     // Has to attach memory tracker here, because the close task will also release some memory.
     // Should count the memory to the query or the query's memory will not decrease when part of
     // task finished.
@@ -95,12 +95,11 @@ void _close_task(std::shared_ptr<PipelineTask> task, Status exec_status) {
     task->finalize();
     task->set_running(false);
     task->fragment_context()->close_a_pipeline(task->pipeline_id());
-    task.reset();
 }

 void TaskScheduler::_do_work(int index) {
     while (_markers[index]) {
-        auto task = _task_queue.take(index);
+        auto* task = _task_queue.take(index);
         if (!task) {
             continue;
         }
@@ -123,7 +122,7 @@
             // If pipeline is canceled, it will report after pipeline closed, and will propagate
             // errors to downstream through exchange. So, here we needn't send_report.
             // fragment_ctx->send_report(true);
-            _close_task(std::move(task), fragment_ctx->get_query_ctx()->exec_status());
+            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
             continue;
         }

@@ -166,7 +165,7 @@
             LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
                                         print_id(task->query_context()->query_id()),
                                         status.to_string());
-            _close_task(std::move(task), status);
+            _close_task(task, status);
             continue;
         }
         fragment_ctx->trigger_report_if_necessary();
@@ -179,7 +178,7 @@
             task->set_running(false);
         } else {
             Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-            _close_task(std::move(task), exec_status);
+            _close_task(task, exec_status);
         }
         continue;
     }
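
The comment "after _close_task, task maybe destructed" is the crux of this file: once _close_task takes a raw pointer, closing the pipeline may free the task, and the worker loop must not touch the pointer again. The shared_ptr variant papered over this by keeping the task alive through the by-value parameter until task.reset(). A compilable toy illustration of the restored contract (stand-in types, not the real scheduler):

    #include <memory>

    struct Task {
        bool finished = false;
    };

    // Stand-in owner: destroying the unique_ptr destroys the task.
    struct FragmentContext {
        std::unique_ptr<Task> owned = std::make_unique<Task>();
        void close_a_pipeline() { owned.reset(); }
    };

    void close_task(FragmentContext& ctx, Task* task) {
        task->finished = true;   // last valid use of the pointer
        ctx.close_a_pipeline();  // may destroy *task
        // From here on `task` is dangling; the raw pointer holds no ownership stake.
    }

    void worker_step(FragmentContext& ctx) {
        Task* task = ctx.owned.get();  // non-owning handle, like _task_queue.take(index)
        if (task->finished) {
            close_task(ctx, task);
            return;  // mirror the scheduler's `continue`: never dereference task again
        }
    }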

be/src/pipeline/task_scheduler.h (1 addition & 1 deletion)

@@ -51,7 +51,7 @@ class TaskScheduler {

     ~TaskScheduler();

-    Status schedule_task(std::shared_ptr<PipelineTask>& task);
+    Status schedule_task(PipelineTask* task);

     Status start();
