[Epoch Sync] Add a Nayduck test for Epoch Sync. (#12237)
Most of the complexity here is in the changes to the StoreValidator (a rough sketch of the resulting skip logic follows this list):
* Do not check that the previous block exists if it is older than the epoch sync boundary.
* Do not check that the header exists whenever a BlockInfo exists, because this too is not true for some blocks before the epoch sync boundary.
* (This issue predates epoch sync.) Be lenient when checking that chunk bodies exist, because we don't seem to download them for the extra blocks needed by state sync. I couldn't figure out a good way to check this rigorously, so I included a heuristic that skips the check when the block in question is <= tail height. I don't think it's correct, but it works for now.
* The reason this does not already trigger state sync Nayduck test failures is that utils.poll_blocks doesn't trigger store validations, but utils.wait_for_blocks does 🙄
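
Below is a minimal, self-contained sketch of the skip conditions described above. The helper name and signature are hypothetical; in the actual diff each validation in chain/chain/src/store_validator/validate.rs applies its own condition inline, and the exact comparison operators differ slightly between checks.

```rust
// Hypothetical helper illustrating the skip conditions; not the nearcore API.
type BlockHeight = u64;

fn skip_store_validation(
    height: BlockHeight,
    epoch_sync_boundary: Option<BlockHeight>,
    tail_height: BlockHeight,
) -> bool {
    // Heuristic: blocks at or below the tail may be missing chunk bodies.
    if height <= tail_height {
        return true;
    }
    // Blocks at or below the epoch sync boundary may be missing headers or a
    // previous block, so header-related checks are skipped for them.
    matches!(epoch_sync_boundary, Some(boundary) if height <= boundary)
}

fn main() {
    // A block below the epoch sync boundary is skipped.
    assert!(skip_store_validation(50, Some(100), 10));
    // A block well past both the tail and the boundary is still validated.
    assert!(!skip_store_validation(200, Some(100), 10));
}
```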
robin-near authored Oct 17, 2024
1 parent de6e88b commit ba6c707
Showing 6 changed files with 136 additions and 2 deletions.
11 changes: 11 additions & 0 deletions chain/chain/src/store_validator.rs
@@ -10,6 +10,7 @@ use near_primitives::borsh;
use near_primitives::epoch_block_info::BlockInfo;
use near_primitives::epoch_info::EpochInfo;
use near_primitives::epoch_manager::AGGREGATOR_KEY;
use near_primitives::epoch_sync::EpochSyncProof;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::get_block_shard_uid_rev;
use near_primitives::sharding::{ChunkHash, ShardChunk, StateSyncInfo};
@@ -75,6 +76,9 @@ pub struct StoreValidator {
timeout: Option<i64>,
start_time: Instant,
pub is_archival: bool,
// If present, the node was bootstrapped with epoch sync, and this block height
// represents the first block of the target epoch that we epoch synced to.
epoch_sync_boundary: Option<BlockHeight>,

pub errors: Vec<ErrorMessage>,
tests: u64,
@@ -90,6 +94,12 @@ impl StoreValidator {
store: Store,
is_archival: bool,
) -> Self {
let epoch_sync_boundary = store
.get_ser::<EpochSyncProof>(DBCol::EpochSyncProof, &[])
.expect("Store IO error when getting EpochSyncProof")
.map(|epoch_sync_proof| {
epoch_sync_proof.current_epoch.first_block_header_in_epoch.height()
});
StoreValidator {
me,
config,
Expand All @@ -101,6 +111,7 @@ impl StoreValidator {
timeout: None,
start_time: Clock::real().now(),
is_archival,
epoch_sync_boundary,
errors: vec![],
tests: 0,
}
26 changes: 25 additions & 1 deletion chain/chain/src/store_validator/validate.rs
@@ -358,6 +358,15 @@ pub(crate) fn block_chunks_exist(
_block_hash: &CryptoHash,
block: &Block,
) -> Result<(), StoreValidatorError> {
let tail_height =
sv.store.get_ser::<BlockHeight>(DBCol::BlockMisc, TAIL_KEY).unwrap().unwrap_or(0);
if block.header().height() <= tail_height {
// If this node has undergone state sync to block H (where H is the first block of an epoch),
// then it appears that blocks before H may not have the chunk bodies in storage.
// Note that this is NOT a completely correct check. It is only a heuristic that is good enough
// for single-shard, no-missing-chunks state sync or epoch sync tests.
return Ok(());
}
for chunk_header in block.chunks().iter() {
if chunk_header.height_included() == block.header().height() {
if let Some(me) = &sv.me {
@@ -482,12 +491,20 @@ pub(crate) fn canonical_prev_block_validity(
height: &BlockHeight,
hash: &CryptoHash,
) -> Result<(), StoreValidatorError> {
if let Some(epoch_sync_boundary) = &sv.epoch_sync_boundary {
// Headers that are below the epoch_sync_boundary are not expected to be present,
// so skip the check in that case.
if height <= epoch_sync_boundary {
return Ok(());
}
}
if *height != sv.config.genesis_height {
let header = unwrap_or_err_db!(
sv.store.get_ser::<BlockHeader>(DBCol::BlockHeader, hash.as_ref()),
"Can't get Block Header {:?} from DBCol::BlockHeader",
hash
);

let prev_hash = *header.prev_hash();
let prev_header = unwrap_or_err_db!(
sv.store.get_ser::<BlockHeader>(DBCol::BlockHeader, prev_hash.as_ref()),
@@ -756,12 +773,19 @@ pub(crate) fn chunk_extra_block_exists(
pub(crate) fn block_info_block_header_exists(
sv: &mut StoreValidator,
block_hash: &CryptoHash,
_block_info: &BlockInfo,
block_info: &BlockInfo,
) -> Result<(), StoreValidatorError> {
// fake block info for pre-genesis block
if *block_hash == CryptoHash::default() {
return Ok(());
}
if let Some(epoch_sync_boundary) = &sv.epoch_sync_boundary {
// BlockInfo before the epoch sync boundary is not guaranteed to have a
// corresponding header.
if block_info.height() < *epoch_sync_boundary {
return Ok(());
}
}
unwrap_or_err_db!(
sv.store.get_ser::<BlockHeader>(DBCol::BlockHeader, block_hash.as_ref()),
"Can't get Block Header from DB"
3 changes: 3 additions & 0 deletions chain/client/src/sync/epoch.rs
@@ -436,6 +436,8 @@ impl EpochSync {
.choose(&mut rand::thread_rng())
.ok_or_else(|| Error::Other("No peers to request epoch sync from".to_string()))?;

tracing::info!(peer_id=?peer.peer_info.id, "Bootstrapping node via epoch sync");

*status = SyncStatus::EpochSync(EpochSyncStatus {
source_peer_id: peer.peer_info.id.clone(),
source_peer_height: peer.highest_block_height,
@@ -545,6 +547,7 @@ impl EpochSync {
update.commit()?;

*status = SyncStatus::EpochSyncDone;
tracing::info!(epoch_id=?last_header.epoch_id(), "Bootstrapped from epoch sync");

Ok(())
}
6 changes: 5 additions & 1 deletion nightly/pytest-sanity.txt
@@ -197,4 +197,8 @@ pytest sanity/replay_chain_from_archive.py --features nightly

# Tests that offline block+chunk producers and chunk validators are kicked out.
pytest --timeout=120 sanity/kickout_offline_validators.py
pytest --timeout=120 sanity/kickout_offline_validators.py --features nightly
pytest --timeout=120 sanity/kickout_offline_validators.py --features nightly

# Epoch sync
pytest --timeout=240 sanity/epoch_sync.py
pytest --timeout=240 sanity/epoch_sync.py --features nightly
1 change: 1 addition & 0 deletions pytest/lib/cluster.py
@@ -1006,6 +1006,7 @@ def apply_config_changes(node_dir: str,
# when None.
allowed_missing_configs = (
'archive', 'consensus.block_fetch_horizon',
'consensus.block_header_fetch_horizon',
'consensus.min_block_production_delay',
'consensus.max_block_production_delay',
'consensus.max_block_wait_delay', 'consensus.state_sync_timeout',
91 changes: 91 additions & 0 deletions pytest/tests/sanity/epoch_sync.py
@@ -0,0 +1,91 @@
#!/usr/bin/env python3
# Spins up a node, then waits for 5 epochs.
# Then spins up another node with epoch sync enabled and makes sure it catches up.

import sys
import pathlib
import unittest

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from cluster import init_cluster, spin_up_node, load_config
import state_sync_lib
import utils

# The time it takes for an epoch needs to be long enough to reliably have
# state dumps. If at some point this test ends up being flaky because node0
# is spitting out state sync dumper errors like "Wrong snapshot hash",
# increase this further.
EPOCH_LENGTH = 15

# We can only do epoch sync if there are enough epochs to begin with, so have
# a few epochs.
SYNC_FROM_BLOCK = 5 * EPOCH_LENGTH
# After epoch sync, let's run for enough epochs for GC to kick in, to verify
# that the node is fine with GC too.
CATCHUP_BLOCK = 12 * EPOCH_LENGTH


class EpochSyncTest(unittest.TestCase):

def setUp(self):
self.config = load_config()
node_config = state_sync_lib.get_state_sync_config_combined()

node_config['epoch_sync'] = {
"enabled": True,
"epoch_sync_horizon": EPOCH_LENGTH * 3,
"epoch_sync_accept_proof_max_horizon": EPOCH_LENGTH * 3,
"timeout_for_epoch_sync": {
"secs": 5,
"nanos": 0
}
}

# Make sure that state sync targets an epoch *later* than the epoch sync target epoch.
node_config["consensus.block_fetch_horizon"] = 3
node_config["consensus.block_header_fetch_horizon"] = EPOCH_LENGTH

self.near_root, self.node_dirs = init_cluster(
num_nodes=2,
num_observers=1,
num_shards=1,
config=self.config,
genesis_config_changes=[["min_gas_price", 0],
["epoch_length", EPOCH_LENGTH]],
client_config_changes={x: node_config for x in range(3)})

def test(self):
node0 = spin_up_node(self.config, self.near_root, self.node_dirs[0], 0)
node1 = spin_up_node(self.config,
self.near_root,
self.node_dirs[1],
1,
boot_node=node0)

ctx = utils.TxContext([0, 0], [node0, node1])

for height, block_hash in utils.poll_blocks(node0,
timeout=SYNC_FROM_BLOCK * 2,
poll_interval=0.1):
if height >= SYNC_FROM_BLOCK:
break
ctx.send_moar_txs(block_hash, 1, False)

node2 = spin_up_node(self.config,
self.near_root,
self.node_dirs[2],
2,
boot_node=node0)
tracker = utils.LogTracker(node2)

utils.wait_for_blocks(node2,
target=CATCHUP_BLOCK,
timeout=(CATCHUP_BLOCK - SYNC_FROM_BLOCK) * 2)

# Verify that we did bootstrap using epoch sync (rather than header sync).
tracker.check('Bootstrapped from epoch sync')


if __name__ == '__main__':
unittest.main()
