diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 121fbd9324963c..e989af75b20e2e 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -80,10 +80,12 @@ Status BlockedTaskScheduler::add_blocked_task(PipelineTask* task) { if (!static_cast(task)->push_blocked_task_to_queue()) { // put this task into current dependency's blocking queue and wait for event notification // instead of using a separate BlockedTaskScheduler. + task->set_running(false); return Status::OK(); } _blocked_tasks.push_back(task); _task_cond.notify_one(); + task->set_running(false); return Status::OK(); } @@ -337,7 +339,6 @@ void TaskScheduler::_do_work(size_t index) { } auto pipeline_state = task->get_state(); - task->set_running(false); switch (pipeline_state) { case PipelineTaskState::BLOCKED_FOR_SOURCE: case PipelineTaskState::BLOCKED_FOR_SINK: @@ -346,6 +347,7 @@ void TaskScheduler::_do_work(size_t index) { static_cast(_blocked_task_scheduler->add_blocked_task(task)); break; case PipelineTaskState::RUNNABLE: + task->set_running(false); static_cast(_task_queue->push_back(task, index)); break; default: