
Commit

Add metrics and debug logs
arun-koshy committed Oct 2, 2024
1 parent e33f913 commit 59ae4a5
Showing 4 changed files with 86 additions and 73 deletions.
6 changes: 3 additions & 3 deletions consensus/core/src/ancestor.rs
@@ -122,9 +122,9 @@ impl AncestorStateManager {
network_low_quorum_round: u32,
) {
let block_hostname = &self.context.committee.authority(authority_id).hostname;
let ancestor_info = self.state_map.get_mut(&authority_id).expect(&format!(
"Expected authority_id {authority_id} to be initialized in state_map",
));
let ancestor_info = self.state_map.get_mut(&authority_id).unwrap_or_else(|| {
panic!("Expected authority_id {authority_id} to be initialized in state_map")
});

// Check if the lock period has expired
if let Some(expiry) = ancestor_info.lock_expiry_time {
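A minimal, standalone sketch (not part of the commit) of why the ancestor.rs change swaps .expect(&format!(..)) for .unwrap_or_else(|| panic!(..)): the message passed to expect is built eagerly on every lookup, even when the entry exists, while the closure given to unwrap_or_else only runs on the failure path (clippy reports the eager pattern as expect_fun_call). The HashMap<u32, &str> below is a hypothetical stand-in for the real state_map.

    use std::collections::HashMap;

    fn main() {
        let mut state_map: HashMap<u32, &str> = HashMap::new();
        state_map.insert(7, "ancestor info");
        let authority_id = 7u32;

        // Eager: the panic message String is allocated on every call, even on
        // the success path.
        let _info = state_map.get(&authority_id).expect(&format!(
            "Expected authority_id {authority_id} to be initialized in state_map"
        ));

        // Lazy: the closure, and its allocation, only run if the entry is missing.
        let _info = state_map.get(&authority_id).unwrap_or_else(|| {
            panic!("Expected authority_id {authority_id} to be initialized in state_map")
        });
    }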
100 changes: 68 additions & 32 deletions consensus/core/src/core.rs
@@ -356,17 +356,38 @@ impl Core {
.start_timer();

let clock_round = self.threshold_clock.get_round();
debug!("try_new_block force {force} for round {clock_round}");
if clock_round <= self.last_proposed_round() {
debug!(
"try_new_block failed due to clock_round {clock_round} <= last_proposed_round {}",
self.last_proposed_round()
);
return None;
}

// There must be a quorum of blocks from the previous round.
let quorum_round = self.threshold_clock.get_round().saturating_sub(1);

let leader_authority = self
.context
.committee
.authority(self.first_leader(quorum_round))
.hostname
.clone();
self.context
.metrics
.node_metrics
.block_proposal_leader_wait_count
.with_label_values(&[&leader_authority])
.inc();

// Create a new block either because we want to "forcefully" propose a block due to a leader timeout,
// or because we are actually ready to produce the block (leader exists and min delay has passed).
if !force {
if !self.leaders_exist(quorum_round) {
debug!(
"try_new_block failed due to leader not exists for quorum_round {quorum_round} for clock_round {clock_round}"
);
return None;
}

@@ -377,6 +398,7 @@
.saturating_sub(self.last_proposed_timestamp_ms()),
) < self.context.parameters.min_round_delay
{
debug!("try_new_block failed due to min round delay for clock_round {clock_round}");
return None;
}
}
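A hedged sketch, not part of the commit, of the min-round-delay guard that the new debug line above reports on: proposing waits until at least min_round_delay has elapsed since the last proposed block. now_ms and last_proposed_ms stand in for the real clock and last-proposed timestamps from the context.

    use std::time::Duration;

    fn min_delay_elapsed(now_ms: u64, last_proposed_ms: u64, min_round_delay: Duration) -> bool {
        // saturating_sub guards against a last-proposed timestamp ahead of the clock.
        Duration::from_millis(now_ms.saturating_sub(last_proposed_ms)) >= min_round_delay
    }

    fn main() {
        let min_round_delay = Duration::from_millis(250);
        assert!(!min_delay_elapsed(1_100, 1_000, min_round_delay)); // 100 ms elapsed: keep waiting
        assert!(min_delay_elapsed(1_400, 1_000, min_round_delay)); // 400 ms elapsed: ok to propose
    }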
@@ -387,8 +409,7 @@
// only when the validator was supposed to be the leader of the round - so we bring down the missed leaders.
// Probably proposing for all the intermediate rounds might not make much sense.

// Determine the ancestors to be included in proposal

// Determine the ancestors to be included in proposal.
// Smart ancestor selection requires distributed scoring to be enabled.
let ancestors = if self
.context
@@ -399,38 +420,32 @@

// If we did not find enough good ancestors to propose, continue to wait before proposing.
if ancestors.is_empty() {
debug!("try_new_block failed due to not enough good ancestors for clock_round {clock_round}");
assert!(
!force,
"Ancestors should have been returned if force is true!"
);
return None;
}
debug!(
"try_new_block SAS found {} ancestors for clock_round {clock_round}",
ancestors.len()
);
ancestors
} else {
self.ancestors_to_propose(clock_round)
};

let leader_authority = &self
.context
.committee
.authority(self.first_leader(quorum_round))
.hostname;
self.context
.metrics
.node_metrics
.block_proposal_leader_wait_ms
.with_label_values(&[leader_authority])
.with_label_values(&[&leader_authority])
.inc_by(
Instant::now()
.saturating_duration_since(self.threshold_clock.get_quorum_ts())
.as_millis() as u64,
);
self.context
.metrics
.node_metrics
.block_proposal_leader_wait_count
.with_label_values(&[leader_authority])
.inc();

self.context
.metrics
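A small standalone sketch, assuming the same semantics as the hunk above, of how the wait time fed into block_proposal_leader_wait_ms is measured: the elapsed time since the quorum timestamp, taken with saturating_duration_since so a quorum timestamp that is somehow ahead of now records zero instead of panicking.

    use std::time::{Duration, Instant};

    fn leader_wait_ms(quorum_ts: Instant) -> u64 {
        Instant::now()
            .saturating_duration_since(quorum_ts)
            .as_millis() as u64
    }

    fn main() {
        let quorum_ts = Instant::now();
        std::thread::sleep(Duration::from_millis(50));
        // Roughly 50 ms of waiting between quorum formation and proposal.
        println!("waited ~{} ms since quorum", leader_wait_ms(quorum_ts));
    }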
@@ -510,13 +525,26 @@ impl Core {
// Ensure the new block and its ancestors are persisted, before broadcasting it.
self.dag_state.write().flush();

let current_proposal_duration = Duration::from_millis(verified_block.timestamp_ms());
let previous_proposal_duration = Duration::from_millis(self.last_proposed_timestamp_ms());
self.context
.metrics
.node_metrics
.block_proposal_interval
.observe(
current_proposal_duration
.checked_sub(previous_proposal_duration)
.unwrap_or_else(|| Duration::from_millis(0))
.as_secs_f64(),
);

// Update internal state.
self.last_proposed_block = verified_block.clone();

// Now acknowledge the transactions for their inclusion to block
ack_transactions(verified_block.reference());

info!("Created block {:?}", verified_block);
info!("Created block {verified_block:?} for round {clock_round}");

self.context
.metrics
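The new block_proposal_interval observation above subtracts the previous proposal timestamp from the current one; a minimal sketch of that computation, with the two millisecond timestamps as plain u64 arguments, is:

    use std::time::Duration;

    fn proposal_interval_secs(current_ts_ms: u64, previous_ts_ms: u64) -> f64 {
        Duration::from_millis(current_ts_ms)
            // checked_sub clamps a non-monotonic timestamp to zero instead of panicking.
            .checked_sub(Duration::from_millis(previous_ts_ms))
            .unwrap_or_else(|| Duration::from_millis(0))
            .as_secs_f64()
    }

    fn main() {
        assert_eq!(proposal_interval_secs(2_500, 1_000), 1.5);
        assert_eq!(proposal_interval_secs(1_000, 2_500), 0.0); // clock went backwards: record 0
    }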
@@ -858,6 +886,8 @@ impl Core {
self.ancestor_state_manager.update_all_ancestors_state();
let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();

let quorum_round = clock_round.saturating_sub(1);

let mut temp_excluded_ancestors = Vec::new();

// Propose only ancestors of higher rounds than what has already been proposed.
@@ -871,16 +901,14 @@
.filter(|ancestor| ancestor.author() != self.context.own_index)
.flat_map(|ancestor| {

let ancestor_state = ancestor_state_map.get(&ancestor.author()).expect(
&format!("Should have ancestor state for {}", ancestor.author()),
);
let ancestor_state = ancestor_state_map.get(&ancestor.author()).unwrap_or_else(|| panic!("Should have ancestor state for {}", ancestor.author()));

match ancestor_state {
AncestorState::Include => {
debug!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
info!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
}
AncestorState::Exclude(score) => {
debug!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
info!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
temp_excluded_ancestors.push((*score, ancestor));
return None;
}
@@ -901,20 +929,21 @@
// Check total stake of high scoring parent round ancestors
for ancestor in included_ancestors
.iter()
.filter(|a| a.round() == clock_round - 1)
.filter(|a| a.round() == quorum_round)
{
parent_round_quorum.add(ancestor.author(), &self.context.committee);
}

if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake());
self.context.metrics.node_metrics.smart_selection_wait.inc();
info!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake());
return vec![];
}

// Now we can update the last included ancestor block refs as there is no way
// to short circuit ancestor proposal.
for ancestor in included_ancestors.iter() {
debug!(
info!(
"Included ancestor {ancestor} to propose for round {clock_round}, own_block = {}",
ancestor.author() == self.context.own_index
);
@@ -929,13 +958,20 @@
let mut excluded_ancestors = Vec::new();

for (score, ancestor) in temp_excluded_ancestors.into_iter() {
let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
if !parent_round_quorum.reached_threshold(&self.context.committee)
&& ancestor.round() == clock_round - 1
&& ancestor.round() == quorum_round
{
debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}");
info!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}");
parent_round_quorum.add(ancestor.author(), &self.context.committee);
self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor);
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "strong"])
.inc();
} else {
excluded_ancestors.push((score, ancestor));
}
@@ -952,20 +988,20 @@
// If the network quorum round for this ancestor is greater than or equal
// to the clock round then we want to make sure to set it to clock_round - 1
// as that is the max round we can include as an ancestor.
network_low_quorum_round = network_low_quorum_round.min(clock_round - 1);
network_low_quorum_round = network_low_quorum_round.min(quorum_round);

if let Some(last_block_ref) = self.last_included_ancestors[excluded_author] {
if last_block_ref.round < network_low_quorum_round {
if ancestor.round() == network_low_quorum_round {
// Include the ancestor block as it has been seen by a strong quorum
self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor.clone());
debug!("Included low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
info!("Included low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.with_label_values(&[block_hostname, "weak"])
.inc();
continue;
}
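A tiny sketch, not from the commit, of the clamp applied above: an excluded ancestor's network low quorum round is capped at the previous round (clock_round - 1), the highest round any ancestor of the new block can have.

    fn clamp_network_low_quorum_round(network_low_quorum_round: u32, clock_round: u32) -> u32 {
        let quorum_round = clock_round.saturating_sub(1);
        network_low_quorum_round.min(quorum_round)
    }

    fn main() {
        assert_eq!(clamp_network_low_quorum_round(12, 10), 9); // capped at clock_round - 1
        assert_eq!(clamp_network_low_quorum_round(7, 10), 7); // already below the cap
    }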
@@ -982,21 +1018,21 @@
if let Some(block) = blocks.first() {
self.last_included_ancestors[excluded_author] = Some(block.reference());
ancestors_to_propose.push(block.clone());
debug!("Included low scoring ancestor {block} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
info!("Included low scoring ancestor {block} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.with_label_values(&[block_hostname, "weak"])
.inc();
continue;
} else {
debug!("No earlier uncommitted block found for low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
info!("No earlier uncommitted block found for low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
}
}
}

debug!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}");
info!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}");
self.context
.metrics
.node_metrics
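To summarize the waiting behavior that the new smart_selection_wait counter tracks: the proposer accumulates the stake of good (INCLUDE state) ancestors from the previous round and refuses to propose until that stake reaches a quorum. The sketch below is a simplified, self-contained model; SimpleQuorum stands in for the real StakeAggregator and committee types.

    struct SimpleQuorum {
        accumulated: u64,
        total: u64,
    }

    impl SimpleQuorum {
        fn new(total: u64) -> Self {
            Self { accumulated: 0, total }
        }
        fn add(&mut self, stake: u64) {
            self.accumulated += stake;
        }
        fn reached_threshold(&self) -> bool {
            // Quorum: strictly more than 2/3 of the total stake.
            3 * self.accumulated > 2 * self.total
        }
    }

    fn main() {
        let mut parent_round_quorum = SimpleQuorum::new(10);
        parent_round_quorum.add(4);
        assert!(!parent_round_quorum.reached_threshold()); // 4 of 10: wait for more good ancestors
        parent_round_quorum.add(3);
        assert!(parent_round_quorum.reached_threshold()); // 7 of 10: safe to propose
    }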
36 changes: 0 additions & 36 deletions consensus/core/src/leader_schedule.rs
@@ -136,13 +136,6 @@ impl LeaderSchedule {
.with_label_values(&["LeaderSchedule::update_leader_schedule"])
.start_timer();

let s1 = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["LeaderSchedule::update_leader_schedule::calculate_scoring_subdag_distributed_vote_scores"])
.start_timer();
let (reputation_scores, last_commit_index) = {
let dag_state = dag_state.read();
let reputation_scores = dag_state.calculate_scoring_subdag_distributed_vote_scores();
@@ -151,57 +144,28 @@

(reputation_scores, last_commit_index)
};
drop(s1);

let s2 = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&[
"LeaderSchedule::update_leader_schedule::clear_scoring_subdag_&_add_commit_info",
])
.start_timer();
{
let mut dag_state = dag_state.write();
// Clear scoring subdag as we have updated the leader schedule
dag_state.clear_scoring_subdag();
// Buffer score and last commit rounds in dag state to be persisted later
dag_state.add_commit_info(reputation_scores.clone());
}
drop(s2);

let s3 = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&[
"LeaderSchedule::update_leader_schedule::update_leader_swap_table",
])
.start_timer();
self.update_leader_swap_table(LeaderSwapTable::new(
self.context.clone(),
last_commit_index,
reputation_scores.clone(),
));
drop(s3);

let s4 = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["LeaderSchedule::update_leader_schedule::update_metrics"])
.start_timer();
reputation_scores.update_metrics(self.context.clone());

self.context
.metrics
.node_metrics
.num_of_bad_nodes
.set(self.leader_swap_table.read().bad_nodes.len() as i64);
drop(s4);
}

// TODO: Remove when DistributedVoteScoring is enabled.
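The lines removed above were per-step scope_processing_time timers inside update_leader_schedule. For reference, a minimal sketch of the timer-guard pattern they used, assuming the prometheus crate that the rest of this diff registers metrics with: start_timer returns a guard that observes the elapsed time into the histogram when it is dropped, which is why each sN was explicitly dropped at the end of its step.

    use prometheus::{register_histogram, Histogram};

    fn main() {
        let hist: Histogram =
            register_histogram!("step_seconds", "Time spent in one step.").unwrap();
        {
            let _timer = hist.start_timer();
            // ... work for the step ...
        } // guard dropped here: the elapsed duration is recorded
        println!("recorded {} sample(s)", hist.get_sample_count());
    }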
17 changes: 15 additions & 2 deletions consensus/core/src/metrics.rs
@@ -138,8 +138,10 @@ pub(crate) struct NodeMetrics {
pub(crate) last_known_own_block_round: IntGauge,
pub(crate) sync_last_known_own_block_retries: IntCounter,
pub(crate) commit_round_advancement_interval: Histogram,
pub(crate) block_proposal_interval: Histogram,
pub(crate) last_decided_leader_round: IntGauge,
pub(crate) leader_timeout_total: IntCounterVec,
pub(crate) smart_selection_wait: IntCounter,
pub(crate) ancestor_state_change_by_authority: IntCounterVec,
pub(crate) excluded_proposal_ancestors_count_by_authority: IntCounterVec,
pub(crate) included_excluded_proposal_ancestors_count_by_authority: IntCounterVec,
@@ -418,6 +420,12 @@ impl NodeMetrics {
"Index of the last commit.",
registry,
).unwrap(),
block_proposal_interval: register_histogram_with_registry!(
"block_proposal_interval",
"Intervals (in secs) between block proposals.",
FINE_GRAINED_LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
commit_round_advancement_interval: register_histogram_with_registry!(
"commit_round_advancement_interval",
"Intervals (in secs) between commit round advancements.",
@@ -435,6 +443,11 @@
&["timeout_type"],
registry,
).unwrap(),
smart_selection_wait: register_int_counter_with_registry!(
"smart_selection_wait",
"Number of times we waited for smart ancestor selection.",
registry,
).unwrap(),
ancestor_state_change_by_authority: register_int_counter_vec_with_registry!(
"ancestor_state_change_by_authority",
"The total number of times an ancestor state changed to EXCLUDE or INCLUDE.",
@@ -449,8 +462,8 @@
).unwrap(),
included_excluded_proposal_ancestors_count_by_authority: register_int_counter_vec_with_registry!(
"included_excluded_proposal_ancestors_count_by_authority",
"Total number of included excluded ancestors per authority during proposal.",
&["authority"],
"Total number of included excluded ancestors per authority during proposal. Either weak or strong type",
&["authority", "type"],
registry,
).unwrap(),
missing_blocks_total: register_int_counter_with_registry!(
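With the label change above, the included_excluded_proposal_ancestors_count_by_authority counter is now keyed by both the authority hostname and an inclusion type ("strong" or "weak"), matching the two inclusion paths in core.rs. A hedged, standalone sketch of registering and using such a counter against a fresh Registry (the hostname is made up):

    use prometheus::{register_int_counter_vec_with_registry, Registry};

    fn main() {
        let registry = Registry::new();
        let counter = register_int_counter_vec_with_registry!(
            "included_excluded_proposal_ancestors_count_by_authority",
            "Total number of included excluded ancestors per authority during proposal.",
            &["authority", "type"],
            registry
        )
        .unwrap();

        // Strong-link inclusion (needed to reach the parent round quorum).
        counter.with_label_values(&["validator-1", "strong"]).inc();
        // Weak-link inclusion (seen at the network low quorum round).
        counter.with_label_values(&["validator-1", "weak"]).inc();
        assert_eq!(counter.with_label_values(&["validator-1", "strong"]).get(), 1);
    }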
