diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
index b80e0aa756..d5fc854343 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -8,8 +8,8 @@ use backoff::ExponentialBackoff;
 use bytesize::ByteSize;
 use clap::{Parser, ValueHint};
 use futures::channel::oneshot;
-use futures::stream::{FuturesOrdered, FuturesUnordered};
-use futures::{FutureExt, StreamExt, TryStreamExt};
+use futures::stream::FuturesUnordered;
+use futures::{select, FutureExt, StreamExt};
 use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
 use std::fs;
@@ -197,7 +197,7 @@ pub(crate) struct FarmingArgs {
     disable_farm_locking: bool,
     /// Exit on farm error.
     ///
-    /// By default, farmer will continue running if the are still other working farms.
+    /// By default, farmer will continue running if there are still other working farms.
     #[arg(long)]
     exit_on_farm_error: bool,
 }
@@ -206,7 +206,7 @@ fn cache_percentage_parser(s: &str) -> anyhow::Result<NonZeroU8> {
     let cache_percentage = NonZeroU8::from_str(s)?;
 
     if cache_percentage.get() > 99 {
-        return Err(anyhow::anyhow!("Cache percentage can't exceed 99"));
+        return Err(anyhow!("Cache percentage can't exceed 99"));
     }
 
     Ok(cache_percentage)
@@ -295,7 +295,7 @@ where
     let farmer_app_info = node_client
         .farmer_app_info()
        .await
-        .map_err(|error| anyhow::anyhow!(error))?;
+        .map_err(|error| anyhow!(error))?;
 
     let first_farm_directory = &disk_farms
         .first()
@@ -350,7 +350,7 @@ where
         NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
             .expect("Not zero; qed"),
     )
-    .map_err(|error| anyhow::anyhow!(error))?;
+    .map_err(|error| anyhow!(error))?;
     let validator = Some(SegmentCommitmentPieceValidator::new(
         node.clone(),
         node_client.clone(),
@@ -407,17 +407,14 @@ where
     let replotting_thread_pool_core_indices;
     if let Some(plotting_cpu_cores) = plotting_cpu_cores {
         plotting_thread_pool_core_indices = parse_cpu_cores_sets(&plotting_cpu_cores)
-            .map_err(|error| anyhow::anyhow!("Failed to parse `--plotting-cpu-cores`: {error}"))?;
+            .map_err(|error| anyhow!("Failed to parse `--plotting-cpu-cores`: {error}"))?;
         replotting_thread_pool_core_indices = match replotting_cpu_cores {
-            Some(replotting_cpu_cores) => {
-                parse_cpu_cores_sets(&replotting_cpu_cores).map_err(|error| {
-                    anyhow::anyhow!("Failed to parse `--replotting-cpu-cores`: {error}")
-                })?
-            }
+            Some(replotting_cpu_cores) => parse_cpu_cores_sets(&replotting_cpu_cores)
+                .map_err(|error| anyhow!("Failed to parse `--replotting-cpu-cores`: {error}"))?,
             None => plotting_thread_pool_core_indices.clone(),
         };
         if plotting_thread_pool_core_indices.len() != replotting_thread_pool_core_indices.len() {
-            return Err(anyhow::anyhow!(
+            return Err(anyhow!(
                 "Number of plotting thread pools ({}) is not the same as for replotting ({})",
                 plotting_thread_pool_core_indices.len(),
                 replotting_thread_pool_core_indices.len()
@@ -551,7 +548,7 @@ where
                 }) => {
                     return (
                         farm_index,
-                        Err(anyhow::anyhow!(
+                        Err(anyhow!(
                             "Allocated space {} ({}) is not enough, minimum is ~{} (~{}, \
                             {} bytes to be exact)",
                             bytesize::to_string(allocated_space, true),
@@ -646,60 +643,43 @@ where
     info!("Collecting already plotted pieces (this will take some time)...");
 
     // Collect already plotted pieces
-    {
-        let mut plotted_pieces = plotted_pieces.write().await;
+    let mut total_and_plotted_sectors = Vec::with_capacity(farms.len());
 
-        for (farm_index, farm) in farms.iter().enumerate() {
-            let farm_index = farm_index.try_into().map_err(|_error| {
-                anyhow!(
-                    "More than 256 plots are not supported, consider running multiple farmer \
+    for (farm_index, farm) in farms.iter().enumerate() {
+        let mut plotted_pieces = plotted_pieces.write().await;
+        let farm_index = farm_index.try_into().map_err(|_error| {
+            anyhow!(
+                "More than 256 plots are not supported, consider running multiple farmer \
                 instances"
-                )
-            })?;
-
-            plotted_pieces.add_farm(farm_index, farm.piece_reader());
-
-            let plotted_sectors = farm.plotted_sectors();
-            let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| {
-                anyhow!("Failed to get plotted sectors for farm {farm_index}: {error}")
-            })?;
-            while let Some(plotted_sector_result) = plotted_sectors.next().await {
-                match plotted_sector_result {
-                    Ok(plotted_sector) => {
-                        plotted_pieces.add_sector(farm_index, &plotted_sector);
-                    }
-                    Err(error) => {
-                        error!(
-                            %error,
-                            %farm_index,
-                            "Failed reading plotted sector on startup, skipping"
-                        );
-                    }
-                }
-            }
+            )
+        })?;
+
+        plotted_pieces.add_farm(farm_index, farm.piece_reader());
+
+        let total_sector_count = farm.total_sectors_count();
+        let mut plotted_sectors_count = 0;
+        let plotted_sectors = farm.plotted_sectors();
+        let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| {
+            anyhow!("Failed to get plotted sectors for farm {farm_index}: {error}")
+        })?;
+
+        while let Some(plotted_sector_result) = plotted_sectors.next().await {
+            plotted_sectors_count += 1;
+            plotted_pieces.add_sector(
+                farm_index,
+                &plotted_sector_result.map_err(|error| {
+                    anyhow!(
+                        "Failed reading plotted sector on startup for farm {farm_index}: {error}"
+                    )
+                })?,
+            )
         }
+
+        total_and_plotted_sectors.push((total_sector_count, plotted_sectors_count));
     }
 
     info!("Finished collecting already plotted pieces successfully");
 
-    let total_and_plotted_sectors = farms
-        .iter()
-        .enumerate()
-        .map(|(farm_index, farm)| async move {
-            let total_sector_count = farm.total_sectors_count();
-            let plotted_sectors_count = farm.plotted_sectors_count().await.map_err(|error| {
-                anyhow!(
-                    "Failed to get plotted sectors count from from index {farm_index}: \
-                    {error}"
-                )
-            })?;
-
-            anyhow::Ok((total_sector_count, plotted_sectors_count))
-        })
-        .collect::<FuturesOrdered<_>>()
-        .try_collect::<Vec<_>>()
-        .await?;
-
     let mut farms_stream = (0u8..)
         .zip(farms)
         .zip(total_and_plotted_sectors)
@@ -871,7 +851,7 @@ where
     let farm_fut = pin!(farm_fut);
     let farmer_cache_worker_fut = pin!(farmer_cache_worker_fut);
 
-    futures::select!(
+    select! {
         // Signal future
         _ = signal.fuse() => {},
 
@@ -889,7 +869,7 @@ where
         _ = farmer_cache_worker_fut.fuse() => {
             info!("Farmer cache worker exited.")
         },
-    );
+    }
 
     anyhow::Ok(())
 }
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs
index 9210d12656..1a77b01705 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs
@@ -163,6 +163,15 @@ where
         async move {
             let internal_result = match req {
                 SegmentHeaderRequest::SegmentIndexes { segment_indexes } => {
+                    if segment_indexes.len() as u64 > SEGMENT_HEADER_NUMBER_LIMIT {
+                        debug!(
+                            "segment_indexes length exceeds the limit: {}",
+                            segment_indexes.len()
+                        );
+
+                        return None;
+                    }
+
                     debug!(
                         segment_indexes_count = ?segment_indexes.len(),
                         "Segment headers request received."
diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs
index 2db598b33d..43b30b06e5 100644
--- a/crates/subspace-farmer/src/farm.rs
+++ b/crates/subspace-farmer/src/farm.rs
@@ -2,7 +2,7 @@ use crate::node_client;
 use async_trait::async_trait;
 use derive_more::{Display, From};
 use futures::Stream;
-use parity_scale_codec::{Decode, Encode, Input, Output};
+use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
 use serde::{Deserialize, Serialize};
 use std::future::Future;
 use std::pin::Pin;
@@ -50,7 +50,7 @@ pub trait PieceCache: Send + Sync + fmt::Debug {
     /// doesn't happen for the same piece being accessed!
     async fn contents(
         &self,
-    ) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<Piece>)> + Unpin + '_>;
+    ) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<Piece>)> + Unpin + Send + '_>;
 
     /// Store piece in cache at specified offset, replacing existing piece if there is any.
     ///
@@ -332,7 +332,7 @@ pub trait PieceReader: Send + Sync + fmt::Debug {
 }
 
 /// Opaque handler ID for event handlers, once dropped handler will be removed automatically
-pub trait HandlerId: Send + fmt::Debug {
+pub trait HandlerId: Send + Sync + fmt::Debug {
     /// Consumes [`HandlerId`] and prevents handler from being removed automatically.
     fn detach(&self);
 }
@@ -353,6 +353,39 @@ pub enum FarmId {
     Ulid(Ulid),
 }
 
+impl Encode for FarmId {
+    fn size_hint(&self) -> usize {
+        1_usize
+            + match self {
+                FarmId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
+            }
+    }
+    fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
+        match self {
+            FarmId::Ulid(ulid) => {
+                output.push_byte(0);
+                Encode::encode_to(&ulid.0, output);
+            }
+        }
+    }
+}
+
+impl EncodeLike for FarmId {}
+
+impl Decode for FarmId {
+    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
+        match input
+            .read_byte()
+            .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?
+        {
+            0 => u128::decode(input)
+                .map(|ulid| FarmId::Ulid(Ulid(ulid)))
+                .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`")),
+            _ => Err("Could not decode `FarmId`, variant doesn't exist".into()),
+        }
+    }
+}
+
 #[allow(clippy::new_without_default)]
 impl FarmId {
     /// Creates new ID
@@ -370,9 +403,6 @@ pub trait Farm {
     /// Number of sectors in this farm
     fn total_sectors_count(&self) -> SectorIndex;
 
-    /// Number of sectors successfully plotted so far
-    async fn plotted_sectors_count(&self) -> Result<SectorIndex, FarmError>;
-
     /// Get plotted sectors instance
     fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;
 
diff --git a/crates/subspace-farmer/src/piece_cache.rs b/crates/subspace-farmer/src/piece_cache.rs
index c5cb23948d..3f005b8da5 100644
--- a/crates/subspace-farmer/src/piece_cache.rs
+++ b/crates/subspace-farmer/src/piece_cache.rs
@@ -80,7 +80,7 @@ impl farm::PieceCache for PieceCache {
 
     async fn contents(
         &self,
-    ) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<Piece>)> + Unpin + '_> {
+    ) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<Piece>)> + Unpin + Send + '_> {
         let this = self.clone();
         let (mut sender, receiver) = mpsc::channel(1);
         let read_contents = task::spawn_blocking(move || {
diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs
index e0f338f068..2e4c8c2eb8 100644
--- a/crates/subspace-farmer/src/single_disk_farm.rs
+++ b/crates/subspace-farmer/src/single_disk_farm.rs
@@ -6,9 +6,7 @@ mod plotted_sectors;
 mod plotting;
 pub mod unbuffered_io_file_windows;
 
-use crate::farm::{
-    Farm, FarmError, FarmId, HandlerFn, PieceReader, PlotCache, PlottedSectors, SectorUpdate,
-};
+use crate::farm::{Farm, FarmId, HandlerFn, PieceReader, PlotCache, PlottedSectors, SectorUpdate};
 pub use crate::farm::{FarmingError, FarmingNotification};
 use crate::identity::{Identity, IdentityError};
 use crate::node_client::NodeClient;
@@ -615,10 +613,6 @@ impl Farm for SingleDiskFarm {
         self.total_sectors_count
     }
 
-    async fn plotted_sectors_count(&self) -> Result<SectorIndex, FarmError> {
-        Ok(self.plotted_sectors_count().await)
-    }
-
     fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
         Arc::new(self.plotted_sectors())
     }
@@ -1469,16 +1463,6 @@ impl SingleDiskFarm {
         self.total_sectors_count
     }
 
-    /// Number of sectors successfully plotted so far
-    pub async fn plotted_sectors_count(&self) -> SectorIndex {
-        self.sectors_metadata
-            .read()
-            .await
-            .len()
-            .try_into()
-            .expect("Number of sectors never exceeds `SectorIndex` type; qed")
-    }
-
     /// Read information about sectors plotted so far
     pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
         SingleDiskPlottedSectors {
diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
index da1b681873..1e270e82d1 100644
--- a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
+++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
@@ -24,7 +24,7 @@ impl farm::PieceCache for DiskPieceCache {
 
     async fn contents(
         &self,
-    ) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<Piece>)> + Unpin + '_> {
+    ) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<Piece>)> + Unpin + Send + '_> {
         if let Some(piece_cache) = &self.maybe_piece_cache {
             farm::PieceCache::contents(piece_cache).await
         } else {
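For reviewers who want to sanity-check the hand-written SCALE codec this patch adds for `FarmId`, below is a minimal round-trip sketch. It is not part of the patch and makes a few assumptions: that `FarmId` is reachable as `subspace_farmer::farm::FarmId`, that the `FarmId::new()` constructor hinted at by the surrounding context exists, and that the `parity-scale-codec` crate is available.

```rust
use parity_scale_codec::{Decode, Encode};
// Assumed re-export path; adjust to wherever `FarmId` lives in your tree.
use subspace_farmer::farm::FarmId;

fn main() {
    // `FarmId::new()` (per the context above) creates a fresh ULID-backed ID.
    let farm_id = FarmId::new();

    // The manual `Encode` impl writes one variant byte (0 => `Ulid`) followed by
    // the SCALE encoding of the inner `u128` (16 bytes).
    let encoded = farm_id.encode();
    assert_eq!(encoded[0], 0, "variant byte for `FarmId::Ulid`");
    assert_eq!(encoded.len(), 17, "1 variant byte + 16 bytes of u128");

    // The manual `Decode` impl reads the variant byte back and rebuilds the ULID;
    // re-encoding the decoded value must reproduce the exact same bytes.
    let decoded = FarmId::decode(&mut encoded.as_slice()).expect("valid encoding");
    assert_eq!(decoded.encode(), encoded);
}
```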