diff --git a/consensus/core/src/ancestor.rs b/consensus/core/src/ancestor.rs
new file mode 100644
index 0000000000000..86e9c20622735
--- /dev/null
+++ b/consensus/core/src/ancestor.rs
@@ -0,0 +1,345 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::{collections::HashMap, sync::Arc};
+
+use consensus_config::AuthorityIndex;
+use tokio::time::{Duration, Instant};
+use tracing::info;
+
+use crate::{context::Context, leader_scoring::ReputationScores, round_prober::QuorumRound};
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) enum AncestorState {
+    Include,
+    Exclude(u64),
+}
+struct AncestorInfo {
+    state: AncestorState,
+    lock_expiry_time: Option<Instant>,
+}
+
+impl AncestorInfo {
+    fn new() -> Self {
+        Self {
+            state: AncestorState::Include,
+            lock_expiry_time: None,
+        }
+    }
+}
+
+pub(crate) struct AncestorStateManager {
+    context: Arc<Context>,
+    state_map: HashMap<AuthorityIndex, AncestorInfo>,
+    pub(crate) quorum_round_per_authority: Vec<QuorumRound>,
+    pub(crate) propagation_scores: ReputationScores,
+}
+
+impl AncestorStateManager {
+    #[cfg(not(test))]
+    const STATE_LOCK_DURATION: Duration = Duration::from_secs(30);
+    #[cfg(test)]
+    const STATE_LOCK_DURATION: Duration = Duration::from_secs(5);
+
+    const NETWORK_QUORUM_ROUND_LAG_THRESHOLD: u32 = 0;
+    pub(crate) const EXCLUSION_THRESHOLD_PERCENTAGE: u64 = 10;
+
+    pub(crate) fn new(context: Arc<Context>, propagation_scores: ReputationScores) -> Self {
+        let mut state_map = HashMap::new();
+        for (id, _) in context.committee.authorities() {
+            state_map.insert(id, AncestorInfo::new());
+        }
+
+        let quorum_round_per_authority = vec![(0, 0); context.committee.size()];
+        Self {
+            context,
+            state_map,
+            propagation_scores,
+            quorum_round_per_authority,
+        }
+    }
+
+    pub(crate) fn set_quorum_round_per_authority(&mut self, quorum_rounds: Vec<QuorumRound>) {
+        info!("Quorum round per authority set to: {quorum_rounds:?}");
+        self.quorum_round_per_authority = quorum_rounds;
+    }
+
+    pub(crate) fn set_propagation_scores(&mut self, scores: ReputationScores) {
+        self.propagation_scores = scores;
+    }
+
+    pub(crate) fn get_ancestor_states(&self) -> HashMap<AuthorityIndex, AncestorState> {
+        self.state_map
+            .iter()
+            .map(|(&id, info)| (id, info.state))
+            .collect()
+    }
+
+    /// Updates the state of all ancestors based on the latest scores and quorum rounds.
+    pub(crate) fn update_all_ancestors_state(&mut self) {
+        // If the round prober has not run yet and we don't have network quorum rounds,
+        // that is okay because network_low_quorum_round will be zero and we will
+        // include all ancestors until we get more information.
+        let network_low_quorum_round = self.calculate_network_low_quorum_round();
+        let propagation_scores_by_authority = self
+            .propagation_scores
+            .scores_per_authority
+            .clone()
+            .into_iter()
+            .enumerate()
+            .map(|(idx, score)| {
+                (
+                    self.context
+                        .committee
+                        .to_authority_index(idx)
+                        .expect("Index should be valid"),
+                    score,
+                )
+            })
+            .collect::<Vec<_>>();
+
+        // If propagation scores are not ready because the first 300 commits have not
+        // happened, this is okay as we will only start excluding ancestors after that
+        // point in time.
+        for (authority_id, score) in propagation_scores_by_authority {
+            let (authority_low_quorum_round, _high) = self.quorum_round_per_authority[authority_id];
+
+            self.update_state(
+                authority_id,
+                score,
+                authority_low_quorum_round,
+                network_low_quorum_round,
+            );
+        }
+    }
+
+    /// Updates the state of the given authority based on current scores and quorum rounds.
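The manager above is driven entirely through four calls: construct it with the current propagation scores, feed it the latest quorum rounds from the round prober, recompute states, and read them back. A minimal usage sketch of that flow, mirroring the tests later in this file (the committee size, score values, and quorum rounds are illustrative, not required values):

```rust
// Sketch only: how a caller such as Core is expected to drive the manager.
// Assumes this module's imports plus the test helpers used below
// (Context::new_for_test, ReputationScores).
fn ancestor_manager_flow_sketch() {
    let context = Arc::new(Context::new_for_test(4).0);

    // Scores as produced by the leader schedule. The high score here is 100, so the
    // exclusion threshold is 100 * EXCLUSION_THRESHOLD_PERCENTAGE / 100 = 10.
    let scores = ReputationScores::new((1..=300).into(), vec![10, 10, 100, 100]);
    let mut manager = AncestorStateManager::new(context, scores);

    // (low, high) quorum rounds per authority, as reported by the round prober.
    manager.set_quorum_round_per_authority(vec![(225, 229), (225, 300), (229, 300), (229, 300)]);
    manager.update_all_ancestors_state();

    // Authorities with score <= 10 are now Exclude(score); a state change is locked
    // for STATE_LOCK_DURATION before it can flip back to Include.
    for (authority, state) in manager.get_ancestor_states() {
        println!("{authority}: {state:?}");
    }
}
```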
+ fn update_state( + &mut self, + authority_id: AuthorityIndex, + propagation_score: u64, + authority_low_quorum_round: u32, + network_low_quorum_round: u32, + ) { + let block_hostname = &self.context.committee.authority(authority_id).hostname; + let ancestor_info = self.state_map.get_mut(&authority_id).expect(&format!( + "Expected authority_id {authority_id} to be initialized in state_map", + )); + + // Check if the lock period has expired + if let Some(expiry) = ancestor_info.lock_expiry_time { + if Instant::now() < expiry { + // If still locked, return without making any changes + return; + } else { + ancestor_info.lock_expiry_time = None; + } + } + + let low_score_threshold = + (self.propagation_scores.high_score() * Self::EXCLUSION_THRESHOLD_PERCENTAGE) / 100; + + match ancestor_info.state { + // Check conditions to switch to EXCLUDE state + AncestorState::Include => { + if propagation_score <= low_score_threshold { + ancestor_info.state = AncestorState::Exclude(propagation_score); + ancestor_info.lock_expiry_time = + Some(Instant::now() + Self::STATE_LOCK_DURATION); + info!( + "Authority {authority_id} moved to EXCLUDE state with score {propagation_score} <= threshold of {low_score_threshold} and locked for {:?}", + Self::STATE_LOCK_DURATION + ); + self.context + .metrics + .node_metrics + .ancestor_state_change_by_authority + .with_label_values(&[block_hostname, "exclude"]) + .inc(); + } + } + // Check conditions to switch back to INCLUDE state + AncestorState::Exclude(_) => { + // It should not be possible for the scores to get over the threshold + // until the node is back in the INCLUDE state, but adding just in case. + if propagation_score > low_score_threshold + || authority_low_quorum_round + >= (network_low_quorum_round - Self::NETWORK_QUORUM_ROUND_LAG_THRESHOLD) + { + ancestor_info.state = AncestorState::Include; + ancestor_info.lock_expiry_time = + Some(Instant::now() + Self::STATE_LOCK_DURATION); + info!( + "Authority {authority_id} moved to INCLUDE state with {propagation_score} > threshold of {low_score_threshold} or {authority_low_quorum_round} >= {} and locked for {:?}.", + (network_low_quorum_round - Self::NETWORK_QUORUM_ROUND_LAG_THRESHOLD), + Self::STATE_LOCK_DURATION + ); + self.context + .metrics + .node_metrics + .ancestor_state_change_by_authority + .with_label_values(&[block_hostname, "include"]) + .inc(); + } + } + } + } + + /// Calculate the network's low quorum round from 2f+1 authorities by stake, + /// where low quorum round is the highest round a block has been seen by 2f+1 + /// authorities. 
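The doc comment above describes how the network low quorum round is aggregated from per-authority low quorum rounds weighted by stake. As a self-contained illustration, the same walk over (low round, stake) pairs can be written as follows; the stakes of 1 each and quorum threshold of 3 are assumptions that mirror the four-authority test below:

```rust
// Standalone sketch (not crate code): pick the highest round that a quorum of
// stake has reached, by walking (low_round, stake) pairs from highest round down.
fn network_low_quorum_round_sketch(mut lows_with_stake: Vec<(u32, u64)>, quorum_threshold: u64) -> u32 {
    lows_with_stake.sort();
    let mut total_stake = 0;
    for (round, stake) in lows_with_stake.iter().rev() {
        total_stake += stake;
        if total_stake >= quorum_threshold {
            return *round;
        }
    }
    // Without quorum round reports we fall back to 0, i.e. include everyone.
    0
}

fn main() {
    // Low quorum rounds 225, 225, 229, 229 with stake 1 each and quorum threshold 3:
    // accumulating stake from the highest round down first reaches 3 at round 225.
    let lows = vec![(225, 1), (225, 1), (229, 1), (229, 1)];
    assert_eq!(network_low_quorum_round_sketch(lows, 3), 225);
}
```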
+    fn calculate_network_low_quorum_round(&self) -> u32 {
+        let committee = &self.context.committee;
+        let quorum_threshold = committee.quorum_threshold();
+        let mut low_quorum_rounds_with_stake = self
+            .quorum_round_per_authority
+            .iter()
+            .zip(committee.authorities())
+            .map(|((low, _high), (_, authority))| (*low, authority.stake))
+            .collect::<Vec<_>>();
+        low_quorum_rounds_with_stake.sort();
+
+        let mut total_stake = 0;
+        let mut network_low_quorum_round = 0;
+
+        for (round, stake) in low_quorum_rounds_with_stake.iter().rev() {
+            let reached_quorum_before = total_stake >= quorum_threshold;
+            total_stake += stake;
+            if !reached_quorum_before && total_stake >= quorum_threshold {
+                network_low_quorum_round = *round;
+                break;
+            }
+        }
+
+        network_low_quorum_round
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::time::Duration;
+
+    use super::*;
+    use crate::leader_scoring::ReputationScores;
+
+    #[tokio::test]
+    async fn test_calculate_network_low_quorum_round() {
+        telemetry_subscribers::init_for_testing();
+        let context = Arc::new(Context::new_for_test(4).0);
+
+        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
+        let mut ancestor_state_manager = AncestorStateManager::new(context, scores);
+
+        // Quorum rounds are not set yet, so we should calculate a network quorum
+        // round of 0 to start.
+        let network_low_quorum_round = ancestor_state_manager.calculate_network_low_quorum_round();
+        assert_eq!(network_low_quorum_round, 0);
+
+        let quorum_rounds = vec![(225, 229), (225, 300), (229, 300), (229, 300)];
+        ancestor_state_manager.set_quorum_round_per_authority(quorum_rounds);
+
+        let network_low_quorum_round = ancestor_state_manager.calculate_network_low_quorum_round();
+        assert_eq!(network_low_quorum_round, 225);
+    }
+
+    // Test all state transitions:
+    // Default all INCLUDE -> EXCLUDE
+    // EXCLUDE -> INCLUDE (Blocked due to lock)
+    // EXCLUDE -> INCLUDE (Pass due to lock expired)
+    // INCLUDE -> EXCLUDE (Blocked due to lock)
+    // INCLUDE -> EXCLUDE (Pass due to lock expired)
+    #[tokio::test]
+    async fn test_update_all_ancestor_state() {
+        telemetry_subscribers::init_for_testing();
+        let context = Arc::new(Context::new_for_test(4).0);
+
+        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
+        let mut ancestor_state_manager = AncestorStateManager::new(context, scores);
+
+        let quorum_rounds = vec![(225, 229), (225, 300), (229, 300), (229, 300)];
+        ancestor_state_manager.set_quorum_round_per_authority(quorum_rounds);
+        ancestor_state_manager.update_all_ancestors_state();
+
+        // Score threshold for exclude is (4 * 10) / 100 = 0
+        // No ancestors should be excluded with this threshold.
+        let state_map = ancestor_state_manager.get_ancestor_states();
+        for state in state_map.values() {
+            assert_eq!(*state, AncestorState::Include);
+        }
+
+        let scores = ReputationScores::new((1..=300).into(), vec![10, 10, 100, 100]);
+        ancestor_state_manager.set_propagation_scores(scores);
+        ancestor_state_manager.update_all_ancestors_state();
+
+        // Score threshold for exclude is (100 * 10) / 100 = 10
+        // 2 authorities should be excluded with this threshold.
+        let state_map = ancestor_state_manager.get_ancestor_states();
+        println!("{state_map:?}");
+        for (authority, state) in state_map {
+            if (0..=1).contains(&authority.value()) {
+                assert_eq!(state, AncestorState::Exclude(10));
+            } else {
+                assert_eq!(state, AncestorState::Include);
+            }
+        }
+
+        let scores = ReputationScores::new((1..=300).into(), vec![100, 10, 100, 100]);
+        ancestor_state_manager.set_propagation_scores(scores);
+        ancestor_state_manager.update_all_ancestors_state();
+
+        // Score threshold for exclude is (100 * 10) / 100 = 10
+        // No authority should be excluded with this threshold, but authority 0
+        // was just marked as excluded so it will remain in the EXCLUDED state
+        // until 5s pass.
+        let state_map = ancestor_state_manager.get_ancestor_states();
+        println!("{state_map:?}");
+        for (authority, state) in state_map {
+            if (0..=1).contains(&authority.value()) {
+                assert_eq!(state, AncestorState::Exclude(10));
+            } else {
+                assert_eq!(state, AncestorState::Include);
+            }
+        }
+
+        // Sleep until the lock expires
+        tokio::time::sleep_until(Instant::now() + Duration::from_secs(5)).await;
+
+        ancestor_state_manager.update_all_ancestors_state();
+
+        // Authority 0 should now be included again because scores are above the
+        // threshold. Authority 1 should also be included again because quorum
+        // rounds are at the network quorum round threshold: 225 >= 225.
+        let state_map = ancestor_state_manager.get_ancestor_states();
+        println!("{state_map:?}");
+        for (_, state) in state_map {
+            assert_eq!(state, AncestorState::Include);
+        }
+
+        let scores = ReputationScores::new((1..=300).into(), vec![100, 10, 100, 100]);
+        ancestor_state_manager.set_propagation_scores(scores);
+        ancestor_state_manager.update_all_ancestors_state();
+
+        // Ancestor 1 cannot transition to EXCLUDE state until lock expires.
+        let state_map = ancestor_state_manager.get_ancestor_states();
+        println!("{state_map:?}");
+        for (_, state) in state_map {
+            assert_eq!(state, AncestorState::Include);
+        }
+
+        // Sleep until the lock expires
+        tokio::time::sleep_until(Instant::now() + Duration::from_secs(5)).await;
+
+        ancestor_state_manager.update_all_ancestors_state();
+
+        // Ancestor 1 can transition to EXCLUDE state now that the lock expired.
+ let state_map = ancestor_state_manager.get_ancestor_states(); + println!("{state_map:?}"); + for (authority, state) in state_map { + if authority.value() == 1 { + assert_eq!(state, AncestorState::Exclude(10)); + } else { + assert_eq!(state, AncestorState::Include); + } + } + } +} diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs index 7e4b26ac99537..a811859e03c42 100644 --- a/consensus/core/src/authority_service.rs +++ b/consensus/core/src/authority_service.rs @@ -601,6 +601,7 @@ async fn make_recv_future( #[cfg(test)] mod tests { use crate::commit::CommitRange; + use crate::round_prober::QuorumRound; use crate::test_dag_builder::DagBuilder; use crate::{ authority_service::AuthorityService, @@ -665,7 +666,10 @@ mod tests { todo!() } - fn set_propagation_delay(&self, _delay: Round) -> Result<(), CoreError> { + fn set_propagation_delay_and_quorum_rounds( + &self, + _quorum_rounds: Vec, + ) -> Result<(), CoreError> { todo!() } diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 39b6f7b672121..ce251efcb9ee4 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -19,6 +19,7 @@ use tokio::{ use tracing::{debug, info, warn}; use crate::{ + ancestor::{AncestorState, AncestorStateManager}, block::{ Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, Round, SignedBlock, Slot, VerifiedBlock, GENESIS_ROUND, @@ -30,6 +31,7 @@ use crate::{ dag_state::DagState, error::{ConsensusError, ConsensusResult}, leader_schedule::LeaderSchedule, + round_prober::QuorumRound, stake_aggregator::{QuorumThreshold, StakeAggregator}, threshold_clock::ThresholdClock, transaction::TransactionConsumer, @@ -97,6 +99,8 @@ pub(crate) struct Core { /// This is currently being used to avoid equivocations during a node recovering from amnesia. When value is None it means that /// the last block sync mechanism is enabled, but it hasn't been initialised yet. last_known_proposed_round: Option, + + ancestor_state_manager: AncestorStateManager, } impl Core { @@ -151,6 +155,13 @@ impl Core { Some(0) }; + let propagation_scores = leader_schedule + .leader_swap_table + .read() + .reputation_scores + .clone(); + let ancestor_state_manager = AncestorStateManager::new(context.clone(), propagation_scores); + Self { context: context.clone(), threshold_clock: ThresholdClock::new(0, context.clone()), @@ -168,6 +179,7 @@ impl Core { block_signer, dag_state, last_known_proposed_round: min_propose_round, + ancestor_state_manager, } .recover() } @@ -369,6 +381,35 @@ impl Core { } } + // TODO: produce the block for the clock_round. As the threshold clock can advance many rounds at once (ex + // because we synchronized a bulk of blocks) we can decide here whether we want to produce blocks per round + // or just the latest one. From earlier experiments I saw only benefit on proposing for the penultimate round + // only when the validator was supposed to be the leader of the round - so we bring down the missed leaders. + // Probably proposing for all the intermediate rounds might not make much sense. + + // Determine the ancestors to be included in proposal + + // Smart ancestor selection requires distributed scoring to be enabled. + let ancestors = if self + .context + .protocol_config + .consensus_distributed_vote_scoring_strategy() + { + let ancestors = self.smart_ancestors_to_propose(clock_round, !force); + + // If we did not find enough good ancestors to propose, continue to wait before proposing. 
+ if ancestors.is_empty() { + assert!( + !force, + "Ancestors should have been returned if force is true!" + ); + return None; + } + ancestors + } else { + self.ancestors_to_propose(clock_round) + }; + let leader_authority = &self .context .committee @@ -391,14 +432,6 @@ impl Core { .with_label_values(&[leader_authority]) .inc(); - // TODO: produce the block for the clock_round. As the threshold clock can advance many rounds at once (ex - // because we synchronized a bulk of blocks) we can decide here whether we want to produce blocks per round - // or just the latest one. From earlier experiments I saw only benefit on proposing for the penultimate round - // only when the validator was supposed to be the leader of the round - so we bring down the missed leaders. - // Probably proposing for all the intermediate rounds might not make much sense. - - // Determine the ancestors to be included in proposal - let ancestors = self.ancestors_to_propose(clock_round); self.context .metrics .node_metrics @@ -557,6 +590,15 @@ impl Core { { self.leader_schedule .update_leader_schedule_v2(&self.dag_state); + + let propagation_scores = self + .leader_schedule + .leader_swap_table + .read() + .reputation_scores + .clone(); + self.ancestor_state_manager + .set_propagation_scores(propagation_scores); } else { self.leader_schedule .update_leader_schedule_v1(&self.dag_state); @@ -644,10 +686,36 @@ impl Core { self.subscriber_exists = exists; } - /// Sets the delay by round for propagating blocks to a quorum. - pub(crate) fn set_propagation_delay(&mut self, delay: Round) { - info!("Propagation round delay set to: {delay}"); - self.propagation_delay = delay; + /// Sets the propagation delay for propagating blocks to a quorum and the + /// quorum round per authority for ancestor state manager. + pub(crate) fn set_propagation_delay_and_quorum_rounds( + &mut self, + quorum_rounds: Vec, + ) { + let own_index = self.context.own_index; + let last_proposed_round = self + .dag_state + .read() + .get_last_block_for_authority(own_index) + .round(); + let propagation_delay = last_proposed_round.saturating_sub(quorum_rounds[own_index].0); + + info!("Quorum round per authority in ancestor state manager set to: {quorum_rounds:?}"); + self.ancestor_state_manager + .set_quorum_round_per_authority(quorum_rounds); + self.context + .metrics + .node_metrics + .round_prober_propagation_delays + .observe(propagation_delay as f64); + self.context + .metrics + .node_metrics + .round_prober_last_propagation_delay + .set(propagation_delay as i64); + + info!("Propagation round delay for own authority set to: {propagation_delay}"); + self.propagation_delay = propagation_delay; } /// Sets the min propose round for the proposer allowing to propose blocks only for round numbers @@ -758,6 +826,196 @@ impl Core { ancestors } + /// Retrieves the next ancestors to propose to form a block at `clock_round` round. + /// If smart selection is enabled then this will try to select the best ancestors + /// based on the propagation scores of the authorities. + fn smart_ancestors_to_propose( + &mut self, + clock_round: Round, + smart_select: bool, + ) -> Vec { + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["Core::smart_ancestors_to_propose"]) + .start_timer(); + + // Now take the ancestors before the clock_round (excluded) for each authority. 
+ let ancestors = self + .dag_state + .read() + .get_last_cached_block_per_authority(clock_round); + + assert_eq!( + ancestors.len(), + self.context.committee.size(), + "Fatal error, number of returned ancestors don't match committee size." + ); + + // Ensure ancestor state is up to date before selecting for proposal. + self.ancestor_state_manager.update_all_ancestors_state(); + let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states(); + + let mut temp_excluded_ancestors = Vec::new(); + + // Propose only ancestors of higher rounds than what has already been proposed. + // And always include own last proposed block first among ancestors. + // Start by only including the high scoring ancestors. Low scoring ancestors + // will be included in a second pass below. + let included_ancestors = iter::once(self.last_proposed_block.clone()) + .chain( + ancestors + .into_iter() + .filter(|ancestor| ancestor.author() != self.context.own_index) + .flat_map(|ancestor| { + + let ancestor_state = ancestor_state_map.get(&ancestor.author()).expect( + &format!("Should have ancestor state for {}", ancestor.author()), + ); + + match ancestor_state { + AncestorState::Include => { + debug!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}"); + } + AncestorState::Exclude(score) => { + debug!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}"); + temp_excluded_ancestors.push((*score, ancestor)); + return None; + } + } + + if let Some(last_block_ref) = + self.last_included_ancestors[ancestor.author()] + { + return (last_block_ref.round < ancestor.round()).then_some(ancestor); + } + Some(ancestor) + }), + ) + .collect::>(); + + let mut parent_round_quorum = StakeAggregator::::new(); + + // Check total stake of high scoring parent round ancestors + for ancestor in included_ancestors + .iter() + .filter(|a| a.round() == clock_round - 1) + { + parent_round_quorum.add(ancestor.author(), &self.context.committee); + } + + if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) { + debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake()); + return vec![]; + } + + // Now we can update the last included ancestor block refs as there is no way + // to short circuit ancestor proposal. + for ancestor in included_ancestors.iter() { + debug!( + "Included ancestor {ancestor} to propose for round {clock_round}, own_block = {}", + ancestor.author() == self.context.own_index + ); + self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference()); + } + + // Sort scores descending so we can include the best of the temp excluded + // ancestors first until we reach the threshold. 
+ temp_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0)); + + let mut ancestors_to_propose = included_ancestors; + let mut excluded_ancestors = Vec::new(); + + for (score, ancestor) in temp_excluded_ancestors.into_iter() { + if !parent_round_quorum.reached_threshold(&self.context.committee) + && ancestor.round() == clock_round - 1 + { + debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}"); + parent_round_quorum.add(ancestor.author(), &self.context.committee); + self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference()); + ancestors_to_propose.push(ancestor); + } else { + excluded_ancestors.push((score, ancestor)); + } + } + + // Inclusion of weak links for low score ancestors so there is no long + // list of blocks that we need to include later. + for (score, ancestor) in excluded_ancestors.iter() { + let excluded_author = ancestor.author(); + let block_hostname = &self.context.committee.authority(excluded_author).hostname; + let mut network_low_quorum_round = + self.ancestor_state_manager.quorum_round_per_authority[excluded_author].0; + + // If the network quourum round for this ancestor is greater than or equal + // to the clock round then we want to make sure to set it to clock_round - 1 + // as that is the max round we can include as an ancestor. + network_low_quorum_round = network_low_quorum_round.min(clock_round - 1); + + if let Some(last_block_ref) = self.last_included_ancestors[excluded_author] { + if last_block_ref.round < network_low_quorum_round { + if ancestor.round() == network_low_quorum_round { + // Include the ancestor block as it has been seen by a strong quorum + self.last_included_ancestors[excluded_author] = Some(ancestor.reference()); + ancestors_to_propose.push(ancestor.clone()); + debug!("Included low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}"); + self.context + .metrics + .node_metrics + .included_excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname]) + .inc(); + continue; + } + + // Fetch ancestor block from the store for older round + let blocks = self + .dag_state + .read() + .get_uncommitted_blocks_at_slot(Slot::new( + network_low_quorum_round, + excluded_author, + )); + + if let Some(block) = blocks.first() { + self.last_included_ancestors[excluded_author] = Some(block.reference()); + ancestors_to_propose.push(block.clone()); + debug!("Included low scoring ancestor {block} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}"); + self.context + .metrics + .node_metrics + .included_excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname]) + .inc(); + continue; + } else { + debug!("No earlier uncommitted block found for low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}"); + } + } + } + + debug!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}"); + self.context + .metrics + .node_metrics + .excluded_proposal_ancestors_count_by_authority + .with_label_values(&[block_hostname]) + .inc(); + } + + info!( + "Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}", + ancestors_to_propose.len(), + excluded_ancestors.len(), + ); + + assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal 
error, quorum not reached for parent round when proposing for round {}. Possible mismatch between DagState and Core.", clock_round); + + ancestors_to_propose + } + /// Checks whether all the leaders of the round exist. /// TODO: we can leverage some additional signal here in order to more cleverly manipulate later the leader timeout /// Ex if we already have one leader - the first in order - we might don't want to wait as much. @@ -1606,6 +1864,117 @@ mod test { } } + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() { + telemetry_subscribers::init_for_testing(); + + // Since we run the test with started_paused = true, any time-dependent operations using Tokio's time + // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So practically each + // Core's clock will have initialised potentially with different values but it never advances. + // To ensure that blocks won't get rejected by cores we'll need to manually wait for the time + // diff before processing them. By calling the `tokio::time::sleep` we implicitly also advance the + // tokio clock. + async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) { + // Simulate the time wait before processing a block to ensure that block.timestamp <= now + let now = context.clock.timestamp_utc_ms(); + let max_timestamp = blocks + .iter() + .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs) + .map(|block| block.timestamp_ms()) + .unwrap_or(0); + + let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now)); + sleep(wait_time).await; + } + + let (context, _) = Context::new_for_test(4); + + // Create the cores for all authorities + let mut all_cores = create_cores(context, vec![1, 1, 1, 1]); + let (_last_core, cores) = all_cores.split_last_mut().unwrap(); + + // Create blocks for rounds 1..=30 from all Cores except last Core of authority 3. + let mut last_round_blocks = Vec::::new(); + for round in 1..=30 { + let mut this_round_blocks = Vec::new(); + + for core_fixture in cores.iter_mut() { + wait_blocks(&last_round_blocks, &core_fixture.core.context).await; + + core_fixture + .core + .add_blocks(last_round_blocks.clone()) + .unwrap(); + + // Only when round > 1 and using non-genesis parents. + if let Some(r) = last_round_blocks.first().map(|b| b.round()) { + assert_eq!(round - 1, r); + if core_fixture.core.last_proposed_round() == r { + // Force propose new block regardless of min round delay. + core_fixture + .core + .try_propose(true) + .unwrap() + .unwrap_or_else(|| { + panic!("Block should have been proposed for round {}", round) + }); + } + } + + assert_eq!(core_fixture.core.last_proposed_round(), round); + + this_round_blocks.push(core_fixture.core.last_proposed_block.clone()); + } + + last_round_blocks = this_round_blocks; + } + + // Now produce blocks for all Cores + for round in 31..=40 { + let mut this_round_blocks = Vec::new(); + + for core_fixture in all_cores.iter_mut() { + wait_blocks(&last_round_blocks, &core_fixture.core.context).await; + + core_fixture + .core + .add_blocks(last_round_blocks.clone()) + .unwrap(); + + // Only when round > 1 and using non-genesis parents. + if let Some(r) = last_round_blocks.first().map(|b| b.round()) { + assert_eq!(round - 1, r); + if core_fixture.core.last_proposed_round() == r { + // Force propose new block regardless of min round delay. 
+ core_fixture + .core + .try_propose(true) + .unwrap() + .unwrap_or_else(|| { + panic!("Block should have been proposed for round {}", round) + }); + } + } + + this_round_blocks.push(core_fixture.core.last_proposed_block.clone()); + + for block in this_round_blocks.iter() { + if block.author() != AuthorityIndex::new_for_test(3) { + // Assert blocks created include only 3 ancestors per block as one + // should be excluded + assert_eq!(block.ancestors().len(), 3); + } else { + // Authority 3 is the low scoring authority so it will still include + // its own blocks. + assert_eq!(block.ancestors().len(), 4); + } + } + } + + last_round_blocks = this_round_blocks; + } + } + #[tokio::test] async fn test_core_set_subscriber_exists() { telemetry_subscribers::init_for_testing(); @@ -1671,7 +2040,7 @@ mod test { } #[tokio::test] - async fn test_core_set_propagation_delay() { + async fn test_core_set_propagation_delay_per_authority() { // TODO: create helper to avoid the duplicated code here. telemetry_subscribers::init_for_testing(); let (context, mut key_pairs) = Context::new_for_test(4); @@ -1725,8 +2094,8 @@ mod test { "No block should have been created other than genesis" ); - // Use a large propagation delay to disable proposing. - core.set_propagation_delay(1000); + // Use a large propagation for own authority delay to disable proposing. + core.propagation_delay = 1000; // Make propagation delay the only reason for not proposing. core.set_subscriber_exists(true); @@ -1734,8 +2103,8 @@ mod test { // There is no proposal even with forced proposing. assert!(core.try_propose(true).unwrap().is_none()); - // Let Core know there is no propagation delay. - core.set_propagation_delay(0); + // Let Core know there is no propagation delay for own authority. + core.propagation_delay = 0; // Proposing now would succeed. assert!(core.try_propose(true).unwrap().is_some()); diff --git a/consensus/core/src/core_thread.rs b/consensus/core/src/core_thread.rs index 1e9ec7e925c7b..82e5a9a483363 100644 --- a/consensus/core/src/core_thread.rs +++ b/consensus/core/src/core_thread.rs @@ -27,6 +27,7 @@ use crate::{ core_thread::CoreError::Shutdown, dag_state::DagState, error::{ConsensusError, ConsensusResult}, + round_prober::QuorumRound, BlockAPI as _, }; @@ -65,8 +66,13 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static { /// It is not a guarantee that produced blocks will be accepted by peers. fn set_subscriber_exists(&self, exists: bool) -> Result<(), CoreError>; - /// Sets the estimated delay to propagate a block to a quorum of peers, in number of rounds. - fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError>; + /// Sets the estimated delay per authority to propagate a block to a quorum of peers, + /// in number of rounds and the quorum rounds per authority for ancestor state + /// manager. 
+ fn set_propagation_delay_and_quorum_rounds( + &self, + quorum_rounds: Vec, + ) -> Result<(), CoreError>; fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>; @@ -91,7 +97,7 @@ struct CoreThread { core: Core, receiver: Receiver, rx_subscriber_exists: watch::Receiver, - rx_propagation_delay: watch::Receiver, + rx_quorum_rounds: watch::Receiver>, rx_last_known_proposed_round: watch::Receiver, context: Arc, } @@ -141,11 +147,11 @@ impl CoreThread { self.core.new_block(Round::MAX, true)?; } } - _ = self.rx_propagation_delay.changed() => { - let _scope = monitored_scope("CoreThread::loop::set_propagation_delay"); + _ = self.rx_quorum_rounds.changed() => { + let _scope = monitored_scope("CoreThread::loop::set_propagation_delay_per_authority"); let should_propose_before = self.core.should_propose(); - let delay = *self.rx_propagation_delay.borrow(); - self.core.set_propagation_delay(delay); + let quorum_rounds = self.rx_quorum_rounds.borrow().clone(); + self.core.set_propagation_delay_and_quorum_rounds(quorum_rounds); if !should_propose_before && self.core.should_propose() { // If core cannnot propose before but can propose now, try to produce a new block to ensure liveness, // because block proposal could have been skipped. @@ -164,7 +170,7 @@ pub(crate) struct ChannelCoreThreadDispatcher { context: Arc, sender: WeakSender, tx_subscriber_exists: Arc>, - tx_propagation_delay: Arc>, + tx_quorum_rounds: Arc>>, tx_last_known_proposed_round: Arc>, highest_received_rounds: Arc>, } @@ -190,16 +196,17 @@ impl ChannelCoreThreadDispatcher { let (sender, receiver) = channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE); let (tx_subscriber_exists, mut rx_subscriber_exists) = watch::channel(false); - let (tx_propagation_delay, mut rx_propagation_delay) = watch::channel(0); + let (tx_quorum_rounds, mut rx_quorum_rounds) = + watch::channel(vec![(0, 0); context.committee.size()]); let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0); rx_subscriber_exists.mark_unchanged(); - rx_propagation_delay.mark_unchanged(); + rx_quorum_rounds.mark_unchanged(); rx_last_known_proposed_round.mark_unchanged(); let core_thread = CoreThread { core, receiver, rx_subscriber_exists, - rx_propagation_delay, + rx_quorum_rounds, rx_last_known_proposed_round, context: context.clone(), }; @@ -221,7 +228,7 @@ impl ChannelCoreThreadDispatcher { context, sender: sender.downgrade(), tx_subscriber_exists: Arc::new(tx_subscriber_exists), - tx_propagation_delay: Arc::new(tx_propagation_delay), + tx_quorum_rounds: Arc::new(tx_quorum_rounds), tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round), highest_received_rounds: Arc::new(highest_received_rounds), }; @@ -279,9 +286,12 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher { .map_err(|e| Shutdown(e.to_string())) } - fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError> { - self.tx_propagation_delay - .send(delay) + fn set_propagation_delay_and_quorum_rounds( + &self, + quorum_rounds: Vec, + ) -> Result<(), CoreError> { + self.tx_quorum_rounds + .send(quorum_rounds) .map_err(|e| Shutdown(e.to_string())) } @@ -353,7 +363,10 @@ impl CoreThreadDispatcher for MockCoreThreadDispatcher { todo!() } - fn set_propagation_delay(&self, _delay: Round) -> Result<(), CoreError> { + fn set_propagation_delay_and_quorum_rounds( + &self, + _quorum_rounds: Vec, + ) -> Result<(), CoreError> { todo!() } diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 
cb1112633287e..6a4c42688468c 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -877,8 +877,13 @@ impl DagState { self.scoring_subdag.is_empty() } - pub(crate) fn calculate_scoring_subdag_scores(&self) -> ReputationScores { - self.scoring_subdag.calculate_scores() + pub(crate) fn calculate_scoring_subdag_distributed_vote_scores(&self) -> ReputationScores { + self.scoring_subdag.calculate_distributed_vote_scores() + } + + #[allow(unused)] + pub(crate) fn calculate_scoring_subdag_certified_vote_scores(&self) -> ReputationScores { + self.scoring_subdag.calculate_certified_vote_scores() } pub(crate) fn scoring_subdag_commit_range(&self) -> CommitIndex { diff --git a/consensus/core/src/leader_schedule.rs b/consensus/core/src/leader_schedule.rs index 756cc9361d86b..0733e5bc2c636 100644 --- a/consensus/core/src/leader_schedule.rs +++ b/consensus/core/src/leader_schedule.rs @@ -136,15 +136,32 @@ impl LeaderSchedule { .with_label_values(&["LeaderSchedule::update_leader_schedule"]) .start_timer(); + let s1 = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["LeaderSchedule::update_leader_schedule::calculate_scoring_subdag_distributed_vote_scores"]) + .start_timer(); let (reputation_scores, last_commit_index) = { let dag_state = dag_state.read(); - let reputation_scores = dag_state.calculate_scoring_subdag_scores(); + let reputation_scores = dag_state.calculate_scoring_subdag_distributed_vote_scores(); let last_commit_index = dag_state.scoring_subdag_commit_range(); (reputation_scores, last_commit_index) }; + drop(s1); + let s2 = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&[ + "LeaderSchedule::update_leader_schedule::clear_scoring_subdag_&_add_commit_info", + ]) + .start_timer(); { let mut dag_state = dag_state.write(); // Clear scoring subdag as we have updated the leader schedule @@ -152,13 +169,31 @@ impl LeaderSchedule { // Buffer score and last commit rounds in dag state to be persisted later dag_state.add_commit_info(reputation_scores.clone()); } + drop(s2); + let s3 = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&[ + "LeaderSchedule::update_leader_schedule::update_leader_swap_table", + ]) + .start_timer(); self.update_leader_swap_table(LeaderSwapTable::new( self.context.clone(), last_commit_index, reputation_scores.clone(), )); + drop(s3); + let s4 = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["LeaderSchedule::update_leader_schedule::update_metrics"]) + .start_timer(); reputation_scores.update_metrics(self.context.clone()); self.context @@ -166,6 +201,7 @@ impl LeaderSchedule { .node_metrics .num_of_bad_nodes .set(self.leader_swap_table.read().bad_nodes.len() as i64); + drop(s4); } // TODO: Remove when DistributedVoteScoring is enabled. 
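The repeated `s1`..`s4` bindings added to `update_leader_schedule_v2` rely on prometheus scoped timers: `start_timer()` returns a `HistogramTimer` that records the elapsed time when it is dropped, so each phase gets its own latency sample under the shared `scope_processing_time` histogram. A minimal sketch of the pattern with plain prometheus types (the label values here are illustrative, not the exact ones in the diff):

```rust
use prometheus::{register_histogram_vec_with_registry, HistogramVec, Registry};

fn timed_phases(scope_processing_time: &HistogramVec) {
    // Phase 1: compute scores. The timer observes its elapsed time when dropped.
    let s1 = scope_processing_time
        .with_label_values(&["LeaderSchedule::update_leader_schedule::calculate_scores"])
        .start_timer();
    // ... phase 1 work ...
    drop(s1);

    // Phase 2: update the swap table, timed independently under the same histogram.
    let s2 = scope_processing_time
        .with_label_values(&["LeaderSchedule::update_leader_schedule::update_swap_table"])
        .start_timer();
    // ... phase 2 work ...
    drop(s2);
}

fn main() {
    let registry = Registry::new();
    let scope_processing_time = register_histogram_vec_with_registry!(
        "scope_processing_time",
        "Time spent in each named scope",
        &["scope"],
        registry
    )
    .unwrap();
    timed_phases(&scope_processing_time);
}
```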
@@ -747,7 +783,9 @@ mod tests { dag_state.read().last_committed_rounds() ); assert_eq!(1, dag_state.read().scoring_subdags_count()); - let recovered_scores = dag_state.read().calculate_scoring_subdag_scores(); + let recovered_scores = dag_state + .read() + .calculate_scoring_subdag_distributed_vote_scores(); let expected_scores = ReputationScores::new((11..=11).into(), vec![0, 0, 0, 0]); assert_eq!(recovered_scores, expected_scores); @@ -863,7 +901,9 @@ mod tests { expected_scored_subdags.len(), dag_state.read().scoring_subdags_count() ); - let recovered_scores = dag_state.read().calculate_scoring_subdag_scores(); + let recovered_scores = dag_state + .read() + .calculate_scoring_subdag_distributed_vote_scores(); let expected_scores = ReputationScores::new((1..=2).into(), vec![0, 0, 0, 0]); assert_eq!(recovered_scores, expected_scores); diff --git a/consensus/core/src/leader_scoring.rs b/consensus/core/src/leader_scoring.rs index c7b151607cd01..ad874afad389a 100644 --- a/consensus/core/src/leader_scoring.rs +++ b/consensus/core/src/leader_scoring.rs @@ -129,6 +129,10 @@ impl ReputationScores { } } + pub(crate) fn high_score(&self) -> u64 { + *self.scores_per_authority.iter().max().unwrap_or(&0) + } + // Returns the authorities index with score tuples. pub(crate) fn authorities_by_score(&self, context: Arc) -> Vec<(AuthorityIndex, u64)> { self.scores_per_authority @@ -258,17 +262,9 @@ impl ScoringSubdag { } // Iterate through votes and calculate scores for each authority based on - // scoring strategy that is used. (Vote or CertifiedVote) - pub(crate) fn calculate_scores(&self) -> ReputationScores { - let _s = self - .context - .metrics - .node_metrics - .scope_processing_time - .with_label_values(&["ScoringSubdag::calculate_scores"]) - .start_timer(); - - let scores_per_authority = self.score_distributed_votes(); + // distributed vote scoring strategy. + pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores { + let scores_per_authority = self.distributed_votes_scores(); // TODO: Normalize scores ReputationScores::new( @@ -283,7 +279,15 @@ impl ScoringSubdag { /// Instead of only giving one point for each vote that is included in 2f+1 /// blocks. We give a score equal to the amount of stake of all blocks that /// included the vote. - fn score_distributed_votes(&self) -> Vec { + fn distributed_votes_scores(&self) -> Vec { + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["ScoringSubdag::score_distributed_votes"]) + .start_timer(); + let num_authorities = self.context.committee.size(); let mut scores_per_authority = vec![0_u64; num_authorities]; @@ -299,12 +303,33 @@ impl ScoringSubdag { scores_per_authority } + // Iterate through votes and calculate scores for each authority based on + // certified vote scoring strategy. + #[allow(unused)] + pub(crate) fn calculate_certified_vote_scores(&self) -> ReputationScores { + let scores_per_authority = self.certified_votes_scores(); + + ReputationScores::new( + self.commit_range + .clone() + .expect("CommitRange should be set if calculate scores is called."), + scores_per_authority, + ) + } + /// This scoring strategy gives points equal to the amount of stake in blocks /// that include the authority's vote, if that amount of total_stake > 2f+1. /// We consider this a certified vote. 
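The two strategies described in the doc comments above differ only in when a vote starts earning credit. A toy illustration, for a single vote whose including blocks carry `including_stake` total stake (the `>=` comparison against the quorum threshold is an assumption for the sketch; the doc comment above phrases it as exceeding 2f+1):

```rust
// Toy illustration (not crate code) of the two scoring strategies:
// - distributed vote scoring credits the voter with the full stake of the blocks
//   that included its vote;
// - certified vote scoring credits it only once that stake reaches a quorum.
fn distributed_vote_score(including_stake: u64) -> u64 {
    including_stake
}

fn certified_vote_score(including_stake: u64, quorum_threshold: u64) -> u64 {
    if including_stake >= quorum_threshold {
        including_stake
    } else {
        0
    }
}

fn main() {
    // Blocks with total stake 2 include the vote, quorum is 3:
    // distributed scoring gives 2 points, certified scoring gives 0.
    assert_eq!(distributed_vote_score(2), 2);
    assert_eq!(certified_vote_score(2, 3), 0);
    // Blocks with total stake 3 include the vote: both strategies give 3.
    assert_eq!(distributed_vote_score(3), 3);
    assert_eq!(certified_vote_score(3, 3), 3);
}
```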
- // TODO: This will be used for ancestor selection #[allow(unused)] - fn score_certified_votes(&self) -> Vec { + pub fn certified_votes_scores(&self) -> Vec { + let _s = self + .context + .metrics + .node_metrics + .scope_processing_time + .with_label_values(&["ScoringSubdag::score_certified_votes"]) + .start_timer(); + let num_authorities = self.context.committee.size(); let mut scores_per_authority = vec![0_u64; num_authorities]; @@ -573,7 +598,7 @@ mod tests { scoring_subdag.add_subdags(vec![subdag]); } - let scores = scoring_subdag.calculate_scores(); + let scores = scoring_subdag.calculate_distributed_vote_scores(); assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]); assert_eq!(scores.commit_range, (1..=4).into()); } @@ -619,9 +644,9 @@ mod tests { scoring_subdag.add_subdags(vec![subdag]); } - let scores_per_authority = scoring_subdag.score_certified_votes(); - assert_eq!(scores_per_authority, vec![4, 4, 4, 4]); - assert_eq!(scoring_subdag.commit_range.unwrap(), (1..=4).into()); + let scores = scoring_subdag.calculate_certified_vote_scores(); + assert_eq!(scores.scores_per_authority, vec![4, 4, 4, 4]); + assert_eq!(scores.commit_range, (1..=4).into()); } // TODO: Remove all tests below this when DistributedVoteScoring is enabled. diff --git a/consensus/core/src/leader_timeout.rs b/consensus/core/src/leader_timeout.rs index 4c2c15b1bbc55..2213d8f0d715b 100644 --- a/consensus/core/src/leader_timeout.rs +++ b/consensus/core/src/leader_timeout.rs @@ -133,6 +133,7 @@ mod tests { use crate::core::CoreSignals; use crate::core_thread::{CoreError, CoreThreadDispatcher}; use crate::leader_timeout::LeaderTimeoutTask; + use crate::round_prober::QuorumRound; #[derive(Clone, Default)] struct MockCoreThreadDispatcher { @@ -171,7 +172,10 @@ mod tests { todo!() } - fn set_propagation_delay(&self, _delay: Round) -> Result<(), CoreError> { + fn set_propagation_delay_and_quorum_rounds( + &self, + _quorum_rounds: Vec, + ) -> Result<(), CoreError> { todo!() } diff --git a/consensus/core/src/lib.rs b/consensus/core/src/lib.rs index b7597853dbd2a..cb40c9a61c59a 100644 --- a/consensus/core/src/lib.rs +++ b/consensus/core/src/lib.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 +mod ancestor; mod authority_node; mod authority_service; mod base_committer; diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index 25a1a1013f2e4..8f8cd19428879 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -140,6 +140,9 @@ pub(crate) struct NodeMetrics { pub(crate) commit_round_advancement_interval: Histogram, pub(crate) last_decided_leader_round: IntGauge, pub(crate) leader_timeout_total: IntCounterVec, + pub(crate) ancestor_state_change_by_authority: IntCounterVec, + pub(crate) excluded_proposal_ancestors_count_by_authority: IntCounterVec, + pub(crate) included_excluded_proposal_ancestors_count_by_authority: IntCounterVec, pub(crate) missing_blocks_total: IntCounter, pub(crate) missing_blocks_after_fetch_total: IntCounter, pub(crate) num_of_bad_nodes: IntGauge, @@ -173,6 +176,7 @@ pub(crate) struct NodeMetrics { pub(crate) commit_sync_fetch_once_latency: Histogram, pub(crate) commit_sync_fetch_once_errors: IntCounterVec, pub(crate) round_prober_quorum_round_gaps: IntGaugeVec, + pub(crate) round_prober_low_quorum_round: IntGaugeVec, pub(crate) round_prober_propagation_delays: Histogram, pub(crate) round_prober_last_propagation_delay: IntGauge, pub(crate) round_prober_request_errors: IntCounter, @@ -430,6 +434,24 @@ impl NodeMetrics { &["timeout_type"], registry, ).unwrap(), + ancestor_state_change_by_authority: register_int_counter_vec_with_registry!( + "ancestor_state_change_by_authority", + "The total number of times an ancestor state changed to EXCLUDE or INCLUDE.", + &["authority", "state"], + registry, + ).unwrap(), + excluded_proposal_ancestors_count_by_authority: register_int_counter_vec_with_registry!( + "excluded_proposal_ancestors_count_by_authority", + "Total number of excluded ancestors per authority during proposal.", + &["authority"], + registry, + ).unwrap(), + included_excluded_proposal_ancestors_count_by_authority: register_int_counter_vec_with_registry!( + "included_excluded_proposal_ancestors_count_by_authority", + "Total number of included excluded ancestors per authority during proposal.", + &["authority"], + registry, + ).unwrap(), missing_blocks_total: register_int_counter_with_registry!( "missing_blocks_total", "Total cumulative number of missing blocks", @@ -610,6 +632,12 @@ impl NodeMetrics { &["authority"], registry ).unwrap(), + round_prober_low_quorum_round: register_int_gauge_vec_with_registry!( + "round_prober_low_quorum_round", + "Low quorum round among peers for blocks proposed from each authority", + &["authority"], + registry + ).unwrap(), round_prober_propagation_delays: register_histogram_with_registry!( "round_prober_propagation_delays", "Round gaps between the last proposed block round and the lower bound of own quorum round", diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs index ea29b6345d7c3..d8e8503d2deef 100644 --- a/consensus/core/src/round_prober.rs +++ b/consensus/core/src/round_prober.rs @@ -117,7 +117,7 @@ impl RoundProber { // Probes each peer for the latest rounds they received from others. // Returns the quorum round for each authority, and the propagation delay // of own blocks. 
- pub(crate) async fn probe(&self) -> (Vec, Round) { + pub(crate) async fn probe(&self) -> Vec { let _scope = monitored_scope("RoundProber"); let request_timeout = @@ -208,6 +208,12 @@ impl RoundProber { .round_prober_quorum_round_gaps .with_label_values(&[&authority.hostname]) .set((high - low) as i64); + self.context + .metrics + .node_metrics + .round_prober_low_quorum_round + .with_label_values(&[&authority.hostname]) + .set(*low as i64); } // TODO: consider using own quorum round gap to control proposing in addition to // propagation delay. For now they seem to be about the same. @@ -218,28 +224,17 @@ impl RoundProber { // that can reduce round rate. // Because of the nature of TCP and block streaming, propagation delay is expected to be // 0 in most cases, even when the actual latency of broadcasting blocks is high. - let propagation_delay = last_proposed_round.saturating_sub(quorum_rounds[own_index].0); - self.context - .metrics - .node_metrics - .round_prober_propagation_delays - .observe(propagation_delay as f64); - self.context - .metrics - .node_metrics - .round_prober_last_propagation_delay - .set(propagation_delay as i64); if let Err(e) = self .core_thread_dispatcher - .set_propagation_delay(propagation_delay) + .set_propagation_delay_and_quorum_rounds(quorum_rounds.clone()) { tracing::warn!( - "Failed to set propagation delay {propagation_delay} on Core: {:?}", + "Failed to set propagation delay and quorum rounds {quorum_rounds:?} on Core: {:?}", e ); } - (quorum_rounds, propagation_delay) + quorum_rounds } } @@ -306,21 +301,23 @@ mod test { Round, TestBlock, VerifiedBlock, }; + use super::QuorumRound; + struct FakeThreadDispatcher { highest_received_rounds: Vec, - propagation_delay: Mutex, + quorum_rounds: Mutex>, } impl FakeThreadDispatcher { fn new(highest_received_rounds: Vec) -> Self { Self { highest_received_rounds, - propagation_delay: Mutex::new(0), + quorum_rounds: Mutex::new(vec![]), } } - fn propagation_delay(&self) -> Round { - *self.propagation_delay.lock() + fn quorum_rounds(&self) -> Vec { + self.quorum_rounds.lock().clone() } } @@ -345,9 +342,12 @@ mod test { unimplemented!() } - fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError> { - let mut propagation_delay = self.propagation_delay.lock(); - *propagation_delay = delay; + fn set_propagation_delay_and_quorum_rounds( + &self, + quorum_rounds: Vec, + ) -> Result<(), CoreError> { + let mut quorum_round_per_authority = self.quorum_rounds.lock(); + *quorum_round_per_authority = quorum_rounds; Ok(()) } @@ -441,6 +441,8 @@ mod test { async fn test_round_prober() { const NUM_AUTHORITIES: usize = 7; let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0); + let own_index = context.own_index; + let last_proposed_round = 110; let core_thread_dispatcher = Arc::new(FakeThreadDispatcher::new(vec![ 110, 120, 130, 140, 150, 160, 170, ])); @@ -464,7 +466,7 @@ mod test { ); // Fake last proposed round to be 110. 
-        let block = VerifiedBlock::new_for_test(TestBlock::new(110, 0).build());
+        let block = VerifiedBlock::new_for_test(TestBlock::new(last_proposed_round, 0).build());
         dag_state.write().accept_block(block);
 
         // Compute quorum rounds and propagation delay based on last proposed round = 110,
@@ -477,7 +479,7 @@
         // 105, 115, 103, 0, 125, 126, 127,
         // 0, 0, 0, 0, 0, 0, 0,
 
-        let (quorum_rounds, propagation_delay) = prober.probe().await;
+        let quorum_rounds = prober.probe().await;
 
         assert_eq!(
             quorum_rounds,
@@ -491,10 +493,22 @@
                 (107, 170)
             ]
         );
+        assert_eq!(
+            core_thread_dispatcher.quorum_rounds(),
+            vec![
+                (100, 105),
+                (0, 115),
+                (103, 130),
+                (0, 0),
+                (105, 150),
+                (106, 160),
+                (107, 170)
+            ]
+        );
 
         // 110 - 100 = 10
-        assert_eq!(propagation_delay, 10);
-        assert_eq!(core_thread_dispatcher.propagation_delay(), 10);
+        let propagation_delay = last_proposed_round.saturating_sub(quorum_rounds[own_index].0);
+        assert_eq!(propagation_delay, 10);
     }
 
     #[tokio::test]
diff --git a/rustfmt.toml b/rustfmt.toml
index 55797efb6bd4a..dd5b9aa954ddf 100644
--- a/rustfmt.toml
+++ b/rustfmt.toml
@@ -1,2 +1,5 @@
 edition = "2021"
 use_field_init_shorthand = true
+
+group_imports = "StdExternalCrate"
+imports_granularity = "Crate"
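With this change the propagation delay is no longer computed inside `RoundProber`; `Core::set_propagation_delay_and_quorum_rounds` derives it from the reported quorum rounds instead. A minimal sketch of that derivation, using the same values as the round prober test above (the `QuorumRound` alias and helper function are illustrative):

```rust
// Sketch of the propagation-delay derivation that moved from RoundProber into Core:
// the delay is how many rounds our own last proposed block is ahead of the low
// quorum round that peers report for our blocks.
type Round = u32;
type QuorumRound = (Round, Round); // (low, high)

fn propagation_delay(last_proposed_round: Round, quorum_rounds: &[QuorumRound], own_index: usize) -> Round {
    last_proposed_round.saturating_sub(quorum_rounds[own_index].0)
}

fn main() {
    // Values from the test: own low quorum round is 100 and the last proposed
    // round is 110, so the propagation delay is 10 rounds.
    let quorum_rounds = vec![
        (100, 105),
        (0, 115),
        (103, 130),
        (0, 0),
        (105, 150),
        (106, 160),
        (107, 170),
    ];
    assert_eq!(propagation_delay(110, &quorum_rounds, 0), 10);
}
```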