From 2ecfcc5bda307d499df7b49aa2a1ce0b453d8e3f Mon Sep 17 00:00:00 2001 From: Yixin Luo <18810541851@163.com> Date: Wed, 13 Sep 2023 15:02:27 +0800 Subject: [PATCH] [Refactor] Refactoring the timing of memory release on partial column updates (#30683) Refactoring the timing of memory release on partial column updates, and also fix the potential pk index leak after an error status. --- be/src/storage/tablet_updates.cpp | 21 +++++++------- be/src/storage/update_manager.cpp | 28 +++++++++++++++++++ be/src/storage/update_manager.h | 4 +++ .../rowset_column_partial_update_test.cpp | 28 +++++++++++++++++++ 4 files changed, 71 insertions(+), 10 deletions(-) diff --git a/be/src/storage/tablet_updates.cpp b/be/src/storage/tablet_updates.cpp index 64f3d57166cf5..b688eac0c8345 100644 --- a/be/src/storage/tablet_updates.cpp +++ b/be/src/storage/tablet_updates.cpp @@ -909,11 +909,12 @@ void TabletUpdates::_apply_column_partial_update_commit(const EditVersionInfo& v state_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms()); // when failure happen, remove state cache and record error msg auto failure_handler = [&](const std::string& str, const Status& st) { - manager->update_column_state_cache().remove(state_entry); std::string msg = strings::Substitute("$0: $1 $2", str, st.to_string(), debug_string()); LOG(ERROR) << msg; _set_error(msg); }; + // remove state entry when function end + DeferOp state_defer([&]() { manager->update_column_state_cache().remove(state_entry); }); auto& state = state_entry->value(); auto st = state.load(&_tablet, rowset.get(), manager->mem_tracker()); manager->update_column_state_cache().update_object_size(state_entry, state.memory_usage()); @@ -927,17 +928,24 @@ void TabletUpdates::_apply_column_partial_update_commit(const EditVersionInfo& v auto index_entry = manager->index_cache().get_or_create(tablet_id); index_entry->update_expire_time(MonotonicMillis() + manager->get_index_cache_expire_ms(_tablet)); auto& index = index_entry->value(); + bool enable_persistent_index = index.enable_persistent_index(); + // release or remove index entry when function end + DeferOp index_defer([&]() { + if (enable_persistent_index ^ _tablet.get_enable_persistent_index()) { + manager->index_cache().remove(index_entry); + } else { + manager->index_cache().release(index_entry); + } + }); // empty rowset does not need to load in-memory primary index, so skip it if (rowset->has_data_files() || _tablet.get_enable_persistent_index()) { auto st = index.load(&_tablet); manager->index_cache().update_object_size(index_entry, index.memory_usage()); if (!st.ok()) { - manager->index_cache().remove(index_entry); failure_handler("load primary index failed", st); return; } } - bool enable_persistent_index = index.enable_persistent_index(); PersistentIndexMetaPB index_meta; if (enable_persistent_index) { st = TabletMetaManager::get_persistent_index_meta(_tablet.data_dir(), _tablet.tablet_id(), &index_meta); @@ -1018,13 +1026,6 @@ void TabletUpdates::_apply_column_partial_update_commit(const EditVersionInfo& v return; } - // 6. clear state and index cache - manager->update_column_state_cache().remove(state_entry); - if (enable_persistent_index ^ _tablet.get_enable_persistent_index()) { - manager->index_cache().remove(index_entry); - } else { - manager->index_cache().release(index_entry); - } _update_total_stats(version_info.rowsets, nullptr, nullptr); } diff --git a/be/src/storage/update_manager.cpp b/be/src/storage/update_manager.cpp index f946b05bd2734..3576ba9a0b6aa 100644 --- a/be/src/storage/update_manager.cpp +++ b/be/src/storage/update_manager.cpp @@ -565,4 +565,32 @@ void UpdateManager::on_rowset_cancel(Tablet* tablet, Rowset* rowset) { } } +bool UpdateManager::TEST_update_state_exist(Tablet* tablet, Rowset* rowset) { + string rowset_unique_id = rowset->rowset_id().to_string(); + if (rowset->is_column_mode_partial_update()) { + auto column_state_entry = + _update_column_state_cache.get(strings::Substitute("$0_$1", tablet->tablet_id(), rowset_unique_id)); + if (column_state_entry != nullptr) { + _update_column_state_cache.remove(column_state_entry); + return true; + } + } else { + auto state_entry = _update_state_cache.get(strings::Substitute("$0_$1", tablet->tablet_id(), rowset_unique_id)); + if (state_entry != nullptr) { + _update_state_cache.remove(state_entry); + return true; + } + } + return false; +} + +bool UpdateManager::TEST_primary_index_refcnt(int64_t tablet_id, uint32_t expected_cnt) { + auto index_entry = _index_cache.get(tablet_id); + if (index_entry == nullptr) { + return expected_cnt == 0; + } + _index_cache.release(index_entry); + return index_entry->get_ref() == expected_cnt; +} + } // namespace starrocks diff --git a/be/src/storage/update_manager.h b/be/src/storage/update_manager.h index 78f6cd5c3d08e..0d6d59d299f40 100644 --- a/be/src/storage/update_manager.h +++ b/be/src/storage/update_manager.h @@ -145,6 +145,10 @@ class UpdateManager { bool keep_pindex_bf() { return _keep_pindex_bf; } void set_keep_pindex_bf(bool keep_pindex_bf) { _keep_pindex_bf = keep_pindex_bf; } + // Used in UT only + bool TEST_update_state_exist(Tablet* tablet, Rowset* rowset); + bool TEST_primary_index_refcnt(int64_t tablet_id, uint32_t expected_cnt); + private: // default 6min int64_t _cache_expire_ms = 360000; diff --git a/be/test/storage/rowset_column_partial_update_test.cpp b/be/test/storage/rowset_column_partial_update_test.cpp index 21363830d0775..019ff653391fe 100644 --- a/be/test/storage/rowset_column_partial_update_test.cpp +++ b/be/test/storage/rowset_column_partial_update_test.cpp @@ -365,6 +365,12 @@ static void prepare_tablet(RowsetColumnPartialUpdateTest* self, const TabletShar ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2) { return (int16_t)(k1 % 100 + 1) == v1 && (int32_t)(k1 % 1000 + 2) == v2; })); + // check refcnt + for (const auto& rs_ptr : rowsets) { + ASSERT_FALSE( + StorageEngine::instance()->update_manager()->TEST_update_state_exist(tablet.get(), rs_ptr.get())); + } + ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1)); version_before_partial_update = version; } @@ -387,6 +393,12 @@ static void prepare_tablet(RowsetColumnPartialUpdateTest* self, const TabletShar ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2) { return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2; })); + // check refcnt + for (const auto& rs_ptr : rowsets) { + ASSERT_FALSE( + StorageEngine::instance()->update_manager()->TEST_update_state_exist(tablet.get(), rs_ptr.get())); + } + ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1)); } } @@ -501,6 +513,11 @@ TEST_F(RowsetColumnPartialUpdateTest, partial_update_diff_column_and_check) { ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2) { return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2; })); + // check refcnt + for (const auto& rs_ptr : rowsets) { + ASSERT_FALSE(StorageEngine::instance()->update_manager()->TEST_update_state_exist(tablet.get(), rs_ptr.get())); + } + ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1)); } TEST_F(RowsetColumnPartialUpdateTest, partial_update_multi_segment_and_check) { @@ -535,6 +552,11 @@ TEST_F(RowsetColumnPartialUpdateTest, partial_update_multi_segment_and_check) { ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2) { return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2; })); + // check refcnt + for (const auto& rs_ptr : rowsets) { + ASSERT_FALSE(StorageEngine::instance()->update_manager()->TEST_update_state_exist(tablet.get(), rs_ptr.get())); + } + ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1)); } TEST_F(RowsetColumnPartialUpdateTest, partial_update_multi_segment_preload_and_check) { @@ -799,6 +821,12 @@ TEST_F(RowsetColumnPartialUpdateTest, test_upsert) { ASSERT_TRUE(check_tablet(tablet, version, 2 * N, [](int64_t k1, int64_t v1, int32_t v2) { return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2; })); + // check refcnt + for (const auto& rs_ptr : rowsets) { + ASSERT_FALSE( + StorageEngine::instance()->update_manager()->TEST_update_state_exist(tablet.get(), rs_ptr.get())); + } + ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1)); } }