From eb199bdc54764f4b7deb46bfb9a8c480e6537dd0 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 11 Dec 2023 11:34:19 +0200 Subject: [PATCH 1/2] Remove extra boxing around `Unpin` requirement --- crates/sc-proof-of-time/src/source/gossip.rs | 21 ++++----- .../src/bin/subspace-farmer/commands/farm.rs | 15 +++--- .../src/bin/subspace-farmer/utils.rs | 29 +++++------- .../src/single_disk_farm/plotting.rs | 47 ++++++++++--------- .../subspace-networking/examples/metrics.rs | 30 ++++++------ .../subspace-networking/src/behavior/tests.rs | 6 +-- .../src/gossip_worker.rs | 20 ++++---- .../src/domain_worker_starter.rs | 5 +- domains/client/subnet-gossip/src/worker.rs | 16 +++---- 9 files changed, 91 insertions(+), 98 deletions(-) diff --git a/crates/sc-proof-of-time/src/source/gossip.rs b/crates/sc-proof-of-time/src/source/gossip.rs index f0061f31a9..ad9d9edce4 100644 --- a/crates/sc-proof-of-time/src/source/gossip.rs +++ b/crates/sc-proof-of-time/src/source/gossip.rs @@ -22,6 +22,7 @@ use std::collections::{HashMap, VecDeque}; use std::future::poll_fn; use std::hash::{Hash, Hasher}; use std::num::{NonZeroU32, NonZeroUsize}; +use std::pin::pin; use std::sync::{atomic, Arc}; use subspace_core_primitives::{PotCheckpoints, PotSeed, SlotNumber}; use tracing::{debug, error, trace, warn}; @@ -167,18 +168,16 @@ where /// should be running on a dedicated thread. pub async fn run(mut self) { let message_receiver = self.engine.lock().messages_for(self.topic); - let mut incoming_unverified_messages = Box::pin( - message_receiver - .filter_map(|notification| async move { - notification.sender.map(|sender| { - let proof = GossipProof::decode(&mut notification.message.as_ref()) - .expect("Only valid messages get here; qed"); - - (sender, proof) - }) + let incoming_unverified_messages = + pin!(message_receiver.filter_map(|notification| async move { + notification.sender.map(|sender| { + let proof = GossipProof::decode(&mut notification.message.as_ref()) + .expect("Only valid messages get here; qed"); + + (sender, proof) }) - .fuse(), - ); + })); + let mut incoming_unverified_messages = incoming_unverified_messages.fuse(); loop { let gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx)); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index a0dc1c5837..5f3502fe5c 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -15,6 +15,7 @@ use std::fs; use std::net::SocketAddr; use std::num::{NonZeroU8, NonZeroUsize}; use std::path::PathBuf; +use std::pin::pin; use std::str::FromStr; use std::sync::Arc; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; @@ -598,7 +599,7 @@ where // event handlers drop(readers_and_pieces); - let farm_fut = run_future_in_dedicated_thread( + let farm_fut = pin!(run_future_in_dedicated_thread( Box::pin(async move { while let Some(result) = single_disk_farms_stream.next().await { let id = result?; @@ -608,26 +609,24 @@ where anyhow::Ok(()) }), "farmer-farm".to_string(), - )?; - let mut farm_fut = Box::pin(farm_fut).fuse(); + )?); - let networking_fut = run_future_in_dedicated_thread( + let networking_fut = pin!(run_future_in_dedicated_thread( Box::pin(async move { node_runner.run().await }), "farmer-networking".to_string(), - )?; - let mut networking_fut = Box::pin(networking_fut).fuse(); + )?); futures::select!( // Signal future _ = signal.fuse() => {}, // Farm future - result = farm_fut => { + result = farm_fut.fuse() => { result??; }, // Node runner future - _ = networking_fut => { + _ = networking_fut.fuse() => { info!("Node runner exited.") }, ); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs b/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs index 10a34708ce..709deff5a5 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs @@ -25,24 +25,21 @@ pub(crate) fn raise_fd_limit() { #[cfg(unix)] pub(crate) async fn shutdown_signal() { use futures::FutureExt; + use std::pin::pin; futures::future::select( - Box::pin( - signal::unix::signal(signal::unix::SignalKind::interrupt()) - .expect("Setting signal handlers must never fail") - .recv() - .map(|_| { - tracing::info!("Received SIGINT, shutting down farmer..."); - }), - ), - Box::pin( - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("Setting signal handlers must never fail") - .recv() - .map(|_| { - tracing::info!("Received SIGTERM, shutting down farmer..."); - }), - ), + pin!(signal::unix::signal(signal::unix::SignalKind::interrupt()) + .expect("Setting signal handlers must never fail") + .recv() + .map(|_| { + tracing::info!("Received SIGINT, shutting down farmer..."); + }),), + pin!(signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("Setting signal handlers must never fail") + .recv() + .map(|_| { + tracing::info!("Received SIGTERM, shutting down farmer..."); + }),), ) .await; } diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 5d0f0c9b69..a85bd6e01d 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -16,6 +16,7 @@ use std::fs::File; use std::io; use std::num::{NonZeroU16, NonZeroUsize}; use std::ops::Range; +use std::pin::pin; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -289,29 +290,31 @@ where let mut sector = Vec::new(); let mut sector_metadata = Vec::new(); - let plot_sector_fut = encode_sector::( - downloaded_sector, - EncodeSectorOptions { - sector_index, - erasure_coding, - pieces_in_sector, - sector_output: &mut sector, - sector_metadata_output: &mut sector_metadata, - encoding_semaphore: Some(encoding_semaphore), - table_generator: &mut table_generator, - }, - ); - - let plotted_sector = Handle::current().block_on(async { - select! { - plotting_result = Box::pin(plot_sector_fut).fuse() => { - plotting_result.map_err(PlottingError::from) + let plotted_sector = { + let plot_sector_fut = pin!(encode_sector::( + downloaded_sector, + EncodeSectorOptions { + sector_index, + erasure_coding, + pieces_in_sector, + sector_output: &mut sector, + sector_metadata_output: &mut sector_metadata, + encoding_semaphore: Some(encoding_semaphore), + table_generator: &mut table_generator, + }, + )); + + Handle::current().block_on(async { + select! { + plotting_result = plot_sector_fut.fuse() => { + plotting_result.map_err(PlottingError::from) + } + _ = stop_receiver.recv().fuse() => { + Err(PlottingError::FarmIsShuttingDown) + } } - _ = stop_receiver.recv().fuse() => { - Err(PlottingError::FarmIsShuttingDown) - } - } - })?; + })? + }; Ok((sector, sector_metadata, table_generator, plotted_sector)) }) diff --git a/crates/subspace-networking/examples/metrics.rs b/crates/subspace-networking/examples/metrics.rs index 96c6e0d3f3..406e81649c 100644 --- a/crates/subspace-networking/examples/metrics.rs +++ b/crates/subspace-networking/examples/metrics.rs @@ -114,23 +114,21 @@ async fn get_peer(peer_id: PeerId, node: Node) { #[cfg(unix)] pub(crate) async fn shutdown_signal() { + use std::pin::pin; + futures::future::select( - Box::pin( - signal::unix::signal(signal::unix::SignalKind::interrupt()) - .expect("Setting signal handlers must never fail") - .recv() - .map(|_| { - tracing::info!("Received SIGINT, shutting down farmer..."); - }), - ), - Box::pin( - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("Setting signal handlers must never fail") - .recv() - .map(|_| { - tracing::info!("Received SIGTERM, shutting down farmer..."); - }), - ), + pin!(signal::unix::signal(signal::unix::SignalKind::interrupt()) + .expect("Setting signal handlers must never fail") + .recv() + .map(|_| { + tracing::info!("Received SIGINT, shutting down farmer..."); + }),), + pin!(signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("Setting signal handlers must never fail") + .recv() + .map(|_| { + tracing::info!("Received SIGTERM, shutting down farmer..."); + }),), ) .await; } diff --git a/crates/subspace-networking/src/behavior/tests.rs b/crates/subspace-networking/src/behavior/tests.rs index 34db7acb6d..ccdf545483 100644 --- a/crates/subspace-networking/src/behavior/tests.rs +++ b/crates/subspace-networking/src/behavior/tests.rs @@ -230,7 +230,7 @@ async fn test_async_handler_works_with_pending_internal_future() { let (node_2, mut node_runner_2) = crate::construct(config_2).unwrap(); - let bootstrap_fut = Box::pin({ + tokio::spawn({ let node = node_2.clone(); async move { @@ -240,10 +240,6 @@ async fn test_async_handler_works_with_pending_internal_future() { } }); - tokio::spawn(async move { - bootstrap_fut.await; - }); - tokio::spawn(async move { node_runner_2.run().await; }); diff --git a/domains/client/cross-domain-message-gossip/src/gossip_worker.rs b/domains/client/cross-domain-message-gossip/src/gossip_worker.rs index b7c0bf742b..666bd2ac11 100644 --- a/domains/client/cross-domain-message-gossip/src/gossip_worker.rs +++ b/domains/client/cross-domain-message-gossip/src/gossip_worker.rs @@ -12,6 +12,7 @@ use sp_core::twox_256; use sp_messenger::messages::ChainId; use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; use std::collections::{BTreeMap, HashSet}; +use std::pin::pin; use std::sync::Arc; const LOG_TARGET: &str = "cross_chain_gossip_worker"; @@ -123,16 +124,15 @@ fn topic() -> Block::Hash { impl GossipWorker { /// Starts the Gossip message worker. pub async fn run(mut self) { - let mut incoming_cross_chain_messages = Box::pin( - self.gossip_engine - .lock() - .messages_for(topic::()) - .filter_map(|notification| async move { - Message::decode(&mut ¬ification.message[..]) - .ok() - .map(|msg| (notification.sender, msg)) - }), - ); + let mut incoming_cross_chain_messages = pin!(self + .gossip_engine + .lock() + .messages_for(topic::()) + .filter_map(|notification| async move { + Message::decode(&mut ¬ification.message[..]) + .ok() + .map(|msg| (notification.sender, msg)) + })); loop { let engine = self.gossip_engine.clone(); diff --git a/domains/client/domain-operator/src/domain_worker_starter.rs b/domains/client/domain-operator/src/domain_worker_starter.rs index 10933e695e..1a9fab1cc0 100644 --- a/domains/client/domain-operator/src/domain_worker_starter.rs +++ b/domains/client/domain-operator/src/domain_worker_starter.rs @@ -36,6 +36,7 @@ use sp_domains_fraud_proof::FraudProofApi; use sp_messenger::MessengerApi; use sp_runtime::traits::NumberFor; use sp_transaction_pool::runtime_api::TaggedTransactionQueue; +use std::pin::pin; use std::sync::Arc; use subspace_runtime_primitives::Balance; use tracing::{info, Instrument}; @@ -138,8 +139,8 @@ pub(super) async fn start_worker< .boxed() } }; - let mut new_slot_notification_stream = Box::pin(new_slot_notification_stream); - let mut acknowledgement_sender_stream = Box::pin(acknowledgement_sender_stream); + let mut new_slot_notification_stream = pin!(new_slot_notification_stream); + let mut acknowledgement_sender_stream = pin!(acknowledgement_sender_stream); loop { tokio::select! { // Ensure any new slot/block import must handle first before the `acknowledgement_sender_stream` diff --git a/domains/client/subnet-gossip/src/worker.rs b/domains/client/subnet-gossip/src/worker.rs index fe444858c1..d1d44e7ceb 100644 --- a/domains/client/subnet-gossip/src/worker.rs +++ b/domains/client/subnet-gossip/src/worker.rs @@ -7,6 +7,7 @@ use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use sc_network_gossip::GossipEngine; use sp_runtime::traits::Block as BlockT; +use std::pin::pin; use std::sync::Arc; /// A worker plays the executor gossip protocol. @@ -49,14 +50,13 @@ where } pub(super) async fn run(mut self) { - let mut incoming = Box::pin( - self.gossip_engine - .lock() - .messages_for(topic::()) - .filter_map(|notification| async move { - GossipMessage::::decode(&mut ¬ification.message[..]).ok() - }), - ); + let mut incoming = pin!(self + .gossip_engine + .lock() + .messages_for(topic::()) + .filter_map(|notification| async move { + GossipMessage::::decode(&mut ¬ification.message[..]).ok() + })); loop { let engine = self.gossip_engine.clone(); From 8440b5ebda79e1213f7b9aa7282d6314e3ef9d59 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 11 Dec 2023 11:41:44 +0200 Subject: [PATCH 2/2] Fix fused future usage --- crates/sc-proof-of-time/src/source/gossip.rs | 11 +++++------ .../src/gossip_worker.rs | 18 +++++++++--------- domains/client/subnet-gossip/src/worker.rs | 18 +++++++++--------- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/crates/sc-proof-of-time/src/source/gossip.rs b/crates/sc-proof-of-time/src/source/gossip.rs index ad9d9edce4..c8399161e6 100644 --- a/crates/sc-proof-of-time/src/source/gossip.rs +++ b/crates/sc-proof-of-time/src/source/gossip.rs @@ -180,17 +180,16 @@ where let mut incoming_unverified_messages = incoming_unverified_messages.fuse(); loop { - let gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx)); + let mut gossip_engine_poll = poll_fn(|cx| self.engine.lock().poll_unpin(cx)).fuse(); + futures::select! { - message = incoming_unverified_messages.next() => { - if let Some((sender, proof)) = message { - self.handle_proof_candidate(sender, proof).await; - } + (sender, proof) = incoming_unverified_messages.select_next_some() => { + self.handle_proof_candidate(sender, proof).await; }, message = self.to_gossip_receiver.select_next_some() => { self.handle_to_gossip_messages(message).await }, - _ = gossip_engine_poll.fuse() => { + _ = gossip_engine_poll => { error!("Gossip engine has terminated"); return; } diff --git a/domains/client/cross-domain-message-gossip/src/gossip_worker.rs b/domains/client/cross-domain-message-gossip/src/gossip_worker.rs index 666bd2ac11..1a52c19166 100644 --- a/domains/client/cross-domain-message-gossip/src/gossip_worker.rs +++ b/domains/client/cross-domain-message-gossip/src/gossip_worker.rs @@ -12,6 +12,7 @@ use sp_core::twox_256; use sp_messenger::messages::ChainId; use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; use std::collections::{BTreeMap, HashSet}; +use std::future::poll_fn; use std::pin::pin; use std::sync::Arc; @@ -124,7 +125,7 @@ fn topic() -> Block::Hash { impl GossipWorker { /// Starts the Gossip message worker. pub async fn run(mut self) { - let mut incoming_cross_chain_messages = pin!(self + let incoming_cross_chain_messages = pin!(self .gossip_engine .lock() .messages_for(topic::()) @@ -133,27 +134,26 @@ impl GossipWorker { .ok() .map(|msg| (notification.sender, msg)) })); + let mut incoming_cross_chain_messages = incoming_cross_chain_messages.fuse(); loop { let engine = self.gossip_engine.clone(); - let gossip_engine = futures::future::poll_fn(|cx| engine.lock().poll_unpin(cx)); + let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse(); futures::select! { - cross_chain_message = incoming_cross_chain_messages.next().fuse() => { + cross_chain_message = incoming_cross_chain_messages.next() => { if let Some((maybe_peer, msg)) = cross_chain_message { tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Network: {:?}", msg.chain_id); self.handle_cross_chain_message(msg, maybe_peer); } }, - cross_chain_message = self.gossip_msg_stream.next().fuse() => { - if let Some(msg) = cross_chain_message { - tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id); - self.handle_cross_chain_message(msg, None); - } + msg = self.gossip_msg_stream.select_next_some() => { + tracing::debug!(target: LOG_TARGET, "Incoming cross chain message for chain from Relayer: {:?}", msg.chain_id); + self.handle_cross_chain_message(msg, None); } - _ = gossip_engine.fuse() => { + _ = gossip_engine => { tracing::error!(target: LOG_TARGET, "Gossip engine has terminated."); return; } diff --git a/domains/client/subnet-gossip/src/worker.rs b/domains/client/subnet-gossip/src/worker.rs index d1d44e7ceb..5b05833d13 100644 --- a/domains/client/subnet-gossip/src/worker.rs +++ b/domains/client/subnet-gossip/src/worker.rs @@ -2,11 +2,12 @@ use crate::{ topic, BundleFor, BundleReceiver, GossipMessage, GossipMessageHandler, GossipValidator, LOG_TARGET, }; -use futures::{future, FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt}; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use sc_network_gossip::GossipEngine; use sp_runtime::traits::Block as BlockT; +use std::future::poll_fn; use std::pin::pin; use std::sync::Arc; @@ -50,20 +51,21 @@ where } pub(super) async fn run(mut self) { - let mut incoming = pin!(self + let incoming = pin!(self .gossip_engine .lock() .messages_for(topic::()) .filter_map(|notification| async move { GossipMessage::::decode(&mut ¬ification.message[..]).ok() })); + let mut incoming = incoming.fuse(); loop { let engine = self.gossip_engine.clone(); - let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx)); + let mut gossip_engine = poll_fn(|cx| engine.lock().poll_unpin(cx)).fuse(); futures::select! { - gossip_message = incoming.next().fuse() => { + gossip_message = incoming.next() => { if let Some(message) = gossip_message { tracing::debug!(target: LOG_TARGET, ?message, "Rebroadcasting an executor gossip message"); match message { @@ -73,12 +75,10 @@ where return } } - bundle = self.bundle_receiver.next().fuse() => { - if let Some(bundle) = bundle { - self.gossip_bundle(bundle); - } + bundle = self.bundle_receiver.select_next_some() => { + self.gossip_bundle(bundle); } - _ = gossip_engine.fuse() => { + _ = gossip_engine => { tracing::error!(target: LOG_TARGET, "Gossip engine has terminated."); return; }