Skip to content

Commit

Permalink
Merge pull request #829 from ton-blockchain/testnet
Browse files Browse the repository at this point in the history
Improved queue cleaning: reuse QueueMerger logic, async size counter
  • Loading branch information
EmelyanenkoK authored Dec 14, 2023
2 parents 9b6d699 + 1fc4a0f commit 6897b56
Show file tree
Hide file tree
Showing 16 changed files with 733 additions and 115 deletions.
26 changes: 17 additions & 9 deletions crypto/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ td::uint64 BlockLimitStatus::estimate_block_size(const vm::NewCellStorageStat::S
sum += *extra;
}
return 2000 + (sum.bits >> 3) + sum.cells * 12 + sum.internal_refs * 3 + sum.external_refs * 40 + accounts * 200 +
transactions * 200 + (extra ? 200 : 0) + extra_out_msgs * 300 + extra_library_diff * 700;
transactions * 200 + (extra ? 200 : 0) + extra_out_msgs * 300 + public_library_diff * 700;
}

int BlockLimitStatus::classify() const {
Expand Down Expand Up @@ -1009,16 +1009,16 @@ td::Status ShardState::merge_with(ShardState& sib) {
return td::Status::OK();
}

td::Result<std::unique_ptr<vm::AugmentedDictionary>> ShardState::compute_split_out_msg_queue(
ton::ShardIdFull subshard) {
td::Result<std::unique_ptr<vm::AugmentedDictionary>> ShardState::compute_split_out_msg_queue(ton::ShardIdFull subshard,
td::uint32* queue_size) {
auto shard = id_.shard_full();
if (!ton::shard_is_parent(shard, subshard)) {
return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() +
" because it is not a parent");
}
CHECK(out_msg_queue_);
auto subqueue = std::make_unique<vm::AugmentedDictionary>(*out_msg_queue_);
int res = block::filter_out_msg_queue(*subqueue, shard, subshard);
int res = block::filter_out_msg_queue(*subqueue, shard, subshard, queue_size);
if (res < 0) {
return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str());
}
Expand All @@ -1040,7 +1040,7 @@ td::Result<std::shared_ptr<block::MsgProcessedUptoCollection>> ShardState::compu
return std::move(sub_processed_upto);
}

td::Status ShardState::split(ton::ShardIdFull subshard) {
td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size) {
if (!ton::shard_is_parent(id_.shard_full(), subshard)) {
return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() +
" because it is not a parent");
Expand All @@ -1058,7 +1058,7 @@ td::Status ShardState::split(ton::ShardIdFull subshard) {
auto shard1 = id_.shard_full();
CHECK(ton::shard_is_parent(shard1, subshard));
CHECK(out_msg_queue_);
int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard);
int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard, queue_size);
if (res1 < 0) {
return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str());
}
Expand Down Expand Up @@ -1098,8 +1098,12 @@ td::Status ShardState::split(ton::ShardIdFull subshard) {
return td::Status::OK();
}

int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard) {
return out_queue.filter([subshard, old_shard](vm::CellSlice& cs, td::ConstBitPtr key, int key_len) -> int {
int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard,
td::uint32* queue_size) {
if (queue_size) {
*queue_size = 0;
}
return out_queue.filter([=](vm::CellSlice& cs, td::ConstBitPtr key, int key_len) -> int {
CHECK(key_len == 352);
LOG(DEBUG) << "scanning OutMsgQueue entry with key " << key.to_hex(key_len);
block::tlb::MsgEnvelope::Record_std env;
Expand All @@ -1122,7 +1126,11 @@ int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull ol
<< " does not contain current address belonging to shard " << old_shard.to_str();
return -1;
}
return ton::shard_contains(subshard, cur_prefix);
bool res = ton::shard_contains(subshard, cur_prefix);
if (res && queue_size) {
++*queue_size;
}
return res;
});
}

Expand Down
12 changes: 7 additions & 5 deletions crypto/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ struct BlockLimitStatus {
td::uint64 gas_used{};
vm::NewCellStorageStat st_stat;
unsigned accounts{}, transactions{}, extra_out_msgs{};
unsigned extra_library_diff{}; // Number of public libraries in deleted/frozen accounts
unsigned public_library_diff{};
BlockLimitStatus(const BlockLimits& limits_, ton::LogicalTime lt = 0)
: limits(limits_), cur_lt(std::max(limits_.start_lt, lt)) {
}
Expand All @@ -272,7 +272,7 @@ struct BlockLimitStatus {
transactions = accounts = 0;
gas_used = 0;
extra_out_msgs = 0;
extra_library_diff = 0;
public_library_diff = 0;
}
td::uint64 estimate_block_size(const vm::NewCellStorageStat::Stat* extra = nullptr) const;
int classify() const;
Expand Down Expand Up @@ -433,10 +433,11 @@ struct ShardState {
ton::BlockSeqno prev_mc_block_seqno, bool after_split, bool clear_history,
std::function<bool(ton::BlockSeqno)> for_each_mcseqno);
td::Status merge_with(ShardState& sib);
td::Result<std::unique_ptr<vm::AugmentedDictionary>> compute_split_out_msg_queue(ton::ShardIdFull subshard);
td::Result<std::unique_ptr<vm::AugmentedDictionary>> compute_split_out_msg_queue(ton::ShardIdFull subshard,
td::uint32* queue_size = nullptr);
td::Result<std::shared_ptr<block::MsgProcessedUptoCollection>> compute_split_processed_upto(
ton::ShardIdFull subshard);
td::Status split(ton::ShardIdFull subshard);
td::Status split(ton::ShardIdFull subshard, td::uint32* queue_size = nullptr);
td::Status unpack_out_msg_queue_info(Ref<vm::Cell> out_msg_queue_info);
bool clear_load_history() {
overload_history_ = underload_history_ = 0;
Expand Down Expand Up @@ -656,7 +657,8 @@ class MtCarloComputeShare {
void gen_vset();
};

int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard);
int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard,
td::uint32* queue_size = nullptr);

std::ostream& operator<<(std::ostream& os, const ShardId& shard_id);

Expand Down
20 changes: 14 additions & 6 deletions crypto/block/output-queue-merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,22 +146,30 @@ bool OutputQueueMerger::add_root(int src, Ref<vm::Cell> outmsg_root) {
return true;
}

OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<block::McShardDescr> _neighbors)
OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<Neighbor> _neighbors)
: queue_for(_queue_for), neighbors(std::move(_neighbors)), eof(false), failed(false) {
init();
}

OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<block::McShardDescr> _neighbors)
: queue_for(_queue_for), eof(false), failed(false) {
for (auto& nb : _neighbors) {
neighbors.emplace_back(nb.top_block_id(), nb.outmsg_root, nb.is_disabled());
}
init();
}

void OutputQueueMerger::init() {
common_pfx.bits().store_int(queue_for.workchain, 32);
int l = queue_for.pfx_len();
td::bitstring::bits_store_long_top(common_pfx.bits() + 32, queue_for.shard, l);
common_pfx_len = 32 + l;
int i = 0;
for (block::McShardDescr& neighbor : neighbors) {
if (!neighbor.is_disabled()) {
LOG(DEBUG) << "adding " << (neighbor.outmsg_root.is_null() ? "" : "non-") << "empty output queue for neighbor #"
<< i << " (" << neighbor.blk_.to_str() << ")";
add_root(i++, neighbor.outmsg_root);
for (Neighbor& neighbor : neighbors) {
if (!neighbor.disabled_) {
LOG(DEBUG) << "adding " << (neighbor.outmsg_root_.is_null() ? "" : "non-") << "empty output queue for neighbor #"
<< i << " (" << neighbor.block_id_.to_str() << ")";
add_root(i++, neighbor.outmsg_root_);
} else {
LOG(DEBUG) << "skipping output queue for disabled neighbor #" << i;
i++;
Expand Down
12 changes: 11 additions & 1 deletion crypto/block/output-queue-merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,22 @@ struct OutputQueueMerger {
bool unpack_node(td::ConstBitPtr key_pfx, int key_pfx_len, Ref<vm::Cell> node);
bool split(MsgKeyValue& second);
};
struct Neighbor {
ton::BlockIdExt block_id_;
td::Ref<vm::Cell> outmsg_root_;
bool disabled_;
Neighbor() = default;
Neighbor(ton::BlockIdExt block_id, td::Ref<vm::Cell> outmsg_root, bool disabled = false)
: block_id_(block_id), outmsg_root_(std::move(outmsg_root)), disabled_(disabled) {
}
};
//
ton::ShardIdFull queue_for;
std::vector<std::unique_ptr<MsgKeyValue>> msg_list;
std::vector<block::McShardDescr> neighbors;
std::vector<Neighbor> neighbors;

public:
OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<Neighbor> _neighbors);
OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<block::McShardDescr> _neighbors);
bool is_eof() const {
return eof;
Expand Down
37 changes: 33 additions & 4 deletions crypto/block/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2536,6 +2536,31 @@ static td::uint32 get_public_libraries_count(const td::Ref<vm::Cell>& libraries)
return count;
}

/**
* Calculates the number of changes of public libraries in the dictionary.
*
* @param old_libraries The dictionary of account libraries before the transaction.
* @param new_libraries The dictionary of account libraries after the transaction.
*
* @returns The number of changed public libraries.
*/
static td::uint32 get_public_libraries_diff_count(const td::Ref<vm::Cell>& old_libraries,
const td::Ref<vm::Cell>& new_libraries) {
td::uint32 count = 0;
vm::Dictionary dict1{old_libraries, 256};
vm::Dictionary dict2{new_libraries, 256};
dict1.scan_diff(dict2, [&](td::ConstBitPtr key, int n, Ref<vm::CellSlice> val1, Ref<vm::CellSlice> val2) -> bool {
CHECK(n == 256);
bool is_public1 = val1.not_null() && block::is_public_library(key, val1);
bool is_public2 = val2.not_null() && block::is_public_library(key, val2);
if (is_public1 != is_public2) {
++count;
}
return true;
});
return count;
}

/**
* Checks that the new account state fits in the limits.
* This function is not called for special accounts.
Expand Down Expand Up @@ -2979,14 +3004,14 @@ bool Transaction::serialize() {
vm::load_cell_slice(root).print_rec(std::cerr);
}

if (!block::gen::t_Transaction.validate_ref(root)) {
if (!block::gen::t_Transaction.validate_ref(4096, root)) {
LOG(ERROR) << "newly-generated transaction failed to pass automated validation:";
vm::load_cell_slice(root).print_rec(std::cerr);
block::gen::t_Transaction.print_ref(std::cerr, root);
root.clear();
return false;
}
if (!block::tlb::t_Transaction.validate_ref(root)) {
if (!block::tlb::t_Transaction.validate_ref(4096, root)) {
LOG(ERROR) << "newly-generated transaction failed to pass hand-written validation:";
vm::load_cell_slice(root).print_rec(std::cerr);
block::gen::t_Transaction.print_ref(std::cerr, root);
Expand Down Expand Up @@ -3187,8 +3212,12 @@ bool Transaction::update_limits(block::BlockLimitStatus& blimst, bool with_size)
blimst.add_account(is_first))) {
return false;
}
if (account.is_masterchain() && (was_frozen || was_deleted)) {
blimst.extra_library_diff += get_public_libraries_count(account.orig_library);
if (account.is_masterchain()) {
if (was_frozen || was_deleted) {
blimst.public_library_diff += get_public_libraries_count(account.orig_library);
} else {
blimst.public_library_diff += get_public_libraries_diff_count(account.orig_library, new_library);
}
}
}
return true;
Expand Down
13 changes: 13 additions & 0 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3397,6 +3397,19 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getShardO
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "no such block")));
return;
}
if (!dest) {
td::actor::send_closure(
manager, &ton::validator::ValidatorManagerInterface::get_out_msg_queue_size, handle->id(),
[promise = std::move(promise)](td::Result<td::uint32> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error_prefix("failed to get queue size: ")));
} else {
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::engine_validator_shardOutQueueSize>(
R.move_as_ok()));
}
});
return;
}
td::actor::send_closure(
manager, &ton::validator::ValidatorManagerInterface::get_shard_state_from_db, handle,
[=, promise = std::move(promise)](td::Result<td::Ref<ton::validator::ShardState>> R) mutable {
Expand Down
8 changes: 6 additions & 2 deletions validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ set(VALIDATOR_HEADERS
invariants.hpp

import-db-slice.hpp
queue-size-counter.hpp

manager-disk.h
manager-disk.hpp
Expand All @@ -77,6 +78,7 @@ set(VALIDATOR_SOURCE
validator-full-id.cpp
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp

downloaders/wait-block-data.cpp
downloaders/wait-block-state.cpp
Expand All @@ -98,7 +100,8 @@ set(DISK_VALIDATOR_SOURCE
validator-full-id.cpp
validator-group.cpp
validator-options.cpp

queue-size-counter.cpp

downloaders/wait-block-data-disk.cpp
downloaders/wait-block-state.cpp
downloaders/wait-block-state-merge.cpp
Expand All @@ -117,7 +120,8 @@ set(HARDFORK_VALIDATOR_SOURCE
validator-full-id.cpp
validator-group.cpp
validator-options.cpp

queue-size-counter.cpp

downloaders/wait-block-data-disk.cpp
downloaders/wait-block-state.cpp
downloaders/wait-block-state-merge.cpp
Expand Down
5 changes: 3 additions & 2 deletions validator/impl/collator-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class Collator final : public td::actor::Actor {
bool preinit_complete{false};
bool is_key_block_{false};
bool block_full_{false};
bool outq_cleanup_partial_{false};
bool inbound_queues_empty_{false};
bool libraries_changed_{false};
bool prev_key_block_exists_{false};
Expand Down Expand Up @@ -159,7 +158,6 @@ class Collator final : public td::actor::Actor {
bool report_version_{false};
bool skip_topmsgdescr_{false};
bool skip_extmsg_{false};
bool queue_too_big_{false};
bool short_dequeue_records_{false};
td::uint64 overload_history_{0}, underload_history_{0};
td::uint64 block_size_estimate_{};
Expand Down Expand Up @@ -189,6 +187,7 @@ class Collator final : public td::actor::Actor {
std::priority_queue<NewOutMsg, std::vector<NewOutMsg>, std::greater<NewOutMsg>> new_msgs;
std::pair<ton::LogicalTime, ton::Bits256> last_proc_int_msg_, first_unproc_int_msg_;
std::unique_ptr<vm::AugmentedDictionary> in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_;
td::uint32 out_msg_queue_size_ = 0;
std::unique_ptr<vm::Dictionary> ihr_pending;
std::shared_ptr<block::MsgProcessedUptoCollection> processed_upto_, sibling_processed_upto_;
std::unique_ptr<vm::Dictionary> block_create_stats_;
Expand Down Expand Up @@ -227,6 +226,7 @@ class Collator final : public td::actor::Actor {
bool fix_one_processed_upto(block::MsgProcessedUpto& proc, const ton::ShardIdFull& owner);
bool fix_processed_upto(block::MsgProcessedUptoCollection& upto);
void got_neighbor_out_queue(int i, td::Result<Ref<MessageQueue>> res);
void got_out_queue_size(size_t i, td::Result<td::uint32> res);
bool adjust_shard_config();
bool store_shard_fees(ShardIdFull shard, const block::CurrencyCollection& fees,
const block::CurrencyCollection& created);
Expand Down Expand Up @@ -260,6 +260,7 @@ class Collator final : public td::actor::Actor {
bool check_prev_block_exact(const BlockIdExt& listed, const BlockIdExt& prev);
bool check_this_shard_mc_info();
bool request_neighbor_msg_queues();
bool request_out_msg_queue_size();
void update_max_lt(ton::LogicalTime lt);
bool is_masterchain() const {
return shard_.is_masterchain();
Expand Down
Loading

0 comments on commit 6897b56

Please sign in to comment.