diff --git a/chain/chain/src/store_validator.rs b/chain/chain/src/store_validator.rs index 8bceb0b043d..a8a0f259585 100644 --- a/chain/chain/src/store_validator.rs +++ b/chain/chain/src/store_validator.rs @@ -10,6 +10,7 @@ use near_primitives::borsh; use near_primitives::epoch_block_info::BlockInfo; use near_primitives::epoch_info::EpochInfo; use near_primitives::epoch_manager::AGGREGATOR_KEY; +use near_primitives::epoch_sync::EpochSyncProof; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::get_block_shard_uid_rev; use near_primitives::sharding::{ChunkHash, ShardChunk, StateSyncInfo}; @@ -75,6 +76,9 @@ pub struct StoreValidator { timeout: Option, start_time: Instant, pub is_archival: bool, + // If present, the node was bootstrapped with epoch sync, and this block height + // represents the first block of the target epoch that we epoch synced to. + epoch_sync_boundary: Option, pub errors: Vec, tests: u64, @@ -90,6 +94,12 @@ impl StoreValidator { store: Store, is_archival: bool, ) -> Self { + let epoch_sync_boundary = store + .get_ser::(DBCol::EpochSyncProof, &[]) + .expect("Store IO error when getting EpochSyncProof") + .map(|epoch_sync_proof| { + epoch_sync_proof.current_epoch.first_block_header_in_epoch.height() + }); StoreValidator { me, config, @@ -101,6 +111,7 @@ impl StoreValidator { timeout: None, start_time: Clock::real().now(), is_archival, + epoch_sync_boundary, errors: vec![], tests: 0, } diff --git a/chain/chain/src/store_validator/validate.rs b/chain/chain/src/store_validator/validate.rs index 61163f7f10c..8de071ee000 100644 --- a/chain/chain/src/store_validator/validate.rs +++ b/chain/chain/src/store_validator/validate.rs @@ -358,6 +358,15 @@ pub(crate) fn block_chunks_exist( _block_hash: &CryptoHash, block: &Block, ) -> Result<(), StoreValidatorError> { + let tail_height = + sv.store.get_ser::(DBCol::BlockMisc, TAIL_KEY).unwrap().unwrap_or(0); + if block.header().height() <= tail_height { + // If this node has undergone state sync to block H (where H is the first block of an epoch), + // then it appears that blocks before H may not have the chunk bodies in storage. + // Note, that this is NOT a completely correct check. It is only a heuristic that is good enough + // for single-shard, no-missing-chunks state sync or epoch sync tests. + return Ok(()); + } for chunk_header in block.chunks().iter() { if chunk_header.height_included() == block.header().height() { if let Some(me) = &sv.me { @@ -482,12 +491,20 @@ pub(crate) fn canonical_prev_block_validity( height: &BlockHeight, hash: &CryptoHash, ) -> Result<(), StoreValidatorError> { + if let Some(epoch_sync_boundary) = &sv.epoch_sync_boundary { + // Headers that are below the epoch_sync_boundary are not expected to be present, + // so skip the check in that case. + if height <= epoch_sync_boundary { + return Ok(()); + } + } if *height != sv.config.genesis_height { let header = unwrap_or_err_db!( sv.store.get_ser::(DBCol::BlockHeader, hash.as_ref()), "Can't get Block Header {:?} from DBCol::BlockHeader", hash ); + let prev_hash = *header.prev_hash(); let prev_header = unwrap_or_err_db!( sv.store.get_ser::(DBCol::BlockHeader, prev_hash.as_ref()), @@ -756,12 +773,19 @@ pub(crate) fn chunk_extra_block_exists( pub(crate) fn block_info_block_header_exists( sv: &mut StoreValidator, block_hash: &CryptoHash, - _block_info: &BlockInfo, + block_info: &BlockInfo, ) -> Result<(), StoreValidatorError> { // fake block info for pre-genesis block if *block_hash == CryptoHash::default() { return Ok(()); } + if let Some(epoch_sync_boundary) = &sv.epoch_sync_boundary { + // BlockInfo before the epoch sync boundary is not guaranteed to have a + // corresponding header. + if block_info.height() < *epoch_sync_boundary { + return Ok(()); + } + } unwrap_or_err_db!( sv.store.get_ser::(DBCol::BlockHeader, block_hash.as_ref()), "Can't get Block Header from DB" diff --git a/chain/client/src/sync/epoch.rs b/chain/client/src/sync/epoch.rs index f0c8a3fee8b..e7be21c4f5a 100644 --- a/chain/client/src/sync/epoch.rs +++ b/chain/client/src/sync/epoch.rs @@ -436,6 +436,8 @@ impl EpochSync { .choose(&mut rand::thread_rng()) .ok_or_else(|| Error::Other("No peers to request epoch sync from".to_string()))?; + tracing::info!(peer_id=?peer.peer_info.id, "Bootstrapping node via epoch sync"); + *status = SyncStatus::EpochSync(EpochSyncStatus { source_peer_id: peer.peer_info.id.clone(), source_peer_height: peer.highest_block_height, @@ -545,6 +547,7 @@ impl EpochSync { update.commit()?; *status = SyncStatus::EpochSyncDone; + tracing::info!(epoch_id=?last_header.epoch_id(), "Bootstrapped from epoch sync"); Ok(()) } diff --git a/nightly/pytest-sanity.txt b/nightly/pytest-sanity.txt index d71952e85b2..aa972d8a240 100644 --- a/nightly/pytest-sanity.txt +++ b/nightly/pytest-sanity.txt @@ -197,4 +197,8 @@ pytest sanity/replay_chain_from_archive.py --features nightly # Tests that offline block+chunk producers and chunk validators are kicked out. pytest --timeout=120 sanity/kickout_offline_validators.py -pytest --timeout=120 sanity/kickout_offline_validators.py --features nightly \ No newline at end of file +pytest --timeout=120 sanity/kickout_offline_validators.py --features nightly + +# Epoch sync +pytest --timeout=240 sanity/epoch_sync.py +pytest --timeout=240 sanity/epoch_sync.py --features nightly \ No newline at end of file diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py index 43cc2c365de..622915871e9 100644 --- a/pytest/lib/cluster.py +++ b/pytest/lib/cluster.py @@ -1006,6 +1006,7 @@ def apply_config_changes(node_dir: str, # when None. allowed_missing_configs = ( 'archive', 'consensus.block_fetch_horizon', + 'consensus.block_header_fetch_horizon', 'consensus.min_block_production_delay', 'consensus.max_block_production_delay', 'consensus.max_block_wait_delay', 'consensus.state_sync_timeout', diff --git a/pytest/tests/sanity/epoch_sync.py b/pytest/tests/sanity/epoch_sync.py new file mode 100755 index 00000000000..d61f27858c2 --- /dev/null +++ b/pytest/tests/sanity/epoch_sync.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +# Spins up a node, then waits for 5 epochs. +# Spin up another node with epoch sync enabled, and make sure it catches up. + +import sys +import pathlib +import unittest + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from cluster import init_cluster, spin_up_node, load_config +import state_sync_lib +import utils + +# The time it takes for an epoch needs to be long enough to reliably have +# state dumps. If at some point this test ends up being flaky because node0 +# is spitting out state sync dumper errors like "Wrong snapshot hash", +# increase this further. +EPOCH_LENGTH = 15 + +# We can only do epoch sync if there are enough epochs to begin with, so have +# a few epochs. +SYNC_FROM_BLOCK = 5 * EPOCH_LENGTH +# After epoch sync, let's run for enough epochs for GC to kick in, to verify +# that the node is fine with GC too. +CATCHUP_BLOCK = 12 * EPOCH_LENGTH + + +class EpochSyncTest(unittest.TestCase): + + def setUp(self): + self.config = load_config() + node_config = state_sync_lib.get_state_sync_config_combined() + + node_config['epoch_sync'] = { + "enabled": True, + "epoch_sync_horizon": EPOCH_LENGTH * 3, + "epoch_sync_accept_proof_max_horizon": EPOCH_LENGTH * 3, + "timeout_for_epoch_sync": { + "secs": 5, + "nanos": 0 + } + } + + # Make sure that state sync targets an epoch *later* than the epoch sync target epoch. + node_config["consensus.block_fetch_horizon"] = 3 + node_config["consensus.block_header_fetch_horizon"] = EPOCH_LENGTH + + self.near_root, self.node_dirs = init_cluster( + num_nodes=2, + num_observers=1, + num_shards=1, + config=self.config, + genesis_config_changes=[["min_gas_price", 0], + ["epoch_length", EPOCH_LENGTH]], + client_config_changes={x: node_config for x in range(3)}) + + def test(self): + node0 = spin_up_node(self.config, self.near_root, self.node_dirs[0], 0) + node1 = spin_up_node(self.config, + self.near_root, + self.node_dirs[1], + 1, + boot_node=node0) + + ctx = utils.TxContext([0, 0], [node0, node1]) + + for height, block_hash in utils.poll_blocks(node0, + timeout=SYNC_FROM_BLOCK * 2, + poll_interval=0.1): + if height >= SYNC_FROM_BLOCK: + break + ctx.send_moar_txs(block_hash, 1, False) + + node2 = spin_up_node(self.config, + self.near_root, + self.node_dirs[2], + 2, + boot_node=node0) + tracker = utils.LogTracker(node2) + + utils.wait_for_blocks(node2, + target=CATCHUP_BLOCK, + timeout=(CATCHUP_BLOCK - SYNC_FROM_BLOCK) * 2) + + # Verify that we did bootstrap using epoch sync (rather than header sync). + tracker.check('Bootstrapped from epoch sync') + + +if __name__ == '__main__': + unittest.main()