[Narwhal] wait to cancel header proposal after higher round certificates have been created (#13002)

## Description 

Cancel header proposals less eagerly, by waiting until a higher round
certificate has been created locally, instead of cancelling immediately
upon observing a higher round certificate. Orphaned certificates can
still be useful as votes on previous leaders. Transactions in the
orphaned certificates will be retried, the same as those in cancelled
headers.

The main goal is to make the system behavior more intuitive. This change
may also help keep latency consistent when headers are proposed more
frequently for any reason.
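
The sketch below models the new cancellation bookkeeping from the `certifier.rs` diff. It is a minimal, simplified sketch: `ProposalCancellations`, `register`, and `on_certificate_formed` are illustrative names not present in the change, `Round` is assumed to be a `u64`, and the real code keeps the queue inline in `Certifier` behind a `parking_lot::Mutex` shared with each `propose_header` task.

```rust
// Requires the `tokio` crate with the `sync` feature.
use std::collections::VecDeque;
use tokio::sync::oneshot;

/// Illustrative stand-in for Narwhal's round number type.
type Round = u64;

/// Simplified model of the queue of pending proposal cancellations.
struct ProposalCancellations {
    pending: VecDeque<(Round, oneshot::Sender<()>)>,
}

impl ProposalCancellations {
    /// Upper bound on concurrently proposing headers (matches the constant in the diff).
    const MAX_PROPOSING_HEADERS: usize = 3;

    fn new() -> Self {
        Self {
            pending: VecDeque::new(),
        }
    }

    /// Called when the Proposer emits a new header: the returned receiver is what
    /// the vote-request task selects on to abort early.
    fn register(&mut self, round: Round) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.pending.push_back((round, tx));
        // Cancel the oldest proposals if too many are in flight.
        while self.pending.len() > Self::MAX_PROPOSING_HEADERS {
            let (_round, sender) = self.pending.pop_front().unwrap();
            let _ = sender.send(());
        }
        rx
    }

    /// Called once a certificate has been assembled locally: cancel vote requests
    /// for every proposal at or below the certified round.
    fn on_certificate_formed(&mut self, certified_round: Round) {
        while let Some(&(round, _)) = self.pending.front() {
            if round > certified_round {
                break;
            }
            let (_round, sender) = self.pending.pop_front().unwrap();
            let _ = sender.send(());
        }
    }
}
```

In the actual change, proposals are registered in the `Certifier` main loop and the queue is drained inside `propose_header` right after a certificate is assembled, so an older proposal is only cancelled once a certificate at an equal or higher round has been formed locally (or when more than `MAX_PROPOSING_HEADERS` proposals are in flight).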

## Test Plan 

CI
Deployed to a private testnet and observed no change in p50 latency or
p95 jitter.

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitates either a data wipe or data migration

### Release notes
mwtian authored Jul 24, 2023
1 parent fc9c746 commit 52a6e7b
Showing 5 changed files with 104 additions and 92 deletions.
35 changes: 27 additions & 8 deletions narwhal/primary/src/certifier.rs
@@ -11,6 +11,8 @@ use futures::StreamExt;
use mysten_metrics::metered_channel::Receiver;
use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
use network::anemo_ext::NetworkExt;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use storage::{CertificateStore, HeaderStore};
@@ -24,7 +26,7 @@ use types::{
ensure,
error::{DagError, DagResult},
Certificate, CertificateDigest, ConditionalBroadcastReceiver, Header, HeaderAPI,
PrimaryToPrimaryClient, RequestVoteRequest, Vote, VoteAPI,
PrimaryToPrimaryClient, RequestVoteRequest, Round, Vote, VoteAPI,
};

#[cfg(test)]
@@ -53,9 +55,8 @@ pub struct Certifier {
rx_shutdown: ConditionalBroadcastReceiver,
/// Receives our newly created headers from the `Proposer`.
rx_headers: Receiver<Header>,
/// Used to cancel vote requests for a previously-proposed header that is being replaced
/// before a certificate could be formed.
cancel_proposed_header: Option<oneshot::Sender<()>>,
/// Used to cancel vote requests for proposing headers that are no longer needed.
cancel_proposing_headers: Arc<Mutex<VecDeque<(Round, oneshot::Sender<()>)>>>,
/// Handle to propose_header task. Our target is to have only one task running always, thus
/// we cancel the previously running before we spawn the next one. However, we don't wait for
/// the previous to finish to spawn the new one, so we might temporarily have more than one
@@ -93,7 +94,7 @@ impl Certifier {
signature_service,
rx_shutdown,
rx_headers,
cancel_proposed_header: None,
cancel_proposing_headers: Default::default(),
propose_header_tasks: JoinSet::new(),
network: primary_network,
metrics,
@@ -247,6 +248,7 @@ impl Certifier {
metrics: Arc<PrimaryMetrics>,
network: anemo::Network,
header: Header,
cancel_proposing_headers: Arc<Mutex<VecDeque<(Round, oneshot::Sender<()>)>>>,
mut cancel: oneshot::Receiver<()>,
) -> DagResult<Certificate> {
if header.epoch() != committee.epoch() {
@@ -337,6 +339,15 @@ impl Certifier {
})?;
debug!("Assembled {certificate:?}");

let mut cancel_proposing_headers = cancel_proposing_headers.lock();
while let Some(&(round, _)) = cancel_proposing_headers.front() {
if round > header.round() {
break;
}
let (_round, sender) = cancel_proposing_headers.pop_front().unwrap();
let _ = sender.send(());
}

Ok(certificate)
}

@@ -369,10 +380,16 @@ impl Certifier {
// TODO: move logic into Proposer.
Some(header) = self.rx_headers.recv() => {
let (tx_cancel, rx_cancel) = oneshot::channel();
if let Some(cancel) = self.cancel_proposed_header {
let _ = cancel.send(());
{
let mut cancel_proposing_headers = self.cancel_proposing_headers.lock();
cancel_proposing_headers.push_back((header.round(), tx_cancel));

const MAX_PROPOSING_HEADERS: usize = 3;
while cancel_proposing_headers.len() > MAX_PROPOSING_HEADERS {
let (_round, sender) = cancel_proposing_headers.pop_front().unwrap();
let _ = sender.send(());
}
}
self.cancel_proposed_header = Some(tx_cancel);

let name = self.authority_id;
let committee = self.committee.clone();
@@ -381,6 +398,7 @@ impl Certifier {
let signature_service = self.signature_service.clone();
let metrics = self.metrics.clone();
let network = self.network.clone();
let cancel_proposing_headers = self.cancel_proposing_headers.clone();
fail_point_async!("narwhal-delay");
self.propose_header_tasks.spawn(monitored_future!(Self::propose_header(
name,
@@ -391,6 +409,7 @@ impl Certifier {
metrics,
network,
header,
cancel_proposing_headers,
rx_cancel,
)));
Ok(())
2 changes: 1 addition & 1 deletion narwhal/primary/src/primary.rs
@@ -822,7 +822,7 @@ impl PrimaryReceiverHandler {
// certificates to still have a chance to be included in the DAG while not wasting
// resources on very old vote requests. This value affects performance but not correctness
// of the algorithm.
const HEADER_AGE_LIMIT: Round = 3;
const HEADER_AGE_LIMIT: Round = 5;

// Lock to ensure consistency between limit_round and where parent_digests are gc'ed.
let mut parent_digests = self.parent_digests.lock();
52 changes: 32 additions & 20 deletions narwhal/primary/src/synchronizer.rs
@@ -2,7 +2,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use anemo::{rpc::Status, Network, Request, Response};
use config::{AuthorityIdentifier, Committee, Epoch, WorkerCache};
use config::{AuthorityIdentifier, Committee, Epoch, Stake, WorkerCache};
use consensus::consensus::ConsensusRound;
use crypto::NetworkPublicKey;
use fastcrypto::hash::Hash as _;
@@ -362,11 +362,37 @@ impl Synchronizer {
let inner_proposer = inner.clone();
spawn_logged_monitored_task!(
async move {
let last_round_certificates = inner_proposer
.certificate_store
.last_two_rounds_certs()
.expect("Failed recovering certificates in primary core");
for certificate in last_round_certificates {
let highest_round_number = inner_proposer.certificate_store.highest_round_number();
let mut certificates = vec![];
// The last or last 2 rounds are sufficient for recovery.
for i in 0..2 {
let round = highest_round_number - i;
// Do not recover genesis certificates. They are initialized into certificate
// aggregator already.
if round == 0 {
break;
}
let round_certs = inner_proposer
.certificate_store
.at_round(round)
.expect("Failed recovering certificates in primary core");
let stake: Stake = round_certs
.iter()
.map(|c: &Certificate| inner_proposer.committee.stake_by_id(c.origin()))
.sum();
certificates.extend(round_certs.into_iter());
// If a round has a quorum of certificates, enough have recovered because
// a header can be proposed with these parents.
if stake >= inner_proposer.committee.quorum_threshold() {
break;
} else {
// Only the last round can have less than a quorum of stake.
assert_eq!(i, 0);
}
}
// Unnecessary to append certificates in ascending round order, but it doesn't
// hurt either.
for certificate in certificates.into_iter().rev() {
if let Err(e) = inner_proposer
.append_certificate_in_aggregator(certificate)
.await
@@ -698,20 +724,6 @@ impl Synchronizer {
.with_label_values(&[certificate_source])
.set(highest_received_round as i64);

// Let the proposer draw early conclusions from a certificate at this round and epoch, without its
// parents or payload (which we may not have yet).
//
// Since our certificate is well-signed, it shows a majority of honest signers stand at round r,
// so to make a successful proposal, our proposer must use parents at least at round r-1.
//
// This allows the proposer not to fire proposals at rounds strictly below the certificate we witnessed.
let minimal_round_for_parents = certificate.round().saturating_sub(1);
self.inner
.tx_parents
.send((vec![], minimal_round_for_parents, certificate.epoch()))
.await
.map_err(|_| DagError::ShuttingDown)?;

// Instruct workers to download any missing batches referenced in this certificate.
// Since this header got certified, we are sure that all the data it refers to (ie. its batches and its parents) are available.
// We can thus continue the processing of the certificate without blocking on batch synchronization.
15 changes: 2 additions & 13 deletions narwhal/primary/src/tests/synchronizer_tests.rs
@@ -90,13 +90,6 @@ async fn accept_certificates() {
}

// Ensure the Synchronizer sends the parents of the certificates to the proposer.
//
// The first messages are the Synchronizer letting us know about the round of parent certificates
for _i in 0..3 {
let received = rx_parents.recv().await.unwrap();
assert_eq!(received, (vec![], 0, 0));
}
// the next message actually contains the parents
let received = rx_parents.recv().await.unwrap();
assert_eq!(received, (certificates.clone(), 1, 0));

@@ -452,11 +445,6 @@ async fn synchronizer_recover_partial_certs() {
}
tokio::time::sleep(Duration::from_secs(5)).await;

for _ in 0..2 {
let received = rx_parents.recv().await.unwrap();
assert_eq!(received, (vec![], 0, 0));
}

// the recovery flow sends message that contains the parents
let received = rx_parents.recv().await.unwrap();
assert_eq!(received.1, 1);
@@ -519,7 +507,7 @@ async fn synchronizer_recover_previous_round() {
.unwrap();
let _ = tx_synchronizer_network.send(network.clone());

// Send 3 certificates from round 1, and 2 certificates from round 2 to Synchronizer.
// Create 3 certificates per round.
let genesis_certs = Certificate::genesis(&committee);
let genesis = genesis_certs
.iter()
@@ -537,6 +525,7 @@
&latest_protocol_version(),
&keys,
);
// Send 3 certificates from round 1, and 2 certificates from round 2 to Synchronizer.
let all_certificates: Vec<_> = all_certificates.into_iter().collect();
let round_1_certificates = all_certificates[0..3].to_vec();
let round_2_certificates = all_certificates[3..5].to_vec();
92 changes: 42 additions & 50 deletions narwhal/storage/src/certificate_store.rs
@@ -534,21 +534,18 @@ impl<T: Cache> CertificateStore<T> {
/// Retrieves all the certificates with round >= the provided round.
/// The result is returned with certificates sorted in round asc order
pub fn after_round(&self, round: Round) -> StoreResult<Vec<Certificate>> {
// Skip to a row at or before the requested round.
// TODO: Add a more efficient seek method to typed store.
let mut iter = self.certificate_id_by_round.unbounded_iter();
if round > 0 {
iter = iter.skip_to(&(round - 1, AuthorityIdentifier::default()))?;
}

// Restrict the scan to at or after the requested round.
let iter = self
.certificate_id_by_round
.iter_with_bounds(Some((round, AuthorityIdentifier(0))), None);
let mut digests = Vec::new();
for ((r, _), d) in iter {
match r.cmp(&round) {
Ordering::Equal | Ordering::Greater => {
digests.push(d);
}
Ordering::Less => {
continue;
unreachable!("Failed to specify start range. Round {}", round);
}
}
}
@@ -590,43 +587,41 @@ impl<T: Cache> CertificateStore<T> {
Ok(result)
}

/// Retrieves the certificates of the last round and the round before that
pub fn last_two_rounds_certs(&self) -> StoreResult<Vec<Certificate>> {
// starting from the last element - hence the last round - move backwards until
// we find certificates of different round.
let certificates_reverse = self
.certificate_id_by_round
.unbounded_iter()
.skip_to_last()
.reverse();

let mut round = 0;
let mut certificates = Vec::new();

for (key, digest) in certificates_reverse {
let (certificate_round, _certificate_origin) = key;

// We treat zero as special value (round unset) in order to
// capture the last certificate's round.
// We are now in a round less than the previous so we want to
// stop consuming
if round == 0 {
round = certificate_round;
} else if certificate_round < round - 1 {
break;
/// Retrieves the certificates at the specified round.
pub fn at_round(&self, round: Round) -> StoreResult<Vec<Certificate>> {
// Restrict the scan to at or after the requested round.
let iter = self.certificate_id_by_round.iter_with_bounds(
Some((round, AuthorityIdentifier(0))),
Some((round + 1, AuthorityIdentifier(0))),
);
let mut digests = Vec::new();
for ((r, _), d) in iter {
match r.cmp(&round) {
Ordering::Greater => {
unreachable!("Failed to specify end range. Round {}", round);
}
Ordering::Equal => {
digests.push(d);
}
Ordering::Less => {
unreachable!("Failed to specify start range. Round {}", round);
}
}

let certificate = self.certificates_by_id.get(&digest)?.ok_or_else(|| {
RocksDBError(format!(
"Certificate with id {} not found in main storage although it should",
digest
))
})?;

certificates.push(certificate);
}

Ok(certificates)
// Fetch all those certificates from main storage, return an error if any one is missing.
self.certificates_by_id
.multi_get(digests.clone())?
.into_iter()
.map(|opt_cert| {
opt_cert.ok_or_else(|| {
RocksDBError(format!(
"Certificate with some digests not found, CertificateStore invariant violation: {:?}",
digests
))
})
})
.collect()
}

/// Retrieves the last certificate of the given origin.
@@ -932,7 +927,7 @@ mod test {
}

#[tokio::test]
async fn test_last_two_rounds() {
async fn test_last_round() {
// GIVEN
let store = new_store(temp_dir());

@@ -944,23 +939,20 @@
store.write_all(certs).unwrap();

// WHEN
let result = store.last_two_rounds_certs().unwrap();
let result = store.at_round(50).unwrap();
let last_round_cert = store.last_round(origin).unwrap().unwrap();
let last_round_number = store.last_round_number(origin).unwrap().unwrap();
let last_round_number_not_exist =
store.last_round_number(AuthorityIdentifier(10u16)).unwrap();
let highest_round_number = store.highest_round_number();

// THEN
assert_eq!(result.len(), 8);
assert_eq!(result.len(), 4);
assert_eq!(last_round_cert.round(), 50);
assert_eq!(last_round_number, 50);
assert_eq!(highest_round_number, 50);
for certificate in result {
assert!(
(certificate.round() == last_round_number)
|| (certificate.round() == last_round_number - 1)
);
assert_eq!(certificate.round(), last_round_number);
}
assert!(last_round_number_not_exist.is_none());
}
@@ -971,7 +963,7 @@
let store = new_store(temp_dir());

// WHEN
let result = store.last_two_rounds_certs().unwrap();
let result = store.at_round(1).unwrap();
let last_round_cert = store.last_round(AuthorityIdentifier::default()).unwrap();
let last_round_number = store
.last_round_number(AuthorityIdentifier::default())

1 comment on commit 52a6e7b

@github-actions (Contributor)


4 Validators 500/s Owned Transactions Benchmark Results

Benchmark Report:
+-------------+-----+-----+--------+---------------+---------------+---------------+-----------------------+----------------------------+
| duration(s) | tps | cps | error% | latency (min) | latency (p50) | latency (p99) | gas used (MIST total) | gas used/hr (MIST approx.) |
+=======================================================================================================================================+
| 60          | 587 | 587 | 0      | 56            | 8415          | 8751          | 459,098,592,000       | 27,545,915,520,000         |
Stress Performance Report:
+-----------+-----+-----+
| metric    | p50 | p99 |
+=======================+
| cpu usage | 12  | 100 |

4 Validators 500/s Shared Transactions Benchmark Results

Benchmark Report:
+-------------+-----+-----+--------+---------------+---------------+---------------+-----------------------+----------------------------+
| duration(s) | tps | cps | error% | latency (min) | latency (p50) | latency (p99) | gas used (MIST total) | gas used/hr (MIST approx.) |
+=======================================================================================================================================+
| 60          | 482 | 482 | 0      | 21            | 9407          | 13687         | 443,411,080,800       | 26,604,664,848,000         |
Stress Performance Report:
+-----------+-----+-----+
| metric    | p50 | p99 |
+=======================+
| cpu usage | 11  | 100 |

20 Validators 50/s Owned Transactions Benchmark Results

Benchmark Report:
+-------------+-----+-----+--------+---------------+---------------+---------------+-----------------------+----------------------------+
| duration(s) | tps | cps | error% | latency (min) | latency (p50) | latency (p99) | gas used (MIST total) | gas used/hr (MIST approx.) |
+=======================================================================================================================================+
| 60          | 200 | 200 | 0      | 21            | 65            | 124           | 156,370,944,000       | 9,382,256,640,000          |
Stress Performance Report:
+-----------+-----+-----+
| metric    | p50 | p99 |
+=======================+
| cpu usage | 26  | 51  |

20 Validators 50/s Shared Transactions Benchmark Results

Benchmark Report:
+-------------+-----+-----+--------+---------------+---------------+---------------+-----------------------+----------------------------+
| duration(s) | tps | cps | error% | latency (min) | latency (p50) | latency (p99) | gas used (MIST total) | gas used/hr (MIST approx.) |
+=======================================================================================================================================+
| 60          | 194 | 194 | 0      | 39            | 1427          | 2087          | 177,736,510,800       | 10,664,190,648,000         |
Stress Performance Report:
+-----------+-----+-----+
| metric    | p50 | p99 |
+=======================+
| cpu usage | 25  | 57  |

Narwhal Benchmark Results

 SUMMARY:
-----------------------------------------
 + CONFIG:
 Faults: 0 node(s)
 Committee size: 4 node(s)
 Worker(s) per node: 1 worker(s)
 Collocate primary and workers: True
 Input rate: 50,000 tx/s
 Transaction size: 512 B
 Execution time: 0 s

 Header number of batches threshold: 32 digests
 Header maximum number of batches: 1,000 digests
 Max header delay: 2,000 ms
 GC depth: 50 round(s)
 Sync retry delay: 10,000 ms
 Sync retry nodes: 3 node(s)
 batch size: 500,000 B
 Max batch delay: 200 ms
 Max concurrent requests: 500,000 

 + RESULTS:
 Batch creation avg latency: 202 ms
 Header creation avg latency: -1 ms
 	Batch to header avg latency: -1 ms
 Header to certificate avg latency: 1 ms
 	Request vote outbound avg latency: 0 ms
 Certificate commit avg latency: 743 ms

 Consensus TPS: 0 tx/s
 Consensus BPS: 0 B/s
 Consensus latency: 0 ms

 End-to-end TPS: 0 tx/s
 End-to-end BPS: 0 B/s
 End-to-end latency: 0 ms
-----------------------------------------
