Skip to content

Commit

Permalink
Merge branch 'master' into nereids_refresh_catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 authored Nov 8, 2024
2 parents 5b3f28a + 854512f commit 574dbc0
Show file tree
Hide file tree
Showing 539 changed files with 37,809 additions and 1,521 deletions.
38 changes: 7 additions & 31 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -1,59 +1,35 @@
### What problem does this PR solve?
<!--
You need to clearly describe your PR in this part:
1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
3. What features were added. Why this function was added.
4. Which code was refactored and why this part of the code was refactored.
5. Which functions were optimized and what is the difference before and after the optimization.
The description of the PR needs to enable reviewers to quickly and clearly understand the logic of the code modification.
-->

<!--
If there are related issues, please fill in the issue number.
- If you want the issue to be closed after the PR is merged, please use "close #12345". Otherwise, use "ref #12345"
-->

Issue Number: close #xxx

<!--
If this PR is a follow-up to a previous PR (for example, it fixes a bug introduced by a related PR),
link the PR here
-->
Related PR: #xxx

Problem Summary:

### Check List (For Committer)
### Release note

- Test <!-- At least one of them must be included. -->
None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No colde files have been changed.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->

- Behavior changed:

- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?

- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->

- Release note

<!-- bugfix, feat, behavior changed need a release note -->
<!-- Add one line release note for this PR. -->
None

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
Expand Down
12 changes: 2 additions & 10 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1436,23 +1436,15 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
S3ClientConf conf {
.endpoint {},
.region {},
.ak = std::move(new_s3_conf.client_conf.ak),
.sk = std::move(new_s3_conf.client_conf.sk),
.token = std::move(new_s3_conf.client_conf.token),
.bucket {},
.provider = new_s3_conf.client_conf.provider,
};
S3ClientConf conf = std::move(new_s3_conf.client_conf);
st = client->reset(conf);
fs = std::move(existed_fs);
}

if (!st.ok()) {
LOG(WARNING) << "update s3 resource failed: " << st;
} else {
LOG_INFO("successfully update hdfs resource")
LOG_INFO("successfully update s3 resource")
.tag("resource_id", param.id)
.tag("resource_name", param.name);
put_storage_resource(param.id, {std::move(fs)}, param.version);
Expand Down
2 changes: 1 addition & 1 deletion be/src/apache-orc
45 changes: 17 additions & 28 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,12 @@ Status CloudCumulativeCompaction::modify_rowsets() {
if (config::enable_delete_bitmap_merge_on_compaction &&
_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) {
process_old_version_delete_bitmap();
RETURN_IF_ERROR(process_old_version_delete_bitmap());
}
return Status::OK();
}

void CloudCumulativeCompaction::process_old_version_delete_bitmap() {
Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
// agg previously rowset old version delete bitmap
std::vector<RowsetSharedPtr> pre_rowsets {};
std::vector<std::string> pre_rowset_ids {};
Expand Down Expand Up @@ -407,40 +407,29 @@ void CloudCumulativeCompaction::process_old_version_delete_bitmap() {
}
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
Status update_st;
DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.update_delete_bitmap_failed",
{
update_st = Status::InternalError(
return Status::InternalError(
"test fail to update delete bitmap for tablet_id {}",
cloud_tablet()->tablet_id());
});
if (update_st.ok()) {
update_st = _engine.meta_mgr().update_delete_bitmap_without_lock(
*cloud_tablet(), new_delete_bitmap.get());
}
if (!update_st.ok()) {
std::stringstream ss;
ss << "failed to update delete bitmap for tablet=" << cloud_tablet()->tablet_id()
<< " st=" << update_st.to_string();
std::string msg = ss.str();
LOG(WARNING) << msg;
} else {
Version version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
for (auto it = new_delete_bitmap->delete_bitmap.begin();
it != new_delete_bitmap->delete_bitmap.end(); it++) {
_tablet->tablet_meta()->delete_bitmap().set(it->first, it->second);
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF(
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<CloudTablet*>(_tablet.get())
->delete_expired_stale_rowsets();
});
RETURN_IF_ERROR(_engine.meta_mgr().cloud_update_delete_bitmap_without_lock(
*cloud_tablet(), new_delete_bitmap.get()));

Version version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
for (auto it = new_delete_bitmap->delete_bitmap.begin();
it != new_delete_bitmap->delete_bitmap.end(); it++) {
_tablet->tablet_meta()->delete_bitmap().set(it->first, it->second);
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF(
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets",
{ static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets(); });
}
}
return Status::OK();
}

void CloudCumulativeCompaction::garbage_collection() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {

void update_cumulative_point();

void process_old_version_delete_bitmap();
Status process_old_version_delete_bitmap();

ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; }

Expand Down
2 changes: 0 additions & 2 deletions be/src/cloud/cloud_delete_bitmap_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* re
auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality();
auto size = tablet->tablet_meta()->delete_bitmap().get_size();
LOG(INFO) << "show_delete_bitmap_count,tablet_id=" << tablet_id << ",count=" << count
<< ",cardinality=" << cardinality << ",size=" << size;

rapidjson::Document root;
root.SetObject();
Expand Down
26 changes: 15 additions & 11 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -704,11 +704,19 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
const auto& segment_ids = res.segment_ids();
const auto& vers = res.versions();
const auto& delete_bitmaps = res.segment_delete_bitmaps();
if (rowset_ids.size() != segment_ids.size() || rowset_ids.size() != vers.size() ||
rowset_ids.size() != delete_bitmaps.size()) {
return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
"get delete bitmap data wrong,"
"rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}",
rowset_ids.size(), segment_ids.size(), vers.size(), delete_bitmaps.size());
}
for (size_t i = 0; i < rowset_ids.size(); i++) {
RowsetId rst_id;
rst_id.init(rowset_ids[i]);
delete_bitmap->merge({rst_id, segment_ids[i], vers[i]},
roaring::Roaring::read(delete_bitmaps[i].data()));
delete_bitmap->merge(
{rst_id, segment_ids[i], vers[i]},
roaring::Roaring::readSafe(delete_bitmaps[i].data(), delete_bitmaps[i].length()));
}
int64_t latency = cntl.latency_us();
if (latency > 100 * 1000) { // 100ms
Expand Down Expand Up @@ -1061,9 +1069,9 @@ Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t loc
return st;
}

Status CloudMetaMgr::update_delete_bitmap_without_lock(const CloudTablet& tablet,
DeleteBitmap* delete_bitmap) {
LOG(INFO) << "update_delete_bitmap_without_lock , tablet_id: " << tablet.tablet_id()
Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock(const CloudTablet& tablet,
DeleteBitmap* delete_bitmap) {
LOG(INFO) << "cloud_update_delete_bitmap_without_lock , tablet_id: " << tablet.tablet_id()
<< ",delete_bitmap size:" << delete_bitmap->delete_bitmap.size();
UpdateDeleteBitmapRequest req;
UpdateDeleteBitmapResponse res;
Expand Down Expand Up @@ -1215,12 +1223,8 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) {
}
if (rs_meta.tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
auto indices = rs_meta.tablet_schema()->indexes();
const auto& indices = rs_meta.tablet_schema()->inverted_indexes();
for (auto& index : indices) {
// only get file_size for inverted index
if (index.index_type() != IndexType::INVERTED) {
continue;
}
for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
std::string segment_path = StorageResource().remote_segment_path(
rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
Expand All @@ -1229,7 +1233,7 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) {
std::string inverted_index_file_path =
InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_path),
index.index_id(), index.get_index_suffix());
index->index_id(), index->get_index_suffix());
auto st = fs->file_size(inverted_index_file_path, &file_size);
if (!st.ok()) {
file_size = 0;
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ class CloudMetaMgr {
Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, int64_t initiator,
DeleteBitmap* delete_bitmap);

Status update_delete_bitmap_without_lock(const CloudTablet& tablet,
DeleteBitmap* delete_bitmap);
Status cloud_update_delete_bitmap_without_lock(const CloudTablet& tablet,
DeleteBitmap* delete_bitmap);

Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator);
Expand Down
14 changes: 6 additions & 8 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,13 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
auto schema_ptr = rowset_meta->tablet_schema();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
if (idx_version == InvertedIndexStorageFormatPB::V1) {
for (const auto& index : schema_ptr->indexes()) {
if (index.index_type() == IndexType::INVERTED) {
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rowset_meta, seg_id, index.index_id(),
index.get_index_suffix());
download_idx_file(idx_path);
}
for (const auto& index : schema_ptr->inverted_indexes()) {
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rowset_meta, seg_id, index->index_id(),
index->get_index_suffix());
download_idx_file(idx_path);
}
} else if (idx_version == InvertedIndexStorageFormatPB::V2) {
} else {
if (schema_ptr->has_inverted_index()) {
auto idx_path = storage_resource.value()->remote_idx_v2_path(
*rowset_meta, seg_id);
Expand Down
27 changes: 13 additions & 14 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ void CloudWarmUpManager::handle_jobs() {
#ifndef BE_TEST
constexpr int WAIT_TIME_SECONDS = 600;
while (true) {
JobMeta cur_job;
std::shared_ptr<JobMeta> cur_job = nullptr;
{
std::unique_lock lock(_mtx);
_cond.wait(lock, [this]() { return _closed || !_pending_job_metas.empty(); });
if (_closed) break;
cur_job = std::move(_pending_job_metas.front());
cur_job = _pending_job_metas.front();
}
for (int64_t tablet_id : cur_job.tablet_ids) {
for (int64_t tablet_id : cur_job->tablet_ids) {
if (_cur_job_id == 0) { // The job is canceled
break;
}
Expand Down Expand Up @@ -147,15 +147,13 @@ void CloudWarmUpManager::handle_jobs() {
auto schema_ptr = rs->tablet_schema();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
if (idx_version == InvertedIndexStorageFormatPB::V1) {
for (const auto& index : schema_ptr->indexes()) {
if (index.index_type() == IndexType::INVERTED) {
wait->add_count();
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rs, seg_id, index.index_id(), index.get_index_suffix());
download_idx_file(idx_path);
}
for (const auto& index : schema_ptr->inverted_indexes()) {
wait->add_count();
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rs, seg_id, index->index_id(), index->get_index_suffix());
download_idx_file(idx_path);
}
} else if (idx_version == InvertedIndexStorageFormatPB::V2) {
} else {
if (schema_ptr->has_inverted_index()) {
wait->add_count();
auto idx_path =
Expand All @@ -173,7 +171,7 @@ void CloudWarmUpManager::handle_jobs() {
}
{
std::unique_lock lock(_mtx);
_finish_job.push_back(std::move(cur_job));
_finish_job.push_back(cur_job);
_pending_job_metas.pop_front();
}
}
Expand Down Expand Up @@ -230,8 +228,9 @@ Status CloudWarmUpManager::check_and_set_batch_id(int64_t job_id, int64_t batch_
void CloudWarmUpManager::add_job(const std::vector<TJobMeta>& job_metas) {
{
std::lock_guard lock(_mtx);
std::for_each(job_metas.begin(), job_metas.end(),
[this](const TJobMeta& meta) { _pending_job_metas.emplace_back(meta); });
std::for_each(job_metas.begin(), job_metas.end(), [this](const TJobMeta& meta) {
_pending_job_metas.emplace_back(std::make_shared<JobMeta>(meta));
});
}
_cond.notify_all();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ class CloudWarmUpManager {
std::condition_variable _cond;
int64_t _cur_job_id {0};
int64_t _cur_batch_id {-1};
std::deque<JobMeta> _pending_job_metas;
std::vector<JobMeta> _finish_job;
std::deque<std::shared_ptr<JobMeta>> _pending_job_metas;
std::vector<std::shared_ptr<JobMeta>> _finish_job;
std::thread _download_thread;
bool _closed {false};
// the attribute for compile in ut
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ void doris_tablet_meta_to_cloud(TabletMetaCloudPB* out, const TabletMetaPB& in)
if (in.has_schema_version()) {
out->set_schema_version(in.schema_version());
}
out->set_storage_page_size(in.storage_page_size());
}

void doris_tablet_meta_to_cloud(TabletMetaCloudPB* out, TabletMetaPB&& in) {
Expand Down Expand Up @@ -569,6 +570,7 @@ void doris_tablet_meta_to_cloud(TabletMetaCloudPB* out, TabletMetaPB&& in) {
if (in.has_schema_version()) {
out->set_schema_version(in.schema_version());
}
out->set_storage_page_size(in.storage_page_size());
}

TabletMetaPB cloud_tablet_meta_to_doris(const TabletMetaCloudPB& in) {
Expand Down Expand Up @@ -644,6 +646,7 @@ void cloud_tablet_meta_to_doris(TabletMetaPB* out, const TabletMetaCloudPB& in)
if (in.has_schema_version()) {
out->set_schema_version(in.schema_version());
}
out->set_storage_page_size(in.storage_page_size());
}

void cloud_tablet_meta_to_doris(TabletMetaPB* out, TabletMetaCloudPB&& in) {
Expand Down Expand Up @@ -710,6 +713,7 @@ void cloud_tablet_meta_to_doris(TabletMetaPB* out, TabletMetaCloudPB&& in) {
if (in.has_schema_version()) {
out->set_schema_version(in.schema_version());
}
out->set_storage_page_size(in.storage_page_size());
}

} // namespace doris::cloud
Loading

0 comments on commit 574dbc0

Please sign in to comment.