Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

chore: cleanup feature flags, merge taskmanager #272

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .github/workflows/docker_build_push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ jobs:
uses: ./.github/workflows/docker_utils.yml
secrets: inherit

broadcast_channels:
uses: ./.github/workflows/docker_utils.yml
secrets: inherit
with:
features: broadcast_via_channels

network:
uses: ./.github/workflows/docker_utils.yml
secrets: inherit
Expand Down
7 changes: 2 additions & 5 deletions crates/topos-tce-broadcast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ rand.workspace = true

topos-test-sdk = { path = "../topos-test-sdk/" }

[features]
task-manager-channels = []

[[bench]]
name = "double_echo"
path = "benches/double_echo.rs"
harness = false
path = "benches/benchmark.rs"
harness = false
20 changes: 20 additions & 0 deletions crates/topos-tce-broadcast/benches/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
mod double_echo;

pub fn criterion_benchmark(c: &mut Criterion) {
let certificates = 10_000;

let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

c.bench_function("double_echo", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async { double_echo::processing_double_echo(certificates).await })
})
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
141 changes: 123 additions & 18 deletions crates/topos-tce-broadcast/benches/double_echo.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,125 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
mod task_manager;

pub fn criterion_benchmark(c: &mut Criterion) {
let certificates = 10_000;

let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

c.bench_function("double_echo", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async { task_manager::processing_double_echo(certificates).await })
})
});
use std::collections::HashSet;
use tce_transport::{ProtocolEvents, ReliableBroadcastParams};
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use topos_tce_broadcast::double_echo::DoubleEcho;
use topos_tce_broadcast::sampler::SubscriptionsView;
use topos_tce_broadcast::DoubleEchoCommand;
use topos_test_sdk::certificates::create_certificate_chain;
use topos_test_sdk::constants::{SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1};

const CHANNEL_SIZE: usize = 256_000;

#[derive(Clone)]
struct TceParams {
nb_peers: usize,
broadcast_params: ReliableBroadcastParams,
}

struct Context {
event_receiver: Receiver<ProtocolEvents>,
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
pub async fn processing_double_echo(n: u64) {
let (subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel(CHANNEL_SIZE);

let (cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE);
let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE);
let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) =
mpsc::channel::<oneshot::Sender<()>>(1);

let params = TceParams {
nb_peers: 10,
broadcast_params: ReliableBroadcastParams {
echo_threshold: 8,
ready_threshold: 5,
delivery_threshold: 8,
},
};

let mut ctx = Context { event_receiver };

let double_echo = DoubleEcho::new(
params.clone().broadcast_params,
cmd_receiver,
event_sender,
double_echo_shutdown_receiver,
);

// List of peers
let mut peers = HashSet::new();
for i in 0..params.nb_peers {
let peer = topos_p2p::utils::local_key_pair(Some(i as u8))
.public()
.to_peer_id();
peers.insert(peer);
}

let msg = SubscriptionsView {
echo: peers.clone(),
ready: peers.clone(),
network_size: params.nb_peers,
};

tokio::spawn(double_echo.run(subscriptions_view_receiver));

subscriptions_view_sender.send(msg.clone()).await.unwrap();

let certificates =
create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], n as usize);

let double_echo_selected_echo = msg
.echo
.iter()
.take(params.broadcast_params.echo_threshold)
.cloned()
.collect::<Vec<_>>();

let double_echo_selected_ready = msg
.ready
.iter()
.take(params.broadcast_params.delivery_threshold)
.cloned()
.collect::<Vec<_>>();

for cert in &certificates {
let _ = cmd_sender
.send(DoubleEchoCommand::Broadcast {
cert: cert.clone(),
need_gossip: true,
})
.await;
}

for cert in &certificates {
for p in &double_echo_selected_echo {
let _ = cmd_sender
.send(DoubleEchoCommand::Echo {
from_peer: *p,
certificate_id: cert.id,
})
.await;
}

for p in &double_echo_selected_ready {
let _ = cmd_sender
.send(DoubleEchoCommand::Ready {
from_peer: *p,
certificate_id: cert.id,
})
.await;
}
}

let mut count = 0;

while let Some(event) = ctx.event_receiver.recv().await {
if let ProtocolEvents::CertificateDelivered { .. } = event {
count += 1;

if count == n {
break;
}
}
}
}
118 changes: 0 additions & 118 deletions crates/topos-tce-broadcast/benches/task_manager.rs

This file was deleted.

6 changes: 0 additions & 6 deletions crates/topos-tce-broadcast/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@ lazy_static! {
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel between double echo and the task manager
pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize =
std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(20_480);
/// Size of the channel to send protocol events from the double echo
pub static ref PROTOCOL_CHANNEL_SIZE: usize =
std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ impl BroadcastState {
// we update the status to Delivered and change the status
if !self.status.is_delivered() && self.reached_delivery_threshold() {
self.status = self.status.delivered();

debug!(
"📝 Certificate {} is now {}",
&self.certificate.id, self.status
Expand All @@ -136,7 +135,6 @@ impl BroadcastState {
);

DOUBLE_ECHO_BROADCAST_FINISHED_TOTAL.inc();

_ = self
.event_sender
.try_send(ProtocolEvents::CertificateDelivered {
Expand Down
Loading
Loading