From 91f69dba01bb01b680f5d6d9d17830a611553101 Mon Sep 17 00:00:00 2001 From: Stan Kladko <13399135+kladkogex@users.noreply.github.com> Date: Thu, 30 Nov 2023 19:26:39 +0000 Subject: [PATCH 1/6] #813 eth_sync --- .github/workflows/clang-format-check.yml | 3 +- catchup/server/CatchupServerAgent.cpp | 74 ++++++++++++------------ headers/CatchupResponseHeader.cpp | 33 +++++------ headers/CatchupResponseHeader.h | 21 +++---- 4 files changed, 66 insertions(+), 65 deletions(-) diff --git a/.github/workflows/clang-format-check.yml b/.github/workflows/clang-format-check.yml index f3d4a394..92e88713 100644 --- a/.github/workflows/clang-format-check.yml +++ b/.github/workflows/clang-format-check.yml @@ -14,4 +14,5 @@ jobs: ./libjson-rpc-cpp ./spdlog ./sgxwallet ./scripts ./run_sgx_test ./libzmq ./thirdparty ./cppzmq' extensions: 'h,hpp,hxx,cpp,cxx,cc,ipp' - clangFormatVersion: 10 + clangFormatVersion: 11 + inplace: True \ No newline at end of file diff --git a/catchup/server/CatchupServerAgent.cpp b/catchup/server/CatchupServerAgent.cpp index 18f7a2a4..041dc4ec 100644 --- a/catchup/server/CatchupServerAgent.cpp +++ b/catchup/server/CatchupServerAgent.cpp @@ -66,7 +66,7 @@ #include "CatchupServerAgent.h" CatchupServerAgent::CatchupServerAgent( Schain& _schain, const ptr< TCPServerSocket >& _s ) - : AbstractServerAgent( "CatchupServer", _schain, _s ) { + : AbstractServerAgent( "CatchupServer", _schain, _s ) { CHECK_ARGUMENT( _s ); catchupWorkerThreadPool = make_shared< CatchupWorkerThreadPool >( num_threads( 2 ), this ); catchupWorkerThreadPool->startService(); @@ -77,7 +77,7 @@ CatchupServerAgent::~CatchupServerAgent() {} void CatchupServerAgent::processNextAvailableConnection( - const ptr< ServerConnection >& _connection ) { + const ptr< ServerConnection >& _connection ) { MONITOR( __CLASS_NAME__, __FUNCTION__ ); CHECK_ARGUMENT( _connection ); @@ -97,7 +97,7 @@ void CatchupServerAgent::processNextAvailableConnection( try { jsonRequest = sChain->getIo()->readJsonHeader( - _connection->getDescriptor(), "Read catchup request", 10, _connection->getIP() ); + _connection->getDescriptor(), "Read catchup request", 10, _connection->getIP() ); } catch ( ExitRequestedException& ) { throw; } catch ( ... ) { @@ -116,14 +116,14 @@ void CatchupServerAgent::processNextAvailableConnection( responseHeader = make_shared< BlockFinalizeResponseHeader >(); } else { BOOST_THROW_EXCEPTION( - InvalidMessageFormatException( "Unknown request type:" + type, __CLASS_NAME__ ) ); + InvalidMessageFormatException( "Unknown request type:" + type, __CLASS_NAME__ ) ); } ptr< vector< uint8_t > > serializedBinary = nullptr; try { serializedBinary = - this->createResponseHeaderAndBinary( _connection, jsonRequest, responseHeader ); + this->createResponseHeaderAndBinary( _connection, jsonRequest, responseHeader ); } catch ( ExitRequestedException& ) { throw; } catch ( ... ) { @@ -136,7 +136,7 @@ void CatchupServerAgent::processNextAvailableConnection( } catch ( ... ) { } throw_with_nested( CouldNotSendMessageException( - "Could not create catchup response header", __CLASS_NAME__ ) ); + "Could not create catchup response header", __CLASS_NAME__ ) ); } @@ -146,7 +146,7 @@ void CatchupServerAgent::processNextAvailableConnection( throw; } catch ( ... ) { throw_with_nested( - CouldNotSendMessageException( "Could not send response", __CLASS_NAME__ ) ); + CouldNotSendMessageException( "Could not send response", __CLASS_NAME__ ) ); } @@ -164,7 +164,7 @@ void CatchupServerAgent::processNextAvailableConnection( throw; } catch ( ... ) { throw_with_nested( - CouldNotSendMessageException( "Could not send serialized binary", __CLASS_NAME__ ) ); + CouldNotSendMessageException( "Could not send serialized binary", __CLASS_NAME__ ) ); } LOG( debug, "Server step 3: response completed: blocks sent" ); @@ -174,8 +174,8 @@ void CatchupServerAgent::processNextAvailableConnection( ptr< vector< uint8_t > > CatchupServerAgent::createResponseHeaderAndBinary( - const ptr< ServerConnection >&, nlohmann::json _jsonRequest, - const ptr< Header >& _responseHeader ) { + const ptr< ServerConnection >&, nlohmann::json _jsonRequest, + const ptr< Header >& _responseHeader ) { CHECK_ARGUMENT( _responseHeader ); try { @@ -186,9 +186,9 @@ ptr< vector< uint8_t > > CatchupServerAgent::createResponseHeaderAndBinary( if ( ( uint64_t ) sChain->getSchainID() != schainID ) { _responseHeader->setStatusSubStatus( - CONNECTION_ERROR, CONNECTION_ERROR_UNKNOWN_SCHAIN_ID ); + CONNECTION_ERROR, CONNECTION_ERROR_UNKNOWN_SCHAIN_ID ); BOOST_THROW_EXCEPTION( InvalidSchainException( - "Incorrect schain " + to_string( schainID ), __CLASS_NAME__ ) ); + "Incorrect schain " + to_string( schainID ), __CLASS_NAME__ ) ); }; @@ -198,22 +198,22 @@ ptr< vector< uint8_t > > CatchupServerAgent::createResponseHeaderAndBinary( if ( type.compare( Header::BLOCK_CATCHUP_REQ ) == 0 ) { serializedBinary = createBlockCatchupResponse( _jsonRequest, - dynamic_pointer_cast< CatchupResponseHeader >( _responseHeader ), blockID ); + dynamic_pointer_cast< CatchupResponseHeader >( _responseHeader ), blockID ); } else if ( type.compare( Header::BLOCK_FINALIZE_REQ ) == 0 ) { ptr< NodeInfo > nmi = sChain->getNode()->getNodeInfoById( nodeID ); if ( nmi == nullptr ) { _responseHeader->setStatusSubStatus( - CONNECTION_ERROR, CONNECTION_ERROR_DONT_KNOW_THIS_NODE ); + CONNECTION_ERROR, CONNECTION_ERROR_DONT_KNOW_THIS_NODE ); BOOST_THROW_EXCEPTION( InvalidNodeIDException( - "Could not find node info for NODE_ID:" + to_string( ( uint64_t ) nodeID ), - __CLASS_NAME__ ) ); + "Could not find node info for NODE_ID:" + to_string( ( uint64_t ) nodeID ), + __CLASS_NAME__ ) ); } serializedBinary = createBlockFinalizeResponse( _jsonRequest, - dynamic_pointer_cast< BlockFinalizeResponseHeader >( _responseHeader ), blockID ); + dynamic_pointer_cast< BlockFinalizeResponseHeader >( _responseHeader ), blockID ); } @@ -227,8 +227,8 @@ ptr< vector< uint8_t > > CatchupServerAgent::createResponseHeaderAndBinary( ptr< vector< uint8_t > > CatchupServerAgent::createBlockCatchupResponse( - nlohmann::json /*_jsonRequest */, const ptr< CatchupResponseHeader >& _responseHeader, - block_id _blockID ) { + nlohmann::json /*_jsonRequest */, const ptr< CatchupResponseHeader >& _responseHeader, + block_id _blockID ) { CHECK_ARGUMENT( _responseHeader ); MONITOR( __CLASS_NAME__, __FUNCTION__ ); @@ -246,10 +246,11 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockCatchupResponse( auto blockSizes = make_shared< list< uint64_t > >(); - auto committedBlockID = sChain->getLastCommittedBlockID(); + auto lastCommittedBlockID = sChain->getLastCommittedBlockID(); + auto lastCommittedBlockTimestampS = sChain->getLastCommittedBlockTimeStamp().getS(); - if ( _blockID >= committedBlockID ) { - LOG( debug, "Catchups: blockID >= committedBlockID" ); + if ( _blockID >= lastCommittedBlockID ) { + LOG( debug, "Catchups: blockID >= lastCommittedBlockID" ); _responseHeader->setStatusSubStatus( CONNECTION_DISCONNECT, CONNECTION_OK ); _responseHeader->setComplete(); return nullptr; @@ -257,15 +258,15 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockCatchupResponse( auto serializedBlocks = - getSchain()->getNode()->getBlockDB()->getSerializedBlocksFromLevelDB( - ( uint64_t ) _blockID + 1, committedBlockID, blockSizes ); + getSchain()->getNode()->getBlockDB()->getSerializedBlocksFromLevelDB( + ( uint64_t ) _blockID + 1, lastCommittedBlockID, blockSizes ); CHECK_STATE( blockSizes->size() > 0 ); if ( serializedBlocks == nullptr ) { _responseHeader->setStatusSubStatus( - CONNECTION_DISCONNECT, CONNECTION_CATCHUP_DONT_HAVE_THIS_BLOCK ); + CONNECTION_DISCONNECT, CONNECTION_CATCHUP_DONT_HAVE_THIS_BLOCK ); _responseHeader->setComplete(); return nullptr; } @@ -273,7 +274,8 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockCatchupResponse( _responseHeader->setStatusSubStatus( CONNECTION_PROCEED, CONNECTION_OK ); - _responseHeader->setBlockSizes( blockSizes ); + _responseHeader->setBlockSizesAndLatestBlockInfo( + blockSizes, lastCommittedBlockID, lastCommittedBlockTimestampS ); auto responseTimeMs = Time::getCurrentTimeMs() - responseStartTimeMs; @@ -289,8 +291,8 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockCatchupResponse( ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse( - nlohmann::json _jsonRequest, const ptr< BlockFinalizeResponseHeader >& _responseHeader, - block_id _blockID ) { + nlohmann::json _jsonRequest, const ptr< BlockFinalizeResponseHeader >& _responseHeader, + block_id _blockID ) { CHECK_ARGUMENT( _responseHeader ); MONITOR( __CLASS_NAME__, __FUNCTION__ ); @@ -301,7 +303,7 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse( if ( fragmentIndex < 1 || ( uint64_t ) fragmentIndex > getSchain()->getNodeCount() - 1 ) { LOG( debug, "Incorrect fragment index:" << to_string( fragmentIndex ) ); _responseHeader->setStatusSubStatus( - CONNECTION_DISCONNECT, CONNECTION_ERROR_INVALID_FRAGMENT_INDEX ); + CONNECTION_DISCONNECT, CONNECTION_ERROR_INVALID_FRAGMENT_INDEX ); _responseHeader->setComplete(); return nullptr; } @@ -313,7 +315,7 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse( if ( proposerIndex < 1 || ( uint64_t ) fragmentIndex > getSchain()->getNodeCount() ) { LOG( debug, "Incorrect proposer index:" << to_string( proposerIndex ) ); _responseHeader->setStatusSubStatus( - CONNECTION_DISCONNECT, CONNECTION_ERROR_INVALID_PROPOSER_INDEX ); + CONNECTION_DISCONNECT, CONNECTION_ERROR_INVALID_PROPOSER_INDEX ); _responseHeader->setComplete(); return nullptr; } @@ -322,7 +324,7 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse( // We could have either a proposal or a committed block. Try proposal first. auto proposal = getSchain()->getNode()->getBlockProposalDB()->getBlockProposal( - _blockID, proposerIndex ); + _blockID, proposerIndex ); string daSig; @@ -340,7 +342,7 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse( // since at this time we already not which index has been committed if ( committedBlock->getProposerIndex() != ( uint64_t ) proposerIndex ) { _responseHeader->setStatusSubStatus( CONNECTION_DISCONNECT, - CONNECTION_FINALIZER_CLIENT_ASKING_FOR_INCORRECT_PROPOSER_INDEX ); + CONNECTION_FINALIZER_CLIENT_ASKING_FOR_INCORRECT_PROPOSER_INDEX ); _responseHeader->setComplete(); return nullptr; } @@ -348,20 +350,20 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse( daSig = committedBlock->getDaSig(); } else { _responseHeader->setStatusSubStatus( - CONNECTION_DISCONNECT, CONNECTION_FINALIZE_DONT_HAVE_PROPOSAL ); + CONNECTION_DISCONNECT, CONNECTION_FINALIZE_DONT_HAVE_PROPOSAL ); _responseHeader->setComplete(); return nullptr; } } else { daSig = getNode()->getDaProofDB()->getDASig( - proposal->getBlockID(), proposal->getProposerIndex() ); + proposal->getBlockID(), proposal->getProposerIndex() ); } CHECK_STATE( !daSig.empty() ); auto fragment = - proposal->getFragment( ( uint64_t ) getSchain()->getNodeCount() - 1, fragmentIndex ); + proposal->getFragment( ( uint64_t ) getSchain()->getNodeCount() - 1, fragmentIndex ); CHECK_STATE( fragment ); @@ -373,7 +375,7 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse( CHECK_STATE( serializedFragment ); _responseHeader->setFragmentParams( serializedFragment->size(), - proposal->serializeProposal()->size(), proposal->getHash().toHex(), daSig ); + proposal->serializeProposal()->size(), proposal->getHash().toHex(), daSig ); return serializedFragment; } catch ( ExitRequestedException& e ) { diff --git a/headers/CatchupResponseHeader.cpp b/headers/CatchupResponseHeader.cpp index 5e410fd8..5954e8c0 100644 --- a/headers/CatchupResponseHeader.cpp +++ b/headers/CatchupResponseHeader.cpp @@ -34,31 +34,28 @@ using namespace std; -CatchupResponseHeader::CatchupResponseHeader() : Header( Header::BLOCK_CATCHUP_RSP ) {} +CatchupResponseHeader::CatchupResponseHeader() : Header(Header::BLOCK_CATCHUP_RSP) {} -void CatchupResponseHeader::setBlockSizes( const ptr< list< uint64_t > >& _blockSizes ) { - CHECK_ARGUMENT( _blockSizes ); - - blockCount = _blockSizes->size(); +void CatchupResponseHeader::setBlockSizesAndLatestBlockInfo( + const ptr > &_blockSizes, block_id _lastCommittedBlockId, + uint64_t _lastCommittedBlockTimestampS) { + CHECK_ARGUMENT(_blockSizes); + CHECK_STATE(!complete) blockSizes = _blockSizes; - + lastCommittedBlockId = (uint64_t) _lastCommittedBlockId; + lastCommittedBlockTimestampS = _lastCommittedBlockTimestampS; complete = true; } -void CatchupResponseHeader::addFields( nlohmann::json& _j ) { - Header::addFields( _j ); +void CatchupResponseHeader::addFields(nlohmann::json &_j) { + Header::addFields(_j); - _j["count"] = blockCount; - if ( blockSizes != nullptr ) - _j["sizes"] = *blockSizes; -} + _j["count"] = blockSizes ? blockSizes->size() : 0; + _j["lastBid"] = lastCommittedBlockId; + _j["lastTs"] = lastCommittedBlockTimestampS; -uint64_t CatchupResponseHeader::getBlockCount() const { - return blockCount; -} - -void CatchupResponseHeader::setBlockCount( uint64_t _blockCount ) { - CatchupResponseHeader::blockCount = _blockCount; + if (blockSizes != nullptr) + _j["sizes"] = *blockSizes; } diff --git a/headers/CatchupResponseHeader.h b/headers/CatchupResponseHeader.h index cc62a377..7bb927ca 100644 --- a/headers/CatchupResponseHeader.h +++ b/headers/CatchupResponseHeader.h @@ -31,21 +31,22 @@ class Schain; class Transaction; class CatchupResponseHeader : public Header { public: - [[nodiscard]] uint64_t getBlockCount() const; - void setBlockCount( uint64_t blockCount ); - -private: - uint64_t blockCount = 0; - - ptr< list< uint64_t > > blockSizes = nullptr; - -public: CatchupResponseHeader(); explicit CatchupResponseHeader( const ptr< list< uint64_t > > _blockSizes ); - void setBlockSizes( const ptr< list< uint64_t > >& _blockSizes ); + void setBlockSizesAndLatestBlockInfo( const ptr< list< uint64_t > >& _blockSizes, + block_id _lastCommittedBlockId, uint64_t _lastCommittedBlockTimestampS ); void addFields( nlohmann::basic_json<>& j_ ) override; + +private: + + ptr< list< uint64_t > > blockSizes = nullptr; + + // latest block known to this consensus instance and its + uint64_t lastCommittedBlockId = 0; + uint64_t lastCommittedBlockTimestampS = 0; + }; From 88e30855611e0e4b90e5cb3220158e71df30b592 Mon Sep 17 00:00:00 2001 From: Stan Kladko <13399135+kladkogex@users.noreply.github.com> Date: Thu, 30 Nov 2023 20:47:03 +0000 Subject: [PATCH 2/6] #813 eth_sync --- catchup/client/CatchupClientAgent.cpp | 25 ++++++++---- catchup/client/CatchupClientAgent.h | 7 +++- datastructures/PeerStateInfo.cpp | 59 +++++++++++++++++++++++++++ datastructures/PeerStateInfo.h | 42 +++++++++++++++++++ 4 files changed, 124 insertions(+), 9 deletions(-) create mode 100644 datastructures/PeerStateInfo.cpp create mode 100644 datastructures/PeerStateInfo.h diff --git a/catchup/client/CatchupClientAgent.cpp b/catchup/client/CatchupClientAgent.cpp index ebf52356..5dc0b7be 100644 --- a/catchup/client/CatchupClientAgent.cpp +++ b/catchup/client/CatchupClientAgent.cpp @@ -21,14 +21,12 @@ @date 2018 */ -#include "SkaleCommon.h" +#include "thirdparty/json.hpp" +#include "SkaleCommon.h" #include "Log.h" +#include "datastructures/PeerStateInfo.h" #include "exceptions/ExitRequestedException.h" -#include "exceptions/FatalError.h" - -#include "thirdparty/json.hpp" - #include "abstracttcpserver/ConnectionStatus.h" #include "crypto/CryptoManager.h" @@ -59,6 +57,12 @@ CatchupClientAgent::CatchupClientAgent( Schain& _sChain ) : Agent( _sChain, fals this->catchupClientThreadPool = make_shared< CatchupClientThreadPool >( 1, this ); catchupClientThreadPool->startService(); } + + for (int i = 0; i < _sChain.getNodeCount(); i++) { + // init peer state infos + peerStateInfos.push_back(nullptr); + } + } catch ( ExitRequestedException& ) { throw; } catch ( ... ) { @@ -128,6 +132,14 @@ nlohmann::json CatchupClientAgent::readCatchupResponseHeader( throw_with_nested( NetworkProtocolException( errString, __CLASS_NAME__ ) ); } + // now see if peerinfo information returned by the peer + ptr peerStateInfo = PeerStateInfo::extract(response); + + if (peerStateInfo) { + // update the info + LOCK(peerStateInfosMutex) + peerStateInfos.at((uint64_t)_dstIndex) = peerStateInfo; + } LOG( debug, "Catchupc step 2: read catchup response requestHeader" ); @@ -138,16 +150,13 @@ nlohmann::json CatchupClientAgent::readCatchupResponseHeader( return 0; } - if ( status != CONNECTION_PROCEED ) { BOOST_THROW_EXCEPTION( NetworkProtocolException( "Server error in catchup response:" + to_string( status ), __CLASS_NAME__ ) ); } - ptr< CommittedBlockList > blocks; - try { blocks = readMissingBlocks( socket, response, requestHeader ); diff --git a/catchup/client/CatchupClientAgent.h b/catchup/client/CatchupClientAgent.h index 13813d85..cb4bd797 100644 --- a/catchup/client/CatchupClientAgent.h +++ b/catchup/client/CatchupClientAgent.h @@ -23,17 +23,22 @@ #pragma once - class CommittedBlockList; class ClientSocket; class Schain; class CatchupClientThreadPool; class CatchupRequestHeader; class CatchupResponseHeader; +class PeerStateInfo; + class CatchupClientAgent : public Agent { + ptr< CatchupClientThreadPool > catchupClientThreadPool = nullptr; + vector> peerStateInfos; + recursive_mutex peerStateInfosMutex; + public: explicit CatchupClientAgent( Schain& _sChain ); diff --git a/datastructures/PeerStateInfo.cpp b/datastructures/PeerStateInfo.cpp new file mode 100644 index 00000000..c7ed9dff --- /dev/null +++ b/datastructures/PeerStateInfo.cpp @@ -0,0 +1,59 @@ +/* + Copyright (C) 2023- SKALE Labs + + This file is part of skale-consensus. + + skale-consensus is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + skale-consensus is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with skale-consensus. If not, see . + +*/ + +#include "thirdparty/json.hpp" +#include "SkaleCommon.h" +#include "PeerStateInfo.h" + +const block_id &PeerStateInfo::getLastBlockId() const { + return lastBlockId; +} + +uint64_t PeerStateInfo::getLastBlockTimestampS() const { + return lastBlockTimestampS; +} + +PeerStateInfo::PeerStateInfo(const block_id &lastBlockId, uint64_t lastBlockTimestampS) : lastBlockId(lastBlockId), + lastBlockTimestampS( + lastBlockTimestampS) { + +} + +// extract PeerStateInfo object from catchup response header +ptr PeerStateInfo::extract(nlohmann::json _catchupResponseHeasder) { + + uint64_t lastBid = 0; + uint64_t lastTs = 0; + + if (_catchupResponseHeasder.find("lastBid") != _catchupResponseHeasder.end()) { + lastBid = _catchupResponseHeasder.at("lastBid").get(); + } + + if (_catchupResponseHeasder.find("lastTs") != _catchupResponseHeasder.end()) { + lastTs = _catchupResponseHeasder.at("lastTs").get(); + } + + if (lastBid > 0 && lastTs > 0) { + return make_shared(block_id(lastBid), lastTs); + } else { + // the node did not provide info + return nullptr; + } +} diff --git a/datastructures/PeerStateInfo.h b/datastructures/PeerStateInfo.h new file mode 100644 index 00000000..1a08482c --- /dev/null +++ b/datastructures/PeerStateInfo.h @@ -0,0 +1,42 @@ +/* + Copyright (C) 2023- SKALE Labs + + This file is part of skale-consensus. + + skale-consensus is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + skale-consensus is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with skale-consensus. If not, see . + +*/ + + +#pragma once + + + +class PeerStateInfo { + + block_id lastBlockId = 0; + uint64_t lastBlockTimestampS = 0; + +public: + const block_id &getLastBlockId() const; + + uint64_t getLastBlockTimestampS() const; + + PeerStateInfo(const block_id &lastBlockId, uint64_t lastBlockTimestampS); + + static ptr extract(nlohmann::json _catchupResponseHeasder); + +}; + + From 40a74a0c9d0e71d076abffcbe7f9e854e03225ef Mon Sep 17 00:00:00 2001 From: Stan Kladko <13399135+kladkogex@users.noreply.github.com> Date: Thu, 30 Nov 2023 20:51:14 +0000 Subject: [PATCH 3/6] #813 eth_sync --- catchup/client/CatchupClientAgent.h | 34 ++++++++++++++++++----------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/catchup/client/CatchupClientAgent.h b/catchup/client/CatchupClientAgent.h index cb4bd797..ae6b61e7 100644 --- a/catchup/client/CatchupClientAgent.h +++ b/catchup/client/CatchupClientAgent.h @@ -24,41 +24,49 @@ #pragma once class CommittedBlockList; + class ClientSocket; + class Schain; + class CatchupClientThreadPool; + class CatchupRequestHeader; + class CatchupResponseHeader; + class PeerStateInfo; class CatchupClientAgent : public Agent { - ptr< CatchupClientThreadPool > catchupClientThreadPool = nullptr; + ptr catchupClientThreadPool = nullptr; vector> peerStateInfos; recursive_mutex peerStateInfosMutex; public: - explicit CatchupClientAgent( Schain& _sChain ); - + explicit CatchupClientAgent(Schain &_sChain); - [[nodiscard]] uint64_t sync( schain_index _dstIndex ); + [[nodiscard]] uint64_t sync(schain_index _dstIndex); + static void workerThreadItemSendLoop(CatchupClientAgent *_agent); - static void workerThreadItemSendLoop( CatchupClientAgent* _agent ); + [[nodiscard]] nlohmann::json readCatchupResponseHeader( + const ptr &_socket, ptr _requestHeader); - nlohmann::json readCatchupResponseHeader( - const ptr< ClientSocket >& _socket, ptr< CatchupRequestHeader > _requestHeader ); + [[nodiscard]] ptr readMissingBlocks(ptr &_socket, + nlohmann::json &_responseHeader, + ptr _requestHeader); - ptr< CommittedBlockList > readMissingBlocks( ptr< ClientSocket >& _socket, - nlohmann::json& _responseHeader, ptr< CatchupRequestHeader > _requestHeader ); + [[nodiscard]] size_t parseBlockSizes(nlohmann::json _responseHeader, + const ptr > &_blockSizes, + ptr _requestHeader); - size_t parseBlockSizes( nlohmann::json _responseHeader, - const ptr< vector< uint64_t > >& _blockSizes, ptr< CatchupRequestHeader > _requestHeader ); + [[nodiscard]] block_id getMaxKnownBlockId(); - static schain_index nextSyncNodeIndex( - const CatchupClientAgent* _agent, schain_index _destinationSchainIndex ); + [[nodiscard]] static schain_index nextSyncNodeIndex( + const CatchupClientAgent *_agent, schain_index _destinationSchainIndex); }; From e6e53069f686c503451ef32a9b0b8d12bdc4ba45 Mon Sep 17 00:00:00 2001 From: Stan Kladko <13399135+kladkogex@users.noreply.github.com> Date: Fri, 1 Dec 2023 12:29:56 +0000 Subject: [PATCH 4/6] #813 eth_sync adding api function --- catchup/client/CatchupClientAgent.cpp | 5 +++ catchup/client/CatchupClientAgent.h | 2 ++ chains/Schain.cpp | 2 ++ chains/Schain.h | 3 ++ chains/SchainGettersSetters.cpp | 7 ++++ node/ConsensusEngine.cpp | 10 ++++++ node/ConsensusEngine.h | 6 ++++ node/ConsensusInterface.h | 48 +++++++++++++++------------ 8 files changed, 62 insertions(+), 21 deletions(-) diff --git a/catchup/client/CatchupClientAgent.cpp b/catchup/client/CatchupClientAgent.cpp index 5dc0b7be..b6d29f5e 100644 --- a/catchup/client/CatchupClientAgent.cpp +++ b/catchup/client/CatchupClientAgent.cpp @@ -351,3 +351,8 @@ schain_index CatchupClientAgent::nextSyncNodeIndex( return index + 1; } + + +[[nodiscard]] ConsensusInterface::SyncInfo CatchupClientAgent::getSyncInfo() { + return ConsensusInterface::SyncInfo(); +} \ No newline at end of file diff --git a/catchup/client/CatchupClientAgent.h b/catchup/client/CatchupClientAgent.h index ae6b61e7..f7037bd1 100644 --- a/catchup/client/CatchupClientAgent.h +++ b/catchup/client/CatchupClientAgent.h @@ -69,4 +69,6 @@ class CatchupClientAgent : public Agent { [[nodiscard]] static schain_index nextSyncNodeIndex( const CatchupClientAgent *_agent, schain_index _destinationSchainIndex); + + [[nodiscard]] ConsensusInterface::SyncInfo getSyncInfo(); }; diff --git a/chains/Schain.cpp b/chains/Schain.cpp index 690c4af2..e9e1d42a 100644 --- a/chains/Schain.cpp +++ b/chains/Schain.cpp @@ -1459,4 +1459,6 @@ uint64_t Schain::getVerifyDaSigsPatchTimestampS() const { } + + mutex Schain::vdsMutex; diff --git a/chains/Schain.h b/chains/Schain.h index 22c4fe5a..b3c8e2e5 100644 --- a/chains/Schain.h +++ b/chains/Schain.h @@ -387,4 +387,7 @@ class Schain : public Agent { bool verifyDASigsPatch( uint64_t _blockTimeStampSec ); void updateInternalChainInfo( block_id _lastCommittedBlockID ); + + const ptr &getCatchupClientAgent() const; + }; diff --git a/chains/SchainGettersSetters.cpp b/chains/SchainGettersSetters.cpp index 1eb402e1..fdafa28a 100644 --- a/chains/SchainGettersSetters.cpp +++ b/chains/SchainGettersSetters.cpp @@ -335,4 +335,11 @@ const ptr< OracleClient > Schain::getOracleClient() const { bool Schain::isInCreateBlock() const { return inCreateBlock; +} + + + +const ptr &Schain::getCatchupClientAgent() const { + CHECK_STATE(catchupClientAgent); + return catchupClientAgent; } \ No newline at end of file diff --git a/node/ConsensusEngine.cpp b/node/ConsensusEngine.cpp index 32cc3e34..d3b9f398 100644 --- a/node/ConsensusEngine.cpp +++ b/node/ConsensusEngine.cpp @@ -38,6 +38,7 @@ #include "exceptions/FatalError.h" #include "node/Node.h" #include "oracle/OracleClient.h" +#include "catchup/client/CatchupClientAgent.h" #include "thirdparty/json.hpp" #include "threads/GlobalThreadRegistry.h" @@ -1217,3 +1218,12 @@ ptr< vector< uint8_t > > ConsensusEngine::getSerializedBlock( std::uint64_t _blo const map< string, uint64_t >& ConsensusEngine::getPatchTimestamps() const { return patchTimestamps; } + + +[[nodiscard]] ConsensusInterface::SyncInfo ConsensusEngine::getSyncInfo() { + + CHECK_STATE( nodes.size() > 0 ); + + return nodes.begin()->second->getSchain()->getCatchupClientAgent()->getSyncInfo(); + +} \ No newline at end of file diff --git a/node/ConsensusEngine.h b/node/ConsensusEngine.h index a1a17b99..d5b0629f 100644 --- a/node/ConsensusEngine.h +++ b/node/ConsensusEngine.h @@ -306,4 +306,10 @@ class ConsensusEngine : public ConsensusInterface { std::shared_ptr< std::vector< std::uint8_t > > getSerializedBlock( std::uint64_t _blockNumber ); + + // return sync information as requested by eth_syncing API of geth + // if isSyncing is false, all fields will be set to zero. + + [[nodiscard]] SyncInfo getSyncInfo() override; + }; diff --git a/node/ConsensusInterface.h b/node/ConsensusInterface.h index 118b61ea..59dead12 100644 --- a/node/ConsensusInterface.h +++ b/node/ConsensusInterface.h @@ -43,15 +43,15 @@ enum consensus_engine_status { }; -using u256 = boost::multiprecision::number< boost::multiprecision::backends::cpp_int_backend< 256, - 256, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void > >; +using u256 = boost::multiprecision::number >; class ConsensusInterface { public: virtual ~ConsensusInterface() = default; virtual void parseFullConfigAndCreateNode( - const std::string& fullPathToConfigFile, const string& gethURL ) = 0; + const std::string &fullPathToConfigFile, const string &gethURL) = 0; // If starting from a snapshot, start all will pass to consensus the last comitted @@ -77,15 +77,15 @@ class ConsensusInterface { virtual void exitGracefully() = 0; - virtual u256 getPriceForBlockId( uint64_t _blockId ) const = 0; + virtual u256 getPriceForBlockId(uint64_t _blockId) const = 0; - virtual u256 getRandomForBlockId( uint64_t _blockId ) const = 0; + virtual u256 getRandomForBlockId(uint64_t _blockId) const = 0; - virtual map< string, uint64_t > getConsensusDbUsage() const = 0; + virtual map getConsensusDbUsage() const = 0; virtual uint64_t getEmptyBlockIntervalMs() const { return -1; } - virtual void setEmptyBlockIntervalMs( uint64_t ) {} + virtual void setEmptyBlockIntervalMs(uint64_t) {} virtual consensus_engine_status getStatus() const = 0; @@ -162,7 +162,7 @@ class ConsensusInterface { */ virtual uint64_t submitOracleRequest( - const string& _spec, string& _receipt, string& _errorMessage ) = 0; + const string &_spec, string &_receipt, string &_errorMessage) = 0; /* * Check if Oracle result has been derived. This will return ORACLE_SUCCESS if @@ -176,15 +176,21 @@ class ConsensusInterface { */ - virtual uint64_t checkOracleResult( const string& _receipt, string& _result ) = 0; + virtual uint64_t checkOracleResult(const string &_receipt, string &_result) = 0; - /* - * This will return a consensus block serialized as byte array from consensus db. - * Returns nullptr if the block is not in consensus DB - */ - // virtual std::shared_ptr> getSerializedBlock( - // std::uint64_t _blockNumber) = 0; + struct SyncInfo { + // sync information as required by eth_syncing API request of geth + bool isSyncing = false; + std::uint64_t startingBlock = 0; + std::uint64_t currentBlock = 0; + std::uint64_t highestBlock = 0; + }; + + // return sync information as requested by eth_syncing API of geth + // if isSyncing is false, all fields will be set to zero. + virtual SyncInfo getSyncInfo() = 0; + }; /** @@ -192,19 +198,19 @@ class ConsensusInterface { */ class ConsensusExtFace { public: - typedef std::vector< std::vector< uint8_t > > transactions_vector; + typedef std::vector > transactions_vector; // Returns hashes and bytes of new transactions as well as state root to put into block proposal - virtual transactions_vector pendingTransactions( size_t _limit, u256& _stateRoot ) = 0; + virtual transactions_vector pendingTransactions(size_t _limit, u256 &_stateRoot) = 0; // Creates new block with specified transactions AND removes them from the queue - virtual void createBlock( const transactions_vector& _approvedTransactions, uint64_t _timeStamp, - uint32_t _timeStampMillis, uint64_t _blockID, u256 _gasPrice, u256 _stateRoot, - uint64_t _winningNodeIndex ) = 0; + virtual void createBlock(const transactions_vector &_approvedTransactions, uint64_t _timeStamp, + uint32_t _timeStampMillis, uint64_t _blockID, u256 _gasPrice, u256 _stateRoot, + uint64_t _winningNodeIndex) = 0; virtual ~ConsensusExtFace() = default; - virtual void terminateApplication(){}; + virtual void terminateApplication() {}; }; #endif // CONSENSUSINTERFACE_H From b0bb6ef6d473dd0ca4c1ed75e6059cf0e9d2a7e4 Mon Sep 17 00:00:00 2001 From: Stan Kladko <13399135+kladkogex@users.noreply.github.com> Date: Fri, 1 Dec 2023 13:35:38 +0000 Subject: [PATCH 5/6] #813 eth_sync adding api function --- catchup/client/CatchupClientAgent.cpp | 49 ++++++++++++++++++++++++--- catchup/client/CatchupClientAgent.h | 6 +++- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/catchup/client/CatchupClientAgent.cpp b/catchup/client/CatchupClientAgent.cpp index b6d29f5e..ccdc4f5c 100644 --- a/catchup/client/CatchupClientAgent.cpp +++ b/catchup/client/CatchupClientAgent.cpp @@ -136,9 +136,9 @@ nlohmann::json CatchupClientAgent::readCatchupResponseHeader( ptr peerStateInfo = PeerStateInfo::extract(response); if (peerStateInfo) { - // update the info - LOCK(peerStateInfosMutex) - peerStateInfos.at((uint64_t)_dstIndex) = peerStateInfo; + // update the info on the peer + WRITE_LOCK(peerStateInfosMutex) + peerStateInfos.at((uint64_t)_dstIndex - 1) = peerStateInfo; } LOG( debug, "Catchupc step 2: read catchup response requestHeader" ); @@ -157,6 +157,9 @@ nlohmann::json CatchupClientAgent::readCatchupResponseHeader( ptr< CommittedBlockList > blocks; + // Starting blockdownload, update last starting block + lastStartingBlock = getSchain()->getLastCommittedBlockID(); + try { blocks = readMissingBlocks( socket, response, requestHeader ); @@ -353,6 +356,44 @@ schain_index CatchupClientAgent::nextSyncNodeIndex( } +constexpr uint64_t ALLOWED_CATCHUP_DELAY_S = 30; + [[nodiscard]] ConsensusInterface::SyncInfo CatchupClientAgent::getSyncInfo() { - return ConsensusInterface::SyncInfo(); + + ConsensusInterface::SyncInfo syncInfo; + + // If the last block timestamp is less than 30 seconds in the past + // we always set is_syncing to false. This is is to prevent nodes that + // are just a little behing to flip back and forth into is_syncing + if (Time::getCurrentTimeSec() < getSchain()->getLastCommittedBlockTimeStamp().getS() + ALLOWED_CATCHUP_DELAY_S) { + return syncInfo; + } + + // find the maximum block on peer nodes + uint64_t highestBlock = 0; + { + READ_LOCK(peerStateInfosMutex) + for (auto&& item : peerStateInfos) { + if (item && item->getLastBlockId() > highestBlock) { + highestBlock = (uint64_t) item->getLastBlockId(); + } + } + } + + // no peer has a block larger than last committed block + // return is_syncing false + if (highestBlock <= getSchain()->getLastCommittedBlockID()) { + return syncInfo; + } + + // we are syncing since we are more than 60 seconds behind and other nodes have + // more blocks. Set info and return + + syncInfo.isSyncing = true; + syncInfo.highestBlock = highestBlock; + syncInfo.currentBlock = (uint64_t) getSchain()->getLastCommittedBlockID(); + syncInfo.startingBlock = (uint64_t) lastStartingBlock; + + return syncInfo; + } \ No newline at end of file diff --git a/catchup/client/CatchupClientAgent.h b/catchup/client/CatchupClientAgent.h index f7037bd1..2516d6bd 100644 --- a/catchup/client/CatchupClientAgent.h +++ b/catchup/client/CatchupClientAgent.h @@ -42,8 +42,12 @@ class CatchupClientAgent : public Agent { ptr catchupClientThreadPool = nullptr; + // vector of information on the state of peer nodes vector> peerStateInfos; - recursive_mutex peerStateInfosMutex; + shared_mutex peerStateInfosMutex; + + // last catchup starting block + block_id lastStartingBlock; public: explicit CatchupClientAgent(Schain &_sChain); From 952e3890d620d55d56e32dc0fbd26637f23d0694 Mon Sep 17 00:00:00 2001 From: Stan Kladko <13399135+kladkogex@users.noreply.github.com> Date: Fri, 1 Dec 2023 15:02:04 +0000 Subject: [PATCH 6/6] #813 eth_sync adding api function --- network/Network.cpp | 4 ++++ node/ConsensusInterface.h | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/network/Network.cpp b/network/Network.cpp index a09441bb..650dbfd2 100644 --- a/network/Network.cpp +++ b/network/Network.cpp @@ -25,6 +25,7 @@ #include "Log.h" #include "SkaleCommon.h" #include "blockproposal/pusher/BlockProposalClientAgent.h" +#include "catchup/client/CatchupClientAgent.h" #include "chains/Schain.h" #include "crypto/BLAKE3Hash.h" #include "crypto/ConsensusBLSSigShare.h" @@ -256,7 +257,10 @@ void Network::networkReadLoop() { auto msg = dynamic_pointer_cast< NetworkMessage >( m->getMessage() ); + // catchup test if ( msg->getBlockID() <= catchupBlocks ) { + auto syncInfo = getSchain()->getCatchupClientAgent()->getSyncInfo(); + cerr << "Sync Info:" << syncInfo.toString() << endl; continue; } diff --git a/node/ConsensusInterface.h b/node/ConsensusInterface.h index 59dead12..c51bd8ae 100644 --- a/node/ConsensusInterface.h +++ b/node/ConsensusInterface.h @@ -185,6 +185,11 @@ class ConsensusInterface { std::uint64_t startingBlock = 0; std::uint64_t currentBlock = 0; std::uint64_t highestBlock = 0; + + std::string toString() { + return std::to_string(isSyncing) + ":" + std::to_string(startingBlock) + ":" + + std::to_string(currentBlock) + ":" + std::to_string(highestBlock); + } }; // return sync information as requested by eth_syncing API of geth