From e551bca4e552d8d255341ecdf165170d57482704 Mon Sep 17 00:00:00 2001
From: Boris Erakhtin
Date: Wed, 13 Nov 2024 16:04:38 +0500
Subject: [PATCH] Rocksdb ttl

---
 cmake/Hunter/config.cmake                    |   2 +-
 core/application/app_configuration.hpp       |   5 +
 .../impl/app_configuration_impl.cpp          |  12 +-
 .../impl/app_configuration_impl.hpp          |   4 +
 core/injector/application_injector.cpp       |   7 +-
 .../store/candidate_chunk_key.hpp            |  53 +++
 .../availability/store/store_impl.cpp        | 172 ++++++++-
 .../availability/store/store_impl.hpp        |   6 +-
 core/storage/rocksdb/rocksdb.cpp             | 328 +++++++++++++++---
 core/storage/rocksdb/rocksdb.hpp             |  60 +++-
 core/storage/rocksdb/rocksdb_spaces.cpp      |   1 +
 core/storage/spaces.hpp                      |   1 +
 .../injector/application_injector_test.cpp   |   2 +
 .../application/app_configuration_mock.hpp   |   2 +
 14 files changed, 594 insertions(+), 61 deletions(-)
 create mode 100644 core/parachain/availability/store/candidate_chunk_key.hpp

diff --git a/cmake/Hunter/config.cmake b/cmake/Hunter/config.cmake
index a4bb35f002..8c4c514445 100644
--- a/cmake/Hunter/config.cmake
+++ b/cmake/Hunter/config.cmake
@@ -35,7 +35,7 @@ hunter_config(
 hunter_config(
     rocksdb
     VERSION 9.6.1
-    CMAKE_ARGS WITH_GFLAGS=OFF
+    CMAKE_ARGS WITH_GFLAGS=OFF USE_RTTI=ON
 )
 
 hunter_config(
diff --git a/core/application/app_configuration.hpp b/core/application/app_configuration.hpp
index 261c9d9d88..6a5807553c 100644
--- a/core/application/app_configuration.hpp
+++ b/core/application/app_configuration.hpp
@@ -267,6 +267,11 @@ namespace kagome::application {
      */
     virtual bool disableSecureMode() const = 0;
 
+    /**
+     * Whether to enable automatic database migration.
+     */
+    virtual bool enableDbMigration() const = 0;
+
     enum class OffchainWorkerMode : uint8_t {
       WhenValidating,
       Always,
diff --git a/core/application/impl/app_configuration_impl.cpp b/core/application/impl/app_configuration_impl.cpp
index 12d6fe5a1a..9ca33b02e3 100644
--- a/core/application/impl/app_configuration_impl.cpp
+++ b/core/application/impl/app_configuration_impl.cpp
@@ -175,12 +175,11 @@ namespace {
   static constexpr std::array
-      interpreters {
+      interpreters{
 #if KAGOME_WASM_COMPILER_WASM_EDGE == 1
-        "WasmEdge",
+          "WasmEdge",
 #endif
-        "Binaryen"
-      };
+          "Binaryen"};
 
   static const std::string interpreters_str =
       fmt::format("[{}]", fmt::join(interpreters, ", "));
@@ -844,6 +843,7 @@ namespace kagome::application {
         ("state-pruning", po::value()->default_value("archive"), "state pruning policy. 'archive', 'prune-discarded', or the number of finalized blocks to keep.")
         ("blocks-pruning", po::value(), "If specified, keep block body only for specified number of recent finalized blocks.")
         ("enable-thorough-pruning", po::bool_switch(), "Makes trie node pruner more efficient, but the node starts slowly")
+        ("enable-db-migration", po::bool_switch(), "Enable automatic db migration")
         ;
 
     po::options_description network_desc("Network options");
@@ -1618,6 +1618,10 @@ namespace kagome::application {
 
     blocks_pruning_ = find_argument(vm, "blocks-pruning");
 
+    if (find_argument(vm, "enable-db-migration")) {
+      enable_db_migration_ = true;
+    }
+
     if (find_argument(vm, "precompile-relay")) {
       precompile_wasm_.emplace();
     }
diff --git a/core/application/impl/app_configuration_impl.hpp b/core/application/impl/app_configuration_impl.hpp
index 977da0306e..e87e9149d9 100644
--- a/core/application/impl/app_configuration_impl.hpp
+++ b/core/application/impl/app_configuration_impl.hpp
@@ -208,6 +208,9 @@ namespace kagome::application {
     std::optional blocksPruning() const override {
       return blocks_pruning_;
     }
+    bool enableDbMigration() const override {
+      return enable_db_migration_;
+    }
     std::optional devMnemonicPhrase() const override {
       if (dev_mnemonic_phrase_) {
         return *dev_mnemonic_phrase_;
@@ -375,6 +378,7 @@ namespace kagome::application {
     bool prune_discarded_states_ = false;
     bool enable_thorough_pruning_ = false;
     std::optional blocks_pruning_;
+    bool enable_db_migration_ = false;
     std::optional dev_mnemonic_phrase_;
     std::string node_wss_pem_;
     std::optional benchmark_config_;
diff --git a/core/injector/application_injector.cpp b/core/injector/application_injector.cpp
index 6e063f3f57..fb9c248df0 100644
--- a/core/injector/application_injector.cpp
+++ b/core/injector/application_injector.cpp
@@ -258,6 +258,7 @@ namespace {
       const sptr &chain_spec) {
     // hack for recovery mode (otherwise - fails due to rocksdb bug)
     bool prevent_destruction = app_config.recoverState().has_value();
+    bool enable_migration = app_config.enableDbMigration();
 
     auto options = rocksdb::Options{};
     options.create_if_missing = true;
@@ -273,11 +274,15 @@ namespace {
       // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions)
       options.max_open_files = soft_limit.value() / 2;
 
+    const std::unordered_map<std::string, int32_t> column_ttl = {
+        {"avaliability_storage", 25 * 60 * 60}};  // 25 hours
     auto db_res =
         storage::RocksDb::create(app_config.databasePath(chain_spec->id()),
                                  options,
                                  app_config.dbCacheSize(),
-                                 prevent_destruction);
+                                 prevent_destruction,
+                                 column_ttl,
+                                 enable_migration);
     if (!db_res) {
       auto log = log::createLogger("Injector", "injector");
       log->critical(
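
Note on the TTL mechanism: RocksDB's TTL layer (rocksdb/utilities/db_ttl.h) stores a 4-byte write timestamp with every value and discards expired entries during compaction, so the 25-hour TTL above is a lower bound — a chunk stays readable until a compaction actually removes it. A minimal standalone sketch of that API (the path and single-column-family form are illustrative; this is not kagome's wrapper):

    #include <cassert>
    #include <rocksdb/utilities/db_ttl.h>

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      rocksdb::DBWithTTL *db = nullptr;
      // Entries become eligible for deletion 25 hours after they are written.
      auto status = rocksdb::DBWithTTL::Open(
          options, "/tmp/ttl_example", &db, 25 * 60 * 60);
      assert(status.ok());
      db->Put(rocksdb::WriteOptions(), "key", "value");
      delete db;
    }
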
diff --git a/core/parachain/availability/store/candidate_chunk_key.hpp b/core/parachain/availability/store/candidate_chunk_key.hpp
new file mode 100644
index 0000000000..5f9d739194
--- /dev/null
+++ b/core/parachain/availability/store/candidate_chunk_key.hpp
@@ -0,0 +1,53 @@
+/**
+ * Copyright Quadrivium LLC
+ * All Rights Reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#pragma once
+
+#include <optional>
+
+#include <boost/endian/conversion.hpp>
+
+#include "parachain/types.hpp"
+#include "primitives/common.hpp"
+
+namespace kagome {
+  struct CandidateChunkKey {
+    static constexpr size_t kCandidateHashSize =
+        sizeof(parachain::CandidateHash);
+    static constexpr size_t kChunkIndexSize = sizeof(parachain::ChunkIndex);
+    using Key = common::Blob<kCandidateHashSize + kChunkIndexSize>;
+    using HashKey = common::Blob<kCandidateHashSize>;
+
+    static Key encode(const parachain::CandidateHash &candidate_hash,
+                      const parachain::ChunkIndex &chunk_index) {
+      Key key;
+      std::copy_n(
+          encode_hash(candidate_hash).data(), kCandidateHashSize, key.data());
+      boost::endian::store_big_u32(key.data() + kCandidateHashSize,
+                                   chunk_index);
+      return key;
+    }
+
+    static HashKey encode_hash(const parachain::CandidateHash &candidate_hash) {
+      HashKey key;
+      std::copy_n(candidate_hash.data(), kCandidateHashSize, key.data());
+      return key;
+    }
+
+    static std::optional<
+        std::pair<parachain::CandidateHash, parachain::ChunkIndex>>
+    decode(common::BufferView key) {
+      if (key.size() != Key::size()) {
+        return std::nullopt;
+      }
+      std::pair<parachain::CandidateHash, parachain::ChunkIndex> candidateChunk;
+      std::copy_n(key.data(), kCandidateHashSize, candidateChunk.first.data());
+      candidateChunk.second =
+          boost::endian::load_big_u32(key.data() + kCandidateHashSize);
+      return candidateChunk;
+    }
+  };
+}  // namespace kagome
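
The key layout above — candidate hash first, chunk index appended in big-endian — keeps all chunks of one candidate lexicographically adjacent and ordered by index, which is what the prefix seek in getChunks() below relies on. A hypothetical round trip (the values are illustrative; assumes the kagome headers are on the include path):

    #include <cassert>
    #include "parachain/availability/store/candidate_chunk_key.hpp"

    int main() {
      kagome::parachain::CandidateHash hash{};  // would come from a real candidate
      kagome::parachain::ChunkIndex index = 7;
      auto key = kagome::CandidateChunkKey::encode(hash, index);
      auto decoded = kagome::CandidateChunkKey::decode(key);
      assert(decoded.has_value());
      assert(decoded->first == hash and decoded->second == index);
    }
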
diff --git a/core/parachain/availability/store/store_impl.cpp b/core/parachain/availability/store/store_impl.cpp
index 28d20a6c6a..2b471b2a45 100644
--- a/core/parachain/availability/store/store_impl.cpp
+++ b/core/parachain/availability/store/store_impl.cpp
@@ -5,22 +5,39 @@
  */
 
 #include "parachain/availability/store/store_impl.hpp"
+#include "candidate_chunk_key.hpp"
 
 constexpr uint64_t KEEP_CANDIDATES_TIMEOUT = 10 * 60;
 
 namespace kagome::parachain {
-  AvailabilityStoreImpl::AvailabilityStoreImpl(clock::SteadyClock &steady_clock)
-      : steady_clock_(steady_clock) {}
+  AvailabilityStoreImpl::AvailabilityStoreImpl(
+      clock::SteadyClock &steady_clock,
+      std::shared_ptr<storage::SpacedStorage> storage)
+      : steady_clock_{steady_clock}, storage_{std::move(storage)} {
+    BOOST_ASSERT(storage_ != nullptr);
+  }
 
   bool AvailabilityStoreImpl::hasChunk(const CandidateHash &candidate_hash,
                                        ValidatorIndex index) const {
-    return state_.sharedAccess([&](const auto &state) {
+    const auto has_chunk = state_.sharedAccess([&](const auto &state) {
       auto it = state.per_candidate_.find(candidate_hash);
       if (it == state.per_candidate_.end()) {
         return false;
       }
       return it->second.chunks.count(index) != 0;
     });
+    if (has_chunk) {
+      return true;
+    }
+    const auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
+    if (not space) {
+      SL_CRITICAL(logger,
+                  "Failed to get AvaliabilityStorage space in hasChunk");
+      return false;
+    }
+    auto chunk_from_db =
+        space->get(CandidateChunkKey::encode(candidate_hash, index));
+    return chunk_from_db.has_value();
   }
 
   bool AvailabilityStoreImpl::hasPov(
@@ -48,7 +65,7 @@ namespace kagome::parachain {
   std::optional<AvailabilityStore::ErasureChunk>
   AvailabilityStoreImpl::getChunk(const CandidateHash &candidate_hash,
                                   ValidatorIndex index) const {
-    return state_.sharedAccess(
+    auto chunk = state_.sharedAccess(
         [&](const auto &state) -> std::optional<ErasureChunk> {
           auto it = state.per_candidate_.find(candidate_hash);
@@ -61,6 +78,30 @@ namespace kagome::parachain {
           }
           return it2->second;
         });
+    if (chunk) {
+      return chunk;
+    }
+    auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
+    if (not space) {
+      SL_ERROR(logger, "Failed to get space for candidate {}", candidate_hash);
+      return std::nullopt;
+    }
+    auto chunk_from_db =
+        space->get(CandidateChunkKey::encode(candidate_hash, index));
+    if (not chunk_from_db) {
+      return std::nullopt;
+    }
+    const auto decoded_chunk =
+        scale::decode<ErasureChunk>(chunk_from_db.value());
+    if (not decoded_chunk) {
+      SL_ERROR(logger,
+               "Failed to decode chunk candidate {} index {} error {}",
+               candidate_hash,
+               index,
+               decoded_chunk.error());
+      return std::nullopt;
+    }
+    return decoded_chunk.value();
   }
 
   std::optional
@@ -95,7 +136,7 @@ namespace kagome::parachain {
   std::vector<network::ErasureChunk> AvailabilityStoreImpl::getChunks(
       const CandidateHash &candidate_hash) const {
-    return state_.sharedAccess([&](const auto &state) {
+    auto chunks = state_.sharedAccess([&](const auto &state) {
       std::vector<ErasureChunk> chunks;
       auto it = state.per_candidate_.find(candidate_hash);
       if (it != state.per_candidate_.end()) {
@@ -105,6 +146,65 @@ namespace kagome::parachain {
       }
       return chunks;
     });
+    if (not chunks.empty()) {
+      return chunks;
+    }
+    auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
+    if (not space) {
+      SL_CRITICAL(logger,
+                  "Failed to get AvaliabilityStorage space in getChunks");
+      return chunks;
+    }
+    auto cursor = space->cursor();
+    if (not cursor) {
+      SL_ERROR(logger, "Failed to get cursor for AvaliabilityStorage");
+      return chunks;
+    }
+    const auto seek_key = CandidateChunkKey::encode_hash(candidate_hash);
+    auto seek_res = cursor->seek(seek_key);
+    if (not seek_res) {
+      SL_ERROR(logger,
+               "Failed to seek for candidate {} error: {}",
+               candidate_hash,
+               seek_res.error());
+      return chunks;
+    }
+    if (not seek_res.value()) {
+      SL_DEBUG(logger, "Seek not found for candidate {}", candidate_hash);
+      return chunks;
+    }
+    const auto check_key = [&seek_key](const auto &key) {
+      if (not key) {
+        return false;
+      }
+      const auto &key_value = key.value();
+      return key_value.size() >= seek_key.size()
+         and std::equal(seek_key.begin(), seek_key.end(), key_value.begin());
+    };
+    while (cursor->isValid() and check_key(cursor->key())) {
+      const auto cursor_opt_value = cursor->value();
+      if (cursor_opt_value) {
+        auto decoded_res =
+            scale::decode<ErasureChunk>(cursor_opt_value.value());
+        if (decoded_res) {
+          chunks.emplace_back(std::move(decoded_res.value()));
+        } else {
+          SL_ERROR(logger,
+                   "Failed to decode value for candidate hash {} error: {}",
+                   candidate_hash,
+                   decoded_res.error());
+        }
+      } else {
+        SL_ERROR(logger,
+                 "Failed to get value candidate {} for key {}",
+                 candidate_hash,
+                 cursor->key()->toHex());
+      }
+      if (not cursor->next()) {
+        break;
+      }
+    }
+    return chunks;
   }
 
   void AvailabilityStoreImpl::printStoragesLoad() {
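
The database fallback above is a plain prefix scan: one seek to the candidate-hash prefix, then forward iteration while check_key() confirms the key still starts with that prefix. For reference, the same pattern over a raw RocksDB iterator (a sketch; the db, cf and prefix parameters stand in for a live database, column family handle and hash prefix):

    #include <memory>
    #include <rocksdb/db.h>

    // Sketch: visit every entry whose key begins with `prefix`.
    void scan_prefix(rocksdb::DB &db,
                     rocksdb::ColumnFamilyHandle *cf,
                     const rocksdb::Slice &prefix) {
      std::unique_ptr<rocksdb::Iterator> it(
          db.NewIterator(rocksdb::ReadOptions(), cf));
      for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix);
           it->Next()) {
        // it->value() would hold one SCALE-encoded ErasureChunk here
      }
    }
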
@@ -138,7 +238,30 @@ namespace kagome::parachain {
       state.candidates_[relay_parent].insert(candidate_hash);
       auto &candidate_data = state.per_candidate_[candidate_hash];
       for (auto &&chunk : std::move(chunks)) {
+        auto encoded_chunk = scale::encode(chunk);
+        const auto chunk_index = chunk.index;
         candidate_data.chunks[chunk.index] = std::move(chunk);
+        if (not encoded_chunk) {
+          SL_ERROR(logger,
+                   "Failed to encode chunk, error: {}",
+                   encoded_chunk.error());
+          continue;
+        }
+        auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
+        if (not space) {
+          SL_ERROR(logger, "Failed to get space");
+          continue;
+        }
+        if (auto res = space->put(
+                CandidateChunkKey::encode(candidate_hash, chunk_index),
+                std::move(encoded_chunk.value()));
+            not res) {
+          SL_ERROR(logger,
+                   "Failed to put chunk candidate {} index {} error {}",
+                   candidate_hash,
+                   chunk_index,
+                   res.error());
+        }
       }
       candidate_data.pov = pov;
       candidate_data.data = data;
@@ -150,6 +273,8 @@ namespace kagome::parachain {
   void AvailabilityStoreImpl::putChunk(const network::RelayHash &relay_parent,
                                        const CandidateHash &candidate_hash,
                                        ErasureChunk &&chunk) {
+    auto encoded_chunk = scale::encode(chunk);
+    const auto chunk_index = chunk.index;
     state_.exclusiveAccess([&](auto &state) {
       prune_candidates_no_lock(state);
       state.candidates_[relay_parent].insert(candidate_hash);
@@ -158,6 +283,28 @@ namespace kagome::parachain {
       state.candidates_living_keeper_.emplace_back(steady_clock_.nowUint64(),
                                                    relay_parent);
     });
+    if (not encoded_chunk) {
+      SL_ERROR(
+          logger, "Failed to encode chunk, error: {}", encoded_chunk.error());
+      return;
+    }
+
+    auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
+    if (not space) {
+      SL_ERROR(logger, "Failed to get AvaliabilityStorage space");
+      return;
+    }
+
+    if (auto res =
+            space->put(CandidateChunkKey::encode(candidate_hash, chunk_index),
+                       std::move(encoded_chunk.value()));
+        not res) {
+      SL_ERROR(logger,
+               "Failed to put chunk candidate {} index {} error {}",
+               candidate_hash,
+               chunk_index,
+               res.error());
+    }
   }
 
   void AvailabilityStoreImpl::remove_no_lock(
@@ -165,6 +312,21 @@ namespace kagome::parachain {
     if (auto it = state.candidates_.find(relay_parent);
         it != state.candidates_.end()) {
       for (const auto &l : it->second) {
+        auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
+        if (space) {
+          for (const auto &chunk : state.per_candidate_[l].chunks) {
+            if (not space->remove(
+                    CandidateChunkKey::encode(l, chunk.second.index))) {
+              SL_ERROR(logger,
+                       "Failed to remove chunk candidate {} index {}",
+                       l,
+                       chunk.second.index);
+            }
+          }
+        } else {
+          SL_CRITICAL(logger,
+                      "Failed to get AvaliabilityStorage space in remove");
+        }
         state.per_candidate_.erase(l);
       }
       state.candidates_.erase(it);
diff --git a/core/parachain/availability/store/store_impl.hpp b/core/parachain/availability/store/store_impl.hpp
index 7d77074dfd..ca67a485db 100644
--- a/core/parachain/availability/store/store_impl.hpp
+++ b/core/parachain/availability/store/store_impl.hpp
@@ -11,12 +11,13 @@
 #include
 #include
 #include "log/logger.hpp"
+#include "storage/spaced_storage.hpp"
 #include "utils/safe_object.hpp"
-
 namespace kagome::parachain {
   class AvailabilityStoreImpl : public AvailabilityStore {
    public:
-    AvailabilityStoreImpl(clock::SteadyClock &steady_clock);
+    AvailabilityStoreImpl(clock::SteadyClock &steady_clock,
+                          std::shared_ptr<storage::SpacedStorage> storage);
 
     ~AvailabilityStoreImpl() override = default;
 
     bool hasChunk(const CandidateHash &candidate_hash,
@@ -63,5 +64,6 @@ namespace kagome::parachain {
     log::Logger logger = log::createLogger("AvailabilityStore", "parachain");
     clock::SteadyClock &steady_clock_;
     SafeObject state_{};
+    std::shared_ptr<storage::SpacedStorage> storage_;
   };
 }  // namespace kagome::parachain
diff --git a/core/storage/rocksdb/rocksdb.cpp b/core/storage/rocksdb/rocksdb.cpp
index cbb69245cb..82b37f8437 100644
--- a/core/storage/rocksdb/rocksdb.cpp
+++ b/core/storage/rocksdb/rocksdb.cpp
@@ -30,84 +30,277 @@ namespace kagome::storage {
     }
     delete db_;
   }
-
   outcome::result<std::shared_ptr<RocksDb>> RocksDb::create(
       const filesystem::path &path,
       rocksdb::Options options,
       uint32_t memory_budget_mib,
-      bool prevent_destruction) {
+      bool prevent_destruction,
+      const std::unordered_map<std::string, int32_t> &column_ttl,
+      bool enable_migration) {
+    const auto no_db_presented = not fs::exists(path);
     OUTCOME_TRY(mkdirs(path));
 
     auto log = log::createLogger("RocksDB", "storage");
     auto absolute_path = fs::absolute(path);
 
-    std::error_code ec;
-    if (not fs::create_directory(absolute_path.native(), ec) and ec.value()) {
-      log->error("Can't create directory {} for database: {}",
-                 absolute_path.native(),
-                 ec);
-      return DatabaseError::IO_ERROR;
-    }
-    if (not fs::is_directory(absolute_path.native())) {
-      log->error("Can't open {} for database: is not a directory",
-                 absolute_path.native());
-      return DatabaseError::IO_ERROR;
-    }
+    OUTCOME_TRY(createDirectory(absolute_path, log));
 
-    // calculate state cache size per space
     const auto memory_budget = memory_budget_mib * 1024 * 1024;
     const auto trie_space_cache_size =
         static_cast<uint32_t>(memory_budget * 0.9);
     const uint32_t other_spaces_cache_size =
         (memory_budget - trie_space_cache_size) / (storage::Space::kTotal - 1);
+
     std::vector<rocksdb::ColumnFamilyDescriptor> column_family_descriptors;
-    column_family_descriptors.reserve(Space::kTotal);
-    for (auto i = 0; i < Space::kTotal; ++i) {
-      column_family_descriptors.emplace_back(
-          spaceName(static_cast<Space>(i)),
-          configureColumn(i != Space::kTrieNode ? other_spaces_cache_size
-                                                : trie_space_cache_size));
-    }
+    std::vector<int32_t> ttls;
+    configureColumnFamilies(column_family_descriptors,
+                            ttls,
+                            column_ttl,
+                            trie_space_cache_size,
+                            other_spaces_cache_size,
+                            log);
 
     std::vector<std::string> existing_families;
     auto res = rocksdb::DB::ListColumnFamilies(
         options, path.native(), &existing_families);
     if (!res.ok() && !res.IsPathNotFound()) {
       SL_ERROR(log,
-               "Can't open database in {}: {}",
+               "Can't list column families in {}: {}",
               absolute_path.native(),
               res.ToString());
       return status_as_error(res);
     }
-    for (auto &family : existing_families) {
-      if (std::ranges::find_if(
-              column_family_descriptors,
-              [&family](rocksdb::ColumnFamilyDescriptor &desc) {
-                return desc.name == family;
-              })
-          == column_family_descriptors.end()) {
-        column_family_descriptors.emplace_back(
-            family, configureColumn(other_spaces_cache_size));
-      }
-    }
 
     options.create_missing_column_families = true;
     auto rocks_db = std::shared_ptr<RocksDb>(new RocksDb);
+
+    const auto ttl_migrated_path = path.parent_path() / "ttl_migrated";
+    const auto ttl_migrated_exists = fs::exists(ttl_migrated_path);
+
+    if (no_db_presented or ttl_migrated_exists) {
+      OUTCOME_TRY(openDatabaseWithTTL(options,
+                                      path,
+                                      column_family_descriptors,
+                                      ttls,
+                                      rocks_db,
+                                      ttl_migrated_path,
+                                      log));
+      return rocks_db;
+    }
+
+    if (not enable_migration) {
+      SL_ERROR(log,
+               "Database migration is disabled; use an older kagome version "
+               "or rerun with --enable-db-migration");
+      return DatabaseError::IO_ERROR;
+    }
+
+    OUTCOME_TRY(migrateDatabase(options,
+                                path,
+                                column_family_descriptors,
+                                ttls,
+                                rocks_db,
+                                ttl_migrated_path,
+                                log));
+    return rocks_db;
+  }
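
To summarize the control flow create() now implements: open directly with TTL when the database is fresh or the "ttl_migrated" marker file exists, refuse to start when pre-TTL data is present and migration is off, otherwise migrate. A condensed restatement (should_migrate is an illustrative helper, not a function from the patch):

    #include <filesystem>
    #include <stdexcept>

    bool should_migrate(const std::filesystem::path &db_path,
                        bool enable_migration) {
      namespace fs = std::filesystem;
      const bool fresh = not fs::exists(db_path);
      const bool migrated = fs::exists(db_path.parent_path() / "ttl_migrated");
      if (fresh or migrated) {
        return false;  // open directly with rocksdb::DBWithTTL
      }
      if (not enable_migration) {
        // pre-TTL data present and migration is opt-in
        throw std::runtime_error("rerun with --enable-db-migration");
      }
      return true;  // copy all column families into a TTL database
    }
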
+
+  outcome::result<void> RocksDb::createDirectory(
+      const std::filesystem::path &absolute_path, log::Logger &log) {
+    std::error_code ec;
+    if (not fs::create_directory(absolute_path.native(), ec) and ec.value()) {
+      SL_ERROR(log,
+               "Can't create directory {} for database: {}",
+               absolute_path.native(),
+               ec);
+      return DatabaseError::IO_ERROR;
+    }
+    if (not fs::is_directory(absolute_path.native())) {
+      SL_ERROR(log,
+               "Can't open {} for database: is not a directory",
+               absolute_path.native());
+      return DatabaseError::IO_ERROR;
+    }
+    return outcome::success();
+  }
+
+  void RocksDb::configureColumnFamilies(
+      std::vector<rocksdb::ColumnFamilyDescriptor> &column_family_descriptors,
+      std::vector<int32_t> &ttls,
+      const std::unordered_map<std::string, int32_t> &column_ttl,
+      uint32_t trie_space_cache_size,
+      uint32_t other_spaces_cache_size,
+      log::Logger &log) {
+    for (auto i = 0; i < Space::kTotal; ++i) {
+      const auto space_name = spaceName(static_cast<Space>(i));
+      auto ttl = 0;
+      if (const auto it = column_ttl.find(space_name); it != column_ttl.end()) {
+        ttl = it->second;
+      }
+      column_family_descriptors.emplace_back(
+          space_name,
+          configureColumn(i != Space::kTrieNode ? other_spaces_cache_size
+                                                : trie_space_cache_size));
+      ttls.push_back(ttl);
+      SL_DEBUG(log, "Column family {} configured with TTL {}", space_name, ttl);
+    }
+  }
+
+  outcome::result<void> RocksDb::openDatabaseWithTTL(
+      const rocksdb::Options &options,
+      const filesystem::path &path,
+      const std::vector<rocksdb::ColumnFamilyDescriptor>
+          &column_family_descriptors,
+      const std::vector<int32_t> &ttls,
+      std::shared_ptr<RocksDb> &rocks_db,
+      const filesystem::path &ttl_migrated_path,
+      log::Logger &log) {
+    const auto status =
+        rocksdb::DBWithTTL::Open(options,
+                                 path.native(),
+                                 column_family_descriptors,
+                                 &rocks_db->column_family_handles_,
+                                 &rocks_db->db_,
+                                 ttls);
+    if (not status.ok()) {
+      SL_ERROR(log,
+               "Can't open database in {}: {}",
+               path.native(),
+               status.ToString());
+      return status_as_error(status);
+    }
+    if (not fs::exists(ttl_migrated_path)) {
+      std::ofstream file(ttl_migrated_path.native());
+      if (not file) {
+        SL_ERROR(log,
+                 "Can't create file {} for database",
+                 ttl_migrated_path.native());
+        return DatabaseError::IO_ERROR;
+      }
+      file.close();
+    }
+    return outcome::success();
+  }
+
+  outcome::result<void> RocksDb::migrateDatabase(
+      const rocksdb::Options &options,
+      const filesystem::path &path,
+      const std::vector<rocksdb::ColumnFamilyDescriptor>
+          &column_family_descriptors,
+      const std::vector<int32_t> &ttls,
+      std::shared_ptr<RocksDb> &rocks_db,
+      const filesystem::path &ttl_migrated_path,
+      log::Logger &log) {
+    rocksdb::DB *db_raw = nullptr;
+    std::vector<rocksdb::ColumnFamilyHandle *> column_family_handles;
     auto status = rocksdb::DB::Open(options,
                                     path.native(),
                                     column_family_descriptors,
-                                    &rocks_db->column_family_handles_,
-                                    &rocks_db->db_);
-    if (status.ok()) {
-      return rocks_db;
+                                    &column_family_handles,
+                                    &db_raw);
+    std::shared_ptr<rocksdb::DB> db(db_raw);
+    if (not status.ok()) {
+      SL_ERROR(log,
+               "Can't open old database in {}: {}",
+               path.native(),
+               status.ToString());
+      return status_as_error(status);
     }
+    auto defer_db =
+        std::make_unique<DatabaseGuard>(db, column_family_handles, log);
 
-    SL_ERROR(log,
-             "Can't open database in {}: {}",
-             absolute_path.native(),
-             status.ToString());
-
-    return status_as_error(status);
+    std::vector<rocksdb::ColumnFamilyHandle *> column_family_handles_with_ttl;
+    const auto ttl_path = path.parent_path() / "db_ttl";
+    std::error_code ec;
+    fs::create_directories(ttl_path, ec);
+    if (ec) {
+      SL_ERROR(log,
+               "Can't create directory {} for database: {}",
+               ttl_path.native(),
+               ec);
+      return DatabaseError::IO_ERROR;
+    }
+    rocksdb::DBWithTTL *db_with_ttl_raw = nullptr;
+    status = rocksdb::DBWithTTL::Open(options,
+                                      ttl_path.native(),
+                                      column_family_descriptors,
+                                      &column_family_handles_with_ttl,
+                                      &db_with_ttl_raw,
+                                      ttls);
+    if (not status.ok()) {
+      SL_ERROR(log,
+               "Can't open database in {}: {}",
+               ttl_path.native(),
+               status.ToString());
+      return status_as_error(status);
+    }
+    std::shared_ptr<rocksdb::DBWithTTL> db_with_ttl(db_with_ttl_raw);
+    auto defer_db_ttl = std::make_unique<DatabaseGuard>(
+        db_with_ttl, column_family_handles_with_ttl, log);
+
+    for (std::size_t i = 0; i < column_family_handles.size(); ++i) {
+      const auto from_handle = column_family_handles[i];
+      auto to_handle = column_family_handles_with_ttl[i];
+      std::unique_ptr<rocksdb::Iterator> it(
+          db->NewIterator(rocksdb::ReadOptions(), from_handle));
+      for (it->SeekToFirst(); it->Valid(); it->Next()) {
+        const auto &key = it->key();
+        const auto &value = it->value();
+        status =
+            db_with_ttl->Put(rocksdb::WriteOptions(), to_handle, key, value);
+        if (not status.ok()) {
+          SL_ERROR(log, "Can't write to ttl database: {}", status.ToString());
+          return status_as_error(status);
+        }
+      }
+      if (not it->status().ok()) {
+        return status_as_error(it->status());
+      }
+    }
+    defer_db_ttl.reset();
+    defer_db.reset();
+
+    fs::remove_all(path, ec);
+    if (ec) {
+      SL_ERROR(log, "Can't remove old database in {}: {}", path.native(), ec);
+      return DatabaseError::IO_ERROR;
+    }
+    fs::create_directories(path, ec);
+    if (ec) {
+      SL_ERROR(log,
+               "Can't create directory {} for final database: {}",
+               path.native(),
+               ec);
+      return DatabaseError::IO_ERROR;
+    }
+    fs::rename(ttl_path, path, ec);
+    if (ec) {
+      SL_ERROR(log,
+               "Can't rename database from {} to {}: {}",
+               ttl_path.native(),
+               path.native(),
+               ec);
+      return DatabaseError::IO_ERROR;
+    }
+    status = rocksdb::DBWithTTL::Open(options,
+                                      path.native(),
+                                      column_family_descriptors,
+                                      &rocks_db->column_family_handles_,
+                                      &rocks_db->db_,
+                                      ttls);
+    if (not status.ok()) {
+      SL_ERROR(log,
+               "Can't open database in {}: {}",
+               path.native(),
+               status.ToString());
+      return status_as_error(status);
+    }
+    std::ofstream file(ttl_migrated_path.native());
+    if (not file) {
+      SL_ERROR(
+          log, "Can't create file {} for database", ttl_migrated_path.native());
+      return DatabaseError::IO_ERROR;
+    }
+    file.close();
+    return outcome::success();
   }
 
   std::shared_ptr RocksDb::getSpace(Space space) {
@@ -172,6 +365,53 @@ namespace kagome::storage {
     return options;
   }
 
+  RocksDb::DatabaseGuard::DatabaseGuard(
+      std::shared_ptr<rocksdb::DB> db,
+      std::vector<rocksdb::ColumnFamilyHandle *> column_family_handles,
+      log::Logger log)
+      : db_(std::move(db)),
+        column_family_handles_(std::move(column_family_handles)),
+        log_(std::move(log)) {}
+
+  RocksDb::DatabaseGuard::DatabaseGuard(
+      std::shared_ptr<rocksdb::DBWithTTL> db_ttl,
+      std::vector<rocksdb::ColumnFamilyHandle *> column_family_handles,
+      log::Logger log)
+      : db_ttl_(std::move(db_ttl)),
+        column_family_handles_(std::move(column_family_handles)),
+        log_(std::move(log)) {}
+
+  RocksDb::DatabaseGuard::~DatabaseGuard() {
+    const auto clean = [this](auto db) {
+      auto status = db->Flush(rocksdb::FlushOptions());
+      if (not status.ok()) {
+        SL_ERROR(log_, "Can't flush database: {}", status.ToString());
+      }
+
+      status = db->WaitForCompact(rocksdb::WaitForCompactOptions());
+      if (not status.ok()) {
+        SL_ERROR(log_,
+                 "Can't wait for background compaction: {}",
+                 status.ToString());
+      }
+
+      for (auto *handle : column_family_handles_) {
+        db->DestroyColumnFamilyHandle(handle);
+      }
+
+      status = db->Close();
+      if (not status.ok()) {
+        SL_ERROR(log_, "Can't close database: {}", status.ToString());
+      }
+      db.reset();
+    };
+    if (db_) {
+      clean(db_);
+    } else if (db_ttl_) {
+      clean(db_ttl_);
+    }
+  }
+
   RocksDbSpace::RocksDbSpace(std::weak_ptr<RocksDb> storage,
                              const RocksDb::ColumnFamilyHandlePtr &column,
                              log::Logger logger)
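
DatabaseGuard exists because RocksDB requires every ColumnFamilyHandle to be destroyed before the database is closed; the destructor also flushes and waits for compaction so the freshly copied data reaches disk before the directory swap. Its sequence, flattened (a mirror of the code above, not additional behavior; db and handles stand for the guarded database and its handles):

    db->Flush(rocksdb::FlushOptions());                    // persist memtables
    db->WaitForCompact(rocksdb::WaitForCompactOptions());  // let compaction settle
    for (auto *handle : handles) {
      db->DestroyColumnFamilyHandle(handle);  // handles must go before Close()
    }
    db->Close();
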
diff --git a/core/storage/rocksdb/rocksdb.hpp b/core/storage/rocksdb/rocksdb.hpp
index ab57020a12..f3d6aec89d 100644
--- a/core/storage/rocksdb/rocksdb.hpp
+++ b/core/storage/rocksdb/rocksdb.hpp
@@ -10,6 +10,7 @@
 #include
 #include
+#include <rocksdb/utilities/db_ttl.h>
 #include
 
 #include "filesystem/common.hpp"
@@ -48,7 +49,9 @@ namespace kagome::storage {
         const filesystem::path &path,
         rocksdb::Options options = rocksdb::Options(),
         uint32_t memory_budget_mib = kDefaultStateCacheSizeMiB,
-        bool prevent_destruction = false);
+        bool prevent_destruction = false,
+        const std::unordered_map<std::string, int32_t> &column_ttl = {},
+        bool enable_migration = true);
 
     std::shared_ptr getSpace(Space space) override;
 
@@ -73,11 +76,60 @@ namespace kagome::storage {
     friend class RocksDbBatch;
 
    private:
-    RocksDb();
+    struct DatabaseGuard {
+      DatabaseGuard(
+          std::shared_ptr<rocksdb::DB> db,
+          std::vector<rocksdb::ColumnFamilyHandle *> column_family_handles,
+          log::Logger log);
+
+      DatabaseGuard(
+          std::shared_ptr<rocksdb::DBWithTTL> db_ttl,
+          std::vector<rocksdb::ColumnFamilyHandle *> column_family_handles,
+          log::Logger log);
+
+      ~DatabaseGuard();
+
+     private:
+      std::shared_ptr<rocksdb::DB> db_;
+      std::shared_ptr<rocksdb::DBWithTTL> db_ttl_;
+      std::vector<rocksdb::ColumnFamilyHandle *> column_family_handles_;
+      log::Logger log_;
+    };
 
+    RocksDb();
     static rocksdb::ColumnFamilyOptions configureColumn(uint32_t memory_budget);
-
-    rocksdb::DB *db_{};
+    static outcome::result<void> createDirectory(
+        const std::filesystem::path &absolute_path, log::Logger &log);
+
+    static void configureColumnFamilies(
+        std::vector<rocksdb::ColumnFamilyDescriptor> &column_family_descriptors,
+        std::vector<int32_t> &ttls,
+        const std::unordered_map<std::string, int32_t> &column_ttl,
+        uint32_t trie_space_cache_size,
+        uint32_t other_spaces_cache_size,
+        log::Logger &log);
+
+    static outcome::result<void> openDatabaseWithTTL(
+        const rocksdb::Options &options,
+        const filesystem::path &path,
+        const std::vector<rocksdb::ColumnFamilyDescriptor>
+            &column_family_descriptors,
+        const std::vector<int32_t> &ttls,
+        std::shared_ptr<RocksDb> &rocks_db,
+        const filesystem::path &ttl_migrated_path,
+        log::Logger &log);
+
+    static outcome::result<void> migrateDatabase(
+        const rocksdb::Options &options,
+        const filesystem::path &path,
+        const std::vector<rocksdb::ColumnFamilyDescriptor>
+            &column_family_descriptors,
+        const std::vector<int32_t> &ttls,
+        std::shared_ptr<RocksDb> &rocks_db,
+        const filesystem::path &ttl_migrated_path,
+        log::Logger &log);
+
+    rocksdb::DBWithTTL *db_{};
     std::vector<ColumnFamilyHandlePtr> column_family_handles_;
     boost::container::flat_map<Space, std::shared_ptr<RocksDbSpace>> spaces_;
     rocksdb::ReadOptions ro_;
diff --git a/core/storage/rocksdb/rocksdb_spaces.cpp b/core/storage/rocksdb/rocksdb_spaces.cpp
index e4eb18b498..c1f4f920c9 100644
--- a/core/storage/rocksdb/rocksdb_spaces.cpp
+++ b/core/storage/rocksdb/rocksdb_spaces.cpp
@@ -23,6 +23,7 @@ namespace kagome::storage {
       "trie_value",
       "dispute_data",
       "beefy_justification",
+      "avaliability_storage"
   };
   static_assert(kNames.size() == Space::kTotal - 1);
diff --git a/core/storage/spaces.hpp b/core/storage/spaces.hpp
index 286da4bcd1..3f4ef67dd3 100644
--- a/core/storage/spaces.hpp
+++ b/core/storage/spaces.hpp
@@ -23,6 +23,7 @@ namespace kagome::storage {
     kTrieValue,
     kDisputeData,
     kBeefyJustification,
+    kAvaliabilityStorage,
     kTotal
   };
diff --git a/test/core/injector/application_injector_test.cpp b/test/core/injector/application_injector_test.cpp
index 28d2f9f7af..49b6eb61c9 100644
--- a/test/core/injector/application_injector_test.cpp
+++ b/test/core/injector/application_injector_test.cpp
@@ -96,6 +96,8 @@ namespace {
         .WillRepeatedly(testing::Return(db_path));
     EXPECT_CALL(config_mock, keystorePath(_))
         .WillRepeatedly(testing::Return(db_path / "keys"));
+    EXPECT_CALL(config_mock, enableDbMigration())
+        .WillRepeatedly(testing::Return(true));
 
     kagome::network::Roles roles;
     roles.flags.full = 1;
     roles.flags.authority = 1;
diff --git a/test/mock/core/application/app_configuration_mock.hpp b/test/mock/core/application/app_configuration_mock.hpp
index 44c7c93385..3fa0d2761e 100644
--- a/test/mock/core/application/app_configuration_mock.hpp
+++ b/test/mock/core/application/app_configuration_mock.hpp
@@ -143,6 +143,8 @@ namespace kagome::application {
 
     MOCK_METHOD(bool, disableSecureMode, (), (const, override));
 
+    MOCK_METHOD(bool, enableDbMigration, (), (const, override));
+
     MOCK_METHOD(bool, isOffchainIndexingEnabled, (), (const, override));
 
     MOCK_METHOD(std::optional, subcommand, (), (const, override));