Skip to content

Commit

Permalink
Improve CellDb migration (#835)
Browse files Browse the repository at this point in the history
* Fix deserializing cells
* Use proxy actor
* Add delays
* Print stats every minute

Co-authored-by: SpyCheese <[email protected]>
  • Loading branch information
EmelyanenkoK and SpyCheese authored Dec 19, 2023
1 parent ace934f commit 83efceb
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 25 deletions.
4 changes: 2 additions & 2 deletions crypto/vm/boc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ unsigned long long BagOfCells::get_idx_entry_raw(int index) {
*
*/

td::Result<Ref<Cell>> std_boc_deserialize(td::Slice data, bool can_be_empty) {
td::Result<Ref<Cell>> std_boc_deserialize(td::Slice data, bool can_be_empty, bool allow_nonzero_level) {
if (data.empty() && can_be_empty) {
return Ref<Cell>();
}
Expand All @@ -946,7 +946,7 @@ td::Result<Ref<Cell>> std_boc_deserialize(td::Slice data, bool can_be_empty) {
if (root.is_null()) {
return td::Status::Error("bag of cells has null root cell (?)");
}
if (root->get_level() != 0) {
if (!allow_nonzero_level && root->get_level() != 0) {
return td::Status::Error("bag of cells has a root with non-zero level");
}
return std::move(root);
Expand Down
2 changes: 1 addition & 1 deletion crypto/vm/boc.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class BagOfCells {
std::vector<td::uint8>* cell_should_cache);
};

td::Result<Ref<Cell>> std_boc_deserialize(td::Slice data, bool can_be_empty = false);
td::Result<Ref<Cell>> std_boc_deserialize(td::Slice data, bool can_be_empty = false, bool allow_nonzero_level = false);
td::Result<td::BufferSlice> std_boc_serialize(Ref<Cell> root, int mode = 0);

td::Result<std::vector<Ref<Cell>>> std_boc_deserialize_multi(td::Slice data,
Expand Down
2 changes: 1 addition & 1 deletion crypto/vm/db/CellStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class RefcntCellParser {
auto size = parser.get_left_len();
td::Slice data = parser.template fetch_string_raw<td::Slice>(size);
if (stored_boc_) {
TRY_RESULT(boc, vm::std_boc_deserialize(data));
TRY_RESULT(boc, vm::std_boc_deserialize(data, false, true));
TRY_RESULT(loaded_cell, boc->load_cell());
cell = std::move(loaded_cell.data_cell);
return td::Status::OK();
Expand Down
14 changes: 9 additions & 5 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3775,11 +3775,15 @@ int main(int argc, char *argv[]) {
acts.push_back([&x, at]() { td::actor::send_closure(x, &ValidatorEngine::schedule_shutdown, (double)at); });
return td::Status::OK();
});
p.add_checked_option('\0', "celldb-compress-depth", "(default: 0)", [&](td::Slice arg) {
TRY_RESULT(value, td::to_integer_safe<td::uint32>(arg));
acts.push_back([&x, value]() { td::actor::send_closure(x, &ValidatorEngine::set_celldb_compress_depth, value); });
return td::Status::OK();
});
p.add_checked_option('\0', "celldb-compress-depth",
"optimize celldb by storing cells of depth X with whole subtrees (experimental, default: 0)",
[&](td::Slice arg) {
TRY_RESULT(value, td::to_integer_safe<td::uint32>(arg));
acts.push_back([&x, value]() {
td::actor::send_closure(x, &ValidatorEngine::set_celldb_compress_depth, value);
});
return td::Status::OK();
});
auto S = p.run(argc, argv);
if (S.is_error()) {
LOG(ERROR) << "failed to parse options: " << S.move_as_error();
Expand Down
61 changes: 45 additions & 16 deletions validator/db/celldb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "ton/ton-tl.hpp"
#include "ton/ton-io.hpp"
#include "common/delay.h"

namespace ton {

Expand Down Expand Up @@ -68,14 +69,16 @@ CellDbIn::CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb
}

void CellDbIn::start_up() {
on_load_callback_ = [db = actor_id(this),
on_load_callback_ = [actor = std::make_shared<td::actor::ActorOwn<MigrationProxy>>(
td::actor::create_actor<MigrationProxy>("celldbmigration", actor_id(this))),
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
if (res.cell_.is_null()) {
return;
}
bool expected_stored_boc = res.cell_->get_depth() == compress_depth && compress_depth != 0;
if (expected_stored_boc != res.stored_boc_) {
td::actor::send_closure(db, &CellDbIn::migrate_cell, td::Bits256{res.cell_->get_hash().bits()});
td::actor::send_closure(*actor, &CellDbIn::MigrationProxy::migrate_cell,
td::Bits256{res.cell_->get_hash().bits()});
}
};

Expand Down Expand Up @@ -156,6 +159,13 @@ void CellDbIn::alarm() {
if (migrate_after_ && migrate_after_.is_in_past()) {
migrate_cells();
}
if (migration_stats_ && migration_stats_->end_at_.is_in_past()) {
LOG(INFO) << "CellDb migration, " << migration_stats_->start_.elapsed()
<< "s stats: batches=" << migration_stats_->batches_ << " migrated=" << migration_stats_->migrated_cells_
<< " checked=" << migration_stats_->checked_cells_ << " time=" << migration_stats_->total_time_
<< " queue_size=" << cells_to_migrate_.size();
migration_stats_ = {};
}
auto E = get_block(get_empty_key_hash()).move_as_ok();
auto N = get_block(E.next).move_as_ok();
if (N.is_empty()) {
Expand Down Expand Up @@ -291,23 +301,31 @@ void CellDbIn::set_block(KeyHash key_hash, DbEntry e) {

void CellDbIn::migrate_cell(td::Bits256 hash) {
cells_to_migrate_.insert(hash);
if (cells_to_migrate_.size() >= 32) {
migrate_cells();
} else if (!migrate_after_) {
migrate_after_ = td::Timestamp::in(1.0);
if (!migration_active_) {
migration_active_ = true;
migrate_after_ = td::Timestamp::in(10.0);
}
}

void CellDbIn::migrate_cells() {
migrate_after_ = td::Timestamp::never();
if (cells_to_migrate_.empty()) {
migration_active_ = false;
return;
}
td::Timer timer;
if (!migration_stats_) {
migration_stats_ = std::make_unique<MigrationStats>();
}
vm::CellStorer stor{*cell_db_};
auto loader = std::make_unique<vm::CellLoader>(cell_db_->snapshot());
boc_->set_loader(std::make_unique<vm::CellLoader>(*loader)).ensure();
cell_db_->begin_write_batch().ensure();
td::uint32 cnt = 0;
for (const auto& hash : cells_to_migrate_) {
td::uint32 checked = 0, migrated = 0;
for (auto it = cells_to_migrate_.begin(); it != cells_to_migrate_.end() && checked < 128; ) {
++checked;
td::Bits256 hash = *it;
it = cells_to_migrate_.erase(it);
auto R = loader->load(hash.as_slice(), true, boc_->as_ext_cell_creator());
if (R.is_error()) {
continue;
Expand All @@ -318,18 +336,27 @@ void CellDbIn::migrate_cells() {
bool expected_stored_boc =
R.ok().cell_->get_depth() == opts_->get_celldb_compress_depth() && opts_->get_celldb_compress_depth() != 0;
if (expected_stored_boc != R.ok().stored_boc_) {
++cnt;
++migrated;
stor.set(R.ok().refcnt(), R.ok().cell_, expected_stored_boc).ensure();
}
}
cells_to_migrate_.clear();
if (cnt > 0) {
LOG(DEBUG) << "Migrated " << cnt << " cells";
}
cell_db_->commit_write_batch().ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
migrate_after_ = td::Timestamp::never();

double time = timer.elapsed();
LOG(DEBUG) << "CellDb migration: migrated=" << migrated << " checked=" << checked << " time=" << time;
++migration_stats_->batches_;
migration_stats_->migrated_cells_ += migrated;
migration_stats_->checked_cells_ += checked;
migration_stats_->total_time_ += time;

if (cells_to_migrate_.empty()) {
migration_active_ = false;
} else {
delay_action([SelfId = actor_id(this)] { td::actor::send_closure(SelfId, &CellDbIn::migrate_cells); },
td::Timestamp::in(time * 2));
}
}

void CellDb::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise) {
Expand Down Expand Up @@ -361,14 +388,16 @@ void CellDb::start_up() {
boc_ = vm::DynamicBagOfCellsDb::create();
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_);
on_load_callback_ = [db = cell_db_.get(),
on_load_callback_ = [actor = std::make_shared<td::actor::ActorOwn<CellDbIn::MigrationProxy>>(
td::actor::create_actor<CellDbIn::MigrationProxy>("celldbmigration", cell_db_.get())),
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
if (res.cell_.is_null()) {
return;
}
bool expected_stored_boc = res.cell_->get_depth() == compress_depth && compress_depth != 0;
if (expected_stored_boc != res.stored_boc_) {
td::actor::send_closure(db, &CellDbIn::migrate_cell, td::Bits256{res.cell_->get_hash().bits()});
td::actor::send_closure(*actor, &CellDbIn::MigrationProxy::migrate_cell,
td::Bits256{res.cell_->get_hash().bits()});
}
};
}
Expand Down
24 changes: 24 additions & 0 deletions validator/db/celldb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,30 @@ class CellDbIn : public CellDbBase {
std::function<void(const vm::CellLoader::LoadResult&)> on_load_callback_;
std::set<td::Bits256> cells_to_migrate_;
td::Timestamp migrate_after_ = td::Timestamp::never();
bool migration_active_ = false;

// Accumulated counters for one reporting window of the CellDb migration.
// An instance is created lazily by migrate_cells(), updated after every batch,
// and logged and reset by alarm() once end_at_ is in the past.
struct MigrationStats {
  // Its elapsed() value is printed in the summary log line.
  td::Timer start_;
  // End of the reporting window: 60 seconds after this object is constructed.
  td::Timestamp end_at_{td::Timestamp::in(60.0)};
  // Number of migrate_cells() batches counted in this window.
  size_t batches_{0};
  // Cells that were actually rewritten in the other storage format.
  size_t migrated_cells_{0};
  // Cells taken from the migration queue and examined (rewritten or not).
  size_t checked_cells_{0};
  // Sum of the per-batch timer readings, as reported by td::Timer::elapsed().
  double total_time_{0.0};
};
std::unique_ptr<MigrationStats> migration_stats_;

public:
class MigrationProxy : public td::actor::Actor {
public:
explicit MigrationProxy(td::actor::ActorId<CellDbIn> cell_db) : cell_db_(cell_db) {
}
void migrate_cell(td::Bits256 hash) {
td::actor::send_closure(cell_db_, &CellDbIn::migrate_cell, hash);
}

private:
td::actor::ActorId<CellDbIn> cell_db_;
};
};

class CellDb : public CellDbBase {
Expand Down

0 comments on commit 83efceb

Please sign in to comment.