From 5e6b67ae96d31a8608375ee12835b11c7721a6d3 Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Wed, 13 Dec 2023 12:57:34 +0300 Subject: [PATCH 1/3] Improve handling outbound message queues (#825) * Improve handling outbound message queues * Cleanup queue faster * Calculate queue sizes in background * Force or limit split/merge depending on queue size * Increase validate_ref limit for transaction * Add all changes of public libraries to block size estimation * Don't crash on timeout in GC * Don't import external messages when queue is too big --------- Co-authored-by: SpyCheese --- crypto/block/block.cpp | 26 ++- crypto/block/block.h | 12 +- crypto/block/output-queue-merger.cpp | 20 +- crypto/block/output-queue-merger.h | 12 +- crypto/block/transaction.cpp | 37 +++- validator-engine/validator-engine.cpp | 13 ++ validator/CMakeLists.txt | 2 + validator/impl/collator-impl.h | 5 +- validator/impl/collator.cpp | 282 +++++++++++++++++------- validator/manager-disk.hpp | 9 + validator/manager-hardfork.hpp | 9 + validator/manager.cpp | 10 +- validator/manager.hpp | 14 ++ validator/queue-size-counter.cpp | 301 ++++++++++++++++++++++++++ validator/queue-size-counter.hpp | 82 +++++++ validator/validator.h | 2 + 16 files changed, 726 insertions(+), 110 deletions(-) create mode 100644 validator/queue-size-counter.cpp create mode 100644 validator/queue-size-counter.hpp diff --git a/crypto/block/block.cpp b/crypto/block/block.cpp index 9a3dba601..a22fd1e56 100644 --- a/crypto/block/block.cpp +++ b/crypto/block/block.cpp @@ -715,7 +715,7 @@ td::uint64 BlockLimitStatus::estimate_block_size(const vm::NewCellStorageStat::S sum += *extra; } return 2000 + (sum.bits >> 3) + sum.cells * 12 + sum.internal_refs * 3 + sum.external_refs * 40 + accounts * 200 + - transactions * 200 + (extra ? 200 : 0) + extra_out_msgs * 300 + extra_library_diff * 700; + transactions * 200 + (extra ? 
200 : 0) + extra_out_msgs * 300 + public_library_diff * 700; } int BlockLimitStatus::classify() const { @@ -1009,8 +1009,8 @@ td::Status ShardState::merge_with(ShardState& sib) { return td::Status::OK(); } -td::Result> ShardState::compute_split_out_msg_queue( - ton::ShardIdFull subshard) { +td::Result> ShardState::compute_split_out_msg_queue(ton::ShardIdFull subshard, + td::uint32* queue_size) { auto shard = id_.shard_full(); if (!ton::shard_is_parent(shard, subshard)) { return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() + @@ -1018,7 +1018,7 @@ td::Result> ShardState::compute_split_o } CHECK(out_msg_queue_); auto subqueue = std::make_unique(*out_msg_queue_); - int res = block::filter_out_msg_queue(*subqueue, shard, subshard); + int res = block::filter_out_msg_queue(*subqueue, shard, subshard, queue_size); if (res < 0) { return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str()); } @@ -1040,7 +1040,7 @@ td::Result> ShardState::compu return std::move(sub_processed_upto); } -td::Status ShardState::split(ton::ShardIdFull subshard) { +td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size) { if (!ton::shard_is_parent(id_.shard_full(), subshard)) { return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() + " because it is not a parent"); @@ -1058,7 +1058,7 @@ td::Status ShardState::split(ton::ShardIdFull subshard) { auto shard1 = id_.shard_full(); CHECK(ton::shard_is_parent(shard1, subshard)); CHECK(out_msg_queue_); - int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard); + int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard, queue_size); if (res1 < 0) { return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str()); } @@ -1098,8 +1098,12 @@ td::Status ShardState::split(ton::ShardIdFull subshard) { return td::Status::OK(); } -int 
filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard) { - return out_queue.filter([subshard, old_shard](vm::CellSlice& cs, td::ConstBitPtr key, int key_len) -> int { +int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard, + td::uint32* queue_size) { + if (queue_size) { + *queue_size = 0; + } + return out_queue.filter([=](vm::CellSlice& cs, td::ConstBitPtr key, int key_len) -> int { CHECK(key_len == 352); LOG(DEBUG) << "scanning OutMsgQueue entry with key " << key.to_hex(key_len); block::tlb::MsgEnvelope::Record_std env; @@ -1122,7 +1126,11 @@ int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull ol << " does not contain current address belonging to shard " << old_shard.to_str(); return -1; } - return ton::shard_contains(subshard, cur_prefix); + bool res = ton::shard_contains(subshard, cur_prefix); + if (res && queue_size) { + ++*queue_size; + } + return res; }); } diff --git a/crypto/block/block.h b/crypto/block/block.h index 09169429b..c54949f43 100644 --- a/crypto/block/block.h +++ b/crypto/block/block.h @@ -262,7 +262,7 @@ struct BlockLimitStatus { td::uint64 gas_used{}; vm::NewCellStorageStat st_stat; unsigned accounts{}, transactions{}, extra_out_msgs{}; - unsigned extra_library_diff{}; // Number of public libraries in deleted/frozen accounts + unsigned public_library_diff{}; BlockLimitStatus(const BlockLimits& limits_, ton::LogicalTime lt = 0) : limits(limits_), cur_lt(std::max(limits_.start_lt, lt)) { } @@ -272,7 +272,7 @@ struct BlockLimitStatus { transactions = accounts = 0; gas_used = 0; extra_out_msgs = 0; - extra_library_diff = 0; + public_library_diff = 0; } td::uint64 estimate_block_size(const vm::NewCellStorageStat::Stat* extra = nullptr) const; int classify() const; @@ -433,10 +433,11 @@ struct ShardState { ton::BlockSeqno prev_mc_block_seqno, bool after_split, bool clear_history, std::function 
for_each_mcseqno); td::Status merge_with(ShardState& sib); - td::Result> compute_split_out_msg_queue(ton::ShardIdFull subshard); + td::Result> compute_split_out_msg_queue(ton::ShardIdFull subshard, + td::uint32* queue_size = nullptr); td::Result> compute_split_processed_upto( ton::ShardIdFull subshard); - td::Status split(ton::ShardIdFull subshard); + td::Status split(ton::ShardIdFull subshard, td::uint32* queue_size = nullptr); td::Status unpack_out_msg_queue_info(Ref out_msg_queue_info); bool clear_load_history() { overload_history_ = underload_history_ = 0; @@ -656,7 +657,8 @@ class MtCarloComputeShare { void gen_vset(); }; -int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard); +int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard, + td::uint32* queue_size = nullptr); std::ostream& operator<<(std::ostream& os, const ShardId& shard_id); diff --git a/crypto/block/output-queue-merger.cpp b/crypto/block/output-queue-merger.cpp index 1084bb1a8..aa425f6b8 100644 --- a/crypto/block/output-queue-merger.cpp +++ b/crypto/block/output-queue-merger.cpp @@ -146,22 +146,30 @@ bool OutputQueueMerger::add_root(int src, Ref outmsg_root) { return true; } -OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector _neighbors) +OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector _neighbors) : queue_for(_queue_for), neighbors(std::move(_neighbors)), eof(false), failed(false) { init(); } +OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector _neighbors) + : queue_for(_queue_for), eof(false), failed(false) { + for (auto& nb : _neighbors) { + neighbors.emplace_back(nb.top_block_id(), nb.outmsg_root, nb.is_disabled()); + } + init(); +} + void OutputQueueMerger::init() { common_pfx.bits().store_int(queue_for.workchain, 32); int l = queue_for.pfx_len(); 
td::bitstring::bits_store_long_top(common_pfx.bits() + 32, queue_for.shard, l); common_pfx_len = 32 + l; int i = 0; - for (block::McShardDescr& neighbor : neighbors) { - if (!neighbor.is_disabled()) { - LOG(DEBUG) << "adding " << (neighbor.outmsg_root.is_null() ? "" : "non-") << "empty output queue for neighbor #" - << i << " (" << neighbor.blk_.to_str() << ")"; - add_root(i++, neighbor.outmsg_root); + for (Neighbor& neighbor : neighbors) { + if (!neighbor.disabled_) { + LOG(DEBUG) << "adding " << (neighbor.outmsg_root_.is_null() ? "" : "non-") << "empty output queue for neighbor #" + << i << " (" << neighbor.block_id_.to_str() << ")"; + add_root(i++, neighbor.outmsg_root_); } else { LOG(DEBUG) << "skipping output queue for disabled neighbor #" << i; i++; diff --git a/crypto/block/output-queue-merger.h b/crypto/block/output-queue-merger.h index bf3d85868..07533f243 100644 --- a/crypto/block/output-queue-merger.h +++ b/crypto/block/output-queue-merger.h @@ -51,12 +51,22 @@ struct OutputQueueMerger { bool unpack_node(td::ConstBitPtr key_pfx, int key_pfx_len, Ref node); bool split(MsgKeyValue& second); }; + struct Neighbor { + ton::BlockIdExt block_id_; + td::Ref outmsg_root_; + bool disabled_; + Neighbor() = default; + Neighbor(ton::BlockIdExt block_id, td::Ref outmsg_root, bool disabled = false) + : block_id_(block_id), outmsg_root_(std::move(outmsg_root)), disabled_(disabled) { + } + }; // ton::ShardIdFull queue_for; std::vector> msg_list; - std::vector neighbors; + std::vector neighbors; public: + OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector _neighbors); OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector _neighbors); bool is_eof() const { return eof; diff --git a/crypto/block/transaction.cpp b/crypto/block/transaction.cpp index 62a48cab3..7a9073375 100644 --- a/crypto/block/transaction.cpp +++ b/crypto/block/transaction.cpp @@ -2536,6 +2536,31 @@ static td::uint32 get_public_libraries_count(const td::Ref& libraries) return count; } +/** + * 
Calculates the number of changes of public libraries in the dictionary. + * + * @param old_libraries The dictionary of account libraries before the transaction. + * @param new_libraries The dictionary of account libraries after the transaction. + * + * @returns The number of changed public libraries. + */ +static td::uint32 get_public_libraries_diff_count(const td::Ref& old_libraries, + const td::Ref& new_libraries) { + td::uint32 count = 0; + vm::Dictionary dict1{old_libraries, 256}; + vm::Dictionary dict2{new_libraries, 256}; + dict1.scan_diff(dict2, [&](td::ConstBitPtr key, int n, Ref val1, Ref val2) -> bool { + CHECK(n == 256); + bool is_public1 = val1.not_null() && block::is_public_library(key, val1); + bool is_public2 = val2.not_null() && block::is_public_library(key, val2); + if (is_public1 != is_public2) { + ++count; + } + return true; + }); + return count; +} + /** * Checks that the new account state fits in the limits. * This function is not called for special accounts. @@ -2979,14 +3004,14 @@ bool Transaction::serialize() { vm::load_cell_slice(root).print_rec(std::cerr); } - if (!block::gen::t_Transaction.validate_ref(root)) { + if (!block::gen::t_Transaction.validate_ref(4096, root)) { LOG(ERROR) << "newly-generated transaction failed to pass automated validation:"; vm::load_cell_slice(root).print_rec(std::cerr); block::gen::t_Transaction.print_ref(std::cerr, root); root.clear(); return false; } - if (!block::tlb::t_Transaction.validate_ref(root)) { + if (!block::tlb::t_Transaction.validate_ref(4096, root)) { LOG(ERROR) << "newly-generated transaction failed to pass hand-written validation:"; vm::load_cell_slice(root).print_rec(std::cerr); block::gen::t_Transaction.print_ref(std::cerr, root); @@ -3187,8 +3212,12 @@ bool Transaction::update_limits(block::BlockLimitStatus& blimst, bool with_size) blimst.add_account(is_first))) { return false; } - if (account.is_masterchain() && (was_frozen || was_deleted)) { - blimst.extra_library_diff += 
get_public_libraries_count(account.orig_library); + if (account.is_masterchain()) { + if (was_frozen || was_deleted) { + blimst.public_library_diff += get_public_libraries_count(account.orig_library); + } else { + blimst.public_library_diff += get_public_libraries_diff_count(account.orig_library, new_library); + } } } return true; diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index 273274930..d98c296c9 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -3397,6 +3397,19 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getShardO promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "no such block"))); return; } + if (!dest) { + td::actor::send_closure( + manager, &ton::validator::ValidatorManagerInterface::get_out_msg_queue_size, handle->id(), + [promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_control_query_error(R.move_as_error_prefix("failed to get queue size: "))); + } else { + promise.set_value(ton::create_serialize_tl_object( + R.move_as_ok())); + } + }); + return; + } td::actor::send_closure( manager, &ton::validator::ValidatorManagerInterface::get_shard_state_from_db, handle, [=, promise = std::move(promise)](td::Result> R) mutable { diff --git a/validator/CMakeLists.txt b/validator/CMakeLists.txt index 068569de8..f858e6ffc 100644 --- a/validator/CMakeLists.txt +++ b/validator/CMakeLists.txt @@ -52,6 +52,7 @@ set(VALIDATOR_HEADERS invariants.hpp import-db-slice.hpp + queue-size-counter.hpp manager-disk.h manager-disk.hpp @@ -77,6 +78,7 @@ set(VALIDATOR_SOURCE validator-full-id.cpp validator-group.cpp validator-options.cpp + queue-size-counter.cpp downloaders/wait-block-data.cpp downloaders/wait-block-state.cpp diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index b30639930..fe931cd66 100644 --- a/validator/impl/collator-impl.h 
+++ b/validator/impl/collator-impl.h @@ -60,7 +60,6 @@ class Collator final : public td::actor::Actor { bool preinit_complete{false}; bool is_key_block_{false}; bool block_full_{false}; - bool outq_cleanup_partial_{false}; bool inbound_queues_empty_{false}; bool libraries_changed_{false}; bool prev_key_block_exists_{false}; @@ -159,7 +158,6 @@ class Collator final : public td::actor::Actor { bool report_version_{false}; bool skip_topmsgdescr_{false}; bool skip_extmsg_{false}; - bool queue_too_big_{false}; bool short_dequeue_records_{false}; td::uint64 overload_history_{0}, underload_history_{0}; td::uint64 block_size_estimate_{}; @@ -189,6 +187,7 @@ class Collator final : public td::actor::Actor { std::priority_queue, std::greater> new_msgs; std::pair last_proc_int_msg_, first_unproc_int_msg_; std::unique_ptr in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_; + td::uint32 out_msg_queue_size_ = 0; std::unique_ptr ihr_pending; std::shared_ptr processed_upto_, sibling_processed_upto_; std::unique_ptr block_create_stats_; @@ -227,6 +226,7 @@ class Collator final : public td::actor::Actor { bool fix_one_processed_upto(block::MsgProcessedUpto& proc, const ton::ShardIdFull& owner); bool fix_processed_upto(block::MsgProcessedUptoCollection& upto); void got_neighbor_out_queue(int i, td::Result> res); + void got_out_queue_size(size_t i, td::Result res); bool adjust_shard_config(); bool store_shard_fees(ShardIdFull shard, const block::CurrencyCollection& fees, const block::CurrencyCollection& created); @@ -260,6 +260,7 @@ class Collator final : public td::actor::Actor { bool check_prev_block_exact(const BlockIdExt& listed, const BlockIdExt& prev); bool check_this_shard_mc_info(); bool request_neighbor_msg_queues(); + bool request_out_msg_queue_size(); void update_max_lt(ton::LogicalTime lt); bool is_masterchain() const { return shard_.is_masterchain(); diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index 2e4dfa1d2..2f4a002a0 100644 --- 
a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -44,6 +44,12 @@ namespace validator { using td::Ref; using namespace std::literals::string_literals; +// Don't increase MERGE_MAX_QUEUE_SIZE too much: merging requires cleaning the whole queue in out_msg_queue_cleanup +static const td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096; +static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000; +static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047; +static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000; + #define DBG(__n) dbg(__n)&& #define DSTART int __dcnt = 0; #define DEB DBG(++__dcnt) @@ -790,6 +796,26 @@ bool Collator::request_neighbor_msg_queues() { return true; } +/** + * Requests the size of the outbound message queue from the previous state(s). + * + * @returns True if the request was successful, false otherwise. + */ +bool Collator::request_out_msg_queue_size() { + if (after_split_) { + // If block is after split, the size is calculated during split (see Collator::split_last_state) + return true; + } + for (size_t i = 0; i < prev_blocks.size(); ++i) { + ++pending; + send_closure_later(manager, &ValidatorManager::get_out_msg_queue_size, prev_blocks[i], + [self = get_self(), i](td::Result res) { + td::actor::send_closure(std::move(self), &Collator::got_out_queue_size, i, std::move(res)); + }); + } + return true; +} + /** * Handles the result of obtaining the outbound queue for a neighbor. * @@ -854,6 +880,27 @@ void Collator::got_neighbor_out_queue(int i, td::Result> res) check_pending(); } +/** + * Handles the result of obtaining the size of the outbound message queue. + * + * If the block is after merge then the two sizes are added. + * + * @param i The index of the previous block (0 or 1). + * @param res The result object containing the size of the queue.
+ */ +void Collator::got_out_queue_size(size_t i, td::Result res) { + --pending; + if (res.is_error()) { + fatal_error( + res.move_as_error_prefix(PSTRING() << "failed to get message queue size from prev block #" << i << ": ")); + return; + } + td::uint32 size = res.move_as_ok(); + LOG(DEBUG) << "got outbound queue size from prev block #" << i << ": " << size; + out_msg_queue_size_ += size; + check_pending(); +} + /** * Unpacks and merges the states of two previous blocks. * Used if the block is after_merge. @@ -972,7 +1019,7 @@ bool Collator::split_last_state(block::ShardState& ss) { return fatal_error(res2.move_as_error()); } sibling_processed_upto_ = res2.move_as_ok(); - auto res3 = ss.split(shard_); + auto res3 = ss.split(shard_, &out_msg_queue_size_); if (res3.is_error()) { return fatal_error(std::move(res3)); } @@ -1449,6 +1496,9 @@ bool Collator::do_preinit() { if (!request_neighbor_msg_queues()) { return false; } + if (!request_out_msg_queue_size()) { + return false; + } return true; } @@ -1824,7 +1874,6 @@ bool Collator::init_utime() { // Extend collator timeout if previous block is too old td::Timestamp new_timeout = td::Timestamp::in(std::min(30.0, (td::Clocks::system() - (double)prev_now_) / 2)); if (timeout < new_timeout) { - double add = new_timeout.at() - timeout.at(); timeout = new_timeout; alarm_timestamp() = timeout; } @@ -2174,95 +2223,144 @@ bool Collator::out_msg_queue_cleanup() { block::gen::t_OutMsgQueue.print(std::cerr, *rt); rt->print_rec(std::cerr); } - for (const auto& nb : neighbors_) { - if (!nb.is_disabled() && (!nb.processed_upto || !nb.processed_upto->can_check_processed())) { - return fatal_error(-667, PSTRING() << "internal error: no info for checking processed messages from neighbor " - << nb.blk_.to_str()); - } - } - auto queue_root = out_msg_queue_->get_root_cell(); - if (queue_root.is_null()) { - LOG(DEBUG) << "out_msg_queue is empty"; - return true; - } - auto old_out_msg_queue = std::make_unique(queue_root, 352, 
block::tlb::aug_OutMsgQueue); - - int deleted = 0; - int total = 0; - bool fail = false; - old_out_msg_queue->check_for_each([&](Ref value, td::ConstBitPtr key, int n) -> bool { - ++total; - assert(n == 352); - vm::CellSlice& cs = value.write(); - // LOG(DEBUG) << "key is " << key.to_hex(n); - if (queue_cleanup_timeout_.is_in_past(td::Timestamp::now())) { - LOG(WARNING) << "cleaning up outbound queue takes too long, ending"; - outq_cleanup_partial_ = true; - return false; // retain all remaining outbound queue entries including this one without processing + if (after_merge_) { + // We need to clean the whole queue after merge + // Queue is not too big, see const MERGE_MAX_QUEUE_SIZE + for (const auto& nb : neighbors_) { + if (!nb.is_disabled() && (!nb.processed_upto || !nb.processed_upto->can_check_processed())) { + return fatal_error(-667, PSTRING() << "internal error: no info for checking processed messages from neighbor " + << nb.blk_.to_str()); + } } - if (block_full_) { - LOG(WARNING) << "BLOCK FULL while cleaning up outbound queue, cleanup completed only partially"; - outq_cleanup_partial_ = true; - return false; // retain all remaining outbound queue entries including this one without processing + td::uint32 deleted = 0; + auto res = out_msg_queue_->filter([&](vm::CellSlice& cs, td::ConstBitPtr key, int n) -> int { + assert(n == 352); + block::EnqueuedMsgDescr enq_msg_descr; + unsigned long long created_lt; + if (!(cs.fetch_ulong_bool(64, created_lt) // augmentation + && enq_msg_descr.unpack(cs) // unpack EnqueuedMsg + && enq_msg_descr.check_key(key) // check key + && enq_msg_descr.lt_ == created_lt)) { + LOG(ERROR) << "cannot unpack EnqueuedMsg with key " << key.to_hex(n); + return -1; + } + LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," + << enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_; + bool delivered = false; + ton::LogicalTime deliver_lt = 0; + for (const auto& neighbor : 
neighbors_) { + // could look up neighbor with shard containing enq_msg_descr.next_prefix more efficiently + // (instead of checking all neighbors) + if (!neighbor.is_disabled() && neighbor.processed_upto->already_processed(enq_msg_descr)) { + delivered = true; + deliver_lt = neighbor.end_lt(); + break; + } + } + if (delivered) { + ++deleted; + CHECK(out_msg_queue_size_ > 0); + --out_msg_queue_size_; + LOG(DEBUG) << "outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," << enq_msg_descr.hash_.to_hex() + << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_ << " has been already delivered, dequeueing"; + if (!dequeue_message(std::move(enq_msg_descr.msg_env_), deliver_lt)) { + fatal_error(PSTRING() << "cannot dequeue outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," + << enq_msg_descr.hash_.to_hex() << ") by inserting a msg_export_deq record"); + return -1; + } + register_out_msg_queue_op(); + if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) { + block_full_ = true; + } + } + return !delivered; + }); + LOG(INFO) << "deleted " << deleted << " messages from out_msg_queue after merge, remaining queue size is " + << out_msg_queue_size_; + if (res < 0) { + return fatal_error("error scanning/updating OutMsgQueue"); } - block::EnqueuedMsgDescr enq_msg_descr; - unsigned long long created_lt; - if (!(cs.fetch_ulong_bool(64, created_lt) // augmentation - && enq_msg_descr.unpack(cs) // unpack EnqueuedMsg - && enq_msg_descr.check_key(key) // check key - && enq_msg_descr.lt_ == created_lt)) { - LOG(ERROR) << "cannot unpack EnqueuedMsg with key " << key.to_hex(n); - fail = true; - return false; + } else { + std::vector> queue_parts; + + block::OutputQueueMerger::Neighbor this_queue{BlockIdExt{new_id} /* block id is only used for logs */, + out_msg_queue_->get_root_cell()}; + for (const auto& nb : neighbors_) { + if (nb.is_disabled()) { + continue; + } + if (!nb.processed_upto || !nb.processed_upto->can_check_processed()) { + return fatal_error(-667, 
PSTRING() << "internal error: no info for checking processed messages from neighbor " + << nb.blk_.to_str()); + } + queue_parts.emplace_back(block::OutputQueueMerger{nb.shard(), {this_queue}}, &nb); } - LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," - << enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_; - bool delivered = false; - ton::LogicalTime deliver_lt = 0; - for (const auto& neighbor : neighbors_) { - // could look up neighbor with shard containing enq_msg_descr.next_prefix more efficiently - // (instead of checking all neighbors) - if (!neighbor.is_disabled() && neighbor.processed_upto->already_processed(enq_msg_descr)) { - delivered = true; - deliver_lt = neighbor.end_lt(); + + size_t i = 0; + td::uint32 deleted = 0; + while (!queue_parts.empty()) { + if (block_full_) { + LOG(WARNING) << "BLOCK FULL while cleaning up outbound queue, cleanup completed only partially"; break; } - } - if (delivered) { - LOG(DEBUG) << "outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," << enq_msg_descr.hash_.to_hex() - << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_ << " has been already delivered, dequeueing"; - ++deleted; - out_msg_queue_->lookup_delete_with_extra(key, n); - if (!dequeue_message(std::move(enq_msg_descr.msg_env_), deliver_lt)) { - fatal_error(PSTRING() << "cannot dequeue outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," - << enq_msg_descr.hash_.to_hex() << ") by inserting a msg_export_deq record"); - fail = true; - return false; + if (queue_cleanup_timeout_.is_in_past(td::Timestamp::now())) { + LOG(WARNING) << "cleaning up outbound queue takes too long, ending"; + break; } - register_out_msg_queue_op(); - if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) { - block_full_ = true; + if (i == queue_parts.size()) { + i = 0; + } + auto& queue = queue_parts.at(i).first; + auto nb = queue_parts.at(i).second; + auto kv = queue.extract_cur(); + if (kv) { + 
block::EnqueuedMsgDescr enq_msg_descr; + if (!(enq_msg_descr.unpack(kv->msg.write()) // unpack EnqueuedMsg + && enq_msg_descr.check_key(kv->key.cbits()) // check key + )) { + return fatal_error(PSTRING() << "error scanning/updating OutMsgQueue: cannot unpack EnqueuedMsg with key " + << kv->key.to_hex()); + } + if (nb->processed_upto->already_processed(enq_msg_descr)) { + LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," + << enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_ + << ": message has been already delivered, dequeueing"; + ++deleted; + CHECK(out_msg_queue_size_ > 0); + --out_msg_queue_size_; + out_msg_queue_->lookup_delete_with_extra(kv->key.cbits(), kv->key_len); + if (!dequeue_message(std::move(enq_msg_descr.msg_env_), nb->end_lt())) { + return fatal_error(PSTRING() << "cannot dequeue outbound message with (lt,hash)=(" << enq_msg_descr.lt_ + << "," << enq_msg_descr.hash_.to_hex() + << ") by inserting a msg_export_deq record"); + } + register_out_msg_queue_op(); + if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) { + block_full_ = true; + } + queue.next(); + ++i; + continue; + } else { + LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," + << enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_ + << ": message has not been delivered"; + } } + LOG(DEBUG) << "no more unprocessed messages to shard " << nb->shard().to_str(); + std::swap(queue_parts[i], queue_parts.back()); + queue_parts.pop_back(); } - return true; - }, false, true /* random order */); - LOG(INFO) << "deleted " << deleted << " messages from out_msg_queue, processed " << total << " messages in total"; - if (fail) { - return fatal_error("error scanning/updating OutMsgQueue"); - } - if (outq_cleanup_partial_ || total > 8000) { - LOG(INFO) << "out_msg_queue too big, skipping importing external messages"; - skip_extmsg_ = true; - queue_too_big_ = true; + 
LOG(INFO) << "deleted " << deleted << " messages from out_msg_queue, remaining queue size is " + << out_msg_queue_size_; } - auto rt = out_msg_queue_->get_root(); if (verbosity >= 2) { + auto rt = out_msg_queue_->get_root(); std::cerr << "new out_msg_queue is "; block::gen::t_OutMsgQueue.print(std::cerr, *rt); rt->print_rec(std::cerr); } - // CHECK(block::gen::t_OutMsgQueue.validate_upto(100000, *rt)); // DEBUG, comment later if SLOW return register_out_msg_queue_op(true); } @@ -3047,6 +3145,7 @@ bool Collator::enqueue_transit_message(Ref msg, Ref old_msg_ try { LOG(DEBUG) << "inserting into outbound queue message with (lt,key)=(" << start_lt << "," << key.to_hex() << ")"; ok = out_msg_queue_->set_builder(key.bits(), 352, cb, vm::Dictionary::SetMode::Add); + ++out_msg_queue_size_; } catch (vm::VmError) { ok = false; } @@ -3069,6 +3168,8 @@ bool Collator::delete_out_msg_queue_msg(td::ConstBitPtr key) { try { LOG(DEBUG) << "deleting from outbound queue message with key=" << key.to_hex(352); queue_rec = out_msg_queue_->lookup_delete(key, 352); + CHECK(out_msg_queue_size_ > 0); + --out_msg_queue_size_; } catch (vm::VmError err) { LOG(ERROR) << "error deleting from out_msg_queue dictionary: " << err.get_msg(); } @@ -3309,8 +3410,9 @@ bool Collator::process_inbound_external_messages() { LOG(INFO) << "skipping processing of inbound external messages"; return true; } - if (out_msg_queue_->get_root_cell().not_null() && out_msg_queue_->get_root_cell()->get_depth() > 12) { - LOG(INFO) << "skipping processing of inbound external messages: out msg queue is too big"; + if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE) { + LOG(INFO) << "skipping processing of inbound external messages because out_msg_queue is too big (" + << out_msg_queue_size_ << " > " << SKIP_EXTERNALS_QUEUE_SIZE << ")"; return true; } bool full = !block_limit_status_->fits(block::ParamLimits::cl_soft); @@ -3550,6 +3652,7 @@ bool Collator::enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_rema 
LOG(DEBUG) << "inserting into outbound queue a new message with (lt,key)=(" << start_lt << "," << key.to_hex() << ")"; ok = out_msg_queue_->set_builder(key.bits(), 352, cb, vm::Dictionary::SetMode::Add); + ++out_msg_queue_size_; } catch (vm::VmError) { ok = false; } @@ -4158,6 +4261,7 @@ static int history_weight(td::uint64 history) { * @returns True if the check is successful. */ bool Collator::check_block_overload() { + LOG(INFO) << "final out_msg_queue size is " << out_msg_queue_size_; overload_history_ <<= 1; underload_history_ <<= 1; block_size_estimate_ = block_limit_status_->estimate_block_size(); @@ -4166,18 +4270,32 @@ bool Collator::check_block_overload() { << " size_estimate=" << block_size_estimate_; auto cl = block_limit_status_->classify(); if (cl <= block::ParamLimits::cl_underload) { - if (queue_too_big_) { - LOG(INFO) << "block is underloaded, but don't set underload history because out msg queue is big"; + if (out_msg_queue_size_ > MERGE_MAX_QUEUE_SIZE) { + LOG(INFO) + << "block is underloaded, but don't set underload history because out_msg_queue size is too big to merge (" + << out_msg_queue_size_ << " > " << MERGE_MAX_QUEUE_SIZE << ")"; } else { underload_history_ |= 1; LOG(INFO) << "block is underloaded"; } } else if (cl >= block::ParamLimits::cl_soft) { - overload_history_ |= 1; - LOG(INFO) << "block is overloaded (category " << cl << ")"; + if (out_msg_queue_size_ > SPLIT_MAX_QUEUE_SIZE) { + LOG(INFO) << "block is overloaded (category " << cl + << "), but don't set overload history because out_msg_queue size is too big to split (" + << out_msg_queue_size_ << " > " << SPLIT_MAX_QUEUE_SIZE << ")"; + } else { + overload_history_ |= 1; + LOG(INFO) << "block is overloaded (category " << cl << ")"; + } } else { LOG(INFO) << "block is loaded normally"; } + if (!(overload_history_ & 1) && out_msg_queue_size_ >= FORCE_SPLIT_QUEUE_SIZE && + out_msg_queue_size_ <= SPLIT_MAX_QUEUE_SIZE) { + overload_history_ |= 1; + LOG(INFO) << "setting overload 
history because out_msg_queue reached force split limit (" << out_msg_queue_size_ + << " >= " << FORCE_SPLIT_QUEUE_SIZE << ")"; + } if (collator_settings & 1) { LOG(INFO) << "want_split manually set"; want_split_ = true; diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index 2745e02e9..1c3f4f5b9 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -23,6 +23,7 @@ #include "validator-group.hpp" #include "manager-init.h" #include "manager-disk.h" +#include "queue-size-counter.hpp" #include #include @@ -376,6 +377,13 @@ class ValidatorManagerImpl : public ValidatorManager { void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override { UNREACHABLE(); } + void get_out_msg_queue_size(BlockIdExt block_id, td::Promise promise) override { + if (queue_size_counter_.empty()) { + queue_size_counter_ = + td::actor::create_actor("queuesizecounter", td::Ref{}, actor_id(this)); + } + td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise)); + } private: PublicKeyHash local_id_; @@ -393,6 +401,7 @@ class ValidatorManagerImpl : public ValidatorManager { int pending_new_shard_block_descr_{0}; std::vector>>> waiting_new_shard_block_descr_; + td::actor::ActorOwn queue_size_counter_; void update_shards(); void update_shard_blocks(); diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 6a145191e..675c23041 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -23,6 +23,7 @@ #include "validator-group.hpp" #include "manager-init.h" #include "manager-hardfork.h" +#include "queue-size-counter.hpp" #include #include @@ -437,6 +438,13 @@ class ValidatorManagerImpl : public ValidatorManager { void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override { UNREACHABLE(); } + void get_out_msg_queue_size(BlockIdExt block_id, td::Promise promise) 
override { + if (queue_size_counter_.empty()) { + queue_size_counter_ = + td::actor::create_actor("queuesizecounter", td::Ref{}, actor_id(this)); + } + td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise)); + } private: td::Ref opts_; @@ -445,6 +453,7 @@ class ValidatorManagerImpl : public ValidatorManager { std::string db_root_; ShardIdFull shard_to_generate_; BlockIdExt block_to_generate_; + td::actor::ActorOwn queue_size_counter_; }; } // namespace validator diff --git a/validator/manager.cpp b/validator/manager.cpp index 8caed0f69..54d272bc9 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -2277,7 +2277,15 @@ void ValidatorManagerImpl::allow_block_info_gc(BlockIdExt block_id, td::Promise< void ValidatorManagerImpl::got_next_gc_masterchain_handle(BlockHandle handle) { CHECK(gc_advancing_); auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result> R) { - R.ensure(); + if (R.is_error()) { + if (R.error().code() == ErrorCode::timeout) { + LOG(ERROR) << "Failed to get gc masterchain state, retrying: " << R.move_as_error(); + td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_gc_masterchain_handle, std::move(handle)); + } else { + LOG(FATAL) << "Failed to get gc masterchain state: " << R.move_as_error(); + } + return; + } td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_gc_masterchain_state, std::move(handle), td::Ref{R.move_as_ok()}); }); diff --git a/validator/manager.hpp b/validator/manager.hpp index ccd854235..9f51cc27d 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -28,6 +28,7 @@ #include "state-serializer.hpp" #include "rldp/rldp.h" #include "token-manager.h" +#include "queue-size-counter.hpp" #include #include @@ -548,6 +549,18 @@ class ValidatorManagerImpl : public ValidatorManager { void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override; + void 
get_out_msg_queue_size(BlockIdExt block_id, td::Promise promise) override { + if (queue_size_counter_.empty()) { + if (last_masterchain_state_.is_null()) { + promise.set_error(td::Status::Error(ErrorCode::notready, "not ready")); + return; + } + queue_size_counter_ = td::actor::create_actor("queuesizecounter", + last_masterchain_state_, actor_id(this)); + } + td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise)); + } + private: td::Timestamp resend_shard_blocks_at_; td::Timestamp check_waiters_at_; @@ -612,6 +625,7 @@ class ValidatorManagerImpl : public ValidatorManager { private: std::map> shard_client_waiters_; + td::actor::ActorOwn queue_size_counter_; }; } // namespace validator diff --git a/validator/queue-size-counter.cpp b/validator/queue-size-counter.cpp new file mode 100644 index 000000000..4780f202c --- /dev/null +++ b/validator/queue-size-counter.cpp @@ -0,0 +1,301 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . 
+*/ +#include "queue-size-counter.hpp" +#include "block/block-auto.h" +#include "block/block-parse.h" +#include "common/delay.h" +#include "td/actor/MultiPromise.h" +#include "td/utils/Random.h" + +namespace ton::validator { + +static td::Result calc_queue_size(const td::Ref &state) { + td::uint32 size = 0; + TRY_RESULT(outq_descr, state->message_queue()); + block::gen::OutMsgQueueInfo::Record qinfo; + if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) { + return td::Status::Error("invalid message queue"); + } + vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue}; + bool ok = queue.check_for_each([&](td::Ref, td::ConstBitPtr, int) -> bool { + ++size; + return true; + }); + if (!ok) { + return td::Status::Error("invalid message queue dict"); + } + return size; +} + +static td::Result recalc_queue_size(const td::Ref &state, const td::Ref &prev_state, + td::uint32 prev_size) { + TRY_RESULT(outq_descr, state->message_queue()); + block::gen::OutMsgQueueInfo::Record qinfo; + if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) { + return td::Status::Error("invalid message queue"); + } + vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue}; + + TRY_RESULT(prev_outq_descr, prev_state->message_queue()); + block::gen::OutMsgQueueInfo::Record prev_qinfo; + if (!tlb::unpack_cell(prev_outq_descr->root_cell(), prev_qinfo)) { + return td::Status::Error("invalid message queue"); + } + vm::AugmentedDictionary prev_queue{prev_qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue}; + td::uint32 add = 0, rem = 0; + bool ok = prev_queue.scan_diff( + queue, [&](td::ConstBitPtr, int, td::Ref prev_val, td::Ref new_val) -> bool { + if (prev_val.not_null()) { + ++rem; + } + if (new_val.not_null()) { + ++add; + } + return true; + }); + if (!ok) { + return td::Status::Error("invalid message queue dict"); + } + if (prev_size + add < rem) { + return td::Status::Error("negative value"); 
+ } + return prev_size + add - rem; +} + +void QueueSizeCounter::start_up() { + if (init_masterchain_state_.is_null()) { + // Used in manager-hardfork or manager-disk + simple_mode_ = true; + return; + } + current_seqno_ = init_masterchain_state_->get_seqno(); + process_top_shard_blocks_cont(init_masterchain_state_, true); + init_masterchain_state_ = {}; + alarm(); +} + +void QueueSizeCounter::get_queue_size(BlockIdExt block_id, td::Promise promise) { + get_queue_size_ex(block_id, simple_mode_ || is_block_too_old(block_id), std::move(promise)); +} + +void QueueSizeCounter::get_queue_size_ex(ton::BlockIdExt block_id, bool calc_whole, td::Promise promise) { + Entry &entry = results_[block_id]; + if (entry.done_) { + promise.set_result(entry.queue_size_); + return; + } + entry.promises_.push_back(std::move(promise)); + if (entry.started_) { + return; + } + entry.started_ = true; + entry.calc_whole_ = calc_whole; + td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, block_id, true, + [SelfId = actor_id(this), block_id, manager = manager_](td::Result R) mutable { + if (R.is_error()) { + td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, block_id, R.move_as_error()); + return; + } + BlockHandle handle = R.move_as_ok(); + td::actor::send_closure( + manager, &ValidatorManager::wait_block_state, handle, 0, td::Timestamp::in(10.0), + [SelfId, handle](td::Result> R) mutable { + if (R.is_error()) { + td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, handle->id(), + R.move_as_error()); + return; + } + td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont, + std::move(handle), R.move_as_ok()); + }); + }); +} + +void QueueSizeCounter::get_queue_size_cont(BlockHandle handle, td::Ref state) { + Entry &entry = results_[handle->id()]; + CHECK(entry.started_); + bool calc_whole = entry.calc_whole_ || handle->id().seqno() == 0; + if (!calc_whole) { + CHECK(handle->inited_prev()); + auto prev_blocks = handle->prev(); + bool 
after_split = prev_blocks.size() == 1 && handle->id().shard_full() != prev_blocks[0].shard_full(); + bool after_merge = prev_blocks.size() == 2; + calc_whole = after_split || after_merge; + } + if (calc_whole) { + auto r_size = calc_queue_size(state); + if (r_size.is_error()) { + on_error(handle->id(), r_size.move_as_error()); + return; + } + entry.done_ = true; + entry.queue_size_ = r_size.move_as_ok(); + for (auto &promise : entry.promises_) { + promise.set_result(entry.queue_size_); + } + entry.promises_.clear(); + return; + } + + auto prev_block_id = handle->one_prev(true); + get_queue_size(prev_block_id, [=, SelfId = actor_id(this), manager = manager_](td::Result R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error()); + return; + } + td::uint32 prev_size = R.move_as_ok(); + td::actor::send_closure( + manager, &ValidatorManager::wait_block_state_short, prev_block_id, 0, td::Timestamp::in(10.0), + [=](td::Result> R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error()); + return; + } + td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont2, state, R.move_as_ok(), prev_size); + }); + }); +} + +void QueueSizeCounter::get_queue_size_cont2(td::Ref state, td::Ref prev_state, + td::uint32 prev_size) { + BlockIdExt block_id = state->get_block_id(); + Entry &entry = results_[block_id]; + CHECK(entry.started_); + auto r_size = recalc_queue_size(state, prev_state, prev_size); + if (r_size.is_error()) { + on_error(block_id, r_size.move_as_error()); + return; + } + entry.done_ = true; + entry.queue_size_ = r_size.move_as_ok(); + for (auto &promise : entry.promises_) { + promise.set_result(entry.queue_size_); + } + entry.promises_.clear(); +} + +void QueueSizeCounter::on_error(ton::BlockIdExt block_id, td::Status error) { + auto it = results_.find(block_id); + if (it == results_.end()) { + return; + } + Entry 
&entry = it->second; + CHECK(!entry.done_); + for (auto &promise : entry.promises_) { + promise.set_error(error.clone()); + } + results_.erase(it); +} + +void QueueSizeCounter::process_top_shard_blocks() { + LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks seqno=" << current_seqno_; + td::actor::send_closure( + manager_, &ValidatorManager::get_block_by_seqno_from_db, AccountIdPrefixFull{masterchainId, 0}, current_seqno_, + [SelfId = actor_id(this), manager = manager_](td::Result R) { + if (R.is_error()) { + LOG(WARNING) << "Failed to get masterchain block id: " << R.move_as_error(); + delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); }, + td::Timestamp::in(5.0)); + return; + } + td::actor::send_closure( + manager, &ValidatorManager::wait_block_state_short, R.ok()->id(), 0, td::Timestamp::in(10.0), + [=](td::Result> R) { + if (R.is_error()) { + LOG(WARNING) << "Failed to get masterchain state: " << R.move_as_error(); + delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); }, + td::Timestamp::in(5.0)); + return; + } + td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_cont, + td::Ref(R.move_as_ok()), false); + }); + }); +} + +void QueueSizeCounter::process_top_shard_blocks_cont(td::Ref state, bool init) { + LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks_cont seqno=" << current_seqno_ << " init=" << init; + td::MultiPromise mp; + auto ig = mp.init_guard(); + last_top_blocks_.clear(); + last_top_blocks_.push_back(state->get_block_id()); + for (auto &shard : state->get_shards()) { + last_top_blocks_.push_back(shard->top_block_id()); + } + for (const BlockIdExt &block_id : last_top_blocks_) { + get_queue_size_ex_retry(block_id, init, ig.get_promise()); + } + ig.add_promise([SelfId = actor_id(this)](td::Result R) { + if (R.is_error()) { + return; + } + td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_finish); 
+ }); + if (init) { + init_top_blocks_ = last_top_blocks_; + } +} + +void QueueSizeCounter::get_queue_size_ex_retry(BlockIdExt block_id, bool calc_whole, td::Promise promise) { + get_queue_size_ex(block_id, calc_whole, + [=, promise = std::move(promise), SelfId = actor_id(this)](td::Result R) mutable { + if (R.is_error()) { + LOG(WARNING) << "Failed to calculate queue size for block " << block_id.to_str() << ": " + << R.move_as_error(); + delay_action( + [=, promise = std::move(promise)]() mutable { + td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_ex_retry, block_id, + calc_whole, std::move(promise)); + }, + td::Timestamp::in(5.0)); + return; + } + promise.set_result(td::Unit()); + }); +} + +void QueueSizeCounter::process_top_shard_blocks_finish() { + ++current_seqno_; + wait_shard_client(); +} + +void QueueSizeCounter::wait_shard_client() { + LOG(DEBUG) << "QueueSizeCounter::wait_shard_client seqno=" << current_seqno_; + td::actor::send_closure( + manager_, &ValidatorManager::wait_shard_client_state, current_seqno_, td::Timestamp::in(60.0), + [SelfId = actor_id(this)](td::Result R) { + if (R.is_error()) { + delay_action([=]() mutable { td::actor::send_closure(SelfId, &QueueSizeCounter::wait_shard_client); }, + td::Timestamp::in(5.0)); + return; + } + td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); + }); +} + +void QueueSizeCounter::alarm() { + for (auto it = results_.begin(); it != results_.end();) { + if (it->second.done_ && is_block_too_old(it->first)) { + it = results_.erase(it); + } else { + ++it; + } + } + alarm_timestamp() = td::Timestamp::in(td::Random::fast(20.0, 40.0)); +} + +} // namespace ton::validator \ No newline at end of file diff --git a/validator/queue-size-counter.hpp b/validator/queue-size-counter.hpp new file mode 100644 index 000000000..fabb0cec3 --- /dev/null +++ b/validator/queue-size-counter.hpp @@ -0,0 +1,82 @@ +/* + This file is part of TON Blockchain Library. 
+ + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once +#include "interfaces/validator-manager.h" + +namespace ton::validator { + +class QueueSizeCounter : public td::actor::Actor { + public: + QueueSizeCounter(td::Ref last_masterchain_state, td::actor::ActorId manager) + : init_masterchain_state_(last_masterchain_state), manager_(std::move(manager)) { + } + + void start_up() override; + void get_queue_size(BlockIdExt block_id, td::Promise promise); + void alarm() override; + + private: + td::Ref init_masterchain_state_; + td::actor::ActorId manager_; + bool simple_mode_ = false; + + BlockSeqno current_seqno_ = 0; + std::vector init_top_blocks_; + std::vector last_top_blocks_; + + struct Entry { + bool started_ = false; + bool done_ = false; + bool calc_whole_ = false; + td::uint32 queue_size_ = 0; + std::vector> promises_; + }; + std::map results_; + + void get_queue_size_ex(BlockIdExt block_id, bool calc_whole, td::Promise promise); + void get_queue_size_cont(BlockHandle handle, td::Ref state); + void get_queue_size_cont2(td::Ref state, td::Ref prev_state, td::uint32 prev_size); + void on_error(BlockIdExt block_id, td::Status error); + + void process_top_shard_blocks(); + void process_top_shard_blocks_cont(td::Ref state, bool init = false); + void get_queue_size_ex_retry(BlockIdExt block_id, bool calc_whole, td::Promise promise); + void 
process_top_shard_blocks_finish(); + void wait_shard_client(); + + bool is_block_too_old(const BlockIdExt& block_id) const { + for (const BlockIdExt& top_block : last_top_blocks_) { + if (shard_intersects(block_id.shard_full(), top_block.shard_full())) { + if (block_id.seqno() + 100 < top_block.seqno()) { + return true; + } + break; + } + } + for (const BlockIdExt& init_top_block : init_top_blocks_) { + if (shard_intersects(block_id.shard_full(), init_top_block.shard_full())) { + if (block_id.seqno() < init_top_block.seqno()) { + return true; + } + break; + } + } + return false; + } +}; + +} // namespace ton::validator diff --git a/validator/validator.h b/validator/validator.h index 7cdea805c..2fefb064e 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -228,6 +228,8 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void prepare_perf_timer_stats(td::Promise> promise) = 0; virtual void add_perf_timer_stat(std::string name, double duration) = 0; + virtual void get_out_msg_queue_size(BlockIdExt block_id, td::Promise promise) = 0; + }; } // namespace validator From 7a457ca2781bc2f69a90ac5e191942668c3170f7 Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Wed, 13 Dec 2023 15:12:58 +0300 Subject: [PATCH 2/3] Fix linking error (#827) * Fix linking error --------- Co-authored-by: SpyCheese --- validator/CMakeLists.txt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/validator/CMakeLists.txt b/validator/CMakeLists.txt index f858e6ffc..4ecc865cc 100644 --- a/validator/CMakeLists.txt +++ b/validator/CMakeLists.txt @@ -100,7 +100,8 @@ set(DISK_VALIDATOR_SOURCE validator-full-id.cpp validator-group.cpp validator-options.cpp - + queue-size-counter.cpp + downloaders/wait-block-data-disk.cpp downloaders/wait-block-state.cpp downloaders/wait-block-state-merge.cpp @@ -119,7 +120,8 @@ set(HARDFORK_VALIDATOR_SOURCE validator-full-id.cpp validator-group.cpp validator-options.cpp - + queue-size-counter.cpp + 
downloaders/wait-block-data-disk.cpp downloaders/wait-block-state.cpp downloaders/wait-block-state-merge.cpp From 1fc4a0faed069f6c68c84fa41288bdd2d8665128 Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Wed, 13 Dec 2023 21:33:54 +0300 Subject: [PATCH 3/3] Move low blockrate lt_limits to appropriate place (#828) * Fix setting lt_delta limits --------- Co-authored-by: SpyCheese --- validator/impl/collator.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index 2f4a002a0..86ee1cf46 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -709,9 +709,6 @@ bool Collator::unpack_last_mc_state() { return fatal_error(limits.move_as_error()); } block_limits_ = limits.move_as_ok(); - if (now_ > prev_now_ + 15 && block_limits_->lt_delta.hard() > 200) { - block_limits_->lt_delta = {20, 180, 200}; - } LOG(DEBUG) << "block limits: bytes [" << block_limits_->bytes.underload() << ", " << block_limits_->bytes.soft() << ", " << block_limits_->bytes.hard() << "]"; LOG(DEBUG) << "block limits: gas [" << block_limits_->gas.underload() << ", " << block_limits_->gas.soft() << ", " @@ -1433,6 +1430,9 @@ bool Collator::check_this_shard_mc_info() { bool Collator::init_block_limits() { CHECK(block_limits_); CHECK(state_usage_tree_); + if (now_ > prev_now_ + 15 && block_limits_->lt_delta.hard() > 200) { + block_limits_->lt_delta = {20, 180, 200}; + } block_limits_->usage_tree = state_usage_tree_.get(); block_limit_status_ = std::make_unique(*block_limits_); return true;