Skip to content

Commit

Permalink
Ensure leader blocks are not excluded
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-koshy committed Oct 1, 2024
1 parent e33f913 commit c209697
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 60 deletions.
6 changes: 3 additions & 3 deletions consensus/core/src/ancestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ impl AncestorStateManager {
network_low_quorum_round: u32,
) {
let block_hostname = &self.context.committee.authority(authority_id).hostname;
let ancestor_info = self.state_map.get_mut(&authority_id).expect(&format!(
"Expected authority_id {authority_id} to be initialized in state_map",
));
let ancestor_info = self.state_map.get_mut(&authority_id).unwrap_or_else(|| {
panic!("Expected authority_id {authority_id} to be initialized in state_map")
});

// Check if the lock period has expired
if let Some(expiry) = ancestor_info.lock_expiry_time {
Expand Down
71 changes: 52 additions & 19 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,7 @@ impl Core {
// only when the validator was supposed to be the leader of the round - so we bring down the missed leaders.
// Probably proposing for all the intermediate rounds might not make much sense.

// Determine the ancestors to be included in proposal

// Determine the ancestors to be included in proposal.
// Smart ancestor selection requires distributed scoring to be enabled.
let ancestors = if self
.context
Expand Down Expand Up @@ -858,6 +857,9 @@ impl Core {
self.ancestor_state_manager.update_all_ancestors_state();
let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();

let quorum_round = clock_round.saturating_sub(1);
let leaders = self.cached_leaders(quorum_round);

let mut temp_excluded_ancestors = Vec::new();

// Propose only ancestors of higher rounds than what has already been proposed.
Expand All @@ -871,16 +873,14 @@ impl Core {
.filter(|ancestor| ancestor.author() != self.context.own_index)
.flat_map(|ancestor| {

let ancestor_state = ancestor_state_map.get(&ancestor.author()).expect(
&format!("Should have ancestor state for {}", ancestor.author()),
);
let ancestor_state = ancestor_state_map.get(&ancestor.author()).unwrap_or_else(|| panic!("Should have ancestor state for {}", ancestor.author()));

match ancestor_state {
AncestorState::Include => {
debug!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
info!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
}
AncestorState::Exclude(score) => {
debug!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
info!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
temp_excluded_ancestors.push((*score, ancestor));
return None;
}
Expand All @@ -901,20 +901,21 @@ impl Core {
// Check total stake of high scoring parent round ancestors
for ancestor in included_ancestors
.iter()
.filter(|a| a.round() == clock_round - 1)
.filter(|a| a.round() == quorum_round)
{
parent_round_quorum.add(ancestor.author(), &self.context.committee);
}

if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake());
self.context.metrics.node_metrics.smart_selection_wait.inc();
info!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake());
return vec![];
}

// Now we can update the last included ancestor block refs as there is no way
// to short circuit ancestor proposal.
for ancestor in included_ancestors.iter() {
debug!(
info!(
"Included ancestor {ancestor} to propose for round {clock_round}, own_block = {}",
ancestor.author() == self.context.own_index
);
Expand All @@ -929,13 +930,33 @@ impl Core {
let mut excluded_ancestors = Vec::new();

for (score, ancestor) in temp_excluded_ancestors.into_iter() {
let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
if !parent_round_quorum.reached_threshold(&self.context.committee)
&& ancestor.round() == clock_round - 1
&& ancestor.round() == quorum_round
{
debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}");
info!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}");
parent_round_quorum.add(ancestor.author(), &self.context.committee);
self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor);
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "strong"])
.inc();
} else if leaders.contains(&ancestor.author()) {
// Make sure to include all ancestors that are leaders.
info!("Including temporarily excluded leader ancestor {ancestor} with score {score} to propose for round {clock_round}");
assert!(ancestor.round() == quorum_round);
parent_round_quorum.add(ancestor.author(), &self.context.committee);
self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor);
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "leader"])
.inc();
} else {
excluded_ancestors.push((score, ancestor));
}
Expand All @@ -952,20 +973,20 @@ impl Core {
// If the network quorum round for this ancestor is greater than or equal
// to the clock round then we want to make sure to set it to clock_round - 1
// as that is the max round we can include as an ancestor.
network_low_quorum_round = network_low_quorum_round.min(clock_round - 1);
network_low_quorum_round = network_low_quorum_round.min(quorum_round);

if let Some(last_block_ref) = self.last_included_ancestors[excluded_author] {
if last_block_ref.round < network_low_quorum_round {
if ancestor.round() == network_low_quorum_round {
// Include the ancestor block as it has been seen by a strong quorum
self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor.clone());
debug!("Included low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
info!("Included low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.with_label_values(&[block_hostname, "weak"])
.inc();
continue;
}
Expand All @@ -982,21 +1003,21 @@ impl Core {
if let Some(block) = blocks.first() {
self.last_included_ancestors[excluded_author] = Some(block.reference());
ancestors_to_propose.push(block.clone());
debug!("Included low scoring ancestor {block} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
info!("Included low scoring ancestor {block} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.with_label_values(&[block_hostname, "weak"])
.inc();
continue;
} else {
debug!("No earlier uncommitted block found for low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
info!("No earlier uncommitted block found for low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
}
}
}

debug!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}");
info!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
Expand All @@ -1016,6 +1037,18 @@ impl Core {
ancestors_to_propose
}

/// Returns the authorities of the leaders of `round` whose leader slot is
/// reported by the DAG state as containing a cached block.
///
/// Leaders of `round` for which no cached block is found are left out, so the
/// result may be empty.
fn cached_leaders(&self, round: Round) -> Vec<AuthorityIndex> {
    // Acquire read access once and reuse it for every slot lookup below.
    let dag_state = self.dag_state.read();

    let cached: Vec<AuthorityIndex> = self
        .leaders(round)
        .into_iter()
        // Keep only the leader slots that already have a cached block.
        .filter(|slot| dag_state.contains_cached_block_at_slot(*slot))
        .map(|slot| slot.authority)
        .collect();

    cached
}

/// Checks whether all the leaders of the round exist.
/// TODO: we can leverage some additional signal here in order to more cleverly manipulate later the leader timeout
/// E.g. if we already have one leader - the first in order - we might not want to wait as much.
Expand Down
36 changes: 0 additions & 36 deletions consensus/core/src/leader_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,6 @@ impl LeaderSchedule {
.with_label_values(&["LeaderSchedule::update_leader_schedule"])
.start_timer();

let s1 = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["LeaderSchedule::update_leader_schedule::calculate_scoring_subdag_distributed_vote_scores"])
.start_timer();
let (reputation_scores, last_commit_index) = {
let dag_state = dag_state.read();
let reputation_scores = dag_state.calculate_scoring_subdag_distributed_vote_scores();
Expand All @@ -151,57 +144,28 @@ impl LeaderSchedule {

(reputation_scores, last_commit_index)
};
drop(s1);

let s2 = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&[
"LeaderSchedule::update_leader_schedule::clear_scoring_subdag_&_add_commit_info",
])
.start_timer();
{
let mut dag_state = dag_state.write();
// Clear scoring subdag as we have updated the leader schedule
dag_state.clear_scoring_subdag();
// Buffer score and last commit rounds in dag state to be persisted later
dag_state.add_commit_info(reputation_scores.clone());
}
drop(s2);

let s3 = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&[
"LeaderSchedule::update_leader_schedule::update_leader_swap_table",
])
.start_timer();
self.update_leader_swap_table(LeaderSwapTable::new(
self.context.clone(),
last_commit_index,
reputation_scores.clone(),
));
drop(s3);

let s4 = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["LeaderSchedule::update_leader_schedule::update_metrics"])
.start_timer();
reputation_scores.update_metrics(self.context.clone());

self.context
.metrics
.node_metrics
.num_of_bad_nodes
.set(self.leader_swap_table.read().bad_nodes.len() as i64);
drop(s4);
}

// TODO: Remove when DistributedVoteScoring is enabled.
Expand Down
10 changes: 8 additions & 2 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub(crate) struct NodeMetrics {
pub(crate) commit_round_advancement_interval: Histogram,
pub(crate) last_decided_leader_round: IntGauge,
pub(crate) leader_timeout_total: IntCounterVec,
pub(crate) smart_selection_wait: IntCounter,
pub(crate) ancestor_state_change_by_authority: IntCounterVec,
pub(crate) excluded_proposal_ancestors_count_by_authority: IntCounterVec,
pub(crate) included_excluded_proposal_ancestors_count_by_authority: IntCounterVec,
Expand Down Expand Up @@ -435,6 +436,11 @@ impl NodeMetrics {
&["timeout_type"],
registry,
).unwrap(),
smart_selection_wait: register_int_counter_with_registry!(
"smart_selection_wait",
"Number of times we waited for smart ancestor selection.",
registry,
).unwrap(),
ancestor_state_change_by_authority: register_int_counter_vec_with_registry!(
"ancestor_state_change_by_authority",
"The total number of times an ancestor state changed to EXCLUDE or INCLUDE.",
Expand All @@ -449,8 +455,8 @@ impl NodeMetrics {
).unwrap(),
included_excluded_proposal_ancestors_count_by_authority: register_int_counter_vec_with_registry!(
"included_excluded_proposal_ancestors_count_by_authority",
"Total number of included excluded ancestors per authority during proposal.",
&["authority"],
"Total number of included excluded ancestors per authority during proposal. Either weak, strong, leader type",
&["authority", "type"],
registry,
).unwrap(),
missing_blocks_total: register_int_counter_with_registry!(
Expand Down

0 comments on commit c209697

Please sign in to comment.