Skip to content

Commit

Permalink
Cache recent block states and adjust timeouts (#823)
Browse files Browse the repository at this point in the history
* Add parameter --celldb-compress-depth to speed up celldb

* Fix collator timeout

* Add block_state_cache

* Adjust state cache ttl

* Don't merge shards when queue is too big

* Decrease lt limit if previous block is too old

---------

Co-authored-by: SpyCheese <[email protected]>
  • Loading branch information
EmelyanenkoK and SpyCheese authored Dec 8, 2023
1 parent 7fcf267 commit 9b6d699
Show file tree
Hide file tree
Showing 20 changed files with 230 additions and 37 deletions.
43 changes: 35 additions & 8 deletions crypto/vm/db/CellStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,25 @@ namespace vm {
namespace {
class RefcntCellStorer {
public:
RefcntCellStorer(td::int32 refcnt, const DataCell &cell) : refcnt_(refcnt), cell_(cell) {
RefcntCellStorer(td::int32 refcnt, const td::Ref<DataCell> &cell, bool as_boc)
: refcnt_(refcnt), cell_(cell), as_boc_(as_boc) {
}

template <class StorerT>
void store(StorerT &storer) const {
using td::store;
if (as_boc_) {
td::int32 tag = -1;
store(tag, storer);
store(refcnt_, storer);
td::BufferSlice data = vm::std_boc_serialize(cell_).move_as_ok();
storer.store_slice(data);
return;
}
store(refcnt_, storer);
store(cell_, storer);
for (unsigned i = 0; i < cell_.size_refs(); i++) {
auto cell = cell_.get_ref(i);
store(*cell_, storer);
for (unsigned i = 0; i < cell_->size_refs(); i++) {
auto cell = cell_->get_ref(i);
auto level_mask = cell->get_level_mask();
auto level = level_mask.get_level();
td::uint8 x = static_cast<td::uint8>(level_mask.get_mask());
Expand All @@ -60,7 +69,8 @@ class RefcntCellStorer {

private:
td::int32 refcnt_;
const DataCell &cell_;
td::Ref<DataCell> cell_;
bool as_boc_;
};

class RefcntCellParser {
Expand All @@ -69,18 +79,30 @@ class RefcntCellParser {
}
td::int32 refcnt;
Ref<DataCell> cell;
bool stored_boc_;

template <class ParserT>
void parse(ParserT &parser, ExtCellCreator &ext_cell_creator) {
using ::td::parse;
parse(refcnt, parser);
stored_boc_ = false;
if (refcnt == -1) {
stored_boc_ = true;
parse(refcnt, parser);
}
if (!need_data_) {
return;
}
auto status = [&]() -> td::Status {
TRY_STATUS(parser.get_status());
auto size = parser.get_left_len();
td::Slice data = parser.template fetch_string_raw<td::Slice>(size);
if (stored_boc_) {
TRY_RESULT(boc, vm::std_boc_deserialize(data));
TRY_RESULT(loaded_cell, boc->load_cell());
cell = std::move(loaded_cell.data_cell);
return td::Status::OK();
}
CellSerializationInfo info;
auto cell_data = data;
TRY_STATUS(info.init(cell_data, 0 /*ref_byte_size*/));
Expand Down Expand Up @@ -122,7 +144,8 @@ class RefcntCellParser {
};
} // namespace

CellLoader::CellLoader(std::shared_ptr<KeyValueReader> reader) : reader_(std::move(reader)) {
CellLoader::CellLoader(std::shared_ptr<KeyValueReader> reader, std::function<void(const LoadResult &)> on_load_callback)
: reader_(std::move(reader)), on_load_callback_(std::move(on_load_callback)) {
CHECK(reader_);
}

Expand All @@ -145,7 +168,11 @@ td::Result<CellLoader::LoadResult> CellLoader::load(td::Slice hash, bool need_da

res.refcnt_ = refcnt_cell.refcnt;
res.cell_ = std::move(refcnt_cell.cell);
res.stored_boc_ = refcnt_cell.stored_boc_;
//CHECK(res.cell_->get_hash() == hash);
if (on_load_callback_) {
on_load_callback_(res);
}

return res;
}
Expand All @@ -157,7 +184,7 @@ td::Status CellStorer::erase(td::Slice hash) {
return kv_.erase(hash);
}

td::Status CellStorer::set(td::int32 refcnt, const DataCell &cell) {
return kv_.set(cell.get_hash().as_slice(), td::serialize(RefcntCellStorer(refcnt, cell)));
td::Status CellStorer::set(td::int32 refcnt, const td::Ref<DataCell> &cell, bool as_boc) {
return kv_.set(cell->get_hash().as_slice(), td::serialize(RefcntCellStorer(refcnt, cell, as_boc)));
}
} // namespace vm
6 changes: 4 additions & 2 deletions crypto/vm/db/CellStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,21 @@ class CellLoader {

Ref<DataCell> cell_;
td::int32 refcnt_{0};
bool stored_boc_{false};
};
CellLoader(std::shared_ptr<KeyValueReader> reader);
CellLoader(std::shared_ptr<KeyValueReader> reader, std::function<void(const LoadResult &)> on_load_callback = {});
td::Result<LoadResult> load(td::Slice hash, bool need_data, ExtCellCreator &ext_cell_creator);

private:
std::shared_ptr<KeyValueReader> reader_;
std::function<void(const LoadResult &)> on_load_callback_;
};

class CellStorer {
public:
CellStorer(KeyValue &kv);
td::Status erase(td::Slice hash);
td::Status set(td::int32 refcnt, const DataCell &cell);
td::Status set(td::int32 refcnt, const td::Ref<DataCell> &cell, bool as_boc);

private:
KeyValue &kv_;
Expand Down
12 changes: 11 additions & 1 deletion crypto/vm/db/DynamicBagOfCellsDb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,22 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
return td::Status::OK();
}

void set_celldb_compress_depth(td::uint32 value) override {
celldb_compress_depth_ = value;
}

vm::ExtCellCreator& as_ext_cell_creator() override {
return *this;
}

private:
std::unique_ptr<CellLoader> loader_;
std::vector<Ref<Cell>> to_inc_;
std::vector<Ref<Cell>> to_dec_;
CellHashTable<CellInfo> hash_table_;
std::vector<CellInfo *> visited_;
Stats stats_diff_;
td::uint32 celldb_compress_depth_{0};

static td::NamedThreadSafeCounter::CounterRef get_thread_safe_counter() {
static auto res = td::NamedThreadSafeCounter::get_default().get_counter("DynamicBagOfCellsDb");
Expand Down Expand Up @@ -443,7 +452,8 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
guard.dismiss();
} else {
auto loaded_cell = info.cell->load_cell().move_as_ok();
storer.set(info.db_refcnt, *loaded_cell.data_cell);
storer.set(info.db_refcnt, loaded_cell.data_cell,
loaded_cell.data_cell->get_depth() == celldb_compress_depth_ && celldb_compress_depth_ != 0);
info.in_db = true;
}
}
Expand Down
3 changes: 3 additions & 0 deletions crypto/vm/db/DynamicBagOfCellsDb.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class DynamicBagOfCellsDb {
// restart with new loader will also reset stats_diff
virtual td::Status set_loader(std::unique_ptr<CellLoader> loader) = 0;

virtual void set_celldb_compress_depth(td::uint32 value) = 0;
virtual vm::ExtCellCreator& as_ext_cell_creator() = 0;

static std::unique_ptr<DynamicBagOfCellsDb> create();

class AsyncExecutor {
Expand Down
6 changes: 6 additions & 0 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,7 @@ td::Status ValidatorEngine::load_global_config() {
if (!session_logs_file_.empty()) {
validator_options_.write().set_session_logs_file(session_logs_file_);
}
validator_options_.write().set_celldb_compress_depth(celldb_compress_depth_);

std::vector<ton::BlockIdExt> h;
for (auto &x : conf.validator_->hardforks_) {
Expand Down Expand Up @@ -3761,6 +3762,11 @@ int main(int argc, char *argv[]) {
acts.push_back([&x, at]() { td::actor::send_closure(x, &ValidatorEngine::schedule_shutdown, (double)at); });
return td::Status::OK();
});
p.add_checked_option('\0', "celldb-compress-depth", "(default: 0)", [&](td::Slice arg) {
TRY_RESULT(value, td::to_integer_safe<td::uint32>(arg));
acts.push_back([&x, value]() { td::actor::send_closure(x, &ValidatorEngine::set_celldb_compress_depth, value); });
return td::Status::OK();
});
auto S = p.run(argc, argv);
if (S.is_error()) {
LOG(ERROR) << "failed to parse options: " << S.move_as_error();
Expand Down
4 changes: 4 additions & 0 deletions validator-engine/validator-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class ValidatorEngine : public td::actor::Actor {
double sync_ttl_ = 0;
double archive_ttl_ = 0;
double key_proof_ttl_ = 0;
td::uint32 celldb_compress_depth_ = 0;
bool read_config_ = false;
bool started_keyring_ = false;
bool started_ = false;
Expand Down Expand Up @@ -257,6 +258,9 @@ class ValidatorEngine : public td::actor::Actor {
keys_[key.compute_short_id()] = key;
}
void schedule_shutdown(double at);
void set_celldb_compress_depth(td::uint32 value) {
celldb_compress_depth_ = value;
}
void start_up() override;
ValidatorEngine() {
}
Expand Down
82 changes: 76 additions & 6 deletions validator/db/celldb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,29 @@ void CellDbBase::execute_sync(std::function<void()> f) {
f();
}

CellDbIn::CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path)
: root_db_(root_db), parent_(parent), path_(std::move(path)) {
CellDbIn::CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path,
td::Ref<ValidatorManagerOptions> opts)
: root_db_(root_db), parent_(parent), path_(std::move(path)), opts_(opts) {
}

void CellDbIn::start_up() {
on_load_callback_ = [db = actor_id(this),
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
if (res.cell_.is_null()) {
return;
}
bool expected_stored_boc = res.cell_->get_depth() == compress_depth && compress_depth != 0;
if (expected_stored_boc != res.stored_boc_) {
td::actor::send_closure(db, &CellDbIn::migrate_cell, td::Bits256{res.cell_->get_hash().bits()});
}
};

CellDbBase::start_up();
cell_db_ = std::make_shared<td::RocksDb>(td::RocksDb::open(path_).move_as_ok());

boc_ = vm::DynamicBagOfCellsDb::create();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot())).ensure();
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());

alarm_timestamp() = td::Timestamp::in(10.0);
Expand Down Expand Up @@ -129,7 +142,7 @@ void CellDbIn::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promi
set_block(key_hash, std::move(D));
cell_db_->commit_write_batch().ensure();

boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot())).ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());

promise.set_result(boc_->load_cell(cell->get_hash().as_slice()));
Expand All @@ -140,6 +153,9 @@ void CellDbIn::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>>
}

void CellDbIn::alarm() {
if (migrate_after_ && migrate_after_.is_in_past()) {
migrate_cells();
}
auto E = get_block(get_empty_key_hash()).move_as_ok();
auto N = get_block(E.next).move_as_ok();
if (N.is_empty()) {
Expand Down Expand Up @@ -220,7 +236,7 @@ void CellDbIn::gc_cont2(BlockHandle handle) {
cell_db_->commit_write_batch().ensure();
alarm_timestamp() = td::Timestamp::now();

boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot())).ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());

DCHECK(get_block(key_hash).is_error());
Expand Down Expand Up @@ -273,6 +289,49 @@ void CellDbIn::set_block(KeyHash key_hash, DbEntry e) {
cell_db_->set(td::as_slice(key), e.release()).ensure();
}

void CellDbIn::migrate_cell(td::Bits256 hash) {
cells_to_migrate_.insert(hash);
if (cells_to_migrate_.size() >= 32) {
migrate_cells();
} else if (!migrate_after_) {
migrate_after_ = td::Timestamp::in(1.0);
}
}

void CellDbIn::migrate_cells() {
if (cells_to_migrate_.empty()) {
return;
}
vm::CellStorer stor{*cell_db_};
auto loader = std::make_unique<vm::CellLoader>(cell_db_->snapshot());
boc_->set_loader(std::make_unique<vm::CellLoader>(*loader)).ensure();
cell_db_->begin_write_batch().ensure();
td::uint32 cnt = 0;
for (const auto& hash : cells_to_migrate_) {
auto R = loader->load(hash.as_slice(), true, boc_->as_ext_cell_creator());
if (R.is_error()) {
continue;
}
if (R.ok().status == vm::CellLoader::LoadResult::NotFound) {
continue;
}
bool expected_stored_boc =
R.ok().cell_->get_depth() == opts_->get_celldb_compress_depth() && opts_->get_celldb_compress_depth() != 0;
if (expected_stored_boc != R.ok().stored_boc_) {
++cnt;
stor.set(R.ok().refcnt(), R.ok().cell_, expected_stored_boc).ensure();
}
}
cells_to_migrate_.clear();
if (cnt > 0) {
LOG(DEBUG) << "Migrated " << cnt << " cells";
}
cell_db_->commit_write_batch().ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
migrate_after_ = td::Timestamp::never();
}

void CellDb::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise) {
if (!started_) {
td::actor::send_closure(cell_db_, &CellDbIn::load_cell, hash, std::move(promise));
Expand Down Expand Up @@ -300,7 +359,18 @@ void CellDb::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> p
void CellDb::start_up() {
CellDbBase::start_up();
boc_ = vm::DynamicBagOfCellsDb::create();
cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_);
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_);
on_load_callback_ = [db = cell_db_.get(),
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
if (res.cell_.is_null()) {
return;
}
bool expected_stored_boc = res.cell_->get_depth() == compress_depth && compress_depth != 0;
if (expected_stored_boc != res.stored_boc_) {
td::actor::send_closure(db, &CellDbIn::migrate_cell, td::Bits256{res.cell_->get_hash().bits()});
}
};
}

CellDbIn::DbEntry::DbEntry(tl_object_ptr<ton_api::db_celldb_value> entry)
Expand Down
Loading

0 comments on commit 9b6d699

Please sign in to comment.