feat: Implement ChonkyBFT #211

Merged · 5 commits · Nov 5, 2024
949 changes: 596 additions & 353 deletions node/Cargo.lock

Large diffs are not rendered by default.

@@ -1,31 +1,17 @@
 use super::StateMachine;
-use zksync_concurrency::ctx;
+use zksync_concurrency::{ctx, error::Wrap as _};
 use zksync_consensus_roles::validator;
+use zksync_consensus_storage as storage;
 
 impl StateMachine {
     /// Tries to build a finalized block from the given CommitQC. We simply search our
     /// block proposal cache for the matching block, and if we find it we build the block.
-    /// If this method succeeds, it sends the finalized block to the executor.
-    /// It also updates the High QC in the replica state machine, if the received QC is
-    /// higher.
-    #[tracing::instrument(level = "debug", skip_all)]
+    /// If this method succeeds, it saves the finalized block to storage.
     pub(crate) async fn save_block(
         &mut self,
         ctx: &ctx::Ctx,
         commit_qc: &validator::CommitQC,
     ) -> ctx::Result<()> {
-        // Update high_qc.
-        if self
-            .high_qc
-            .as_ref()
-            .map(|qc| qc.view().number < commit_qc.view().number)
-            .unwrap_or(true)
-        {
-            self.high_qc = Some(commit_qc.clone());
-        }
-        // TODO(gprusak): for availability of finalized blocks,
-        // replicas should be able to broadcast highest quorums without
-        // the corresponding block (same goes for synchronization).
         let Some(cache) = self.block_proposal_cache.get(&commit_qc.header().number) else {
             return Ok(());
         };
@@ -46,7 +32,11 @@ impl StateMachine {
             .block_store
             .queue_block(ctx, block.clone().into())
             .await?;
 
+        // For availability, the replica should not proceed until it has stored the block persistently.
+        // The rationale is that save_block is followed by start_new_view, which prunes the block
+        // proposal cache. Without persisting this block, if all replicas crashed right after
+        // start_new_view, the payload would become unavailable.
         self.config
             .block_store
             .wait_until_persisted(ctx, block.header().number)
@@ -55,6 +45,32 @@ impl StateMachine {
         let number_metric = &crate::metrics::METRICS.finalized_block_number;
         let current_number = number_metric.get();
         number_metric.set(current_number.max(block.header().number.0));
+
         Ok(())
     }
+
+    /// Backs up the replica state to the DB.
+    pub(crate) async fn backup_state(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
+        let mut proposals = vec![];
+        for (number, payloads) in &self.block_proposal_cache {
+            proposals.extend(payloads.values().map(|p| storage::Proposal {
+                number: *number,
+                payload: p.clone(),
+            }));
+        }
+        let backup = storage::ReplicaState {
+            view: self.view_number,
+            phase: self.phase,
+            high_vote: self.high_vote.clone(),
+            high_commit_qc: self.high_commit_qc.clone(),
+            high_timeout_qc: self.high_timeout_qc.clone(),
+            proposals,
+        };
+        self.config
+            .replica_store
+            .set_state(ctx, &backup)
+            .await
+            .wrap("set_state()")?;
+        Ok(())
+    }
 }
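
To make the flattening in backup_state concrete: the block proposal cache can hold several competing payloads per block number, and all of them are written out as flat Proposal records. Below is a minimal standalone sketch of that pattern; Proposal and ReplicaState here are toy stand-ins, not the real zksync_consensus_storage definitions:

```rust
use std::collections::BTreeMap;

// Toy stand-ins for the storage types (the real ones live in zksync_consensus_storage).
#[derive(Clone, Debug)]
struct Proposal {
    number: u64,      // block number the payload was proposed for
    payload: Vec<u8>, // opaque payload bytes
}

#[derive(Debug)]
struct ReplicaState {
    view: u64,
    proposals: Vec<Proposal>,
}

// Mirrors the loop in backup_state: every cached payload for every block
// number is flattened into one list before being persisted.
fn flatten_cache(cache: &BTreeMap<u64, Vec<Vec<u8>>>) -> Vec<Proposal> {
    let mut proposals = vec![];
    for (number, payloads) in cache {
        proposals.extend(payloads.iter().map(|p| Proposal {
            number: *number,
            payload: p.clone(),
        }));
    }
    proposals
}

fn main() {
    let mut cache = BTreeMap::new();
    // Two competing payloads cached for block 7, one for block 8.
    cache.insert(7, vec![b"payload-a".to_vec(), b"payload-b".to_vec()]);
    cache.insert(8, vec![b"payload-c".to_vec()]);

    let backup = ReplicaState { view: 9, proposals: flatten_cache(&cache) };
    assert_eq!(backup.proposals.len(), 3); // all three payloads survive the backup
    println!("{backup:?}");
}
```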
161 changes: 161 additions & 0 deletions node/actors/bft/src/chonky_bft/commit.rs
@@ -0,0 +1,161 @@
use super::StateMachine;
use crate::metrics;
use std::collections::HashSet;
use zksync_concurrency::{ctx, error::Wrap, metrics::LatencyHistogramExt as _};
use zksync_consensus_roles::validator;

/// Errors that can occur when processing a ReplicaCommit message.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    /// Message signer isn't part of the validator set.
    #[error("message signer isn't part of the validator set (signer: {signer:?})")]
    NonValidatorSigner {
        /// Signer of the message.
        signer: Box<validator::PublicKey>,
    },
    /// Past view.
    #[error("past view (current view: {current_view:?})")]
    Old {
        /// Current view.
        current_view: validator::ViewNumber,
    },
    /// Duplicate signer. We already have a commit message from the same validator
    /// for the same or a past view.
    #[error("duplicate signer (message view: {message_view:?}, signer: {signer:?})")]
    DuplicateSigner {
        /// View number of the message.
        message_view: validator::ViewNumber,
        /// Signer of the message.
        signer: Box<validator::PublicKey>,
    },
    /// Invalid message signature.
    #[error("invalid signature: {0:#}")]
    InvalidSignature(#[source] anyhow::Error),
    /// Invalid message.
    #[error("invalid message: {0:#}")]
    InvalidMessage(#[source] validator::ReplicaCommitVerifyError),
    /// Internal error. Unlike the other error types, this one isn't supposed to be easily recoverable.
    #[error(transparent)]
    Internal(#[from] ctx::Error),
}

impl Wrap for Error {
    fn with_wrap<C: std::fmt::Display + Send + Sync + 'static, F: FnOnce() -> C>(
        self,
        f: F,
    ) -> Self {
        match self {
            Error::Internal(err) => Error::Internal(err.with_wrap(f)),
            err => err,
        }
    }
}

impl StateMachine {
    /// Processes a ReplicaCommit message.
    pub(crate) async fn on_commit(
        &mut self,
        ctx: &ctx::Ctx,
        signed_message: validator::Signed<validator::ReplicaCommit>,
    ) -> Result<(), Error> {
        // ----------- Checking origin of the message --------------

        // Unwrap message.
        let message = &signed_message.msg;
        let author = &signed_message.key;

        // Check that the message signer is in the validator committee.
        if !self.config.genesis().validators.contains(author) {
            return Err(Error::NonValidatorSigner {
                signer: author.clone().into(),
            });
        }

        // If the message is from a past view, ignore it.
        if message.view.number < self.view_number {
            return Err(Error::Old {
                current_view: self.view_number,
            });
        }

        // If we already have a message from the same validator for the same or a past view, ignore it.
        if let Some(&view) = self.commit_views_cache.get(author) {
            if view >= message.view.number {
                return Err(Error::DuplicateSigner {
                    message_view: message.view.number,
                    signer: author.clone().into(),
                });
            }
        }

        // ----------- Checking the signed part of the message --------------

        // Check the signature on the message.
        signed_message.verify().map_err(Error::InvalidSignature)?;

        message
            .verify(self.config.genesis())
            .map_err(Error::InvalidMessage)?;

        // ----------- All checks finished. Now we process the message. --------------

        // We add the message to the incrementally-constructed QC.
        let commit_qc = self
            .commit_qcs_cache
            .entry(message.view.number)
            .or_default()
            .entry(message.clone())
            .or_insert_with(|| validator::CommitQC::new(message.clone(), self.config.genesis()));

        // This should always succeed, as all checks have already been performed.
        commit_qc
            .add(&signed_message, self.config.genesis())
            .expect("could not add message to CommitQC");

        // Calculate the CommitQC signers' weight.
        let weight = self.config.genesis().validators.weight(&commit_qc.signers);

        // Update the view number of the last commit message for this author.
        self.commit_views_cache
            .insert(author.clone(), message.view.number);

        // Clean up commit_qcs for which no replica is at the view of the
        // given CommitQC anymore. This prevents the commit_qcs map from
        // growing indefinitely if some malicious replica starts spamming
        // messages for future views.
        let active_views: HashSet<_> = self.commit_views_cache.values().collect();
        self.commit_qcs_cache
            .retain(|view_number, _| active_views.contains(view_number));

        // Now we check if we have enough weight to continue. If not, we wait for more messages.
        if weight < self.config.genesis().validators.quorum_threshold() {
            return Ok(());
        }

        // ----------- We have a QC. Now we process it. --------------

        // Consume the created commit QC for this view.
        let commit_qc = self
            .commit_qcs_cache
            .remove(&message.view.number)
            .unwrap()
            .remove(message)
            .unwrap();

        // We update our state with the new commit QC.
        self.process_commit_qc(ctx, &commit_qc)
            .await
            .wrap("process_commit_qc()")?;

        // Metrics. We observe the latency of committing to a block, measured
        // from the start of this view.
        metrics::METRICS
            .commit_latency
            .observe_latency(ctx.now() - self.view_start);

        // Start a new view.
        self.start_new_view(ctx, message.view.number.next()).await?;

        Ok(())
    }
}
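
As a reading aid for on_commit above, here is a minimal standalone model of its weight-accumulation pattern; QcBuilder, the validator names, and the threshold value are all invented for illustration, and this is not the real validator::CommitQC API. Each distinct signer contributes its voting weight once, and the caller proceeds only when the accumulated weight reaches the quorum threshold:

```rust
use std::collections::{HashMap, HashSet};

// Toy model of the incrementally-constructed QC in on_commit.
struct QcBuilder {
    threshold: u64,                      // quorum weight required (illustrative value)
    weights: HashMap<&'static str, u64>, // validator -> voting weight
    signers: HashSet<&'static str>,      // validators that have already signed
}

impl QcBuilder {
    // Records a vote and reports whether the QC now has quorum weight.
    fn add_vote(&mut self, signer: &'static str) -> bool {
        // A repeated vote from the same signer adds no weight, mirroring
        // the DuplicateSigner check in on_commit.
        self.signers.insert(signer);
        let weights = &self.weights;
        let weight: u64 = self
            .signers
            .iter()
            .map(|s| weights.get(s).copied().unwrap_or(0))
            .sum();
        weight >= self.threshold
    }
}

fn main() {
    let weights = HashMap::from([("a", 30), ("b", 30), ("c", 40)]);
    let mut qc = QcBuilder {
        threshold: 80,
        weights,
        signers: HashSet::new(),
    };

    assert!(!qc.add_vote("a")); // 30 < 80: keep waiting for more messages
    assert!(!qc.add_vote("a")); // duplicate vote, still 30
    assert!(!qc.add_vote("b")); // 60 < 80
    assert!(qc.add_vote("c"));  // 100 >= 80: the QC is complete
    println!("QC formed with signers: {:?}", qc.signers);
}
```

Together with the view-based pruning of commit_qcs_cache, this duplicate-signer handling is what keeps the per-view vote state bounded even if a malicious validator spams messages for future views.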