Skip to content

Commit

Permalink
[Refactor] Refactoring the timing of memory release on partial column…
Browse files Browse the repository at this point in the history
… updates (StarRocks#30683)

Refactor the timing of memory release during partial column updates, and fix a potential primary key (PK) index leak when an error status is returned.
  • Loading branch information
luohaha authored Sep 13, 2023
1 parent 4b9768e commit 2ecfcc5
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 10 deletions.
21 changes: 11 additions & 10 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -909,11 +909,12 @@ void TabletUpdates::_apply_column_partial_update_commit(const EditVersionInfo& v
state_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms());
// when failure happen, remove state cache and record error msg
auto failure_handler = [&](const std::string& str, const Status& st) {
manager->update_column_state_cache().remove(state_entry);
std::string msg = strings::Substitute("$0: $1 $2", str, st.to_string(), debug_string());
LOG(ERROR) << msg;
_set_error(msg);
};
// remove state entry when function end
DeferOp state_defer([&]() { manager->update_column_state_cache().remove(state_entry); });
auto& state = state_entry->value();
auto st = state.load(&_tablet, rowset.get(), manager->mem_tracker());
manager->update_column_state_cache().update_object_size(state_entry, state.memory_usage());
Expand All @@ -927,17 +928,24 @@ void TabletUpdates::_apply_column_partial_update_commit(const EditVersionInfo& v
auto index_entry = manager->index_cache().get_or_create(tablet_id);
index_entry->update_expire_time(MonotonicMillis() + manager->get_index_cache_expire_ms(_tablet));
auto& index = index_entry->value();
bool enable_persistent_index = index.enable_persistent_index();
// release or remove index entry when function end
DeferOp index_defer([&]() {
if (enable_persistent_index ^ _tablet.get_enable_persistent_index()) {
manager->index_cache().remove(index_entry);
} else {
manager->index_cache().release(index_entry);
}
});
// empty rowset does not need to load in-memory primary index, so skip it
if (rowset->has_data_files() || _tablet.get_enable_persistent_index()) {
auto st = index.load(&_tablet);
manager->index_cache().update_object_size(index_entry, index.memory_usage());
if (!st.ok()) {
manager->index_cache().remove(index_entry);
failure_handler("load primary index failed", st);
return;
}
}
bool enable_persistent_index = index.enable_persistent_index();
PersistentIndexMetaPB index_meta;
if (enable_persistent_index) {
st = TabletMetaManager::get_persistent_index_meta(_tablet.data_dir(), _tablet.tablet_id(), &index_meta);
Expand Down Expand Up @@ -1018,13 +1026,6 @@ void TabletUpdates::_apply_column_partial_update_commit(const EditVersionInfo& v
return;
}

// 6. clear state and index cache
manager->update_column_state_cache().remove(state_entry);
if (enable_persistent_index ^ _tablet.get_enable_persistent_index()) {
manager->index_cache().remove(index_entry);
} else {
manager->index_cache().release(index_entry);
}
_update_total_stats(version_info.rowsets, nullptr, nullptr);
}

Expand Down
28 changes: 28 additions & 0 deletions be/src/storage/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,4 +565,32 @@ void UpdateManager::on_rowset_cancel(Tablet* tablet, Rowset* rowset) {
}
}

// Test-only probe: reports whether an update-state cache entry is present for the given
// tablet/rowset pair. Rowsets in column-mode partial update are looked up in the column
// state cache, all others in the regular state cache. A found entry is removed from its
// cache before returning true, so a positive probe also clears the entry.
bool UpdateManager::TEST_update_state_exist(Tablet* tablet, Rowset* rowset) {
    const std::string rowset_key = rowset->rowset_id().to_string();
    // Both caches are keyed by "<tablet_id>_<rowset_id>".
    const std::string cache_key = strings::Substitute("$0_$1", tablet->tablet_id(), rowset_key);

    // Same lookup-then-remove sequence for either cache; only the cache object differs.
    auto drop_if_cached = [&cache_key](auto& cache) {
        auto entry = cache.get(cache_key);
        if (entry == nullptr) {
            return false;
        }
        cache.remove(entry);
        return true;
    };

    if (rowset->is_column_mode_partial_update()) {
        return drop_if_cached(_update_column_state_cache);
    }
    return drop_if_cached(_update_state_cache);
}

// Test-only probe: reports whether the cached primary index entry of `tablet_id` is
// referenced exactly `expected_cnt` times, not counting the reference this probe itself
// holds while it inspects the entry.
//
// @param tablet_id     tablet whose primary index cache entry is inspected
// @param expected_cnt  reference count expected once the probe's own reference is returned
// @return true when the counts match; when no entry is cached, true only if
//         `expected_cnt` is 0
bool UpdateManager::TEST_primary_index_refcnt(int64_t tablet_id, uint32_t expected_cnt) {
    auto index_entry = _index_cache.get(tablet_id);
    if (index_entry == nullptr) {
        return expected_cnt == 0;
    }
    // Sample the count while our reference is still held: once release() returns, the cache
    // is free to evict or destroy the entry, so dereferencing it afterwards would be a
    // use-after-release. The probe's own hold accounts for the "+ 1".
    // NOTE(review): assumes get()/release() adjust the count by exactly one — confirm
    // against the cache implementation.
    const bool matches = index_entry->get_ref() == expected_cnt + 1;
    _index_cache.release(index_entry);
    return matches;
}

} // namespace starrocks
4 changes: 4 additions & 0 deletions be/src/storage/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ class UpdateManager {
bool keep_pindex_bf() { return _keep_pindex_bf; }
void set_keep_pindex_bf(bool keep_pindex_bf) { _keep_pindex_bf = keep_pindex_bf; }

// Used in UT only.
// Returns true when an update-state cache entry exists for this tablet/rowset pair.
// Column-mode partial-update rowsets are looked up in the column state cache, all other
// rowsets in the regular state cache. A found entry is removed from its cache as part of
// the check.
bool TEST_update_state_exist(Tablet* tablet, Rowset* rowset);
// Returns true when the primary index cache entry of `tablet_id` has a reference count of
// `expected_cnt`, not counting the entry handle obtained (and released again) by this check
// itself. When no entry is cached, returns true only if `expected_cnt` is 0.
bool TEST_primary_index_refcnt(int64_t tablet_id, uint32_t expected_cnt);

private:
// default 6min
int64_t _cache_expire_ms = 360000;
Expand Down
28 changes: 28 additions & 0 deletions be/test/storage/rowset_column_partial_update_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,12 @@ static void prepare_tablet(RowsetColumnPartialUpdateTest* self, const TabletShar
ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2) {
return (int16_t)(k1 % 100 + 1) == v1 && (int32_t)(k1 % 1000 + 2) == v2;
}));
// check refcnt
for (const auto& rs_ptr : rowsets) {
ASSERT_FALSE(
StorageEngine::instance()->update_manager()->TEST_update_state_exist(tablet.get(), rs_ptr.get()));
}
ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1));
version_before_partial_update = version;
}

Expand All @@ -387,6 +393,12 @@ static void prepare_tablet(RowsetColumnPartialUpdateTest* self, const TabletShar
ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2) {
return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2;
}));
// check refcnt
for (const auto& rs_ptr : rowsets) {
ASSERT_FALSE(
StorageEngine::instance()->update_manager()->TEST_update_state_exist(tablet.get(), rs_ptr.get()));
}
ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1));
}
}

Expand Down Expand Up @@ -501,6 +513,11 @@ TEST_F(RowsetColumnPartialUpdateTest, partial_update_diff_column_and_check) {
ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2) {
return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2;
}));
// check refcnt
for (const auto& rs_ptr : rowsets) {
ASSERT_FALSE(StorageEngine::instance()->update_manager()->TEST_update_state_exist(tablet.get(), rs_ptr.get()));
}
ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1));
}

TEST_F(RowsetColumnPartialUpdateTest, partial_update_multi_segment_and_check) {
Expand Down Expand Up @@ -535,6 +552,11 @@ TEST_F(RowsetColumnPartialUpdateTest, partial_update_multi_segment_and_check) {
ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2) {
return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2;
}));
// check refcnt
for (const auto& rs_ptr : rowsets) {
ASSERT_FALSE(StorageEngine::instance()->update_manager()->TEST_update_state_exist(tablet.get(), rs_ptr.get()));
}
ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1));
}

TEST_F(RowsetColumnPartialUpdateTest, partial_update_multi_segment_preload_and_check) {
Expand Down Expand Up @@ -799,6 +821,12 @@ TEST_F(RowsetColumnPartialUpdateTest, test_upsert) {
ASSERT_TRUE(check_tablet(tablet, version, 2 * N, [](int64_t k1, int64_t v1, int32_t v2) {
return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2;
}));
// check refcnt
for (const auto& rs_ptr : rowsets) {
ASSERT_FALSE(
StorageEngine::instance()->update_manager()->TEST_update_state_exist(tablet.get(), rs_ptr.get()));
}
ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1));
}
}

Expand Down

0 comments on commit 2ecfcc5

Please sign in to comment.