From c0b4a8eb2c01e775452548be54603e804e5ee92d Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 11 Dec 2023 11:17:20 +0200 Subject: [PATCH] Relax `Send` and `Unpin` requirements for futures spawned using `run_future_in_dedicated_thread` --- .../src/bin/subspace-farmer/commands/farm.rs | 12 +++++++---- crates/subspace-farmer/src/utils.rs | 11 ++++++---- crates/subspace-farmer/src/utils/tests.rs | 21 +++++++------------ 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index a0dc1c5837..2e64f59e55 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -406,7 +406,11 @@ where )); let _piece_cache_worker = run_future_in_dedicated_thread( - Box::pin(piece_cache_worker.run(piece_getter.clone())), + { + let future = piece_cache_worker.run(piece_getter.clone()); + + move || future + }, "cache-worker".to_string(), ); @@ -599,20 +603,20 @@ where drop(readers_and_pieces); let farm_fut = run_future_in_dedicated_thread( - Box::pin(async move { + move || async move { while let Some(result) = single_disk_farms_stream.next().await { let id = result?; info!(%id, "Farm exited successfully"); } anyhow::Ok(()) - }), + }, "farmer-farm".to_string(), )?; let mut farm_fut = Box::pin(farm_fut).fuse(); let networking_fut = run_future_in_dedicated_thread( - Box::pin(async move { node_runner.run().await }), + move || async move { node_runner.run().await }, "farmer-networking".to_string(), )?; let mut networking_fut = Box::pin(networking_fut).fuse(); diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index e833cfbe46..c396c52c53 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -11,7 +11,7 @@ use futures::future::Either; use rayon::ThreadBuilder; use std::future::Future; use std::ops::Deref; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use std::{io, thread}; use tokio::runtime::Handle; @@ -94,12 +94,13 @@ impl Deref for JoinOnDrop { /// Runs future on a dedicated thread with the specified name, will block on drop until background /// thread with future is stopped too, ensuring nothing is left in memory -pub fn run_future_in_dedicated_thread( - future: Fut, +pub fn run_future_in_dedicated_thread( + create_future: CreateFut, thread_name: String, ) -> io::Result> + Send> where - Fut: Future + Unpin + Send + 'static, + CreateFut: (FnOnce() -> Fut) + Send + 'static, + Fut: Future + 'static, T: Send + 'static, { let (drop_tx, drop_rx) = oneshot::channel::<()>(); @@ -108,6 +109,8 @@ where let join_handle = thread::Builder::new().name(thread_name).spawn(move || { let _tokio_handle_guard = handle.enter(); + let future = pin!(create_future()); + let result = match handle.block_on(futures::future::select(future, drop_rx)) { Either::Left((result, _)) => result, Either::Right(_) => { diff --git a/crates/subspace-farmer/src/utils/tests.rs b/crates/subspace-farmer/src/utils/tests.rs index 75ead550a5..1da3e7f0d0 100644 --- a/crates/subspace-farmer/src/utils/tests.rs +++ b/crates/subspace-farmer/src/utils/tests.rs @@ -4,13 +4,10 @@ use tokio::sync::oneshot; #[tokio::test] async fn run_future_in_dedicated_thread_ready() { - let value = run_future_in_dedicated_thread( - Box::pin(async { future::ready(1).await }), - "ready".to_string(), - ) - .unwrap() - .await - .unwrap(); + let value = run_future_in_dedicated_thread(|| future::ready(1), "ready".to_string()) + .unwrap() + .await + .unwrap(); assert_eq!(value, 1); } @@ -19,11 +16,7 @@ async fn run_future_in_dedicated_thread_ready() { async fn run_future_in_dedicated_thread_cancellation() { // This may hang if not implemented correctly drop( - run_future_in_dedicated_thread( - Box::pin(async { future::pending::<()>().await }), - "cancellation".to_string(), - ) - .unwrap(), + run_future_in_dedicated_thread(future::pending::<()>, "cancellation".to_string()).unwrap(), ); } @@ -44,11 +37,11 @@ fn run_future_in_dedicated_thread_tokio_on_drop() { tokio::runtime::Runtime::new().unwrap().block_on(async { drop(run_future_in_dedicated_thread( - Box::pin(async { + move || async move { let s = S; let _ = receiver.await; drop(s); - }), + }, "tokio_on_drop".to_string(), )); });