diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 228e1c0778..71ef3dc3af 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -34,7 +34,8 @@ use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::utils::ss58::parse_ss58_reward_address; use subspace_farmer::utils::{ all_cpu_cores, create_plotting_thread_pool_manager, parse_cpu_cores_sets, - run_future_in_dedicated_thread, thread_pool_core_indices, AsyncJoinOnDrop, + recommended_number_of_farming_threads, run_future_in_dedicated_thread, + thread_pool_core_indices, AsyncJoinOnDrop, }; use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; use subspace_farmer_components::plotting::PlottedSector; @@ -116,7 +117,8 @@ pub(crate) struct FarmingArgs { #[arg(long)] sector_downloading_concurrency: Option, /// Defines how many sectors farmer will encode concurrently, defaults to 1 on UMA system and - /// number of NUMA nodes on NUMA system. It is further restricted by + /// number of NUMA nodes on NUMA system or L3 cache groups on large CPUs. It is further + /// restricted by /// `--sector-downloading-concurrency` and setting this option higher than /// `--sector-downloading-concurrency` will have no effect. #[arg(long)] @@ -133,7 +135,8 @@ pub(crate) struct FarmingArgs { #[arg(long)] farming_thread_pool_size: Option, /// Size of one thread pool used for plotting, defaults to number of logical CPUs available - /// on UMA system and number of logical CPUs available in NUMA node on NUMA system. + /// on UMA system and number of logical CPUs available in NUMA node on NUMA system or L3 cache + /// groups on large CPUs. /// /// Number of thread pools is defined by `--sector-encoding-concurrency` option, different /// thread pools might have different number of threads if NUMA nodes do not have the same size. @@ -154,7 +157,8 @@ pub(crate) struct FarmingArgs { plotting_cpu_cores: Option, /// Size of one thread pool used for replotting, typically smaller pool than for plotting /// to not affect farming as much, defaults to half of the number of logical CPUs available on - /// UMA system and number of logical CPUs available in NUMA node on NUMA system. + /// UMA system and number of logical CPUs available in NUMA node on NUMA system or L3 cache + /// groups on large CPUs. /// /// Number of thread pools is defined by `--sector-encoding-concurrency` option, different /// thread pools might have different number of threads if NUMA nodes do not have the same size. @@ -498,7 +502,6 @@ where .unwrap_or(plotting_thread_pool_core_indices.len() + 1), )); - let all_cpu_cores = all_cpu_cores(); let plotting_thread_pool_manager = create_plotting_thread_pool_manager( plotting_thread_pool_core_indices .into_iter() @@ -506,14 +509,9 @@ where )?; let farming_thread_pool_size = farming_thread_pool_size .map(|farming_thread_pool_size| farming_thread_pool_size.get()) - .unwrap_or_else(|| { - all_cpu_cores - .first() - .expect("Not empty according to function description; qed") - .cpu_cores() - .len() - }); + .unwrap_or_else(recommended_number_of_farming_threads); + let all_cpu_cores = all_cpu_cores(); if all_cpu_cores.len() > 1 { info!(numa_nodes = %all_cpu_cores.len(), "NUMA system detected"); diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index 0b92364755..45695f7afd 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -9,6 +9,7 @@ use crate::thread_pool_manager::{PlottingThreadPoolManager, PlottingThreadPoolPa use futures::channel::oneshot; use futures::channel::oneshot::Canceled; use futures::future::Either; +use hwlocality::object::types::ObjectType; use rayon::{ThreadBuilder, ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder}; use std::future::Future; use std::num::{NonZeroUsize, ParseIntError}; @@ -187,7 +188,30 @@ impl CpuCoreSet { } } -/// Get all cpu cores, grouped into sets according to NUMA nodes. +/// Recommended number of thread pool size for farming, equal to number of CPU cores in the first +/// NUMA node +pub fn recommended_number_of_farming_threads() -> usize { + #[cfg(feature = "numa")] + match hwlocality::Topology::new().map(std::sync::Arc::new) { + Ok(topology) => { + return topology + // Iterate over NUMA nodes + .objects_at_depth(hwlocality::object::depth::Depth::NUMANode) + // For each NUMA nodes get CPU set + .filter_map(|node| node.cpuset()) + // Get number of CPU cores + .map(|cpuset| cpuset.iter_set().count()) + .find(|&count| count > 0) + .unwrap_or_else(num_cpus::get); + } + Err(error) => { + warn!(%error, "Failed to get NUMA topology"); + } + } + num_cpus::get() +} + +/// Get all cpu cores, grouped into sets according to NUMA nodes or L3 cache groups on large CPUs. /// /// Returned vector is guaranteed to have at least one element and have non-zero number of CPU cores /// in each set. @@ -196,8 +220,8 @@ pub fn all_cpu_cores() -> Vec { match hwlocality::Topology::new().map(std::sync::Arc::new) { Ok(topology) => { let cpu_cores = topology - // Iterate over NUMA nodes - .objects_at_depth(hwlocality::object::depth::Depth::NUMANode) + // Iterate over groups of L3 caches + .objects_with_type(ObjectType::L3Cache) // For each NUMA nodes get CPU set .filter_map(|node| node.cpuset()) // For each CPU set extract individual cores @@ -214,7 +238,7 @@ pub fn all_cpu_cores() -> Vec { } } Err(error) => { - warn!(%error, "Failed to get CPU topology"); + warn!(%error, "Failed to get L3 cache topology"); } } vec![CpuCoreSet { @@ -227,6 +251,9 @@ pub fn all_cpu_cores() -> Vec { /// Parse space-separated set of groups of CPU cores (individual cores are coma-separated) into /// vector of CPU core sets that can be used for creation of plotting/replotting thread pools. pub fn parse_cpu_cores_sets(s: &str) -> Result, ParseIntError> { + #[cfg(feature = "numa")] + let topology = hwlocality::Topology::new().map(std::sync::Arc::new).ok(); + s.split(' ') .map(|s| { let cores = s @@ -237,7 +264,7 @@ pub fn parse_cpu_cores_sets(s: &str) -> Result, ParseIntError> { Ok(CpuCoreSet { cores, #[cfg(feature = "numa")] - topology: hwlocality::Topology::new().map(std::sync::Arc::new).ok(), + topology: topology.clone(), }) }) .collect()