diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 548282e7c2..1f5d40a769 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -29,8 +29,8 @@ use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::utils::ss58::parse_ss58_reward_address; use subspace_farmer::utils::{ - all_cpu_cores, create_tokio_thread_pool_manager_for_pinned_nodes, - run_future_in_dedicated_thread, thread_pool_core_indices, AsyncJoinOnDrop, + all_cpu_cores, create_plotting_thread_pool_manager, run_future_in_dedicated_thread, + thread_pool_core_indices, AsyncJoinOnDrop, }; use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; use subspace_farmer_components::plotting::PlottedSector; @@ -454,13 +454,10 @@ where )); let all_cpu_cores = all_cpu_cores(); - let plotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_nodes( - "plotting", - plotting_thread_pool_core_indices, - )?; - let replotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_nodes( - "replotting", - replotting_thread_pool_core_indices, + let plotting_thread_pool_manager = create_plotting_thread_pool_manager( + plotting_thread_pool_core_indices + .into_iter() + .zip(replotting_thread_pool_core_indices), )?; let farming_thread_pool_size = farming_thread_pool_size .map(|farming_thread_pool_size| farming_thread_pool_size.get()) @@ -519,7 +516,6 @@ where farm_during_initial_plotting, farming_thread_pool_size, plotting_thread_pool_manager: plotting_thread_pool_manager.clone(), - replotting_thread_pool_manager: replotting_thread_pool_manager.clone(), plotting_delay: Some(plotting_delay_receiver), }, disk_farm_index, diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 6e73b95279..b3126ef801 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -17,7 +17,7 @@ pub use crate::single_disk_farm::plotting::PlottingError; use crate::single_disk_farm::plotting::{ plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, }; -use crate::thread_pool_manager::ThreadPoolManager; +use crate::thread_pool_manager::PlottingThreadPoolManager; use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop}; use crate::KNOWN_PEERS_CACHE_SIZE; use async_lock::RwLock; @@ -288,10 +288,7 @@ pub struct SingleDiskFarmOptions { /// compute-intensive operations during proving) pub farming_thread_pool_size: usize, /// Thread pool manager used for plotting - pub plotting_thread_pool_manager: ThreadPoolManager, - /// Thread pool manager used for replotting, typically smaller pool than for plotting to not - /// affect farming as much - pub replotting_thread_pool_manager: ThreadPoolManager, + pub plotting_thread_pool_manager: PlottingThreadPoolManager, /// Notification for plotter to start, can be used to delay plotting until some initialization /// has happened externally pub plotting_delay: Option>, @@ -625,7 +622,6 @@ impl SingleDiskFarm { downloading_semaphore, farming_thread_pool_size, plotting_thread_pool_manager, - replotting_thread_pool_manager, plotting_delay, farm_during_initial_plotting, } = options; @@ -929,7 +925,6 @@ impl SingleDiskFarm { sectors_to_plot_receiver, downloading_semaphore, plotting_thread_pool_manager, - replotting_thread_pool_manager, stop_receiver: &mut stop_receiver.resubscribe(), }; diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 7aa4bea139..ee8523fb4a 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -2,7 +2,7 @@ use crate::single_disk_farm::{ BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails, RESERVED_PLOT_METADATA, }; -use crate::thread_pool_manager::ThreadPoolManager; +use crate::thread_pool_manager::PlottingThreadPoolManager; use crate::utils::AsyncJoinOnDrop; use crate::{node_client, NodeClient}; use async_lock::RwLock; @@ -116,8 +116,7 @@ pub(super) struct PlottingOptions<'a, NC, PG> { /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory /// usage of the plotting process, permit will be held until the end of the plotting process pub(crate) downloading_semaphore: Arc, - pub(super) plotting_thread_pool_manager: ThreadPoolManager, - pub(super) replotting_thread_pool_manager: ThreadPoolManager, + pub(super) plotting_thread_pool_manager: PlottingThreadPoolManager, pub(super) stop_receiver: &'a mut broadcast::Receiver<()>, } @@ -151,7 +150,6 @@ where mut sectors_to_plot_receiver, downloading_semaphore, plotting_thread_pool_manager, - replotting_thread_pool_manager, stop_receiver, } = plotting_options; @@ -324,10 +322,11 @@ where }) }; + let thread_pools = plotting_thread_pool_manager.get_thread_pools(); let thread_pool = if replotting { - replotting_thread_pool_manager.get_thread_pool() + &thread_pools.replotting } else { - plotting_thread_pool_manager.get_thread_pool() + &thread_pools.plotting }; // Give a chance to interrupt plotting if necessary diff --git a/crates/subspace-farmer/src/thread_pool_manager.rs b/crates/subspace-farmer/src/thread_pool_manager.rs index 1a6d6675d4..fd86ef160e 100644 --- a/crates/subspace-farmer/src/thread_pool_manager.rs +++ b/crates/subspace-farmer/src/thread_pool_manager.rs @@ -4,35 +4,42 @@ use std::num::NonZeroUsize; use std::ops::Deref; use std::sync::Arc; +/// A wrapper around thread pool pair for plotting purposes +#[derive(Debug)] +pub struct PlottingThreadPoolPair { + pub plotting: ThreadPool, + pub replotting: ThreadPool, +} + #[derive(Debug)] struct Inner { - thread_pools: Vec, + thread_pool_pairs: Vec, } -/// Wrapper around [`ThreadPool`] that on `Drop` will return thread pool back into corresponding -/// [`ThreadPoolManager`]. +/// Wrapper around [`PlottingThreadPoolPair`] that on `Drop` will return thread pool back into corresponding +/// [`PlottingThreadPoolManager`]. #[derive(Debug)] -pub struct ThreadPoolGuard { +pub struct PlottingThreadPoolsGuard { inner: Arc<(Mutex, Condvar)>, - thread_pool: Option, + thread_pool_pair: Option, } -impl Deref for ThreadPoolGuard { - type Target = ThreadPool; +impl Deref for PlottingThreadPoolsGuard { + type Target = PlottingThreadPoolPair; fn deref(&self) -> &Self::Target { - self.thread_pool + self.thread_pool_pair .as_ref() .expect("Value exists until `Drop`; qed") } } -impl Drop for ThreadPoolGuard { +impl Drop for PlottingThreadPoolsGuard { fn drop(&mut self) { let (mutex, cvar) = &*self.inner; let mut inner = mutex.lock(); - inner.thread_pools.push( - self.thread_pool + inner.thread_pool_pairs.push( + self.thread_pool_pair .take() .expect("Happens only once in `Drop`; qed"), ); @@ -40,33 +47,38 @@ impl Drop for ThreadPoolGuard { } } -/// Thread pool manager. +/// Plotting thread pool manager. +/// +/// This abstraction wraps a set of thread pool pairs and allows to use them one at a time. /// -/// This abstraction wraps a set of thread pools and allows to use them one at a time. +/// Each pair contains one thread pool for plotting purposes and one for replotting, this is because +/// they'll share the same set of CPU cores in most cases and wit would be inefficient to use them +/// concurrently. /// /// For example on machine with 64 logical cores and 4 NUMA nodes it would be recommended to create -/// 4 thread pools with 16 threads each, which would mean work done within thread pool is tied to -/// that thread pool. +/// 4 thread pools with 16 threads each plotting thread pool and 8 threads in each replotting thread +/// pool, which would mean work done within thread pool is tied to CPU cores dedicated for that +/// thread pool. #[derive(Debug, Clone)] -pub struct ThreadPoolManager { +pub struct PlottingThreadPoolManager { inner: Arc<(Mutex, Condvar)>, } -impl ThreadPoolManager { +impl PlottingThreadPoolManager { /// Create new thread pool manager by instantiating `thread_pools` thread pools using /// `create_thread_pool`. /// /// `create_thread_pool` takes one argument `thread_pool_index`. pub fn new( - create_thread_pool: C, - thread_pools: NonZeroUsize, + create_thread_pools: C, + thread_pool_pairs: NonZeroUsize, ) -> Result where - C: FnMut(usize) -> Result, + C: FnMut(usize) -> Result, { let inner = Inner { - thread_pools: (0..thread_pools.get()) - .map(create_thread_pool) + thread_pool_pairs: (0..thread_pool_pairs.get()) + .map(create_thread_pools) .collect::, _>>()?, }; @@ -75,23 +87,23 @@ impl ThreadPoolManager { }) } - /// Get one of inner thread pools, will block until one is available if needed + /// Get one of inner thread pool pairs, will block until one is available if needed #[must_use] - pub fn get_thread_pool(&self) -> ThreadPoolGuard { + pub fn get_thread_pools(&self) -> PlottingThreadPoolsGuard { let (mutex, cvar) = &*self.inner; let mut inner = mutex.lock(); - let thread_pool = inner.thread_pools.pop().unwrap_or_else(|| { + let thread_pool_pair = inner.thread_pool_pairs.pop().unwrap_or_else(|| { cvar.wait(&mut inner); - inner.thread_pools.pop().expect( + inner.thread_pool_pairs.pop().expect( "Guaranteed by parking_lot's API to happen when thread pool is inserted; qed", ) }); - ThreadPoolGuard { + PlottingThreadPoolsGuard { inner: Arc::clone(&self.inner), - thread_pool: Some(thread_pool), + thread_pool_pair: Some(thread_pool_pair), } } } diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index c95e41d03f..7d47edd68e 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -5,11 +5,11 @@ pub mod ss58; #[cfg(test)] mod tests; -use crate::thread_pool_manager::ThreadPoolManager; +use crate::thread_pool_manager::{PlottingThreadPoolManager, PlottingThreadPoolPair}; use futures::channel::oneshot; use futures::channel::oneshot::Canceled; use futures::future::Either; -use rayon::{ThreadBuilder, ThreadPoolBuildError, ThreadPoolBuilder}; +use rayon::{ThreadBuilder, ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder}; use std::future::Future; use std::num::NonZeroUsize; use std::ops::Deref; @@ -240,14 +240,15 @@ pub fn thread_pool_core_indices( if let Some(thread_pools) = thread_pools { let mut thread_pool_core_indices = Vec::::with_capacity(thread_pools.get()); + let total_cpu_cores = all_numa_nodes + .iter() + .flat_map(|set| set.cpu_cores()) + .count(); + if let Some(thread_pool_size) = thread_pool_size { // If thread pool size is fixed, loop over all CPU cores as many times as necessary and // assign contiguous ranges of CPU cores to corresponding thread pools - let total_cpu_cores = all_numa_nodes - .iter() - .flat_map(|set| set.cpu_cores()) - .count(); for _ in 0..thread_pools.get() { let cpu_cores_range = if let Some(last_cpu_index) = thread_pool_core_indices .last() @@ -273,18 +274,22 @@ pub fn thread_pool_core_indices( }); } } else { - // If thread pool size is not fixed, we iterate over all NUMA nodes as many times as - // necessary + // If thread pool size is not fixed, create threads pools with `total_cpu_cores/thread_pools` threads - for thread_pool_index in 0..thread_pools.get() { - thread_pool_core_indices.push(CpuCoreSet { - cores: all_numa_nodes[thread_pool_index % all_numa_nodes.len()] - .cores - .clone(), + let all_cpu_cores = all_numa_nodes + .iter() + .flat_map(|cpu_core_set| cpu_core_set.cores.iter()) + .copied() + .collect::>(); + + thread_pool_core_indices = all_cpu_cores + .chunks_exact(total_cpu_cores / thread_pools) + .map(|cpu_cores| CpuCoreSet { + cores: cpu_cores.to_vec(), #[cfg(feature = "numa")] topology: topology.clone(), - }); - } + }) + .collect(); } thread_pool_core_indices } else { @@ -293,47 +298,68 @@ pub fn thread_pool_core_indices( } } -/// Creates thread pools for each of CPU core set with number of threads corresponding to number of cores in -/// each set and pins threads to all of those CPU cores (all at once, not thread per core). Each thread will -/// also have Tokio context available. +fn create_plotting_thread_pool_manager_thread_pool_pair( + thread_prefix: &'static str, + thread_pool_index: usize, + cpu_core_set: CpuCoreSet, +) -> Result { + ThreadPoolBuilder::new() + .thread_name(move |thread_index| { + format!("{thread_prefix}-{thread_pool_index}.{thread_index}") + }) + .num_threads(cpu_core_set.cpu_cores().len()) + .spawn_handler({ + let handle = Handle::current(); + + rayon_custom_spawn_handler(move |thread| { + let cpu_core_set = cpu_core_set.clone(); + let handle = handle.clone(); + + move || { + cpu_core_set.pin_current_thread(); + drop(cpu_core_set); + + let _guard = handle.enter(); + + task::block_in_place(|| thread.run()) + } + }) + }) + .build() +} + +/// Creates thread pool pairs for each of CPU core set pair with number of plotting and replotting threads corresponding +/// to number of cores in each set and pins threads to all of those CPU cores (each thread to all cors in a set, not +/// thread per core). Each thread will also have Tokio context available. /// /// The easiest way to obtain CPUs is using [`all_cpu_cores`], but [`thread_pool_core_indices`] in case -/// support for user customizations is desired. -pub fn create_tokio_thread_pool_manager_for_pinned_nodes( - thread_prefix: &'static str, - mut cpu_core_sets: Vec, -) -> Result { +/// support for user customizations is desired. They will then have to be composed into pairs for this function. +pub fn create_plotting_thread_pool_manager( + mut cpu_core_sets: I, +) -> Result +where + I: ExactSizeIterator, +{ let total_thread_pools = cpu_core_sets.len(); - ThreadPoolManager::new( + PlottingThreadPoolManager::new( |thread_pool_index| { - let cpu_core_set = cpu_core_sets - .pop() + let (plotting_cpu_core_set, replotting_cpu_core_set) = cpu_core_sets + .next() .expect("Number of thread pools is the same as cpu core sets; qed"); - ThreadPoolBuilder::new() - .thread_name(move |thread_index| { - format!("{thread_prefix}-{thread_pool_index}.{thread_index}") - }) - .num_threads(cpu_core_set.cpu_cores().len()) - .spawn_handler({ - let handle = Handle::current(); - - rayon_custom_spawn_handler(move |thread| { - let cpu_core_set = cpu_core_set.clone(); - let handle = handle.clone(); - - move || { - cpu_core_set.pin_current_thread(); - drop(cpu_core_set); - - let _guard = handle.enter(); - - task::block_in_place(|| thread.run()) - } - }) - }) - .build() + Ok(PlottingThreadPoolPair { + plotting: create_plotting_thread_pool_manager_thread_pool_pair( + "plotting", + thread_pool_index, + plotting_cpu_core_set, + )?, + replotting: create_plotting_thread_pool_manager_thread_pool_pair( + "replotting", + thread_pool_index, + replotting_cpu_core_set, + )?, + }) }, NonZeroUsize::new(total_thread_pools) .expect("Thread pool is guaranteed to be non-empty; qed"),