diff --git a/.github/workflows/docker_build_push.yml b/.github/workflows/docker_build_push.yml index 1d5bd7d40..b0d5063c9 100644 --- a/.github/workflows/docker_build_push.yml +++ b/.github/workflows/docker_build_push.yml @@ -15,12 +15,6 @@ jobs: uses: ./.github/workflows/docker_utils.yml secrets: inherit - broadcast_channels: - uses: ./.github/workflows/docker_utils.yml - secrets: inherit - with: - features: broadcast_via_channels - network: uses: ./.github/workflows/docker_utils.yml secrets: inherit diff --git a/crates/topos-tce-broadcast/Cargo.toml b/crates/topos-tce-broadcast/Cargo.toml index 539a5f8a6..37fc652a6 100644 --- a/crates/topos-tce-broadcast/Cargo.toml +++ b/crates/topos-tce-broadcast/Cargo.toml @@ -31,10 +31,7 @@ rand.workspace = true topos-test-sdk = { path = "../topos-test-sdk/" } -[features] -task-manager-channels = [] - [[bench]] name = "double_echo" -path = "benches/double_echo.rs" -harness = false +path = "benches/benchmark.rs" +harness = false \ No newline at end of file diff --git a/crates/topos-tce-broadcast/benches/benchmark.rs b/crates/topos-tce-broadcast/benches/benchmark.rs new file mode 100644 index 000000000..5988e4026 --- /dev/null +++ b/crates/topos-tce-broadcast/benches/benchmark.rs @@ -0,0 +1,20 @@ +use criterion::async_executor::FuturesExecutor; +use criterion::{criterion_group, criterion_main, Criterion}; +mod double_echo; + +pub fn criterion_benchmark(c: &mut Criterion) { + let certificates = 10_000; + + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + c.bench_function("double_echo", |b| { + b.to_async(FuturesExecutor).iter(|| async { + runtime.block_on(async { double_echo::processing_double_echo(certificates).await }) + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/crates/topos-tce-broadcast/benches/double_echo.rs b/crates/topos-tce-broadcast/benches/double_echo.rs index 0be6ccc97..8cb743fcd 100644 --- 
a/crates/topos-tce-broadcast/benches/double_echo.rs +++ b/crates/topos-tce-broadcast/benches/double_echo.rs @@ -1,20 +1,125 @@ -use criterion::async_executor::FuturesExecutor; -use criterion::{criterion_group, criterion_main, Criterion}; -mod task_manager; - -pub fn criterion_benchmark(c: &mut Criterion) { - let certificates = 10_000; - - let runtime = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); - - c.bench_function("double_echo", |b| { - b.to_async(FuturesExecutor).iter(|| async { - runtime.block_on(async { task_manager::processing_double_echo(certificates).await }) - }) - }); +use std::collections::HashSet; +use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc, oneshot}; +use topos_tce_broadcast::double_echo::DoubleEcho; +use topos_tce_broadcast::sampler::SubscriptionsView; +use topos_tce_broadcast::DoubleEchoCommand; +use topos_test_sdk::certificates::create_certificate_chain; +use topos_test_sdk::constants::{SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1}; + +const CHANNEL_SIZE: usize = 256_000; + +#[derive(Clone)] +struct TceParams { + nb_peers: usize, + broadcast_params: ReliableBroadcastParams, +} + +struct Context { + event_receiver: Receiver, } -criterion_group!(benches, criterion_benchmark); -criterion_main!(benches); +pub async fn processing_double_echo(n: u64) { + let (subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel(CHANNEL_SIZE); + + let (cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE); + let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE); + let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) = + mpsc::channel::>(1); + + let params = TceParams { + nb_peers: 10, + broadcast_params: ReliableBroadcastParams { + echo_threshold: 8, + ready_threshold: 5, + delivery_threshold: 8, + }, + }; + + let mut ctx = Context { event_receiver }; + + let double_echo = DoubleEcho::new( + params.clone().broadcast_params, + 
cmd_receiver, + event_sender, + double_echo_shutdown_receiver, + ); + + // List of peers + let mut peers = HashSet::new(); + for i in 0..params.nb_peers { + let peer = topos_p2p::utils::local_key_pair(Some(i as u8)) + .public() + .to_peer_id(); + peers.insert(peer); + } + + let msg = SubscriptionsView { + echo: peers.clone(), + ready: peers.clone(), + network_size: params.nb_peers, + }; + + tokio::spawn(double_echo.run(subscriptions_view_receiver)); + + subscriptions_view_sender.send(msg.clone()).await.unwrap(); + + let certificates = + create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], n as usize); + + let double_echo_selected_echo = msg + .echo + .iter() + .take(params.broadcast_params.echo_threshold) + .cloned() + .collect::>(); + + let double_echo_selected_ready = msg + .ready + .iter() + .take(params.broadcast_params.delivery_threshold) + .cloned() + .collect::>(); + + for cert in &certificates { + let _ = cmd_sender + .send(DoubleEchoCommand::Broadcast { + cert: cert.clone(), + need_gossip: true, + }) + .await; + } + + for cert in &certificates { + for p in &double_echo_selected_echo { + let _ = cmd_sender + .send(DoubleEchoCommand::Echo { + from_peer: *p, + certificate_id: cert.id, + }) + .await; + } + + for p in &double_echo_selected_ready { + let _ = cmd_sender + .send(DoubleEchoCommand::Ready { + from_peer: *p, + certificate_id: cert.id, + }) + .await; + } + } + + let mut count = 0; + + while let Some(event) = ctx.event_receiver.recv().await { + if let ProtocolEvents::CertificateDelivered { .. 
} = event { + count += 1; + + if count == n { + break; + } + } + } +} diff --git a/crates/topos-tce-broadcast/benches/task_manager.rs b/crates/topos-tce-broadcast/benches/task_manager.rs deleted file mode 100644 index 1945b0601..000000000 --- a/crates/topos-tce-broadcast/benches/task_manager.rs +++ /dev/null @@ -1,118 +0,0 @@ -use std::collections::HashSet; -use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; -use tokio::sync::mpsc::Receiver; -use tokio::sync::{mpsc, oneshot}; -use topos_tce_broadcast::double_echo::DoubleEcho; -use topos_tce_broadcast::sampler::SubscriptionsView; -use topos_test_sdk::certificates::create_certificate_chain; -use topos_test_sdk::constants::{SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1}; - -const CHANNEL_SIZE: usize = 256_000; - -struct TceParams { - nb_peers: usize, - broadcast_params: ReliableBroadcastParams, -} - -struct Context { - event_receiver: Receiver, -} - -pub async fn processing_double_echo(n: u64) { - let (subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel(CHANNEL_SIZE); - - let (_cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE); - let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE); - let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) = - mpsc::channel::>(1); - let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(CHANNEL_SIZE); - - let params = TceParams { - nb_peers: 10, - broadcast_params: ReliableBroadcastParams { - echo_threshold: 8, - ready_threshold: 5, - delivery_threshold: 8, - }, - }; - - let mut ctx = Context { event_receiver }; - - let mut double_echo = DoubleEcho::new( - params.broadcast_params, - task_manager_message_sender.clone(), - cmd_receiver, - event_sender, - double_echo_shutdown_receiver, - 0, - ); - - // List of peers - let mut peers = HashSet::new(); - for i in 0..params.nb_peers { - let peer = topos_p2p::utils::local_key_pair(Some(i as u8)) - .public() - .to_peer_id(); - peers.insert(peer); - } - - // 
Subscriptions - double_echo.subscriptions.echo = peers.clone(); - double_echo.subscriptions.ready = peers.clone(); - double_echo.subscriptions.network_size = params.nb_peers; - - let msg = SubscriptionsView { - echo: peers.clone(), - ready: peers.clone(), - network_size: params.nb_peers, - }; - - subscriptions_view_sender.send(msg).await.unwrap(); - - double_echo.spawn_task_manager(subscriptions_view_receiver, task_manager_message_receiver); - - let certificates = - create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], n as usize); - - let double_echo_selected_echo = double_echo - .subscriptions - .echo - .iter() - .take(double_echo.params.echo_threshold) - .cloned() - .collect::>(); - - let double_echo_selected_ready = double_echo - .subscriptions - .ready - .iter() - .take(double_echo.params.delivery_threshold) - .cloned() - .collect::>(); - - for cert in &certificates { - double_echo.broadcast(cert.clone(), true).await; - } - - for cert in &certificates { - for p in &double_echo_selected_echo { - double_echo.handle_echo(*p, cert.id).await; - } - - for p in &double_echo_selected_ready { - double_echo.handle_ready(*p, cert.id).await; - } - } - - let mut count = 0; - - while let Some(event) = ctx.event_receiver.recv().await { - if let ProtocolEvents::CertificateDelivered { .. } = event { - count += 1; - - if count == n { - break; - } - } - } -} diff --git a/crates/topos-tce-broadcast/src/constant.rs b/crates/topos-tce-broadcast/src/constant.rs index 01dbfc5d6..2a999783d 100644 --- a/crates/topos-tce-broadcast/src/constant.rs +++ b/crates/topos-tce-broadcast/src/constant.rs @@ -7,12 +7,6 @@ lazy_static! 
{ .ok() .and_then(|s| s.parse().ok()) .unwrap_or(2048); - /// Size of the channel between double echo and the task manager - pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize = - std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(20_480); /// Size of the channel to send protocol events from the double echo pub static ref PROTOCOL_CHANNEL_SIZE: usize = std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE") diff --git a/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs b/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs index d729a885e..3278e6ba4 100644 --- a/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs +++ b/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs @@ -115,7 +115,6 @@ impl BroadcastState { // we update the status to Delivered and change the status if !self.status.is_delivered() && self.reached_delivery_threshold() { self.status = self.status.delivered(); - debug!( "📝 Certificate {} is now {}", &self.certificate.id, self.status @@ -136,7 +135,6 @@ impl BroadcastState { ); DOUBLE_ECHO_BROADCAST_FINISHED_TOTAL.inc(); - _ = self .event_sender .try_send(ProtocolEvents::CertificateDelivered { diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 61df3c6ec..243a0e62d 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -1,30 +1,48 @@ +use crate::double_echo::broadcast_state::BroadcastState; +use crate::double_echo::task::Task; use crate::TaskStatus; use crate::{DoubleEchoCommand, SubscriptionsView}; -use std::collections::HashSet; +use futures::{stream::FuturesUnordered, StreamExt}; +use std::future::IntoFuture; +use std::{ + collections::{HashMap, HashSet}, + future::Future, + pin::Pin, +}; +use task::TaskContext; use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; +use tokio::spawn; use tokio::sync::{mpsc, 
oneshot}; use topos_core::uci::{Certificate, CertificateId}; - +use topos_metrics::DOUBLE_ECHO_ACTIVE_TASKS_COUNT; use topos_p2p::PeerId; use tracing::{error, info, warn}; pub mod broadcast_state; +pub mod task; pub struct DoubleEcho { - /// Channel to receive commands + /// If Echo | Ready messages arrive before the related certificate is broadcasted, they are buffered here + pub buffered_messages: HashMap>, + /// Channel to receive DoubleEchoCommands from the TCE process command_receiver: mpsc::Receiver, - /// Channel to send events - event_sender: mpsc::Sender, - /// Channel to receive shutdown signal - pub(crate) shutdown: mpsc::Receiver>, /// Delivered certificate ids to avoid processing twice the same certificate delivered_certificates: HashSet, + /// Channel to send back ProtocolEvents to the TCE process + event_sender: mpsc::Sender, /// The threshold parameters for the double echo pub params: ReliableBroadcastParams, - /// The connection to the TaskManager to forward DoubleEchoCommand messages - task_manager_message_sender: mpsc::Sender, + /// The running tasks, which end themselves when they are done + #[allow(clippy::type_complexity)] + pub running_tasks: FuturesUnordered< + Pin + Send + 'static>>, + >, + /// Channel to receive shutdown signal + pub(crate) shutdown: mpsc::Receiver>, /// The overview of the network, which holds echo and ready subscriptions and the network size pub subscriptions: SubscriptionsView, + /// All open tasks, which currently handle processing a certificate each + pub tasks: HashMap, } impl DoubleEcho { @@ -33,65 +51,23 @@ impl DoubleEcho { #[allow(clippy::too_many_arguments)] pub fn new( params: ReliableBroadcastParams, - task_manager_message_sender: mpsc::Sender, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, shutdown: mpsc::Receiver>, - _pending_certificate_count: u64, ) -> Self { Self { - params, - task_manager_message_sender, + buffered_messages: Default::default(), command_receiver, + delivered_certificates: 
Default::default(), event_sender, - subscriptions: SubscriptionsView::default(), + params, + running_tasks: FuturesUnordered::new(), shutdown, - delivered_certificates: Default::default(), + subscriptions: SubscriptionsView::default(), + tasks: HashMap::new(), } } - #[cfg(not(feature = "task-manager-channels"))] - pub fn spawn_task_manager( - &mut self, - subscriptions_view_receiver: mpsc::Receiver, - task_manager_message_receiver: mpsc::Receiver, - ) -> mpsc::Receiver<(CertificateId, TaskStatus)> { - let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048); - - let (task_manager, shutdown_receiver) = crate::task_manager_futures::TaskManager::new( - task_manager_message_receiver, - task_completion_sender, - subscriptions_view_receiver, - self.event_sender.clone(), - self.params.clone(), - ); - - tokio::spawn(task_manager.run(shutdown_receiver)); - - task_completion_receiver - } - - #[cfg(feature = "task-manager-channels")] - pub fn spawn_task_manager( - &mut self, - subscriptions_view_receiver: mpsc::Receiver, - task_manager_message_receiver: mpsc::Receiver, - ) -> mpsc::Receiver<(CertificateId, TaskStatus)> { - let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048); - - let (task_manager, shutdown_receiver) = crate::task_manager_channels::TaskManager::new( - task_manager_message_receiver, - task_completion_sender, - subscriptions_view_receiver, - self.event_sender.clone(), - self.params.clone(), - ); - - tokio::spawn(task_manager.run(shutdown_receiver)); - - task_completion_receiver - } - /// DoubleEcho main loop /// - Listen for shutdown signal /// - Read new messages from command_receiver @@ -99,33 +75,15 @@ impl DoubleEcho { /// - If a new subscription view is received, update the subscriptions /// - If a new Echo/Ready is received, update the state of the certificate or buffer /// the message - pub(crate) async fn run( - mut self, - mut subscriptions_view_receiver: mpsc::Receiver, - task_manager_message_receiver: 
mpsc::Receiver, - ) { - let (forwarding_subscriptions_sender, forwarding_subscriptions_receiver) = - mpsc::channel(2048); - let mut task_completion = self.spawn_task_manager( - forwarding_subscriptions_receiver, - task_manager_message_receiver, - ); - - info!("DoubleEcho started"); - + pub async fn run(mut self, mut subscriptions_view_receiver: mpsc::Receiver) { let shutdowned: Option> = loop { tokio::select! { biased; Some(new_subscriptions_view) = subscriptions_view_receiver.recv() => { - forwarding_subscriptions_sender.send(new_subscriptions_view.clone()).await.unwrap(); self.subscriptions = new_subscriptions_view; } - shutdown = self.shutdown.recv() => { - warn!("Double echo shutdown signal received {:?}", shutdown); - break shutdown; - }, Some(command) = self.command_receiver.recv() => { match command { @@ -133,7 +91,7 @@ impl DoubleEcho { command if self.subscriptions.is_some() => { match command { - DoubleEchoCommand::Echo { from_peer, certificate_id } => self.handle_echo(from_peer, certificate_id).await, + DoubleEchoCommand::Echo { from_peer, certificate_id, .. 
} => self.handle_echo(from_peer, certificate_id).await, DoubleEchoCommand::Ready { from_peer, certificate_id } => self.handle_ready(from_peer, certificate_id).await, _ => {} } @@ -145,12 +103,18 @@ impl DoubleEcho { } } - Some((certificate_id, status)) = task_completion.recv() => { + Some((certificate_id, status)) = self.running_tasks.next() => { if status == TaskStatus::Success { - self.delivered_certificates.insert(certificate_id); + self.tasks.remove(&certificate_id); + DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec(); } } + shutdown = self.shutdown.recv() => { + warn!("Double echo shutdown signal received {:?}", shutdown); + break shutdown; + }, + else => { warn!("Break the tokio loop for the double echo"); @@ -174,6 +138,7 @@ impl DoubleEcho { /// - or received through the gossip (first step of protocol exchange) pub async fn broadcast(&mut self, cert: Certificate, origin: bool) { info!("🙌 Starting broadcasting the Certificate {}", &cert.id); + if self.cert_pre_broadcast_check(&cert).is_err() { error!("Failure on the pre-check for the Certificate {}", &cert.id); self.event_sender @@ -221,14 +186,35 @@ impl DoubleEcho { ); None } else { - _ = self - .task_manager_message_sender - .send(DoubleEchoCommand::Broadcast { - need_gossip: origin, - cert: certificate, - }) - .await; + match self.tasks.entry(certificate.id) { + std::collections::hash_map::Entry::Vacant(entry) => { + let broadcast_state = BroadcastState::new( + certificate.clone(), + self.params.echo_threshold, + self.params.ready_threshold, + self.params.delivery_threshold, + self.event_sender.clone(), + self.subscriptions.clone(), + origin, + ); + + let (task, task_context) = Task::new(certificate.id, broadcast_state); + + self.running_tasks.push(task.into_future()); + if let Some(messages) = self.buffered_messages.remove(&certificate.id) { + let sink = task_context.sink.clone(); + spawn(async move { + for msg in messages { + _ = sink.send(msg).await; + } + }); + } + DOUBLE_ECHO_ACTIVE_TASKS_COUNT.inc(); + 
entry.insert(task_context); + } + std::collections::hash_map::Entry::Occupied(_) => {} + } Some(true) } } @@ -250,25 +236,45 @@ impl DoubleEcho { impl DoubleEcho { pub async fn handle_echo(&mut self, from_peer: PeerId, certificate_id: CertificateId) { if self.delivered_certificates.get(&certificate_id).is_none() { - let _ = self - .task_manager_message_sender - .send(DoubleEchoCommand::Echo { - from_peer, - certificate_id, - }) - .await; + if let Some(task_context) = self.tasks.get(&certificate_id) { + let _ = task_context + .sink + .send(DoubleEchoCommand::Echo { + from_peer, + certificate_id, + }) + .await; + } else { + self.buffered_messages + .entry(certificate_id) + .or_default() + .push(DoubleEchoCommand::Echo { + from_peer, + certificate_id, + }); + }; } } pub async fn handle_ready(&mut self, from_peer: PeerId, certificate_id: CertificateId) { if self.delivered_certificates.get(&certificate_id).is_none() { - let _ = self - .task_manager_message_sender - .send(DoubleEchoCommand::Ready { - from_peer, - certificate_id, - }) - .await; + if let Some(task_context) = self.tasks.get(&certificate_id) { + _ = task_context + .sink + .send(DoubleEchoCommand::Ready { + from_peer, + certificate_id, + }) + .await; + } else { + self.buffered_messages + .entry(certificate_id) + .or_default() + .push(DoubleEchoCommand::Ready { + from_peer, + certificate_id, + }); + }; } } } diff --git a/crates/topos-tce-broadcast/src/task_manager_futures/task.rs b/crates/topos-tce-broadcast/src/double_echo/task.rs similarity index 100% rename from crates/topos-tce-broadcast/src/task_manager_futures/task.rs rename to crates/topos-tce-broadcast/src/double_echo/task.rs diff --git a/crates/topos-tce-broadcast/src/lib.rs b/crates/topos-tce-broadcast/src/lib.rs index f9fa39b75..51cf0e3e9 100644 --- a/crates/topos-tce-broadcast/src/lib.rs +++ b/crates/topos-tce-broadcast/src/lib.rs @@ -31,11 +31,6 @@ mod constant; pub mod double_echo; pub mod sampler; -#[cfg(feature = "task-manager-channels")] -pub 
mod task_manager_channels; -#[cfg(not(feature = "task-manager-channels"))] -pub mod task_manager_futures; - #[cfg(test)] mod tests; @@ -108,7 +103,7 @@ impl ReliableBroadcastClient { pub async fn new( config: ReliableBroadcastConfig, _local_peer_id: String, - storage: StorageClient, + _storage: StorageClient, ) -> (Self, impl Stream) { let (subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel::(*constant::SUBSCRIPTION_VIEW_CHANNEL_SIZE); @@ -117,25 +112,14 @@ impl ReliableBroadcastClient { let (double_echo_shutdown_channel, double_echo_shutdown_receiver) = mpsc::channel::>(1); - let (task_manager_message_sender, task_manager_message_receiver) = - mpsc::channel(*constant::BROADCAST_TASK_MANAGER_CHANNEL_SIZE); - - let pending_certificate_count = storage - .get_pending_certificates() - .await - .map(|v| v.len()) - .unwrap_or(0) as u64; - let double_echo = DoubleEcho::new( config.tce_params, - task_manager_message_sender, command_receiver, event_sender, double_echo_shutdown_receiver, - pending_certificate_count, ); - spawn(double_echo.run(subscriptions_view_receiver, task_manager_message_receiver)); + spawn(double_echo.run(subscriptions_view_receiver)); ( Self { diff --git a/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs b/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs deleted file mode 100644 index 9ae3f27ad..000000000 --- a/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs +++ /dev/null @@ -1,148 +0,0 @@ -use std::collections::HashMap; -use tokio::{spawn, sync::mpsc}; - -use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; -use topos_core::uci::CertificateId; -use tracing::warn; - -pub mod task; -use crate::double_echo::broadcast_state::BroadcastState; -use crate::sampler::SubscriptionsView; -use crate::TaskStatus; -use crate::{constant, DoubleEchoCommand}; -use task::{Task, TaskContext}; -use topos_metrics::{ - CERTIFICATE_PROCESSING_FROM_API_TOTAL, CERTIFICATE_PROCESSING_FROM_GOSSIP_TOTAL, - 
CERTIFICATE_PROCESSING_TOTAL, -}; - -/// The TaskManager is responsible for receiving messages from the network and distributing them -/// among tasks. These tasks are either created if none for a certain CertificateID exists yet, -/// or existing tasks will receive the messages. -pub struct TaskManager { - pub message_receiver: mpsc::Receiver, - pub task_completion_receiver: mpsc::Receiver<(CertificateId, TaskStatus)>, - pub task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, - pub notify_task_completion: mpsc::Sender<(CertificateId, TaskStatus)>, - pub subscription_view_receiver: mpsc::Receiver, - pub subscriptions: SubscriptionsView, - pub event_sender: mpsc::Sender, - pub tasks: HashMap, - pub buffered_messages: HashMap>, - pub thresholds: ReliableBroadcastParams, - pub shutdown_sender: mpsc::Sender<()>, -} - -impl TaskManager { - pub fn new( - message_receiver: mpsc::Receiver, - notify_task_completion: mpsc::Sender<(CertificateId, TaskStatus)>, - subscription_view_receiver: mpsc::Receiver, - event_sender: mpsc::Sender, - thresholds: ReliableBroadcastParams, - ) -> (Self, mpsc::Receiver<()>) { - let (task_completion_sender, task_completion_receiver) = - mpsc::channel(*constant::BROADCAST_TASK_COMPLETION_CHANNEL_SIZE); - let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); - - ( - Self { - message_receiver, - task_completion_receiver, - task_completion_sender, - notify_task_completion, - subscription_view_receiver, - subscriptions: SubscriptionsView::default(), - event_sender, - tasks: HashMap::new(), - buffered_messages: Default::default(), - thresholds, - shutdown_sender, - }, - shutdown_receiver, - ) - } - - pub async fn run(mut self, mut shutdown_receiver: mpsc::Receiver<()>) { - loop { - tokio::select! 
{ - biased; - - Some(new_subscriptions_view) = self.subscription_view_receiver.recv() => { - self.subscriptions = new_subscriptions_view; - } - - Some(msg) = self.message_receiver.recv() => { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready{ certificate_id, .. } => { - if let Some(task_context) = self.tasks.get(&certificate_id) { - _ = task_context.sink.send(msg).await; - } else { - self.buffered_messages.entry(certificate_id).or_default().push(msg); - } - } - DoubleEchoCommand::Broadcast { ref cert, need_gossip } => { - match self.tasks.entry(cert.id) { - std::collections::hash_map::Entry::Vacant(entry) => { - let broadcast_state = BroadcastState::new( - cert.clone(), - self.thresholds.echo_threshold, - self.thresholds.ready_threshold, - self.thresholds.delivery_threshold, - self.event_sender.clone(), - self.subscriptions.clone(), - need_gossip, - ); - - let (task, task_context) = Task::new(cert.id, self.task_completion_sender.clone(), broadcast_state); - - spawn(task.run()); - - CERTIFICATE_PROCESSING_TOTAL.inc(); - if need_gossip { - CERTIFICATE_PROCESSING_FROM_API_TOTAL.inc(); - } else { - CERTIFICATE_PROCESSING_FROM_GOSSIP_TOTAL.inc(); - } - - if let Some(messages) = self.buffered_messages.remove(&cert.id) { - let sink = task_context.sink.clone(); - spawn(async move { - for msg in messages { - _ = sink.send(msg).await; - } - }); - } - - entry.insert(task_context); - } - std::collections::hash_map::Entry::Occupied(_) => {}, - } - } - } - } - - Some((certificate_id, status)) = self.task_completion_receiver.recv() => { - self.tasks.remove(&certificate_id); - let _ = self.notify_task_completion.send((certificate_id, status)).await; - } - - _ = shutdown_receiver.recv() => { - warn!("Task Manager shutting down"); - - for task in self.tasks.iter() { - task.1.shutdown_sender.send(()).await.unwrap(); - } - - break; - } - } - } - } -} - -impl Drop for TaskManager { - fn drop(&mut self) { - _ = self.shutdown_sender.try_send(()); - 
} -} diff --git a/crates/topos-tce-broadcast/src/task_manager_channels/task.rs b/crates/topos-tce-broadcast/src/task_manager_channels/task.rs deleted file mode 100644 index 540042555..000000000 --- a/crates/topos-tce-broadcast/src/task_manager_channels/task.rs +++ /dev/null @@ -1,87 +0,0 @@ -use tokio::sync::mpsc; - -use crate::double_echo::broadcast_state::{BroadcastState, Status}; -use crate::DoubleEchoCommand; -use crate::TaskStatus; -use topos_core::uci::CertificateId; - -#[derive(Debug, Clone)] -pub struct TaskContext { - pub sink: mpsc::Sender, - pub shutdown_sender: mpsc::Sender<()>, -} - -pub struct Task { - pub message_receiver: mpsc::Receiver, - pub certificate_id: CertificateId, - pub completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, - pub broadcast_state: BroadcastState, - pub shutdown_receiver: mpsc::Receiver<()>, -} - -impl Task { - pub fn new( - certificate_id: CertificateId, - completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, - broadcast_state: BroadcastState, - ) -> (Self, TaskContext) { - let (message_sender, message_receiver) = mpsc::channel(1024); - let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); - - let task_context = TaskContext { - sink: message_sender, - shutdown_sender, - }; - - let task = Task { - message_receiver, - certificate_id, - completion_sender, - broadcast_state, - shutdown_receiver, - }; - - (task, task_context) - } - - pub(crate) async fn run(mut self) { - loop { - tokio::select! { - Some(msg) = self.message_receiver.recv() => { - match msg { - DoubleEchoCommand::Echo { from_peer, .. } => { - if let Some(Status::DeliveredWithReadySent) = - self.broadcast_state.apply_echo(from_peer) - { - let _ = self - .completion_sender - .send((self.certificate_id, TaskStatus::Success)) - .await; - - break; - } - } - DoubleEchoCommand::Ready { from_peer, .. 
} => { - if let Some(Status::DeliveredWithReadySent) = - self.broadcast_state.apply_ready(from_peer) - { - let _ = self - .completion_sender - .send((self.certificate_id, TaskStatus::Success)) - .await; - - break; - } - } - _ => {} - } - } - - _ = self.shutdown_receiver.recv() => { - println!("Received shutdown, shutting down task {:?}", self.certificate_id); - break; - } - } - } - } -} diff --git a/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs b/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs deleted file mode 100644 index 42e6e2132..000000000 --- a/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs +++ /dev/null @@ -1,161 +0,0 @@ -use futures::stream::FuturesUnordered; -use futures::Future; -use futures::StreamExt; -use std::collections::HashMap; -use std::future::IntoFuture; -use std::pin::Pin; -use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; -use tokio::{spawn, sync::mpsc}; -use topos_core::uci::CertificateId; -use topos_metrics::CERTIFICATE_PROCESSING_FROM_API_TOTAL; -use topos_metrics::CERTIFICATE_PROCESSING_FROM_GOSSIP_TOTAL; -use topos_metrics::CERTIFICATE_PROCESSING_TOTAL; -use topos_metrics::DOUBLE_ECHO_ACTIVE_TASKS_COUNT; -use tracing::warn; - -pub mod task; - -use crate::double_echo::broadcast_state::BroadcastState; -use crate::sampler::SubscriptionsView; -use crate::DoubleEchoCommand; -use crate::TaskStatus; -use task::{Task, TaskContext}; - -/// The TaskManager is responsible for receiving messages from the network and distributing them -/// among tasks. These tasks are either created if none for a certain CertificateID exists yet, -/// or existing tasks will receive the messages. 
-pub struct TaskManager { - pub message_receiver: mpsc::Receiver, - pub task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, - pub subscription_view_receiver: mpsc::Receiver, - pub subscriptions: SubscriptionsView, - pub event_sender: mpsc::Sender, - pub tasks: HashMap, - #[allow(clippy::type_complexity)] - pub running_tasks: FuturesUnordered< - Pin + Send + 'static>>, - >, - pub buffered_messages: HashMap>, - pub thresholds: ReliableBroadcastParams, - pub shutdown_sender: mpsc::Sender<()>, -} - -impl TaskManager { - pub fn new( - message_receiver: mpsc::Receiver, - task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, - subscription_view_receiver: mpsc::Receiver, - event_sender: mpsc::Sender, - thresholds: ReliableBroadcastParams, - ) -> (Self, mpsc::Receiver<()>) { - let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); - - ( - Self { - message_receiver, - task_completion_sender, - subscription_view_receiver, - subscriptions: SubscriptionsView::default(), - event_sender, - tasks: HashMap::new(), - running_tasks: FuturesUnordered::new(), - buffered_messages: Default::default(), - thresholds, - shutdown_sender, - }, - shutdown_receiver, - ) - } - - pub async fn run(mut self, mut shutdown_receiver: mpsc::Receiver<()>) { - loop { - tokio::select! { - biased; - - Some(new_subscriptions_view) = self.subscription_view_receiver.recv() => { - self.subscriptions = new_subscriptions_view; - } - - Some(msg) = self.message_receiver.recv() => { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. 
} => { - if let Some(task_context) = self.tasks.get(&certificate_id) { - _ = task_context.sink.send(msg).await; - } else { - self.buffered_messages - .entry(certificate_id) - .or_default() - .push(msg); - }; - } - DoubleEchoCommand::Broadcast { ref cert, need_gossip } => { - match self.tasks.entry(cert.id) { - std::collections::hash_map::Entry::Vacant(entry) => { - let broadcast_state = BroadcastState::new( - cert.clone(), - self.thresholds.echo_threshold, - self.thresholds.ready_threshold, - self.thresholds.delivery_threshold, - self.event_sender.clone(), - self.subscriptions.clone(), - need_gossip, - ); - - let (task, task_context) = Task::new(cert.id, broadcast_state); - - self.running_tasks.push(task.into_future()); - - if let Some(messages) = self.buffered_messages.remove(&cert.id) { - let sink = task_context.sink.clone(); - spawn(async move { - for msg in messages { - _ = sink.send(msg).await; - } - }); - } - - DOUBLE_ECHO_ACTIVE_TASKS_COUNT.inc(); - - CERTIFICATE_PROCESSING_TOTAL.inc(); - if need_gossip { - CERTIFICATE_PROCESSING_FROM_API_TOTAL.inc(); - } else { - CERTIFICATE_PROCESSING_FROM_GOSSIP_TOTAL.inc(); - } - - entry.insert(task_context); - } - std::collections::hash_map::Entry::Occupied(_) => {}, - } - } - } - } - - - Some((certificate_id, status)) = self.running_tasks.next() => { - if status == TaskStatus::Success { - self.tasks.remove(&certificate_id); - DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec(); - let _ = self.task_completion_sender.send((certificate_id, status)).await; - } - } - - _ = shutdown_receiver.recv() => { - warn!("Task Manager shutting down"); - - for task in self.tasks.iter() { - task.1.shutdown_sender.send(()).await.unwrap(); - } - - break; - } - } - } - } -} - -impl Drop for TaskManager { - fn drop(&mut self) { - _ = self.shutdown_sender.try_send(()); - } -} diff --git a/crates/topos-tce-broadcast/src/tests/mod.rs b/crates/topos-tce-broadcast/src/tests/mod.rs index 80d8a4dbf..0505212c2 100644 --- 
a/crates/topos-tce-broadcast/src/tests/mod.rs +++ b/crates/topos-tce-broadcast/src/tests/mod.rs @@ -4,8 +4,8 @@ use rstest::*; use std::collections::HashSet; use std::usize; use tce_transport::ReliableBroadcastParams; - use tokio::sync::mpsc::Receiver; +use tokio::task::JoinHandle; use topos_test_sdk::constants::*; @@ -35,7 +35,7 @@ fn medium_config() -> TceParams { } } -#[derive(Debug)] +#[derive(Clone, Debug)] struct TceParams { nb_peers: usize, broadcast_params: ReliableBroadcastParams, @@ -43,24 +43,27 @@ struct TceParams { struct Context { event_receiver: Receiver, + cmd_sender: Sender, + subscriptions: SubscriptionsView, + #[allow(dead_code)] + double_echo_handle: JoinHandle<()>, + #[allow(dead_code)] + shutdown_sender: Sender>, } -async fn create_context(params: TceParams) -> (DoubleEcho, Context) { - let (subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel(CHANNEL_SIZE); +async fn create_context(params: TceParams) -> Context { + let (_subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel(CHANNEL_SIZE); - let (_cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE); + let (cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE); let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE); - let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) = + let (double_echo_shutdown_sender, double_echo_shutdown_receiver) = mpsc::channel::>(1); - let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(CHANNEL_SIZE); let mut double_echo = DoubleEcho::new( params.broadcast_params, - task_manager_message_sender.clone(), cmd_receiver, event_sender, double_echo_shutdown_receiver, - 0, ); // List of peers @@ -72,63 +75,94 @@ async fn create_context(params: TceParams) -> (DoubleEcho, Context) { peers.insert(peer); } - // Subscriptions - double_echo.subscriptions.echo = peers.clone(); - double_echo.subscriptions.ready = peers.clone(); - double_echo.subscriptions.network_size = params.nb_peers; - - 
let msg = SubscriptionsView { + let subscriptions = SubscriptionsView { echo: peers.clone(), ready: peers.clone(), network_size: params.nb_peers, }; - subscriptions_view_sender.send(msg).await.unwrap(); + // Subscriptions + double_echo.subscriptions.echo = peers.clone(); + double_echo.subscriptions.ready = peers.clone(); + double_echo.subscriptions.network_size = params.nb_peers; - double_echo.spawn_task_manager(subscriptions_view_receiver, task_manager_message_receiver); + let double_echo_handle = spawn(double_echo.run(subscriptions_view_receiver)); - (double_echo, Context { event_receiver }) + Context { + event_receiver, + cmd_sender, + subscriptions, + double_echo_handle, + shutdown_sender: double_echo_shutdown_sender, + } } -async fn reach_echo_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { - let selected = double_echo - .subscriptions +async fn reach_echo_threshold( + cmd_sender: Sender, + params: TceParams, + cert: &Certificate, + subscriptions: SubscriptionsView, +) { + let selected = subscriptions .echo .iter() - .take(double_echo.params.echo_threshold) + .take(params.broadcast_params.echo_threshold) .cloned() .collect::>(); for p in selected { - double_echo.handle_echo(p, cert.id).await; + let _ = cmd_sender + .send(DoubleEchoCommand::Echo { + from_peer: p, + certificate_id: cert.id, + }) + .await; } } -async fn reach_ready_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { - let selected = double_echo - .subscriptions +async fn reach_ready_threshold( + cmd_sender: Sender, + params: TceParams, + cert: &Certificate, + subscriptions: SubscriptionsView, +) { + let selected = subscriptions .ready .iter() - .take(double_echo.params.ready_threshold) + .take(params.broadcast_params.ready_threshold) .cloned() .collect::>(); for p in selected { - double_echo.handle_ready(p, cert.id).await; + let _ = cmd_sender + .send(DoubleEchoCommand::Ready { + from_peer: p, + certificate_id: cert.id, + }) + .await; } } -async fn 
reach_delivery_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { - let selected = double_echo - .subscriptions +async fn reach_delivery_threshold( + cmd_sender: Sender, + params: TceParams, + cert: &Certificate, + subscriptions: SubscriptionsView, +) { + let selected = subscriptions .ready .iter() - .take(double_echo.params.delivery_threshold) + .take(params.broadcast_params.delivery_threshold) .cloned() .collect::>(); for p in selected { - double_echo.handle_ready(p, cert.id).await; + let _ = cmd_sender + .send(DoubleEchoCommand::Ready { + from_peer: p, + certificate_id: cert.id, + }) + .await; } } @@ -138,7 +172,7 @@ async fn reach_delivery_threshold(double_echo: &mut DoubleEcho, cert: &Certifica #[tokio::test] #[trace] async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) { - let (mut double_echo, mut ctx) = create_context(params).await; + let mut ctx = create_context(params.clone()).await; let dummy_cert = Certificate::new( PREV_CERTIFICATE_ID, @@ -152,7 +186,13 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) .expect("Dummy certificate"); // Trigger Echo upon dispatching - double_echo.broadcast(dummy_cert.clone(), true).await; + let _ = ctx + .cmd_sender + .send(DoubleEchoCommand::Broadcast { + cert: dummy_cert.clone(), + need_gossip: true, + }) + .await; assert!(matches!( ctx.event_receiver.recv().await, @@ -163,6 +203,7 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) ctx.event_receiver.try_recv(), Ok(ProtocolEvents::Gossip { .. }) )); + assert!(matches!( ctx.event_receiver.try_recv(), Ok(ProtocolEvents::Echo { .. 
}) @@ -174,7 +215,13 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) )); // Trigger Ready upon reaching the Echo threshold - reach_echo_threshold(&mut double_echo, &dummy_cert).await; + reach_echo_threshold( + ctx.cmd_sender.clone(), + params.clone(), + &dummy_cert, + ctx.subscriptions.clone(), + ) + .await; assert!(matches!( ctx.event_receiver.recv().await, @@ -182,7 +229,13 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) )); // Trigger Delivery upon reaching the Delivery threshold - reach_delivery_threshold(&mut double_echo, &dummy_cert).await; + reach_delivery_threshold( + ctx.cmd_sender.clone(), + params.clone(), + &dummy_cert, + ctx.subscriptions.clone(), + ) + .await; assert!(matches!( ctx.event_receiver.recv().await, @@ -196,7 +249,7 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) #[tokio::test] #[trace] async fn trigger_ready_when_reached_enough_ready(#[case] params: TceParams) { - let (mut double_echo, mut ctx) = create_context(params).await; + let mut ctx = create_context(params.clone()).await; let dummy_cert = Certificate::new( PREV_CERTIFICATE_ID, @@ -210,7 +263,13 @@ async fn trigger_ready_when_reached_enough_ready(#[case] params: TceParams) { .expect("Dummy certificate"); // Trigger Echo upon dispatching - double_echo.broadcast(dummy_cert.clone(), true).await; + let _ = ctx + .cmd_sender + .send(DoubleEchoCommand::Broadcast { + cert: dummy_cert.clone(), + need_gossip: true, + }) + .await; assert!(matches!( ctx.event_receiver.recv().await, @@ -227,8 +286,14 @@ async fn trigger_ready_when_reached_enough_ready(#[case] params: TceParams) { Ok(ProtocolEvents::Echo { .. 
}) )); - // Trigger Ready upon reaching the Ready threshold - reach_ready_threshold(&mut double_echo, &dummy_cert).await; + // Trigger Ready upon reaching the Ready threshold + reach_ready_threshold( + ctx.cmd_sender.clone(), + params.clone(), + &dummy_cert, + ctx.subscriptions.clone(), + ) + .await; assert!(matches!( ctx.event_receiver.recv().await, diff --git a/crates/topos/Cargo.toml b/crates/topos/Cargo.toml index 4422c4e89..580332044 100644 --- a/crates/topos/Cargo.toml +++ b/crates/topos/Cargo.toml @@ -62,7 +62,6 @@ regex = "1" [features] default = ["tce", "sequencer", "network", "node", "setup", "subnet"] -broadcast_via_channels = ["default", "topos-tce-broadcast/task-manager-channels"] tce = ["topos-tce", "topos-tce-transport"] sequencer = ["topos-sequencer"] network = ["topos-certificate-spammer"]