From 59ae4a5ed9a7654eaf59066ad319ec5828ae291b Mon Sep 17 00:00:00 2001
From: Arun Koshy
Date: Tue, 1 Oct 2024 14:25:44 -0700
Subject: [PATCH] Add metrics and debug logs

---
 consensus/core/src/ancestor.rs        |   6 +-
 consensus/core/src/core.rs            | 100 +++++++++++++++++---------
 consensus/core/src/leader_schedule.rs |  36 ----------
 consensus/core/src/metrics.rs         |  17 ++++-
 4 files changed, 86 insertions(+), 73 deletions(-)

diff --git a/consensus/core/src/ancestor.rs b/consensus/core/src/ancestor.rs
index 86e9c20622735d..2dabcc653b3936 100644
--- a/consensus/core/src/ancestor.rs
+++ b/consensus/core/src/ancestor.rs
@@ -122,9 +122,9 @@ impl AncestorStateManager {
         network_low_quorum_round: u32,
     ) {
         let block_hostname = &self.context.committee.authority(authority_id).hostname;
-        let ancestor_info = self.state_map.get_mut(&authority_id).expect(&format!(
-            "Expected authority_id {authority_id} to be initialized in state_map",
-        ));
+        let ancestor_info = self.state_map.get_mut(&authority_id).unwrap_or_else(|| {
+            panic!("Expected authority_id {authority_id} to be initialized in state_map")
+        });
 
         // Check if the lock period has expired
         if let Some(expiry) = ancestor_info.lock_expiry_time {
diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs
index ce251efcb9ee47..80c2787eed96bc 100644
--- a/consensus/core/src/core.rs
+++ b/consensus/core/src/core.rs
@@ -356,17 +356,38 @@ impl Core {
             .start_timer();
 
         let clock_round = self.threshold_clock.get_round();
+        debug!("try_new_block (force = {force}) for clock_round {clock_round}");
         if clock_round <= self.last_proposed_round() {
+            debug!(
+                "try_new_block failed due to clock_round {clock_round} <= last_proposed_round {}",
+                self.last_proposed_round()
+            );
             return None;
         }
 
         // There must be a quorum of blocks from the previous round.
         let quorum_round = self.threshold_clock.get_round().saturating_sub(1);
 
+        let leader_authority = self
+            .context
+            .committee
+            .authority(self.first_leader(quorum_round))
+            .hostname
+            .clone();
+        self.context
+            .metrics
+            .node_metrics
+            .block_proposal_leader_wait_count
+            .with_label_values(&[&leader_authority])
+            .inc();
+
         // Create a new block either because we want to "forcefully" propose a block due to a leader timeout,
         // or because we are actually ready to produce the block (leader exists and min delay has passed).
         if !force {
             if !self.leaders_exist(quorum_round) {
+                debug!(
+                    "try_new_block failed due to missing leader for quorum_round {quorum_round} at clock_round {clock_round}"
+                );
                 return None;
             }
 
@@ -377,6 +398,7 @@ impl Core {
                     .saturating_sub(self.last_proposed_timestamp_ms()),
             ) < self.context.parameters.min_round_delay
             {
+                debug!("try_new_block failed because min round delay has not elapsed for clock_round {clock_round}");
                 return None;
             }
         }
@@ -387,8 +409,7 @@ impl Core {
         // only when the validator was supposed to be the leader of the round - so we bring down the missed leaders.
         // Probably proposing for all the intermediate rounds might not make much sense.
 
-        // Determine the ancestors to be included in proposal
-
+        // Determine the ancestors to be included in proposal.
         // Smart ancestor selection requires distributed scoring to be enabled.
         let ancestors = if self
            .context
            .protocol_config
            .consensus_smart_ancestor_selection()
        {
            let ancestors = self.smart_ancestors_to_propose(clock_round, !force);
 
            // If we did not find enough good ancestors to propose, continue to wait before proposing.
            if ancestors.is_empty() {
+                debug!("try_new_block failed due to not enough good ancestors for clock_round {clock_round}");
                assert!(
                    !force,
                    "Ancestors should have been returned if force is true!"
                );
                return None;
            }
+            debug!(
+                "try_new_block smart ancestor selection found {} ancestors for clock_round {clock_round}",
+                ancestors.len()
+            );
            ancestors
        } else {
            self.ancestors_to_propose(clock_round)
        };
 
-        let leader_authority = &self
-            .context
-            .committee
-            .authority(self.first_leader(quorum_round))
-            .hostname;
         self.context
             .metrics
             .node_metrics
             .block_proposal_leader_wait_ms
-            .with_label_values(&[leader_authority])
+            .with_label_values(&[&leader_authority])
             .inc_by(
                 Instant::now()
                     .saturating_duration_since(self.threshold_clock.get_quorum_ts())
                     .as_millis() as u64,
             );
-        self.context
-            .metrics
-            .node_metrics
-            .block_proposal_leader_wait_count
-            .with_label_values(&[leader_authority])
-            .inc();
 
         self.context
             .metrics
@@ -510,13 +525,26 @@ impl Core {
         // Ensure the new block and its ancestors are persisted, before broadcasting it.
         self.dag_state.write().flush();
 
+        let current_proposal_duration = Duration::from_millis(verified_block.timestamp_ms());
+        let previous_proposal_duration = Duration::from_millis(self.last_proposed_timestamp_ms());
+        self.context
+            .metrics
+            .node_metrics
+            .block_proposal_interval
+            .observe(
+                current_proposal_duration
+                    .checked_sub(previous_proposal_duration)
+                    .unwrap_or_else(|| Duration::from_millis(0))
+                    .as_secs_f64(),
+            );
+
         // Update internal state.
         self.last_proposed_block = verified_block.clone();
 
         // Now acknowledge the transactions for their inclusion to block
         ack_transactions(verified_block.reference());
 
-        info!("Created block {:?}", verified_block);
+        info!("Created block {verified_block:?} for round {clock_round}");
 
         self.context
             .metrics
@@ -858,6 +886,8 @@ impl Core {
         self.ancestor_state_manager.update_all_ancestors_state();
         let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();
 
+        let quorum_round = clock_round.saturating_sub(1);
+
         let mut temp_excluded_ancestors = Vec::new();
 
         // Propose only ancestors of higher rounds than what has already been proposed.
@@ -871,16 +901,14 @@
                 .filter(|ancestor| ancestor.author() != self.context.own_index)
                 .flat_map(|ancestor| {
-                    let ancestor_state = ancestor_state_map.get(&ancestor.author()).expect(
-                        &format!("Should have ancestor state for {}", ancestor.author()),
-                    );
+                    let ancestor_state = ancestor_state_map.get(&ancestor.author()).unwrap_or_else(|| panic!("Should have ancestor state for {}", ancestor.author()));
 
                     match ancestor_state {
                         AncestorState::Include => {
-                            debug!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
+                            info!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
                         }
                         AncestorState::Exclude(score) => {
-                            debug!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
+                            info!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
                             temp_excluded_ancestors.push((*score, ancestor));
                             return None;
                         }
@@ -901,20 +929,21 @@
         // Check total stake of high scoring parent round ancestors
         for ancestor in included_ancestors
             .iter()
-            .filter(|a| a.round() == clock_round - 1)
+            .filter(|a| a.round() == quorum_round)
         {
             parent_round_quorum.add(ancestor.author(), &self.context.committee);
         }
 
         if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
-            debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake());
+            self.context.metrics.node_metrics.smart_selection_wait.inc();
+            info!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake());
             return vec![];
         }
 
         // Now we can update the last included ancestor block refs as there is no way
         // to short circuit ancestor proposal.
         for ancestor in included_ancestors.iter() {
-            debug!(
+            info!(
                 "Included ancestor {ancestor} to propose for round {clock_round}, own_block = {}",
                 ancestor.author() == self.context.own_index
             );
@@ -929,13 +958,20 @@
         let mut excluded_ancestors = Vec::new();
 
         for (score, ancestor) in temp_excluded_ancestors.into_iter() {
+            let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
             if !parent_round_quorum.reached_threshold(&self.context.committee)
-                && ancestor.round() == clock_round - 1
+                && ancestor.round() == quorum_round
             {
-                debug!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}");
+                info!("Including temporarily excluded strong link ancestor {ancestor} with score {score} to propose for round {clock_round}");
                 parent_round_quorum.add(ancestor.author(), &self.context.committee);
                 self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
                 ancestors_to_propose.push(ancestor);
+                self.context
+                    .metrics
+                    .node_metrics
+                    .included_excluded_proposal_ancestors_count_by_authority
+                    .with_label_values(&[block_hostname, "strong"])
+                    .inc();
             } else {
                 excluded_ancestors.push((score, ancestor));
             }
@@ -952,7 +988,7 @@
                 // If the network quorum round for this ancestor is greater than or equal
                 // to the clock round then we want to make sure to set it to clock_round - 1
                // as that is the max round we can include as an ancestor.
-                network_low_quorum_round = network_low_quorum_round.min(clock_round - 1);
+                network_low_quorum_round = network_low_quorum_round.min(quorum_round);
 
             if let Some(last_block_ref) = self.last_included_ancestors[excluded_author] {
                 if last_block_ref.round < network_low_quorum_round {
@@ -960,12 +996,12 @@
                         // Include the ancestor block as it has been seen by a strong quorum
                         self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
                         ancestors_to_propose.push(ancestor.clone());
-                        debug!("Included low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
+                        info!("Included low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
                         self.context
                             .metrics
                             .node_metrics
                             .included_excluded_proposal_ancestors_count_by_authority
-                            .with_label_values(&[block_hostname])
+                            .with_label_values(&[block_hostname, "weak"])
                             .inc();
                         continue;
                     }
@@ -982,21 +1018,21 @@
                 if let Some(block) = blocks.first() {
                     self.last_included_ancestors[excluded_author] = Some(block.reference());
                     ancestors_to_propose.push(block.clone());
-                    debug!("Included low scoring ancestor {block} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
+                    info!("Included low scoring ancestor {block} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
                     self.context
                         .metrics
                         .node_metrics
                         .included_excluded_proposal_ancestors_count_by_authority
-                        .with_label_values(&[block_hostname])
+                        .with_label_values(&[block_hostname, "weak"])
                         .inc();
                     continue;
                 } else {
-                    debug!("No earlier uncommitted block found for low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
+                    info!("No earlier uncommitted block found for low scoring ancestor {ancestor} with score {score} seen at network quorum round {network_low_quorum_round} to propose for round {clock_round}");
                 }
             }
         }
 
-        debug!("Excluded low score ancestor {ancestor} with score {score} to propose for round {clock_round}");
+        info!("Excluded low scoring ancestor {ancestor} with score {score} from proposal for round {clock_round}");
         self.context
             .metrics
             .node_metrics
diff --git a/consensus/core/src/leader_schedule.rs b/consensus/core/src/leader_schedule.rs
index 0733e5bc2c636c..4990580915373d 100644
--- a/consensus/core/src/leader_schedule.rs
+++ b/consensus/core/src/leader_schedule.rs
@@ -136,13 +136,6 @@ impl LeaderSchedule {
             .with_label_values(&["LeaderSchedule::update_leader_schedule"])
             .start_timer();
 
-        let s1 = self
-            .context
-            .metrics
-            .node_metrics
-            .scope_processing_time
-            .with_label_values(&["LeaderSchedule::update_leader_schedule::calculate_scoring_subdag_distributed_vote_scores"])
-            .start_timer();
         let (reputation_scores, last_commit_index) = {
             let dag_state = dag_state.read();
             let reputation_scores = dag_state.calculate_scoring_subdag_distributed_vote_scores();
             let last_commit_index = dag_state.last_commit_index();
             (reputation_scores, last_commit_index)
         };
-        drop(s1);
 
-        let s2 = self
-            .context
-            .metrics
-            .node_metrics
-            .scope_processing_time
-            .with_label_values(&[
-                "LeaderSchedule::update_leader_schedule::clear_scoring_subdag_&_add_commit_info",
-            ])
-            .start_timer();
         {
             let mut dag_state = dag_state.write();
             // Clear scoring subdag as we have updated the leader schedule
@@ -169,31 +152,13 @@ impl LeaderSchedule {
             // Buffer score and last commit rounds in dag state to be persisted later
             dag_state.add_commit_info(reputation_scores.clone());
         }
-        drop(s2);
 
-        let s3 = self
-            .context
-            .metrics
-            .node_metrics
-            .scope_processing_time
-            .with_label_values(&[
-                "LeaderSchedule::update_leader_schedule::update_leader_swap_table",
-            ])
-            .start_timer();
         self.update_leader_swap_table(LeaderSwapTable::new(
             self.context.clone(),
             last_commit_index,
             reputation_scores.clone(),
         ));
-        drop(s3);
 
-        let s4 = self
-            .context
-            .metrics
-            .node_metrics
-            .scope_processing_time
-            .with_label_values(&["LeaderSchedule::update_leader_schedule::update_metrics"])
-            .start_timer();
         reputation_scores.update_metrics(self.context.clone());
 
         self.context
@@ -201,7 +166,6 @@ impl LeaderSchedule {
             .node_metrics
             .num_of_bad_nodes
             .set(self.leader_swap_table.read().bad_nodes.len() as i64);
-        drop(s4);
     }
 
     // TODO: Remove when DistributedVoteScoring is enabled.
diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs
index 847a6c7357c62e..1adc47b16831a8 100644
--- a/consensus/core/src/metrics.rs
+++ b/consensus/core/src/metrics.rs
@@ -138,8 +138,10 @@ pub(crate) struct NodeMetrics {
     pub(crate) last_known_own_block_round: IntGauge,
     pub(crate) sync_last_known_own_block_retries: IntCounter,
     pub(crate) commit_round_advancement_interval: Histogram,
+    pub(crate) block_proposal_interval: Histogram,
     pub(crate) last_decided_leader_round: IntGauge,
     pub(crate) leader_timeout_total: IntCounterVec,
+    pub(crate) smart_selection_wait: IntCounter,
     pub(crate) ancestor_state_change_by_authority: IntCounterVec,
     pub(crate) excluded_proposal_ancestors_count_by_authority: IntCounterVec,
     pub(crate) included_excluded_proposal_ancestors_count_by_authority: IntCounterVec,
@@ -418,6 +420,12 @@ impl NodeMetrics {
                 "Index of the last commit.",
                 registry,
             ).unwrap(),
+            block_proposal_interval: register_histogram_with_registry!(
+                "block_proposal_interval",
+                "Intervals (in secs) between block proposals.",
+                FINE_GRAINED_LATENCY_SEC_BUCKETS.to_vec(),
+                registry,
+            ).unwrap(),
             commit_round_advancement_interval: register_histogram_with_registry!(
                 "commit_round_advancement_interval",
                 "Intervals (in secs) between commit round advancements.",
@@ -435,6 +443,11 @@ impl NodeMetrics {
                 &["timeout_type"],
                 registry,
             ).unwrap(),
+            smart_selection_wait: register_int_counter_with_registry!(
+                "smart_selection_wait",
+                "Number of times we waited for smart ancestor selection.",
+                registry,
+            ).unwrap(),
             ancestor_state_change_by_authority: register_int_counter_vec_with_registry!(
                 "ancestor_state_change_by_authority",
                 "The total number of times an ancestor state changed to EXCLUDE or INCLUDE.",
@@ -449,8 +462,8 @@ impl NodeMetrics {
             ).unwrap(),
             included_excluded_proposal_ancestors_count_by_authority: register_int_counter_vec_with_registry!(
                 "included_excluded_proposal_ancestors_count_by_authority",
-                "Total number of included excluded ancestors per authority during proposal.",
-                &["authority"],
+                "Total number of included excluded ancestors per authority during proposal, by 'strong' or 'weak' link type.",
+                &["authority", "type"],
                 registry,
             ).unwrap(),
             missing_blocks_total: register_int_counter_with_registry!(