From 22baf3d412ca51df441a4737db7bbbb6c93004b1 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 6 Nov 2024 13:29:40 +0000 Subject: [PATCH 1/6] . --- rs/p2p/consensus_manager/src/lib.rs | 6 +- rs/p2p/consensus_manager/src/sender.rs | 159 +++++++++++++------------ 2 files changed, 84 insertions(+), 81 deletions(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index ea5189dba8a..645674db154 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -116,7 +116,7 @@ fn start_consensus_manager( metrics_registry: &MetricsRegistry, rt_handle: Handle, // Locally produced adverts to send to the node's peers. - adverts_to_send: Receiver>, + outbound_transmits: Receiver>, // Adverts received from peers adverts_received: Receiver<(SlotUpdate, NodeId, ConnId)>, sender: UnboundedSender>, @@ -137,7 +137,7 @@ where metrics.clone(), rt_handle.clone(), transport.clone(), - adverts_to_send, + outbound_transmits, assembler.clone(), ); @@ -165,7 +165,7 @@ pub(crate) enum Update { Id(Artifact::Id), } -pub fn uri_prefix() -> String { +pub(crate) fn uri_prefix() -> String { Artifact::NAME.to_lowercase() } diff --git a/rs/p2p/consensus_manager/src/sender.rs b/rs/p2p/consensus_manager/src/sender.rs index 61010287922..f76ff0b1d49 100644 --- a/rs/p2p/consensus_manager/src/sender.rs +++ b/rs/p2p/consensus_manager/src/sender.rs @@ -1,5 +1,3 @@ -#![allow(clippy::disallowed_methods)] - use std::{ collections::{hash_map::Entry, HashMap}, marker::PhantomData, @@ -34,12 +32,10 @@ use self::available_slot_set::{AvailableSlot, AvailableSlotSet}; /// The size threshold for an artifact to be pushed. Artifacts smaller than this constant /// in size are pushed. -pub(crate) const ARTIFACT_PUSH_THRESHOLD_BYTES: usize = 1024; // 1KB - +const ARTIFACT_PUSH_THRESHOLD_BYTES: usize = 1024; // 1KB const MIN_BACKOFF_INTERVAL: Duration = Duration::from_millis(250); const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60); const BACKOFF_MULTIPLIER: f64 = 2.0; - // Used to log warnings if the slot table grows beyond the threshold. 
const SLOT_TABLE_THRESHOLD: u64 = 30_000; @@ -57,12 +53,12 @@ fn panic_on_join_err(result: Result) -> T { } } -pub(crate) struct ConsensusManagerSender { +pub struct ConsensusManagerSender { log: ReplicaLogger, metrics: ConsensusManagerMetrics, rt_handle: Handle, transport: Arc, - adverts_to_send: Receiver>, + outbound_transmits: Receiver>, slot_manager: AvailableSlotSet, current_commit_id: CommitId, active_adverts: HashMap, @@ -77,12 +73,12 @@ impl< Assembler: ArtifactAssembler, > ConsensusManagerSender { - pub(crate) fn run( + pub fn run( log: ReplicaLogger, metrics: ConsensusManagerMetrics, rt_handle: Handle, transport: Arc, - adverts_to_send: Receiver>, + outbound_transmits: Receiver>, assembler: Assembler, ) -> Shutdown { let slot_manager = AvailableSlotSet::new(log.clone(), metrics.clone(), WireArtifact::NAME); @@ -92,7 +88,7 @@ impl< metrics, rt_handle: rt_handle.clone(), transport, - adverts_to_send, + outbound_transmits, slot_manager, current_commit_id: CommitId::from(0), active_adverts: HashMap::new(), @@ -118,10 +114,10 @@ impl< ); break; } - Some(advert) = self.adverts_to_send.recv() => { + Some(advert) = self.outbound_transmits.recv() => { match advert { - ArtifactTransmit::Deliver(new_artifact) => self.handle_send_advert(new_artifact, cancellation_token.clone()), - ArtifactTransmit::Abort(id) => self.handle_purge_advert(&id), + ArtifactTransmit::Deliver(new_artifact) => self.handle_deliver_transmit(new_artifact, cancellation_token.clone()), + ArtifactTransmit::Abort(id) => self.handle_abort_transmit(&id), } self.current_commit_id.inc_assign(); @@ -160,7 +156,7 @@ impl< } } - fn handle_purge_advert(&mut self, id: &Artifact::Id) { + fn handle_abort_transmit(&mut self, id: &Artifact::Id) { if let Some((cancellation_token, free_slot)) = self.active_adverts.remove(id) { self.metrics.send_view_consensus_purge_active_total.inc(); cancellation_token.cancel(); @@ -171,7 +167,7 @@ impl< } #[instrument(skip_all)] - fn handle_send_advert( + fn handle_deliver_transmit( &mut self, new_artifact: ArtifactWithOpt, cancellation_token: CancellationToken, @@ -188,10 +184,7 @@ impl< let child_token = cancellation_token.child_token(); let child_token_clone = child_token.clone(); - let send_future = Self::send_advert_to_all_peers( - self.rt_handle.clone(), - self.metrics.clone(), - self.transport.clone(), + let payload = Self::get_transmit_payload( self.current_commit_id, used_slot.slot_number(), ArtifactWithOpt { @@ -199,6 +192,12 @@ impl< is_latency_sensitive: new_artifact.is_latency_sensitive, }, wire_artifact_id, + ); + let send_future = Self::send_transmit_to_all_peers( + self.rt_handle.clone(), + self.metrics.clone(), + self.transport.clone(), + payload, child_token_clone, ); @@ -209,12 +208,7 @@ impl< } } - /// Sends an advert to all peers. - #[instrument(skip_all)] - async fn send_advert_to_all_peers( - rt_handle: Handle, - metrics: ConsensusManagerMetrics, - transport: Arc, + fn get_transmit_payload( commit_id: CommitId, slot_number: SlotNumber, ArtifactWithOpt { @@ -222,8 +216,7 @@ impl< is_latency_sensitive, }: ArtifactWithOpt, id: WireArtifact::Id, - cancellation_token: CancellationToken, - ) { + ) -> Bytes { let pb_slot_update = pb::SlotUpdate { commit_id: commit_id.get(), slot_id: slot_number.get(), @@ -238,63 +231,70 @@ impl< } }), }; - - let body = Bytes::from(pb_slot_update.encode_to_vec()); - - let mut in_progress_transmissions = JoinSet::new(); - // Stores the connection ID and the [`CancellationToken`] of the last successful transmission task to a peer. 
- let mut initiated_transmissions: HashMap = - HashMap::new(); - let mut periodic_check_interval = time::interval(Duration::from_secs(5)); - loop { - select! { - _ = periodic_check_interval.tick() => { - // check for new peers/connection IDs - // spawn task for peers with higher conn id or not in completed transmissions. - // add task to join map - for (peer, connection_id) in transport.peers() { - let is_initiated = initiated_transmissions.get(&peer).is_some_and(|(id, token)| { - if *id == connection_id { - true - } else { - token.cancel(); - metrics.send_view_resend_reconnect_total.inc(); - false - } - }); + Bytes::from(pb_slot_update.encode_to_vec()) + } +} +/// Sends an advert to all peers. +#[instrument(skip_all)] +async fn send_transmit_to_all_peers( + rt_handle: Handle, + metrics: ConsensusManagerMetrics, + transport: Arc, + body: Bytes, + cancellation_token: CancellationToken, +) { + let mut in_progress_transmissions = JoinSet::new(); + // Stores the connection ID and the [`CancellationToken`] of the last successful transmission task to a peer. + let mut initiated_transmissions: HashMap = HashMap::new(); + let mut periodic_check_interval = time::interval(Duration::from_secs(5)); + loop { + select! { + _ = periodic_check_interval.tick() => { + // check for new peers/connection IDs + // spawn task for peers with higher conn id or not in completed transmissions. + // add task to join map + for (peer, connection_id) in transport.peers() { + let is_initiated = initiated_transmissions.get(&peer).is_some_and(|(id, token)| { + if *id == connection_id { + true + } else { + token.cancel(); + metrics.send_view_resend_reconnect_total.inc(); + false + } + }); - if !is_initiated { - let child_token = cancellation_token.child_token(); - let child_token_clone = child_token.clone(); - metrics.send_view_send_to_peer_total.inc(); + if !is_initiated { + let child_token = cancellation_token.child_token(); + let child_token_clone = child_token.clone(); + metrics.send_view_send_to_peer_total.inc(); - let transport = transport.clone(); - let body = body.clone(); + let transport = transport.clone(); + let body = body.clone(); - let send_future = async move { - select! { - _ = send_advert_to_peer(transport, body, peer, uri_prefix::()) => {}, - _ = child_token.cancelled() => {}, - } - }; + let send_future = async move { + select! { + _ = send_transmit_to_peer(transport, body, peer, uri_prefix::()) => {}, + _ = child_token.cancelled() => {}, + } + }; - in_progress_transmissions.spawn_on(send_future, &rt_handle); - initiated_transmissions.insert(peer, (connection_id, child_token_clone)); - } + in_progress_transmissions.spawn_on(send_future, &rt_handle); + initiated_transmissions.insert(peer, (connection_id, child_token_clone)); } } - Some(result) = in_progress_transmissions.join_next() => { + } + Some(result) = in_progress_transmissions.join_next() => { + panic_on_join_err(result); + metrics.send_view_send_to_peer_delivered_total.inc(); + } + _ = cancellation_token.cancelled() => { + while let Some(result) = in_progress_transmissions.join_next().await { + metrics.send_view_send_to_peer_cancelled_total.inc(); panic_on_join_err(result); - metrics.send_view_send_to_peer_delivered_total.inc(); - } - _ = cancellation_token.cancelled() => { - while let Some(result) = in_progress_transmissions.join_next().await { - metrics.send_view_send_to_peer_cancelled_total.inc(); - panic_on_join_err(result); - } - break; } + break; } } } @@ -303,7 +303,7 @@ impl< /// Sends a serialized advert or artifact message to a peer. 
/// If the peer is not reachable, it will retry with an exponential backoff. #[instrument(skip(transport, message))] -async fn send_advert_to_peer( +async fn send_transmit_to_peer( transport: Arc, message: Bytes, peer: NodeId, @@ -333,7 +333,9 @@ async fn send_advert_to_peer( } mod available_slot_set { - use super::*; + use super::SLOT_TABLE_THRESHOLD; + use crate::{ConsensusManagerMetrics, SlotNumber}; + use ic_logger::{warn, ReplicaLogger}; pub struct AvailableSlot(u64); @@ -398,6 +400,7 @@ mod available_slot_set { } } +#[allow(clippy::disallowed_methods)] #[cfg(test)] mod tests { use anyhow::anyhow; @@ -433,7 +436,7 @@ mod tests { /// Verify that advert is sent to multiple peers. #[tokio::test] - async fn send_advert_to_all_peers() { + async fn send_transmit_to_all_peers() { with_test_replica_logger(|log| async { let (push_tx, mut push_rx) = tokio::sync::mpsc::unbounded_channel(); let (tx, rx) = tokio::sync::mpsc::channel(100); From 54303f24afdc6d00ecb42bd28366b8efadf989a9 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 6 Nov 2024 13:33:36 +0000 Subject: [PATCH 2/6] . --- rs/p2p/consensus_manager/src/sender.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/rs/p2p/consensus_manager/src/sender.rs b/rs/p2p/consensus_manager/src/sender.rs index f76ff0b1d49..5799e273a2f 100644 --- a/rs/p2p/consensus_manager/src/sender.rs +++ b/rs/p2p/consensus_manager/src/sender.rs @@ -234,7 +234,6 @@ impl< Bytes::from(pb_slot_update.encode_to_vec()) } } -/// Sends an advert to all peers. #[instrument(skip_all)] async fn send_transmit_to_all_peers( rt_handle: Handle, @@ -300,8 +299,6 @@ async fn send_transmit_to_all_peers( } } -/// Sends a serialized advert or artifact message to a peer. -/// If the peer is not reachable, it will retry with an exponential backoff. #[instrument(skip(transport, message))] async fn send_transmit_to_peer( transport: Arc, From 934bcb4e1868e54812276df25d4c32289049327e Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 6 Nov 2024 13:53:37 +0000 Subject: [PATCH 3/6] . --- rs/p2p/consensus_manager/src/sender.rs | 29 +++++++++++++++----------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/rs/p2p/consensus_manager/src/sender.rs b/rs/p2p/consensus_manager/src/sender.rs index 5799e273a2f..e668c8ae430 100644 --- a/rs/p2p/consensus_manager/src/sender.rs +++ b/rs/p2p/consensus_manager/src/sender.rs @@ -62,7 +62,7 @@ pub struct ConsensusManagerSender, - join_set: JoinSet<()>, + active_transmit_tasks: JoinSet<()>, assembler: Assembler, marker: PhantomData, } @@ -92,7 +92,7 @@ impl< slot_manager, current_commit_id: CommitId::from(0), active_adverts: HashMap::new(), - join_set: JoinSet::new(), + active_transmit_tasks: JoinSet::new(), assembler, marker: PhantomData, }; @@ -123,14 +123,14 @@ impl< self.current_commit_id.inc_assign(); } - Some(result) = self.join_set.join_next() => { + Some(result) = self.active_transmit_tasks.join_next() => { panic_on_join_err(result); } } #[cfg(debug_assertions)] { - if self.join_set.len() < self.active_adverts.len() { + if self.active_transmit_tasks.len() < self.active_adverts.len() { // This invariant can be violated if the root cancellation token is cancelled. 
// It can be violated because the active_adverts HashMap is only cleared // when purging artifacts, and not when the tasks join due to a cancellation @@ -142,8 +142,8 @@ impl< if is_not_cancelled { panic!( - "Invariant violated: join_set.len() {:?} >= active_adverts.len() {:?}.", - self.join_set.len(), + "Invariant violated: active_transmit_tasks.len() {:?} >= active_adverts.len() {:?}.", + self.active_transmit_tasks.len(), self.active_adverts.len() ); } @@ -151,7 +151,7 @@ impl< } } - while let Some(result) = self.join_set.join_next().await { + while let Some(result) = self.active_transmit_tasks.join_next().await { panic_on_join_err(result); } } @@ -193,15 +193,18 @@ impl< }, wire_artifact_id, ); - let send_future = Self::send_transmit_to_all_peers( + let route = uri_prefix::(); + let send_future = send_transmit_to_all_peers( self.rt_handle.clone(), self.metrics.clone(), self.transport.clone(), payload, + route, child_token_clone, ); - self.join_set.spawn_on(send_future, &self.rt_handle); + self.active_transmit_tasks + .spawn_on(send_future, &self.rt_handle); entry.insert((child_token, used_slot)); } else { self.metrics.send_view_consensus_dup_adverts_total.inc(); @@ -240,6 +243,7 @@ async fn send_transmit_to_all_peers( metrics: ConsensusManagerMetrics, transport: Arc, body: Bytes, + route: String, cancellation_token: CancellationToken, ) { let mut in_progress_transmissions = JoinSet::new(); @@ -271,10 +275,11 @@ async fn send_transmit_to_all_peers( let transport = transport.clone(); let body = body.clone(); + let route = route.clone(); let send_future = async move { select! { - _ = send_transmit_to_peer(transport, body, peer, uri_prefix::()) => {}, + _ = send_transmit_to_peer(transport, body, peer, route) => {}, _ = child_token.cancelled() => {}, } }; @@ -304,7 +309,7 @@ async fn send_transmit_to_peer( transport: Arc, message: Bytes, peer: NodeId, - uri_prefix: String, + route: String, ) { let mut backoff = ExponentialBackoffBuilder::new() .with_initial_interval(MIN_BACKOFF_INTERVAL) @@ -315,7 +320,7 @@ async fn send_transmit_to_peer( loop { let request = Request::builder() - .uri(format!("/{}/update", uri_prefix)) + .uri(format!("/{}/update", route)) .body(message.clone()) .expect("Building from typed values"); From 9b93a4e445cc9b8939bbe02b7a58b00c5ff0ff51 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 6 Nov 2024 13:57:07 +0000 Subject: [PATCH 4/6] . --- rs/p2p/consensus_manager/src/sender.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rs/p2p/consensus_manager/src/sender.rs b/rs/p2p/consensus_manager/src/sender.rs index e668c8ae430..bf6513fb811 100644 --- a/rs/p2p/consensus_manager/src/sender.rs +++ b/rs/p2p/consensus_manager/src/sender.rs @@ -61,7 +61,7 @@ pub struct ConsensusManagerSender>, slot_manager: AvailableSlotSet, current_commit_id: CommitId, - active_adverts: HashMap, + active_slots: HashMap, active_transmit_tasks: JoinSet<()>, assembler: Assembler, marker: PhantomData, @@ -91,7 +91,7 @@ impl< outbound_transmits, slot_manager, current_commit_id: CommitId::from(0), - active_adverts: HashMap::new(), + active_slots: HashMap::new(), active_transmit_tasks: JoinSet::new(), assembler, marker: PhantomData, @@ -130,9 +130,9 @@ impl< #[cfg(debug_assertions)] { - if self.active_transmit_tasks.len() < self.active_adverts.len() { + if self.active_transmit_tasks.len() < self.active_slots.len() { // This invariant can be violated if the root cancellation token is cancelled. 
- // It can be violated because the active_adverts HashMap is only cleared + // It can be violated because the active_slots HashMap is only cleared // when purging artifacts, and not when the tasks join due to a cancellation // not triggered by the manager. let is_not_cancelled = @@ -142,9 +142,9 @@ impl< if is_not_cancelled { panic!( - "Invariant violated: active_transmit_tasks.len() {:?} >= active_adverts.len() {:?}.", + "Invariant violated: active_transmit_tasks.len() {:?} >= active_slots.len() {:?}.", self.active_transmit_tasks.len(), - self.active_adverts.len() + self.active_slots.len() ); } } @@ -157,7 +157,7 @@ impl< } fn handle_abort_transmit(&mut self, id: &Artifact::Id) { - if let Some((cancellation_token, free_slot)) = self.active_adverts.remove(id) { + if let Some((cancellation_token, free_slot)) = self.active_slots.remove(id) { self.metrics.send_view_consensus_purge_active_total.inc(); cancellation_token.cancel(); self.slot_manager.push(free_slot); @@ -175,7 +175,7 @@ impl< let id = new_artifact.artifact.id(); let wire_artifact = self.assembler.disassemble_message(new_artifact.artifact); let wire_artifact_id = wire_artifact.id(); - let entry = self.active_adverts.entry(id.clone()); + let entry = self.active_slots.entry(id.clone()); if let Entry::Vacant(entry) = entry { self.metrics.send_view_consensus_new_adverts_total.inc(); From 6503cb771001e4a7a77a7248f9e0730120b5a321 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 6 Nov 2024 14:10:35 +0000 Subject: [PATCH 5/6] . --- rs/p2p/consensus_manager/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 645674db154..bb2ee7d8bcf 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -154,23 +154,23 @@ where vec![shutdown_send_side, shutdown_receive_side] } -pub(crate) struct SlotUpdate { +struct SlotUpdate { slot_number: SlotNumber, commit_id: CommitId, update: Update, } -pub(crate) enum Update { +enum Update { Artifact(Artifact), Id(Artifact::Id), } -pub(crate) fn uri_prefix() -> String { +fn uri_prefix() -> String { Artifact::NAME.to_lowercase() } struct SlotNumberTag; -pub(crate) type SlotNumber = AmountOf; +type SlotNumber = AmountOf; struct CommitIdTag; -pub(crate) type CommitId = AmountOf; +type CommitId = AmountOf; From e0e062f34c4beb68b283ecab390ac78f158ab640 Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Wed, 6 Nov 2024 14:30:57 +0000 Subject: [PATCH 6/6] . --- rs/p2p/consensus_manager/src/sender.rs | 36 ++++++++++++-------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/rs/p2p/consensus_manager/src/sender.rs b/rs/p2p/consensus_manager/src/sender.rs index bf6513fb811..69be847fbd5 100644 --- a/rs/p2p/consensus_manager/src/sender.rs +++ b/rs/p2p/consensus_manager/src/sender.rs @@ -270,22 +270,10 @@ async fn send_transmit_to_all_peers( if !is_initiated { let child_token = cancellation_token.child_token(); - let child_token_clone = child_token.clone(); metrics.send_view_send_to_peer_total.inc(); - - let transport = transport.clone(); - let body = body.clone(); - let route = route.clone(); - - let send_future = async move { - select! 
{ - _ = send_transmit_to_peer(transport, body, peer, route) => {}, - _ = child_token.cancelled() => {}, - } - }; - - in_progress_transmissions.spawn_on(send_future, &rt_handle); - initiated_transmissions.insert(peer, (connection_id, child_token_clone)); + let send_fut = send_transmit_to_peer(transport.clone(), body.clone(), peer, route.clone(), child_token.clone()); + in_progress_transmissions.spawn_on(send_fut, &rt_handle); + initiated_transmissions.insert(peer, (connection_id, child_token)); } } } @@ -310,6 +298,7 @@ async fn send_transmit_to_peer( message: Bytes, peer: NodeId, route: String, + cancellation_token: CancellationToken, ) { let mut backoff = ExponentialBackoffBuilder::new() .with_initial_interval(MIN_BACKOFF_INTERVAL) @@ -324,13 +313,22 @@ async fn send_transmit_to_peer( .body(message.clone()) .expect("Building from typed values"); - // TODO: NET-1748 - if transport.rpc(&peer, request).await.is_ok() { - return; + select! { + // TODO: NET-1748 + rpc_result = transport.rpc(&peer, request) => { + if rpc_result.is_ok() { + return; + } + }, + _ = cancellation_token.cancelled() => return, } let backoff_duration = backoff.next_backoff().unwrap_or(MAX_BACKOFF_INTERVAL); - time::sleep(backoff_duration).await; + let timeout = time::sleep(backoff_duration); + select! { + _ = timeout => (), + _ = cancellation_token.cancelled() => return, + } } }
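
Note on patch 6/6: the one behavioral change in this series (the rest is renames and moving helpers out of the generic impl) is that send_transmit_to_peer now receives the per-transmit CancellationToken and races both the RPC attempt and the backoff sleep against it, so an aborted transmit returns promptly instead of waiting out a pending retry. Below is a minimal, self-contained sketch of that pattern, not the actual patch code: fake_rpc, the numeric peer id, and the route string are placeholders, and a simple doubling backoff stands in for the ExponentialBackoffBuilder used in the real sender. It only needs the tokio (macros, rt-multi-thread, time) and tokio-util crates.

// Sketch of the cancellation-aware retry loop introduced in the last patch.
// Placeholders (not from the patch): fake_rpc, peer id, route string.
use std::time::Duration;
use tokio::{select, time};
use tokio_util::sync::CancellationToken;

const MIN_BACKOFF_INTERVAL: Duration = Duration::from_millis(250);
const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);

// Stand-in for transport.rpc(&peer, request): pretend the peer is unreachable.
async fn fake_rpc(_peer: u64, _route: &str) -> Result<(), ()> {
    time::sleep(Duration::from_millis(50)).await;
    Err(())
}

// Mirrors the shape of send_transmit_to_peer after the final patch: every await
// point (the RPC itself and the backoff sleep) races against the cancellation
// token, so cancelling the transmit never has to wait for a retry to elapse.
async fn send_with_retry(peer: u64, route: String, cancellation_token: CancellationToken) {
    // Simplified doubling backoff; the real code builds an ExponentialBackoff.
    let mut backoff = MIN_BACKOFF_INTERVAL;
    loop {
        select! {
            rpc_result = fake_rpc(peer, &route) => {
                if rpc_result.is_ok() {
                    return;
                }
            }
            _ = cancellation_token.cancelled() => return,
        }

        select! {
            _ = time::sleep(backoff) => {}
            _ = cancellation_token.cancelled() => return,
        }
        backoff = (backoff * 2).min(MAX_BACKOFF_INTERVAL);
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let task = tokio::spawn(send_with_retry(
        7,
        "consensus/update".to_string(),
        token.child_token(),
    ));

    // Abort the transmit after one second; the task exits at its next await point.
    time::sleep(Duration::from_secs(1)).await;
    token.cancel();
    task.await.expect("send task panicked");
}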