Skip to content

Commit

Permalink
Merge pull request #821 from skalenetwork/813_eth_sync
Browse files Browse the repository at this point in the history
813 eth sync
  • Loading branch information
kladkogex authored Dec 1, 2023
2 parents 79d6b7f + 952e389 commit 46a4106
Show file tree
Hide file tree
Showing 15 changed files with 325 additions and 106 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/clang-format-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ jobs:
./libjson-rpc-cpp ./spdlog ./sgxwallet ./scripts ./run_sgx_test ./libzmq ./thirdparty
./cppzmq'
extensions: 'h,hpp,hxx,cpp,cxx,cc,ipp'
clangFormatVersion: 10
clangFormatVersion: 11
inplace: True
69 changes: 62 additions & 7 deletions catchup/client/CatchupClientAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
@date 2018
*/

#include "SkaleCommon.h"

#include "thirdparty/json.hpp"
#include "SkaleCommon.h"
#include "Log.h"
#include "datastructures/PeerStateInfo.h"
#include "exceptions/ExitRequestedException.h"
#include "exceptions/FatalError.h"

#include "thirdparty/json.hpp"

#include "abstracttcpserver/ConnectionStatus.h"
#include "crypto/CryptoManager.h"

Expand Down Expand Up @@ -59,6 +57,12 @@ CatchupClientAgent::CatchupClientAgent( Schain& _sChain ) : Agent( _sChain, fals
this->catchupClientThreadPool = make_shared< CatchupClientThreadPool >( 1, this );
catchupClientThreadPool->startService();
}

for (int i = 0; i < _sChain.getNodeCount(); i++) {
// init peer state infos
peerStateInfos.push_back(nullptr);
}

} catch ( ExitRequestedException& ) {
throw;
} catch ( ... ) {
Expand Down Expand Up @@ -128,6 +132,14 @@ nlohmann::json CatchupClientAgent::readCatchupResponseHeader(
throw_with_nested( NetworkProtocolException( errString, __CLASS_NAME__ ) );
}

// now see if peerinfo information returned by the peer
ptr<PeerStateInfo> peerStateInfo = PeerStateInfo::extract(response);

if (peerStateInfo) {
// update the info on the peer
WRITE_LOCK(peerStateInfosMutex)
peerStateInfos.at((uint64_t)_dstIndex - 1) = peerStateInfo;
}

LOG( debug, "Catchupc step 2: read catchup response requestHeader" );

Expand All @@ -138,15 +150,15 @@ nlohmann::json CatchupClientAgent::readCatchupResponseHeader(
return 0;
}


if ( status != CONNECTION_PROCEED ) {
BOOST_THROW_EXCEPTION( NetworkProtocolException(
"Server error in catchup response:" + to_string( status ), __CLASS_NAME__ ) );
}


ptr< CommittedBlockList > blocks;

// Starting blockdownload, update last starting block
lastStartingBlock = getSchain()->getLastCommittedBlockID();

try {
blocks = readMissingBlocks( socket, response, requestHeader );
Expand Down Expand Up @@ -342,3 +354,46 @@ schain_index CatchupClientAgent::nextSyncNodeIndex(

return index + 1;
}


constexpr uint64_t ALLOWED_CATCHUP_DELAY_S = 30;

[[nodiscard]] ConsensusInterface::SyncInfo CatchupClientAgent::getSyncInfo() {

ConsensusInterface::SyncInfo syncInfo;

// If the last block timestamp is less than 30 seconds in the past
// we always set is_syncing to false. This is is to prevent nodes that
// are just a little behing to flip back and forth into is_syncing
if (Time::getCurrentTimeSec() < getSchain()->getLastCommittedBlockTimeStamp().getS() + ALLOWED_CATCHUP_DELAY_S) {
return syncInfo;
}

// find the maximum block on peer nodes
uint64_t highestBlock = 0;
{
READ_LOCK(peerStateInfosMutex)
for (auto&& item : peerStateInfos) {
if (item && item->getLastBlockId() > highestBlock) {
highestBlock = (uint64_t) item->getLastBlockId();
}
}
}

// no peer has a block larger than last committed block
// return is_syncing false
if (highestBlock <= getSchain()->getLastCommittedBlockID()) {
return syncInfo;
}

// we are syncing since we are more than 60 seconds behind and other nodes have
// more blocks. Set info and return

syncInfo.isSyncing = true;
syncInfo.highestBlock = highestBlock;
syncInfo.currentBlock = (uint64_t) getSchain()->getLastCommittedBlockID();
syncInfo.startingBlock = (uint64_t) lastStartingBlock;

return syncInfo;

}
45 changes: 32 additions & 13 deletions catchup/client/CatchupClientAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,56 @@

#pragma once


class CommittedBlockList;

class ClientSocket;

class Schain;

class CatchupClientThreadPool;

class CatchupRequestHeader;

class CatchupResponseHeader;

class PeerStateInfo;


class CatchupClientAgent : public Agent {
ptr< CatchupClientThreadPool > catchupClientThreadPool = nullptr;

ptr<CatchupClientThreadPool> catchupClientThreadPool = nullptr;

// vector of information on the state of peer nodes
vector<ptr<PeerStateInfo>> peerStateInfos;
shared_mutex peerStateInfosMutex;

// last catchup starting block
block_id lastStartingBlock;

public:
explicit CatchupClientAgent( Schain& _sChain );
explicit CatchupClientAgent(Schain &_sChain);

[[nodiscard]] uint64_t sync(schain_index _dstIndex);

[[nodiscard]] uint64_t sync( schain_index _dstIndex );
static void workerThreadItemSendLoop(CatchupClientAgent *_agent);

[[nodiscard]] nlohmann::json readCatchupResponseHeader(
const ptr<ClientSocket> &_socket, ptr<CatchupRequestHeader> _requestHeader);

static void workerThreadItemSendLoop( CatchupClientAgent* _agent );

nlohmann::json readCatchupResponseHeader(
const ptr< ClientSocket >& _socket, ptr< CatchupRequestHeader > _requestHeader );
[[nodiscard]] ptr<CommittedBlockList> readMissingBlocks(ptr<ClientSocket> &_socket,
nlohmann::json &_responseHeader,
ptr<CatchupRequestHeader> _requestHeader);


ptr< CommittedBlockList > readMissingBlocks( ptr< ClientSocket >& _socket,
nlohmann::json& _responseHeader, ptr< CatchupRequestHeader > _requestHeader );
[[nodiscard]] size_t parseBlockSizes(nlohmann::json _responseHeader,
const ptr<vector<uint64_t> > &_blockSizes,
ptr<CatchupRequestHeader> _requestHeader);

[[nodiscard]] block_id getMaxKnownBlockId();

size_t parseBlockSizes( nlohmann::json _responseHeader,
const ptr< vector< uint64_t > >& _blockSizes, ptr< CatchupRequestHeader > _requestHeader );
[[nodiscard]] static schain_index nextSyncNodeIndex(
const CatchupClientAgent *_agent, schain_index _destinationSchainIndex);

static schain_index nextSyncNodeIndex(
const CatchupClientAgent* _agent, schain_index _destinationSchainIndex );
[[nodiscard]] ConsensusInterface::SyncInfo getSyncInfo();
};
Loading

0 comments on commit 46a4106

Please sign in to comment.