diff --git a/tddb/CMakeLists.txt b/tddb/CMakeLists.txt index 89730b954..2829ef0ac 100644 --- a/tddb/CMakeLists.txt +++ b/tddb/CMakeLists.txt @@ -37,6 +37,9 @@ set(TDDB_SOURCE set(TDDB_ROCKSDB_SOURCE td/db/RocksDb.cpp td/db/RocksDb.h + + td/db/RocksDbSecondary.cpp + td/db/RocksDbSecondary.h ) set(TDDB_TEST_SOURCE diff --git a/tddb/td/db/RocksDbSecondary.cpp b/tddb/td/db/RocksDbSecondary.cpp new file mode 100644 index 000000000..1f3e77269 --- /dev/null +++ b/tddb/td/db/RocksDbSecondary.cpp @@ -0,0 +1,198 @@ +/* + This file is part of TON Blockchain Library. + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . + Copyright 2017-2020 Telegram Systems LLP +*/ +#include "td/db/RocksDbSecondary.h" + +#include "rocksdb/db.h" +#include "rocksdb/table.h" +#include "rocksdb/statistics.h" +#include "rocksdb/write_batch.h" +#include "rocksdb/utilities/transaction.h" + +namespace td { +namespace { +static Status from_rocksdb(rocksdb::Status status) { + if (status.ok()) { + return Status::OK(); + } + return Status::Error(status.ToString()); +} +static Slice from_rocksdb(rocksdb::Slice slice) { + return Slice(slice.data(), slice.size()); +} +static rocksdb::Slice to_rocksdb(Slice slice) { + return rocksdb::Slice(slice.data(), slice.size()); +} +} // namespace + +Status RocksDbSecondary::destroy(Slice path) { + return from_rocksdb(rocksdb::DestroyDB(path.str(), {})); +} + +RocksDbSecondary::RocksDbSecondary(RocksDbSecondary &&) = default; +RocksDbSecondary &RocksDbSecondary::operator=(RocksDbSecondary &&) = default; + +RocksDbSecondary::~RocksDbSecondary() { + if (!db_) { + return; + } + end_snapshot().ensure(); +} + +RocksDbSecondary RocksDbSecondary::clone() const { + return RocksDbSecondary{db_, statistics_}; +} + +Result RocksDbSecondary::open(std::string path) { + rocksdb::DB *db; + auto statistics = rocksdb::CreateDBStatistics(); + { + rocksdb::Options options; + + static auto cache = rocksdb::NewLRUCache(1 << 30); + + rocksdb::BlockBasedTableOptions table_options; + table_options.block_cache = cache; + options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); + + options.manual_wal_flush = true; + options.create_if_missing = true; + options.max_background_compactions = 4; + options.max_background_flushes = 2; + options.bytes_per_sync = 1 << 20; + options.writable_file_max_buffer_size = 2 << 14; + options.keep_log_file_num = 1; + options.statistics = statistics; + rocksdb::ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.push_back(rocksdb::ColumnFamilyDescriptor(rocksdb::kDefaultColumnFamilyName, cf_options)); + std::vector handles; + TRY_STATUS(from_rocksdb( + rocksdb::DB::OpenAsSecondary(options, path, path + "/secondary", column_families, &handles, &db))); + CHECK(handles.size() == 1); + // i can delete the handle since DBImpl is always holding a reference to + // default column family + delete handles[0]; + } + TRY_STATUS(from_rocksdb(db->TryCatchUpWithPrimary())); + return RocksDbSecondary(std::shared_ptr(db), std::move(statistics)); +} + +Status RocksDbSecondary::try_catch_up_with_primary() { + return from_rocksdb(db_->TryCatchUpWithPrimary()); +} + +std::unique_ptr RocksDbSecondary::snapshot() { + auto res = std::make_unique(clone()); + res->begin_snapshot().ensure(); + return std::move(res); +} + +std::string RocksDbSecondary::stats() const { + std::string out; + db_->GetProperty("rocksdb.stats", &out); + //db_->GetProperty("rocksdb.cur-size-all-mem-tables", &out); + return out; +} + +Result RocksDbSecondary::get(Slice key, std::string &value) { + rocksdb::Status status; + if (snapshot_) { + rocksdb::ReadOptions options; + options.snapshot = snapshot_.get(); + status = db_->Get(options, to_rocksdb(key), &value); + } else { + status = db_->Get({}, to_rocksdb(key), &value); + } + if (status.ok()) { + return GetStatus::Ok; + } + if (status.code() == rocksdb::Status::kNotFound) { + return GetStatus::NotFound; + } + return from_rocksdb(status); +} + +Status RocksDbSecondary::set(Slice key, Slice value) { + CHECK(false) +} + +Status RocksDbSecondary::erase(Slice key) { + CHECK(false) +} + +Result RocksDbSecondary::count(Slice prefix) { + rocksdb::ReadOptions options; + options.snapshot = snapshot_.get(); + std::unique_ptr iterator; + + iterator.reset(db_->NewIterator(options)); + + size_t res = 0; + for (iterator->Seek(to_rocksdb(prefix)); iterator->Valid(); iterator->Next()) { + if (from_rocksdb(iterator->key()).truncate(prefix.size()) != prefix) { + break; + } + res++; + } + if (!iterator->status().ok()) { + return from_rocksdb(iterator->status()); + } + return res; +} + +Status RocksDbSecondary::begin_write_batch() { + CHECK(false) +} + +Status RocksDbSecondary::begin_transaction() { + CHECK(false) +} + +Status RocksDbSecondary::commit_write_batch() { + CHECK(false) +} + +Status RocksDbSecondary::commit_transaction() { + CHECK(false) +} + +Status RocksDbSecondary::abort_write_batch() { + CHECK(false) +} + +Status RocksDbSecondary::abort_transaction() { + CHECK(false) +} + +Status RocksDbSecondary::flush() { + CHECK(false) +} + +Status RocksDbSecondary::begin_snapshot() { + snapshot_.reset(db_->GetSnapshot()); + return td::Status::OK(); +} + +Status RocksDbSecondary::end_snapshot() { + if (snapshot_) { + db_->ReleaseSnapshot(snapshot_.release()); + } + return td::Status::OK(); +} + +RocksDbSecondary::RocksDbSecondary(std::shared_ptr db, std::shared_ptr statistics) + : db_(std::move(db)), statistics_(std::move(statistics)) { +} +} // namespace td \ No newline at end of file diff --git a/tddb/td/db/RocksDbSecondary.h b/tddb/td/db/RocksDbSecondary.h new file mode 100644 index 000000000..8c6c3dd45 --- /dev/null +++ b/tddb/td/db/RocksDbSecondary.h @@ -0,0 +1,85 @@ +/* + This file is part of TON Blockchain Library. + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . + Copyright 2017-2020 Telegram Systems LLP +*/ +#pragma once + +#if !TDDB_USE_ROCKSDB +#error "RocksDb is not supported" +#endif + +#include "td/db/KeyValue.h" +#include "td/utils/Status.h" + +namespace rocksdb { +class DB; +class Transaction; +class WriteBatch; +class Snapshot; +class Statistics; +} // namespace rocksdb + +namespace td { +class RocksDbSecondary : public KeyValue { + public: + static Status destroy(Slice path); + RocksDbSecondary clone() const; + static Result open(std::string path); + + Status try_catch_up_with_primary(); + + Result get(Slice key, std::string &value) override; + Status set(Slice key, Slice value) override; + Status erase(Slice key) override; + Result count(Slice prefix) override; + + Status begin_write_batch() override; + Status commit_write_batch() override; + Status abort_write_batch() override; + + Status begin_transaction() override; + Status commit_transaction() override; + Status abort_transaction() override; + Status flush() override; + + Status begin_snapshot(); + Status end_snapshot(); + + std::unique_ptr snapshot() override; + std::string stats() const override; + + RocksDbSecondary(RocksDbSecondary &&); + RocksDbSecondary &operator=(RocksDbSecondary &&); + ~RocksDbSecondary(); + + std::shared_ptr raw_db() const { + return db_; + }; + + private: + std::shared_ptr db_; + std::shared_ptr statistics_; + + class UnreachableDeleter { + public: + template + void operator()(T *) { + UNREACHABLE(); + } + }; + std::unique_ptr snapshot_; + + explicit RocksDbSecondary(std::shared_ptr db, + std::shared_ptr statistics); +}; +} // namespace td \ No newline at end of file diff --git a/validator/db/archive-manager.cpp b/validator/db/archive-manager.cpp index 2c0c82e51..011a39666 100644 --- a/validator/db/archive-manager.cpp +++ b/validator/db/archive-manager.cpp @@ -21,6 +21,7 @@ #include "td/utils/overloaded.h" #include "files-async.hpp" #include "td/db/RocksDb.h" +#include "td/db/RocksDbSecondary.h" #include "common/delay.h" namespace ton { @@ -56,8 +57,8 @@ std::string PackageId::name() const { } ArchiveManager::ArchiveManager(td::actor::ActorId root, std::string db_root, - td::Ref opts) - : db_root_(db_root), opts_(opts) { + td::Ref opts, bool secondary) + : db_root_(db_root), opts_(opts), secondary_(secondary) { } void ArchiveManager::add_handle(BlockHandle handle, td::Promise promise) { @@ -601,7 +602,7 @@ void ArchiveManager::load_package(PackageId id) { } desc.file = - td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get()); + td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get(), secondary_); m.emplace(id, std::move(desc)); update_permanent_slices(); @@ -636,7 +637,7 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull td::mkdir(db_root_ + id.path()).ensure(); std::string prefix = PSTRING() << db_root_ << id.path() << id.name(); new_desc.file = - td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get()); + td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get(), secondary_); const FileDescription &desc = f.emplace(id, std::move(new_desc)); if (!id.temp) { update_desc(f, desc, shard, seqno, ts, lt); @@ -829,7 +830,12 @@ void ArchiveManager::start_up() { if (opts_->get_max_open_archive_files() > 0) { archive_lru_ = td::actor::create_actor("archive_lru", opts_->get_max_open_archive_files()); } - index_ = std::make_shared(td::RocksDb::open(db_root_ + "/files/globalindex").move_as_ok()); + if (secondary_) { + index_ = std::static_pointer_cast(std::make_shared(td::RocksDbSecondary::open(db_root_ + "/files/globalindex").move_as_ok())); + } else { + index_ = std::static_pointer_cast(std::make_shared(td::RocksDb::open(db_root_ + "/files/globalindex").move_as_ok())); + } + std::string value; auto v = index_->get(create_serialize_tl_object().as_slice(), value); v.ensure(); @@ -884,7 +890,9 @@ void ArchiveManager::start_up() { } }).ensure(); - persistent_state_gc(FileHash::zero()); + if (!secondary_) { + persistent_state_gc(FileHash::zero()); + } double open_since = td::Clocks::system() - opts_->get_archive_preload_period(); for (auto it = files_.rbegin(); it != files_.rend(); ++it) { @@ -905,7 +913,121 @@ void ArchiveManager::start_up() { } } +void ArchiveManager::try_catch_up_with_primary(td::Promise promise) { + CHECK(secondary_); + + auto index_secondary = std::static_pointer_cast(index_); + + auto index_res = index_secondary->try_catch_up_with_primary(); + if (index_res.is_error()) { + promise.set_error(index_res.move_as_error()); + return; + } + + std::string value; + auto v = index_->get(create_serialize_tl_object().as_slice(), value); + v.ensure(); + + CHECK(v.move_as_ok() == td::KeyValue::GetStatus::Ok) + auto R = fetch_tl_object(value, true); + R.ensure(); + auto x = R.move_as_ok(); + + for (auto &d : x->packages_) { + auto id = PackageId{static_cast(d), false, false}; + if (get_file_map(id).count(id) == 0) { + load_package(id); + } else { + auto res = catch_up_package(id); + if (res.is_error()) { + promise.set_error(std::move(res)); + return; + } + } + } + for (auto &d : x->key_packages_) { + auto id = PackageId{static_cast(d), true, false}; + if (get_file_map(id).count(id) == 0) { + load_package(id); + } else { + auto res = catch_up_package(id); + if (res.is_error()) { + promise.set_error(std::move(res)); + return; + } + } + } + for (auto &d : x->temp_packages_) { + auto id = PackageId{static_cast(d), false, true}; + if (get_file_map(id).count(id) == 0) { + load_package(id); + } else { + auto res = catch_up_package(id); + if (res.is_error()) { + promise.set_error(std::move(res)); + return; + } + } + } + + promise.set_result(td::Status::OK()); +} + +td::Status ArchiveManager::catch_up_package(const PackageId& id) { + auto key = create_serialize_tl_object(id.id, id.key, id.temp); + + std::string value; + auto v = index_->get(key.as_slice(), value); + v.ensure(); + CHECK(v.move_as_ok() == td::KeyValue::GetStatus::Ok); + + auto R = fetch_tl_object(value, true); + R.ensure(); + auto x = R.move_as_ok(); + + std::map first_blocks; + if (!id.temp) { + for (auto &e : x->firstblocks_) { + first_blocks[ShardIdFull{e->workchain_, static_cast(e->shard_)}] = FileDescription::Desc{ + static_cast(e->seqno_), static_cast(e->unixtime_), static_cast(e->lt_)}; + } + } + + auto& map = get_file_map(id); + auto it = map.find(id); + CHECK(it != map.end()); + if (it->second.first_blocks != first_blocks || it->second.deleted != x->deleted_) { + FileDescription desc{id, x->deleted_}; + desc.first_blocks = std::move(first_blocks); + desc.file = std::move(it->second.file); + map.erase(it); + map.emplace(id, std::move(desc)); + } + + // probably we should also call ArchiveSlice::try_catch_up_with_primary for desc.file, + // but for now we do it in ArchiveManager::get_max_masterchain_seqno, since it's the only use case. + + return td::Status::OK(); +} + +void ArchiveManager::get_max_masterchain_seqno(td::Promise promise) { + auto fd = get_file_desc_by_seqno(ton::AccountIdPrefixFull(ton::masterchainId, ton::shardIdAll), INT_MAX, false); + if (secondary_) { + auto R = td::PromiseCreator::lambda([SelfId = actor_id(this), promise = std::move(promise), fd = std::move(fd)](td::Result R) mutable { + if (R.is_error()) { + promise.set_error(R.move_as_error()); + } else { + td::actor::send_closure(fd->file, &ArchiveSlice::get_max_masterchain_seqno, std::move(promise)); + } + }); + td::actor::send_closure(fd->file, &ArchiveSlice::try_catch_up_with_primary, std::move(R)); + } else { + td::actor::send_closure(fd->file, &ArchiveSlice::get_max_masterchain_seqno, std::move(promise)); + } +} + void ArchiveManager::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) { + CHECK(!secondary_); auto p = get_temp_package_id_by_unixtime(std::max(gc_ts, mc_ts - TEMP_PACKAGES_TTL)); std::vector vec; for (auto &x : temp_files_) { @@ -1126,6 +1248,7 @@ void ArchiveManager::set_async_mode(bool mode, td::Promise promise) { } void ArchiveManager::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise promise) { + CHECK(!secondary_) index_->begin_transaction().ensure(); td::MultiPromise mp; auto ig = mp.init_guard(); diff --git a/validator/db/archive-manager.hpp b/validator/db/archive-manager.hpp index 1c5deaf86..f8d6deb41 100644 --- a/validator/db/archive-manager.hpp +++ b/validator/db/archive-manager.hpp @@ -28,7 +28,7 @@ class RootDb; class ArchiveManager : public td::actor::Actor { public: - ArchiveManager(td::actor::ActorId root, std::string db_root, td::Ref opts); + ArchiveManager(td::actor::ActorId root, std::string db_root, td::Ref opts, bool secondary = false); void add_handle(BlockHandle handle, td::Promise promise); void update_handle(BlockHandle handle, td::Promise promise); @@ -71,6 +71,10 @@ class ArchiveManager : public td::actor::Actor { void start_up() override; + void try_catch_up_with_primary(td::Promise promise); + td::Status catch_up_package(const PackageId& id); + void get_max_masterchain_seqno(td::Promise promise); + void commit_transaction(); void set_async_mode(bool mode, td::Promise promise); @@ -87,6 +91,10 @@ class ArchiveManager : public td::actor::Actor { BlockSeqno seqno; UnixTime ts; LogicalTime lt; + + bool operator==(const Desc& other) const { + return seqno == other.seqno && ts == other.ts && lt == other.lt; + } }; FileDescription(PackageId id, bool deleted) : id(id), deleted(deleted) { } @@ -166,6 +174,7 @@ class ArchiveManager : public td::actor::Actor { void shard_index_add(const FileDescription &desc); void shard_index_del(const FileDescription &desc); }; + bool secondary_; FileMap files_, key_files_, temp_files_; td::actor::ActorOwn archive_lru_; BlockSeqno finalized_up_to_{0}; diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index 52abc0088..465ff8192 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -21,6 +21,7 @@ #include "td/actor/MultiPromise.h" #include "validator/fabric.h" #include "td/db/RocksDb.h" +#include "td/db/RocksDbSecondary.h" #include "td/utils/port/path.h" #include "common/delay.h" #include "files-async.hpp" @@ -452,6 +453,10 @@ void ArchiveSlice::get_slice(td::uint64 archive_id, td::uint64 offset, td::uint3 td::actor::create_actor("readfile", p->path, offset, limit, 0, std::move(promise)).release(); } +void ArchiveSlice::get_max_masterchain_seqno(td::Promise promise) { + promise.set_result(max_masterchain_seqno()); +} + void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) { before_query(); if (!sliced_mode_) { @@ -465,7 +470,11 @@ void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise(td::RocksDb::open(db_path_).move_as_ok()); + if (secondary_) { + kv_ = std::make_unique(td::RocksDbSecondary::open(db_path_).move_as_ok()); + } else { + kv_ = std::make_unique(td::RocksDb::open(db_path_).move_as_ok()); + } std::string value; auto R2 = kv_->get("status", value); R2.ensure(); @@ -567,6 +576,44 @@ void ArchiveSlice::end_async_query() { } } +td::Status ArchiveSlice::try_catch_up_with_primary() { + CHECK(secondary_); + + TRY_STATUS(static_cast(kv_.get())->try_catch_up_with_primary()); + + std::string value; + auto R2 = kv_->get("status", value); + R2.ensure(); + if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { + if (value == "sliced") { + R2 = kv_->get("slices", value); + R2.ensure(); + auto tot = td::to_integer(value); + if (tot == packages_.size()) { + return td::Status::OK(); + } + R2 = kv_->get("slice_size", value); + R2.ensure(); + slice_size_ = td::to_integer(value); + CHECK(slice_size_ > 0); + for (td::uint32 i = packages_.size(); i < tot; i++) { + R2 = kv_->get(PSTRING() << "status." << i, value); + R2.ensure(); + auto len = td::to_integer(value); + R2 = kv_->get(PSTRING() << "version." << i, value); + R2.ensure(); + td::uint32 ver = 0; + if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { + ver = td::to_integer(value); + } + auto v = archive_id_ + slice_size_ * i; + add_package(v, len, ver); + } + } + } + return td::Status::OK(); +} + void ArchiveSlice::begin_transaction() { if (!async_mode_ || !huge_transaction_started_) { kv_->begin_transaction().ensure(); @@ -604,14 +651,15 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise promise) { } ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root, - td::actor::ActorId archive_lru) + td::actor::ActorId archive_lru, bool secondary) : archive_id_(archive_id) , key_blocks_only_(key_blocks_only) , temp_(temp) , finalized_(finalized) , p_id_(archive_id_, key_blocks_only_, temp_) , db_root_(std::move(db_root)) - , archive_lru_(std::move(archive_lru)) { + , archive_lru_(std::move(archive_lru)) + , secondary_(secondary) { db_path_ = PSTRING() << db_root_ << p_id_.path() << p_id_.name() << ".index"; } @@ -644,7 +692,7 @@ td::Result ArchiveSlice::choose_package(BlockSeqno void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 version) { PackageId p_id{seqno, key_blocks_only_, temp_}; std::string path = PSTRING() << db_root_ << p_id.path() << p_id.name() << ".pack"; - auto R = Package::open(path, false, true); + auto R = Package::open(path, false, !secondary_); if (R.is_error()) { LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error(); return; @@ -655,7 +703,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver return; } auto pack = std::make_shared(R.move_as_ok()); - if (version >= 1) { + if (version >= 1 && !secondary_) { pack->truncate(size).ensure(); } auto writer = td::actor::create_actor("writer", pack, async_mode_); @@ -831,6 +879,7 @@ void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shar } void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise promise) { + CHECK(!secondary_); if (temp_ || archive_id_ > masterchain_seqno) { destroy(std::move(promise)); return; @@ -848,7 +897,7 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl auto pack = cutoff.move_as_ok(); CHECK(pack); - auto pack_r = Package::open(pack->path + ".new", false, true); + auto pack_r = Package::open(pack->path + ".new", false, !is_secondary_); pack_r.ensure(); auto new_package = std::make_shared(pack_r.move_as_ok()); new_package->truncate(0).ensure(); diff --git a/validator/db/archive-slice.hpp b/validator/db/archive-slice.hpp index f178a9b80..9d831f8d1 100644 --- a/validator/db/archive-slice.hpp +++ b/validator/db/archive-slice.hpp @@ -81,7 +81,7 @@ class ArchiveLru; class ArchiveSlice : public td::actor::Actor { public: ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root, - td::actor::ActorId archive_lru); + td::actor::ActorId archive_lru, bool secondary = false); void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise); @@ -101,6 +101,8 @@ class ArchiveSlice : public td::actor::Actor { std::function compare, bool exact, td::Promise promise); + void get_max_masterchain_seqno(td::Promise promise); + void get_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise promise); void destroy(td::Promise promise); @@ -111,6 +113,8 @@ class ArchiveSlice : public td::actor::Actor { void open_files(); void close_files(); + td::Status try_catch_up_with_primary(); + private: void before_query(); void do_close(); @@ -152,6 +156,7 @@ class ArchiveSlice : public td::actor::Actor { std::string db_root_; td::actor::ActorId archive_lru_; std::unique_ptr kv_; + bool secondary_; struct PackageInfo { PackageInfo(std::shared_ptr package, td::actor::ActorOwn writer, BlockSeqno id, diff --git a/validator/db/celldb.cpp b/validator/db/celldb.cpp index d29126cea..f560e7494 100644 --- a/validator/db/celldb.cpp +++ b/validator/db/celldb.cpp @@ -20,6 +20,7 @@ #include "rootdb.hpp" #include "td/db/RocksDb.h" +#include "td/db/RocksDbSecondary.h" #include "ton/ton-tl.hpp" #include "ton/ton-io.hpp" @@ -64,17 +65,20 @@ void CellDbBase::execute_sync(std::function f) { } CellDbIn::CellDbIn(td::actor::ActorId root_db, td::actor::ActorId parent, std::string path, - td::Ref opts) - : root_db_(root_db), parent_(parent), path_(std::move(path)), opts_(opts) { + td::Ref opts, bool secondary) + : root_db_(root_db), parent_(parent), path_(std::move(path)), opts_(opts), secondary_(secondary) { } void CellDbIn::start_up() { on_load_callback_ = [actor = std::make_shared>( td::actor::create_actor("celldbmigration", actor_id(this))), - compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) { + compress_depth = opts_->get_celldb_compress_depth(), secondary = secondary_](const vm::CellLoader::LoadResult& res) { if (res.cell_.is_null()) { return; } + if (secondary) { + return; + } bool expected_stored_boc = res.cell_->get_depth() == compress_depth && compress_depth != 0; if (expected_stored_boc != res.stored_boc_) { td::actor::send_closure(*actor, &CellDbIn::MigrationProxy::migrate_cell, @@ -83,7 +87,11 @@ void CellDbIn::start_up() { }; CellDbBase::start_up(); - cell_db_ = std::make_shared(td::RocksDb::open(path_).move_as_ok()); + if (secondary_) { + cell_db_ = std::make_shared(td::RocksDbSecondary::open(path_).move_as_ok()); + } else { + cell_db_ = std::make_shared(td::RocksDb::open(path_).move_as_ok()); + } boc_ = vm::DynamicBagOfCellsDb::create(); boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth()); @@ -101,6 +109,22 @@ void CellDbIn::start_up() { } } +void CellDbIn::try_catch_up_with_primary(td::Promise promise) { + if (!secondary_) { + promise.set_error(td::Status::Error("it's not secondary db")); + } + auto secondary = std::static_pointer_cast(cell_db_); + auto R = secondary->try_catch_up_with_primary(); + if (R.is_error()) { + promise.set_error(R.move_as_error()); + } + + boc_->set_loader(std::make_unique(cell_db_->snapshot())).ensure(); + td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot()); + + promise.set_result(td::Unit()); +} + void CellDbIn::load_cell(RootHash hash, td::Promise> promise) { boc_->load_cell_async(hash.as_slice(), async_executor, std::move(promise)); } @@ -156,6 +180,9 @@ void CellDbIn::get_cell_db_reader(td::Promise> } void CellDbIn::alarm() { + if (secondary_) { + return; + } if (migrate_after_ && migrate_after_.is_in_past()) { migrate_cells(); } @@ -376,6 +403,7 @@ void CellDb::load_cell(RootHash hash, td::Promise> promise } void CellDb::store_cell(BlockIdExt block_id, td::Ref cell, td::Promise> promise) { + CHECK(!secondary_); td::actor::send_closure(cell_db_, &CellDbIn::store_cell, block_id, std::move(cell), std::move(promise)); } @@ -387,13 +415,16 @@ void CellDb::start_up() { CellDbBase::start_up(); boc_ = vm::DynamicBagOfCellsDb::create(); boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth()); - cell_db_ = td::actor::create_actor("celldbin", root_db_, actor_id(this), path_, opts_); + cell_db_ = td::actor::create_actor("celldbin", root_db_, actor_id(this), path_, opts_, secondary_); on_load_callback_ = [actor = std::make_shared>( td::actor::create_actor("celldbmigration", cell_db_.get())), - compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) { + compress_depth = opts_->get_celldb_compress_depth(), secondary = secondary_](const vm::CellLoader::LoadResult& res) { if (res.cell_.is_null()) { return; } + if (secondary) { + return; + } bool expected_stored_boc = res.cell_->get_depth() == compress_depth && compress_depth != 0; if (expected_stored_boc != res.stored_boc_) { td::actor::send_closure(*actor, &CellDbIn::MigrationProxy::migrate_cell, @@ -402,6 +433,10 @@ void CellDb::start_up() { }; } +void CellDb::try_catch_up_with_primary(td::Promise promise) { + td::actor::send_closure(cell_db_, &CellDbIn::try_catch_up_with_primary, std::move(promise)); +} + CellDbIn::DbEntry::DbEntry(tl_object_ptr entry) : block_id(create_block_id(entry->block_id_)) , prev(entry->prev_) diff --git a/validator/db/celldb.hpp b/validator/db/celldb.hpp index a2a84ab4a..c86416be6 100644 --- a/validator/db/celldb.hpp +++ b/validator/db/celldb.hpp @@ -56,8 +56,10 @@ class CellDbIn : public CellDbBase { void migrate_cell(td::Bits256 hash); + void try_catch_up_with_primary(td::Promise promise); + CellDbIn(td::actor::ActorId root_db, td::actor::ActorId parent, std::string path, - td::Ref opts); + td::Ref opts, bool secondary = false); void start_up() override; void alarm() override; @@ -109,6 +111,8 @@ class CellDbIn : public CellDbBase { td::Timestamp migrate_after_ = td::Timestamp::never(); bool migration_active_ = false; + bool secondary_; + struct MigrationStats { td::Timer start_; td::Timestamp end_at_ = td::Timestamp::in(60.0); @@ -142,9 +146,10 @@ class CellDb : public CellDbBase { boc_->set_loader(std::make_unique(std::move(snapshot), on_load_callback_)).ensure(); } void get_cell_db_reader(td::Promise> promise); + void try_catch_up_with_primary(td::Promise promise); - CellDb(td::actor::ActorId root_db, std::string path, td::Ref opts) - : root_db_(root_db), path_(path), opts_(opts) { + CellDb(td::actor::ActorId root_db, std::string path, td::Ref opts, bool secondary = false) + : root_db_(root_db), path_(path), opts_(opts), secondary_(secondary) { } void start_up() override; @@ -159,6 +164,8 @@ class CellDb : public CellDbBase { std::unique_ptr boc_; bool started_ = false; + bool secondary_; + std::function on_load_callback_; }; diff --git a/validator/db/rootdb.cpp b/validator/db/rootdb.cpp index ff9abae68..716ce5219 100644 --- a/validator/db/rootdb.cpp +++ b/validator/db/rootdb.cpp @@ -346,6 +346,21 @@ void RootDb::get_block_by_seqno(AccountIdPrefixFull account, BlockSeqno seqno, t td::actor::send_closure(archive_db_, &ArchiveManager::get_block_by_seqno, account, seqno, std::move(promise)); } +void RootDb::get_max_masterchain_seqno(td::Promise promise) { + td::actor::send_closure(archive_db_, &ArchiveManager::get_max_masterchain_seqno, std::move(promise)); +} + +void RootDb::try_catch_up_with_primary(td::Promise promise) { + CHECK(secondary_); + td::MultiPromise mp; + auto ig = mp.init_guard(); + ig.add_promise(std::move(promise)); + + td::actor::send_closure(archive_db_, &ArchiveManager::try_catch_up_with_primary, ig.get_promise()); + td::actor::send_closure(cell_db_, &CellDb::try_catch_up_with_primary, ig.get_promise()); + td::actor::send_closure(state_db_, &StateDb::try_catch_up_with_primary, ig.get_promise()); +} + void RootDb::update_init_masterchain_block(BlockIdExt block, td::Promise promise) { td::actor::send_closure(state_db_, &StateDb::update_init_masterchain_block, block, std::move(promise)); } @@ -397,10 +412,10 @@ void RootDb::get_hardforks(td::Promise> promise) { } void RootDb::start_up() { - cell_db_ = td::actor::create_actor("celldb", actor_id(this), root_path_ + "/celldb/", opts_); - state_db_ = td::actor::create_actor("statedb", actor_id(this), root_path_ + "/state/"); + cell_db_ = td::actor::create_actor("celldb", actor_id(this), root_path_ + "/celldb/", opts_, secondary_); + state_db_ = td::actor::create_actor("statedb", actor_id(this), root_path_ + "/state/", secondary_); static_files_db_ = td::actor::create_actor("staticfilesdb", actor_id(this), root_path_ + "/static/"); - archive_db_ = td::actor::create_actor("archive", actor_id(this), root_path_, opts_); + archive_db_ = td::actor::create_actor("archive", actor_id(this), root_path_, opts_, secondary_); } void RootDb::archive(BlockHandle handle, td::Promise promise) { @@ -498,6 +513,7 @@ void RootDb::set_async_mode(bool mode, td::Promise promise) { } void RootDb::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) { + CHECK(!secondary_) td::actor::send_closure(archive_db_, &ArchiveManager::run_gc, mc_ts, gc_ts, archive_ttl); } diff --git a/validator/db/rootdb.hpp b/validator/db/rootdb.hpp index 97b9550b8..388cbca21 100644 --- a/validator/db/rootdb.hpp +++ b/validator/db/rootdb.hpp @@ -36,8 +36,8 @@ class RootDb : public Db { public: enum class Flags : td::uint32 { f_started = 1, f_ready = 2, f_switched = 4, f_archived = 8 }; RootDb(td::actor::ActorId validator_manager, std::string root_path, - td::Ref opts) - : validator_manager_(validator_manager), root_path_(std::move(root_path)), opts_(opts) { + td::Ref opts, bool secondary = false) + : validator_manager_(validator_manager), root_path_(std::move(root_path)), opts_(opts), secondary_(secondary) { } void start_up() override; @@ -134,12 +134,17 @@ class RootDb : public Db { td::Promise promise) override; void set_async_mode(bool mode, td::Promise promise) override; + void try_catch_up_with_primary(td::Promise promise); + + void get_max_masterchain_seqno(td::Promise promise); + void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) override; private: td::actor::ActorId validator_manager_; std::string root_path_; td::Ref opts_; + bool secondary_; td::actor::ActorOwn cell_db_; td::actor::ActorOwn state_db_; diff --git a/validator/db/statedb.cpp b/validator/db/statedb.cpp index 5d49ae2bd..7b04f3903 100644 --- a/validator/db/statedb.cpp +++ b/validator/db/statedb.cpp @@ -20,6 +20,7 @@ #include "ton/ton-tl.hpp" #include "adnl/utils.hpp" #include "td/db/RocksDb.h" +#include "td/db/RocksDbSecondary.h" #include "ton/ton-shard.h" namespace ton { @@ -217,11 +218,15 @@ void StateDb::get_hardforks(td::Promise> promise) { promise.set_value(std::move(vec)); } -StateDb::StateDb(td::actor::ActorId root_db, std::string db_path) : root_db_(root_db), db_path_(db_path) { +StateDb::StateDb(td::actor::ActorId root_db, std::string db_path, bool secondary) : root_db_(root_db), db_path_(db_path), secondary_(secondary) { } void StateDb::start_up() { - kv_ = std::make_shared(td::RocksDb::open(db_path_).move_as_ok()); + if (secondary_) { + kv_ = std::make_shared(td::RocksDbSecondary::open(db_path_).move_as_ok()); + } else { + kv_ = std::make_shared(td::RocksDb::open(db_path_).move_as_ok()); + } std::string value; auto R = kv_->get(create_serialize_tl_object(), value); @@ -240,6 +245,18 @@ void StateDb::start_up() { } } +void StateDb::try_catch_up_with_primary(td::Promise promise) { + auto secondary = dynamic_cast(kv_.get()); + if (secondary == nullptr) { + promise.set_error(td::Status::Error("it's not secondary db")); + } + auto R = secondary->try_catch_up_with_primary(); + if (R.is_error()) { + promise.set_error(R.move_as_error()); + } + promise.set_result(td::Unit()); +} + void StateDb::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise promise) { { auto key = create_hash_tl_object(); diff --git a/validator/db/statedb.hpp b/validator/db/statedb.hpp index 75382d61c..5324a35ae 100644 --- a/validator/db/statedb.hpp +++ b/validator/db/statedb.hpp @@ -53,11 +53,13 @@ class StateDb : public td::actor::Actor { void update_db_version(td::uint32 version, td::Promise promise); void get_db_version(td::Promise promise); - StateDb(td::actor::ActorId root_db, std::string path); + StateDb(td::actor::ActorId root_db, std::string path, bool secondary = false); void start_up() override; void truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise promise); + void try_catch_up_with_primary(td::Promise promise); + private: using KeyType = td::Bits256; @@ -65,6 +67,7 @@ class StateDb : public td::actor::Actor { td::actor::ActorId root_db_; std::string db_path_; + bool secondary_; }; } // namespace validator