Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge changes from develop #823

Merged
merged 11 commits into from
Dec 10, 2023
3 changes: 2 additions & 1 deletion .github/workflows/clang-format-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ jobs:
./libjson-rpc-cpp ./spdlog ./sgxwallet ./scripts ./run_sgx_test ./libzmq ./thirdparty
./cppzmq'
extensions: 'h,hpp,hxx,cpp,cxx,cc,ipp'
clangFormatVersion: 10
clangFormatVersion: 11
inplace: True
69 changes: 62 additions & 7 deletions catchup/client/CatchupClientAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
@date 2018
*/

#include "SkaleCommon.h"

#include "thirdparty/json.hpp"
#include "SkaleCommon.h"
#include "Log.h"
#include "datastructures/PeerStateInfo.h"
#include "exceptions/ExitRequestedException.h"
#include "exceptions/FatalError.h"

#include "thirdparty/json.hpp"

#include "abstracttcpserver/ConnectionStatus.h"
#include "crypto/CryptoManager.h"

Expand Down Expand Up @@ -59,6 +57,12 @@ CatchupClientAgent::CatchupClientAgent( Schain& _sChain ) : Agent( _sChain, fals
this->catchupClientThreadPool = make_shared< CatchupClientThreadPool >( 1, this );
catchupClientThreadPool->startService();
}

for (int i = 0; i < _sChain.getNodeCount(); i++) {
// init peer state infos
peerStateInfos.push_back(nullptr);
}

} catch ( ExitRequestedException& ) {
throw;
} catch ( ... ) {
Expand Down Expand Up @@ -128,6 +132,14 @@ nlohmann::json CatchupClientAgent::readCatchupResponseHeader(
throw_with_nested( NetworkProtocolException( errString, __CLASS_NAME__ ) );
}

// now see if peerinfo information returned by the peer
ptr<PeerStateInfo> peerStateInfo = PeerStateInfo::extract(response);

if (peerStateInfo) {
// update the info on the peer
WRITE_LOCK(peerStateInfosMutex)
peerStateInfos.at((uint64_t)_dstIndex - 1) = peerStateInfo;
}

LOG( debug, "Catchupc step 2: read catchup response requestHeader" );

Expand All @@ -138,15 +150,15 @@ nlohmann::json CatchupClientAgent::readCatchupResponseHeader(
return 0;
}


if ( status != CONNECTION_PROCEED ) {
BOOST_THROW_EXCEPTION( NetworkProtocolException(
"Server error in catchup response:" + to_string( status ), __CLASS_NAME__ ) );
}


ptr< CommittedBlockList > blocks;

// Starting blockdownload, update last starting block
lastStartingBlock = getSchain()->getLastCommittedBlockID();

try {
blocks = readMissingBlocks( socket, response, requestHeader );
Expand Down Expand Up @@ -342,3 +354,46 @@ schain_index CatchupClientAgent::nextSyncNodeIndex(

return index + 1;
}


constexpr uint64_t ALLOWED_CATCHUP_DELAY_S = 30;

[[nodiscard]] ConsensusInterface::SyncInfo CatchupClientAgent::getSyncInfo() {

ConsensusInterface::SyncInfo syncInfo;

// If the last block timestamp is less than 30 seconds in the past
// we always set is_syncing to false. This is is to prevent nodes that
// are just a little behing to flip back and forth into is_syncing
if (Time::getCurrentTimeSec() < getSchain()->getLastCommittedBlockTimeStamp().getS() + ALLOWED_CATCHUP_DELAY_S) {
return syncInfo;
}

// find the maximum block on peer nodes
uint64_t highestBlock = 0;
{
READ_LOCK(peerStateInfosMutex)
for (auto&& item : peerStateInfos) {
if (item && item->getLastBlockId() > highestBlock) {
highestBlock = (uint64_t) item->getLastBlockId();
}
}
}

// no peer has a block larger than last committed block
// return is_syncing false
if (highestBlock <= getSchain()->getLastCommittedBlockID()) {
return syncInfo;
}

// we are syncing since we are more than 60 seconds behind and other nodes have
// more blocks. Set info and return

syncInfo.isSyncing = true;
syncInfo.highestBlock = highestBlock;
syncInfo.currentBlock = (uint64_t) getSchain()->getLastCommittedBlockID();
syncInfo.startingBlock = (uint64_t) lastStartingBlock;

return syncInfo;

}
45 changes: 32 additions & 13 deletions catchup/client/CatchupClientAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,56 @@

#pragma once


class CommittedBlockList;

class ClientSocket;

class Schain;

class CatchupClientThreadPool;

class CatchupRequestHeader;

class CatchupResponseHeader;

class PeerStateInfo;


class CatchupClientAgent : public Agent {
ptr< CatchupClientThreadPool > catchupClientThreadPool = nullptr;

ptr<CatchupClientThreadPool> catchupClientThreadPool = nullptr;

// vector of information on the state of peer nodes
vector<ptr<PeerStateInfo>> peerStateInfos;
shared_mutex peerStateInfosMutex;

// last catchup starting block
block_id lastStartingBlock;

public:
explicit CatchupClientAgent( Schain& _sChain );
explicit CatchupClientAgent(Schain &_sChain);

[[nodiscard]] uint64_t sync(schain_index _dstIndex);

[[nodiscard]] uint64_t sync( schain_index _dstIndex );
static void workerThreadItemSendLoop(CatchupClientAgent *_agent);

[[nodiscard]] nlohmann::json readCatchupResponseHeader(
const ptr<ClientSocket> &_socket, ptr<CatchupRequestHeader> _requestHeader);

static void workerThreadItemSendLoop( CatchupClientAgent* _agent );

nlohmann::json readCatchupResponseHeader(
const ptr< ClientSocket >& _socket, ptr< CatchupRequestHeader > _requestHeader );
[[nodiscard]] ptr<CommittedBlockList> readMissingBlocks(ptr<ClientSocket> &_socket,
nlohmann::json &_responseHeader,
ptr<CatchupRequestHeader> _requestHeader);


ptr< CommittedBlockList > readMissingBlocks( ptr< ClientSocket >& _socket,
nlohmann::json& _responseHeader, ptr< CatchupRequestHeader > _requestHeader );
[[nodiscard]] size_t parseBlockSizes(nlohmann::json _responseHeader,
const ptr<vector<uint64_t> > &_blockSizes,
ptr<CatchupRequestHeader> _requestHeader);

[[nodiscard]] block_id getMaxKnownBlockId();

size_t parseBlockSizes( nlohmann::json _responseHeader,
const ptr< vector< uint64_t > >& _blockSizes, ptr< CatchupRequestHeader > _requestHeader );
[[nodiscard]] static schain_index nextSyncNodeIndex(
const CatchupClientAgent *_agent, schain_index _destinationSchainIndex);

static schain_index nextSyncNodeIndex(
const CatchupClientAgent* _agent, schain_index _destinationSchainIndex );
[[nodiscard]] ConsensusInterface::SyncInfo getSyncInfo();
};
Loading
Loading