From 52a6e7b205b11456134e54980e6864a862ba0f0d Mon Sep 17 00:00:00 2001 From: mwtian <81660174+mwtian@users.noreply.github.com> Date: Mon, 24 Jul 2023 09:52:27 -0700 Subject: [PATCH] [Narwhal] wait to cancel header proposal after higher round certificates have been created (#13002) ## Description Cancel header proposal less eagerly, by waiting until a higher round certificate has been created locally, instead of immediately observing a higher round certificate. Orphaned certificates can still be useful as votes on previous leaders. Transactions in the orphaned certificates will be retried, same as those in cancelled headers. The main goal is to make the system behavior more intuitive. Also this change may help keeping latency consistent when headers are proposed more frequently for any reason. ## Test Plan CI Deploey to private test and observed no change in p50 latency and p95 jitter. --- If your changes are not user-facing and not a breaking change, you can skip the following section. Otherwise, please indicate what changed, and then add to the Release Notes section as highlighted during the release process. ### Type of Change (Check all that apply) - [ ] protocol change - [ ] user-visible impact - [ ] breaking change for a client SDKs - [ ] breaking change for FNs (FN binary must upgrade) - [ ] breaking change for validators or node operators (must upgrade binaries) - [ ] breaking change for on-chain data layout - [ ] necessitate either a data wipe or data migration ### Release notes --- narwhal/primary/src/certifier.rs | 35 +++++-- narwhal/primary/src/primary.rs | 2 +- narwhal/primary/src/synchronizer.rs | 52 +++++++---- .../primary/src/tests/synchronizer_tests.rs | 15 +-- narwhal/storage/src/certificate_store.rs | 92 +++++++++---------- 5 files changed, 104 insertions(+), 92 deletions(-) diff --git a/narwhal/primary/src/certifier.rs b/narwhal/primary/src/certifier.rs index 82f7cd9c39994..38b2e7855f45f 100644 --- a/narwhal/primary/src/certifier.rs +++ b/narwhal/primary/src/certifier.rs @@ -11,6 +11,8 @@ use futures::StreamExt; use mysten_metrics::metered_channel::Receiver; use mysten_metrics::{monitored_future, spawn_logged_monitored_task}; use network::anemo_ext::NetworkExt; +use parking_lot::Mutex; +use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; use storage::{CertificateStore, HeaderStore}; @@ -24,7 +26,7 @@ use types::{ ensure, error::{DagError, DagResult}, Certificate, CertificateDigest, ConditionalBroadcastReceiver, Header, HeaderAPI, - PrimaryToPrimaryClient, RequestVoteRequest, Vote, VoteAPI, + PrimaryToPrimaryClient, RequestVoteRequest, Round, Vote, VoteAPI, }; #[cfg(test)] @@ -53,9 +55,8 @@ pub struct Certifier { rx_shutdown: ConditionalBroadcastReceiver, /// Receives our newly created headers from the `Proposer`. rx_headers: Receiver
, - /// Used to cancel vote requests for a previously-proposed header that is being replaced - /// before a certificate could be formed. - cancel_proposed_header: Option>, + /// Used to cancel vote requests for proposing header that are no longer needed. + cancel_proposing_headers: Arc)>>>, /// Handle to propose_header task. Our target is to have only one task running always, thus /// we cancel the previously running before we spawn the next one. However, we don't wait for /// the previous to finish to spawn the new one, so we might temporarily have more that one @@ -93,7 +94,7 @@ impl Certifier { signature_service, rx_shutdown, rx_headers, - cancel_proposed_header: None, + cancel_proposing_headers: Default::default(), propose_header_tasks: JoinSet::new(), network: primary_network, metrics, @@ -247,6 +248,7 @@ impl Certifier { metrics: Arc, network: anemo::Network, header: Header, + cancel_proposing_headers: Arc)>>>, mut cancel: oneshot::Receiver<()>, ) -> DagResult { if header.epoch() != committee.epoch() { @@ -337,6 +339,15 @@ impl Certifier { })?; debug!("Assembled {certificate:?}"); + let mut cancel_proposing_headers = cancel_proposing_headers.lock(); + while let Some(&(round, _)) = cancel_proposing_headers.front() { + if round > header.round() { + break; + } + let (_round, sender) = cancel_proposing_headers.pop_front().unwrap(); + let _ = sender.send(()); + } + Ok(certificate) } @@ -369,10 +380,16 @@ impl Certifier { // TODO: move logic into Proposer. Some(header) = self.rx_headers.recv() => { let (tx_cancel, rx_cancel) = oneshot::channel(); - if let Some(cancel) = self.cancel_proposed_header { - let _ = cancel.send(()); + { + let mut cancel_proposing_headers = self.cancel_proposing_headers.lock(); + cancel_proposing_headers.push_back((header.round(), tx_cancel)); + + const MAX_PROPOSING_HEADERS: usize = 3; + while cancel_proposing_headers.len() > MAX_PROPOSING_HEADERS { + let (_round, sender) = cancel_proposing_headers.pop_front().unwrap(); + let _ = sender.send(()); + } } - self.cancel_proposed_header = Some(tx_cancel); let name = self.authority_id; let committee = self.committee.clone(); @@ -381,6 +398,7 @@ impl Certifier { let signature_service = self.signature_service.clone(); let metrics = self.metrics.clone(); let network = self.network.clone(); + let cancel_proposing_headers = self.cancel_proposing_headers.clone(); fail_point_async!("narwhal-delay"); self.propose_header_tasks.spawn(monitored_future!(Self::propose_header( name, @@ -391,6 +409,7 @@ impl Certifier { metrics, network, header, + cancel_proposing_headers, rx_cancel, ))); Ok(()) diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index 3b0fee768edef..9a0c1d02125fc 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -822,7 +822,7 @@ impl PrimaryReceiverHandler { // certificates to still have a chance to be included in the DAG while not wasting // resources on very old vote requests. This value affects performance but not correctness // of the algorithm. - const HEADER_AGE_LIMIT: Round = 3; + const HEADER_AGE_LIMIT: Round = 5; // Lock to ensure consistency between limit_round and where parent_digests are gc'ed. let mut parent_digests = self.parent_digests.lock(); diff --git a/narwhal/primary/src/synchronizer.rs b/narwhal/primary/src/synchronizer.rs index c881f98bfc4c2..1c1d41403d55a 100644 --- a/narwhal/primary/src/synchronizer.rs +++ b/narwhal/primary/src/synchronizer.rs @@ -2,7 +2,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use anemo::{rpc::Status, Network, Request, Response}; -use config::{AuthorityIdentifier, Committee, Epoch, WorkerCache}; +use config::{AuthorityIdentifier, Committee, Epoch, Stake, WorkerCache}; use consensus::consensus::ConsensusRound; use crypto::NetworkPublicKey; use fastcrypto::hash::Hash as _; @@ -362,11 +362,37 @@ impl Synchronizer { let inner_proposer = inner.clone(); spawn_logged_monitored_task!( async move { - let last_round_certificates = inner_proposer - .certificate_store - .last_two_rounds_certs() - .expect("Failed recovering certificates in primary core"); - for certificate in last_round_certificates { + let highest_round_number = inner_proposer.certificate_store.highest_round_number(); + let mut certificates = vec![]; + // The last or last 2 rounds are sufficient for recovery. + for i in 0..2 { + let round = highest_round_number - i; + // Do not recover genesis certificates. They are initialized into certificate + // aggregator already. + if round == 0 { + break; + } + let round_certs = inner_proposer + .certificate_store + .at_round(round) + .expect("Failed recovering certificates in primary core"); + let stake: Stake = round_certs + .iter() + .map(|c: &Certificate| inner_proposer.committee.stake_by_id(c.origin())) + .sum(); + certificates.extend(round_certs.into_iter()); + // If a round has a quorum of certificates, enough have recovered because + // a header can be proposed with these parents. + if stake >= inner_proposer.committee.quorum_threshold() { + break; + } else { + // Only the last round can have less than a quorum of stake. + assert_eq!(i, 0); + } + } + // Unnecessary to append certificates in ascending round order, but it doesn't + // hurt either. + for certificate in certificates.into_iter().rev() { if let Err(e) = inner_proposer .append_certificate_in_aggregator(certificate) .await @@ -698,20 +724,6 @@ impl Synchronizer { .with_label_values(&[certificate_source]) .set(highest_received_round as i64); - // Let the proposer draw early conclusions from a certificate at this round and epoch, without its - // parents or payload (which we may not have yet). - // - // Since our certificate is well-signed, it shows a majority of honest signers stand at round r, - // so to make a successful proposal, our proposer must use parents at least at round r-1. - // - // This allows the proposer not to fire proposals at rounds strictly below the certificate we witnessed. - let minimal_round_for_parents = certificate.round().saturating_sub(1); - self.inner - .tx_parents - .send((vec![], minimal_round_for_parents, certificate.epoch())) - .await - .map_err(|_| DagError::ShuttingDown)?; - // Instruct workers to download any missing batches referenced in this certificate. // Since this header got certified, we are sure that all the data it refers to (ie. its batches and its parents) are available. // We can thus continue the processing of the certificate without blocking on batch synchronization. diff --git a/narwhal/primary/src/tests/synchronizer_tests.rs b/narwhal/primary/src/tests/synchronizer_tests.rs index 6f620ba4d1e82..10ef14dc9445c 100644 --- a/narwhal/primary/src/tests/synchronizer_tests.rs +++ b/narwhal/primary/src/tests/synchronizer_tests.rs @@ -90,13 +90,6 @@ async fn accept_certificates() { } // Ensure the Synchronizer sends the parents of the certificates to the proposer. - // - // The first messages are the Synchronizer letting us know about the round of parent certificates - for _i in 0..3 { - let received = rx_parents.recv().await.unwrap(); - assert_eq!(received, (vec![], 0, 0)); - } - // the next message actually contains the parents let received = rx_parents.recv().await.unwrap(); assert_eq!(received, (certificates.clone(), 1, 0)); @@ -452,11 +445,6 @@ async fn synchronizer_recover_partial_certs() { } tokio::time::sleep(Duration::from_secs(5)).await; - for _ in 0..2 { - let received = rx_parents.recv().await.unwrap(); - assert_eq!(received, (vec![], 0, 0)); - } - // the recovery flow sends message that contains the parents let received = rx_parents.recv().await.unwrap(); assert_eq!(received.1, 1); @@ -519,7 +507,7 @@ async fn synchronizer_recover_previous_round() { .unwrap(); let _ = tx_synchronizer_network.send(network.clone()); - // Send 3 certificates from round 1, and 2 certificates from round 2 to Synchronizer. + // Create 3 certificates per round. let genesis_certs = Certificate::genesis(&committee); let genesis = genesis_certs .iter() @@ -537,6 +525,7 @@ async fn synchronizer_recover_previous_round() { &latest_protocol_version(), &keys, ); + // Send 3 certificates from round 1, and 2 certificates from round 2 to Synchronizer. let all_certificates: Vec<_> = all_certificates.into_iter().collect(); let round_1_certificates = all_certificates[0..3].to_vec(); let round_2_certificates = all_certificates[3..5].to_vec(); diff --git a/narwhal/storage/src/certificate_store.rs b/narwhal/storage/src/certificate_store.rs index 3eb9bfaf85732..b79bf7d3d0dff 100644 --- a/narwhal/storage/src/certificate_store.rs +++ b/narwhal/storage/src/certificate_store.rs @@ -534,13 +534,10 @@ impl CertificateStore { /// Retrieves all the certificates with round >= the provided round. /// The result is returned with certificates sorted in round asc order pub fn after_round(&self, round: Round) -> StoreResult> { - // Skip to a row at or before the requested round. - // TODO: Add a more efficient seek method to typed store. - let mut iter = self.certificate_id_by_round.unbounded_iter(); - if round > 0 { - iter = iter.skip_to(&(round - 1, AuthorityIdentifier::default()))?; - } - + // Restrict the scan to at or after the requested round. + let iter = self + .certificate_id_by_round + .iter_with_bounds(Some((round, AuthorityIdentifier(0))), None); let mut digests = Vec::new(); for ((r, _), d) in iter { match r.cmp(&round) { @@ -548,7 +545,7 @@ impl CertificateStore { digests.push(d); } Ordering::Less => { - continue; + unreachable!("Failed to specify start range. Round {}", round); } } } @@ -590,43 +587,41 @@ impl CertificateStore { Ok(result) } - /// Retrieves the certificates of the last round and the round before that - pub fn last_two_rounds_certs(&self) -> StoreResult> { - // starting from the last element - hence the last round - move backwards until - // we find certificates of different round. - let certificates_reverse = self - .certificate_id_by_round - .unbounded_iter() - .skip_to_last() - .reverse(); - - let mut round = 0; - let mut certificates = Vec::new(); - - for (key, digest) in certificates_reverse { - let (certificate_round, _certificate_origin) = key; - - // We treat zero as special value (round unset) in order to - // capture the last certificate's round. - // We are now in a round less than the previous so we want to - // stop consuming - if round == 0 { - round = certificate_round; - } else if certificate_round < round - 1 { - break; + /// Retrieves the certificates at the specified round. + pub fn at_round(&self, round: Round) -> StoreResult> { + // Restrict the scan to at or after the requested round. + let iter = self.certificate_id_by_round.iter_with_bounds( + Some((round, AuthorityIdentifier(0))), + Some((round + 1, AuthorityIdentifier(0))), + ); + let mut digests = Vec::new(); + for ((r, _), d) in iter { + match r.cmp(&round) { + Ordering::Greater => { + unreachable!("Failed to specify end range. Round {}", round); + } + Ordering::Equal => { + digests.push(d); + } + Ordering::Less => { + unreachable!("Failed to specify start range. Round {}", round); + } } - - let certificate = self.certificates_by_id.get(&digest)?.ok_or_else(|| { - RocksDBError(format!( - "Certificate with id {} not found in main storage although it should", - digest - )) - })?; - - certificates.push(certificate); } - Ok(certificates) + // Fetch all those certificates from main storage, return an error if any one is missing. + self.certificates_by_id + .multi_get(digests.clone())? + .into_iter() + .map(|opt_cert| { + opt_cert.ok_or_else(|| { + RocksDBError(format!( + "Certificate with some digests not found, CertificateStore invariant violation: {:?}", + digests + )) + }) + }) + .collect() } /// Retrieves the last certificate of the given origin. @@ -932,7 +927,7 @@ mod test { } #[tokio::test] - async fn test_last_two_rounds() { + async fn test_last_round() { // GIVEN let store = new_store(temp_dir()); @@ -944,7 +939,7 @@ mod test { store.write_all(certs).unwrap(); // WHEN - let result = store.last_two_rounds_certs().unwrap(); + let result = store.at_round(50).unwrap(); let last_round_cert = store.last_round(origin).unwrap().unwrap(); let last_round_number = store.last_round_number(origin).unwrap().unwrap(); let last_round_number_not_exist = @@ -952,15 +947,12 @@ mod test { let highest_round_number = store.highest_round_number(); // THEN - assert_eq!(result.len(), 8); + assert_eq!(result.len(), 4); assert_eq!(last_round_cert.round(), 50); assert_eq!(last_round_number, 50); assert_eq!(highest_round_number, 50); for certificate in result { - assert!( - (certificate.round() == last_round_number) - || (certificate.round() == last_round_number - 1) - ); + assert_eq!(certificate.round(), last_round_number); } assert!(last_round_number_not_exist.is_none()); } @@ -971,7 +963,7 @@ mod test { let store = new_store(temp_dir()); // WHEN - let result = store.last_two_rounds_certs().unwrap(); + let result = store.at_round(1).unwrap(); let last_round_cert = store.last_round(AuthorityIdentifier::default()).unwrap(); let last_round_number = store .last_round_number(AuthorityIdentifier::default())