Skip to content

Commit

Permalink
[feature](backup) backup restore cooldown data
Browse files Browse the repository at this point in the history
  • Loading branch information
justfortaste committed Dec 24, 2024
1 parent 81f43a1 commit 2738ced
Show file tree
Hide file tree
Showing 29 changed files with 2,207 additions and 80 deletions.
4 changes: 3 additions & 1 deletion be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ class S3FileSystem final : public RemoteFileSystem {
return path;
} else {
// path with no schema
return _root_path / path;
return std::filesystem::path(
fmt::format("s3://{}/{}", _s3_conf.bucket, _s3_conf.prefix)) /
path;
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class RowsetMeta {
_fs = std::move(fs);
}

// Clears the resource_id field on _rowset_meta_pb.
// NOTE(review): with standard protobuf accessors this should make has_resource_id()
// return false, so is_local() would then report true -- confirm against the generated code.
void clear_resource_id() { _rowset_meta_pb.clear_resource_id(); }

// Returns, by const reference, the resource_id stored in _rowset_meta_pb.
const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); }

// A rowset is treated as local when _rowset_meta_pb has no resource_id set.
bool is_local() const { return !_rowset_meta_pb.has_resource_id(); }
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/single_replica_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ Status SingleReplicaCompaction::_fetch_rowset(const TReplicaInfo& addr, const st
// change all rowset ids because they maybe its id same with local rowset
auto olap_st = SnapshotManager::instance()->convert_rowset_ids(
local_path, _tablet->tablet_id(), _tablet->replica_id(), _tablet->table_id(),
_tablet->partition_id(), _tablet->schema_hash());
_tablet->partition_id(), _tablet->schema_hash(), false, 0);
if (!olap_st.ok()) {
LOG(WARNING) << "fail to convert rowset ids, path=" << local_path
<< ", tablet_id=" << _tablet->tablet_id() << ", error=" << olap_st;
Expand Down
18 changes: 12 additions & 6 deletions be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -146,7 +147,8 @@ Status SnapshotManager::release_snapshot(const string& snapshot_path) {

Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t tablet_id,
int64_t replica_id, int64_t table_id,
int64_t partition_id, const int32_t& schema_hash) {
int64_t partition_id, const int32_t& schema_hash,
bool is_restore, int64_t storage_policy_id) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
Status res = Status::OK();
// check clone dir existed
Expand Down Expand Up @@ -181,6 +183,10 @@ Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t
new_tablet_meta_pb.set_tablet_id(tablet_id);
*new_tablet_meta_pb.mutable_tablet_uid() = TabletUid::gen_uid().to_proto();
new_tablet_meta_pb.set_replica_id(replica_id);
if (is_restore) {
new_tablet_meta_pb.set_storage_policy_id(storage_policy_id);
new_tablet_meta_pb.clear_cooldown_meta_id();
}
if (table_id > 0) {
new_tablet_meta_pb.set_table_id(table_id);
}
Expand Down Expand Up @@ -212,6 +218,9 @@ Status SnapshotManager::convert_rowset_ids(const std::string& clone_dir, int64_t
} else {
// remote rowset
*rowset_meta = visible_rowset;
if (is_restore) {
rowset_meta->clear_resource_id();
}
}

rowset_meta->set_tablet_id(tablet_id);
Expand Down Expand Up @@ -521,11 +530,8 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
if (!is_single_rowset_clone && (!res.ok() || request.missing_version.empty())) {
if (!request.__isset.missing_version &&
ref_tablet->tablet_meta()->cooldown_meta_id().initialized()) {
LOG(WARNING) << "currently not support backup tablet with cooldowned remote "
"data. tablet="
<< request.tablet_id;
return Status::NotSupported(
"currently not support backup tablet with cooldowned remote data");
LOG(INFO) << "Backup tablet with cooldowned remote data. tablet="
<< request.tablet_id;
}
/// not all missing versions are found, fall back to full snapshot.
res = Status::OK(); // reset res
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/snapshot_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class SnapshotManager {
static SnapshotManager* instance();

// Rewrites the tablet meta found under clone_dir so that it carries the given
// tablet_id / replica_id (and table_id when it is > 0), and converts the rowset ids.
//
// is_restore: when true, the tablet meta's storage_policy_id is set to
//     storage_policy_id, its cooldown_meta_id is cleared, and the resource_id of
//     remote rowsets is cleared. Clone, storage migration and single replica
//     compaction pass false.
// storage_policy_id: only used when is_restore is true; the other callers pass 0.
Status convert_rowset_ids(const std::string& clone_dir, int64_t tablet_id, int64_t replica_id,
int64_t table_id, int64_t partition_id, const int32_t& schema_hash,
bool is_restore, int64_t storage_policy_id);

private:
SnapshotManager() : _snapshot_base_id(0) {
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ class TabletSchema {
segment_v2::CompressionTypePB compression_type() const { return _compression_type; }

const std::vector<TabletIndex>& indexes() const { return _indexes; }
[[nodiscard]] bool has_inverted_index() const {
for (const auto& index : _indexes) {
if (index.index_type() == IndexType::INVERTED) {
return true;
}
}
return false;
}
std::vector<const TabletIndex*> get_indexes_for_column(int32_t col_unique_id) const;
bool has_inverted_index(int32_t col_unique_id) const;
bool has_inverted_index_with_index_id(int64_t index_id) const;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
// change all rowset ids because they maybe its id same with local rowset
status = SnapshotManager::instance()->convert_rowset_ids(
local_data_path, _clone_req.tablet_id, _clone_req.replica_id,
_clone_req.table_id, _clone_req.partition_id, _clone_req.schema_hash);
_clone_req.table_id, _clone_req.partition_id, _clone_req.schema_hash, false, 0);
} else {
LOG_WARNING("failed to download snapshot from remote BE")
.tag("url", _mask_token(remote_url_prefix))
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Status EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file(
// rowset create time is useful when load tablet from meta to check which tablet is the tablet to load
return SnapshotManager::instance()->convert_rowset_ids(
full_path, tablet_id, _tablet->replica_id(), _tablet->table_id(),
_tablet->partition_id(), schema_hash);
_tablet->partition_id(), schema_hash, false, 0);
}

Status EngineStorageMigrationTask::_reload_tablet(const std::string& full_path) {
Expand Down
198 changes: 170 additions & 28 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "gutil/strings/split.h"
#include "http/http_client.h"
#include "io/fs/broker_file_system.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
Expand All @@ -49,8 +50,10 @@
#include "io/fs/s3_file_system.h"
#include "io/hdfs_builder.h"
#include "olap/data_dir.h"
#include "olap/olap_define.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "runtime/client_cache.h"
Expand Down Expand Up @@ -100,6 +103,166 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l

SnapshotLoader::~SnapshotLoader() = default;

// Returns true when `str` ends with `match`. An empty `match` always matches.
// File-local helper: declared static like the other helpers in this file so it
// does not leak a symbol with external linkage.
static bool _end_with(std::string_view str, std::string_view match) {
    if (match.size() > str.size()) {
        return false;
    }
    return str.substr(str.size() - match.size()) == match;
}

// Lists `dir` on `cold_fs` and appends to `remote_files` the base name of every
// entry whose name starts with `rowset` and ends with ".idx".
// Returns the error from the listing call, otherwise OK.
// NOTE(review): the rowset match is a plain prefix match, so a rowset id that is a
// prefix of another id would also match that other rowset's files -- confirm that
// rowset ids cannot be prefixes of each other.
static Status list_segment_inverted_index_file(io::RemoteFileSystem* cold_fs,
                                               const std::string& dir, const std::string& rowset,
                                               std::vector<std::string>* remote_files) {
    bool dir_exists = true;
    std::vector<io::FileInfo> listed;
    RETURN_IF_ERROR(cold_fs->list(dir, true, &listed, &dir_exists));

    for (auto& entry : listed) {
        // Only the last path component is compared and reported.
        std::string base_name = io::Path(entry.file_name).filename();

        const bool belongs_to_rowset = base_name.compare(0, rowset.length(), rowset) == 0;
        if (belongs_to_rowset && _end_with(base_name, ".idx")) {
            remote_files->push_back(base_name);
        }
    }

    return Status::OK();
}

// Computes the md5 checksum of `src_path`/`local_file` into `*md5sum` and decides
// whether the file has to be uploaded.
//
// `*need_upload` is set to true when `remote_files` has no entry named `local_file`
// or when the entry's md5 differs from the local checksum, and to false otherwise.
// It is assigned on every successful return, so the result does not depend on how
// the caller initialised it. If computing the checksum fails, that error is
// returned and `*need_upload` is left untouched.
static Status check_need_upload(const std::string& src_path, const std::string& local_file,
                                std::map<std::string, FileStat>& remote_files, std::string* md5sum,
                                bool* need_upload) {
    // calc md5sum of localfile
    RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(src_path + "/" + local_file, md5sum));
    VLOG_CRITICAL << "get file checksum: " << local_file << ": " << *md5sum;

    // check if this local file need upload
    const auto remote_it = remote_files.find(local_file);
    if (remote_it == remote_files.end()) {
        *need_upload = true;
        return Status::OK();
    }

    if (*md5sum != remote_it->second.md5) {
        // remote storage file exist, but with different checksum
        LOG(WARNING) << "remote file checksum is invalid. remote: " << remote_it->first
                     << ", local: " << *md5sum;
        // TODO(cmy): save these files and delete them later
        *need_upload = true;
        return Status::OK();
    }

    // Same name and same checksum already present remotely.
    *need_upload = false;
    return Status::OK();
}

// Downloads one cooldown file from `cold_fs` (`remote_seg_path`) to `local_seg_path`,
// uploads it to `dest_fs` (`dest_seg_path`) unless `remote_files` already lists a file
// named `local_file` with the same md5, and finally removes the local copy.
//
// The local copy is a temporary download, so it is deleted on every path after the
// download succeeded: when the upload is skipped, and when the checksum or the
// upload fails. Previously it was only deleted after a successful upload, which
// left the file on local disk in all other cases.
//
// Returns the first error of download / checksum / upload; if those all succeed,
// returns the status of deleting the local copy.
static Status download_and_upload_one_cold_file(
        io::RemoteFileSystem& dest_fs, io::RemoteFileSystem* cold_fs,
        const std::string& remote_seg_path, const std::string& local_seg_path,
        const std::string& dest_seg_path, const std::string& local_path,
        const std::string& local_file, std::map<std::string, FileStat>& remote_files) {
    RETURN_IF_ERROR(cold_fs->download(remote_seg_path, local_seg_path));

    bool need_upload = false;
    std::string md5sum;
    Status transfer_st =
            check_need_upload(local_path, local_file, remote_files, &md5sum, &need_upload);
    if (transfer_st.ok()) {
        if (need_upload) {
            transfer_st = dest_fs.upload_with_checksum(local_seg_path, dest_seg_path, md5sum);
        } else {
            VLOG_CRITICAL << "cold file exist in remote path, no need to upload: " << local_file;
        }
    }

    //delete local file
    Status cleanup_st = io::global_local_filesystem()->delete_file(local_seg_path);

    // A checksum/upload failure is more informative than a cleanup failure.
    RETURN_IF_ERROR(transfer_st);
    return cleanup_st;
}

// Transfers every file of one cooldown rowset from `cold_fs` to `dest_fs`.
//
// Segment data files are named "<rowset_id>_<i>.dat" for i in [0, segments). When
// `have_inverted_index` is non-zero, the ".idx" files found for this rowset in the
// remote tablet directory are transferred as well. Each file goes through
// download_and_upload_one_cold_file, using `local_path` as the staging directory and
// `dest_path` as the destination directory.
//
// Returns the first error encountered, otherwise OK.
static Status upload_remote_cold_rowset(io::RemoteFileSystem& dest_fs, int64_t tablet_id,
                                        const std::string& local_path, const std::string& dest_path,
                                        io::RemoteFileSystem* cold_fs, const std::string& rowset_id,
                                        int segments, int have_inverted_index,
                                        std::map<std::string, FileStat>& remote_files) {
    // The remote tablet directory is the same for every file of this rowset.
    const std::string remote_dir = remote_tablet_path(tablet_id);

    for (int seg = 0; seg < segments; ++seg) {
        const std::string seg_file = fmt::format("{}_{}.dat", rowset_id, seg);

        RETURN_IF_ERROR(download_and_upload_one_cold_file(
                dest_fs, cold_fs, fmt::format("{}/{}", remote_dir, seg_file),
                fmt::format("{}/{}", local_path, seg_file),
                fmt::format("{}/{}", dest_path, seg_file), local_path, seg_file, remote_files));
    }

    if (!have_inverted_index) {
        return Status::OK();
    }

    std::vector<std::string> remote_index_files;
    RETURN_IF_ERROR(list_segment_inverted_index_file(cold_fs, remote_dir, rowset_id,
                                                     &remote_index_files));

    for (const auto& index_file : remote_index_files) {
        RETURN_IF_ERROR(download_and_upload_one_cold_file(
                dest_fs, cold_fs, fmt::format("{}/{}", remote_dir, index_file),
                fmt::format("{}/{}", local_path, index_file),
                fmt::format("{}/{}", dest_path, index_file), local_path, index_file,
                remote_files));
    }
    return Status::OK();
}

/*
 * Get the cooldown data info from the tablet's hdr file, download the cooldown
 * data and upload it to remote storage.
 *
 * The tablet meta is loaded from "<local_path>/<tablet_id>.hdr". If its
 * cooldown_meta_id is not initialized there is nothing to do. Otherwise every
 * rowset that is not local and has at least one segment is transferred from the
 * file system of the tablet's storage policy to dest_fs under dest_path.
 *
 * Returns an error if the hdr file cannot be loaded, if it belongs to another
 * tablet, if the cold file system cannot be resolved, or if any transfer fails.
 */
static Status upload_remote_cold_file(io::RemoteFileSystem& dest_fs, int64_t tablet_id,
                                      const std::string& local_path, const std::string& dest_path,
                                      std::map<std::string, FileStat>& remote_files) {
    const std::string hdr_file = local_path + "/" + std::to_string(tablet_id) + ".hdr";

    auto tablet_meta = std::make_shared<TabletMeta>();
    Status load_st = tablet_meta->create_from_file(hdr_file);
    if (!load_st.ok()) {
        return Status::Error<ErrorCode::ENGINE_LOAD_INDEX_TABLE_ERROR>(
                "fail to load tablet_meta. file_path={}", hdr_file);
    }

    if (tablet_meta->tablet_id() != tablet_id) {
        return Status::InternalError("Invalid tablet {}", tablet_meta->tablet_id());
    }

    // No cooldown meta: the tablet has no cooldown data to back up.
    if (!tablet_meta->cooldown_meta_id().initialized()) {
        return Status::OK();
    }

    std::shared_ptr<io::RemoteFileSystem> colddata_fs;
    RETURN_IF_ERROR(get_remote_file_system(tablet_meta->storage_policy_id(), &colddata_fs));

    // Bind by const reference to avoid copying the rowset meta handle per iteration.
    for (const auto& rowset_meta : tablet_meta->all_rs_metas()) {
        const int segments = static_cast<int>(rowset_meta->num_segments());
        if (segments <= 0 || rowset_meta->is_local()) {
            continue;
        }

        const std::string rowset_id = rowset_meta->rowset_id().to_string();
        const bool have_inverted_index = rowset_meta->tablet_schema()->has_inverted_index();

        RETURN_IF_ERROR(upload_remote_cold_rowset(dest_fs, tablet_id, local_path, dest_path,
                                                  colddata_fs.get(), rowset_id, segments,
                                                  have_inverted_index, remote_files));
    }

    return Status::OK();
}

Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path,
std::map<int64_t, std::vector<std::string>>* tablet_files) {
if (!_remote_fs) {
Expand Down Expand Up @@ -150,28 +313,11 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
TTaskType::type::UPLOAD));

const std::string& local_file = *it;
// calc md5sum of localfile
bool need_upload = false;
std::string md5sum;
RETURN_IF_ERROR(
io::global_local_filesystem()->md5sum(src_path + "/" + local_file, &md5sum));
VLOG_CRITICAL << "get file checksum: " << local_file << ": " << md5sum;
check_need_upload(src_path, local_file, remote_files, &md5sum, &need_upload));
local_files_with_checksum.push_back(local_file + "." + md5sum);

// check if this local file need upload
bool need_upload = false;
auto find = remote_files.find(local_file);
if (find != remote_files.end()) {
if (md5sum != find->second.md5) {
// remote storage file exist, but with different checksum
LOG(WARNING) << "remote file checksum is invalid. remote: " << find->first
<< ", local: " << md5sum;
// TODO(cmy): save these files and delete them later
need_upload = true;
}
} else {
need_upload = true;
}

if (!need_upload) {
VLOG_CRITICAL << "file exist in remote path, no need to upload: " << local_file;
continue;
Expand All @@ -184,6 +330,10 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
_remote_fs->upload_with_checksum(full_local_file, full_remote_file, md5sum));
} // end for each tablet's local files

// 2.4. upload cooldown data files
RETURN_IF_ERROR(
upload_remote_cold_file(*_remote_fs, tablet_id, src_path, dest_path, remote_files));

tablet_files->emplace(tablet_id, local_files_with_checksum);
finished_num++;
LOG(INFO) << "finished to write tablet to remote. local path: " << src_path
Expand Down Expand Up @@ -734,7 +884,7 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta
// rename the rowset ids and tabletid info in rowset meta
Status convert_status = SnapshotManager::instance()->convert_rowset_ids(
snapshot_path, tablet_id, tablet->replica_id(), tablet->table_id(),
tablet->partition_id(), schema_hash);
tablet->partition_id(), schema_hash, true, tablet->storage_policy_id());
if (!convert_status.ok()) {
std::stringstream ss;
ss << "failed to convert rowsetids in snapshot: " << snapshot_path
Expand Down Expand Up @@ -804,14 +954,6 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta
return status;
}

// Returns true when `str` ends with `match`.
bool SnapshotLoader::_end_with(const std::string& str, const std::string& match) {
    const auto match_len = match.size();
    if (str.size() < match_len) {
        return false;
    }
    return str.compare(str.size() - match_len, match_len, match) == 0;
}

Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const std::string& src_path,
int64_t* tablet_id,
int32_t* schema_hash) {
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/snapshot_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ class SnapshotLoader {
Status _get_existing_files_from_local(const std::string& local_path,
std::vector<std::string>* local_files);

bool _end_with(const std::string& str, const std::string& match);

Status _replace_tablet_id(const std::string& file_name, int64_t tablet_id,
std::string* new_file_name);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ PROPERTIES ("key"="value", ...);
- <version since="1.2" type="inline"> "reserve_dynamic_partition_enable" = "true":默认为 false。当该属性为 true 时,恢复的表会保留该表备份之前的'dynamic_partition_enable'属性值。该值不为true时,则恢复出来的表的'dynamic_partition_enable'属性值会设置为false。</version>
- "timeout" = "3600":任务超时时间,默认为一天。单位秒。
- "meta_version" = 40:使用指定的 meta_version 来读取之前备份的元数据。注意,该参数作为临时方案,仅用于恢复老版本 Doris 备份的数据。最新版本的备份数据中已经包含 meta version,无需再指定。
- "reserve_storage_policy" = "true":指定恢复的表是否保留冷热分层属性。默认为 true,此时备份集中保存的 storage policy 和对应的 resource 信息将在新集群中重建;恢复时数据都会先下载到本地,再由降冷策略上传到远程。设置为 false 时,恢复后的表会去除冷热分层属性,变为普通表。
- "storage_resource" = "resource_name":指定恢复后表的冷数据使用的resource。建议在跨集群恢复时指定此属性。注意恢复后的storage policy中的storage_resource属性也会更新为指定的storage_resource。若指定了"reserve_storage_policy"="false",则忽略storage_resource属性。

### Example

Expand Down
Loading

0 comments on commit 2738ced

Please sign in to comment.