Skip to content

Commit

Permalink
[opt](spill) Optimize the logic for triggering spilling
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Jan 10, 2025
1 parent 0cb542e commit 29c8b1a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
38 changes: 21 additions & 17 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,8 @@ Status PipelineTask::execute(bool* eos) {
debug_msg += fmt::format(", debug info: {}",
GlobalMemoryArbitrator::process_mem_log_str());
}
LOG(INFO) << debug_msg;

ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), reserve_size, st);
continue;
LOG_EVERY_N(INFO, 100) << debug_msg;
_state->get_query_ctx()->set_low_memory_mode();
}
}

Expand All @@ -443,11 +440,13 @@ Status PipelineTask::execute(bool* eos) {
Status status = Status::OK();
DEFER_RELEASE_RESERVED();
COUNTER_UPDATE(_memory_reserve_times, 1);
const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
auto workload_group = _state->get_query_ctx()->workload_group();
if (_state->enable_reserve_memory() && workload_group &&
!(wake_up_early() || _dry_run)) {
status = thread_context()->try_reserve_memory(sink_reserve_size);
const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
status = sink_reserve_size != 0
? thread_context()->try_reserve_memory(sink_reserve_size)
: Status::OK();

if (status.ok() && _state->enable_force_spill() && _sink->is_spillable() &&
_sink->revocable_mem_size(_state) >=
Expand All @@ -468,16 +467,21 @@ Status PipelineTask::execute(bool* eos) {
debug_msg += fmt::format(", debug info: {}",
GlobalMemoryArbitrator::process_mem_log_str());
}
VLOG_DEBUG << debug_msg;

DCHECK_EQ(_pending_block.get(), nullptr);
_pending_block = std::move(_block);
_block = vectorized::Block::create_unique(_pending_block->clone_empty());
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), sink_reserve_size, status);
_pending_eos = *eos;
*eos = false;
continue;

if (_sink->revocable_mem_size(_state) >= _state->spill_min_revocable_mem()) {
VLOG_DEBUG << debug_msg;
DCHECK_EQ(_pending_block.get(), nullptr);
_pending_block = std::move(_block);
_block = vectorized::Block::create_unique(_pending_block->clone_empty());
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), sink_reserve_size,
status);
_pending_eos = *eos;
*eos = false;
continue;
} else {
_state->get_query_ctx()->set_low_memory_mode();
}
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerC
}
LOG(INFO) << debug_msg;

ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
state->get_query_ctx()->shared_from_this(), reserve_size, st);
state->get_query_ctx()->set_low_memory_mode();
}

void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
Expand Down

0 comments on commit 29c8b1a

Please sign in to comment.