Skip to content

Commit

Permalink
add spawn! macro for spawning crucial tokio tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
daltoncoder committed May 24, 2024
1 parent 8a98c0b commit cca2482
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 27 deletions.
57 changes: 31 additions & 26 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,36 +313,41 @@ impl<C: Collection> Consensus<C> {
.take()
.expect("Consensus was tried to start before initialization");

task::spawn(async move {
let edge_node = epoch_state.spawn_edge_consensus(reconfigure_notify.clone());
epoch_state.start_current_epoch().await;

let shutdown_future = waiter.wait_for_shutdown();
pin!(shutdown_future);

loop {
let reconfigure_future = reconfigure_notify.notified();

select! {
biased;
_ = &mut shutdown_future => {
if let Some(consensus) = epoch_state.consensus.take() {
consensus.shutdown().await;
let panic_waiter = waiter.clone();
spawn!(
async move {
let edge_node = epoch_state.spawn_edge_consensus(reconfigure_notify.clone());
epoch_state.start_current_epoch().await;

let shutdown_future = waiter.wait_for_shutdown();
pin!(shutdown_future);

loop {
let reconfigure_future = reconfigure_notify.notified();

select! {
biased;
_ = &mut shutdown_future => {
if let Some(consensus) = epoch_state.consensus.take() {
consensus.shutdown().await;
}
edge_node.shutdown().await;
epoch_state.shutdown();
break
}
_ = reconfigure_future => {
epoch_state.move_to_next_epoch().await;
continue
}
edge_node.shutdown().await;
epoch_state.shutdown();
break
}
_ = reconfigure_future => {
epoch_state.move_to_next_epoch().await;
continue
}
}
}

// Notify the epoch state that it is time to shutdown.
shutdown_notify_epoch_state.notify_waiters();
});
// Notify the epoch state that it is time to shutdown.
shutdown_notify_epoch_state.notify_waiters();
},
"CONSENSUS",
crucial(panic_waiter)
);
}
}

Expand Down
55 changes: 55 additions & 0 deletions core/interfaces/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,30 @@ macro_rules! partial {
};
}

/// A macro to spawn tokio tasks in the binary
/// takes the future you want to spawn as the first argument, a name for the tasks as the second
/// argument If the task being spawned is crucial to the binary pass a shutdown waiter to it as an
/// optional third argument to ensure the node shutdowns if it panics
#[macro_export]
macro_rules! spawn {
($future:expr, $name:expr, crucial($waiter:expr)) => {
tokio::task::Builder::new().name(concat!($name,"#WAITER")).spawn(async move{
let handle = tokio::task::Builder::new().name($name).spawn($future).expect("Tokio task created outside of tokio runtime");

if let Err(e) = handle.await {
tracing::error!("Crucial task:{} had a panic: {:?} \n Signaling to shutdown the rest of the node",$name, e);
$crate::ShutdownWaiter::trigger_shutdown(&$waiter);
}
}).expect("Tokio task created outside of tokio runtime")
};
($future:expr, $name:expr) => {
tokio::task::Builder::new().name($name).spawn($future).expect("Tokio task created outside of tokio runtime");
};
}

#[cfg(test)]
mod tests {
use std::time::Duration;
partial!(BlanketCollection {});

// This test only has to be compiled in order to be considered passing.
Expand All @@ -142,4 +164,37 @@ mod tests {
fn expect_collection<C: crate::Collection>() {}
expect_collection::<BlanketCollection>();
}

#[test]
fn test_spawn_macro_panic_shutdown() {
// Ensure that a spawned task using our macro signals a shutdown when it panics
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let shutdown_controller = crate::ShutdownController::new(false);
let waiter = shutdown_controller.waiter();
let panic_waiter = shutdown_controller.waiter();

// Spawn a thread using the spawn macro that panics after 20ms
spawn!(
async move {
tokio::time::sleep(Duration::from_millis(20)).await;
panic!();
},
"TEST",
crucial(panic_waiter)
);

// If the thread that panics doesnt trigger a shutdown afer it panics, fail the test
if let Err(e) =
tokio::time::timeout(Duration::from_millis(100), waiter.wait_for_shutdown())
.await
{
println!("{e}");
panic!("Failed to signal a shutdown when spawn thread panicked");
}
});
}
}
2 changes: 1 addition & 1 deletion core/interfaces/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use ink_quill::{ToDigest, TranscriptBuilder};
pub use lightning_schema::{AutoImplSerde, LightningMessage};

// Re-export top level modules and highly used stuff.
pub use crate::{c, fdi, partial, schema, types, Node, ShutdownWaiter};
pub use crate::{c, fdi, partial, schema, spawn, types, Node, ShutdownWaiter};

#[rustfmt::skip]
pub use crate::{
Expand Down

0 comments on commit cca2482

Please sign in to comment.