Skip to content

Commit

Permalink
[refactor](pipelineX) refine union dependency (apache#27348)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored and seawinde committed Nov 28, 2023
1 parent fe91f0e commit 3e214e5
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 38 deletions.
8 changes: 6 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
Base::_shared_state->aggregate_evaluators.back()->set_timer(_exec_timer, _merge_timer,
_expr_timer);
}
if (p._is_streaming) {
Base::_shared_state->data_queue->set_sink_dependency(Base::_dependency, 0);
}
Base::_shared_state->probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
for (size_t i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(
Expand Down Expand Up @@ -717,7 +720,7 @@ Status AggSinkLocalState<DependencyType, Derived>::try_spill_disk(bool eos) {
template <typename LocalStateType>
AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs, bool is_streaming)
: DataSinkOperatorX<LocalStateType>(operator_id, tnode.node_id),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
Expand All @@ -727,7 +730,8 @@ AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operato
_is_merge(false),
_pool(pool),
_limit(tnode.limit),
_have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) {
_have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()),
_is_streaming(is_streaming) {
_is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase;
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ template <typename LocalStateType = BlockingAggSinkLocalState>
class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
public:
AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool is_streaming = false);
~AggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
Expand Down Expand Up @@ -404,6 +404,7 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
size_t _spill_partition_count_bits;
int64_t _limit; // -1: no limit
bool _have_conjuncts;
const bool _is_streaming;
};

} // namespace pipeline
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
auto& p = _parent->template cast<AggSourceOperatorX>();
if (p._is_streaming) {
_shared_state->data_queue.reset(new DataQueue(1));
_shared_state->data_queue->set_dependency(_dependency,
info.upstream_dependencies.front().get());
_shared_state->data_queue->set_source_dependency(_dependency);
}
if (p._without_key) {
if (p._needs_finalize) {
Expand Down
31 changes: 17 additions & 14 deletions be/src/pipeline/exec/data_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ DataQueue::DataQueue(int child_count)
_is_canceled(child_count),
_cur_bytes_in_queue(child_count),
_cur_blocks_nums_in_queue(child_count),
_flag_queue_idx(0),
_source_dependency(nullptr),
_sink_dependency(nullptr) {
_flag_queue_idx(0) {
for (int i = 0; i < child_count; ++i) {
_queue_blocks_lock[i].reset(new std::mutex());
_free_blocks_lock[i].reset(new std::mutex());
Expand All @@ -51,6 +49,8 @@ DataQueue::DataQueue(int child_count)
_cur_bytes_in_queue[i] = 0;
_cur_blocks_nums_in_queue[i] = 0;
}
_un_finished_counter = child_count;
_sink_dependencies.resize(child_count, nullptr);
}

std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
Expand Down Expand Up @@ -118,11 +118,12 @@ Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu
}
_cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes();
_cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
if (_sink_dependency) {
if (!_is_finished[_flag_queue_idx]) {
auto old_value = _cur_blocks_total_nums.fetch_sub(1);
if (old_value == 1 && _source_dependency) {
if (!is_all_finish()) {
_source_dependency->block();
}
_sink_dependency->set_ready();
_sink_dependencies[_flag_queue_idx]->set_ready();
}
} else {
if (_is_finished[_flag_queue_idx]) {
Expand All @@ -142,9 +143,10 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
_cur_bytes_in_queue[child_idx] += block->allocated_bytes();
_queue_blocks[child_idx].emplace_back(std::move(block));
_cur_blocks_nums_in_queue[child_idx] += 1;
if (_sink_dependency) {
_cur_blocks_total_nums++;
if (_source_dependency) {
_source_dependency->set_ready();
_sink_dependency->block();
_sink_dependencies[child_idx]->block();
}
//this only use to record the queue[0] for profile
_max_bytes_in_queue = std::max(_max_bytes_in_queue, _cur_bytes_in_queue[0].load());
Expand All @@ -154,10 +156,16 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i

void DataQueue::set_finish(int child_idx) {
std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]);
if (_is_finished[child_idx]) {
return;
}
_is_finished[child_idx] = true;
if (_source_dependency) {
_source_dependency->set_ready();
}
if (_un_finished_counter.fetch_sub(1) == 1) {
_is_all_finished = true;
}
}

void DataQueue::set_canceled(int child_idx) {
Expand All @@ -175,12 +183,7 @@ bool DataQueue::is_finish(int child_idx) {
}

bool DataQueue::is_all_finish() {
for (int i = 0; i < _child_count; ++i) {
if (_is_finished[i] == false) {
return false;
}
}
return true;
return _is_all_finished;
}

} // namespace pipeline
Expand Down
12 changes: 9 additions & 3 deletions be/src/pipeline/exec/data_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ class DataQueue {
int64_t max_size_of_queue() const { return _max_size_of_queue; }

bool data_exhausted() const { return _data_exhausted; }
void set_dependency(Dependency* source_dependency, Dependency* sink_dependency) {
void set_source_dependency(Dependency* source_dependency) {
_source_dependency = source_dependency;
_sink_dependency = sink_dependency;
}
void set_sink_dependency(Dependency* sink_dependency, int child_idx) {
_sink_dependencies[child_idx] = sink_dependency;
}

private:
Expand All @@ -80,10 +82,13 @@ class DataQueue {
//how many deque will be init, always will be one
int _child_count = 0;
std::vector<std::atomic_bool> _is_finished;
std::atomic_uint32_t _un_finished_counter;
std::atomic_bool _is_all_finished = false;
std::vector<std::atomic_bool> _is_canceled;
// int64_t just for counter of profile
std::vector<std::atomic_int64_t> _cur_bytes_in_queue;
std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue;
std::atomic_uint32_t _cur_blocks_total_nums = 0;

//this will be indicate which queue has data, it's useful when have many queues
std::atomic_int _flag_queue_idx = 0;
Expand All @@ -95,8 +100,9 @@ class DataQueue {
int64_t _max_size_of_queue = 0;
static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;

// data queue is multi sink one source
Dependency* _source_dependency = nullptr;
Dependency* _sink_dependency = nullptr;
std::vector<Dependency*> _sink_dependencies;
};

} // namespace pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool*
int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool, operator_id, tnode, descs) {}
: AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool, operator_id, tnode, descs,
true) {}

Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(AggSinkOperatorX<DistinctStreamingAggSinkLocalState>::init(tnode, state));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ Status StreamingAggSinkLocalState::_pre_agg_with_serialized_key(
StreamingAggSinkOperatorX::StreamingAggSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: AggSinkOperatorX<StreamingAggSinkLocalState>(pool, operator_id, tnode, descs) {}
: AggSinkOperatorX<StreamingAggSinkLocalState>(pool, operator_id, tnode, descs, true) {}

Status StreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(AggSinkOperatorX<StreamingAggSinkLocalState>::init(tnode, state));
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/union_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
_child_expr.resize(p._child_expr.size());
_shared_state->data_queue.set_sink_dependency(_dependency, p._cur_child_id);
for (size_t i = 0; i < p._child_expr.size(); i++) {
RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/union_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
((UnionSourceDependency*)deps.front().get())->set_shared_state(ss);
}
RETURN_IF_ERROR(Base::init(state, info));
ss->data_queue.set_dependency(_dependency, info.upstream_dependencies.front().get());
ss->data_queue.set_source_dependency(_dependency);
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
// Const exprs materialized by this node. These exprs don't refer to any children.
Expand All @@ -141,6 +141,9 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(clone_expr_list(_const_expr_list, other_expr_list));
}
}
if (child_count == 0) {
_dependency->set_ready();
}
return Status::OK();
}

Expand Down
13 changes: 0 additions & 13 deletions be/src/pipeline/exec/union_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,6 @@ class UnionSourceDependency final : public Dependency {
UnionSourceDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "UnionSourceDependency", query_ctx) {}
~UnionSourceDependency() override = default;

[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
if (((UnionSharedState*)_shared_state.get())->child_count() == 0) {
return nullptr;
}
if (((UnionSharedState*)_shared_state.get())->data_queue.is_all_finish() ||
((UnionSharedState*)_shared_state.get())->data_queue.remaining_has_data()) {
return nullptr;
}
return this;
}
bool push_to_blocking_queue() const override { return true; }
void block() override {}
};

class UnionSourceOperatorX;
Expand Down

0 comments on commit 3e214e5

Please sign in to comment.