diff --git a/.github/actions/action-sh-checker b/.github/actions/action-sh-checker
deleted file mode 160000
index 76ab0b22e1f194..00000000000000
--- a/.github/actions/action-sh-checker
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 76ab0b22e1f194e4a582edc7969df6485c4e9246
diff --git a/.github/actions/clang-format-lint-action b/.github/actions/clang-format-lint-action
deleted file mode 160000
index 6adbe14579e5b8..00000000000000
--- a/.github/actions/clang-format-lint-action
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 6adbe14579e5b8e19eb3e31e5ff2479f3bd302c7
diff --git a/.github/actions/clang-tidy-review b/.github/actions/clang-tidy-review
deleted file mode 160000
index 2c55ef8cfc9acb..00000000000000
--- a/.github/actions/clang-tidy-review
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 2c55ef8cfc9acb3715d433e58aea086dcec9b206
diff --git a/.github/workflows/auto_trigger_teamcity.yml b/.github/workflows/auto_trigger_teamcity.yml
index 61be077fed6857..ab3e85fa6b560f 100644
--- a/.github/workflows/auto_trigger_teamcity.yml
+++ b/.github/workflows/auto_trigger_teamcity.yml
@@ -57,40 +57,32 @@ jobs:
echo "latest_commit_id : ${{ env.LATEST_COMMIT }}"
set -x
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "buildall" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_Doris_FeUt Doris_DorisBeUt_BeUt Doris_DorisCompile_Compile Doris_Performance_Clickbench_ClickbenchNew Doris_ArmPipeline_P0Regression ${trigger_pipelines}"
+ reg="run (buildall|compile|p0|p1|feut|beut|external|clickbench|pipelinex_p0|arm|tpch)( [1-9]*[0-9]+)*"
+ comment_trigger_type="$(echo "${comment_message}" | grep -E "${reg}" | awk -F' ' '{print $2}' | sed -n 1p)"
+ comment_repeat_times="$(echo "${comment_message}" | grep -E "${reg}" | awk -F' ' '{print $3}' | sed -n 1p)"
+ if [[ "${comment_trigger_type}" == "buildall" ]]; then
+ trigger_pipelines="Doris_Doris_FeUt Doris_DorisBeUt_BeUt Doris_DorisCompile_Compile Doris_Performance_Clickbench_ClickbenchNew Doris_ArmPipeline_P0Regression"
+ elif [[ "${comment_trigger_type}" == "compile" ]]; then
+ trigger_pipelines="Doris_DorisCompile_Compile"
+ elif [[ "${comment_trigger_type}" == "p0" ]]; then
+ trigger_pipelines="Doris_DorisRegression_P0Regression"
+ elif [[ "${comment_trigger_type}" == "p1" ]]; then
+ trigger_pipelines="Doris_DorisRegression_P1Regression"
+ elif [[ "${comment_trigger_type}" == "feut" ]]; then
+ trigger_pipelines="Doris_Doris_FeUt"
+ elif [[ "${comment_trigger_type}" == "beut" ]]; then
+ trigger_pipelines="Doris_DorisBeUt_BeUt"
+ elif [[ "${comment_trigger_type}" == "external" ]]; then
+ trigger_pipelines="Doris_External_Regression"
+ elif [[ "${comment_trigger_type}" == "clickbench" ]]; then
+ trigger_pipelines="Doris_Performance_Clickbench_ClickbenchNew"
+ elif [[ "${comment_trigger_type}" == "pipelinex_p0" ]]; then
+ trigger_pipelines="Doris_DorisRegression_P0RegressionPipelineX"
+ elif [[ "${comment_trigger_type}" == "arm" ]]; then
+ trigger_pipelines="Doris_ArmPipeline_P0Regression"
+ elif [[ "${comment_trigger_type}" == "tpch" ]]; then
+ trigger_pipelines="Tpch_TpchSf100"
fi
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ " p0" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_DorisRegression_P0Regression ${trigger_pipelines}"
- fi
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ " pipelinex_p0" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_DorisRegression_P0RegressionPipelineX ${trigger_pipelines}"
- fi
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "p1" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_DorisRegression_P1Regression ${trigger_pipelines}"
- fi
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "feut" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_Doris_FeUt ${trigger_pipelines}"
- fi
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "beut" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_DorisBeUt_BeUt ${trigger_pipelines}"
- fi
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "compile" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_DorisCompile_Compile ${trigger_pipelines}"
- fi
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "clickbench" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_Performance_Clickbench_ClickbenchNew ${trigger_pipelines}"
- fi
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "arm" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_ArmPipeline_P0Regression ${trigger_pipelines}"
- fi
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "external" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_External_Regression ${trigger_pipelines}"
- fi
- if [[ "${comment_message}" =~ "run" && "${comment_message}" =~ "just_for_test" && ! "${comment_message}" =~ "Thanks for your contribution" ]]; then
- trigger_pipelines="Doris_DorisRegression_ExternalRegression ${trigger_pipelines}"
- fi
-
if [ -z "${trigger_pipelines}" ];then
echo "Just a general comment that doesn't match any pipeline rules"
fi
@@ -133,7 +125,12 @@ jobs:
if [ "_""${same_build_sign}" == "_false" ];then
sleep 10s
echo "there is no running build or queue build, so trigger a new !"
- execute_command="curl -s -X POST ${teamcity_url}/httpAuth/action.html\?add2Queue\=${pipeline}\&branchName\=pull/${pull_request_num}\&name=env.latest_pr_comment\&value=${encoded_string}\&name=env.latest_commit_id\&value=${latest_commit_id}"
+ echo "comment_repeat_times: ${comment_repeat_times}"
+ if [[ -n "${comment_repeat_times}" ]]; then
+ execute_command="curl -s -X POST ${teamcity_url}/httpAuth/action.html\?add2Queue\=${pipeline}\&branchName\=pull/${pull_request_num}\&name=env.latest_pr_comment\&value=${encoded_string}\&name=env.latest_commit_id\&value=${latest_commit_id}\&name=env.repeat_times\&value=${comment_repeat_times}"
+ else
+ execute_command="curl -s -X POST ${teamcity_url}/httpAuth/action.html\?add2Queue\=${pipeline}\&branchName\=pull/${pull_request_num}\&name=env.latest_pr_comment\&value=${encoded_string}\&name=env.latest_commit_id\&value=${latest_commit_id}"
+ fi
echo "${execute_command}"
eval "${execute_command}"
echo "-----------------------------------------------------------------"
diff --git a/.github/workflows/clang-format.yml b/.github/workflows/clang-format.yml
index f69cb953683895..c804b753a3bbc4 100644
--- a/.github/workflows/clang-format.yml
+++ b/.github/workflows/clang-format.yml
@@ -31,14 +31,21 @@ jobs:
uses: actions/checkout@v3
with:
persist-credentials: false
- submodules: recursive
- name: Checkout ${{ github.ref }} ( ${{ github.event.pull_request.head.sha }} )
if: ${{ github.event_name == 'pull_request_target' }}
uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha }}
- submodules: recursive
+
+ - name: Checkout paths-filter
+ run: |
+ rm -rf ./.github/actions/paths-filter
+ git clone https://github.com/dorny/paths-filter .github/actions/paths-filter
+
+ pushd .github/actions/paths-filter &>/dev/null
+ git checkout 4512585405083f25c027a35db413c2b3b9006d50
+ popd &>/dev/null
- name: Paths filter
uses: ./.github/actions/paths-filter
@@ -49,6 +56,15 @@ jobs:
- 'be/src/**'
- 'be/test/**'
+ - name: Checkout clang-format-lint-action
+ run: |
+ rm -rf ./.github/actions/clang-format-lint-action
+ git clone https://github.com/DoozyX/clang-format-lint-action .github/actions/clang-format-lint-action
+
+ pushd .github/actions/clang-format-lint-action &>/dev/null
+ git checkout 6adbe14579e5b8e19eb3e31e5ff2479f3bd302c7
+ popd &>/dev/null
+
- name: "Format it!"
if: ${{ steps.filter.outputs.be_changes == 'true' }}
uses: ./.github/actions/clang-format-lint-action
diff --git a/.github/workflows/code-checks.yml b/.github/workflows/code-checks.yml
index 652aa7f81ea861..9e314bfb6dfc25 100644
--- a/.github/workflows/code-checks.yml
+++ b/.github/workflows/code-checks.yml
@@ -27,21 +27,22 @@ jobs:
- name: Checkout ${{ github.ref }} ( ${{ github.sha }} )
if: ${{ github.event_name != 'pull_request_target' }}
uses: actions/checkout@v3
- with:
- submodules: recursive
- name: Checkout ${{ github.ref }} ( ${{ github.event.pull_request.head.sha }} )
if: ${{ github.event_name == 'pull_request_target' }}
uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha }}
- submodules: recursive
- - name: Patch
+ - name: Checkout action-sh-checker
run: |
- pushd .github/actions/action-sh-checker >/dev/null
+ rm -rf ./.github/actions/action-sh-checker
+ git clone https://github.com/luizm/action-sh-checker .github/actions/action-sh-checker
+
+ pushd .github/actions/action-sh-checker &>/dev/null
+ git checkout 76ab0b22e1f194e4a582edc7969df6485c4e9246
sed -i 's/\[ "$GITHUB_EVENT_NAME" == "pull_request" \]/\[\[ "$GITHUB_EVENT_NAME" == "pull_request" || "$GITHUB_EVENT_NAME" == "pull_request_target" \]\]/' entrypoint.sh
- popd >/dev/null
+ popd &>/dev/null
- name: Run ShellCheck
uses: ./.github/actions/action-sh-checker
@@ -63,7 +64,15 @@ jobs:
uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha }}
- submodules: recursive
+
+ - name: Checkout paths-filter
+ run: |
+ rm -rf ./.github/actions/paths-filter
+ git clone https://github.com/dorny/paths-filter .github/actions/paths-filter
+
+ pushd .github/actions/paths-filter &>/dev/null
+ git checkout 4512585405083f25c027a35db413c2b3b9006d50
+ popd &>/dev/null
- name: Paths Filter
uses: ./.github/actions/paths-filter
@@ -117,7 +126,6 @@ jobs:
uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha }}
- submodules: recursive
- name: Download
uses: actions/download-artifact@v3
@@ -125,6 +133,15 @@ jobs:
name: compile_commands
path: ./be/build_Release
+ - name: Checkout clang-tidy review
+ run: |
+ rm -rf ./.github/actions/clang-tidy-review
+ git clone https://github.com/ZedThree/clang-tidy-review .github/actions/clang-tidy-review
+
+ pushd .github/actions/clang-tidy-review &>/dev/null
+ git checkout 2c55ef8cfc9acb3715d433e58aea086dcec9b206
+ popd &>/dev/null
+
- name: Run clang-tidy review
uses: ./.github/actions/clang-tidy-review
id: review
diff --git a/.github/workflows/license-eyes.yml b/.github/workflows/license-eyes.yml
index 823ac845b3b0c1..890efb2d9d1196 100644
--- a/.github/workflows/license-eyes.yml
+++ b/.github/workflows/license-eyes.yml
@@ -30,15 +30,12 @@ jobs:
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
if: ${{ github.event_name != 'pull_request_target' }}
uses: actions/checkout@v3
- with:
- submodules: recursive
- name: Checkout ${{ github.ref }} ( ${{ github.event.pull_request.head.sha }} )
if: ${{ github.event_name == 'pull_request_target' }}
uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha }}
- submodules: recursive
- name: Check License
uses: apache/skywalking-eyes@v0.2.0
diff --git a/.github/workflows/pr-approve-status.yml b/.github/workflows/pr-approve-status.yml
index cda6abc8f1c15b..ccb418ebb72e9d 100644
--- a/.github/workflows/pr-approve-status.yml
+++ b/.github/workflows/pr-approve-status.yml
@@ -33,7 +33,7 @@ jobs:
echo "PR number is not set"
exit 1
fi
- response=$(curl -s -H "Authorization: token ${{ secrets.GITHUB_TOKEN }} " "https://api.github.com/repos/apache/doris/pulls/${pr_num}/reviews")
+ response=$(curl -s -H "Authorization: token ${{ secrets.GITHUB_TOKEN }} " "https://api.github.com/repos/apache/doris/pulls/${pr_num}/reviews?per_page=100")
# shellcheck disable=SC2207
reviewers=($(echo $response | jq -r '.[] | .user.login'))
# shellcheck disable=SC2207
diff --git a/.gitmodules b/.gitmodules
index 9fe51bfd1d056d..a4e579b1794833 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,9 +4,6 @@
[submodule ".github/actions/get-workflow-origin"]
path = .github/actions/get-workflow-origin
url = https://github.com/potiuk/get-workflow-origin.git
-[submodule ".github/actions/clang-format-lint-action"]
- path = .github/actions/clang-format-lint-action
- url = https://github.com/DoozyX/clang-format-lint-action.git
[submodule ".github/actions/setup-maven"]
path = .github/actions/setup-maven
url = https://github.com/stCarolas/setup-maven.git
@@ -19,12 +16,6 @@
[submodule ".github/actions/ccache-action"]
path = .github/actions/ccache-action
url = https://github.com/hendrikmuhs/ccache-action
-[submodule ".github/actions/action-sh-checker"]
- path = .github/actions/action-sh-checker
- url = https://github.com/luizm/action-sh-checker
-[submodule ".github/actions/clang-tidy-review"]
- path = .github/actions/clang-tidy-review
- url = https://github.com/ZedThree/clang-tidy-review.git
[submodule "be/src/apache-orc"]
path = be/src/apache-orc
url = https://github.com/apache/doris-thirdparty.git
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index cc4ed74dc0854a..0a185d9bdc7c46 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -6,9 +6,9 @@
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,19 +16,19 @@
limitations under the License.
-->
-
-
-
-
-
-
-
-
-
\ No newline at end of file
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/README.md b/README.md
index cecece032825c6..d14ad11deb98a5 100644
--- a/README.md
+++ b/README.md
@@ -35,8 +35,6 @@ Apache Doris is an easy-to-use, high-performance and real-time analytical databa
All this makes Apache Doris an ideal tool for scenarios including report analysis, ad-hoc query, unified data warehouse, and data lake query acceleration. On Apache Doris, users can build various applications, such as user behavior analysis, AB test platform, log retrieval analysis, user portrait analysis, and order analysis.
-Doris Summit Asia 2023 is coming and warmly invite you to join! Click Now 🔗[doris-summit.org.cn](https://doris-summit.org.cn/?utm_source=website&utm_medium=readme&utm_campaign=2023&utm_id=2023)
-
🎉 Version 2.0.2 version released now. The 2.0.2 version has achieved over 10x performance improvements on standard Benchmark, comprehensive enhancement in log analysis and lakehouse scenarios, more efficient and stable data update and write efficiency, support for more comprehensive multi-tenant and resource isolation mechanisms, and take a new step in the direction of resource elasticity and storage computing separation. It has also been added a series of usability features for enterprise users. We welcome all users who have requirements for the new features of the 2.0 version to deploy and upgrade. Check out the 🔗[Release Notes](https://github.com/apache/doris/issues/25011) here.
🎉 Version 1.2.7 released now! It is fully evolved release and all users are encouraged to upgrade to this release. Check out the 🔗[Release Notes](https://doris.apache.org/docs/dev/releasenotes/release-1.2.7) here.
diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 5adf108ab61191..4a83fdc92f9187 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -145,17 +145,6 @@ endif()
add_thirdparty(minizip LIB64)
add_thirdparty(simdjson LIB64)
add_thirdparty(idn LIB64)
-add_thirdparty(opentelemetry_common LIB64)
-add_thirdparty(opentelemetry_exporter_zipkin_trace LIB64)
-add_thirdparty(opentelemetry_resources LIB64)
-add_thirdparty(opentelemetry_version LIB64)
-add_thirdparty(opentelemetry_exporter_ostream_span LIB64)
-add_thirdparty(opentelemetry_trace LIB64)
-add_thirdparty(opentelemetry_http_client_curl LIB64)
-add_thirdparty(opentelemetry_exporter_otlp_http LIB64)
-add_thirdparty(opentelemetry_exporter_otlp_http_client LIB64)
-add_thirdparty(opentelemetry_otlp_recordable LIB64)
-add_thirdparty(opentelemetry_proto LIB64)
add_thirdparty(xml2 LIB64)
add_thirdparty(lzma LIB64)
add_thirdparty(gsasl)
diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index 4a9aed2bb0f19c..d16a32b7be5433 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -54,6 +54,19 @@ void CgroupCpuCtl::update_cpu_hard_limit(int cpu_hard_limit) {
}
}
+void CgroupCpuCtl::update_cpu_soft_limit(int cpu_shares) {
+ if (!_init_succ) {
+ return;
+ }
+ std::lock_guard w_lock(_lock_mutex);
+ if (_cpu_shares != cpu_shares) {
+ Status ret = modify_cg_cpu_soft_limit_no_lock(cpu_shares);
+ if (ret.ok()) {
+ _cpu_shares = cpu_shares;
+ }
+ }
+}
+
Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::string msg,
bool is_append) {
int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR);
@@ -97,9 +110,11 @@ Status CgroupV1CpuCtl::init() {
}
}
- // quota path
+ // quota file
_cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path + "/cpu.cfs_quota_us";
- // task path
+ // cpu.shares file
+ _cgroup_v1_cpu_tg_shares_file = _cgroup_v1_cpu_tg_path + "/cpu.shares";
+ // task file
_cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks";
LOG(INFO) << "cgroup v1 cpu path init success"
<< ", query tg path=" << _cgroup_v1_cpu_tg_path
@@ -110,6 +125,11 @@ Status CgroupV1CpuCtl::init() {
return Status::OK();
}
+Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_shares) {
+ std::string msg = "modify cpu shares to " + std::to_string(cpu_shares);
+ return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_shares_file, cpu_shares, msg, false);
+}
+
Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
std::string msg = "modify cpu quota value to " + std::to_string(val);
@@ -120,9 +140,14 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
if (!_init_succ) {
return Status::OK();
}
+#if defined(__APPLE__)
+ //unsupported now
+ return Status::OK();
+#else
int tid = static_cast<int>(syscall(SYS_gettid));
std::string msg = "add thread " + std::to_string(tid) + " to group";
std::lock_guard w_lock(_lock_mutex);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true);
+#endif
}
} // namespace doris
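The new soft-limit path mirrors the hard-limit one: under _lock_mutex, the requested cpu.shares value is written to the workload group's control file and cached in _cpu_shares so repeated updates with the same value become no-ops. A minimal standalone sketch of the underlying cgroup v1 write (paths illustrative; the real code goes through write_cg_sys_file):

#include <fcntl.h>
#include <unistd.h>

#include <string>

// Minimal sketch of the cgroup v1 write behind modify_cg_cpu_soft_limit_no_lock:
// the soft limit is just the integer cpu.shares value in the group's directory,
// e.g. /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/cpu.shares.
bool write_cpu_shares(const std::string& tg_path, int cpu_shares) {
    const std::string file = tg_path + "/cpu.shares";
    int fd = ::open(file.c_str(), O_RDWR);
    if (fd < 0) {
        return false; // cgroup path not initialized
    }
    const std::string value = std::to_string(cpu_shares);
    ssize_t written = ::write(fd, value.c_str(), value.size());
    ::close(fd);
    return written == static_cast<ssize_t>(value.size());
}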
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index c3a30660147d63..b98e268da09464 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -28,6 +28,12 @@
namespace doris {
+// cgroup cpu.cfs_quota_us default value; -1 means the cpu hard limit is disabled
+const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
+
+// cgroup cpu.shares default value
+const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
+
class CgroupCpuCtl {
public:
virtual ~CgroupCpuCtl() = default;
@@ -35,15 +41,19 @@ class CgroupCpuCtl {
virtual Status init();
- virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;
-
virtual Status add_thread_to_cgroup() = 0;
void update_cpu_hard_limit(int cpu_hard_limit);
+ void update_cpu_soft_limit(int cpu_shares);
+
protected:
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);
+ virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;
+
+ virtual Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) = 0;
+
std::string _doris_cgroup_cpu_path;
uint64_t _cpu_core_num = CpuInfo::num_cores();
uint64_t _cpu_cfs_period_us = 100000;
@@ -51,6 +61,7 @@ class CgroupCpuCtl {
std::shared_mutex _lock_mutex;
bool _init_succ = false;
uint64_t _tg_id; // workload group id
+ uint64_t _cpu_shares = 0;
};
/*
@@ -73,20 +84,25 @@ class CgroupCpuCtl {
6 workload group quota file:
/sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/cpu.cfs_quota_us
- 7 workload group tasks file:
+ 7 workload group tasks file:
/sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/tasks
+
+ 8 workload group cpu.shares file:
+ /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/cpu.shares
*/
class CgroupV1CpuCtl : public CgroupCpuCtl {
public:
CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
+ Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
Status add_thread_to_cgroup() override;
private:
std::string _cgroup_v1_cpu_query_path;
std::string _cgroup_v1_cpu_tg_path; // workload group path
std::string _cgroup_v1_cpu_tg_quota_file;
+ std::string _cgroup_v1_cpu_tg_shares_file;
std::string _cgroup_v1_cpu_tg_task_file;
};
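For the hard limit, modify_cg_cpu_hard_limit_no_lock converts a percentage into an absolute cpu.cfs_quota_us value with _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100. A worked example of that arithmetic:

#include <cstdint>
#include <iostream>

// Worked example of the cpu.cfs_quota_us formula: with the default 100ms
// CFS period, 16 cores and a 50% hard limit, the group may consume up to
// 800000us of CPU time per period, i.e. the equivalent of 8 full cores.
int main() {
    const uint64_t cpu_cfs_period_us = 100000; // default in CgroupCpuCtl
    const uint64_t cpu_core_num = 16;          // illustrative machine size
    const int cpu_hard_limit = 50;             // percent
    const int64_t quota_us = cpu_cfs_period_us * cpu_core_num * cpu_hard_limit / 100;
    std::cout << quota_us << std::endl; // 800000
    return 0;
}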
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 6717af3ce4b837..698a71aec16c13 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1949,6 +1949,10 @@ void StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callbac
EngineStorageMigrationTask engine_task(tablet, dest_store);
status = StorageEngine::instance()->execute_task(&engine_task);
}
+ // FE should ignore this error
+ if (status.is<FILE_ALREADY_EXIST>()) {
+ status = Status::OK();
+ }
if (!status.ok()) {
LOG_WARNING("failed to migrate storage medium")
.tag("signature", agent_task_req.signature)
@@ -2011,8 +2015,9 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
*dest_store = stores[0];
}
if (tablet->data_dir()->path() == (*dest_store)->path()) {
- return Status::InternalError("tablet is already on specified path {}",
- tablet->data_dir()->path());
+ LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path());
+ return Status::Error("tablet is already on specified path: {}",
+ tablet->data_dir()->path());
}
// check local disk capacity
@@ -2021,7 +2026,6 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
return Status::InternalError("reach the capacity limit of path {}, tablet_size={}",
(*dest_store)->path(), tablet_size);
}
-
return Status::OK();
}
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index deb99682fed450..a9b0848984b56a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -319,7 +319,7 @@ DEFINE_Int32(index_page_cache_percentage, "10");
// whether to disable page cache feature in storage
DEFINE_Bool(disable_storage_page_cache, "false");
// whether to disable row cache feature in storage
-DEFINE_Bool(disable_storage_row_cache, "true");
+DEFINE_mBool(disable_storage_row_cache, "true");
// whether to disable pk page cache feature in storage
DEFINE_Bool(disable_pk_storage_page_cache, "false");
@@ -457,7 +457,7 @@ DEFINE_Int32(webserver_num_workers, "48");
// Period to update rate counters and sampling counters in ms.
DEFINE_mInt32(periodic_counter_update_period_ms, "500");
-DEFINE_Bool(enable_single_replica_load, "false");
+DEFINE_Bool(enable_single_replica_load, "true");
// Number of download workers for single replica load
DEFINE_Int32(single_replica_load_download_num_workers, "64");
@@ -635,8 +635,6 @@ DEFINE_Bool(path_gc_check, "true");
DEFINE_mInt32(path_gc_check_interval_second, "86400");
DEFINE_mInt32(path_gc_check_step, "1000");
DEFINE_mInt32(path_gc_check_step_interval_ms, "10");
-DEFINE_mInt32(path_scan_interval_second, "86400");
-DEFINE_mInt32(path_scan_step_interval_ms, "70");
// The following 2 configs limit the max usage of disk capacity of a data dir.
// If both of these 2 threshold reached, no more data can be writen into that data dir.
@@ -742,8 +740,6 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
// share delta writers when memtable_on_sink_node = true
DEFINE_Bool(share_delta_writers, "true");
-// number of brpc stream per load
-DEFINE_Int32(num_streams_per_load, "5");
// timeout for open load stream rpc in ms
DEFINE_Int64(open_load_stream_timeout_ms, "500");
@@ -817,27 +813,6 @@ DEFINE_String(function_service_protocol, "h2:grpc");
// use which load balancer to select server to connect
DEFINE_String(rpc_load_balancer, "rr");
-// Enable tracing
-// If this configuration is enabled, you should also specify the trace_export_url.
-DEFINE_Bool(enable_tracing, "false");
-
-// Enable opentelemtry collector
-DEFINE_Bool(enable_otel_collector, "false");
-
-// Current support for exporting traces:
-// zipkin: Export traces directly to zipkin, which is used to enable the tracing feature quickly.
-// collector: The collector can be used to receive and process traces and support export to a variety of
-// third-party systems.
-DEFINE_mString(trace_exporter, "zipkin");
-DEFINE_Validator(trace_exporter, [](const std::string& config) -> bool {
- return config == "zipkin" || config == "collector";
-});
-
-// The endpoint to export spans to.
-// export to zipkin like: http://127.0.0.1:9411/api/v2/spans
-// export to collector like: http://127.0.0.1:4318/v1/traces
-DEFINE_String(trace_export_url, "http://127.0.0.1:9411/api/v2/spans");
-
// The maximum buffer/queue size to collect span. After the size is reached, spans are dropped.
// An export will be triggered when the number of spans in the queue reaches half of the maximum.
DEFINE_Int32(max_span_queue_size, "2048");
@@ -969,6 +944,9 @@ DEFINE_Bool(enable_workload_group_for_scan, "false");
// Will remove after fully test.
DEFINE_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "true");
+DEFINE_mBool(enable_flatten_nested_for_variant, "false");
+DEFINE_mDouble(ratio_of_defaults_as_sparse_column, "0.95");
+
// block file cache
DEFINE_Bool(enable_file_cache, "false");
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
@@ -1013,8 +991,6 @@ DEFINE_Int32(max_depth_in_bkd_tree, "32");
DEFINE_Bool(inverted_index_compaction_enable, "false");
// use num_broadcast_buffer blocks as buffer to do broadcast
DEFINE_Int32(num_broadcast_buffer, "32");
-// semi-structure configs
-DEFINE_Bool(enable_parse_multi_dimession_array, "false");
// max depth of expression tree allowed.
DEFINE_Int32(max_depth_of_expr_tree, "600");
@@ -1034,6 +1010,8 @@ DEFINE_mInt32(s3_write_buffer_size, "5242880");
// can at most buffer 50MB data. And the num of multi part upload task is
// s3_write_buffer_whole_size / s3_write_buffer_size
DEFINE_mInt32(s3_write_buffer_whole_size, "524288000");
+// The timeout config for S3 buffer allocation
+DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
//disable shrink memory by default
@@ -1096,6 +1074,7 @@ DEFINE_String(group_commit_replay_wal_dir, "./wal");
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
DEFINE_Int32(group_commit_sync_wal_batch, "10");
+DEFINE_Bool(wait_internal_group_commit_finish, "false");
// the count of thread to group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");
@@ -1110,12 +1089,25 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
// cgroup
DEFINE_String(doris_cgroup_cpu_path, "");
+DEFINE_Bool(enable_cgroup_cpu_soft_limit, "false");
DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
// Dir of default timezone files
DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
+// Max size(bytes) of group commit queues, used for mem back pressure, default 64M.
+DEFINE_Int32(group_commit_max_queue_size, "67108864");
+
+// Max size(bytes) of WAL disk usage, used for disk space back pressure, default 64M.
+DEFINE_Int32(wal_max_disk_size, "67108864");
+
+// Ingest binlog work pool size; -1 disables it, 0 means hardware concurrency
+DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
+
+// Download binlog rate limit, unit is KB/s, 0 means no limit
+DEFINE_Int32(download_binlog_rate_limit_kbs, "0");
+
// clang-format off
#ifdef BE_TEST
// test s3
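The DEFINE_Bool -> DEFINE_mBool change for disable_storage_row_cache marks the config as runtime-mutable, so it can be flipped through the BE's config-update mechanism without a restart. A rough sketch of what that distinction implies; the names below are illustrative and not the Doris config framework:

#include <atomic>
#include <iostream>

// Illustrative sketch only: a mutable ("m") config behaves like an atomic
// that a config-update endpoint can flip at runtime, while an immutable
// config is read once at startup and never changes afterwards.
namespace sketch_config {
std::atomic<bool> disable_storage_row_cache {true}; // mutable: can change at runtime
const bool disable_storage_page_cache = false;      // immutable: fixed after startup
} // namespace sketch_config

int main() {
    // what an update-config request would conceptually do:
    sketch_config::disable_storage_row_cache.store(false, std::memory_order_relaxed);
    std::cout << sketch_config::disable_storage_row_cache.load() << std::endl; // 0
    return 0;
}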
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9855ba0ffecbd6..b6e911438ecad9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -106,11 +106,11 @@ DECLARE_Int32(brpc_num_threads);
// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
-// this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
+// This is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
// If no ip match this rule, will choose one randomly.
DECLARE_String(priority_networks);
-// performance moderate or or compact, only tcmalloc compile
+// performance moderate or compact, only tcmalloc compile
DECLARE_String(memory_mode);
// process memory limit specified as number of bytes
@@ -369,7 +369,7 @@ DECLARE_Int32(index_page_cache_percentage);
// TODO delete it. Divided into Data page, Index page, pk index page
DECLARE_Bool(disable_storage_page_cache);
// whether to disable row cache feature in storage
-DECLARE_Bool(disable_storage_row_cache);
+DECLARE_mBool(disable_storage_row_cache);
// whether to disable pk page cache feature in storage
DECLARE_Bool(disable_pk_storage_page_cache);
@@ -690,8 +690,6 @@ DECLARE_Bool(path_gc_check);
DECLARE_mInt32(path_gc_check_interval_second);
DECLARE_mInt32(path_gc_check_step);
DECLARE_mInt32(path_gc_check_step_interval_ms);
-DECLARE_mInt32(path_scan_interval_second);
-DECLARE_mInt32(path_scan_step_interval_ms);
// The following 2 configs limit the max usage of disk capacity of a data dir.
// If both of these 2 threshold reached, no more data can be writen into that data dir.
@@ -799,8 +797,6 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
// share delta writers when memtable_on_sink_node = true
DECLARE_Bool(share_delta_writers);
-// number of brpc stream per load
-DECLARE_Int32(num_streams_per_load);
// timeout for open load stream rpc in ms
DECLARE_Int64(open_load_stream_timeout_ms);
@@ -875,24 +871,6 @@ DECLARE_String(function_service_protocol);
// use which load balancer to select server to connect
DECLARE_String(rpc_load_balancer);
-// Enable tracing
-// If this configuration is enabled, you should also specify the trace_export_url.
-DECLARE_Bool(enable_tracing);
-
-// Enable opentelemtry collector
-DECLARE_Bool(enable_otel_collector);
-
-// Current support for exporting traces:
-// zipkin: Export traces directly to zipkin, which is used to enable the tracing feature quickly.
-// collector: The collector can be used to receive and process traces and support export to a variety of
-// third-party systems.
-DECLARE_mString(trace_exporter);
-
-// The endpoint to export spans to.
-// export to zipkin like: http://127.0.0.1:9411/api/v2/spans
-// export to collector like: http://127.0.0.1:4318/v1/traces
-DECLARE_String(trace_export_url);
-
// The maximum buffer/queue size to collect span. After the size is reached, spans are dropped.
// An export will be triggered when the number of spans in the queue reaches half of the maximum.
DECLARE_Int32(max_span_queue_size);
@@ -1055,8 +1033,6 @@ DECLARE_Int32(max_depth_in_bkd_tree);
DECLARE_Bool(inverted_index_compaction_enable);
// use num_broadcast_buffer blocks as buffer to do broadcast
DECLARE_Int32(num_broadcast_buffer);
-// semi-structure configs
-DECLARE_Bool(enable_parse_multi_dimession_array);
// max depth of expression tree allowed.
DECLARE_Int32(max_depth_of_expr_tree);
@@ -1076,6 +1052,8 @@ DECLARE_mInt32(s3_write_buffer_size);
// can at most buffer 50MB data. And the num of multi part upload task is
// s3_write_buffer_whole_size / s3_write_buffer_size
DECLARE_mInt32(s3_write_buffer_whole_size);
+// The timeout config for S3 buffer allocation
+DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
// the max number of cached file handle for block segemnt
DECLARE_mInt64(file_cache_max_file_reader_cache_size);
//enable shrink memory
@@ -1132,6 +1110,12 @@ DECLARE_mInt64(lookup_connection_cache_bytes_limit);
// level of compression when using LZ4_HC, whose defalut value is LZ4HC_CLEVEL_DEFAULT
DECLARE_mInt64(LZ4_HC_compression_level);
+// Whether flatten nested arrays in variant column
+// Notice: TEST ONLY
+DECLARE_mBool(enable_flatten_nested_for_variant);
+// Threshold of a column as sparse column
+// Notice: TEST ONLY
+DECLARE_mDouble(ratio_of_defaults_as_sparse_column);
DECLARE_mBool(enable_merge_on_write_correctness_check);
@@ -1159,6 +1143,7 @@ DECLARE_String(group_commit_replay_wal_dir);
DECLARE_Int32(group_commit_replay_wal_retry_num);
DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
DECLARE_Int32(group_commit_sync_wal_batch);
+DECLARE_Bool(wait_internal_group_commit_finish);
// This config can be set to limit thread number in group commit insert thread pool.
DECLARE_mInt32(group_commit_insert_threads);
@@ -1177,6 +1162,8 @@ DECLARE_mBool(exit_on_exception);
// cgroup
DECLARE_String(doris_cgroup_cpu_path);
+DECLARE_Bool(enable_cgroup_cpu_soft_limit);
+
// This config controls whether the s3 file writer would flush cache asynchronously
DECLARE_Bool(enable_flush_file_cache_async);
@@ -1186,6 +1173,18 @@ DECLARE_Bool(ignore_always_true_predicate_for_segment);
// Dir of default timezone files
DECLARE_String(default_tzfiles_path);
+// Max size(bytes) of group commit queues, used for mem back pressure.
+DECLARE_Int32(group_commit_max_queue_size);
+
+// Max size(bytes) of WAL disk usage, used for disk space back pressure.
+DECLARE_Int32(wal_max_disk_size);
+
+// Ingest binlog work pool size
+DECLARE_Int32(ingest_binlog_work_pool_size);
+
+// Download binlog rate limit, unit is KB/s
+DECLARE_Int32(download_binlog_rate_limit_kbs);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/common/exception.cpp b/be/src/common/exception.cpp
index 9da69bc67827f3..37e357ec32f89c 100644
--- a/be/src/common/exception.cpp
+++ b/be/src/common/exception.cpp
@@ -21,7 +21,7 @@
#include "util/stack_util.h"
namespace doris {
-Exception::Exception(int code, const std::string_view msg) {
+Exception::Exception(int code, const std::string_view& msg) {
_code = code;
_err_msg = std::make_unique<ErrMsg>();
_err_msg->_msg = msg;
@@ -31,7 +31,7 @@ Exception::Exception(int code, const std::string_view msg) {
}
}
-Exception::Exception(const Exception& nested, int code, const std::string_view msg) {
+Exception::Exception(const Exception& nested, int code, const std::string_view& msg) {
_code = code;
_err_msg = std::make_unique();
_err_msg->_msg = msg;
diff --git a/be/src/common/exception.h b/be/src/common/exception.h
index 325bb67aa6b55b..299c0da043735e 100644
--- a/be/src/common/exception.h
+++ b/be/src/common/exception.h
@@ -37,14 +37,14 @@ inline thread_local int enable_thread_catch_bad_alloc = 0;
class Exception : public std::exception {
public:
Exception() : _code(ErrorCode::OK) {}
- Exception(int code, const std::string_view msg);
+ Exception(int code, const std::string_view& msg);
// add nested exception as first param, or the template may could not find
// the correct method for ...args
- Exception(const Exception& nested, int code, const std::string_view msg);
+ Exception(const Exception& nested, int code, const std::string_view& msg);
// Format message with fmt::format, like the logging functions.
template <typename... Args>
- Exception(int code, const std::string_view fmt, Args&&... args)
+ Exception(int code, const std::string_view& fmt, Args&&... args)
: Exception(code, fmt::format(fmt, std::forward<Args>(args)...)) {}
int code() const { return _code; }
@@ -96,9 +96,8 @@ inline const std::string& Exception::to_string() const {
return Status::MemoryLimitExceeded(fmt::format( \
"PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \
e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); \
- } else { \
- return Status::Error(e.code(), e.to_string()); \
} \
+ return Status::Error(e.code(), e.to_string()); \
} \
} while (0)
@@ -118,8 +117,7 @@ inline const std::string& Exception::to_string() const {
return Status::MemoryLimitExceeded(fmt::format( \
"PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \
e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); \
- } else { \
- return Status::Error(e.code(), e.to_string()); \
} \
+ return Status::Error(e.code(), e.to_string()); \
} \
} while (0)
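The variadic constructor formats eagerly with fmt and delegates to the plain (code, msg) constructor, and taking the nested Exception as the first parameter keeps overload resolution from folding it into args. A self-contained sketch of the same pattern with a stand-in class, not the Doris Exception:

#include <fmt/core.h>

#include <exception>
#include <string>
#include <string_view>
#include <utility>

// Stand-in sketch of the forwarding-constructor pattern used by Exception:
// the variadic overload formats eagerly and delegates to the plain
// (code, message) constructor. fmt::runtime() is needed here because the
// format string is a runtime value rather than a compile-time literal.
class MyError : public std::exception {
public:
    MyError(int code, const std::string_view& msg) : _code(code), _msg(msg) {}

    template <typename... Args>
    MyError(int code, const std::string_view& fmt, Args&&... args)
            : MyError(code, fmt::format(fmt::runtime(fmt), std::forward<Args>(args)...)) {}

    const char* what() const noexcept override { return _msg.c_str(); }
    int code() const { return _code; }

private:
    int _code;
    std::string _msg;
};

// usage: throw MyError(42, "bad row {} in {}", 7, "tablet_001");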
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 88977ce310d6c1..be232f08522f17 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -16,7 +16,6 @@
#include
#include
-// IWYU pragma: no_include
#include "common/compiler_util.h" // IWYU pragma: keep
#ifdef ENABLE_STACKTRACE
#include "util/stack_util.h"
@@ -321,7 +320,8 @@ constexpr bool capture_stacktrace(int code) {
&& code != ErrorCode::UNINITIALIZED
&& code != ErrorCode::PIP_WAIT_FOR_RF
&& code != ErrorCode::PIP_WAIT_FOR_SC
- && code != ErrorCode::INVALID_ARGUMENT;
+ && code != ErrorCode::INVALID_ARGUMENT
+ && code != ErrorCode::DATA_QUALITY_ERR;
}
// clang-format on
@@ -575,15 +575,6 @@ inline std::string Status::to_string() const {
} \
} while (false);
-#define RETURN_WITH_WARN_IF_ERROR(stmt, ret_code, warning_prefix) \
- do { \
- Status _s = (stmt); \
- if (UNLIKELY(!_s.ok())) { \
- LOG(WARNING) << (warning_prefix) << ", error: " << _s; \
- return ret_code; \
- } \
- } while (false);
-
#define RETURN_NOT_OK_STATUS_WITH_WARN(stmt, warning_prefix) \
do { \
Status _s = (stmt); \
@@ -606,17 +597,28 @@ using ResultError = unexpected;
} \
} while (false)
-// clang-format off
-#define DORIS_TRY(stmt) \
- ({ \
- auto&& res = (stmt); \
- using T = std::decay_t<decltype(res)>; \
- static_assert(tl::detail::is_expected<T>::value); \
- if (!res.has_value()) [[unlikely]] { \
- return std::forward<T>(res).error(); \
- } \
- std::forward<T>(res).value(); \
+#define DORIS_TRY(stmt) \
+ ({ \
+ auto&& res = (stmt); \
+ using T = std::decay_t<decltype(res)>; \
+ if (!res.has_value()) [[unlikely]] { \
+ return std::forward<T>(res).error(); \
+ } \
+ std::forward<T>(res).value(); \
});
-// clang-format on
} // namespace doris
+
+// specify formatter for Status
+template <>
+struct fmt::formatter<doris::Status> {
+ template <typename ParseContext>
+ constexpr auto parse(ParseContext& ctx) {
+ return ctx.begin();
+ }
+
+ template <typename FormatContext>
+ auto format(doris::Status const& status, FormatContext& ctx) {
+ return fmt::format_to(ctx.out(), "{}", status.to_string());
+ }
+};
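With this specialization in place a doris::Status can be passed directly as a "{}" argument to fmt-based helpers instead of calling to_string() at each call site. A self-contained sketch of the pattern using a toy type:

#include <fmt/core.h>

#include <string>

// Toy stand-in for the specialization above: once fmt::formatter<T> exists,
// values of T can be used directly as "{}" arguments in fmt::format/print.
struct ToyStatus {
    std::string msg;
    std::string to_string() const { return "[INTERNAL_ERROR] " + msg; }
};

template <>
struct fmt::formatter<ToyStatus> {
    template <typename ParseContext>
    constexpr auto parse(ParseContext& ctx) {
        return ctx.begin();
    }

    template <typename FormatContext>
    auto format(const ToyStatus& status, FormatContext& ctx) const {
        return fmt::format_to(ctx.out(), "{}", status.to_string());
    }
};

int main() {
    ToyStatus st {"tablet is already on specified path"};
    fmt::print("migration skipped: {}\n", st);
    return 0;
}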
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 89f29b58998e43..285653c43b8d38 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -20,9 +20,8 @@
#pragma once
-#include
#include
-// IWYU pragma: no_include
+
#include
#include
#include
@@ -30,7 +29,6 @@
#include "common/status.h"
#include "runtime/descriptors.h"
#include "util/runtime_profile.h"
-#include "util/telemetry/telemetry.h"
namespace doris {
@@ -84,7 +82,6 @@ class DataSink {
// It must be okay to call this multiple times. Subsequent calls should
// be ignored.
virtual Status close(RuntimeState* state, Status exec_status) {
- profile()->add_to_span(_span);
_closed = true;
return Status::OK();
}
@@ -129,8 +126,6 @@ class DataSink {
// Maybe this will be transferred to BufferControlBlock.
std::shared_ptr _query_statistics;
-
- OpentelemetrySpan _span {};
};
} // namespace doris
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index ed2c54a0704a28..c416420ef22974 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -29,7 +29,6 @@
#include
#include
-// IWYU pragma: no_include
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
@@ -127,9 +126,6 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status ExecNode::prepare(RuntimeState* state) {
DCHECK(_runtime_profile.get() != nullptr);
- _span = state->get_tracer()->StartSpan(get_name());
- OpentelemetryScope scope {_span};
-
_exec_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "ExecTime", 1);
_rows_returned_counter =
ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsReturned", TUnit::UNIT, 1);
@@ -201,7 +197,6 @@ void ExecNode::release_resource(doris::RuntimeState* state) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
- runtime_profile()->add_to_span(_span);
_is_resource_released = true;
}
}
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index d4d90546ffb5c9..f5f918731f7bca 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -36,7 +36,6 @@
#include "common/status.h"
#include "runtime/descriptors.h"
#include "util/runtime_profile.h"
-#include "util/telemetry/telemetry.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr_fwd.h"
@@ -231,8 +230,6 @@ class ExecNode {
MemTracker* mem_tracker() const { return _mem_tracker.get(); }
- OpentelemetrySpan get_next_span() { return _span; }
-
virtual std::string get_name();
// Names of counters shared by all exec nodes
@@ -289,9 +286,6 @@ class ExecNode {
// Account for peak memory used by this node
RuntimeProfile::Counter* _peak_memory_usage_counter;
- //
- OpentelemetrySpan _span;
-
//NOTICE: now add a faker profile, because sometimes the profile record is useless
//so we want remove some counters and timers, eg: in join node, if it's broadcast_join
//and shared hash table, some counter/timer about build hash table is useless,
diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index 47005ef042faad..91a27d980f826d 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -55,7 +55,7 @@ std::string cast_to_string(T value, int scale) {
} else if constexpr (primitive_type == TYPE_DECIMAL128I) {
return ((vectorized::Decimal)value).to_string(scale);
} else if constexpr (primitive_type == TYPE_DECIMAL256) {
- return ((vectorized::Decimal)value).to_string(scale);
+ return ((vectorized::Decimal)value).to_string(scale);
} else if constexpr (primitive_type == TYPE_TINYINT) {
return std::to_string(static_cast(value));
} else if constexpr (primitive_type == TYPE_LARGEINT) {
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 94fcd814bea6de..c7519c5b05ad14 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -193,6 +193,24 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
return Status::OK();
}
+bool _has_char_type(const TypeDescriptor& desc) {
+ switch (desc.type) {
+ case TYPE_CHAR:
+ return true;
+ case TYPE_ARRAY:
+ case TYPE_MAP:
+ case TYPE_STRUCT:
+ for (int idx = 0; idx < desc.children.size(); ++idx) {
+ if (_has_char_type(desc.children[idx])) {
+ return true;
+ }
+ }
+ return false;
+ default:
+ return false;
+ }
+}
+
Status RowIDFetcher::fetch(const vectorized::ColumnPtr& column_row_ids,
vectorized::Block* res_block) {
CHECK(!_stubs.empty());
@@ -238,17 +256,10 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr& column_row_ids,
std::vector<size_t> char_type_idx;
for (size_t i = 0; i < _fetch_option.desc->slots().size(); i++) {
const auto& column_desc = _fetch_option.desc->slots()[i];
- const TypeDescriptor* type_desc = &column_desc->type();
- do {
- if (type_desc->type == TYPE_CHAR) {
- char_type_idx.emplace_back(i);
- break;
- } else if (type_desc->type != TYPE_ARRAY) {
- break;
- }
- // for Array or Array>
- type_desc = &type_desc->children[0];
- } while (true);
+ const TypeDescriptor& type_desc = column_desc->type();
+ if (_has_char_type(type_desc)) {
+ char_type_idx.push_back(i);
+ }
}
res_block->shrink_char_type_column_suffix_zero(char_type_idx);
VLOG_DEBUG << "dump block:" << res_block->dump_data(0, 10);
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 9733558284a8fd..8d640341bfb9a6 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -60,8 +60,6 @@
namespace doris {
class ObjectPool;
-DorisServer* SchemaScanner::_s_doris_server;
-
SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns)
: _is_init(false),
_param(nullptr),
diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h
index 2d4e468c592676..ec4bc9e0d3da99 100644
--- a/be/src/exec/schema_scanner.h
+++ b/be/src/exec/schema_scanner.h
@@ -33,7 +33,7 @@
namespace doris {
// forehead declare class, because jni function init in DorisServer.
-class DorisServer;
+
class RuntimeState;
class ObjectPool;
class TUserIdentity;
@@ -101,8 +101,6 @@ class SchemaScanner {
static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
TSchemaTableType::type type() const { return _schema_table_type; }
- static void set_doris_server(DorisServer* doris_server) { _s_doris_server = doris_server; }
-
protected:
Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector& datas);
@@ -113,8 +111,6 @@ class SchemaScanner {
// schema table's column desc
std::vector<ColumnDesc> _columns;
- static DorisServer* _s_doris_server;
-
TSchemaTableType::type _schema_table_type;
RuntimeProfile::Counter* _get_db_timer = nullptr;
diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp
index 3184aed4d2e4f5..b1d6900e0f5c2f 100644
--- a/be/src/exec/schema_scanner/schema_helper.cpp
+++ b/be/src/exec/schema_scanner/schema_helper.cpp
@@ -69,15 +69,6 @@ Status SchemaHelper::list_table_metadata_name_ids(const std::string& ip, const i
});
}
-Status SchemaHelper::describe_table(const std::string& ip, const int32_t port,
- const TDescribeTableParams& request,
- TDescribeTableResult* result) {
- return ThriftRpcHelper::rpc(
- ip, port, [&request, &result](FrontendServiceConnection& client) {
- client->describeTable(*result, request);
- });
-}
-
Status SchemaHelper::describe_tables(const std::string& ip, const int32_t port,
const TDescribeTablesParams& request,
TDescribeTablesResult* result) {
diff --git a/be/src/exec/schema_scanner/schema_helper.h b/be/src/exec/schema_scanner/schema_helper.h
index 900f963f7893cc..c0143136115bf1 100644
--- a/be/src/exec/schema_scanner/schema_helper.h
+++ b/be/src/exec/schema_scanner/schema_helper.h
@@ -55,10 +55,6 @@ class SchemaHelper {
const doris::TGetTablesParams& request,
TListTableMetadataNameIdsResult* result);
- static Status describe_table(const std::string& ip, const int32_t port,
- const TDescribeTableParams& desc_params,
- TDescribeTableResult* desc_result);
-
static Status describe_tables(const std::string& ip, const int32_t port,
const TDescribeTablesParams& desc_params,
TDescribeTablesResult* desc_result);
diff --git a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp
index e69596ef8fa0a7..ef7b2b69c1e710 100644
--- a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp
@@ -193,13 +193,13 @@ Status SchemaMetadataNameIdsScanner::_fill_block_impl(vectorized::Block* block)
RETURN_IF_ERROR(fill_dest_column_for_range(block, 3, null_datas));
}
}
- // table_id
+ // table_id
{
int64_t srcs[table_num];
for (int i = 0; i < table_num; ++i) {
if (_table_result.tables[i].__isset.id) {
srcs[i] = _table_result.tables[i].id;
- datas[i] = &srcs;
+ datas[i] = srcs + i;
} else {
datas[i] = nullptr;
}
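The one-character fix above matters: &srcs is the address of the whole array, identical for every row and of the wrong pointee type, while srcs + i is the address of row i's element, which is what fill_dest_column_for_range ultimately dereferences. A distilled repro:

#include <cstdint>
#include <iostream>

// Distilled version of the bug fixed above: storing &srcs would make every
// datas[i] point at the same address (the start of the array), so each row
// would read row 0's bytes; srcs + i points at the i-th element as intended.
int main() {
    int64_t srcs[3] = {101, 102, 103};
    const void* datas[3];
    for (int i = 0; i < 3; ++i) {
        datas[i] = srcs + i; // correct: per-row element address
    }
    std::cout << *static_cast<const int64_t*>(datas[2]) << std::endl; // 103
    return 0;
}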
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 9a24771edc08c1..90d434625816ef 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -17,7 +17,6 @@
#include "exec/tablet_info.h"
-#include <butil/fast_rand.h>
#include
#include
#include
@@ -26,6 +25,7 @@
#include
#include
+#include
#include
#include
@@ -130,7 +130,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
for (auto& col : pschema.partial_update_input_columns()) {
_partial_update_input_columns.insert(col);
}
- std::unordered_map<std::pair<std::string, std::string>, SlotDescriptor*> slots_map;
+ std::unordered_map<std::pair<std::string, FieldType>, SlotDescriptor*> slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(pschema.tuple_desc()));
for (auto& p_slot_desc : pschema.slot_descs()) {
@@ -138,7 +138,8 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_tuple_desc->add_slot(slot_desc);
string data_type;
EnumToString(TPrimitiveType, to_thrift(slot_desc->col_type()), data_type);
- slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()), std::move(data_type)),
+ slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()),
+ TabletColumn::get_field_type_by_string(data_type)),
slot_desc);
}
@@ -149,8 +150,9 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
for (auto& pcolumn_desc : p_index.columns_desc()) {
if (!_is_partial_update ||
_partial_update_input_columns.count(pcolumn_desc.name()) > 0) {
- auto it = slots_map.find(
- std::make_pair(to_lower(pcolumn_desc.name()), pcolumn_desc.type()));
+ auto it = slots_map.find(std::make_pair(
+ to_lower(pcolumn_desc.name()),
+ TabletColumn::get_field_type_by_string(pcolumn_desc.type())));
if (it == std::end(slots_map)) {
return Status::InternalError("unknown index column, column={}, type={}",
pcolumn_desc.name(), pcolumn_desc.type());
@@ -322,49 +324,20 @@ Status VOlapTablePartitionParam::init() {
}
}
- _partitions_map.reset(
- new std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>(
- VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs)));
+ _partitions_map = std::make_unique<
+ std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>(
+ VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs));
if (_t_param.__isset.distributed_columns) {
for (auto& col : _t_param.distributed_columns) {
RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, "distributed"));
}
}
- if (_distributed_slot_locs.empty()) {
- _compute_tablet_index = [](BlockRow* key,
- const VOlapTablePartition& partition) -> uint32_t {
- if (partition.load_tablet_idx == -1) {
- // load_to_single_tablet = false, just do random
- return butil::fast_rand() % partition.num_buckets;
- }
- // load_to_single_tablet = ture, do round-robin
- return partition.load_tablet_idx % partition.num_buckets;
- };
- } else {
- _compute_tablet_index = [this](BlockRow* key,
- const VOlapTablePartition& partition) -> uint32_t {
- uint32_t hash_val = 0;
- for (int i = 0; i < _distributed_slot_locs.size(); ++i) {
- auto slot_desc = _slots[_distributed_slot_locs[i]];
- auto& column = key->first->get_by_position(_distributed_slot_locs[i]).column;
- auto val = column->get_data_at(key->second);
- if (val.data != nullptr) {
- hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type,
- hash_val);
- } else {
- hash_val = HashUtil::zlib_crc_hash_null(hash_val);
- }
- }
- return hash_val % partition.num_buckets;
- };
- }
// for both auto/non-auto partition table.
_is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
// initial partitions
- for (int i = 0; i < _t_param.partitions.size(); ++i) {
- const TOlapTablePartition& t_part = _t_param.partitions[i];
+ for (const auto& t_part : _t_param.partitions) {
VOlapTablePartition* part = nullptr;
RETURN_IF_ERROR(generate_partition_from(t_part, part));
_partitions.emplace_back(part);
@@ -383,26 +356,6 @@ Status VOlapTablePartitionParam::init() {
return Status::OK();
}
-bool VOlapTablePartitionParam::find_partition(BlockRow* block_row,
- const VOlapTablePartition** partition) const {
- // block_row is gave by inserting process. So try to use transformed index.
- auto it =
- _is_in_partition
- ? _partitions_map->find(std::tuple {block_row->first, block_row->second, true})
- : _partitions_map->upper_bound(
- std::tuple {block_row->first, block_row->second, true});
- // for list partition it might result in default partition
- if (_is_in_partition) {
- *partition = (it != _partitions_map->end()) ? it->second : _default_partition;
- it = _partitions_map->end();
- }
- if (it != _partitions_map->end() &&
- _part_contains(it->second, std::tuple {block_row->first, block_row->second, true})) {
- *partition = it->second;
- }
- return (*partition != nullptr);
-}
-
bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
BlockRowWithIndicator key) const {
// start_key.second == -1 means only single partition
@@ -411,11 +364,6 @@ bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
!comparator(key, std::tuple {part->start_key.first, part->start_key.second, false});
}
-uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row,
- const VOlapTablePartition& partition) const {
- return _compute_tablet_index(block_row, partition);
-}
-
Status VOlapTablePartitionParam::_create_partition_keys(const std::vector& t_exprs,
BlockRow* part_key) {
for (int i = 0; i < t_exprs.size(); i++) {
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index ec12dcbfcd377c..bb9fbd8bc6013a 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -17,6 +17,7 @@
#pragma once
+#include <butil/fast_rand.h>
#include
#include
@@ -33,6 +34,8 @@
#include "common/object_pool.h"
#include "common/status.h"
+#include "runtime/descriptors.h"
+#include "runtime/raw_value.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
@@ -162,9 +165,78 @@ class VOlapTablePartitionParam {
int64_t version() const { return _t_param.version; }
// return true if we found this block_row in partition
- bool find_partition(BlockRow* block_row, const VOlapTablePartition** partition) const;
+ //TODO: use virtual function to refactor it
+ ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row,
+ VOlapTablePartition*& partition) const {
+ auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true})
+ : _partitions_map->upper_bound(std::tuple {block, row, true});
+ // for list partition it might result in default partition
+ if (_is_in_partition) {
+ partition = (it != _partitions_map->end()) ? it->second : _default_partition;
+ it = _partitions_map->end();
+ }
+ if (it != _partitions_map->end() &&
+ _part_contains(it->second, std::tuple {block, row, true})) {
+ partition = it->second;
+ }
+ return (partition != nullptr);
+ }
+
+ ALWAYS_INLINE void find_tablets(
+ vectorized::Block* block, const std::vector& indexes,
+ const std::vector& partitions,
+ std::vector& tablet_indexes /*result*/,
+ /*TODO: check if flat hash map will be better*/
+ std::map<int64_t, uint32_t>* partition_tablets_buffer = nullptr) const {
+ std::function<uint32_t(vectorized::Block*, uint32_t, const VOlapTablePartition&)>
+ compute_function;
+ if (!_distributed_slot_locs.empty()) {
+ //TODO: refactor by saving the hash values. then we can calculate in columnwise.
+ compute_function = [this](vectorized::Block* block, uint32_t row,
+ const VOlapTablePartition& partition) -> uint32_t {
+ uint32_t hash_val = 0;
+ for (unsigned short _distributed_slot_loc : _distributed_slot_locs) {
+ auto* slot_desc = _slots[_distributed_slot_loc];
+ auto& column = block->get_by_position(_distributed_slot_loc).column;
+ auto val = column->get_data_at(row);
+ if (val.data != nullptr) {
+ hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type,
+ hash_val);
+ } else {
+ hash_val = HashUtil::zlib_crc_hash_null(hash_val);
+ }
+ }
+ return hash_val % partition.num_buckets;
+ };
+ } else { // random distribution
+ compute_function = [](vectorized::Block* block, uint32_t row,
+ const VOlapTablePartition& partition) -> uint32_t {
+ if (partition.load_tablet_idx == -1) {
+ // load_to_single_tablet = false, just do random
+ return butil::fast_rand() % partition.num_buckets;
+ }
+ // load_to_single_tablet = true, do round-robin
+ return partition.load_tablet_idx % partition.num_buckets;
+ };
+ }
- uint32_t find_tablet(BlockRow* block_row, const VOlapTablePartition& partition) const;
+ if (partition_tablets_buffer == nullptr) {
+ for (auto index : indexes) {
+ tablet_indexes[index] = compute_function(block, index, *partitions[index]);
+ }
+ } else { // use buffer
+ for (auto index : indexes) {
+ auto& partition_id = partitions[index]->id;
+ if (auto it = partition_tablets_buffer->find(partition_id);
+ it != partition_tablets_buffer->end()) {
+ tablet_indexes[index] = it->second; // tablet
+ }
+ // compute and save in buffer
+ (*partition_tablets_buffer)[partition_id] = tablet_indexes[index] =
+ compute_function(block, index, *partitions[index]);
+ }
+ }
+ }
const std::vector& get_partitions() const { return _partitions; }
@@ -193,8 +265,6 @@ class VOlapTablePartitionParam {
Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos);
- std::function _compute_tablet_index;
-
// check if this partition contain this key
bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const;
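find_tablets picks one of two bucket-selection strategies per load: a crc32 hash of the distribution columns modulo num_buckets, or, without distribution columns, random placement (round-robin when loading to a single tablet). A hedged sketch of the three cases, with std::hash standing in for RawValue::zlib_crc32:

#include <cstdint>
#include <functional>
#include <iostream>
#include <random>
#include <string>

// Sketch of the strategies inlined in find_tablets above; std::hash is a
// stand-in for the zlib crc32 used by Doris, and all names are illustrative.
uint32_t hash_bucket(const std::string& distribution_key, uint32_t num_buckets) {
    return static_cast<uint32_t>(std::hash<std::string> {}(distribution_key)) % num_buckets;
}

uint32_t random_bucket(uint32_t num_buckets) {
    static std::mt19937 rng {std::random_device {}()};
    return rng() % num_buckets; // load_tablet_idx == -1: pick any bucket
}

uint32_t round_robin_bucket(int64_t load_tablet_idx, uint32_t num_buckets) {
    return static_cast<uint32_t>(load_tablet_idx % num_buckets); // single-tablet load
}

int main() {
    std::cout << hash_bucket("user_42", 16) << " " << random_bucket(16) << " "
              << round_robin_bucket(7, 16) << std::endl;
    return 0;
}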
diff --git a/be/src/exprs/json_functions.cpp b/be/src/exprs/json_functions.cpp
index 30608adeb2546a..ace18f1090f1cc 100644
--- a/be/src/exprs/json_functions.cpp
+++ b/be/src/exprs/json_functions.cpp
@@ -21,6 +21,8 @@
#include
#include
#include
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
#include
#include // IWYU pragma: keep
#include
@@ -32,7 +34,6 @@
#include
#include
-// IWYU pragma: no_include
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
@@ -316,4 +317,33 @@ Status JsonFunctions::extract_from_object(simdjson::ondemand::object& obj,
return Status::OK();
}
+std::string JsonFunctions::print_json_value(const rapidjson::Value& value) {
+ rapidjson::StringBuffer buffer;
+ buffer.Clear();
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ value.Accept(writer);
+ return std::string(buffer.GetString());
+}
+
+void JsonFunctions::merge_objects(rapidjson::Value& dst_object, rapidjson::Value& src_object,
+ rapidjson::Document::AllocatorType& allocator) {
+ if (!src_object.IsObject()) {
+ return;
+ }
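+ // For each src member: if dst also has it, recurse into object values and
+ // only fill dst scalars that are null; otherwise append the member to dst.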
+ for (auto src_it = src_object.MemberBegin(); src_it != src_object.MemberEnd(); ++src_it) {
+ auto dst_it = dst_object.FindMember(src_it->name);
+ if (dst_it != dst_object.MemberEnd()) {
+ if (src_it->value.IsObject()) {
+ merge_objects(dst_it->value, src_it->value, allocator);
+ } else {
+ if (dst_it->value.IsNull()) {
+ dst_it->value = src_it->value;
+ }
+ }
+ } else {
+ dst_object.AddMember(src_it->name, src_it->value, allocator);
+ }
+ }
+}
+
} // namespace doris
diff --git a/be/src/exprs/json_functions.h b/be/src/exprs/json_functions.h
index fcec257142fc7b..72aa522ff374fa 100644
--- a/be/src/exprs/json_functions.h
+++ b/be/src/exprs/json_functions.h
@@ -108,6 +108,13 @@ class JsonFunctions {
static Status extract_from_object(simdjson::ondemand::object& obj,
const std::vector<JsonPath>& jsonpath,
simdjson::ondemand::value* value) noexcept;
+ // src: {"a" : {"c" : 1}, "e" : 123}
+ // dst: {"a" : {"d" : 1}}
+ // merged: {"a" : {"c" : 1, "d" : 1}, "e" : 123}
+ static void merge_objects(rapidjson::Value& dst_object, rapidjson::Value& src_object,
+ rapidjson::Document::AllocatorType& allocator);
+
+ static std::string print_json_value(const rapidjson::Value& value);
private:
static rapidjson::Value* match_value(const std::vector<JsonPath>& parsed_paths,
diff --git a/be/src/exprs/math_functions.cpp b/be/src/exprs/math_functions.cpp
index bbfdc6053dc950..202a6eabea3e80 100644
--- a/be/src/exprs/math_functions.cpp
+++ b/be/src/exprs/math_functions.cpp
@@ -23,7 +23,6 @@
#include
#include
-// IWYU pragma: no_include
#include "common/compiler_util.h" // IWYU pragma: keep
// IWYU pragma: no_include
#include
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c6e64fd0e55ef3..1c7cb4b5c13b04 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -42,6 +42,7 @@
#include "exprs/hybrid_set.h"
#include "exprs/minmax_predicate.h"
#include "gutil/strings/substitute.h"
+#include "pipeline/pipeline_x/dependency.h"
#include "runtime/define_primitive_type.h"
#include "runtime/large_int_value.h"
#include "runtime/primitive_type.h"
@@ -62,7 +63,6 @@
#include "vec/exprs/vliteral.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
#include "vec/runtime/shared_hash_table_controller.h"
-
namespace doris {
// PrimitiveType-> PColumnType
@@ -1185,17 +1185,17 @@ bool IRuntimeFilter::await() {
return true;
}
+// NOTE: Waiting infinitely will not actually make the scan task wait forever,
+// because BlockTaskSchedule will run it anyway once the query times out.
+bool IRuntimeFilter::wait_infinitely() const {
+ // The bitmap filter is precise and only filters once, so it must always be applied.
+ return _wait_infinitely ||
+ (_wrapper != nullptr && _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER);
+}
+
bool IRuntimeFilter::is_ready_or_timeout() {
DCHECK(is_consumer());
auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
- auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000
- : _state->execution_timeout() * 1000;
- auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms()
- : _state->runtime_filter_wait_time_ms();
- // bitmap filter is precise filter and only filter once, so it must be applied.
- int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
- ? execution_timeout
- : runtime_filter_wait_time_ms;
int64_t ms_since_registration = MonotonicMillis() - registration_time_;
if (!_enable_pipeline_exec) {
_rf_state = RuntimeFilterState::TIME_OUT;
@@ -1212,7 +1212,7 @@ bool IRuntimeFilter::is_ready_or_timeout() {
if (is_ready()) {
return true;
}
- bool timeout = wait_times_ms <= ms_since_registration;
+ bool timeout = wait_infinitely() ? false : _rf_wait_time_ms <= ms_since_registration;
auto expected = RuntimeFilterState::NOT_READY;
if (timeout) {
if (!_rf_state_atomic.compare_exchange_strong(expected, RuntimeFilterState::TIME_OUT,
@@ -1235,6 +1235,11 @@ void IRuntimeFilter::signal() {
DCHECK(is_consumer());
if (_enable_pipeline_exec) {
_rf_state_atomic.store(RuntimeFilterState::READY);
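+ // Wake up any pipeline tasks whose timers are waiting on this filter.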
+ if (!_filter_timer.empty()) {
+ for (auto& timer : _filter_timer) {
+ timer->call_ready();
+ }
+ }
} else {
std::unique_lock<std::mutex> lock(_inner_mutex);
_rf_state = RuntimeFilterState::READY;
@@ -1255,6 +1260,10 @@ void IRuntimeFilter::signal() {
}
}
+void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
+ _filter_timer.push_back(timer);
+}
+
BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
return _wrapper->get_bloomfilter();
}
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index fdd3b02ad6351a..7a65706f5ad851 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -69,6 +69,10 @@ class VExprContext;
struct SharedRuntimeFilterContext;
} // namespace vectorized
+namespace pipeline {
+class RuntimeFilterTimer;
+} // namespace pipeline
+
enum class RuntimeFilterType {
UNKNOWN_FILTER = -1,
IN_FILTER = 0,
@@ -207,6 +211,8 @@ class IRuntimeFilter {
_always_true(false),
_is_ignored(false),
registration_time_(MonotonicMillis()),
+ _wait_infinitely(_state->runtime_filter_wait_infinitely()),
+ _rf_wait_time_ms(_state->runtime_filter_wait_time_ms()),
_enable_pipeline_exec(_state->enable_pipeline_exec()),
_profile(new RuntimeProfile(_name)) {
if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) {
@@ -232,6 +238,8 @@ class IRuntimeFilter {
_always_true(false),
_is_ignored(false),
registration_time_(MonotonicMillis()),
+ _wait_infinitely(query_ctx->runtime_filter_wait_infinitely()),
+ _rf_wait_time_ms(query_ctx->runtime_filter_wait_time_ms()),
_enable_pipeline_exec(query_ctx->enable_pipeline_exec()),
_profile(new RuntimeProfile(_name)) {
if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) {
@@ -384,6 +392,25 @@ class IRuntimeFilter {
}
}
+ // For pipelineX & Producer
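+ // An "infinite" wait is still bounded by the query execution timeout so that
+ // RPCs get a finite deadline.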
+ int32_t wait_time_ms() const {
+ int32_t res = 0;
+ if (wait_infinitely()) {
+ res = _state == nullptr ? _query_ctx->execution_timeout() : _state->execution_timeout();
+ // Convert to ms
+ res *= 1000;
+ } else {
+ res = _rf_wait_time_ms;
+ }
+ return res;
+ }
+
+ bool wait_infinitely() const;
+
+ int64_t registration_time() const { return registration_time_; }
+
+ void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
+
protected:
// serialize _wrapper to protobuf
void to_protobuf(PInFilter* filter);
@@ -464,6 +491,9 @@ class IRuntimeFilter {
/// Time in ms (from MonotonicMillis()), that the filter was registered.
const int64_t registration_time_;
+ /// The runtime filter wait time is ignored if _wait_infinitely is true.
+ const bool _wait_infinitely;
+ const int32_t _rf_wait_time_ms;
const bool _enable_pipeline_exec;
@@ -475,6 +505,8 @@ class IRuntimeFilter {
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
bool _opt_remote_rf;
+
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
};
// avoid expose RuntimePredicateWrapper
diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp
index 2220ca86060a2d..00540b8382c667 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -74,7 +74,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
_rpc_context->request.set_filter_id(_filter_id);
_rpc_context->request.set_opt_remote_rf(opt_remote_rf);
_rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
- _rpc_context->cntl.set_timeout_ms(state->runtime_filter_wait_time_ms());
+ _rpc_context->cntl.set_timeout_ms(wait_time_ms());
_rpc_context->cid = _rpc_context->cntl.call_id();
Status serialize_status = serialize(&_rpc_context->request, &data, &len);
diff --git a/be/src/gutil/endian.h b/be/src/gutil/endian.h
index 66d849f73cd554..f1a9cf2a1a2da1 100644
--- a/be/src/gutil/endian.h
+++ b/be/src/gutil/endian.h
@@ -61,7 +61,8 @@ inline unsigned __int128 gbswap_128(unsigned __int128 host_int) {
}
inline wide::UInt256 gbswap_256(wide::UInt256 host_int) {
- wide::UInt256 result{gbswap_64(host_int.items[0]), gbswap_64(host_int.items[1]), gbswap_64(host_int.items[2]), gbswap_64(host_int.items[3])};
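+ // Swapping the byte order of a 256-bit integer must also reverse the order of
+ // its four 64-bit limbs, not just the bytes within each limb.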
+ wide::UInt256 result{gbswap_64(host_int.items[3]), gbswap_64(host_int.items[2]),
+ gbswap_64(host_int.items[1]), gbswap_64(host_int.items[0])};
return result;
}
diff --git a/be/src/http/action/download_action.cpp b/be/src/http/action/download_action.cpp
index f2068d83c7bbb1..f271b4f1916708 100644
--- a/be/src/http/action/download_action.cpp
+++ b/be/src/http/action/download_action.cpp
@@ -33,13 +33,20 @@
#include "runtime/exec_env.h"
namespace doris {
-
-const std::string FILE_PARAMETER = "file";
-const std::string TOKEN_PARAMETER = "token";
-
-DownloadAction::DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& allow_dirs,
- int32_t num_workers)
- : _exec_env(exec_env), _download_type(NORMAL), _num_workers(num_workers) {
+namespace {
+static const std::string FILE_PARAMETER = "file";
+static const std::string TOKEN_PARAMETER = "token";
+static const std::string CHANNEL_PARAMETER = "channel";
+static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
+} // namespace
+
+DownloadAction::DownloadAction(ExecEnv* exec_env,
+ std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group,
+ const std::vector<std::string>& allow_dirs, int32_t num_workers)
+ : _exec_env(exec_env),
+ _download_type(NORMAL),
+ _num_workers(num_workers),
+ _rate_limit_group(std::move(rate_limit_group)) {
for (auto& dir : allow_dirs) {
std::string p;
Status st = io::global_local_filesystem()->canonicalize(dir, &p);
@@ -107,7 +114,13 @@ void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_par
if (is_dir) {
do_dir_response(file_param, req);
} else {
- do_file_response(file_param, req);
+ const auto& channel = req->param(CHANNEL_PARAMETER);
+ bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
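+ // Only ingest-binlog downloads are throttled by the shared rate limit group.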
+ if (ingest_binlog) {
+ do_file_response(file_param, req, _rate_limit_group.get());
+ } else {
+ do_file_response(file_param, req);
+ }
}
}
diff --git a/be/src/http/action/download_action.h b/be/src/http/action/download_action.h
index d8e468d95851aa..3aab1a0d31423d 100644
--- a/be/src/http/action/download_action.h
+++ b/be/src/http/action/download_action.h
@@ -24,6 +24,8 @@
#include "http/http_handler.h"
#include "util/threadpool.h"
+struct bufferevent_rate_limit_group;
+
namespace doris {
class ExecEnv;
@@ -36,8 +38,9 @@ class HttpRequest;
// We use parameter named 'file' to specify the static resource path, it is an absolute path.
class DownloadAction : public HttpHandler {
public:
- DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& allow_dirs,
- int32_t num_workers = 0);
+ DownloadAction(ExecEnv* exec_env,
+ std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group,
+ const std::vector<std::string>& allow_dirs, int32_t num_workers = 0);
// for load error
DownloadAction(ExecEnv* exec_env, const std::string& error_log_root_dir);
@@ -67,6 +70,8 @@ class DownloadAction : public HttpHandler {
std::string _error_log_root_dir;
int32_t _num_workers;
std::unique_ptr<ThreadPool> _download_workers;
+
+ std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
}; // end class DownloadAction
} // end namespace doris
diff --git a/be/src/http/action/download_binlog_action.cpp b/be/src/http/action/download_binlog_action.cpp
index a23d5ec109f907..697512b2a301ee 100644
--- a/be/src/http/action/download_binlog_action.cpp
+++ b/be/src/http/action/download_binlog_action.cpp
@@ -21,8 +21,10 @@
#include
#include
+#include
#include
#include
+#include
#include
#include "common/config.h"
@@ -96,7 +98,7 @@ void handle_get_binlog_info(HttpRequest* req) {
}
/// handle get segment file, need tablet_id, rowset_id && index
-void handle_get_segment_file(HttpRequest* req) {
+void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) {
// Step 1: get download file path
std::string segment_file_path;
try {
@@ -125,7 +127,7 @@ void handle_get_segment_file(HttpRequest* req) {
LOG(WARNING) << "file not exist, file path: " << segment_file_path;
return;
}
- do_file_response(segment_file_path, req);
+ do_file_response(segment_file_path, req, rate_limit_group);
}
void handle_get_rowset_meta(HttpRequest* req) {
@@ -149,7 +151,9 @@ void handle_get_rowset_meta(HttpRequest* req) {
} // namespace
-DownloadBinlogAction::DownloadBinlogAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
+DownloadBinlogAction::DownloadBinlogAction(
+ ExecEnv* exec_env, std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group)
+ : _exec_env(exec_env), _rate_limit_group(std::move(rate_limit_group)) {}
void DownloadBinlogAction::handle(HttpRequest* req) {
VLOG_CRITICAL << "accept one download binlog request " << req->debug_string();
@@ -178,7 +182,7 @@ void DownloadBinlogAction::handle(HttpRequest* req) {
if (method == "get_binlog_info") {
handle_get_binlog_info(req);
} else if (method == "get_segment_file") {
- handle_get_segment_file(req);
+ handle_get_segment_file(req, _rate_limit_group.get());
} else if (method == "get_rowset_meta") {
handle_get_rowset_meta(req);
} else {
diff --git a/be/src/http/action/download_binlog_action.h b/be/src/http/action/download_binlog_action.h
index 3cbd9b9e5b0fb9..77a2ed0878059a 100644
--- a/be/src/http/action/download_binlog_action.h
+++ b/be/src/http/action/download_binlog_action.h
@@ -17,12 +17,15 @@
#pragma once
+#include <memory>
#include
#include
#include "common/status.h"
#include "http/http_handler.h"
+struct bufferevent_rate_limit_group;
+
namespace doris {
class ExecEnv;
@@ -30,7 +33,8 @@ class HttpRequest;
class DownloadBinlogAction : public HttpHandler {
public:
- DownloadBinlogAction(ExecEnv* exec_env);
+ DownloadBinlogAction(ExecEnv* exec_env,
+ std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group);
virtual ~DownloadBinlogAction() = default;
void handle(HttpRequest* req) override;
@@ -40,6 +44,7 @@ class DownloadBinlogAction : public HttpHandler {
private:
ExecEnv* _exec_env;
+ std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
};
} // namespace doris
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 067f8c5d28db41..d81f42824c2bbd 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -167,7 +167,8 @@ int HttpStreamAction::on_header(HttpRequest* req) {
ctx->load_type = TLoadType::MANUL_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;
- ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true");
+ ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
+ config::wait_internal_group_commit_finish;
ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false;
diff --git a/be/src/http/action/pad_rowset_action.cpp b/be/src/http/action/pad_rowset_action.cpp
index 95164f53fa5a1c..52604f23ff4fc1 100644
--- a/be/src/http/action/pad_rowset_action.cpp
+++ b/be/src/http/action/pad_rowset_action.cpp
@@ -99,14 +99,13 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet, const Version& versi
return Status::InternalError("Input version {} exists", version.to_string());
}
- std::unique_ptr writer;
RowsetWriterContext ctx;
ctx.version = version;
ctx.rowset_state = VISIBLE;
ctx.segments_overlap = NONOVERLAPPING;
ctx.tablet_schema = tablet->tablet_schema();
ctx.newest_write_timestamp = UnixSeconds();
- RETURN_IF_ERROR(tablet->create_rowset_writer(ctx, &writer));
+ auto writer = DORIS_TRY(tablet->create_rowset_writer(ctx, false));
RowsetSharedPtr rowset;
RETURN_IF_ERROR(writer->build(rowset));
rowset->make_visible(version);
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 99a98afbc37b8d..896a1f27755c32 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -76,6 +76,9 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
+static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
+static const string CHUNK = "chunked";
+
#ifdef BE_TEST
TStreamLoadPutResult k_stream_load_put_result;
#endif
@@ -185,8 +188,9 @@ int StreamLoadAction::on_header(HttpRequest* req) {
url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
ctx->label = req->header(HTTP_LABEL_KEY);
Status st = Status::OK();
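+ // Group commit is enabled by the header or forced globally via
+ // config::wait_internal_group_commit_finish; an explicit label is only
+ // rejected when the header itself requested group commit.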
- if (iequal(req->header(HTTP_GROUP_COMMIT), "true")) {
- if (!ctx->label.empty()) {
+ if (iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
+ config::wait_internal_group_commit_finish) {
+ if (iequal(req->header(HTTP_GROUP_COMMIT), "true") && !ctx->label.empty()) {
st = Status::InternalError("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
@@ -292,6 +296,12 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
+ if (!http_req->header(HttpHeaders::TRANSFER_ENCODING).empty()) {
+ if (http_req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos) {
+ ctx->is_chunked_transfer = true;
+ }
+ }
+
if (!http_req->header(HTTP_TIMEOUT).empty()) {
try {
ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
@@ -369,9 +379,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
request.__set_header_type(ctx->header_type);
request.__set_loadId(ctx->id.to_thrift());
if (ctx->use_streaming) {
- auto pipe = std::make_shared<io::StreamLoadPipe>(
- io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
- ctx->body_bytes /* total_length */);
+ std::shared_ptr<io::StreamLoadPipe> pipe;
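+ // A chunked transfer carries no total length up front, so the pipe is built
+ // without total_length / min_chunk_size.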
+ if (ctx->is_chunked_transfer) {
+ pipe = std::make_shared<io::StreamLoadPipe>(
+ io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
+ } else {
+ pipe = std::make_shared<io::StreamLoadPipe>(
+ io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
+ MIN_CHUNK_SIZE /* min_chunk_size */, ctx->body_bytes /* total_length */);
+ }
request.fileType = TFileType::FILE_STREAM;
ctx->body_sink = pipe;
ctx->pipe = pipe;
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index 5d231cb9087ecd..1bbd2c0e178fd2 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -84,7 +84,17 @@ static int on_connection(struct evhttp_request* req, void* param) {
EvHttpServer::EvHttpServer(int port, int num_workers)
: _port(port), _num_workers(num_workers), _real_port(0) {
_host = BackendOptions::get_service_bind_address();
+
+ evthread_use_pthreads();
DCHECK_GT(_num_workers, 0);
+ _event_bases.resize(_num_workers);
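+ // Create the per-worker event bases eagerly so they already exist before
+ // start() and can be handed out via get_event_bases().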
+ for (int i = 0; i < _num_workers; ++i) {
+ std::shared_ptr<event_base> base(event_base_new(),
+ [](event_base* base) { event_base_free(base); });
+ CHECK(base != nullptr) << "Couldn't create an event_base.";
+ std::lock_guard<std::mutex> lock(_event_bases_lock);
+ _event_bases[i] = base;
+ }
}
EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers)
@@ -107,34 +117,28 @@ void EvHttpServer::start() {
.set_min_threads(_num_workers)
.set_max_threads(_num_workers)
.build(&_workers));
-
- evthread_use_pthreads();
- _event_bases.resize(_num_workers);
for (int i = 0; i < _num_workers; ++i) {
- CHECK(_workers->submit_func([this, i]() {
- std::shared_ptr<event_base> base(event_base_new(), [](event_base* base) {
- event_base_free(base);
- });
- CHECK(base != nullptr) << "Couldn't create an event_base.";
- {
- std::lock_guard<std::mutex> lock(_event_bases_lock);
- _event_bases[i] = base;
- }
-
- /* Create a new evhttp object to handle requests. */
- std::shared_ptr<evhttp> http(evhttp_new(base.get()),
- [](evhttp* http) { evhttp_free(http); });
- CHECK(http != nullptr) << "Couldn't create an evhttp.";
-
- auto res = evhttp_accept_socket(http.get(), _server_fd);
- CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
-
- evhttp_set_newreqcb(http.get(), on_connection, this);
- evhttp_set_gencb(http.get(), on_request, this);
-
- event_base_dispatch(base.get());
- })
- .ok());
+ auto status = _workers->submit_func([this, i]() {
+ std::shared_ptr<event_base> base;
+ {
+ std::lock_guard<std::mutex> lock(_event_bases_lock);
+ base = _event_bases[i];
+ }
+
+ /* Create a new evhttp object to handle requests. */
+ std::shared_ptr<evhttp> http(evhttp_new(base.get()),
+ [](evhttp* http) { evhttp_free(http); });
+ CHECK(http != nullptr) << "Couldn't create an evhttp.";
+
+ auto res = evhttp_accept_socket(http.get(), _server_fd);
+ CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
+
+ evhttp_set_newreqcb(http.get(), on_connection, this);
+ evhttp_set_gencb(http.get(), on_request, this);
+
+ event_base_dispatch(base.get());
+ });
+ CHECK(status.ok());
}
}
diff --git a/be/src/http/ev_http_server.h b/be/src/http/ev_http_server.h
index e7ad1c052ab16a..d74a8cb4efd283 100644
--- a/be/src/http/ev_http_server.h
+++ b/be/src/http/ev_http_server.h
@@ -55,6 +55,11 @@ class EvHttpServer {
// get real port
int get_real_port() const { return _real_port; }
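+ // Snapshot of the per-worker event bases; guarded by _event_bases_lock.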
+ std::vector<std::shared_ptr<event_base>> get_event_bases() {
+ std::lock_guard<std::mutex> lock(_event_bases_lock);
+ return _event_bases;
+ }
+
private:
Status _bind();
HttpHandler* _find_handler(HttpRequest* req);
diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp
index 5727ba3902efec..96679195316dac 100644
--- a/be/src/http/http_channel.cpp
+++ b/be/src/http/http_channel.cpp
@@ -18,6 +18,7 @@
#include "http/http_channel.h"
#include
+#include <event2/bufferevent.h>
#include
#include
@@ -69,11 +70,17 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std:
evbuffer_free(evb);
}
-void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size) {
+void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size,
+ bufferevent_rate_limit_group* rate_limit_group) {
auto evb = evbuffer_new();
evbuffer_add_file(evb, fd, off, size);
- evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK,
- default_reason(HttpStatus::OK).c_str(), evb);
+ auto* evhttp_request = request->get_evhttp_request();
+ if (rate_limit_group) {
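+ // Attach this connection's bufferevent to the group so that all grouped
+ // transfers share one bandwidth budget.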
+ auto* evhttp_connection = evhttp_request_get_connection(evhttp_request);
+ auto* buffer_event = evhttp_connection_get_bufferevent(evhttp_connection);
+ bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group);
+ }
+ evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb);
evbuffer_free(evb);
}
diff --git a/be/src/http/http_channel.h b/be/src/http/http_channel.h
index 478f013af82079..ee1e6c0888f1d3 100644
--- a/be/src/http/http_channel.h
+++ b/be/src/http/http_channel.h
@@ -23,6 +23,7 @@
#include "http/http_status.h"
+struct bufferevent_rate_limit_group;
namespace doris {
class HttpRequest;
@@ -43,7 +44,8 @@ class HttpChannel {
static void send_reply(HttpRequest* request, HttpStatus status, const std::string& content);
- static void send_file(HttpRequest* request, int fd, size_t off, size_t size);
+ static void send_file(HttpRequest* request, int fd, size_t off, size_t size,
+ bufferevent_rate_limit_group* rate_limit_group = nullptr);
static bool compress_content(const std::string& accept_encoding, const std::string& input,
std::string* output);
diff --git a/be/src/http/http_handler_with_auth.cpp b/be/src/http/http_handler_with_auth.cpp
index 9d93b823c6b0e2..6a4b28beb279d4 100644
--- a/be/src/http/http_handler_with_auth.cpp
+++ b/be/src/http/http_handler_with_auth.cpp
@@ -64,13 +64,16 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) {
#ifndef BE_TEST
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
- RETURN_WITH_WARN_IF_ERROR(
- ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
- [&auth_result, &auth_request](FrontendServiceConnection& client) {
- client->checkAuth(auth_result, auth_request);
- }),
- -1, "checkAuth failed");
+ {
+ auto status = ThriftRpcHelper::rpc<FrontendServiceClient>(
+ master_addr.hostname, master_addr.port,
+ [&auth_result, &auth_request](FrontendServiceConnection& client) {
+ client->checkAuth(auth_result, auth_request);
+ });
+ if (!status) {
+ return -1;
+ }
+ }
#else
CHECK(_exec_env == nullptr);
#endif
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index f55b5d47696c11..31550456c552ea 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -124,7 +124,8 @@ std::string get_content_type(const std::string& file_name) {
return "";
}
-void do_file_response(const std::string& file_path, HttpRequest* req) {
+void do_file_response(const std::string& file_path, HttpRequest* req,
+ bufferevent_rate_limit_group* rate_limit_group) {
if (file_path.find("..") != std::string::npos) {
LOG(WARNING) << "Not allowed to read relative path: " << file_path;
HttpChannel::send_error(req, HttpStatus::FORBIDDEN);
@@ -165,7 +166,7 @@ void do_file_response(const std::string& file_path, HttpRequest* req) {
return;
}
- HttpChannel::send_file(req, fd, 0, file_size);
+ HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group);
}
void do_dir_response(const std::string& dir_path, HttpRequest* req) {
diff --git a/be/src/http/utils.h b/be/src/http/utils.h
index 5928039c492b2d..2d1e13fbe4e78e 100644
--- a/be/src/http/utils.h
+++ b/be/src/http/utils.h
@@ -22,6 +22,8 @@
#include "common/utils.h"
#include "http/http_request.h"
+struct bufferevent_rate_limit_group;
+
namespace doris {
struct AuthInfo;
@@ -34,7 +36,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa
bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth);
-void do_file_response(const std::string& dir_path, HttpRequest* req);
+void do_file_response(const std::string& dir_path, HttpRequest* req,
+ bufferevent_rate_limit_group* rate_limit_group = nullptr);
void do_dir_response(const std::string& dir_path, HttpRequest* req);
diff --git a/be/src/index-tools/index_tool.cpp b/be/src/index-tools/index_tool.cpp
index 2fd51feb63a9ad..c356449175994f 100644
--- a/be/src/index-tools/index_tool.cpp
+++ b/be/src/index-tools/index_tool.cpp
@@ -19,13 +19,16 @@
#include
#include
+#include
#include
#include
+#include
#include
#include
#include
#include "io/fs/local_file_system.h"
+#include "olap/rowset/segment_v2/inverted_index/query/conjunction_query.h"
#include "olap/rowset/segment_v2/inverted_index_compound_directory.h"
#include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
@@ -60,6 +63,16 @@ std::string get_usage(const std::string& progname) {
return ss.str();
}
+std::vector<std::string> split(const std::string& s, char delimiter) {
+ std::vector<std::string> tokens;
+ std::string token;
+ std::istringstream tokenStream(s);
+ while (getline(tokenStream, token, delimiter)) {
+ tokens.push_back(token);
+ }
+ return tokens;
+}
+
void search(lucene::store::Directory* dir, std::string& field, std::string& token,
std::string& pred) {
IndexReader* reader = IndexReader::open(dir);
@@ -72,36 +85,59 @@ void search(lucene::store::Directory* dir, std::string& field, std::string& toke
IndexSearcher s(reader);
std::unique_ptr<lucene::search::Query> query;
+ std::cout << "version: " << (int32_t)(reader->getIndexVersion()) << std::endl;
+
std::wstring field_ws(field.begin(), field.end());
- std::wstring token_ws(token.begin(), token.end());
- lucene::index::Term* term = _CLNEW lucene::index::Term(field_ws.c_str(), token_ws.c_str());
- if (pred == "eq" || pred == "match") {
- query.reset(new lucene::search::TermQuery(term));
- } else if (pred == "lt") {
- query.reset(new lucene::search::RangeQuery(nullptr, term, false));
- } else if (pred == "gt") {
- query.reset(new lucene::search::RangeQuery(term, nullptr, false));
- } else if (pred == "le") {
- query.reset(new lucene::search::RangeQuery(nullptr, term, true));
- } else if (pred == "ge") {
- query.reset(new lucene::search::RangeQuery(term, nullptr, true));
+ if (pred == "match_all") {
+ } else if (pred == "match_phrase") {
+ std::vector<std::string> terms = split(token, '|');
+ auto* phrase_query = new lucene::search::PhraseQuery();
+ for (auto& term : terms) {
+ std::wstring term_ws = StringUtil::string_to_wstring(term);
+ auto* t = _CLNEW lucene::index::Term(field_ws.c_str(), term_ws.c_str());
+ phrase_query->add(t);
+ _CLDECDELETE(t);
+ }
+ query.reset(phrase_query);
} else {
- std::cout << "invalid predicate type:" << pred << std::endl;
- exit(-1);
+ std::wstring token_ws(token.begin(), token.end());
+ lucene::index::Term* term = _CLNEW lucene::index::Term(field_ws.c_str(), token_ws.c_str());
+ if (pred == "eq" || pred == "match") {
+ query.reset(new lucene::search::TermQuery(term));
+ } else if (pred == "lt") {
+ query.reset(new lucene::search::RangeQuery(nullptr, term, false));
+ } else if (pred == "gt") {
+ query.reset(new lucene::search::RangeQuery(term, nullptr, false));
+ } else if (pred == "le") {
+ query.reset(new lucene::search::RangeQuery(nullptr, term, true));
+ } else if (pred == "ge") {
+ query.reset(new lucene::search::RangeQuery(term, nullptr, true));
+ } else {
+ std::cout << "invalid predicate type:" << pred << std::endl;
+ exit(-1);
+ }
+ _CLDECDELETE(term);
}
- _CLDECDELETE(term);
- std::vector<int32_t> result;
- int total = 0;
-
- s._search(query.get(), [&result, &total](const int32_t docid, const float_t /*score*/) {
- // docid equal to rowid in segment
- result.push_back(docid);
- if (FLAGS_print_row_id) {
- printf("RowID is %d\n", docid);
- }
- total += 1;
- });
+ int32_t total = 0;
+ if (pred == "match_all") {
+ roaring::Roaring result;
+ std::vector<std::string> terms = split(token, '|');
+ doris::ConjunctionQuery query(s.getReader());
+ query.add(field_ws, terms);
+ query.search(result);
+ total += result.cardinality();
+ } else {
+ roaring::Roaring result;
+ s._search(query.get(), [&result](const int32_t docid, const float_t /*score*/) {
+ // docid equal to rowid in segment
+ result.add(docid);
+ if (FLAGS_print_row_id) {
+ printf("RowID is %d\n", docid);
+ }
+ });
+ total += result.cardinality();
+ }
std::cout << "Term queried count:" << total << std::endl;
s.close();
diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp
index 0578c82e34e55e..9d81e5c539f458 100644
--- a/be/src/io/cache/block/block_lru_file_cache.cpp
+++ b/be/src/io/cache/block/block_lru_file_cache.cpp
@@ -27,7 +27,6 @@
#include
-// IWYU pragma: no_include
#include "common/compiler_util.h" // IWYU pragma: keep
// IWYU pragma: no_include
#include // IWYU pragma: keep
diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp b/be/src/io/cache/block/cached_remote_file_reader.cpp
index fd19f88af631cf..bbd7516dfaaf01 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/block/cached_remote_file_reader.cpp
@@ -26,7 +26,6 @@
#include
#include
-// IWYU pragma: no_include
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "io/cache/block/block_file_cache.h"
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 4648309fc57c03..63ddc3d43096be 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -175,7 +175,7 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
} else {
pipe_id = runtime_state->fragment_instance_id();
}
- *file_reader = multi_table_pipe->getPipe(pipe_id);
+ *file_reader = multi_table_pipe->get_pipe(pipe_id);
LOG(INFO) << "create pipe reader for fragment instance: " << pipe_id
<< " pipe: " << (*file_reader).get();
diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp
index 73d79b703886fc..4d370cfb4d8387 100644
--- a/be/src/io/fs/broker_file_reader.cpp
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -28,26 +28,22 @@
#include
#include
-// IWYU pragma: no_include
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/broker_file_system.h"
#include "util/doris_metrics.h"
-namespace doris {
-namespace io {
+namespace doris::io {
struct IOContext;
-BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path,
- size_t file_size, TBrokerFD fd,
- std::shared_ptr<BrokerFileSystem> fs)
- : _path(path),
+BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size,
+ TBrokerFD fd, std::shared_ptr<BrokerFileSystem> fs)
+ : _path(std::move(path)),
_file_size(file_size),
_broker_addr(broker_addr),
_fd(fd),
_fs(std::move(fs)) {
- static_cast<void>(_fs->get_client(&_client));
DorisMetrics::instance()->broker_file_open_reading->increment(1);
DorisMetrics::instance()->broker_file_reader_total->increment(1);
}
@@ -59,32 +55,7 @@ BrokerFileReader::~BrokerFileReader() {
Status BrokerFileReader::close() {
bool expected = false;
if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
- TBrokerCloseReaderRequest request;
- request.__set_version(TBrokerVersion::VERSION_ONE);
- request.__set_fd(_fd);
-
- TBrokerOperationStatus response;
- try {
- try {
- (*_client)->closeReader(response, request);
- } catch (apache::thrift::transport::TTransportException&) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->closeReader(response, request);
- }
- } catch (apache::thrift::TException& e) {
- std::stringstream ss;
- ss << "Close broker reader failed, broker:" << _broker_addr << " failed:" << e.what();
- return Status::RpcError(ss.str());
- }
-
- if (response.statusCode != TBrokerOperationStatusCode::OK) {
- std::stringstream ss;
- ss << "close broker reader failed, broker:" << _broker_addr
- << " failed:" << response.message;
- return Status::InternalError(ss.str());
- }
-
+ RETURN_IF_ERROR(_fs->close_file(_fd));
DorisMetrics::instance()->broker_file_open_reading->increment(-1);
}
return Status::OK();
@@ -100,45 +71,12 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes
return Status::OK();
}
- TBrokerPReadRequest request;
- request.__set_version(TBrokerVersion::VERSION_ONE);
- request.__set_fd(_fd);
- request.__set_offset(offset);
- request.__set_length(bytes_req);
-
- TBrokerReadResponse response;
- try {
- VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset
- << ", read bytes length:" << bytes_req;
- try {
- (*_client)->pread(response, request);
- } catch (apache::thrift::transport::TTransportException& e) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- RETURN_IF_ERROR((*_client).reopen());
- LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what();
- (*_client)->pread(response, request);
- }
- } catch (apache::thrift::TException& e) {
- std::stringstream ss;
- ss << "Open broker reader failed, broker:" << _broker_addr << " failed:" << e.what();
- return Status::RpcError(ss.str());
- }
-
- if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) {
- // read the end of broker's file
- *bytes_read = 0;
- return Status::OK();
- } else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
- std::stringstream ss;
- ss << "Open broker reader failed, broker:" << _broker_addr
- << " failed:" << response.opStatus.message;
- return Status::InternalError(ss.str());
- }
+ std::string data;
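+ // The pread RPC, including reopen-and-retry on transport errors, now lives
+ // in BrokerFileSystem::read_file.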
+ RETURN_IF_ERROR(_fs->read_file(_fd, offset, bytes_req, &data));
- *bytes_read = response.data.size();
- memcpy(to, response.data.data(), *bytes_read);
+ *bytes_read = data.size();
+ memcpy(to, data.data(), *bytes_read);
return Status::OK();
}
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h
index ff3d3d2273ee58..7acdcbcc0d578f 100644
--- a/be/src/io/fs/broker_file_reader.h
+++ b/be/src/io/fs/broker_file_reader.h
@@ -32,15 +32,14 @@
#include "runtime/client_cache.h"
#include "util/slice.h"
-namespace doris {
-namespace io {
+namespace doris::io {
struct IOContext;
class BrokerFileReader : public FileReader {
public:
- BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, size_t file_size,
- TBrokerFD fd, std::shared_ptr<BrokerFileSystem> fs);
+ BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd,
+ std::shared_ptr<BrokerFileSystem> fs);
~BrokerFileReader() override;
@@ -66,8 +65,6 @@ class BrokerFileReader : public FileReader {
TBrokerFD _fd;
std::shared_ptr<BrokerFileSystem> _fs;
- std::shared_ptr<BrokerServiceConnection> _client;
std::atomic<bool> _closed = false;
};
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
index f35f9b0e69a956..863dc9c19e60dd 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -45,8 +45,7 @@
#include "runtime/exec_env.h"
#include "util/slice.h"
-namespace doris {
-namespace io {
+namespace doris::io {
#ifdef BE_TEST
inline BrokerServiceClientCache* client_cache() {
@@ -70,8 +69,8 @@ inline const std::string& client_id(const TNetworkAddress& addr) {
#ifndef CHECK_BROKER_CLIENT
#define CHECK_BROKER_CLIENT(client) \
- if (!client) { \
- return Status::IOError("init Broker client error"); \
+ if (!client || !client->is_alive()) { \
+ return Status::IOError("connect to broker failed"); \
}
#endif
@@ -90,8 +89,8 @@ BrokerFileSystem::BrokerFileSystem(const TNetworkAddress& broker_addr,
Status BrokerFileSystem::connect_impl() {
Status status = Status::OK();
- _client.reset(new BrokerServiceConnection(client_cache(), _broker_addr,
- config::thrift_rpc_timeout_ms, &status));
+ _connection = std::make_unique<BrokerServiceConnection>(client_cache(), _broker_addr,
+ config::thrift_rpc_timeout_ms, &status);
return status;
}
@@ -109,7 +108,7 @@ Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* re
RETURN_IF_ERROR(file_size_impl(file, &fsize));
}
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
TBrokerOpenReaderRequest request;
request.__set_version(TBrokerVersion::VERSION_ONE);
request.__set_path(file);
@@ -121,11 +120,11 @@ Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* re
try {
Status status;
try {
- (*_client)->openReader(*response, request);
+ (*_connection)->openReader(*response, request);
} catch (apache::thrift::transport::TTransportException&) {
std::this_thread::sleep_for(std::chrono::seconds(1));
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->openReader(*response, request);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->openReader(*response, request);
}
} catch (apache::thrift::TException& e) {
return Status::RpcError("failed to open file {}: {}", file.native(), error_msg(e.what()));
@@ -146,7 +145,7 @@ Status BrokerFileSystem::create_directory_impl(const Path& /*path*/, bool /*fail
}
Status BrokerFileSystem::delete_file_impl(const Path& file) {
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
try {
// rm file from remote path
TBrokerDeletePathRequest del_req;
@@ -156,10 +155,10 @@ Status BrokerFileSystem::delete_file_impl(const Path& file) {
del_req.__set_properties(_broker_prop);
try {
- (*_client)->deletePath(del_rep, del_req);
+ (*_connection)->deletePath(del_rep, del_req);
} catch (apache::thrift::transport::TTransportException&) {
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->deletePath(del_rep, del_req);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->deletePath(del_rep, del_req);
}
if (del_rep.statusCode == TBrokerOperationStatusCode::OK) {
@@ -186,7 +185,7 @@ Status BrokerFileSystem::batch_delete_impl(const std::vector& files) {
}
Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const {
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
*res = false;
try {
TBrokerCheckPathExistRequest check_req;
@@ -196,10 +195,10 @@ Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const {
check_req.__set_properties(_broker_prop);
try {
- (*_client)->checkPathExist(check_rep, check_req);
+ (*_connection)->checkPathExist(check_rep, check_req);
} catch (apache::thrift::transport::TTransportException&) {
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->checkPathExist(check_rep, check_req);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->checkPathExist(check_rep, check_req);
}
if (check_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
@@ -219,7 +218,7 @@ Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const {
}
Status BrokerFileSystem::file_size_impl(const Path& path, int64_t* file_size) const {
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
try {
TBrokerFileSizeRequest req;
req.__set_version(TBrokerVersion::VERSION_ONE);
@@ -228,10 +227,10 @@ Status BrokerFileSystem::file_size_impl(const Path& path, int64_t* file_size) co
TBrokerFileSizeResponse resp;
try {
- (*_client)->fileSize(resp, req);
+ (*_connection)->fileSize(resp, req);
} catch (apache::thrift::transport::TTransportException&) {
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->fileSize(resp, req);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->fileSize(resp, req);
}
if (resp.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
@@ -256,7 +255,7 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector<
if (!(*exists)) {
return Status::OK();
}
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
Status status = Status::OK();
try {
// get existing files from remote path
@@ -269,10 +268,10 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector<
list_req.__set_fileNameOnly(true); // we only need file name, not abs path
try {
- (*_client)->listPath(list_rep, list_req);
+ (*_connection)->listPath(list_rep, list_req);
} catch (apache::thrift::transport::TTransportException&) {
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->listPath(list_rep, list_req);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->listPath(list_rep, list_req);
}
if (list_rep.opStatus.statusCode == TBrokerOperationStatusCode::FILE_NOT_FOUND) {
@@ -309,7 +308,7 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector<
}
Status BrokerFileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
try {
TBrokerOperationStatus op_status;
TBrokerRenamePathRequest rename_req;
@@ -319,10 +318,10 @@ Status BrokerFileSystem::rename_impl(const Path& orig_name, const Path& new_name
rename_req.__set_properties(_broker_prop);
try {
- (*_client)->renamePath(op_status, rename_req);
+ (*_connection)->renamePath(op_status, rename_req);
} catch (apache::thrift::transport::TTransportException&) {
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->renamePath(op_status, rename_req);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->renamePath(op_status, rename_req);
}
if (op_status.statusCode != TBrokerOperationStatusCode::OK) {
@@ -465,9 +464,76 @@ Status BrokerFileSystem::direct_download_impl(const Path& remote_file, std::stri
return Status::OK();
}
-Status BrokerFileSystem::get_client(std::shared_ptr<BrokerServiceConnection>* client) const {
- CHECK_BROKER_CLIENT(_client);
- *client = _client;
+Status BrokerFileSystem::read_file(const TBrokerFD& fd, size_t offset, size_t bytes_req,
+ std::string* data) const {
+ if (data == nullptr) {
+ return Status::InvalidArgument("data should be not null");
+ }
+ CHECK_BROKER_CLIENT(_connection);
+ TBrokerPReadRequest request;
+ request.__set_version(TBrokerVersion::VERSION_ONE);
+ request.__set_fd(fd);
+ request.__set_offset(offset);
+ request.__set_length(bytes_req);
+
+ TBrokerReadResponse response;
+ try {
+ VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset
+ << ", read bytes length:" << bytes_req;
+ try {
+ (*_connection)->pread(response, request);
+ } catch (apache::thrift::transport::TTransportException& e) {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ RETURN_IF_ERROR((*_connection).reopen());
+ LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what();
+ (*_connection)->pread(response, request);
+ }
+ } catch (apache::thrift::TException& e) {
+ std::stringstream ss;
+ ss << "read broker file failed, broker:" << _broker_addr << " failed:" << e.what();
+ return Status::RpcError(ss.str());
+ }
+
+ if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) {
+ // read the end of broker's file
+ return Status::OK();
+ }
+ if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+ std::stringstream ss;
+ ss << "Open broker reader failed, broker:" << _broker_addr
+ << " failed:" << response.opStatus.message;
+ return Status::InternalError(ss.str());
+ }
+ *data = std::move(response.data);
+ return Status::OK();
+}
+
+Status BrokerFileSystem::close_file(const TBrokerFD& fd) const {
+ CHECK_BROKER_CLIENT(_connection);
+ TBrokerCloseReaderRequest request;
+ request.__set_version(TBrokerVersion::VERSION_ONE);
+ request.__set_fd(fd);
+
+ TBrokerOperationStatus response;
+ try {
+ try {
+ (*_connection)->closeReader(response, request);
+ } catch (apache::thrift::transport::TTransportException&) {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->closeReader(response, request);
+ }
+ } catch (apache::thrift::TException& e) {
+ std::stringstream ss;
+ ss << "close broker file failed, broker:" << _broker_addr << " failed:" << e.what();
+ return Status::RpcError(ss.str());
+ }
+
+ if (response.statusCode != TBrokerOperationStatusCode::OK) {
+ std::stringstream ss;
+ ss << "close broker file failed, broker:" << _broker_addr << " failed:" << response.message;
+ return Status::InternalError(ss.str());
+ }
return Status::OK();
}
@@ -475,5 +541,4 @@ std::string BrokerFileSystem::error_msg(const std::string& err) const {
return fmt::format("({}:{}), {}", _broker_addr.hostname, _broker_addr.port, err);
}
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/broker_file_system.h b/be/src/io/fs/broker_file_system.h
index 1da18cb29e9a76..4c4b9604e8df5d 100644
--- a/be/src/io/fs/broker_file_system.h
+++ b/be/src/io/fs/broker_file_system.h
@@ -19,6 +19,7 @@
#include
+#include
#include