Skip to content

Commit

Permalink
refactor: add simpler shutdown handling
Browse files Browse the repository at this point in the history
This change adds a shutdown crate that simplifies shutdown handling. The
API removes the need to create as many async move blocks and clone
broadcast channels.
  • Loading branch information
nathanielc committed Jan 14, 2025
1 parent f21b8a3 commit 97ca266
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 73 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
"peer-svc",
"pipeline",
"recon",
"shutdown",
"sql",
"validation",
"beetle/iroh-bitswap",
Expand Down Expand Up @@ -182,6 +183,7 @@ serde_qs = "0.10.1"
serde_with = "2.1"
sha2 = { version = "0.10", default-features = false }
sha3 = "0.10"
shutdown = { path = "./shutdown/" }
smallvec = "1.10"
# pragma optimize hangs forver on 0.8, possibly due to libsqlite-sys upgrade
sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio", "chrono"] }
Expand Down
3 changes: 3 additions & 0 deletions anchor-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ chrono.workspace = true

[features]
test-network = []

[dev-dependencies]
shutdown.workspace = true
34 changes: 11 additions & 23 deletions anchor-service/src/anchor_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl AnchorService {
/// - Store the TimeEvents using the AnchorClient
///
/// This function will run indefinitely, or until the process is shutdown.
pub async fn run(&mut self, shutdown_signal: impl Future<Output = ()>) {
pub async fn run(mut self, shutdown_signal: impl Future<Output = ()>) {
let shutdown_signal = shutdown_signal.fuse();
pin_mut!(shutdown_signal);

Expand Down Expand Up @@ -235,7 +235,8 @@ mod tests {
use ceramic_core::NodeKey;
use ceramic_sql::sqlite::SqlitePool;
use expect_test::expect_file;
use tokio::{sync::broadcast, time::sleep};
use shutdown::Shutdown;
use tokio::time::sleep;

use super::AnchorService;
use crate::{MockAnchorEventService, MockCas};
Expand All @@ -248,28 +249,22 @@ mod tests {
let node_id = NodeKey::random().id();
let anchor_interval = Duration::from_millis(5);
let anchor_batch_size = 1000000;
let mut anchor_service = AnchorService::new(
let anchor_service = AnchorService::new(
tx_manager,
event_service.clone(),
pool,
node_id,
anchor_interval,
anchor_batch_size,
);
let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1);
tokio::spawn(async move {
anchor_service
.run(async move {
let _ = shutdown_signal.recv().await;
})
.await
});
let shutdown = Shutdown::new();
tokio::spawn(anchor_service.run(shutdown.wait_fut()));
while event_service.events.lock().unwrap().is_empty() {
sleep(Duration::from_millis(1)).await;
}
expect_file!["./test-data/test_anchor_service_run.txt"]
.assert_debug_eq(&event_service.events.lock().unwrap());
shutdown_signal_tx.send(()).unwrap();
shutdown.shutdown();
}

#[tokio::test]
Expand All @@ -280,28 +275,21 @@ mod tests {
let node_id = NodeKey::random().id();
let anchor_interval = Duration::from_millis(5);
let anchor_batch_size = 1000000;
let mut anchor_service = AnchorService::new(
let anchor_service = AnchorService::new(
tx_manager,
event_service.clone(),
pool,
node_id,
anchor_interval,
anchor_batch_size,
);
let (shutdown_signal_tx, mut shutdown_signal) = broadcast::channel::<()>(1);
// let mut shutdown_signal = shutdown_signal_rx.resubscribe();
tokio::spawn(async move {
anchor_service
.run(async move {
let _ = shutdown_signal.recv().await;
})
.await
});
let shutdown = Shutdown::new();
tokio::spawn(anchor_service.run(shutdown.wait_fut()));
while event_service.events.lock().unwrap().is_empty() {
sleep(Duration::from_millis(1)).await;
}
expect_file!["./test-data/test_anchor_service_run_1.txt"]
.assert_debug_eq(&event_service.events.lock().unwrap());
shutdown_signal_tx.send(()).unwrap();
shutdown.shutdown();
}
}
1 change: 1 addition & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ recon.workspace = true
serde.workspace = true
serde_ipld_dagcbor.workspace = true
serde_json.workspace = true
shutdown.workspace = true
swagger.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
7 changes: 4 additions & 3 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use datafusion::logical_expr::{col, lit, BuiltInWindowFunction, Expr, ExprFuncti
use futures::TryFutureExt;
use multiaddr::Protocol;
use recon::Key;
use shutdown::Shutdown;
use swagger::{ApiError, ByteArray};
#[cfg(not(target_env = "msvc"))]
use tikv_jemalloc_ctl::epoch;
Expand Down Expand Up @@ -401,7 +402,7 @@ where
model: Arc<M>,
p2p: P,
pipeline: Option<SessionContext>,
shutdown_signal: broadcast::Receiver<()>,
shutdown_signal: Shutdown,
) -> Self {
let (tx, event_rx) = tokio::sync::mpsc::channel::<EventInsert>(1024);
let event_store = model.clone();
Expand Down Expand Up @@ -433,7 +434,7 @@ where
event_store: Arc<M>,
mut event_rx: tokio::sync::mpsc::Receiver<EventInsert>,
node_id: NodeId,
mut shutdown_signal: broadcast::Receiver<()>,
shutdown_signal: Shutdown,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS));
Expand All @@ -455,7 +456,7 @@ where
events.extend(buf);
}
}
_ = shutdown_signal.recv() => {
_ = shutdown_signal.wait_fut() => {
tracing::debug!("Insert many task got shutdown signal");
shutdown = true;
}
Expand Down
5 changes: 3 additions & 2 deletions api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use mockall::{mock, predicate};
use multiaddr::Multiaddr;
use multibase::Base;
use recon::Key;
use shutdown::Shutdown;
use test_log::test;
use tokio::join;

Expand Down Expand Up @@ -202,8 +203,8 @@ where
M: EventService + 'static,
P: P2PService,
{
let (_, rx) = tokio::sync::broadcast::channel(1);
Server::new(node_id, network, interest, model, p2p, pipeline, rx)
let shutdown = Shutdown::new();
Server::new(node_id, network, interest, model, p2p, pipeline, shutdown)
}

#[test(tokio::test)]
Expand Down
8 changes: 5 additions & 3 deletions flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ tracing.workspace = true
ceramic-arrow-test.workspace = true
ceramic-pipeline.workspace = true
expect-test.workspace = true
tokio = { workspace = true, features = ["macros", "rt"] }
test-log.workspace = true
http.workspace = true
tokio-stream = { workspace = true, features = ["net"] }
mockall.workspace = true
object_store.workspace = true
shutdown.workspace = true
test-log.workspace = true
tokio = { workspace = true, features = ["macros", "rt"] }
tokio-stream = { workspace = true, features = ["net"] }
tracing-subscriber.workspace = true

[package.metadata.cargo-machete]
ignored = [
Expand Down
1 change: 1 addition & 0 deletions one/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ object_store.workspace = true
prometheus-client.workspace = true
recon.workspace = true
serde_ipld_dagcbor.workspace = true
shutdown.workspace = true
signal-hook = "0.3.17"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
swagger.workspace = true
Expand Down
Loading

0 comments on commit 97ca266

Please sign in to comment.