Improved queue cleaning: reuse QueueMerger logic, async size counter #829

Merged 4 commits on Dec 14, 2023
26 changes: 17 additions & 9 deletions crypto/block/block.cpp
@@ -715,7 +715,7 @@ td::uint64 BlockLimitStatus::estimate_block_size(const vm::NewCellStorageStat::S
sum += *extra;
}
return 2000 + (sum.bits >> 3) + sum.cells * 12 + sum.internal_refs * 3 + sum.external_refs * 40 + accounts * 200 +
transactions * 200 + (extra ? 200 : 0) + extra_out_msgs * 300 + extra_library_diff * 700;
transactions * 200 + (extra ? 200 : 0) + extra_out_msgs * 300 + public_library_diff * 700;
}

int BlockLimitStatus::classify() const {
@@ -1009,16 +1009,16 @@ td::Status ShardState::merge_with(ShardState& sib) {
return td::Status::OK();
}

td::Result<std::unique_ptr<vm::AugmentedDictionary>> ShardState::compute_split_out_msg_queue(
ton::ShardIdFull subshard) {
td::Result<std::unique_ptr<vm::AugmentedDictionary>> ShardState::compute_split_out_msg_queue(ton::ShardIdFull subshard,
td::uint32* queue_size) {
auto shard = id_.shard_full();
if (!ton::shard_is_parent(shard, subshard)) {
return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() +
" because it is not a parent");
}
CHECK(out_msg_queue_);
auto subqueue = std::make_unique<vm::AugmentedDictionary>(*out_msg_queue_);
int res = block::filter_out_msg_queue(*subqueue, shard, subshard);
int res = block::filter_out_msg_queue(*subqueue, shard, subshard, queue_size);
if (res < 0) {
return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str());
}
@@ -1040,7 +1040,7 @@ td::Result<std::shared_ptr<block::MsgProcessedUptoCollection>> ShardState::compu
return std::move(sub_processed_upto);
}

td::Status ShardState::split(ton::ShardIdFull subshard) {
td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size) {
if (!ton::shard_is_parent(id_.shard_full(), subshard)) {
return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() +
" because it is not a parent");
@@ -1058,7 +1058,7 @@ td::Status ShardState::split(ton::ShardIdFull subshard) {
auto shard1 = id_.shard_full();
CHECK(ton::shard_is_parent(shard1, subshard));
CHECK(out_msg_queue_);
int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard);
int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard, queue_size);
if (res1 < 0) {
return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str());
}
@@ -1098,8 +1098,12 @@ td::Status ShardState::split(ton::ShardIdFull subshard) {
return td::Status::OK();
}

int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard) {
return out_queue.filter([subshard, old_shard](vm::CellSlice& cs, td::ConstBitPtr key, int key_len) -> int {
int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard,
td::uint32* queue_size) {
if (queue_size) {
*queue_size = 0;
}
return out_queue.filter([=](vm::CellSlice& cs, td::ConstBitPtr key, int key_len) -> int {
CHECK(key_len == 352);
LOG(DEBUG) << "scanning OutMsgQueue entry with key " << key.to_hex(key_len);
block::tlb::MsgEnvelope::Record_std env;
@@ -1122,7 +1126,11 @@ int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull ol
<< " does not contain current address belonging to shard " << old_shard.to_str();
return -1;
}
return ton::shard_contains(subshard, cur_prefix);
bool res = ton::shard_contains(subshard, cur_prefix);
if (res && queue_size) {
++*queue_size;
}
return res;
});
}

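
For orientation, a minimal caller-side sketch of the new out-parameter (illustrative only, not part of this diff). It assumes state is a block::ShardState and subshard a ton::ShardIdFull prepared by the caller; passing nullptr (the default) skips counting.

// Illustrative sketch: count the entries left in the outbound queue after a split.
td::uint32 queue_size = 0;
td::Status res = state.split(subshard, &queue_size);
if (res.is_ok()) {
  LOG(DEBUG) << "out msg queue size after split: " << queue_size;
}
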
12 changes: 7 additions & 5 deletions crypto/block/block.h
@@ -262,7 +262,7 @@ struct BlockLimitStatus {
td::uint64 gas_used{};
vm::NewCellStorageStat st_stat;
unsigned accounts{}, transactions{}, extra_out_msgs{};
unsigned extra_library_diff{}; // Number of public libraries in deleted/frozen accounts
unsigned public_library_diff{};
BlockLimitStatus(const BlockLimits& limits_, ton::LogicalTime lt = 0)
: limits(limits_), cur_lt(std::max(limits_.start_lt, lt)) {
}
@@ -272,7 +272,7 @@
transactions = accounts = 0;
gas_used = 0;
extra_out_msgs = 0;
extra_library_diff = 0;
public_library_diff = 0;
}
td::uint64 estimate_block_size(const vm::NewCellStorageStat::Stat* extra = nullptr) const;
int classify() const;
@@ -433,10 +433,11 @@ struct ShardState {
ton::BlockSeqno prev_mc_block_seqno, bool after_split, bool clear_history,
std::function<bool(ton::BlockSeqno)> for_each_mcseqno);
td::Status merge_with(ShardState& sib);
td::Result<std::unique_ptr<vm::AugmentedDictionary>> compute_split_out_msg_queue(ton::ShardIdFull subshard);
td::Result<std::unique_ptr<vm::AugmentedDictionary>> compute_split_out_msg_queue(ton::ShardIdFull subshard,
td::uint32* queue_size = nullptr);
td::Result<std::shared_ptr<block::MsgProcessedUptoCollection>> compute_split_processed_upto(
ton::ShardIdFull subshard);
td::Status split(ton::ShardIdFull subshard);
td::Status split(ton::ShardIdFull subshard, td::uint32* queue_size = nullptr);
td::Status unpack_out_msg_queue_info(Ref<vm::Cell> out_msg_queue_info);
bool clear_load_history() {
overload_history_ = underload_history_ = 0;
@@ -656,7 +657,8 @@ class MtCarloComputeShare {
void gen_vset();
};

int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard);
int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard,
td::uint32* queue_size = nullptr);

std::ostream& operator<<(std::ostream& os, const ShardId& shard_id);

20 changes: 14 additions & 6 deletions crypto/block/output-queue-merger.cpp
@@ -146,22 +146,30 @@ bool OutputQueueMerger::add_root(int src, Ref<vm::Cell> outmsg_root) {
return true;
}

OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<block::McShardDescr> _neighbors)
OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<Neighbor> _neighbors)
: queue_for(_queue_for), neighbors(std::move(_neighbors)), eof(false), failed(false) {
init();
}

OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<block::McShardDescr> _neighbors)
: queue_for(_queue_for), eof(false), failed(false) {
for (auto& nb : _neighbors) {
neighbors.emplace_back(nb.top_block_id(), nb.outmsg_root, nb.is_disabled());
}
init();
}

void OutputQueueMerger::init() {
common_pfx.bits().store_int(queue_for.workchain, 32);
int l = queue_for.pfx_len();
td::bitstring::bits_store_long_top(common_pfx.bits() + 32, queue_for.shard, l);
common_pfx_len = 32 + l;
int i = 0;
for (block::McShardDescr& neighbor : neighbors) {
if (!neighbor.is_disabled()) {
LOG(DEBUG) << "adding " << (neighbor.outmsg_root.is_null() ? "" : "non-") << "empty output queue for neighbor #"
<< i << " (" << neighbor.blk_.to_str() << ")";
add_root(i++, neighbor.outmsg_root);
for (Neighbor& neighbor : neighbors) {
if (!neighbor.disabled_) {
LOG(DEBUG) << "adding " << (neighbor.outmsg_root_.is_null() ? "" : "non-") << "empty output queue for neighbor #"
<< i << " (" << neighbor.block_id_.to_str() << ")";
add_root(i++, neighbor.outmsg_root_);
} else {
LOG(DEBUG) << "skipping output queue for disabled neighbor #" << i;
i++;
12 changes: 11 additions & 1 deletion crypto/block/output-queue-merger.h
@@ -51,12 +51,22 @@ struct OutputQueueMerger {
bool unpack_node(td::ConstBitPtr key_pfx, int key_pfx_len, Ref<vm::Cell> node);
bool split(MsgKeyValue& second);
};
struct Neighbor {
ton::BlockIdExt block_id_;
td::Ref<vm::Cell> outmsg_root_;
bool disabled_;
Neighbor() = default;
Neighbor(ton::BlockIdExt block_id, td::Ref<vm::Cell> outmsg_root, bool disabled = false)
: block_id_(block_id), outmsg_root_(std::move(outmsg_root)), disabled_(disabled) {
}
};
//
ton::ShardIdFull queue_for;
std::vector<std::unique_ptr<MsgKeyValue>> msg_list;
std::vector<block::McShardDescr> neighbors;
std::vector<Neighbor> neighbors;

public:
OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<Neighbor> _neighbors);
OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<block::McShardDescr> _neighbors);
bool is_eof() const {
return eof;
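
As context, a minimal sketch of feeding the merger the new lightweight Neighbor entries instead of full block::McShardDescr records (illustrative only, not part of this diff). It assumes block_id_a, block_id_b, outmsg_root_a and queue_for are provided by the caller and that crypto/block/output-queue-merger.h is included.

// Illustrative sketch: build the merger from (block id, queue root) pairs.
std::vector<block::OutputQueueMerger::Neighbor> nbs;
nbs.emplace_back(block_id_a, outmsg_root_a);                              // regular neighbor
nbs.emplace_back(block_id_b, td::Ref<vm::Cell>{}, /* disabled = */ true); // skipped by init()
block::OutputQueueMerger merger{queue_for, std::move(nbs)};
if (merger.is_eof()) {
  // every enabled neighbor queue was empty
}
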
37 changes: 33 additions & 4 deletions crypto/block/transaction.cpp
@@ -2536,6 +2536,31 @@ static td::uint32 get_public_libraries_count(const td::Ref<vm::Cell>& libraries)
return count;
}

/**
* Calculates the number of public libraries that differ between the two dictionaries.
*
* @param old_libraries The dictionary of account libraries before the transaction.
* @param new_libraries The dictionary of account libraries after the transaction.
*
* @returns The number of changed public libraries.
*/
static td::uint32 get_public_libraries_diff_count(const td::Ref<vm::Cell>& old_libraries,
const td::Ref<vm::Cell>& new_libraries) {
td::uint32 count = 0;
vm::Dictionary dict1{old_libraries, 256};
vm::Dictionary dict2{new_libraries, 256};
dict1.scan_diff(dict2, [&](td::ConstBitPtr key, int n, Ref<vm::CellSlice> val1, Ref<vm::CellSlice> val2) -> bool {
CHECK(n == 256);
bool is_public1 = val1.not_null() && block::is_public_library(key, val1);
bool is_public2 = val2.not_null() && block::is_public_library(key, val2);
if (is_public1 != is_public2) {
++count;
}
return true;
});
return count;
}
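// Worked example (editorial illustration, not part of the diff): if the old dictionary
// exposes public libraries {A, B} and the new one exposes {B, C}, scan_diff visits A
// (removed) and C (added), so the function returns 2; a library that stays present but
// whose public flag toggles also counts as one change, while unchanged entries are not
// visited at all.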

/**
* Checks that the new account state fits in the limits.
* This function is not called for special accounts.
@@ -2979,14 +3004,14 @@ bool Transaction::serialize() {
vm::load_cell_slice(root).print_rec(std::cerr);
}

if (!block::gen::t_Transaction.validate_ref(root)) {
if (!block::gen::t_Transaction.validate_ref(4096, root)) {
LOG(ERROR) << "newly-generated transaction failed to pass automated validation:";
vm::load_cell_slice(root).print_rec(std::cerr);
block::gen::t_Transaction.print_ref(std::cerr, root);
root.clear();
return false;
}
if (!block::tlb::t_Transaction.validate_ref(root)) {
if (!block::tlb::t_Transaction.validate_ref(4096, root)) {
LOG(ERROR) << "newly-generated transaction failed to pass hand-written validation:";
vm::load_cell_slice(root).print_rec(std::cerr);
block::gen::t_Transaction.print_ref(std::cerr, root);
@@ -3187,8 +3212,12 @@ bool Transaction::update_limits(block::BlockLimitStatus& blimst, bool with_size)
blimst.add_account(is_first))) {
return false;
}
if (account.is_masterchain() && (was_frozen || was_deleted)) {
blimst.extra_library_diff += get_public_libraries_count(account.orig_library);
if (account.is_masterchain()) {
if (was_frozen || was_deleted) {
blimst.public_library_diff += get_public_libraries_count(account.orig_library);
} else {
blimst.public_library_diff += get_public_libraries_diff_count(account.orig_library, new_library);
}
}
}
return true;
13 changes: 13 additions & 0 deletions validator-engine/validator-engine.cpp
@@ -3397,6 +3397,19 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getShardO
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "no such block")));
return;
}
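// Editorial note, not part of the diff: when no destination shard is supplied, the queue
// size is obtained asynchronously from the validator manager via get_out_msg_queue_size;
// otherwise the shard state is loaded from the database below and the size is derived
// from it for the requested destination.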
if (!dest) {
td::actor::send_closure(
manager, &ton::validator::ValidatorManagerInterface::get_out_msg_queue_size, handle->id(),
[promise = std::move(promise)](td::Result<td::uint32> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error_prefix("failed to get queue size: ")));
} else {
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::engine_validator_shardOutQueueSize>(
R.move_as_ok()));
}
});
return;
}
td::actor::send_closure(
manager, &ton::validator::ValidatorManagerInterface::get_shard_state_from_db, handle,
[=, promise = std::move(promise)](td::Result<td::Ref<ton::validator::ShardState>> R) mutable {
8 changes: 6 additions & 2 deletions validator/CMakeLists.txt
@@ -52,6 +52,7 @@ set(VALIDATOR_HEADERS
invariants.hpp

import-db-slice.hpp
queue-size-counter.hpp

manager-disk.h
manager-disk.hpp
@@ -77,6 +78,7 @@ set(VALIDATOR_SOURCE
validator-full-id.cpp
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp

downloaders/wait-block-data.cpp
downloaders/wait-block-state.cpp
@@ -98,7 +100,8 @@ set(DISK_VALIDATOR_SOURCE
validator-full-id.cpp
validator-group.cpp
validator-options.cpp

queue-size-counter.cpp

downloaders/wait-block-data-disk.cpp
downloaders/wait-block-state.cpp
downloaders/wait-block-state-merge.cpp
@@ -117,7 +120,8 @@ set(HARDFORK_VALIDATOR_SOURCE
validator-full-id.cpp
validator-group.cpp
validator-options.cpp

queue-size-counter.cpp

downloaders/wait-block-data-disk.cpp
downloaders/wait-block-state.cpp
downloaders/wait-block-state-merge.cpp
5 changes: 3 additions & 2 deletions validator/impl/collator-impl.h
@@ -60,7 +60,6 @@ class Collator final : public td::actor::Actor {
bool preinit_complete{false};
bool is_key_block_{false};
bool block_full_{false};
bool outq_cleanup_partial_{false};
bool inbound_queues_empty_{false};
bool libraries_changed_{false};
bool prev_key_block_exists_{false};
@@ -159,7 +158,6 @@ class Collator final : public td::actor::Actor {
bool report_version_{false};
bool skip_topmsgdescr_{false};
bool skip_extmsg_{false};
bool queue_too_big_{false};
bool short_dequeue_records_{false};
td::uint64 overload_history_{0}, underload_history_{0};
td::uint64 block_size_estimate_{};
@@ -189,6 +187,7 @@ class Collator final : public td::actor::Actor {
std::priority_queue<NewOutMsg, std::vector<NewOutMsg>, std::greater<NewOutMsg>> new_msgs;
std::pair<ton::LogicalTime, ton::Bits256> last_proc_int_msg_, first_unproc_int_msg_;
std::unique_ptr<vm::AugmentedDictionary> in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_;
td::uint32 out_msg_queue_size_ = 0;
std::unique_ptr<vm::Dictionary> ihr_pending;
std::shared_ptr<block::MsgProcessedUptoCollection> processed_upto_, sibling_processed_upto_;
std::unique_ptr<vm::Dictionary> block_create_stats_;
@@ -227,6 +226,7 @@ class Collator final : public td::actor::Actor {
bool fix_one_processed_upto(block::MsgProcessedUpto& proc, const ton::ShardIdFull& owner);
bool fix_processed_upto(block::MsgProcessedUptoCollection& upto);
void got_neighbor_out_queue(int i, td::Result<Ref<MessageQueue>> res);
void got_out_queue_size(size_t i, td::Result<td::uint32> res);
bool adjust_shard_config();
bool store_shard_fees(ShardIdFull shard, const block::CurrencyCollection& fees,
const block::CurrencyCollection& created);
@@ -260,6 +260,7 @@ bool check_prev_block_exact(const BlockIdExt& listed, const BlockIdExt& prev);
bool check_prev_block_exact(const BlockIdExt& listed, const BlockIdExt& prev);
bool check_this_shard_mc_info();
bool request_neighbor_msg_queues();
bool request_out_msg_queue_size();
void update_max_lt(ton::LogicalTime lt);
bool is_masterchain() const {
return shard_.is_masterchain();