Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[consensus] return quorum rounds from round prober #19703

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,20 +600,21 @@ async fn make_recv_future<T: Clone>(

#[cfg(test)]
mod tests {
use crate::commit::CommitRange;
use crate::test_dag_builder::DagBuilder;
use crate::{
authority_service::AuthorityService,
block::BlockAPI,
block::{BlockRef, SignedBlock, TestBlock, VerifiedBlock},
commit::CommitRange,
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
dag_state::DagState,
error::ConsensusResult,
network::{BlockStream, NetworkClient, NetworkService},
round_prober::QuorumRound,
storage::mem_store::MemStore,
synchronizer::Synchronizer,
test_dag_builder::DagBuilder,
Round,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -665,7 +666,11 @@ mod tests {
todo!()
}

fn set_propagation_delay(&self, _delay: Round) -> Result<(), CoreError> {
fn set_propagation_delay_and_quorum_rounds(
&self,
_delay: Round,
_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
todo!()
}

Expand Down
12 changes: 9 additions & 3 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
dag_state::DagState,
error::{ConsensusError, ConsensusResult},
leader_schedule::LeaderSchedule,
round_prober::QuorumRound,
stake_aggregator::{QuorumThreshold, StakeAggregator},
threshold_clock::ThresholdClock,
transaction::TransactionConsumer,
Expand Down Expand Up @@ -645,7 +646,12 @@ impl Core {
}

/// Sets the delay by round for propagating blocks to a quorum.
pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
// TODO: Will set the quorum round per authority in ancestor state manager.
pub(crate) fn set_propagation_delay_and_quorum_rounds(
&mut self,
delay: Round,
_quorum_rounds: Vec<QuorumRound>,
) {
info!("Propagation round delay set to: {delay}");
self.propagation_delay = delay;
}
Expand Down Expand Up @@ -1726,7 +1732,7 @@ mod test {
);

// Use a large propagation delay to disable proposing.
core.set_propagation_delay(1000);
core.set_propagation_delay_and_quorum_rounds(1000, vec![]);

// Make propagation delay the only reason for not proposing.
core.set_subscriber_exists(true);
Expand All @@ -1735,7 +1741,7 @@ mod test {
assert!(core.try_propose(true).unwrap().is_none());

// Let Core know there is no propagation delay.
core.set_propagation_delay(0);
core.set_propagation_delay_and_quorum_rounds(0, vec![]);

// Proposing now would succeed.
assert!(core.try_propose(true).unwrap().is_some());
Expand Down
49 changes: 33 additions & 16 deletions consensus/core/src/core_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
core_thread::CoreError::Shutdown,
dag_state::DagState,
error::{ConsensusError, ConsensusResult},
round_prober::QuorumRound,
BlockAPI as _,
};

Expand Down Expand Up @@ -65,8 +66,13 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static {
/// It is not a guarantee that produced blocks will be accepted by peers.
fn set_subscriber_exists(&self, exists: bool) -> Result<(), CoreError>;

/// Sets the estimated delay to propagate a block to a quorum of peers, in number of rounds.
fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError>;
/// Sets the estimated delay to propagate a block to a quorum of peers, in
/// number of rounds, and the quorum rounds for all authorities.
fn set_propagation_delay_and_quorum_rounds(
&self,
delay: Round,
quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError>;

fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;

Expand All @@ -91,7 +97,7 @@ struct CoreThread {
core: Core,
receiver: Receiver<CoreThreadCommand>,
rx_subscriber_exists: watch::Receiver<bool>,
rx_propagation_delay: watch::Receiver<Round>,
rx_propagation_delay_and_quorum_rounds: watch::Receiver<(Round, Vec<QuorumRound>)>,
rx_last_known_proposed_round: watch::Receiver<Round>,
context: Arc<Context>,
}
Expand Down Expand Up @@ -141,11 +147,11 @@ impl CoreThread {
self.core.new_block(Round::MAX, true)?;
}
}
_ = self.rx_propagation_delay.changed() => {
let _scope = monitored_scope("CoreThread::loop::set_propagation_delay");
_ = self.rx_propagation_delay_and_quorum_rounds.changed() => {
let _scope = monitored_scope("CoreThread::loop::set_propagation_delay_and_quorum_rounds");
let should_propose_before = self.core.should_propose();
let delay = *self.rx_propagation_delay.borrow();
self.core.set_propagation_delay(delay);
let (delay, quorum_rounds) = self.rx_propagation_delay_and_quorum_rounds.borrow().clone();
self.core.set_propagation_delay_and_quorum_rounds(delay, quorum_rounds);
if !should_propose_before && self.core.should_propose() {
// If core cannnot propose before but can propose now, try to produce a new block to ensure liveness,
// because block proposal could have been skipped.
Expand All @@ -164,7 +170,7 @@ pub(crate) struct ChannelCoreThreadDispatcher {
context: Arc<Context>,
sender: WeakSender<CoreThreadCommand>,
tx_subscriber_exists: Arc<watch::Sender<bool>>,
tx_propagation_delay: Arc<watch::Sender<Round>>,
tx_propagation_delay_and_quorum_rounds: Arc<watch::Sender<(Round, Vec<QuorumRound>)>>,
tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
highest_received_rounds: Arc<Vec<AtomicU32>>,
}
Expand All @@ -190,16 +196,17 @@ impl ChannelCoreThreadDispatcher {
let (sender, receiver) =
channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
let (tx_subscriber_exists, mut rx_subscriber_exists) = watch::channel(false);
let (tx_propagation_delay, mut rx_propagation_delay) = watch::channel(0);
let (tx_propagation_delay_and_quorum_rounds, mut rx_propagation_delay_and_quorum_rounds) =
watch::channel((0, vec![(0, 0); context.committee.size()]));
let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
rx_subscriber_exists.mark_unchanged();
rx_propagation_delay.mark_unchanged();
rx_propagation_delay_and_quorum_rounds.mark_unchanged();
rx_last_known_proposed_round.mark_unchanged();
let core_thread = CoreThread {
core,
receiver,
rx_subscriber_exists,
rx_propagation_delay,
rx_propagation_delay_and_quorum_rounds,
rx_last_known_proposed_round,
context: context.clone(),
};
Expand All @@ -221,7 +228,9 @@ impl ChannelCoreThreadDispatcher {
context,
sender: sender.downgrade(),
tx_subscriber_exists: Arc::new(tx_subscriber_exists),
tx_propagation_delay: Arc::new(tx_propagation_delay),
tx_propagation_delay_and_quorum_rounds: Arc::new(
tx_propagation_delay_and_quorum_rounds,
),
tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round),
highest_received_rounds: Arc::new(highest_received_rounds),
};
Expand Down Expand Up @@ -279,9 +288,13 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
.map_err(|e| Shutdown(e.to_string()))
}

fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError> {
self.tx_propagation_delay
.send(delay)
fn set_propagation_delay_and_quorum_rounds(
&self,
delay: Round,
quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
self.tx_propagation_delay_and_quorum_rounds
.send((delay, quorum_rounds))
.map_err(|e| Shutdown(e.to_string()))
}

Expand Down Expand Up @@ -356,7 +369,11 @@ impl CoreThreadDispatcher for MockCoreThreadDispatcher {
todo!()
}

fn set_propagation_delay(&self, _delay: Round) -> Result<(), CoreError> {
fn set_propagation_delay_and_quorum_rounds(
&self,
_delay: Round,
_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
todo!()
}

Expand Down
7 changes: 6 additions & 1 deletion consensus/core/src/leader_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ mod tests {
use crate::core::CoreSignals;
use crate::core_thread::{CoreError, CoreThreadDispatcher};
use crate::leader_timeout::LeaderTimeoutTask;
use crate::round_prober::QuorumRound;

#[derive(Clone, Default)]
struct MockCoreThreadDispatcher {
Expand Down Expand Up @@ -171,7 +172,11 @@ mod tests {
todo!()
}

fn set_propagation_delay(&self, _delay: Round) -> Result<(), CoreError> {
fn set_propagation_delay_and_quorum_rounds(
&self,
_delay: Round,
_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
todo!()
}

Expand Down
7 changes: 7 additions & 0 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ pub(crate) struct NodeMetrics {
pub(crate) commit_sync_fetch_once_latency: Histogram,
pub(crate) commit_sync_fetch_once_errors: IntCounterVec,
pub(crate) round_prober_quorum_round_gaps: IntGaugeVec,
pub(crate) round_prober_low_quorum_round: IntGaugeVec,
pub(crate) round_prober_current_round_gaps: IntGaugeVec,
pub(crate) round_prober_propagation_delays: Histogram,
pub(crate) round_prober_last_propagation_delay: IntGauge,
Expand Down Expand Up @@ -611,6 +612,12 @@ impl NodeMetrics {
&["authority"],
registry
).unwrap(),
round_prober_low_quorum_round: register_int_gauge_vec_with_registry!(
"round_prober_low_quorum_round",
"Low quorum round among peers for blocks proposed from each authority",
&["authority"],
registry
).unwrap(),
round_prober_current_round_gaps: register_int_gauge_vec_with_registry!(
"round_prober_current_round_gaps",
"Round gaps from local last proposed round to the low quorum round of each peer. Can be negative.",
Expand Down
33 changes: 31 additions & 2 deletions consensus/core/src/round_prober.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ impl<C: NetworkClient> RoundProber<C> {
.round_prober_quorum_round_gaps
.with_label_values(&[&authority.hostname])
.set((high - low) as i64);
node_metrics
.round_prober_low_quorum_round
.with_label_values(&[&authority.hostname])
.set(*low as i64);
// The gap can be negative if this validator is lagging behind the network.
node_metrics
.round_prober_current_round_gaps
Expand All @@ -231,7 +235,7 @@ impl<C: NetworkClient> RoundProber<C> {
.set(propagation_delay as i64);
if let Err(e) = self
.core_thread_dispatcher
.set_propagation_delay(propagation_delay)
.set_propagation_delay_and_quorum_rounds(propagation_delay, quorum_rounds.clone())
{
tracing::warn!(
"Failed to set propagation delay {propagation_delay} on Core: {:?}",
Expand Down Expand Up @@ -293,6 +297,7 @@ mod test {
use consensus_config::AuthorityIndex;
use parking_lot::{Mutex, RwLock};

use super::QuorumRound;
use crate::{
block::BlockRef,
commit::CommitRange,
Expand All @@ -309,19 +314,25 @@ mod test {
struct FakeThreadDispatcher {
highest_received_rounds: Vec<Round>,
propagation_delay: Mutex<Round>,
quorum_rounds: Mutex<Vec<QuorumRound>>,
}

impl FakeThreadDispatcher {
fn new(highest_received_rounds: Vec<Round>) -> Self {
Self {
highest_received_rounds,
propagation_delay: Mutex::new(0),
quorum_rounds: Mutex::new(Vec::new()),
}
}

fn propagation_delay(&self) -> Round {
*self.propagation_delay.lock()
}

fn quorum_rounds(&self) -> Vec<QuorumRound> {
self.quorum_rounds.lock().clone()
}
}

#[async_trait]
Expand All @@ -345,7 +356,13 @@ mod test {
unimplemented!()
}

fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError> {
fn set_propagation_delay_and_quorum_rounds(
&self,
delay: Round,
quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
let mut quorum_round_per_authority = self.quorum_rounds.lock();
*quorum_round_per_authority = quorum_rounds;
let mut propagation_delay = self.propagation_delay.lock();
*propagation_delay = delay;
Ok(())
Expand Down Expand Up @@ -492,6 +509,18 @@ mod test {
]
);

assert_eq!(
core_thread_dispatcher.quorum_rounds(),
vec![
(100, 105),
(0, 115),
(103, 130),
(0, 0),
(105, 150),
(106, 160),
(107, 170)
]
);
// 110 - 100 = 10
assert_eq!(propagation_delay, 10);
assert_eq!(core_thread_dispatcher.propagation_delay(), 10);
Expand Down
Loading