From 35f85f0c12fbb5dc47ac94c1054f79e0ba6b2800 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Thu, 18 Apr 2024 12:21:07 -0300 Subject: [PATCH 1/3] fix: increase channel size and pending cert interval --- crates/topos-config/src/tce/synchronization.rs | 2 +- crates/topos-p2p/src/behaviour/gossip.rs | 4 ++-- crates/topos-sequencer-subnet-runtime/src/proxy.rs | 2 +- crates/topos-tce-api/src/grpc/mod.rs | 4 ++-- crates/topos-tce-api/src/lib.rs | 4 ++-- crates/topos-tce-broadcast/src/constant.rs | 6 +++--- crates/topos-tce-broadcast/src/double_echo/mod.rs | 2 +- crates/topos-tce-broadcast/src/task_manager/mod.rs | 2 +- crates/topos-tce-proxy/src/client.rs | 6 +++--- crates/topos-tce-proxy/src/worker.rs | 6 +++--- crates/topos-tce-synchronizer/src/builder.rs | 2 +- crates/topos-tce/src/app_context.rs | 2 +- 12 files changed, 21 insertions(+), 21 deletions(-) diff --git a/crates/topos-config/src/tce/synchronization.rs b/crates/topos-config/src/tce/synchronization.rs index 222cd1030..e09dbd6ad 100644 --- a/crates/topos-config/src/tce/synchronization.rs +++ b/crates/topos-config/src/tce/synchronization.rs @@ -23,7 +23,7 @@ impl Default for SynchronizationConfig { } impl SynchronizationConfig { - pub const INTERVAL_SECONDS: u64 = 10; + pub const INTERVAL_SECONDS: u64 = 30; pub const LIMIT_PER_SUBNET: usize = 100; const fn default_interval_seconds() -> u64 { diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index eaf6991ef..6afe89ec4 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -25,7 +25,7 @@ use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_REA use super::HealthStatus; -const MAX_BATCH_SIZE: usize = 10; +const MAX_BATCH_SIZE: usize = 1024; pub struct Behaviour { batch_size: usize, @@ -76,7 +76,7 @@ impl Behaviour { .unwrap_or(Ok(MAX_BATCH_SIZE)) .unwrap(); let gossipsub = gossipsub::ConfigBuilder::default() - .max_transmit_size(2 * 1024 * 1024) + .max_transmit_size(2 * 2048 * 2048) .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(|msg_id| { // Content based id diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index 5039698bb..f2edc8f29 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -68,7 +68,7 @@ impl SubnetRuntimeProxy { address: {}, ", &config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address ); - let (command_sender, mut command_rcv) = mpsc::channel::(256); + let (command_sender, mut command_rcv) = mpsc::channel::(1024); let ws_runtime_endpoint = config.ws_endpoint.clone(); let http_runtime_endpoint = config.http_endpoint.clone(); let subnet_contract_address = Arc::new(config.subnet_contract_address.clone()); diff --git a/crates/topos-tce-api/src/grpc/mod.rs b/crates/topos-tce-api/src/grpc/mod.rs index ae6f132e4..72e386d5d 100644 --- a/crates/topos-tce-api/src/grpc/mod.rs +++ b/crates/topos-tce-api/src/grpc/mod.rs @@ -29,7 +29,7 @@ pub(crate) mod console; #[cfg(test)] mod tests; -const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 100; +const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 1024; pub(crate) mod builder; pub(crate) mod messaging; @@ -272,7 +272,7 @@ impl ApiService for TceGrpcService { .map(move |message| Self::parse_stream(message, stream_id)) .boxed(); - let (command_sender, command_receiver) = mpsc::channel(2048); + let (command_sender, command_receiver) = mpsc::channel(6144); let (outbound_stream, rx) = mpsc::channel::, OutboundMessage), Status>>( DEFAULT_CHANNEL_STREAM_CAPACITY, diff --git a/crates/topos-tce-api/src/lib.rs b/crates/topos-tce-api/src/lib.rs index 41808c13a..44d2d20e0 100644 --- a/crates/topos-tce-api/src/lib.rs +++ b/crates/topos-tce-api/src/lib.rs @@ -9,10 +9,10 @@ mod tests; pub(crate) mod constants { /// Constant size of every channel in the crate - pub(crate) const CHANNEL_SIZE: usize = 2048; + pub(crate) const CHANNEL_SIZE: usize = 6144; /// Constant size of every transient stream channel in the crate - pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024; + pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 2048; } pub use runtime::{ error::RuntimeError, Runtime, RuntimeClient, RuntimeCommand, RuntimeContext, RuntimeEvent, diff --git a/crates/topos-tce-broadcast/src/constant.rs b/crates/topos-tce-broadcast/src/constant.rs index 30e46f24d..e1f80d8eb 100644 --- a/crates/topos-tce-broadcast/src/constant.rs +++ b/crates/topos-tce-broadcast/src/constant.rs @@ -6,19 +6,19 @@ lazy_static! { std::env::var("TOPOS_DOUBLE_ECHO_COMMAND_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(2048); + .unwrap_or(6144); /// Size of the channel between double echo and the task manager pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize = std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(20_480); + .unwrap_or(6144); /// Size of the channel to send protocol events from the double echo pub static ref PROTOCOL_CHANNEL_SIZE: usize = std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(2048); + .unwrap_or(6144); /// Capacity alert threshold for the double echo command channel pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE .checked_mul(10) diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 3b9e0274f..c6b67b023 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -55,7 +55,7 @@ pub struct DoubleEcho { } impl DoubleEcho { - pub const MAX_BUFFER_SIZE: usize = 2048; + pub const MAX_BUFFER_SIZE: usize = 6144; #[allow(clippy::too_many_arguments)] pub fn new( diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index d25e3525d..f1dec56e4 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -107,7 +107,7 @@ impl TaskManager { } pub async fn run(mut self, shutdown_receiver: CancellationToken) { - let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut interval = tokio::time::interval(Duration::from_millis(15)); loop { tokio::select! { diff --git a/crates/topos-tce-proxy/src/client.rs b/crates/topos-tce-proxy/src/client.rs index a6682abb5..11f477157 100644 --- a/crates/topos-tce-proxy/src/client.rs +++ b/crates/topos-tce-proxy/src/client.rs @@ -22,9 +22,9 @@ use topos_core::{ use tracing::{debug, error, info, info_span, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 100; -const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 100; -const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 100; +const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 2048; +const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 2048; +const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 2048; // Maximum backoff retry timeout in seconds (1 hour) const TCE_SUBMIT_CERTIFICATE_BACKOFF_TIMEOUT: Duration = Duration::from_secs(3600); diff --git a/crates/topos-tce-proxy/src/worker.rs b/crates/topos-tce-proxy/src/worker.rs index 0869893af..9597011e9 100644 --- a/crates/topos-tce-proxy/src/worker.rs +++ b/crates/topos-tce-proxy/src/worker.rs @@ -19,11 +19,11 @@ pub struct TceProxyWorker { } impl TceProxyWorker { - /// Construct a new [`TceProxyWorker`] with a 128 items deep channel to send commands to and receive events from a TCE node on the given subnet. + /// Construct a new [`TceProxyWorker`] with a 2048 items deep channel to send commands to and receive events from a TCE node on the given subnet. /// The worker holds a [`crate::client::TceClient`] pub async fn new(config: TceProxyConfig) -> Result<(Self, Option<(Certificate, u64)>), Error> { - let (command_sender, mut command_rcv) = mpsc::channel::(128); - let (evt_sender, evt_rcv) = mpsc::channel::(128); + let (command_sender, mut command_rcv) = mpsc::channel::(2048); + let (evt_sender, evt_rcv) = mpsc::channel::(2048); let (tce_client_shutdown_channel, shutdown_receiver) = mpsc::channel::>(1); diff --git a/crates/topos-tce-synchronizer/src/builder.rs b/crates/topos-tce-synchronizer/src/builder.rs index a7b70e08a..182b6bf37 100644 --- a/crates/topos-tce-synchronizer/src/builder.rs +++ b/crates/topos-tce-synchronizer/src/builder.rs @@ -29,7 +29,7 @@ impl Default for SynchronizerBuilder { network_client: None, store: None, config: SynchronizationConfig::default(), - event_channel_size: 100, + event_channel_size: 1024, shutdown: None, } } diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index fedc50b66..fa35e825a 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -69,7 +69,7 @@ impl AppContext { validator_store: Arc, api_context: RuntimeContext, ) -> (Self, mpsc::Receiver) { - let (events, receiver) = mpsc::channel(100); + let (events, receiver) = mpsc::channel(2048); ( Self { is_validator, From b98bf85fa94f95996830cd0f4097ab4d51d7e013 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Thu, 18 Apr 2024 12:53:43 -0300 Subject: [PATCH 2/3] fix: increase channel sizes and interval speed further --- crates/topos-config/src/tce/synchronization.rs | 2 +- crates/topos-p2p/src/behaviour/gossip.rs | 4 ++-- crates/topos-sequencer-subnet-runtime/src/proxy.rs | 2 +- crates/topos-tce-api/src/grpc/mod.rs | 2 +- crates/topos-tce-broadcast/src/task_manager/mod.rs | 2 +- crates/topos-tce-proxy/src/client.rs | 6 +++--- crates/topos-tce/src/app_context.rs | 2 +- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/topos-config/src/tce/synchronization.rs b/crates/topos-config/src/tce/synchronization.rs index e09dbd6ad..b6ff4050d 100644 --- a/crates/topos-config/src/tce/synchronization.rs +++ b/crates/topos-config/src/tce/synchronization.rs @@ -23,7 +23,7 @@ impl Default for SynchronizationConfig { } impl SynchronizationConfig { - pub const INTERVAL_SECONDS: u64 = 30; + pub const INTERVAL_SECONDS: u64 = 60; pub const LIMIT_PER_SUBNET: usize = 100; const fn default_interval_seconds() -> u64 { diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index 6afe89ec4..d5cd8a252 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -25,7 +25,7 @@ use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_REA use super::HealthStatus; -const MAX_BATCH_SIZE: usize = 1024; +const MAX_BATCH_SIZE: usize = 2048; pub struct Behaviour { batch_size: usize, @@ -76,7 +76,7 @@ impl Behaviour { .unwrap_or(Ok(MAX_BATCH_SIZE)) .unwrap(); let gossipsub = gossipsub::ConfigBuilder::default() - .max_transmit_size(2 * 2048 * 2048) + .max_transmit_size(5 * 2048 * 2048) .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(|msg_id| { // Content based id diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index f2edc8f29..8160b62e1 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -68,7 +68,7 @@ impl SubnetRuntimeProxy { address: {}, ", &config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address ); - let (command_sender, mut command_rcv) = mpsc::channel::(1024); + let (command_sender, mut command_rcv) = mpsc::channel::(5120); let ws_runtime_endpoint = config.ws_endpoint.clone(); let http_runtime_endpoint = config.http_endpoint.clone(); let subnet_contract_address = Arc::new(config.subnet_contract_address.clone()); diff --git a/crates/topos-tce-api/src/grpc/mod.rs b/crates/topos-tce-api/src/grpc/mod.rs index 72e386d5d..c7e9fbc06 100644 --- a/crates/topos-tce-api/src/grpc/mod.rs +++ b/crates/topos-tce-api/src/grpc/mod.rs @@ -29,7 +29,7 @@ pub(crate) mod console; #[cfg(test)] mod tests; -const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 1024; +const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 5120; pub(crate) mod builder; pub(crate) mod messaging; diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index f1dec56e4..783945f8f 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -107,7 +107,7 @@ impl TaskManager { } pub async fn run(mut self, shutdown_receiver: CancellationToken) { - let mut interval = tokio::time::interval(Duration::from_millis(15)); + let mut interval = tokio::time::interval(Duration::from_millis(5)); loop { tokio::select! { diff --git a/crates/topos-tce-proxy/src/client.rs b/crates/topos-tce-proxy/src/client.rs index 11f477157..18f917cc9 100644 --- a/crates/topos-tce-proxy/src/client.rs +++ b/crates/topos-tce-proxy/src/client.rs @@ -22,9 +22,9 @@ use topos_core::{ use tracing::{debug, error, info, info_span, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 2048; -const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 2048; -const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 2048; +const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 5120; +const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 5120; +const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 5120; // Maximum backoff retry timeout in seconds (1 hour) const TCE_SUBMIT_CERTIFICATE_BACKOFF_TIMEOUT: Duration = Duration::from_secs(3600); diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index fa35e825a..1a410438c 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -69,7 +69,7 @@ impl AppContext { validator_store: Arc, api_context: RuntimeContext, ) -> (Self, mpsc::Receiver) { - let (events, receiver) = mpsc::channel(2048); + let (events, receiver) = mpsc::channel(5120); ( Self { is_validator, From d685d4a368aa8ddafdc590754c1dd64907210702 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Thu, 18 Apr 2024 13:12:01 -0300 Subject: [PATCH 3/3] fix: increase channel size further --- crates/topos-config/src/tce/synchronization.rs | 2 +- crates/topos-p2p/src/behaviour/gossip.rs | 4 ++-- crates/topos-sequencer-subnet-runtime/src/proxy.rs | 3 ++- crates/topos-tce-api/src/grpc/mod.rs | 4 ++-- crates/topos-tce-api/src/lib.rs | 4 ++-- crates/topos-tce-broadcast/src/constant.rs | 6 +++--- crates/topos-tce-broadcast/src/double_echo/mod.rs | 2 +- crates/topos-tce-broadcast/src/task_manager/mod.rs | 2 +- crates/topos-tce-proxy/src/worker.rs | 6 +++--- crates/topos-tce/src/app_context.rs | 2 +- 10 files changed, 18 insertions(+), 17 deletions(-) diff --git a/crates/topos-config/src/tce/synchronization.rs b/crates/topos-config/src/tce/synchronization.rs index b6ff4050d..7f6682764 100644 --- a/crates/topos-config/src/tce/synchronization.rs +++ b/crates/topos-config/src/tce/synchronization.rs @@ -23,7 +23,7 @@ impl Default for SynchronizationConfig { } impl SynchronizationConfig { - pub const INTERVAL_SECONDS: u64 = 60; + pub const INTERVAL_SECONDS: u64 = 60 * 3; pub const LIMIT_PER_SUBNET: usize = 100; const fn default_interval_seconds() -> u64 { diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index d5cd8a252..5651c9788 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -25,7 +25,7 @@ use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_REA use super::HealthStatus; -const MAX_BATCH_SIZE: usize = 2048; +const MAX_BATCH_SIZE: usize = 1024 * 20; pub struct Behaviour { batch_size: usize, @@ -76,7 +76,7 @@ impl Behaviour { .unwrap_or(Ok(MAX_BATCH_SIZE)) .unwrap(); let gossipsub = gossipsub::ConfigBuilder::default() - .max_transmit_size(5 * 2048 * 2048) + .max_transmit_size(20 * 2048 * 2048) .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(|msg_id| { // Content based id diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index 8160b62e1..e0e93f48d 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -68,7 +68,8 @@ impl SubnetRuntimeProxy { address: {}, ", &config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address ); - let (command_sender, mut command_rcv) = mpsc::channel::(5120); + let (command_sender, mut command_rcv) = + mpsc::channel::(1024 * 20); let ws_runtime_endpoint = config.ws_endpoint.clone(); let http_runtime_endpoint = config.http_endpoint.clone(); let subnet_contract_address = Arc::new(config.subnet_contract_address.clone()); diff --git a/crates/topos-tce-api/src/grpc/mod.rs b/crates/topos-tce-api/src/grpc/mod.rs index c7e9fbc06..2463d3ced 100644 --- a/crates/topos-tce-api/src/grpc/mod.rs +++ b/crates/topos-tce-api/src/grpc/mod.rs @@ -29,7 +29,7 @@ pub(crate) mod console; #[cfg(test)] mod tests; -const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 5120; +const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 1024 * 20; pub(crate) mod builder; pub(crate) mod messaging; @@ -272,7 +272,7 @@ impl ApiService for TceGrpcService { .map(move |message| Self::parse_stream(message, stream_id)) .boxed(); - let (command_sender, command_receiver) = mpsc::channel(6144); + let (command_sender, command_receiver) = mpsc::channel(1024 * 20); let (outbound_stream, rx) = mpsc::channel::, OutboundMessage), Status>>( DEFAULT_CHANNEL_STREAM_CAPACITY, diff --git a/crates/topos-tce-api/src/lib.rs b/crates/topos-tce-api/src/lib.rs index 44d2d20e0..7d1158d52 100644 --- a/crates/topos-tce-api/src/lib.rs +++ b/crates/topos-tce-api/src/lib.rs @@ -9,10 +9,10 @@ mod tests; pub(crate) mod constants { /// Constant size of every channel in the crate - pub(crate) const CHANNEL_SIZE: usize = 6144; + pub(crate) const CHANNEL_SIZE: usize = 1024 * 20; /// Constant size of every transient stream channel in the crate - pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 2048; + pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024 * 20; } pub use runtime::{ error::RuntimeError, Runtime, RuntimeClient, RuntimeCommand, RuntimeContext, RuntimeEvent, diff --git a/crates/topos-tce-broadcast/src/constant.rs b/crates/topos-tce-broadcast/src/constant.rs index e1f80d8eb..74799481a 100644 --- a/crates/topos-tce-broadcast/src/constant.rs +++ b/crates/topos-tce-broadcast/src/constant.rs @@ -6,19 +6,19 @@ lazy_static! { std::env::var("TOPOS_DOUBLE_ECHO_COMMAND_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(6144); + .unwrap_or(1024 * 20); /// Size of the channel between double echo and the task manager pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize = std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(6144); + .unwrap_or(1024 * 20); /// Size of the channel to send protocol events from the double echo pub static ref PROTOCOL_CHANNEL_SIZE: usize = std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(6144); + .unwrap_or(1024 * 20); /// Capacity alert threshold for the double echo command channel pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE .checked_mul(10) diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index c6b67b023..4439508c6 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -55,7 +55,7 @@ pub struct DoubleEcho { } impl DoubleEcho { - pub const MAX_BUFFER_SIZE: usize = 6144; + pub const MAX_BUFFER_SIZE: usize = 1024 * 20; #[allow(clippy::too_many_arguments)] pub fn new( diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 783945f8f..c7746e9e9 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -107,7 +107,7 @@ impl TaskManager { } pub async fn run(mut self, shutdown_receiver: CancellationToken) { - let mut interval = tokio::time::interval(Duration::from_millis(5)); + let mut interval = tokio::time::interval(Duration::from_millis(3)); loop { tokio::select! { diff --git a/crates/topos-tce-proxy/src/worker.rs b/crates/topos-tce-proxy/src/worker.rs index 9597011e9..e8f78234f 100644 --- a/crates/topos-tce-proxy/src/worker.rs +++ b/crates/topos-tce-proxy/src/worker.rs @@ -19,11 +19,11 @@ pub struct TceProxyWorker { } impl TceProxyWorker { - /// Construct a new [`TceProxyWorker`] with a 2048 items deep channel to send commands to and receive events from a TCE node on the given subnet. + /// Construct a new [`TceProxyWorker`] with a 1024 * 20 items deep channel to send commands to and receive events from a TCE node on the given subnet. /// The worker holds a [`crate::client::TceClient`] pub async fn new(config: TceProxyConfig) -> Result<(Self, Option<(Certificate, u64)>), Error> { - let (command_sender, mut command_rcv) = mpsc::channel::(2048); - let (evt_sender, evt_rcv) = mpsc::channel::(2048); + let (command_sender, mut command_rcv) = mpsc::channel::(1024 * 20); + let (evt_sender, evt_rcv) = mpsc::channel::(1024 * 20); let (tce_client_shutdown_channel, shutdown_receiver) = mpsc::channel::>(1); diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index 1a410438c..f7478c508 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -69,7 +69,7 @@ impl AppContext { validator_store: Arc, api_context: RuntimeContext, ) -> (Self, mpsc::Receiver) { - let (events, receiver) = mpsc::channel(5120); + let (events, receiver) = mpsc::channel(1024 * 20); ( Self { is_validator,