Skip to content

Commit

Permalink
feat: Implement ChonkyBFT (#211)
Browse files Browse the repository at this point in the history
Implemented types and logic for ChonkyBFT. Note that I'm not keeping the
old types; that's because `ReplicaCommit` — as well as `CommitQC` and
`FinalBlock` — has no changes to it (I checked, and serialization is the
same for old and new, so old signatures still work). Any type that an EN
could receive, or that ends up in our database, doesn't undergo any
alterations. So the upgrade to ChonkyBFT appears to be
backwards-compatible for ENs. Since we only have one validator, we can
upgrade the whole network without a planned hard fork.
Fixes BFT-452
  • Loading branch information
brunoffranca authored Nov 5, 2024
1 parent 74407e6 commit f4cc128
Show file tree
Hide file tree
Showing 81 changed files with 6,916 additions and 6,295 deletions.
949 changes: 596 additions & 353 deletions node/Cargo.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,31 +1,17 @@
use super::StateMachine;
use zksync_concurrency::ctx;
use zksync_concurrency::{ctx, error::Wrap as _};
use zksync_consensus_roles::validator;
use zksync_consensus_storage as storage;

impl StateMachine {
/// Tries to build a finalized block from the given CommitQC. We simply search our
/// block proposal cache for the matching block, and if we find it we build the block.
/// If this method succeeds, it sends the finalized block to the executor.
/// It also updates the High QC in the replica state machine, if the received QC is
/// higher.
#[tracing::instrument(level = "debug", skip_all)]
/// If this method succeeds, it saves the finalized block to storage.
pub(crate) async fn save_block(
&mut self,
ctx: &ctx::Ctx,
commit_qc: &validator::CommitQC,
) -> ctx::Result<()> {
// Update high_qc.
if self
.high_qc
.as_ref()
.map(|qc| qc.view().number < commit_qc.view().number)
.unwrap_or(true)
{
self.high_qc = Some(commit_qc.clone());
}
// TODO(gprusak): for availability of finalized blocks,
// replicas should be able to broadcast highest quorums without
// the corresponding block (same goes for synchronization).
let Some(cache) = self.block_proposal_cache.get(&commit_qc.header().number) else {
return Ok(());
};
Expand All @@ -46,7 +32,11 @@ impl StateMachine {
.block_store
.queue_block(ctx, block.clone().into())
.await?;

// For availability, replica should not proceed until it stores the block persistently.
// Rationale is that after save_block, there is start_new_view which prunes the
// cache. Without persisting this block, if all replicas crash just after
// start_new_view, the payload becomes unavailable.
self.config
.block_store
.wait_until_persisted(ctx, block.header().number)
Expand All @@ -55,6 +45,32 @@ impl StateMachine {
let number_metric = &crate::metrics::METRICS.finalized_block_number;
let current_number = number_metric.get();
number_metric.set(current_number.max(block.header().number.0));

Ok(())
}

/// Backs up the replica state to DB.
///
/// Snapshots the current view, phase, high vote, high QCs and the cached
/// block proposals into a [`storage::ReplicaState`] and persists it via the
/// configured replica store.
pub(crate) async fn backup_state(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
    // Flatten the (block number -> payloads) proposal cache into a flat
    // list of storable proposals.
    let mut proposals = Vec::new();
    for (&number, payloads) in &self.block_proposal_cache {
        for payload in payloads.values() {
            proposals.push(storage::Proposal {
                number,
                payload: payload.clone(),
            });
        }
    }

    let backup = storage::ReplicaState {
        view: self.view_number,
        phase: self.phase,
        high_vote: self.high_vote.clone(),
        high_commit_qc: self.high_commit_qc.clone(),
        high_timeout_qc: self.high_timeout_qc.clone(),
        proposals,
    };

    self.config
        .replica_store
        .set_state(ctx, &backup)
        .await
        .wrap("set_state()")?;
    Ok(())
}
}
161 changes: 161 additions & 0 deletions node/actors/bft/src/chonky_bft/commit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use super::StateMachine;
use crate::metrics;
use std::collections::HashSet;
use zksync_concurrency::{ctx, error::Wrap, metrics::LatencyHistogramExt as _};
use zksync_consensus_roles::validator;

/// Errors that can occur when processing a ReplicaCommit message.
///
/// All variants except [`Error::Internal`] describe a rejected message and are
/// recoverable: the caller can discard the vote and keep processing. Per its
/// own doc below, `Internal` isn't supposed to be easily recoverable.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
/// Message signer isn't part of the validator set.
#[error("message signer isn't part of the validator set (signer: {signer:?})")]
NonValidatorSigner {
/// Signer of the message.
signer: Box<validator::PublicKey>,
},
/// Past view.
#[error("past view (current view: {current_view:?})")]
Old {
/// Current view.
current_view: validator::ViewNumber,
},
/// Duplicate signer. We already have a commit message from the same validator
/// for the same or past view.
#[error("duplicate signer (message view: {message_view:?}, signer: {signer:?})")]
DuplicateSigner {
/// View number of the message.
message_view: validator::ViewNumber,
/// Signer of the message.
signer: Box<validator::PublicKey>,
},
/// Invalid message signature.
#[error("invalid signature: {0:#}")]
InvalidSignature(#[source] anyhow::Error),
/// Invalid message.
#[error("invalid message: {0:#}")]
InvalidMessage(#[source] validator::ReplicaCommitVerifyError),
/// Internal error. Unlike other error types, this one isn't supposed to be easily recoverable.
#[error(transparent)]
Internal(#[from] ctx::Error),
}

impl Wrap for Error {
    /// Attaches wrapping context to [`Error::Internal`] only; every other
    /// (recoverable) variant is returned untouched.
    fn with_wrap<C: std::fmt::Display + Send + Sync + 'static, F: FnOnce() -> C>(
        self,
        f: F,
    ) -> Self {
        match self {
            Self::Internal(inner) => Self::Internal(inner.with_wrap(f)),
            other => other,
        }
    }
}

impl StateMachine {
/// Processes a ReplicaCommit message.
///
/// The vote is checked (committee membership, view freshness, per-signer
/// deduplication, signature, message validity) and then folded into the
/// incrementally-built `CommitQC` for its view. Once the accumulated signer
/// weight reaches the quorum threshold, the finished QC is consumed,
/// `process_commit_qc` is called and the state machine starts the next view.
///
/// Returns an [`Error`] describing why the message was rejected; only
/// [`Error::Internal`] indicates a non-recoverable failure.
pub(crate) async fn on_commit(
&mut self,
ctx: &ctx::Ctx,
signed_message: validator::Signed<validator::ReplicaCommit>,
) -> Result<(), Error> {
// ----------- Checking origin of the message --------------

// Unwrap message.
let message = &signed_message.msg;
let author = &signed_message.key;

// Check that the message signer is in the validator committee.
if !self.config.genesis().validators.contains(author) {
return Err(Error::NonValidatorSigner {
signer: author.clone().into(),
});
}

// If the message is from a past view, ignore it.
if message.view.number < self.view_number {
return Err(Error::Old {
current_view: self.view_number,
});
}

// If we already have a message from the same validator for the same or past view, ignore it.
if let Some(&view) = self.commit_views_cache.get(author) {
if view >= message.view.number {
return Err(Error::DuplicateSigner {
message_view: message.view.number,
signer: author.clone().into(),
});
}
}

// ----------- Checking the signed part of the message --------------

// Check the signature on the message.
signed_message.verify().map_err(Error::InvalidSignature)?;

message
.verify(self.config.genesis())
.map_err(Error::InvalidMessage)?;

// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
// The cache is keyed first by view number and then by the message itself,
// so votes for different proposals in the same view accumulate into
// separate QCs.
let commit_qc = self
.commit_qcs_cache
.entry(message.view.number)
.or_default()
.entry(message.clone())
.or_insert_with(|| validator::CommitQC::new(message.clone(), self.config.genesis()));

// Should always succeed as all checks have been already performed
commit_qc
.add(&signed_message, self.config.genesis())
.expect("could not add message to CommitQC");

// Calculate the CommitQC signers weight.
let weight = self.config.genesis().validators.weight(&commit_qc.signers);

// Update view number of last commit message for author
self.commit_views_cache
.insert(author.clone(), message.view.number);

// Clean up commit_qcs for the case that no replica is at the view
// of a given CommitQC.
// This prevents commit_qcs map from growing indefinitely in case some
// malicious replica starts spamming messages for future views.
// Note: the view of the message just processed always survives the
// retain(), because it was inserted into commit_views_cache above.
let active_views: HashSet<_> = self.commit_views_cache.values().collect();
self.commit_qcs_cache
.retain(|view_number, _| active_views.contains(view_number));

// Now we check if we have enough weight to continue. If not, we wait for more messages.
if weight < self.config.genesis().validators.quorum_threshold() {
return Ok(());
};

// ----------- We have a QC. Now we process it. --------------

// Consume the created commit QC for this view.
// Both unwrap()s are safe: the entries were (re)inserted above and this
// view was kept alive by the retain() (see note there).
let commit_qc = self
.commit_qcs_cache
.remove(&message.view.number)
.unwrap()
.remove(message)
.unwrap();

// We update our state with the new commit QC.
self.process_commit_qc(ctx, &commit_qc)
.await
.wrap("process_commit_qc()")?;

// Metrics. We observe the latency of committing to a block measured
// from the start of this view.
metrics::METRICS
.commit_latency
.observe_latency(ctx.now() - self.view_start);

// Start a new view.
self.start_new_view(ctx, message.view.number.next()).await?;

Ok(())
}
}
Loading

0 comments on commit f4cc128

Please sign in to comment.