diff --git a/consensus/core/src/ancestor.rs b/consensus/core/src/ancestor.rs index da0032780d4af..32524b80616a7 100644 --- a/consensus/core/src/ancestor.rs +++ b/consensus/core/src/ancestor.rs @@ -173,7 +173,7 @@ impl AncestorStateManager { } let low_score_threshold = - (self.propagation_scores.high_score() * Self::EXCLUSION_THRESHOLD_PERCENTAGE) / 100; + (self.propagation_scores.highest_score() * Self::EXCLUSION_THRESHOLD_PERCENTAGE) / 100; match ancestor_info.state { // Check conditions to switch to EXCLUDE state diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 3cceb4294272a..760374fcae72d 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -948,6 +948,10 @@ impl Core { } } + assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."); + + let mut excluded_ancestors_count = 0; + // Inclusion of weak links for low score ancestors so there is no long // list of blocks that we need to include later. for (score, ancestor) in excluded_ancestors.iter() { @@ -961,49 +965,38 @@ impl Core { // as that is the max round we can include as an ancestor. 
network_low_quorum_round = network_low_quorum_round.min(quorum_round); - if let Some(last_block_ref) = self.last_included_ancestors[excluded_author] { - if last_block_ref.round < network_low_quorum_round { - if ancestor.round() == network_low_quorum_round { - // Include the ancestor block as it has been seen by a strong quorum - self.last_included_ancestors[excluded_author] = Some(ancestor.reference()); - ancestors_to_propose.push(ancestor.clone()); - debug!("Included low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}"); - self.context - .metrics - .node_metrics - .included_excluded_proposal_ancestors_count_by_authority - .with_label_values(&[block_hostname, "weak"]) - .inc(); - continue; - } - - // Fetch ancestor block from the store for older round - let blocks = self - .dag_state - .read() - .get_uncommitted_blocks_at_slot(Slot::new( - network_low_quorum_round, - excluded_author, - )); - - if let Some(block) = blocks.first() { - self.last_included_ancestors[excluded_author] = Some(block.reference()); - ancestors_to_propose.push(block.clone()); - debug!("Included low scoring ancestor {block} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}"); - self.context - .metrics - .node_metrics - .included_excluded_proposal_ancestors_count_by_authority - .with_label_values(&[block_hostname, "weak"]) - .inc(); - continue; - } else { - debug!("No earlier uncommitted block found for low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}"); - } - } + let last_included_ancestor_round = self.last_included_ancestors[excluded_author] + .map_or(GENESIS_ROUND, |block_ref| block_ref.round); + + // Check if there is a block that is at a round higher than the last included + // ancestor round but less than or equal to the network low quorum round. 
+ if let Some(last_cached_block_in_range) = self + .dag_state + .read() + .get_last_cached_block_for_authority_in_range( + excluded_author, + last_included_ancestor_round + 1, + network_low_quorum_round + 1, + ) + { + // Include the ancestor block as it has been seen by a strong quorum + self.last_included_ancestors[excluded_author] = + Some(last_cached_block_in_range.reference()); + ancestors_to_propose.push(last_cached_block_in_range.clone()); + debug!("Included low scoring ancestor {last_cached_block_in_range} with score {score} seen between last included round {last_included_ancestor_round} and network quorum round {network_low_quorum_round} to propose for round {clock_round}"); + self.context + .metrics + .node_metrics + .included_excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname, "weak"]) + .inc(); + continue; + } else { + debug!("No cached block found for low scoring ancestor {ancestor} with score {score} between last included round {last_included_ancestor_round} and network quorum round {network_low_quorum_round} to propose for round {clock_round}"); } debug!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}"); + excluded_ancestors_count += 1; self.context .metrics .node_metrics @@ -1013,13 +1006,10 @@ impl Core { } info!( - "Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}", + "Included {} ancestors & excluded {excluded_ancestors_count} ancestors for proposal in round {clock_round}", ancestors_to_propose.len(), - excluded_ancestors.len(), ); - assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. 
Possible mismatch between DagState and Core."); - ancestors_to_propose } diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index bc373184e0fb1..6c5f58dd1cbc4 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -496,7 +496,19 @@ impl DagState { blocks } - /// Returns the last block proposed per authority with `round < end_round`. + /// Returns the last block cached for the authority within a range. + /// This can return None if there is no block within the range provided. + pub(crate) fn get_last_cached_block_for_authority_in_range( + &self, + authority: AuthorityIndex, + start_round: Round, + end_round: Round, + ) -> Option { + let blocks = self.get_last_cached_blocks_per_authority_in_range(start_round, end_round); + blocks[authority].clone() + } + + /// Returns the last block cached per authority with `round < end_round`. /// The method is guaranteed to return results only when the `end_round` is not earlier of the /// available cached data for each authority, otherwise the method will panic - it's the caller's /// responsibility to ensure that is not requesting filtering for earlier rounds . @@ -505,14 +517,40 @@ impl DagState { &self, end_round: Round, ) -> Vec { - // init with the genesis blocks as fallback - let mut blocks = self.genesis.values().cloned().collect::>(); + let blocks = self.get_last_cached_blocks_per_authority_in_range(GENESIS_ROUND, end_round); - if end_round == GENESIS_ROUND { - panic!( - "Attempted to retrieve blocks earlier than the genesis round which is not possible" - ); - } + // Ensure every authority has a valid block as we will fall back to genesis + // blocks which should always be available. 
+ blocks + .into_iter() + .enumerate() + .map(|(idx, block_opt)| { + let authority_index = self.context.committee.to_authority_index(idx).unwrap(); + let hostname = &self.context.committee.authority(authority_index).hostname; + block_opt.unwrap_or_else(|| { + panic!("Expected to find a cached block for authority {hostname}") + }) + }) + .collect() + } + + fn get_last_cached_blocks_per_authority_in_range( + &self, + start_round: u32, + end_round: u32, + ) -> Vec> { + let mut blocks = if start_round != GENESIS_ROUND { + // Init with None, as we will selectively insert cached blocks in range + vec![None; self.context.committee.size()] + } else { + // Init with the genesis blocks as fallback as genesis round is within range. + self.genesis.values().cloned().map(Some).collect::>() + }; + + assert!( + end_round != GENESIS_ROUND, + "Attempted to retrieve blocks earlier than the genesis round which is not possible" + ); if end_round == GENESIS_ROUND + 1 { return blocks; @@ -526,14 +564,22 @@ impl DagState { .unwrap(); let last_evicted_round = self.evicted_rounds[authority_index]; - if end_round.saturating_sub(1) <= last_evicted_round { - panic!("Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}", ); + + // If the last evicted round is after the start round, start from the + // last evicted round + 1. 
+ let first_included_round = start_round.max(last_evicted_round + 1); + + if end_round.saturating_sub(1) <= last_evicted_round || first_included_round > end_round + { + debug!("Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} and the first included round is {first_included_round} for authority {authority_index}"); + blocks[authority_index] = None; + continue; } if let Some(block_ref) = block_refs .range(( Included(BlockRef::new( - last_evicted_round + 1, + first_included_round, authority_index, BlockDigest::MIN, )), @@ -546,7 +592,7 @@ impl DagState { .get(block_ref) .expect("Block should exist in recent blocks"); - blocks[authority_index] = block.clone(); + blocks[authority_index] = Some(block.clone()); } } @@ -1947,6 +1993,70 @@ mod test { assert_eq!(cached_blocks[0].round(), 12); } + #[tokio::test] + async fn test_get_cached_last_block_for_authority_in_range() { + // GIVEN + const CACHED_ROUNDS: Round = 2; + let (mut context, _) = Context::new_for_test(4); + context.parameters.dag_state_cached_rounds = CACHED_ROUNDS; + + let context = Arc::new(context); + let store = Arc::new(MemStore::new()); + let mut dag_state = DagState::new(context.clone(), store.clone()); + + // Create no blocks for authority 0 + // Create one block (round 1) for authority 1 + // Create two blocks (rounds 1,2) for authority 2 + // Create three blocks (rounds 1,2,3) for authority 3 + let mut all_blocks = Vec::new(); + for author in 1..=3 { + for round in 1..=author { + let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build()); + all_blocks.push(block.clone()); + dag_state.accept_block(block); + } + } + + dag_state.add_commit(TrustedCommit::new_for_test( + 1 as CommitIndex, + CommitDigest::MIN, + context.clock.timestamp_utc_ms(), + all_blocks.last().unwrap().reference(), + all_blocks + .into_iter() + .map(|block| block.reference()) + .collect::>(), + )); + + // WHEN search for the latest blocks between rounds 
(2..4) + let start_round = 2; + let end_round = 4; + let last_block = dag_state.get_last_cached_block_for_authority_in_range( + AuthorityIndex::new_for_test(3), + start_round, + end_round, + ); + + // THEN we should have found a block within the range + assert_eq!(last_block.unwrap().round(), 3); + + // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger + // a clean up in the internal cache. That will keep all the blocks with rounds >= authority_commit_round - CACHED_ROUNDS. + dag_state.flush(); + + // AND we request blocks between rounds (3..3) + let start_round = 3; + let end_round = 3; + let last_block = dag_state.get_last_cached_block_for_authority_in_range( + AuthorityIndex::new_for_test(3), + start_round, + end_round, + ); + + // THEN we hit the case where block is not found + assert!(last_block.is_none()); + } + #[rstest] #[tokio::test] async fn test_get_cached_last_block_per_authority(#[values(0, 1)] gc_depth: u32) { @@ -2029,9 +2139,7 @@ mod test { } #[tokio::test] - #[should_panic( - expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C" - )] + #[should_panic(expected = "Expected to find a cached block for authority test_host_2")] async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() { // GIVEN const CACHED_ROUNDS: Round = 1; @@ -2076,9 +2184,7 @@ mod test { } #[tokio::test] - #[should_panic( - expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C" - )] + #[should_panic(expected = "Expected to find a cached block for authority test_host_2")] async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() { // GIVEN const CACHED_ROUNDS: Round = 1; diff --git a/consensus/core/src/leader_scoring.rs b/consensus/core/src/leader_scoring.rs index fb6c7f3453206..51ab0a5e4aa2e 100644 --- a/consensus/core/src/leader_scoring.rs +++ 
b/consensus/core/src/leader_scoring.rs @@ -129,7 +129,7 @@ impl ReputationScores { } } - pub(crate) fn high_score(&self) -> u64 { + pub(crate) fn highest_score(&self) -> u64 { *self.scores_per_authority.iter().max().unwrap_or(&0) }