Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-koshy committed Oct 21, 2024
1 parent 236a2d9 commit cbf0789
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 64 deletions.
2 changes: 1 addition & 1 deletion consensus/core/src/ancestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl AncestorStateManager {
}

let low_score_threshold =
(self.propagation_scores.high_score() * Self::EXCLUSION_THRESHOLD_PERCENTAGE) / 100;
(self.propagation_scores.highest_score() * Self::EXCLUSION_THRESHOLD_PERCENTAGE) / 100;

match ancestor_info.state {
// Check conditions to switch to EXCLUDE state
Expand Down
78 changes: 34 additions & 44 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,10 @@ impl Core {
}
}

assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core.");

let mut excluded_ancestors_count = 0;

// Inclusion of weak links for low score ancestors so there is no long
// list of blocks that we need to include later.
for (score, ancestor) in excluded_ancestors.iter() {
Expand All @@ -961,49 +965,38 @@ impl Core {
// as that is the max round we can include as an ancestor.
network_low_quorum_round = network_low_quorum_round.min(quorum_round);

if let Some(last_block_ref) = self.last_included_ancestors[excluded_author] {
if last_block_ref.round < network_low_quorum_round {
if ancestor.round() == network_low_quorum_round {
// Include the ancestor block as it has been seen by a strong quorum
self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor.clone());
debug!("Included low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "weak"])
.inc();
continue;
}

// Fetch ancestor block from the store for older round
let blocks = self
.dag_state
.read()
.get_uncommitted_blocks_at_slot(Slot::new(
network_low_quorum_round,
excluded_author,
));

if let Some(block) = blocks.first() {
self.last_included_ancestors[excluded_author] = Some(block.reference());
ancestors_to_propose.push(block.clone());
debug!("Included low scoring ancestor {block} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "weak"])
.inc();
continue;
} else {
debug!("No earlier uncommitted block found for low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
}
}
let last_included_ancestor_round = self.last_included_ancestors[excluded_author]
.map_or(GENESIS_ROUND, |block_ref| block_ref.round);

// Check if there is a block that is at a round higher than the last included
// ancestor round but less than or equal to the network low quorum round.
if let Some(last_cached_block_in_range) = self
.dag_state
.read()
.get_last_cached_block_for_authority_in_range(
excluded_author,
last_included_ancestor_round + 1,
network_low_quorum_round + 1,
)
{
// Include the ancestor block as it has been seen by a strong quorum
self.last_included_ancestors[excluded_author] =
Some(last_cached_block_in_range.reference());
ancestors_to_propose.push(last_cached_block_in_range.clone());
debug!("Included low scoring ancestor {last_cached_block_in_range} with score {score} seen between last included round {last_included_ancestor_round} and network quorum round {network_low_quorum_round} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "weak"])
.inc();
continue;
} else {
debug!("No cached block found for low scoring ancestor {ancestor} with score {score} between last included round {last_included_ancestor_round} and network quorum round {network_low_quorum_round} to propose for round {clock_round}");
}

debug!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}");
excluded_ancestors_count += 1;
self.context
.metrics
.node_metrics
Expand All @@ -1013,13 +1006,10 @@ impl Core {
}

info!(
"Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}",
"Included {} ancestors & excluded {excluded_ancestors_count} ancestors for proposal in round {clock_round}",
ancestors_to_propose.len(),
excluded_ancestors.len(),
);

assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core.");

ancestors_to_propose
}

Expand Down
142 changes: 124 additions & 18 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,19 @@ impl DagState {
blocks
}

/// Returns the last block proposed per authority with `round < end_round`.
/// Returns the last block cached for the authority within a range.
/// This can return None if there is no block within the range provided.
pub(crate) fn get_last_cached_block_for_authority_in_range(
&self,
authority: AuthorityIndex,
start_round: Round,
end_round: Round,
) -> Option<VerifiedBlock> {
let blocks = self.get_last_cached_blocks_per_authority_in_range(start_round, end_round);
blocks[authority].clone()
}

/// Returns the last block cached per authority with `round < end_round`.
/// The method is guaranteed to return results only when the `end_round` is not earlier of the
/// available cached data for each authority, otherwise the method will panic - it's the caller's
/// responsibility to ensure that is not requesting filtering for earlier rounds .
Expand All @@ -505,14 +517,40 @@ impl DagState {
&self,
end_round: Round,
) -> Vec<VerifiedBlock> {
// init with the genesis blocks as fallback
let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();
let blocks = self.get_last_cached_blocks_per_authority_in_range(GENESIS_ROUND, end_round);

if end_round == GENESIS_ROUND {
panic!(
"Attempted to retrieve blocks earlier than the genesis round which is not possible"
);
}
// Ensure every authority has a valid block as we will fall back to genesis
// blocks which should always be available.
blocks
.into_iter()
.enumerate()
.map(|(idx, block_opt)| {
let authority_index = self.context.committee.to_authority_index(idx).unwrap();
let hostname = &self.context.committee.authority(authority_index).hostname;
block_opt.unwrap_or_else(|| {
panic!("Expected to find a cached block for authority {hostname}")
})
})
.collect()
}

fn get_last_cached_blocks_per_authority_in_range(
&self,
start_round: u32,
end_round: u32,
) -> Vec<Option<VerifiedBlock>> {
let mut blocks = if start_round != GENESIS_ROUND {
// Init with None, as we will selectively insert cached blocks in range
vec![None; self.context.committee.size()]
} else {
// Init with the genesis blocks as fallback as genesis round is within range.
self.genesis.values().cloned().map(Some).collect::<Vec<_>>()
};

assert!(
end_round != GENESIS_ROUND,
"Attempted to retrieve blocks earlier than the genesis round which is not possible"
);

if end_round == GENESIS_ROUND + 1 {
return blocks;
Expand All @@ -526,14 +564,22 @@ impl DagState {
.unwrap();

let last_evicted_round = self.evicted_rounds[authority_index];
if end_round.saturating_sub(1) <= last_evicted_round {
panic!("Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}", );

// If the last evicted round is after the start round, start from the
// last evicted round + 1.
let first_included_round = start_round.max(last_evicted_round + 1);

if end_round.saturating_sub(1) <= last_evicted_round || first_included_round > end_round
{
debug!("Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} and the first included round is {first_included_round} for authority {authority_index}");
blocks[authority_index] = None;
continue;
}

if let Some(block_ref) = block_refs
.range((
Included(BlockRef::new(
last_evicted_round + 1,
first_included_round,
authority_index,
BlockDigest::MIN,
)),
Expand All @@ -546,7 +592,7 @@ impl DagState {
.get(block_ref)
.expect("Block should exist in recent blocks");

blocks[authority_index] = block.clone();
blocks[authority_index] = Some(block.clone());
}
}

Expand Down Expand Up @@ -1947,6 +1993,70 @@ mod test {
assert_eq!(cached_blocks[0].round(), 12);
}

#[tokio::test]
async fn test_get_cached_last_block_for_authority_in_range() {
// GIVEN
const CACHED_ROUNDS: Round = 2;
let (mut context, _) = Context::new_for_test(4);
context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;

let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let mut dag_state = DagState::new(context.clone(), store.clone());

// Create no blocks for authority 0
// Create one block (round 1) for authority 1
// Create two blocks (rounds 1,2) for authority 2
// Create three blocks (rounds 1,2,3) for authority 3
let mut all_blocks = Vec::new();
for author in 1..=3 {
for round in 1..=author {
let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
all_blocks.push(block.clone());
dag_state.accept_block(block);
}
}

dag_state.add_commit(TrustedCommit::new_for_test(
1 as CommitIndex,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
all_blocks.last().unwrap().reference(),
all_blocks
.into_iter()
.map(|block| block.reference())
.collect::<Vec<_>>(),
));

// WHEN search for the latest blocks between rounds (2..4)
let start_round = 2;
let end_round = 4;
let last_block = dag_state.get_last_cached_block_for_authority_in_range(
AuthorityIndex::new_for_test(3),
start_round,
end_round,
);

// THEN we should have found a block within the range
assert_eq!(last_block.unwrap().round(), 3);

// WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
// a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
dag_state.flush();

// AND we request blocks between rounds (3..3)
let start_round = 3;
let end_round = 3;
let last_block = dag_state.get_last_cached_block_for_authority_in_range(
AuthorityIndex::new_for_test(3),
start_round,
end_round,
);

// THEN we hit the case where block is not found
assert!(last_block.is_none());
}

#[rstest]
#[tokio::test]
async fn test_get_cached_last_block_per_authority(#[values(0, 1)] gc_depth: u32) {
Expand Down Expand Up @@ -2029,9 +2139,7 @@ mod test {
}

#[tokio::test]
#[should_panic(
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
)]
#[should_panic(expected = "Expected to find a cached block for authority test_host_2")]
async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
// GIVEN
const CACHED_ROUNDS: Round = 1;
Expand Down Expand Up @@ -2076,9 +2184,7 @@ mod test {
}

#[tokio::test]
#[should_panic(
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
)]
#[should_panic(expected = "Expected to find a cached block for authority test_host_2")]
async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
// GIVEN
const CACHED_ROUNDS: Round = 1;
Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/leader_scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl ReputationScores {
}
}

pub(crate) fn high_score(&self) -> u64 {
pub(crate) fn highest_score(&self) -> u64 {
*self.scores_per_authority.iter().max().unwrap_or(&0)
}

Expand Down

0 comments on commit cbf0789

Please sign in to comment.