Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix: remove buffered messages when creating a new task (#269)
Browse files Browse the repository at this point in the history
Signed-off-by: Bastian Gruber <[email protected]>
Co-authored-by: Simon Paitrault <[email protected]>
  • Loading branch information
gruberb and Freyskeyd authored Jul 27, 2023
1 parent a182f42 commit 2963509
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
17 changes: 9 additions & 8 deletions crates/topos-tce-broadcast/src/task_manager_channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ impl TaskManager {

spawn(task.run());

if let Some(messages) = self.buffered_messages.remove(&cert.id) {
let sink = task_context.sink.clone();
spawn(async move {
for msg in messages {
_ = sink.send(msg).await;
}
});
}

entry.insert(task_context);
}
std::collections::hash_map::Entry::Occupied(_) => {},
Expand All @@ -117,14 +126,6 @@ impl TaskManager {
break;
}
}

for (certificate_id, messages) in &mut self.buffered_messages {
if let Some(task) = self.tasks.get(certificate_id) {
for msg in messages {
_ = task.sink.send(msg.clone()).await;
}
}
}
}
}
}
Expand Down
19 changes: 10 additions & 9 deletions crates/topos-tce-broadcast/src/task_manager_futures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::collections::HashMap;
use std::future::IntoFuture;
use std::pin::Pin;
use tce_transport::{ProtocolEvents, ReliableBroadcastParams};
use tokio::sync::mpsc;
use tokio::{spawn, sync::mpsc};
use topos_core::uci::CertificateId;
use topos_metrics::DOUBLE_ECHO_ACTIVE_TASKS_COUNT;
use tracing::warn;
Expand Down Expand Up @@ -102,6 +102,15 @@ impl TaskManager {

self.running_tasks.push(task.into_future());

if let Some(messages) = self.buffered_messages.remove(&cert.id) {
let sink = task_context.sink.clone();
spawn(async move {
for msg in messages {
_ = sink.send(msg).await;
}
});
}

DOUBLE_ECHO_ACTIVE_TASKS_COUNT.inc();

entry.insert(task_context);
Expand Down Expand Up @@ -131,14 +140,6 @@ impl TaskManager {
break;
}
}

for (certificate_id, messages) in &mut self.buffered_messages {
if let Some(task) = self.tasks.get_mut(certificate_id) {
for msg in messages {
_ = task.sink.send(msg.clone()).await;
}
}
}
}
}
}
Expand Down

0 comments on commit 2963509

Please sign in to comment.