Skip to content

Commit

Permalink
[BugFix] refactor cloud native pk table publish & fix local persisten…
Browse files Browse the repository at this point in the history
…t index usage (StarRocks#30504)

Signed-off-by: luohaha <[email protected]>
  • Loading branch information
luohaha authored Sep 13, 2023
1 parent 264b6dc commit ac2b169
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 43 deletions.
4 changes: 4 additions & 0 deletions be/src/storage/lake/lake_local_persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ Status LakeLocalPersistentIndex::load_from_lake_tablet(starrocks::lake::Tablet*
LOG(WARNING) << "tablet: " << tablet->id() << " is not primary key tablet";
return Status::NotSupported("Only PrimaryKey table is supported to use persistent index");
}
// persistent index' minor compaction is a new strategy to decrease the IO amplification.
// More detail: https://github.com/StarRocks/starrocks/issues/27581.
// disable minor_compaction in cloud native table for now, will enable it later
config::enable_pindex_minor_compaction = false;

MonotonicStopWatch timer;
timer.start();
Expand Down
5 changes: 2 additions & 3 deletions be/src/storage/lake/meta_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ void MetaFileBuilder::apply_opwrite(const TxnLogPB_OpWrite& op_write, const std:
for (const auto& del_file : op_write.dels()) {
_trash_files->push_back(del_file);
}
_has_update_index = true;
}

void MetaFileBuilder::apply_opcompaction(const TxnLogPB_OpCompaction& op_compaction) {
Expand Down Expand Up @@ -122,8 +121,6 @@ void MetaFileBuilder::apply_opcompaction(const TxnLogPB_OpCompaction& op_compact
_tablet_meta->set_next_rowset_id(_tablet_meta->next_rowset_id() + rowset->segments_size());
}

_has_update_index = true;

VLOG(2) << fmt::format("MetaFileBuilder apply_opcompaction, id:{} input range:{} delvec del cnt:{} output:{}",
_tablet_meta->id(), del_range_ss.str(), delvec_erase_cnt,
op_compaction.output_rowset().ShortDebugString());
Expand Down Expand Up @@ -198,6 +195,8 @@ Status MetaFileBuilder::finalize(int64_t txn_id) {
RETURN_IF_ERROR(_tablet.put_metadata(_tablet_meta));
_update_mgr->update_primary_index_data_version(_tablet, version);
_fill_delvec_cache();
// Set _has_finalized at last, and if failure happens before this, we need to clear pk index
// and retry publish later.
_has_finalized = true;
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/meta_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class MetaFileBuilder {
// when apply or finalize fail, need to clear primary index cache
void handle_failure();
bool has_update_index() const { return _has_update_index; }
void set_has_update_index() { _has_update_index = true; }
// collect files that need to removed
std::shared_ptr<std::vector<std::string>> trash_files() { return _trash_files; }

Expand Down
37 changes: 27 additions & 10 deletions be/src/storage/lake/txn_log_applier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
}

~PrimaryKeyTxnLogApplier() override {
// must release primary index before `handle_failure`, otherwise `handle_failure` will fail
_tablet.update_mgr()->release_primary_index(_index_entry);
// handle failure first, then release lock
if (_check_meta_version_succ) {
_builder.handle_failure();
}
_builder.handle_failure();
if (_inited) {
_s_schema_change_set.erase(_tablet.id());
}
Expand All @@ -64,7 +64,6 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
Status check_meta_version() {
// check tablet meta
RETURN_IF_ERROR(_tablet.update_mgr()->check_meta_version(_tablet, _base_version));
_check_meta_version_succ = true;
return Status::OK();
}

Expand All @@ -82,7 +81,13 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
return Status::OK();
}

Status finish() override { return _builder.finalize(_max_txn_id); }
Status finish() override {
// Must call `commit_primary_index` before `finalize`,
// because if `commit_primary_index` or `finalize` fail, we can remove index in `handle_failure`.
// if `_index_entry` is null, do nothing.
RETURN_IF_ERROR(_tablet.update_mgr()->commit_primary_index(_index_entry, &_tablet));
return _builder.finalize(_max_txn_id);
}

std::shared_ptr<std::vector<std::string>> trash_files() override { return _builder.trash_files(); }

Expand All @@ -92,17 +97,29 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
!op_write.rowset().has_delete_predicate()) {
return Status::OK();
}
return _tablet.update_mgr()->publish_primary_key_tablet(op_write, txn_id, *_metadata, &_tablet, &_builder,
_base_version);
// We call `prepare_primary_index` only when first time we apply `write_log` or `compaction_log`, instead of
// in `TxnLogApplier.init`, because we have to build primary index after apply `schema_change_log` finish.
if (_index_entry == nullptr) {
ASSIGN_OR_RETURN(_index_entry, _tablet.update_mgr()->prepare_primary_index(*_metadata, &_tablet, &_builder,
_base_version, _new_version));
}
return _tablet.update_mgr()->publish_primary_key_tablet(op_write, txn_id, *_metadata, &_tablet, _index_entry,
&_builder, _base_version);
}

Status apply_compaction_log(const TxnLogPB_OpCompaction& op_compaction) {
if (op_compaction.input_rowsets().empty()) {
DCHECK(!op_compaction.has_output_rowset() || op_compaction.output_rowset().num_rows() == 0);
return Status::OK();
}
return _tablet.update_mgr()->publish_primary_compaction(op_compaction, *_metadata, &_tablet, &_builder,
_base_version);
// We call `prepare_primary_index` only when first time we apply `write_log` or `compaction_log`, instead of
// in `TxnLogApplier.init`, because we have to build primary index after apply `schema_change_log` finish.
if (_index_entry == nullptr) {
ASSIGN_OR_RETURN(_index_entry, _tablet.update_mgr()->prepare_primary_index(*_metadata, &_tablet, &_builder,
_base_version, _new_version));
}
return _tablet.update_mgr()->publish_primary_compaction(op_compaction, *_metadata, &_tablet, _index_entry,
&_builder, _base_version);
}

Status apply_schema_change_log(const TxnLogPB_OpSchemaChange& op_schema_change) {
Expand Down Expand Up @@ -139,8 +156,8 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
int64_t _new_version{0};
int64_t _max_txn_id{0}; // Used as the file name prefix of the delvec file
MetaFileBuilder _builder;
DynamicCache<uint64_t, LakePrimaryIndex>::Entry* _index_entry{nullptr};
bool _inited{false};
bool _check_meta_version_succ{false};
};

class NonPrimaryKeyTxnLogApplier : public TxnLogApplier {
Expand Down
85 changes: 58 additions & 27 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "storage/rowset/column_iterator.h"
#include "storage/rowset/default_value_column_iterator.h"
#include "storage/tablet_manager.h"
#include "storage/tablet_meta_manager.h"
#include "util/pretty_printer.h"
#include "util/trace.h"

Expand All @@ -50,26 +51,67 @@ Status LakeDelvecLoader::load(const TabletSegmentId& tsid, int64_t version, DelV
return _update_mgr->get_del_vec(tsid, version, _pk_builder, pdelvec);
}

// |metadata| contain last tablet meta info with new version
Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_write, int64_t txn_id,
const TabletMetadata& metadata, Tablet* tablet,
MetaFileBuilder* builder, int64_t base_version) {
// 1. update primary index
StatusOr<IndexEntry*> UpdateManager::prepare_primary_index(const TabletMetadata& metadata, Tablet* tablet,
MetaFileBuilder* builder, int64_t base_version,
int64_t new_version) {
auto index_entry = _index_cache.get_or_create(tablet->id());
index_entry->update_expire_time(MonotonicMillis() + get_cache_expire_ms());
auto& index = index_entry->value();
Status st = index.lake_load(tablet, metadata, base_version, builder);
_index_cache.update_object_size(index_entry, index.memory_usage());
if (!st.ok()) {
_index_cache.remove(index_entry);
std::string msg = strings::Substitute("prepare_primary_index: load primary index failed: $0", st.to_string());
LOG(ERROR) << msg;
return Status::InternalError(msg);
}
st = index.prepare(EditVersion(new_version, 0), 0);
if (!st.ok()) {
_index_cache.remove(index_entry);
std::string msg =
strings::Substitute("publish_primary_key_tablet: load primary index failed: $0", st.to_string());
strings::Substitute("prepare_primary_index: prepare primary index failed: $0", st.to_string());
LOG(ERROR) << msg;
return st;
return Status::InternalError(msg);
}
// release index entry but keep it in cache
DeferOp release_index_entry([&] { _index_cache.release(index_entry); });
// 2. load rowset update data to cache, get upsert and delete list
builder->set_has_update_index();
return index_entry;
}

Status UpdateManager::commit_primary_index(IndexEntry* index_entry, Tablet* tablet) {
if (index_entry != nullptr) {
auto& index = index_entry->value();
if (index.enable_persistent_index()) {
// only take affect in local persistent index
PersistentIndexMetaPB index_meta;
PersistentIndexMetaLockGuard index_meta_lock_guard;
DataDir* data_dir = StorageEngine::instance()->get_persistent_index_store();
index.get_persistent_index_meta_lock_guard(&index_meta_lock_guard);
RETURN_IF_ERROR(TabletMetaManager::get_persistent_index_meta(data_dir, tablet->id(), &index_meta));
RETURN_IF_ERROR(index.commit(&index_meta));
RETURN_IF_ERROR(TabletMetaManager::write_persistent_index_meta(data_dir, tablet->id(), index_meta));
// Call `on_commited` here, which will remove old files is safe.
// Because if publish version fail after `on_commited`, index will be rebuild.
RETURN_IF_ERROR(index.on_commited());
TRACE("commit primary index");
}
}

return Status::OK();
}

void UpdateManager::release_primary_index(IndexEntry* index_entry) {
if (index_entry != nullptr) {
_index_cache.release(index_entry);
}
}

// |metadata| contain last tablet meta info with new version
Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_write, int64_t txn_id,
const TabletMetadata& metadata, Tablet* tablet,
IndexEntry* index_entry, MetaFileBuilder* builder,
int64_t base_version) {
auto& index = index_entry->value();
// 1. load rowset update data to cache, get upsert and delete list
const uint32_t rowset_id = metadata.next_rowset_id();
auto tablet_schema = std::make_shared<TabletSchema>(metadata.schema());
auto state_entry = _update_state_cache.get_or_create(strings::Substitute("$0_$1", tablet->id(), txn_id));
Expand All @@ -79,7 +121,7 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
auto& state = state_entry->value();
RETURN_IF_ERROR(state.load(op_write, metadata, base_version, tablet, builder, true));
_update_state_cache.update_object_size(state_entry, state.memory_usage());
// 3. rewrite segment file if it is partial update
// 2. rewrite segment file if it is partial update
std::vector<std::string> orphan_files;
RETURN_IF_ERROR(state.rewrite_segment(op_write, metadata, tablet, &orphan_files));
PrimaryIndex::DeletesMap new_deletes;
Expand All @@ -89,6 +131,7 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
auto& upserts = state.upserts();
// handle merge condition, skip update row which's merge condition column value is smaller than current row
int32_t condition_column = _get_condition_column(op_write, *tablet_schema);
// 3. update primary index, and generate delete info.
for (uint32_t i = 0; i < upserts.size(); i++) {
if (upserts[i] != nullptr) {
if (condition_column < 0) {
Expand Down Expand Up @@ -451,26 +494,13 @@ size_t UpdateManager::get_rowset_num_deletes(int64_t tablet_id, int64_t version,

Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op_compaction,
const TabletMetadata& metadata, Tablet* tablet,
MetaFileBuilder* builder, int64_t base_version) {
IndexEntry* index_entry, MetaFileBuilder* builder,
int64_t base_version) {
std::stringstream cost_str;
MonotonicStopWatch watch;
watch.start();
// 1. load primary index
auto index_entry = _index_cache.get_or_create(tablet->id());
index_entry->update_expire_time(MonotonicMillis() + get_cache_expire_ms());
auto& index = index_entry->value();
Status st = index.lake_load(tablet, metadata, base_version, builder);
_index_cache.update_object_size(index_entry, index.memory_usage());
if (!st.ok()) {
_index_cache.remove(index_entry);
LOG(ERROR) << strings::Substitute("publish_primary_key_tablet: load primary index failed: $0", st.to_string());
return st;
}
cost_str << " [primary index load] " << watch.elapsed_time();
watch.reset();
// release index entry but keep it in cache
DeferOp release_index_entry([&] { _index_cache.release(index_entry); });
// 2. iterate output rowset, update primary index and generate delvec
// 1. iterate output rowset, update primary index and generate delvec
std::shared_ptr<TabletSchema> tablet_schema = std::make_shared<TabletSchema>(metadata.schema());
RowsetPtr output_rowset =
std::make_shared<Rowset>(tablet, std::make_shared<RowsetMetadata>(op_compaction.output_rowset()));
Expand All @@ -488,6 +518,7 @@ Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op
[&](const RowsetMetadata& r) { return r.id() == max_rowset_id; });
uint32_t max_src_rssid = max_rowset_id + input_rowset->segments_size() - 1;

// 2. update primary index, and generate delete info.
for (size_t i = 0; i < compaction_state->pk_cols.size(); i++) {
RETURN_IF_ERROR(compaction_state->load_segments(output_rowset.get(), tablet_schema, i));
auto& pk_col = compaction_state->pk_cols[i];
Expand Down
17 changes: 15 additions & 2 deletions be/src/storage/lake/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Tablet;
class MetaFileBuilder;
class UpdateManager;
struct AutoIncrementPartialUpdateState;
using IndexEntry = DynamicCache<uint64_t, LakePrimaryIndex>::Entry;

class LakeDelvecLoader : public DelvecLoader {
public:
Expand All @@ -60,7 +61,8 @@ class UpdateManager {

// publish primary key tablet, update primary index and delvec, then update meta file
Status publish_primary_key_tablet(const TxnLogPB_OpWrite& op_write, int64_t txn_id, const TabletMetadata& metadata,
Tablet* tablet, MetaFileBuilder* builder, int64_t base_version);
Tablet* tablet, IndexEntry* index_entry, MetaFileBuilder* builder,
int64_t base_version);

// get rowids from primary index by each upserts
Status get_rowids_from_pkindex(Tablet* tablet, int64_t base_version, const std::vector<ColumnUniquePtr>& upserts,
Expand Down Expand Up @@ -88,7 +90,8 @@ class UpdateManager {
size_t get_rowset_num_deletes(int64_t tablet_id, int64_t version, const RowsetMetadataPB& rowset_meta);

Status publish_primary_compaction(const TxnLogPB_OpCompaction& op_compaction, const TabletMetadata& metadata,
Tablet* tablet, MetaFileBuilder* builder, int64_t base_version);
Tablet* tablet, IndexEntry* index_entry, MetaFileBuilder* builder,
int64_t base_version);

// remove primary index entry from cache, called when publish version error happens.
// Because update primary index isn't idempotent, so if primary index update success, but
Expand Down Expand Up @@ -120,6 +123,16 @@ class UpdateManager {

MemTracker* compaction_state_mem_tracker() const { return _compaction_state_mem_tracker.get(); }

// get or create primary index, and prepare primary index state
StatusOr<IndexEntry*> prepare_primary_index(const TabletMetadata& metadata, Tablet* tablet,
MetaFileBuilder* builder, int64_t base_version, int64_t new_version);

// commit primary index, only take affect when it is local persistent index
Status commit_primary_index(IndexEntry* index_entry, Tablet* tablet);

// release index entry if it isn't nullptr
void release_primary_index(IndexEntry* index_entry);

private:
// print memory tracker state
void _print_memory_stats();
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,8 @@ class PersistentIndex {
Status load_from_tablet(Tablet* tablet);

// start modification with intended version
// | version |: intended commit version
// | n |: deprecated
Status prepare(const EditVersion& version, size_t n);

// abort modification
Expand Down
9 changes: 9 additions & 0 deletions be/test/storage/lake/condition_update_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ TEST_P(ConditionUpdateTest, test_condition_update) {
ASSIGN_OR_ABORT(new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
EXPECT_EQ(new_tablet_metadata->rowsets_size(), 4 + i);
}
if (GetParam().enable_persistent_index) {
check_local_persistent_index_meta(tablet_id, version);
}
}

TEST_P(ConditionUpdateTest, test_condition_update_multi_segment) {
Expand Down Expand Up @@ -270,6 +273,9 @@ TEST_P(ConditionUpdateTest, test_condition_update_multi_segment) {
ASSERT_EQ(kChunkSize, check(version, [](int c0, int c1, int c2) { return (c0 * 4 == c1) && (c0 * 5 == c2); }));
ASSIGN_OR_ABORT(new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
EXPECT_EQ(new_tablet_metadata->rowsets_size(), 5);
if (GetParam().enable_persistent_index) {
check_local_persistent_index_meta(tablet_id, version);
}
}

TEST_P(ConditionUpdateTest, test_condition_update_in_memtable) {
Expand Down Expand Up @@ -302,6 +308,9 @@ TEST_P(ConditionUpdateTest, test_condition_update_in_memtable) {
}));
ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
EXPECT_EQ(new_tablet_metadata->rowsets_size(), 1);
if (GetParam().enable_persistent_index) {
check_local_persistent_index_meta(tablet_id, version);
}
}

INSTANTIATE_TEST_SUITE_P(ConditionUpdateTest, ConditionUpdateTest,
Expand Down
Loading

0 comments on commit ac2b169

Please sign in to comment.