Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

fix: increase channel size and pending cert interval #491

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/topos-config/src/tce/synchronization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl Default for SynchronizationConfig {
}

impl SynchronizationConfig {
pub const INTERVAL_SECONDS: u64 = 10;
pub const INTERVAL_SECONDS: u64 = 60 * 3;
pub const LIMIT_PER_SUBNET: usize = 100;

const fn default_interval_seconds() -> u64 {
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_REA

use super::HealthStatus;

const MAX_BATCH_SIZE: usize = 10;
const MAX_BATCH_SIZE: usize = 1024 * 20;

pub struct Behaviour {
batch_size: usize,
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Behaviour {
.unwrap_or(Ok(MAX_BATCH_SIZE))
.unwrap();
let gossipsub = gossipsub::ConfigBuilder::default()
.max_transmit_size(2 * 1024 * 1024)
.max_transmit_size(20 * 2048 * 2048)
.validation_mode(gossipsub::ValidationMode::Strict)
.message_id_fn(|msg_id| {
// Content based id
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-sequencer-subnet-runtime/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl SubnetRuntimeProxy {
address: {}, ",
&config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address
);
let (command_sender, mut command_rcv) = mpsc::channel::<SubnetRuntimeProxyCommand>(256);
let (command_sender, mut command_rcv) =
mpsc::channel::<SubnetRuntimeProxyCommand>(1024 * 20);
let ws_runtime_endpoint = config.ws_endpoint.clone();
let http_runtime_endpoint = config.http_endpoint.clone();
let subnet_contract_address = Arc::new(config.subnet_contract_address.clone());
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(crate) mod console;
#[cfg(test)]
mod tests;

const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 100;
const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 1024 * 20;

pub(crate) mod builder;
pub(crate) mod messaging;
Expand Down Expand Up @@ -272,7 +272,7 @@ impl ApiService for TceGrpcService {
.map(move |message| Self::parse_stream(message, stream_id))
.boxed();

let (command_sender, command_receiver) = mpsc::channel(2048);
let (command_sender, command_receiver) = mpsc::channel(1024 * 20);

let (outbound_stream, rx) = mpsc::channel::<Result<(Option<Uuid>, OutboundMessage), Status>>(
DEFAULT_CHANNEL_STREAM_CAPACITY,
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ mod tests;

pub(crate) mod constants {
/// Constant size of every channel in the crate
pub(crate) const CHANNEL_SIZE: usize = 2048;
pub(crate) const CHANNEL_SIZE: usize = 1024 * 20;

/// Constant size of every transient stream channel in the crate
pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024;
pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024 * 20;
}
pub use runtime::{
error::RuntimeError, Runtime, RuntimeClient, RuntimeCommand, RuntimeContext, RuntimeEvent,
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-tce-broadcast/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ lazy_static! {
std::env::var("TOPOS_DOUBLE_ECHO_COMMAND_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
.unwrap_or(1024 * 20);
/// Size of the channel between double echo and the task manager
pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize =
std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(20_480);
.unwrap_or(1024 * 20);
/// Size of the channel to send protocol events from the double echo
pub static ref PROTOCOL_CHANNEL_SIZE: usize =
std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
.unwrap_or(1024 * 20);
/// Capacity alert threshold for the double echo command channel
pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE
.checked_mul(10)
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct DoubleEcho {
}

impl DoubleEcho {
pub const MAX_BUFFER_SIZE: usize = 2048;
pub const MAX_BUFFER_SIZE: usize = 1024 * 20;

#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl TaskManager {
}

pub async fn run(mut self, shutdown_receiver: CancellationToken) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut interval = tokio::time::interval(Duration::from_millis(3));

loop {
tokio::select! {
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-tce-proxy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use topos_core::{
use tracing::{debug, error, info, info_span, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 100;
const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 100;
const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 100;
const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 5120;
const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 5120;
const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 5120;

// Maximum backoff retry timeout in seconds (1 hour)
const TCE_SUBMIT_CERTIFICATE_BACKOFF_TIMEOUT: Duration = Duration::from_secs(3600);
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-tce-proxy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ pub struct TceProxyWorker {
}

impl TceProxyWorker {
/// Construct a new [`TceProxyWorker`] with a 128 items deep channel to send commands to and receive events from a TCE node on the given subnet.
/// Construct a new [`TceProxyWorker`] with a 1024 * 20 items deep channel to send commands to and receive events from a TCE node on the given subnet.
/// The worker holds a [`crate::client::TceClient`]
pub async fn new(config: TceProxyConfig) -> Result<(Self, Option<(Certificate, u64)>), Error> {
let (command_sender, mut command_rcv) = mpsc::channel::<TceProxyCommand>(128);
let (evt_sender, evt_rcv) = mpsc::channel::<TceProxyEvent>(128);
let (command_sender, mut command_rcv) = mpsc::channel::<TceProxyCommand>(1024 * 20);
let (evt_sender, evt_rcv) = mpsc::channel::<TceProxyEvent>(1024 * 20);
let (tce_client_shutdown_channel, shutdown_receiver) =
mpsc::channel::<oneshot::Sender<()>>(1);

Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-synchronizer/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Default for SynchronizerBuilder {
network_client: None,
store: None,
config: SynchronizationConfig::default(),
event_channel_size: 100,
event_channel_size: 1024,
shutdown: None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl AppContext {
validator_store: Arc<ValidatorStore>,
api_context: RuntimeContext,
) -> (Self, mpsc::Receiver<Events>) {
let (events, receiver) = mpsc::channel(100);
let (events, receiver) = mpsc::channel(1024 * 20);
(
Self {
is_validator,
Expand Down
Loading