Skip to content

Commit

Permalink
include only ancestors sync'd 90% & no weak links for EXCLUDE ancestors
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-koshy committed Nov 5, 2024
1 parent 6caad4a commit 0ec03ed
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 188 deletions.
4 changes: 4 additions & 0 deletions consensus/config/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl Committee {
self.total_stake
}

pub fn n_percent_stake_threshold(&self, n: u64) -> Stake {
self.total_stake * n / 100
}

pub fn quorum_threshold(&self) -> Stake {
self.quorum_threshold
}
Expand Down
22 changes: 16 additions & 6 deletions consensus/core/src/ancestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ impl AncestorStateManager {
#[cfg(test)]
const STATE_LOCK_SCORE_UPDATES: u32 = 1;

// Exclusion threshold is based on reputation scores
const EXCLUSION_THRESHOLD_PERCENTAGE: u64 = 10;

// Inclusion threshold is based on network quorum round
const INCLUSION_THRESHOLD_PERCENTAGE: u64 = 90;

pub(crate) fn new(context: Arc<Context>, propagation_scores: ReputationScores) -> Self {
let state_map = vec![AncestorInfo::new(); context.committee.size()];

Expand Down Expand Up @@ -103,6 +107,12 @@ impl AncestorStateManager {
self.state_map.iter().map(|info| info.state).collect()
}

pub(crate) fn get_inclusion_stake_threshold(&self) -> u64 {
self.context
.committee
.n_percent_stake_threshold(Self::INCLUSION_THRESHOLD_PERCENTAGE)
}

/// Updates the state of all ancestors based on the latest scores and quorum rounds
pub(crate) fn update_all_ancestors_state(&mut self) {
let propagation_scores_by_authority = self
Expand Down Expand Up @@ -213,12 +223,12 @@ impl AncestorStateManager {
self.state_map[authority_id] = ancestor_info;
}

/// Calculate the network's low quorum round from 2f+1 authorities by stake,
/// where low quorum round is the highest round a block has been seen by 2f+1
/// authorities.
/// Calculate the network's quorum round from authorities by inclusion stake
/// threshold, where quorum round is the highest round a block has been seen
/// by a percentage (inclusion threshold) of authorities.
fn calculate_network_low_quorum_round(&self) -> u32 {
let committee = &self.context.committee;
let quorum_threshold = committee.quorum_threshold();
let inclusion_stake_threshold = self.get_inclusion_stake_threshold();
let mut low_quorum_rounds_with_stake = self
.quorum_round_per_authority
.iter()
Expand All @@ -231,9 +241,9 @@ impl AncestorStateManager {
let mut network_low_quorum_round = 0;

for (round, stake) in low_quorum_rounds_with_stake.iter().rev() {
let reached_quorum_before = total_stake >= quorum_threshold;
let reached_quorum_before = total_stake >= inclusion_stake_threshold;
total_stake += stake;
if !reached_quorum_before && total_stake >= quorum_threshold {
if !reached_quorum_before && total_stake >= inclusion_stake_threshold {
network_low_quorum_round = *round;
break;
}
Expand Down
43 changes: 2 additions & 41 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,51 +926,11 @@ impl Core {

assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core.");

let mut excluded_ancestors_count = 0;

// Inclusion of weak links for low score ancestors so there is no long
// list of blocks that we need to include later.
for (score, ancestor) in excluded_ancestors.iter() {
let excluded_author = ancestor.author();
let block_hostname = &self.context.committee.authority(excluded_author).hostname;
let mut network_low_quorum_round =
self.ancestor_state_manager.quorum_round_per_authority[excluded_author].0;

// If the network quourum round for this ancestor is greater than or equal
// to the clock round then we want to make sure to set it to clock_round - 1
// as that is the max round we can include as an ancestor.
network_low_quorum_round = network_low_quorum_round.min(quorum_round);

let last_included_ancestor_round = self.last_included_ancestors[excluded_author]
.map_or(GENESIS_ROUND, |block_ref| block_ref.round);

// Check if there is a block that is at a round higher than the last included
// ancestor round but less than or equal to the network low quorum round.
if let Some(last_cached_block_in_range) = self
.dag_state
.read()
.get_last_cached_block_for_authority_in_range(
excluded_author,
last_included_ancestor_round + 1,
network_low_quorum_round + 1,
)
{
// Include the ancestor block as it has been seen by a strong quorum
ancestors_to_propose.push(last_cached_block_in_range.clone());
trace!("Included low scoring ancestor {last_cached_block_in_range} with score {score} seen between last included round {last_included_ancestor_round} and network quorum round {network_low_quorum_round} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "weak"])
.inc();
continue;
} else {
trace!("No cached block found for low scoring ancestor {ancestor} with score {score} between last included round {last_included_ancestor_round} and network quorum round {network_low_quorum_round} to propose for round {clock_round}");
}

trace!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}");
excluded_ancestors_count += 1;
self.context
.metrics
.node_metrics
Expand All @@ -980,8 +940,9 @@ impl Core {
}

info!(
"Included {} ancestors & excluded {excluded_ancestors_count} ancestors for proposal in round {clock_round}",
"Included {} ancestors & excluded {} ancestors for proposal in round {clock_round}",
ancestors_to_propose.len(),
excluded_ancestors.len()
);

ancestors_to_propose
Expand Down
161 changes: 20 additions & 141 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,65 +496,23 @@ impl DagState {
blocks
}

/// Returns the last block cached for the authority within a range.
/// This can return None if there is no block within the range provided.
pub(crate) fn get_last_cached_block_for_authority_in_range(
&self,
authority: AuthorityIndex,
start_round: Round,
end_round: Round,
) -> Option<VerifiedBlock> {
let blocks = self.get_last_cached_block_per_authority_in_range(start_round, end_round);
blocks[authority].clone()
}

/// Returns the last block cached per authority with `round < end_round`.
/// Returns the last block proposed per authority with `round < end_round`.
/// The method is guaranteed to return results only when the `end_round` is not earlier of the
/// available cached data for each authority, otherwise the method will panic - it's the caller's
/// responsibility to ensure that is not requesting filtering for earlier rounds.
/// responsibility to ensure that is not requesting filtering for earlier rounds .
/// In case of equivocation for an authority's last slot only one block will be returned (the last in order).
pub(crate) fn get_last_cached_block_per_authority(
&self,
end_round: Round,
) -> Vec<VerifiedBlock> {
let blocks = self.get_last_cached_block_per_authority_in_range(GENESIS_ROUND, end_round);

// Ensure every authority has a valid block as we will fall back to genesis
// blocks which should always be available.
blocks
.into_iter()
.enumerate()
.map(|(idx, block_opt)| {
let authority_index = self.context.committee.to_authority_index(idx).unwrap();
let hostname = &self.context.committee.authority(authority_index).hostname;
block_opt.unwrap_or_else(|| {
panic!("Expected to find a cached block for authority {hostname}")
})
})
.collect()
}

/// Returns the last block cached per authority within `start_round <= round < end_round`.
/// The method is not guranteed to return a block for the authority as None can
/// be returned if the authority cached blocks are outside of the range provided.
/// In case of equivocation for an authority's last slot only one block will be
/// returned (the last in order).
fn get_last_cached_block_per_authority_in_range(
&self,
start_round: u32,
end_round: u32,
) -> Vec<Option<VerifiedBlock>> {
let mut blocks = if start_round != GENESIS_ROUND {
// Init with None, as we will selectively insert cached blocks in range
vec![None; self.context.committee.size()]
} else {
// Init with the genesis blocks as fallback as genesis round is within range.
self.genesis.values().cloned().map(Some).collect::<Vec<_>>()
};
// init with the genesis blocks as fallback
let mut blocks = self.genesis.values().cloned().collect::<Vec<_>>();

assert!(
end_round != GENESIS_ROUND,
"Attempted to retrieve blocks earlier than the genesis round which is not possible"
);
if end_round == GENESIS_ROUND {
panic!(
"Attempted to retrieve blocks earlier than the genesis round which is not possible"
);
}

if end_round == GENESIS_ROUND + 1 {
return blocks;
Expand All @@ -568,21 +526,14 @@ impl DagState {
.unwrap();

let last_evicted_round = self.evicted_rounds[authority_index];

// If the last evicted round is after the start round, start from the
// last evicted round + 1.
let first_available_round = start_round.max(last_evicted_round + 1);

if first_available_round >= end_round {
debug!("Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} and the first available round is {first_available_round} for authority {authority_index}");
blocks[authority_index] = None;
continue;
if end_round.saturating_sub(1) <= last_evicted_round {
panic!("Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}", );
}

if let Some(block_ref) = block_refs
.range((
Included(BlockRef::new(
first_available_round,
last_evicted_round + 1,
authority_index,
BlockDigest::MIN,
)),
Expand All @@ -595,7 +546,7 @@ impl DagState {
.get(block_ref)
.expect("Block should exist in recent blocks");

blocks[authority_index] = Some(block.clone());
blocks[authority_index] = block.clone();
}
}

Expand Down Expand Up @@ -1996,82 +1947,6 @@ mod test {
assert_eq!(cached_blocks[0].round(), 12);
}

#[tokio::test]
async fn test_get_cached_last_block_for_authority_in_range() {
// GIVEN
const CACHED_ROUNDS: Round = 2;
let (mut context, _) = Context::new_for_test(4);
context.parameters.dag_state_cached_rounds = CACHED_ROUNDS;

let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let mut dag_state = DagState::new(context.clone(), store.clone());

// Create no blocks for authority 0
// Create one block (round 1) for authority 1
// Create two blocks (rounds 1,2) for authority 2
// Create three blocks (rounds 1,2,3) for authority 3
let mut all_blocks = Vec::new();
for author in 1..=3 {
for round in 1..=author {
let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
all_blocks.push(block.clone());
dag_state.accept_block(block);
}
}

dag_state.add_commit(TrustedCommit::new_for_test(
1 as CommitIndex,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
all_blocks.last().unwrap().reference(),
all_blocks
.into_iter()
.map(|block| block.reference())
.collect::<Vec<_>>(),
));

// WHEN search for the latest blocks between rounds (2..4)
let start_round = 2;
let end_round = 4;
let last_block = dag_state.get_last_cached_block_for_authority_in_range(
AuthorityIndex::new_for_test(3),
start_round,
end_round,
);

// THEN we should have found a block within the range
assert_eq!(last_block.unwrap().round(), 3);

// WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger
// a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND.
dag_state.flush();

// AND we request blocks between rounds (3..3)
let start_round = 3;
let end_round = 3;
let last_block = dag_state.get_last_cached_block_for_authority_in_range(
AuthorityIndex::new_for_test(3),
start_round,
end_round,
);

// THEN we hit the case where block is not found
assert!(last_block.is_none());

// AND we request blocks between intentionally incorrect rounds (4..3)
let start_round = 4;
let end_round = 3;
let last_block = dag_state.get_last_cached_block_for_authority_in_range(
AuthorityIndex::new_for_test(3),
start_round,
end_round,
);

// THEN we hit the case where block is not found
assert!(last_block.is_none());
}

#[rstest]
#[tokio::test]
async fn test_get_cached_last_block_per_authority(#[values(0, 1)] gc_depth: u32) {
Expand Down Expand Up @@ -2154,7 +2029,9 @@ mod test {
}

#[tokio::test]
#[should_panic(expected = "Expected to find a cached block for authority test_host_2")]
#[should_panic(
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
)]
async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range() {
// GIVEN
const CACHED_ROUNDS: Round = 1;
Expand Down Expand Up @@ -2199,7 +2076,9 @@ mod test {
}

#[tokio::test]
#[should_panic(expected = "Expected to find a cached block for authority test_host_2")]
#[should_panic(
expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C"
)]
async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() {
// GIVEN
const CACHED_ROUNDS: Round = 1;
Expand Down

0 comments on commit 0ec03ed

Please sign in to comment.