Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snap): add retry mechanism; add extra configurability; fix snap state chunk msg encoding; other minor fixes #2900

Draft
wants to merge 2 commits into
base: vovchyk/snap/extra-checks
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rskj-core/src/main/java/co/rsk/RskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -2084,6 +2084,7 @@ private SnapshotProcessor getSnapshotProcessor() {
new ValidGasUsedRule()
),
getRskSystemProperties().getSnapshotChunkSize(),
getRskSystemProperties().checkHistoricalHeaders(),
getRskSystemProperties().isSnapshotParallelEnabled()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class RskSystemProperties extends SystemProperties {
public static final String USE_PEERS_FROM_LAST_SESSION = "peer.discovery.usePeersFromLastSession";

public static final String PROPERTY_SNAP_CLIENT_ENABLED = "sync.snapshot.client.enabled";
public static final String PROPERTY_SNAP_CLIENT_CHECK_HISTORICAL_HEADERS = "sync.snapshot.client.checkHistoricalHeaders";
public static final String PROPERTY_SNAP_NODES = "sync.snapshot.client.snapBootNodes";

//TODO: REMOVE THIS WHEN THE LocalBLockTests starts working with REMASC
Expand Down Expand Up @@ -429,6 +430,8 @@ public int getLongSyncLimit() {
/**
 * Reads the {@code sync.snapshot.server.enabled} flag from the loaded configuration.
 *
 * @return the boolean stored under that key in {@code configFromFiles}
 */
public boolean isServerSnapshotSyncEnabled() {
    final String key = "sync.snapshot.server.enabled";
    return configFromFiles.getBoolean(key);
}
/**
 * Reads the flag named by {@link #PROPERTY_SNAP_CLIENT_ENABLED} from the loaded configuration.
 *
 * @return the boolean stored under that key in {@code configFromFiles}
 */
public boolean isClientSnapshotSyncEnabled() {
    final boolean clientEnabled = configFromFiles.getBoolean(PROPERTY_SNAP_CLIENT_ENABLED);
    return clientEnabled;
}

/**
 * Reads the flag named by {@link #PROPERTY_SNAP_CLIENT_CHECK_HISTORICAL_HEADERS}
 * from the loaded configuration.
 *
 * @return the boolean stored under that key in {@code configFromFiles}
 */
public boolean checkHistoricalHeaders() {
    final boolean historicalHeadersChecked =
            configFromFiles.getBoolean(PROPERTY_SNAP_CLIENT_CHECK_HISTORICAL_HEADERS);
    return historicalHeadersChecked;
}

/**
 * Reads the {@code sync.snapshot.client.parallel} flag from the loaded configuration.
 *
 * @return the boolean stored under that key in {@code configFromFiles}
 */
public boolean isSnapshotParallelEnabled() {
    final String key = "sync.snapshot.client.parallel";
    return configFromFiles.getBoolean(key);
}

/**
 * Reads the {@code sync.snapshot.client.chunkSize} setting from the loaded configuration.
 *
 * @return the integer stored under that key in {@code configFromFiles}
 */
public int getSnapshotChunkSize() {
    final String key = "sync.snapshot.client.chunkSize";
    return configFromFiles.getInt(key);
}
Expand Down Expand Up @@ -512,10 +515,14 @@ public boolean fastBlockPropagation() {
return configFromFiles.getBoolean("peer.fastBlockPropagation");
}

public Integer getMessageQueueMaxSize() {
public int getMessageQueueMaxSize() {
return configFromFiles.getInt("peer.messageQueue.maxSizePerPeer");
}

/**
 * Reads the {@code peer.messageQueue.thresholdPerMinutePerPeer} setting from the loaded configuration.
 *
 * @return the integer stored under that key in {@code configFromFiles}
 */
public int getMessageQueuePerMinuteThreshold() {
    final int thresholdPerMinute = configFromFiles.getInt("peer.messageQueue.thresholdPerMinutePerPeer");
    return thresholdPerMinute;
}

/**
 * Reads the {@code rpc.zeroSignatureIfRemasc} flag from the loaded configuration.
 *
 * @return the boolean stored under that key in {@code configFromFiles}
 */
public boolean rpcZeroSignatureIfRemasc() {
    final boolean zeroSignature = configFromFiles.getBoolean("rpc.zeroSignatureIfRemasc");
    return zeroSignature;
}
Expand Down
11 changes: 8 additions & 3 deletions rskj-core/src/main/java/co/rsk/net/NodeMessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private void tryAddMessage(Peer sender, Message message, NodeMsgTraceInfo nodeMs
*/
private boolean controlMessageIngress(Peer sender, Message message, double score) {
return
allowByScore(score) &&
allowByScore(sender, message, score) &&
allowByMessageCount(sender) &&
allowByMinerNotBanned(sender, message) &&
allowByMessageUniqueness(sender, message); // prevent repeated is the most expensive and MUST be the last
Expand All @@ -221,8 +221,13 @@ private boolean controlMessageIngress(Peer sender, Message message, double score
/**
* assert score is acceptable
*/
private boolean allowByScore(double score) {
return score >= 0;
private boolean allowByScore(Peer sender, Message message, double score) {
boolean allow = score >= 0;
if (!allow) {
logger.debug("Message: [{}] from: [{}] with score: [{}] was not allowed", message.getMessageType(), sender, score);
}

return allow;
}

/**
Expand Down
84 changes: 54 additions & 30 deletions rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static co.rsk.net.sync.SnapSyncRequestManager.PeerSelector;

/**
* Snapshot synchronization consists of 3 steps:
* 1. Status: exchange messages with the server to learn which block we are going to sync and what the size of that block's Unitrie is.
Expand Down Expand Up @@ -86,8 +87,7 @@ public class SnapshotProcessor implements InternalService {
private final BlockHeaderParentDependantValidationRule blockHeaderParentValidator;
private final BlockHeaderValidationRule blockHeaderValidator;

private final AtomicLong messageId = new AtomicLong(0);

private final boolean checkHistoricalHeaders;
// flag for parallel requests
private final boolean parallel;

Expand All @@ -106,10 +106,11 @@ public SnapshotProcessor(Blockchain blockchain,
BlockHeaderParentDependantValidationRule blockHeaderParentValidator,
BlockHeaderValidationRule blockHeaderValidator,
int chunkSize,
boolean checkHistoricalHeaders,
boolean isParallelEnabled) {
this(blockchain, trieStore, peersInformation, blockStore, transactionPool,
blockParentValidator, blockValidator, blockHeaderParentValidator, blockHeaderValidator,
chunkSize, isParallelEnabled, null);
chunkSize, checkHistoricalHeaders, isParallelEnabled, null);
}

@VisibleForTesting
Expand All @@ -123,6 +124,7 @@ public SnapshotProcessor(Blockchain blockchain,
BlockHeaderParentDependantValidationRule blockHeaderParentValidator,
BlockHeaderValidationRule blockHeaderValidator,
int chunkSize,
boolean checkHistoricalHeaders,
boolean isParallelEnabled,
@Nullable SyncMessageHandler.Listener listener) {
this.blockchain = blockchain;
Expand All @@ -138,6 +140,7 @@ public SnapshotProcessor(Blockchain blockchain,
this.blockHeaderParentValidator = blockHeaderParentValidator;
this.blockHeaderValidator = blockHeaderValidator;

this.checkHistoricalHeaders = checkHistoricalHeaders;
this.parallel = isParallelEnabled;
this.thread = new Thread(new SyncMessageHandler("SNAP/server", requestQueue, listener) {

Expand All @@ -157,7 +160,7 @@ public void startSyncing(SnapSyncState state) {
}

logger.info("Starting Snap sync");
requestSnapStatus(bestPeerOpt.get());
requestSnapStatus(state, bestPeerOpt.get());
}

private void completeSyncing(SnapSyncState state) {
Expand All @@ -177,9 +180,22 @@ private void failSyncing(SnapSyncState state, Peer peer, EventType eventType, St
/**
* STATUS
*/
private void requestSnapStatus(Peer peer) {
SnapStatusRequestMessage message = new SnapStatusRequestMessage();
peer.sendMessage(message);
private void requestSnapStatus(SnapSyncState state, Peer peer) {
state.submitRequest(snapPeerSelector(peer), SnapStatusRequestMessage::new);
}

/**
 * Builds a {@link PeerSelector} whose default-peer supplier returns the given peer and whose
 * alternative peer comes from {@code peersInformation.getBestPeer()}.
 *
 * @param peer peer to offer as the default choice; may be {@code null}, in which case the
 *             default supplier yields {@code null}
 * @return the selector produced by the builder
 */
private PeerSelector peerSelector(@Nullable Peer peer) {
return PeerSelector.builder()
// NOTE(review): how the selector arbitrates between default and alternative is defined
// by SnapSyncRequestManager, which is not visible here — confirm before relying on it.
.withDefaultPeer(() -> peer)
.withAltPeer(peersInformation::getBestPeer)
.build();
}

/**
 * Builds a {@link PeerSelector} whose default-peer supplier returns the given snap peer and
 * whose alternative peer comes from {@code peersInformation.getBestSnapPeer()}.
 *
 * @param snapPeer peer to offer as the default choice; may be {@code null}, in which case the
 *                 default supplier yields {@code null}
 * @return the selector produced by the builder
 */
private PeerSelector snapPeerSelector(@Nullable Peer snapPeer) {
return PeerSelector.builder()
// Same shape as peerSelector, but the fallback is drawn from getBestSnapPeer
// rather than getBestPeer.
.withDefaultPeer(() -> snapPeer)
.withAltPeer(peersInformation::getBestSnapPeer)
.build();
}

public void processSnapStatusRequest(Peer sender, SnapStatusRequestMessage requestMessage) {
Expand All @@ -201,7 +217,7 @@ public void run() {
}
}

void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage ignoredRequestMessage) {
void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage requestMessage) {
long bestBlockNumber = blockchain.getBestBlock().getNumber();
long checkpointBlockNumber = bestBlockNumber - (bestBlockNumber % BLOCK_NUMBER_CHECKPOINT);
logger.debug("Processing snapshot status request, checkpointBlockNumber: {}, bestBlockNumber: {}", checkpointBlockNumber, bestBlockNumber);
Expand All @@ -226,7 +242,7 @@ void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage igno

long trieSize = opt.get().getTotalSize();
logger.debug("Processing snapshot status request - rootHash: {} trieSize: {}", rootHash, trieSize);
SnapStatusResponseMessage responseMessage = new SnapStatusResponseMessage(blocks, difficulties, trieSize);
SnapStatusResponseMessage responseMessage = new SnapStatusResponseMessage(requestMessage.getId(), blocks, difficulties, trieSize);
sender.sendMessage(responseMessage);
}

Expand Down Expand Up @@ -261,7 +277,7 @@ public void processSnapStatusResponse(SnapSyncState state, Peer sender, SnapStat
generateChunkRequestTasks(state);
startRequestingChunks(state);
} else {
requestBlocksChunk(sender, blocksFromResponse.get(0).getNumber());
requestBlocksChunk(state, blocksFromResponse.get(0).getNumber());
}
}

Expand Down Expand Up @@ -318,9 +334,11 @@ private boolean areBlockPairsValid(Pair<Block, BlockDifficulty> blockPair, @Null
/**
* BLOCK CHUNK
*/
private void requestBlocksChunk(Peer sender, long blockNumber) {
logger.debug("Requesting block chunk to node {} - block {}", sender.getPeerNodeID(), blockNumber);
sender.sendMessage(new SnapBlocksRequestMessage(blockNumber));
private void requestBlocksChunk(SnapSyncState state, long blockNumber) {
state.submitRequest(
peerSelector(null),
messageId -> new SnapBlocksRequestMessage(messageId, blockNumber)
);
}

public void processBlockHeaderChunk(SnapSyncState state, Peer sender, List<BlockHeader> chunk) {
Expand Down Expand Up @@ -405,7 +423,10 @@ private void requestNextBlockHeadersChunk(SnapSyncState state, Peer sender) {

logger.debug("Requesting block header chunk to node {} - block [{}/{}]", peer.getPeerNodeID(), lastVerifiedBlockHeader.getNumber() - 1, parentHash);

state.getSyncEventsHandler().sendBlockHeadersRequest(peer, new ChunkDescriptor(parentHash.getBytes(), (int) count));
state.submitRequest(
peerSelector(sender),
messageId -> new BlockHeadersRequestMessage(messageId, parentHash.getBytes(), (int) count)
);
}

public void processSnapBlocksRequest(Peer sender, SnapBlocksRequestMessage requestMessage) {
Expand Down Expand Up @@ -447,7 +468,7 @@ void processSnapBlocksRequestInternal(Peer sender, SnapBlocksRequestMessage requ
difficulties.add(blockStore.getTotalDifficultyForHash(block.getHash().getBytes()));
}
logger.debug("Sending snap blocks response. From block {} to block {} - chunksize {}", blocks.get(0).getNumber(), blocks.get(blocks.size() - 1).getNumber(), BLOCK_CHUNK_SIZE);
SnapBlocksResponseMessage responseMessage = new SnapBlocksResponseMessage(blocks, difficulties);
SnapBlocksResponseMessage responseMessage = new SnapBlocksResponseMessage(requestMessage.getId(), blocks, difficulties);
sender.sendMessage(responseMessage);
}

Expand All @@ -474,7 +495,12 @@ public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBloc
generateChunkRequestTasks(state);
startRequestingChunks(state);
} else if (nextChunk > lastRequiredBlock) {
requestBlocksChunk(sender, nextChunk);
requestBlocksChunk(state, nextChunk);
} else if (!this.checkHistoricalHeaders) {
logger.info("Finished Snap blocks request sending. Start requesting state chunks without historical headers check");

generateChunkRequestTasks(state);
startRequestingChunks(state);
} else {
logger.info("Finished Snap blocks request sending. Start requesting state chunks and block headers");

Expand All @@ -488,10 +514,11 @@ public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBloc
/**
* STATE CHUNK
*/
private void requestStateChunk(Peer peer, long from, long blockNumber, int chunkSize) {
logger.debug("Requesting state chunk to node {} - block {} - chunkNumber {}", peer.getPeerNodeID(), blockNumber, from / chunkSize);
SnapStateChunkRequestMessage message = new SnapStateChunkRequestMessage(messageId.getAndIncrement(), blockNumber, from, chunkSize);
peer.sendMessage(message);
private void requestStateChunk(SnapSyncState state, Peer peer, long from, long blockNumber, int chunkSize) {
state.submitRequest(
snapPeerSelector(peer),
messageId -> new SnapStateChunkRequestMessage(messageId, blockNumber, from, chunkSize)
);
}

public void processStateChunkRequest(Peer sender, SnapStateChunkRequestMessage requestMessage) {
Expand Down Expand Up @@ -573,7 +600,7 @@ public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateC
state.setNextExpectedFrom(nextExpectedFrom + chunkSize * CHUNK_ITEM_SIZE);
} catch (Exception e) {
logger.error("Error while processing chunk response. {}", e.getMessage(), e);
onStateChunkResponseError(peer, nextMessage);
onStateChunkResponseError(state, peer, nextMessage);
}
} else {
break;
Expand All @@ -587,19 +614,18 @@ public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateC
}

@VisibleForTesting
void onStateChunkResponseError(Peer peer, SnapStateChunkResponseMessage responseMessage) {
void onStateChunkResponseError(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage responseMessage) {
logger.error("Error while processing chunk response from {} of peer {}. Asking for chunk again.", responseMessage.getFrom(), peer.getPeerNodeID());
Peer alternativePeer = peersInformation.getBestSnapPeerCandidates().stream()
.filter(listedPeer -> !listedPeer.getPeerNodeID().equals(peer.getPeerNodeID()))
.findFirst()
.orElse(peer);
logger.debug("Requesting state chunk \"from\" {} to peer {}", responseMessage.getFrom(), peer.getPeerNodeID());
requestStateChunk(alternativePeer, responseMessage.getFrom(), responseMessage.getBlockNumber(), chunkSize);
requestStateChunk(state, alternativePeer, responseMessage.getFrom(), responseMessage.getBlockNumber(), chunkSize);
}

private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage message) throws Exception {
logger.debug("Processing State chunk received from {} to {}", message.getFrom(), message.getTo());
peersInformation.getOrRegisterPeer(peer);

RLPList nodeLists = RLP.decodeList(message.getChunkOfTrieKeyValue());
final RLPList preRootElements = RLP.decodeList(nodeLists.get(0).getRLPData());
Expand Down Expand Up @@ -648,10 +674,8 @@ private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, Sn
state.getAllNodes().addAll(nodes);
state.setStateSize(state.getStateSize().add(BigInteger.valueOf(trieElements.size())));
state.setStateChunkSize(state.getStateChunkSize().add(BigInteger.valueOf(message.getChunkOfTrieKeyValue().length)));
if (!message.isComplete()) {
executeNextChunkRequestTask(state, peer);
} else {
if (blocksVerified(state)) {
if (message.isComplete()) {
if (!this.checkHistoricalHeaders || blocksVerified(state)) {
completeSyncing(state);
} else {
state.setStateFetched();
Expand Down Expand Up @@ -716,7 +740,7 @@ private void executeNextChunkRequestTask(SnapSyncState state, Peer peer) {
if (!taskQueue.isEmpty()) {
ChunkTask task = taskQueue.poll();

requestStateChunk(peer, task.getFrom(), task.getBlockNumber(), chunkSize);
requestStateChunk(state, peer, task.getFrom(), task.getBlockNumber(), chunkSize);
} else {
logger.warn("No more chunk request tasks.");
}
Expand Down
Loading
Loading