Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jan 22, 2025
1 parent 99648ab commit daf382f
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 159 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Status BaseDeltaWriter::init() {
}
auto* t_ctx = doris::thread_context(true);
std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
if (t_ctx) {
if (t_ctx && t_ctx->is_attach_task()) {
wg_sptr = t_ctx->resource_ctx()->workload_group_context()->workload_group();
}
RETURN_IF_ERROR(_rowset_builder->init());
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -741,9 +741,9 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para

// This may be a first fragment request of the query.
// Create the query fragments context.
query_ctx = QueryContext::create_shared(
query_id, _exec_env, params.query_options, params.coord,
params.is_nereids, params.current_connect_fe, query_source);
query_ctx = QueryContext::create(query_id, _exec_env, params.query_options,
params.coord, params.is_nereids,
params.current_connect_fe, query_source);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
RETURN_IF_ERROR(DescriptorTbl::create(
&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
Expand Down
22 changes: 17 additions & 5 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ std::unique_ptr<TaskController> QueryContext::QueryTaskController::create(QueryC
}

bool QueryContext::QueryTaskController::is_cancelled() const {
auto query_ctx = ensure_query_ctx();
auto query_ctx = query_ctx_.lock();
if (query_ctx == nullptr) {
return true;
}
return query_ctx->is_cancelled();
}

Status QueryContext::QueryTaskController::cancel(const Status& reason, int fragment_id) {
auto query_ctx = ensure_query_ctx();
auto query_ctx = query_ctx_.lock();
if (query_ctx == nullptr) {
return Status::InternalError("QueryContext is destroyed");
}
Expand All @@ -95,6 +95,17 @@ std::unique_ptr<MemoryContext> QueryContext::QueryMemoryContext::create() {
return QueryContext::QueryMemoryContext::create_unique();
}

std::shared_ptr<QueryContext> QueryContext::create(TUniqueId query_id, ExecEnv* exec_env,
const TQueryOptions& query_options,
TNetworkAddress coord_addr, bool is_nereids,
TNetworkAddress current_connect_fe,
QuerySource query_type) {
auto ctx = QueryContext::create_shared(query_id, exec_env, query_options, coord_addr,
is_nereids, current_connect_fe, query_type);
ctx->init_query_task_controller();
return ctx;
}

QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
const TQueryOptions& query_options, TNetworkAddress coord_addr,
bool is_nereids, TNetworkAddress current_connect_fe,
Expand Down Expand Up @@ -170,12 +181,13 @@ void QueryContext::_init_query_mem_tracker() {

void QueryContext::_init_resource_context() {
resource_ctx = ResourceContext::create_shared();
resource_ctx->set_memory_context(QueryContext::QueryMemoryContext::create());
_init_query_mem_tracker();
}

void QueryContext::init_query_task_controller() {
resource_ctx->set_task_controller(QueryContext::QueryTaskController::create(this));
resource_ctx->task_controller()->set_task_id(_query_id);

resource_ctx->set_memory_context(QueryContext::QueryMemoryContext::create());
_init_query_mem_tracker();
}

QueryContext::~QueryContext() {
Expand Down
13 changes: 9 additions & 4 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
QueryTaskController(const std::shared_ptr<QueryContext>& query_ctx)
: query_ctx_(query_ctx) {}

// Gets the shared pointer to the associated query ctx to ensure its
// liveness during the query control operation.
std::shared_ptr<QueryContext> ensure_query_ctx() const { return query_ctx_.lock(); }

const std::weak_ptr<QueryContext> query_ctx_;
};

Expand Down Expand Up @@ -136,12 +132,21 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
QueryMemoryContext() = default;
};

static std::shared_ptr<QueryContext> create(TUniqueId query_id, ExecEnv* exec_env,
const TQueryOptions& query_options,
TNetworkAddress coord_addr, bool is_nereids,
TNetworkAddress current_connect_fe,
QuerySource query_type);

// use QueryContext::create, cannot be made private because of ENABLE_FACTORY_CREATOR::create_shared.
QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options,
TNetworkAddress coord_addr, bool is_nereids, TNetworkAddress current_connect_fe,
QuerySource query_type);

~QueryContext();

void init_query_task_controller();

ExecEnv* exec_env() { return _exec_env; }

bool is_timeout(timespec now) const {
Expand Down
101 changes: 57 additions & 44 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,52 +106,54 @@
__VA_ARGS__; \
} while (0)

#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->resource_ctx() \
->workload_group_context() \
->workload_group() \
->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->resource_ctx() \
->workload_group_context() \
->workload_group() \
->update_local_scan_io(data_dir, *bytes_read); \
} \
} \
#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx && t_ctx->is_attach_task() && \
t_ctx->resource_ctx()->workload_group_context()->workload_group() != nullptr) { \
iot = t_ctx->resource_ctx() \
->workload_group_context() \
->workload_group() \
->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->resource_ctx() \
->workload_group_context() \
->workload_group() \
->update_local_scan_io(data_dir, *bytes_read); \
} \
} \
}

#define LIMIT_REMOTE_SCAN_IO(bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->resource_ctx() \
->workload_group_context() \
->workload_group() \
->get_remote_scan_io_throttle(); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->resource_ctx() \
->workload_group_context() \
->workload_group() \
->update_remote_scan_io(*bytes_read); \
} \
} \
#define LIMIT_REMOTE_SCAN_IO(bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx && t_ctx->is_attach_task() && \
t_ctx->resource_ctx()->workload_group_context()->workload_group() != nullptr) { \
iot = t_ctx->resource_ctx() \
->workload_group_context() \
->workload_group() \
->get_remote_scan_io_throttle(); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->resource_ctx() \
->workload_group_context() \
->workload_group() \
->update_remote_scan_io(*bytes_read); \
} \
} \
}

namespace doris {
Expand Down Expand Up @@ -197,8 +199,19 @@ class ThreadContext {
bool is_attach_task() { return resource_ctx_ != nullptr; }

std::shared_ptr<ResourceContext> resource_ctx() {
#ifndef BE_TEST
CHECK(is_attach_task());
return resource_ctx_;
#else
if (is_attach_task()) {
return resource_ctx_;
} else {
auto ctx = ResourceContext::create_shared();
ctx->memory_context()->set_mem_tracker(
doris::ExecEnv::GetInstance()->orphan_mem_tracker());
return ctx;
}
#endif
}

static std::string get_thread_id() {
Expand Down
14 changes: 7 additions & 7 deletions be/test/pipeline/pipeline_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ class PipelineTest : public testing::Test {
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
fe_address.port = DUMMY_PORT;
_query_ctx = QueryContext::create_shared(_query_id, ExecEnv::GetInstance(), _query_options,
fe_address, true, fe_address,
QuerySource::INTERNAL_FRONTEND);
_query_ctx =
QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
true, fe_address, QuerySource::INTERNAL_FRONTEND);
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
TRuntimeFilterParamsBuilder().build());
ExecEnv::GetInstance()->set_stream_mgr(_mgr.get());
Expand Down Expand Up @@ -106,9 +106,9 @@ class PipelineTest : public testing::Test {
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
fe_address.port = DUMMY_PORT;
_query_ctx = QueryContext::create_shared(_query_id, ExecEnv::GetInstance(), _query_options,
fe_address, true, fe_address,
QuerySource::INTERNAL_FRONTEND);
_query_ctx =
QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
true, fe_address, QuerySource::INTERNAL_FRONTEND);
_runtime_state.clear();
_context.clear();
_fragment_id = 0;
Expand Down Expand Up @@ -933,7 +933,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
for (int j = 0; j < parallelism; j++) {
auto runtime_filter_state = RuntimeFilterParamsContext::create(_query_ctx.get());
_runtime_filter_mgrs[j] = std::make_unique<RuntimeFilterMgr>(
_query_id, runtime_filter_state, _query_ctx->query_mem_tracker, false);
_query_id, runtime_filter_state, _query_ctx->query_mem_tracker(), false);
}
for (size_t i = 0; i < _pipelines.size(); i++) {
EXPECT_EQ(_pipelines[i]->id(), i);
Expand Down
Loading

0 comments on commit daf382f

Please sign in to comment.