Merge branch 'master' into doc_for_recover
Vallishp authored Jan 11, 2025
2 parents 63e00a4 + 9fc6b8d commit 42dc351
Showing 5,710 changed files with 226,510 additions and 176,381 deletions.
Note: the full diff is too large to display; only the first 3,000 changed files are loaded in this view.
2 changes: 1 addition & 1 deletion .github/workflows/auto-cherry-pick.yml
@@ -46,7 +46,7 @@ jobs:
pip install PyGithub
- name: Check SHA
run: |
expected_sha="4761d95a336b92e492276d589e580678af8d490d73fa0bd7d53f826aa3bf86b54e2b8725b436bc010aaf14a001e286bcd2b55b3ec0d2668d1e962d8c8b397eab"
expected_sha="5439ca6304c986a5b26e8f48d528253909b417597088fe98969afee5c2eccbe6d60c5cd1eb03d452eb8082ce2a8ff4ea18854770375487a30bd4af7af9afd322"
calculated_sha=$(sha512sum tools/auto-pick-script.py | awk '{ print $1 }')
if [ "$calculated_sha" != "$expected_sha" ]; then
echo "SHA mismatch! Expected: $expected_sha, but got: $calculated_sha"
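The expected_sha change above pins a new SHA-512 digest for tools/auto-pick-script.py, presumably because that script was modified elsewhere in this merge. As a minimal sketch (not part of the commit), the pinned value could be regenerated locally with the same command the workflow itself runs:

# Sketch: recompute the digest the workflow checks against (run from the repository root).
calculated_sha=$(sha512sum tools/auto-pick-script.py | awk '{ print $1 }')
# Paste this value into expected_sha in .github/workflows/auto-cherry-pick.yml.
echo "${calculated_sha}"
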
2 changes: 1 addition & 1 deletion .github/workflows/code-checks.yml
@@ -50,7 +50,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
sh_checker_comment: true
sh_checker_exclude: .git .github ^docker ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest ^samples
sh_checker_exclude: .git .github ^docker/compilation ^docker/runtime ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest ^samples

preparation:
name: "Clang Tidy Preparation"
34 changes: 33 additions & 1 deletion .github/workflows/comment-to-trigger-teamcity.yml
@@ -56,6 +56,7 @@ jobs:
"${COMMENT_BODY}" == *'run external'* ||
"${COMMENT_BODY}" == *'run cloud_p0'* ||
"${COMMENT_BODY}" == *'run cloud_p1'* ||
"${COMMENT_BODY}" == *'run vault_p0'* ||
"${COMMENT_BODY}" == *'run arm'* ||
"${COMMENT_BODY}" == *'run performance'* ]]; then
echo "comment_trigger=true" | tee -a "$GITHUB_OUTPUT"
@@ -86,7 +87,7 @@ jobs:
echo "TARGET_BRANCH='${TARGET_BRANCH}'" | tee -a "$GITHUB_OUTPUT"
echo "COMMENT_BODY='${COMMENT_BODY}'" | tee -a "$GITHUB_OUTPUT"
reg="run (buildall|compile|p0|p1|feut|beut|cloudut|external|clickbench|cloud_p0|cloud_p1|arm|performance)( [1-9]*[0-9]+)*"
reg="run (buildall|compile|p0|p1|feut|beut|cloudut|external|clickbench|cloud_p0|cloud_p1|vault_p0|arm|performance)( [1-9]*[0-9]+)*"
COMMENT_TRIGGER_TYPE="$(echo -e "${COMMENT_BODY}" | xargs | grep -E "${reg}" | awk -F' ' '{print $2}' | sed -n 1p | sed 's/\r//g')"
COMMENT_REPEAT_TIMES="$(echo -e "${COMMENT_BODY}" | xargs | grep -E "${reg}" | awk -F' ' '{print $3}' | sed -n 1p | sed 's/\r//g')"
echo "COMMENT_TRIGGER_TYPE=${COMMENT_TRIGGER_TYPE}" | tee -a "$GITHUB_OUTPUT"
@@ -139,8 +140,10 @@ jobs:
fi
if file_changed_cloud_p0; then
echo "changed_cloud_p0=true" | tee -a "$GITHUB_OUTPUT"
echo "changed_vault_p0=true" | tee -a "$GITHUB_OUTPUT"
else
echo "changed_cloud_p0=false" | tee -a "$GITHUB_OUTPUT"
echo "changed_vault_p0=false" | tee -a "$GITHUB_OUTPUT"
fi
if file_changed_cloud_p1; then
echo "changed_cloud_p1=true" | tee -a "$GITHUB_OUTPUT"
@@ -159,6 +162,7 @@ jobs:
echo "changed_performance=true" | tee -a "$GITHUB_OUTPUT"
echo "changed_cloud_p0=true" | tee -a "$GITHUB_OUTPUT"
echo "changed_cloud_p1=true" | tee -a "$GITHUB_OUTPUT"
echo "changed_vault_p0=true" | tee -a "$GITHUB_OUTPUT"
fi
# - name: "Setup tmate session"
@@ -323,6 +327,33 @@ jobs:
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
fi
- name: "Trigger or Skip vault_p0"
if: ${{ fromJSON(steps.parse.outputs.comment_trigger) && contains(fromJSON('["vault_p0", "buildall"]'), steps.parse.outputs.COMMENT_TRIGGER_TYPE) }}
run: |
source ./regression-test/pipeline/common/teamcity-utils.sh
if [[ ${{ steps.parse.outputs.COMMENT_TRIGGER_TYPE }} == "buildall" ]]; then
echo "COMMENT_TRIGGER_TYPE is buildall, trigger compile is enough, compile will trigger vault_p0" && exit
fi
set -x
if [[ "${{ steps.parse.outputs.TARGET_BRANCH }}" == "'master'" ||
"${{ steps.parse.outputs.TARGET_BRANCH }}" == "'branch-3.0'" ]]; then
echo "PR target branch is in (master, branch-3.0), need run vault_p0"
trigger_or_skip_build \
"${{ steps.changes.outputs.changed_vault_p0 }}" \
"${{ steps.parse.outputs.PULL_REQUEST_NUM }}" \
"${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" \
"vault_p0" \
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
else
echo "PR target branch is not in (master, branch-3.0), skip run vault_p0"
trigger_or_skip_build \
"false" \
"${{ steps.parse.outputs.PULL_REQUEST_NUM }}" \
"${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" \
"vault_p0" \
"${{ steps.parse.outputs.COMMENT_REPEAT_TIMES }}"
fi
- name: "Trigger or Skip cloud_p1"
if: ${{ fromJSON(steps.parse.outputs.comment_trigger) && contains(fromJSON('["cloud_p1", "buildall"]'), steps.parse.outputs.COMMENT_TRIGGER_TYPE) }}
run: |
@@ -402,3 +433,4 @@ jobs:
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" cloud_p0
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" cloud_p1
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" cloudut
skip_build "${{ steps.parse.outputs.COMMIT_ID_FROM_TRIGGER }}" vault_p0
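
To make the effect of the new vault_p0 keyword concrete, here is a minimal, self-contained sketch that applies the workflow's own parsing pipeline (the regex and commands shown above) to a hypothetical PR comment; the sample comment body is invented, and the actual trigger behavior depends on regression-test/pipeline/common/teamcity-utils.sh, which is not part of this diff:

#!/usr/bin/env bash
# Sketch: classify a hypothetical trigger comment the same way the workflow does.
COMMENT_BODY="run vault_p0 3"
reg="run (buildall|compile|p0|p1|feut|beut|cloudut|external|clickbench|cloud_p0|cloud_p1|vault_p0|arm|performance)( [1-9]*[0-9]+)*"
COMMENT_TRIGGER_TYPE="$(echo -e "${COMMENT_BODY}" | xargs | grep -E "${reg}" | awk -F' ' '{print $2}' | sed -n 1p | sed 's/\r//g')"
COMMENT_REPEAT_TIMES="$(echo -e "${COMMENT_BODY}" | xargs | grep -E "${reg}" | awk -F' ' '{print $3}' | sed -n 1p | sed 's/\r//g')"
echo "type=${COMMENT_TRIGGER_TYPE} repeat=${COMMENT_REPEAT_TIMES}"  # expected: type=vault_p0 repeat=3
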
2 changes: 1 addition & 1 deletion README.md
@@ -202,7 +202,7 @@ See how to compile 🔗[Compilation](https://doris.apache.org/docs/dev/install/

### 📮 Install

See how to install and deploy 🔗[Installation and deployment](https://doris.apache.org/docs/dev/install/standard-deployment)
See how to install and deploy 🔗[Installation and deployment](https://doris.apache.org/docs/dev/install/cluster-deployment/standard-deployment)

## 🧩 Components

Binary file removed aazcp.tar.gz
Binary file not shown.
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.cpp
@@ -454,6 +454,7 @@ bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP");

void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
Expand Down Expand Up @@ -481,6 +482,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
7 changes: 4 additions & 3 deletions be/src/cloud/cloud_base_compaction.cpp
@@ -268,8 +268,9 @@ Status CloudBaseCompaction::execute_compact() {
<< ", output_version=" << _output_version;
return res;
}
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
duration_cast<milliseconds>(steady_clock::now() - start).count())
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
@@ -343,7 +344,7 @@ Status CloudBaseCompaction::modify_rowsets() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
.tag("num_output_delete_bitmap", output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

8 changes: 5 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction.cpp
@@ -204,8 +204,9 @@ Status CloudCumulativeCompaction::execute_compact() {
<< ", output_version=" << _output_version;
return res;
}
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
duration_cast<milliseconds>(steady_clock::now() - start).count())
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms, range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
@@ -299,7 +300,8 @@ Status CloudCumulativeCompaction::modify_rowsets() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
.tag("number_output_delete_bitmap",
output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

146 changes: 9 additions & 137 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -26,6 +26,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
@@ -221,146 +222,17 @@ int64_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
size_t* compaction_score, bool allow_delete) {
if (tablet->tablet_state() == TABLET_NOTREADY) {
return 0;
}

input_rowsets->clear();
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();

int transient_size = 0;
*compaction_score = 0;
int64_t total_size = 0;

for (const auto& rowset : candidate_rowsets) {
// check whether this rowset is delete version
if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
*last_delete_version = rowset->version();
if (!input_rowsets->empty()) {
// we meet a delete version, and there were other versions before.
// we should compact those version before handling them over to base compaction
break;
} else {
// we meet a delete version, and no other versions before, skip it and continue
input_rowsets->clear();
*compaction_score = 0;
transient_size = 0;
total_size = 0;
continue;
}
}

*compaction_score += rowset->rowset_meta()->get_compaction_score();
total_size += rowset->rowset_meta()->total_disk_size();

transient_size += 1;
input_rowsets->push_back(rowset);

// Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
// Only 1 non-overlapping rowset, skip it
input_rowsets->clear();
*compaction_score = 0;
total_size = 0;
continue;
}
return transient_size;
} else if (
*compaction_score >=
config::compaction_max_rowset_count) { // If the number of rowsets is too large: FDB_ERROR_CODE_TXN_TOO_LARGE
return transient_size;
}
}

// if there is delete version, do compaction directly
if (last_delete_version->first != -1) {
// if there is only one rowset and not overlapping,
// we do not need to do cumulative compaction
if (input_rowsets->size() == 1 &&
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
input_rowsets->clear();
*compaction_score = 0;
}
return transient_size;
}

// Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
if (*compaction_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return transient_size;
}

// Condition 3: level1 achieve compaction_goal_size
std::vector<RowsetSharedPtr> level1_rowsets;
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
int64_t continuous_size = 0;
for (const auto& rowset : candidate_rowsets) {
const auto& rs_meta = rowset->rowset_meta();
if (rs_meta->compaction_level() == 0) {
break;
}
level1_rowsets.push_back(rowset);
continuous_size += rs_meta->total_disk_size();
if (level1_rowsets.size() >= 2) {
if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) {
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
}
}
}

int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;

// Condition 4: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
}
return transient_size;
}
}

input_rowsets->clear();
// Condition 5: If their are many empty rowsets, maybe should be compacted
tablet->calc_consecutive_empty_rowsets(
input_rowsets, candidate_rowsets,
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
if (!input_rowsets->empty()) {
VLOG_NOTICE << "tablet is " << tablet->tablet_id()
<< ", there are too many consecutive empty rowsets, size is "
<< input_rowsets->size();
return 0;
}
*compaction_score = 0;

return 0;
return TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
tablet, last_cumu, candidate_rowsets, max_compaction_score, min_compaction_score,
input_rowsets, last_delete_version, compaction_score, allow_delete);
}

int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_compaction_level(
const std::vector<RowsetSharedPtr>& input_rowsets) {
int64_t first_level = 0;
for (size_t i = 0; i < input_rowsets.size(); i++) {
int64_t cur_level = input_rowsets[i]->rowset_meta()->compaction_level();
if (i == 0) {
first_level = cur_level;
} else {
if (first_level != cur_level) {
LOG(ERROR) << "Failed to check compaction level, first_level: " << first_level
<< ", cur_level: " << cur_level;
}
}
}
return first_level + 1;
int64_t CloudTimeSeriesCumulativeCompactionPolicy::get_compaction_level(
CloudTablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) {
return TimeSeriesCumulativeCompactionPolicy::get_compaction_level((BaseTablet*)tablet,
input_rowsets, output_rowset);
}

int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
18 changes: 15 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction_policy.h
@@ -43,7 +43,9 @@ class CloudCumulativeCompactionPolicy {
Version& last_delete_version,
int64_t last_cumulative_point) = 0;

virtual int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) = 0;
virtual int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) = 0;

virtual int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
@@ -52,6 +54,8 @@
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) = 0;

virtual std::string name() = 0;
};

class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactionPolicy {
@@ -68,7 +72,9 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
Version& last_delete_version,
int64_t last_cumulative_point) override;

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override {
int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) override {
return 0;
}

@@ -80,6 +86,8 @@ class CloudSizeBasedCumulativeCompactionPolicy : public CloudCumulativeCompactio
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) override;

std::string name() override { return "size_based"; }

private:
int64_t _level_size(const int64_t size);

@@ -105,7 +113,9 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti
Version& last_delete_version,
int64_t last_cumulative_point) override;

int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override;
int64_t get_compaction_level(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& input_rowsets,
RowsetSharedPtr output_rowset) override;

int64_t pick_input_rowsets(CloudTablet* tablet,
const std::vector<RowsetSharedPtr>& candidate_rowsets,
@@ -114,6 +124,8 @@ class CloudTimeSeriesCumulativeCompactionPolicy : public CloudCumulativeCompacti
std::vector<RowsetSharedPtr>* input_rowsets,
Version* last_delete_version, size_t* compaction_score,
bool allow_delete = false) override;

std::string name() override { return "time_series"; }
};

#include "common/compile_check_end.h"