Skip to content

Commit

Permalink
[consensus] Fix crash recovery when no commit info was flushed (#17712)
Browse files Browse the repository at this point in the history
CommitInfo is buffered on LeaderSchedule change and then it is flushed
on the following commit. The current issue is that it is possible for
commits to be written but not CommitInfo to be persisted to store. This
is typically okay because the recovery code in DagState should recover
all information from the last stored CommitInfo. The issue was that
until the first LeaderSchedule change there is no commit info written to
store and the code was not properly recovering `last_committed_rounds`
in DagState in this case.

Changed the code to handle recovery in the case that no commit info has
been written to store and added some additional testing for recovery.
  • Loading branch information
arun-koshy authored and mwtian committed May 14, 2024
1 parent 0d02721 commit b10ea73
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 137 deletions.
140 changes: 81 additions & 59 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
},
commit::{
load_committed_subdag_from_store, CommitAPI as _, CommitDigest, CommitIndex, CommitInfo,
CommitRef, CommitVote, CommittedSubDag, TrustedCommit,
CommitRef, CommitVote, CommittedSubDag, TrustedCommit, GENESIS_COMMIT_INDEX,
},
context::Context,
leader_scoring::ReputationScores,
Expand Down Expand Up @@ -103,45 +103,44 @@ impl DagState {

let mut unscored_committed_subdags = Vec::new();

let last_committed_rounds = {
let commit_info = store
.read_last_commit_info()
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
let commit_info = store
.read_last_commit_info()
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
let (mut last_committed_rounds, commit_recovery_start_index) =
if let Some((commit_ref, commit_info)) = commit_info {
let mut committed_rounds = commit_info.committed_rounds;
let last_commit = last_commit
.as_ref()
.expect("There exists commit info, so the last commit should exist as well.");

if last_commit.index() > commit_ref.index {
let committed_blocks = store
.scan_commits(((commit_ref.index + 1)..last_commit.index() + 1).into())
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
.iter()
.flat_map(|commit| {
let committed_subdag =
load_committed_subdag_from_store(store.as_ref(), commit.clone());
if context
.protocol_config
.mysticeti_leader_scoring_and_schedule()
{
unscored_committed_subdags.push(committed_subdag.clone());
}
committed_subdag.blocks
})
.collect::<Vec<_>>();

for block in committed_blocks {
committed_rounds[block.author()] =
max(committed_rounds[block.author()], block.round());
tracing::info!("Recovering committed state from {commit_ref} {commit_info:?}");
(commit_info.committed_rounds, commit_ref.index + 1)
} else {
tracing::info!("Found no stored CommitInfo to recover from");
(vec![0; num_authorities], GENESIS_COMMIT_INDEX + 1)
};

if let Some(last_commit) = last_commit.as_ref() {
store
.scan_commits((commit_recovery_start_index..last_commit.index() + 1).into())
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
.iter()
.for_each(|commit| {
for block_ref in commit.blocks() {
last_committed_rounds[block_ref.author] =
max(last_committed_rounds[block_ref.author], block_ref.round);
}
}
if context
.protocol_config
.mysticeti_leader_scoring_and_schedule()
{
let committed_subdag =
load_committed_subdag_from_store(store.as_ref(), commit.clone());
unscored_committed_subdags.push(committed_subdag);
}
});
}

committed_rounds
} else {
vec![0; num_authorities]
}
};
tracing::info!(
"DagState was initialized with the following state: \
{last_commit:?}; {last_committed_rounds:?}; {} unscored_committed_subdags;",
unscored_committed_subdags.len()
);

let mut state = Self {
context,
Expand Down Expand Up @@ -1339,6 +1338,7 @@ mod test {

#[tokio::test]
async fn test_flush_and_recovery() {
telemetry_subscribers::init_for_testing();
let num_authorities: u32 = 4;
let (context, _) = Context::new_for_test(num_authorities as usize);
let context = Arc::new(context);
Expand All @@ -1347,27 +1347,41 @@ mod test {

// Create test blocks and commits for round 1 ~ 10
let num_rounds: u32 = 10;
let mut blocks = Vec::new();
let mut commits = Vec::new();
for round in 1..=num_rounds {
for author in 0..num_authorities {
let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
blocks.push(block);
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=num_rounds).build();
let mut commits = vec![];
let leaders = dag_builder
.leader_blocks(1..=num_rounds)
.into_iter()
.flatten()
.collect::<Vec<_>>();

let mut last_committed_rounds = vec![0; 4];
for (idx, leader) in leaders.into_iter().enumerate() {
let commit_index = idx as u32 + 1;
let subdag =
dag_builder.get_subdag(leader.clone(), last_committed_rounds.clone(), commit_index);
for block in subdag.blocks.iter() {
last_committed_rounds[block.author().value()] =
max(block.round(), last_committed_rounds[block.author().value()]);
}
commits.push(TrustedCommit::new_for_test(
round as CommitIndex,
let commit = TrustedCommit::new_for_test(
commit_index,
CommitDigest::MIN,
0,
blocks.last().unwrap().reference(),
vec![],
));
leader.timestamp_ms(),
leader.reference(),
subdag
.blocks
.iter()
.map(|block| block.reference())
.collect::<Vec<_>>(),
);
commits.push(commit);
}

// Add the blocks from first 5 rounds and first 5 commits to the dag state
let i = blocks.iter().position(|b| b.round() == 6).unwrap();
let temp_blocks = blocks.split_off(i);
dag_state.accept_blocks(blocks.clone());
let temp_commits = commits.split_off(5);
dag_state.accept_blocks(dag_builder.blocks(1..=5));
for commit in commits.clone() {
dag_state.add_commit(commit);
}
Expand All @@ -1376,17 +1390,13 @@ mod test {
dag_state.flush();

// Add the rest of the blocks and commits to the dag state
dag_state.accept_blocks(temp_blocks.clone());
dag_state.accept_blocks(dag_builder.blocks(6..=num_rounds));
for commit in temp_commits.clone() {
dag_state.add_commit(commit);
}

// All blocks should be found in DagState.
let all_blocks = blocks
.clone()
.into_iter()
.chain(temp_blocks.clone())
.collect::<Vec<_>>();
let all_blocks = dag_builder.blocks(6..=num_rounds);
let block_refs = all_blocks
.iter()
.map(|block| block.reference())
Expand All @@ -1400,6 +1410,7 @@ mod test {

// Last commit index should be 10.
assert_eq!(dag_state.last_commit_index(), 10);
assert_eq!(dag_state.last_committed_rounds(), last_committed_rounds);

// Destroy the dag state.
drop(dag_state);
Expand All @@ -1408,6 +1419,7 @@ mod test {
let dag_state = DagState::new(context.clone(), store.clone());

// Blocks of first 5 rounds should be found in DagState.
let blocks = dag_builder.blocks(1..=5);
let block_refs = blocks
.iter()
.map(|block| block.reference())
Expand All @@ -1420,7 +1432,8 @@ mod test {
assert_eq!(result, blocks);

// Blocks above round 5 should not be in DagState, because they are not flushed.
let block_refs = temp_blocks
let missing_blocks = dag_builder.blocks(6..=num_rounds);
let block_refs = missing_blocks
.iter()
.map(|block| block.reference())
.collect::<Vec<_>>();
Expand All @@ -1433,6 +1446,15 @@ mod test {

// Last commit index should be 5.
assert_eq!(dag_state.last_commit_index(), 5);

// This is the last_committed_rounds of the first 5 commits that were flushed
let expected_last_committed_rounds = vec![4, 5, 4, 4];
assert_eq!(
dag_state.last_committed_rounds(),
expected_last_committed_rounds
);
// Unscored subdags will be recovered from the flushed commits, since no CommitInfo was persisted
assert_eq!(dag_state.unscored_committed_subdags_count(), 5);
}

#[tokio::test]
Expand Down
Loading

0 comments on commit b10ea73

Please sign in to comment.