diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 2534874867..02fdc7409f 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -1074,83 +1074,82 @@ impl SingleDiskFarm { } }; - let handle = Handle::current(); - let span = span.clone(); - thread_pool.install(move || { - let _span_guard = span.enter(); + let farming_fut = async move { + if start_receiver.recv().await.is_err() { + // Dropped before starting + return Ok(()); + } - let farming_fut = async move { - if start_receiver.recv().await.is_err() { - // Dropped before starting + if let Some(farming_delay) = delay_farmer_receiver { + if farming_delay.await.is_err() { + // Dropped before resolving return Ok(()); } + } - if let Some(farming_delay) = delay_farmer_receiver { - if farming_delay.await.is_err() { - // Dropped before resolving - return Ok(()); - } - } - - if cfg!(windows) { - let plot = RayonFiles::open_with( + if cfg!(windows) { + let plot = thread_pool.install(|| { + RayonFiles::open_with( &directory.join(Self::PLOT_FILE), UnbufferedIoFileWindows::open, - )?; - let plot_audit = PlotAudit::new(&plot); - - let farming_options = FarmingOptions { - public_key, - reward_address, - node_client, - plot_audit, - sectors_metadata, - kzg, - erasure_coding, - handlers, - modifying_sector_index, - slot_info_notifications: slot_info_forwarder_receiver, - }; - farming::(farming_options).await - } else { - let plot = RayonFiles::open(&directory.join(Self::PLOT_FILE))?; - let plot_audit = PlotAudit::new(&plot); - - let farming_options = FarmingOptions { - public_key, - reward_address, - node_client, - plot_audit, - sectors_metadata, - kzg, - erasure_coding, - handlers, - modifying_sector_index, - slot_info_notifications: slot_info_forwarder_receiver, - }; - farming::(farming_options).await - } - }; + ) + })?; + let plot_audit = PlotAudit::new(&plot); + + let farming_options = FarmingOptions { + public_key, + reward_address, + node_client, + plot_audit, + sectors_metadata, + kzg, + erasure_coding, + handlers, + modifying_sector_index, + slot_info_notifications: slot_info_forwarder_receiver, + thread_pool, + }; + farming::(farming_options).await + } else { + let plot = thread_pool + .install(move || RayonFiles::open(&directory.join(Self::PLOT_FILE)))?; + let plot_audit = PlotAudit::new(&plot); + + let farming_options = FarmingOptions { + public_key, + reward_address, + node_client, + plot_audit, + sectors_metadata, + kzg, + erasure_coding, + handlers, + modifying_sector_index, + slot_info_notifications: slot_info_forwarder_receiver, + thread_pool, + }; + farming::(farming_options).await + } + }; - handle.block_on(async { - select! { - farming_result = farming_fut.fuse() => { - if let Err(error) = farming_result - && let Some(error_sender) = error_sender.lock().take() - && let Err(error) = error_sender.send(error.into()) - { - error!( - %error, - "Farming failed to send error to background task", - ); - } - } - _ = stop_receiver.recv().fuse() => { - // Nothing, just exit + Handle::current().block_on(async { + select! { + farming_result = farming_fut.fuse() => { + if let Err(error) = farming_result + && let Some(error_sender) = error_sender.lock().take() + && let Err(error) = error_sender.send(error.into()) + { + error!( + %error, + "Farming failed to send error to background task", + ); } } - }); - }) + _ = stop_receiver.recv().fuse() => { + // Nothing, just exit + } + } + }); } }); let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false); diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 2d34958420..9639d606c2 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -9,7 +9,7 @@ use futures::channel::mpsc; use futures::StreamExt; use parity_scale_codec::{Decode, Encode, Error, Input, Output}; use parking_lot::Mutex; -use rayon::ThreadPoolBuildError; +use rayon::{ThreadPool, ThreadPoolBuildError}; use std::sync::Arc; use std::time::{Duration, Instant}; use std::{fmt, io}; @@ -23,7 +23,7 @@ use subspace_farmer_components::ReadAtSync; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_rpc_primitives::{SlotInfo, SolutionResponse}; use thiserror::Error; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn, Span}; /// Auditing details #[derive(Debug, Copy, Clone, Encode, Decode)] @@ -331,6 +331,7 @@ pub(super) struct FarmingOptions { pub(super) handlers: Arc, pub(super) modifying_sector_index: Arc>>, pub(super) slot_info_notifications: mpsc::Receiver, + pub(super) thread_pool: ThreadPool, } /// Starts farming process. @@ -356,6 +357,7 @@ where handlers, modifying_sector_index, mut slot_info_notifications, + thread_pool, } = farming_options; let farmer_app_info = node_client @@ -367,6 +369,7 @@ where let farming_timeout = farmer_app_info.farming_timeout; let table_generator = Arc::new(Mutex::new(PosTable::generator())); + let span = Span::current(); while let Some(slot_info) = slot_info_notifications.next().await { let result: Result<(), FarmingError> = try { @@ -380,15 +383,19 @@ where let modifying_sector_guard = modifying_sector_index.read().await; let maybe_sector_being_modified = modifying_sector_guard.as_ref().copied(); - plot_audit.audit(PlotAuditOptions:: { - public_key: &public_key, - reward_address: &reward_address, - slot_info, - sectors_metadata: §ors_metadata, - kzg: &kzg, - erasure_coding: &erasure_coding, - maybe_sector_being_modified, - table_generator: &table_generator, + thread_pool.install(|| { + let _span_guard = span.enter(); + + plot_audit.audit(PlotAuditOptions:: { + public_key: &public_key, + reward_address: &reward_address, + slot_info, + sectors_metadata: §ors_metadata, + kzg: &kzg, + erasure_coding: &erasure_coding, + maybe_sector_being_modified, + table_generator: &table_generator, + }) })? }; @@ -401,6 +408,8 @@ where a_solution_distance.cmp(&b_solution_distance) }); + let mut sectors_solutions = sectors_solutions.into_iter(); + handlers .farming_notification .call_simple(&FarmingNotification::Auditing(AuditingDetails { @@ -408,7 +417,13 @@ where time: start.elapsed(), })); - 'solutions_processing: for (sector_index, sector_solutions) in sectors_solutions { + 'solutions_processing: while let Some((sector_index, sector_solutions)) = thread_pool + .install(|| { + let _span_guard = span.enter(); + + sectors_solutions.next() + }) + { if sector_solutions.is_empty() { continue; }