diff --git a/README.md b/README.md index fe7fed60c3..12c275d4d1 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Latest Release](https://img.shields.io/github/v/release/autonomys/subspace?display_name=tag&style=flat-square)](https://github.com/autonomys/subspace/releases) [![Downloads Latest](https://img.shields.io/github/downloads/autonomys/subspace/latest/total?style=flat-square)](https://github.com/autonomys/subspace/releases/latest) -[![Rust](https://img.shields.io/github/actions/workflow/status/autonomys/subspace/rust.yml?branch=main)](https://github.com/autonomys/subspace/actions/workflows/rust.yaml) +[![Rust](https://img.shields.io/github/actions/workflow/status/autonomys/subspace/rust.yml?branch=main)](https://github.com/autonomys/subspace/actions/workflows/rust.yml) [![Rust Docs](https://img.shields.io/github/actions/workflow/status/autonomys/subspace/rustdoc.yml?branch=main)](https://autonomys.github.io/subspace) This is a mono repository for [Subspace Network](https://subspace.network/) implementation, primarily containing diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs index e05ea9b51e..4d7aae6d1e 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs @@ -2,7 +2,7 @@ use crate::commands::shared::DiskFarm; use anyhow::anyhow; -use async_lock::{Mutex as AsyncMutex, Semaphore}; +use async_lock::Mutex as AsyncMutex; use backoff::ExponentialBackoff; use bytesize::ByteSize; use clap::Parser; @@ -36,7 +36,6 @@ use subspace_farmer::utils::{ use subspace_farmer_components::reading::ReadSectorRecordChunksMode; use subspace_kzg::Kzg; use subspace_proof_of_space::Table; -use tokio::sync::Barrier; use tracing::{error, info, info_span, warn, Instrument}; const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30); @@ -59,8 +58,7 @@ pub(super) struct FarmerArgs { /// `size` is max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes that /// farmer will make sure to not exceed (and will pre-allocated all the space on startup to /// ensure it will not run out of space in runtime). Optionally, `record-chunks-mode` can be - /// set to `ConcurrentChunks` or `WholeSector` in order to avoid internal benchmarking during - /// startup. + /// set to `ConcurrentChunks` (default) or `WholeSector`. disk_farms: Vec, /// Address for farming rewards #[arg(long, value_parser = parse_ss58_reward_address)] @@ -256,9 +254,6 @@ where let farms = { let node_client = node_client.clone(); let info_mutex = &AsyncMutex::new(()); - let faster_read_sector_record_chunks_mode_barrier = - Arc::new(Barrier::new(disk_farms.len())); - let faster_read_sector_record_chunks_mode_concurrency = Arc::new(Semaphore::new(1)); let registry = &Mutex::new(registry); let mut farms = Vec::with_capacity(disk_farms.len()); @@ -272,10 +267,6 @@ where let erasure_coding = erasure_coding.clone(); let plotter = Arc::clone(&plotter); let global_mutex = Arc::clone(&global_mutex); - let faster_read_sector_record_chunks_mode_barrier = - Arc::clone(&faster_read_sector_record_chunks_mode_barrier); - let faster_read_sector_record_chunks_mode_concurrency = - Arc::clone(&faster_read_sector_record_chunks_mode_concurrency); async move { let farm_fut = SingleDiskFarm::new::<_, PosTable>( @@ -297,9 +288,8 @@ where max_plotting_sectors_per_farm, disable_farm_locking, read_sector_record_chunks_mode: disk_farm - .read_sector_record_chunks_mode, - faster_read_sector_record_chunks_mode_barrier, - faster_read_sector_record_chunks_mode_concurrency, + .read_sector_record_chunks_mode + .unwrap_or(ReadSectorRecordChunksMode::ConcurrentChunks), registry: Some(registry), create, }, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index d8cb3ccf3d..b2283814c7 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -54,7 +54,6 @@ use subspace_kzg::Kzg; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::utils::piece_provider::PieceProvider; use subspace_proof_of_space::Table; -use tokio::sync::Barrier; use tracing::{error, info, info_span, warn, Instrument}; /// Get piece retry attempts number. @@ -198,8 +197,7 @@ pub(crate) struct FarmingArgs { /// `size` is max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes that /// farmer will make sure to not exceed (and will pre-allocated all the space on startup to /// ensure it will not run out of space in runtime). Optionally, `record-chunks-mode` can be - /// set to `ConcurrentChunks` or `WholeSector` in order to avoid internal benchmarking during - /// startup. + /// set to `ConcurrentChunks` (default) or `WholeSector`. disk_farms: Vec, /// WebSocket RPC URL of the Subspace node to connect to #[arg(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")] @@ -571,9 +569,6 @@ where let (farms, plotting_delay_senders) = { let info_mutex = &AsyncMutex::new(()); - let faster_read_sector_record_chunks_mode_barrier = - Arc::new(Barrier::new(disk_farms.len())); - let faster_read_sector_record_chunks_mode_concurrency = Arc::new(Semaphore::new(1)); let (plotting_delay_senders, plotting_delay_receivers) = (0..disk_farms.len()) .map(|_| oneshot::channel()) .unzip::<_, _, Vec<_>, Vec<_>>(); @@ -591,10 +586,6 @@ where let erasure_coding = erasure_coding.clone(); let plotter = Arc::clone(&plotter); let global_mutex = Arc::clone(&global_mutex); - let faster_read_sector_record_chunks_mode_barrier = - Arc::clone(&faster_read_sector_record_chunks_mode_barrier); - let faster_read_sector_record_chunks_mode_concurrency = - Arc::clone(&faster_read_sector_record_chunks_mode_concurrency); async move { let farm_fut = SingleDiskFarm::new::<_, PosTable>( @@ -615,9 +606,8 @@ where max_plotting_sectors_per_farm, disable_farm_locking, read_sector_record_chunks_mode: disk_farm - .read_sector_record_chunks_mode, - faster_read_sector_record_chunks_mode_barrier, - faster_read_sector_record_chunks_mode_concurrency, + .read_sector_record_chunks_mode + .unwrap_or(ReadSectorRecordChunksMode::ConcurrentChunks), registry: Some(registry), create, }, diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 9c294f4886..89e51bf5a2 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -40,7 +40,7 @@ use crate::single_disk_farm::plotting::{ use crate::single_disk_farm::reward_signing::reward_signing; use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop}; use crate::{farm, KNOWN_PEERS_CACHE_SIZE}; -use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore}; +use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use async_trait::async_trait; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::{mpsc, oneshot}; @@ -49,7 +49,6 @@ use futures::{select, FutureExt, StreamExt}; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use prometheus_client::registry::Registry; -use rand::prelude::*; use rayon::prelude::*; use rayon::{ThreadPoolBuildError, ThreadPoolBuilder}; use serde::{Deserialize, Serialize}; @@ -64,27 +63,27 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::{fmt, fs, io, mem}; use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; use subspace_core_primitives::pieces::Record; use subspace_core_primitives::sectors::SectorIndex; use subspace_core_primitives::segments::{HistorySize, SegmentIndex}; -use subspace_core_primitives::{PublicKey, ScalarBytes}; +use subspace_core_primitives::PublicKey; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::FileExt; use subspace_farmer_components::reading::ReadSectorRecordChunksMode; use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed}; -use subspace_farmer_components::{FarmerProtocolInfo, ReadAtSync}; +use subspace_farmer_components::FarmerProtocolInfo; use subspace_kzg::Kzg; use subspace_networking::KnownPeersManager; use subspace_proof_of_space::Table; use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse}; use thiserror::Error; use tokio::runtime::Handle; -use tokio::sync::{broadcast, Barrier}; +use tokio::sync::broadcast; use tokio::task; -use tracing::{debug, error, info, trace, warn, Instrument, Span}; +use tracing::{error, info, trace, warn, Instrument, Span}; // Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to // usize depending on chain parameters @@ -95,10 +94,6 @@ const RESERVED_PLOT_METADATA: u64 = 1024 * 1024; /// Reserve 1M of space for farm info (for potential future expansion) const RESERVED_FARM_INFO: u64 = 1024 * 1024; const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_secs(30); -/// Limit for reads in internal benchmark. -/// -/// 4 seconds is proving time, hence 3 seconds for reads. -const INTERNAL_BENCHMARK_READ_TIMEOUT: Duration = Duration::from_millis(3500); /// Exclusive lock for single disk farm info file, ensuring no concurrent edits by cooperating processes is done #[derive(Debug)] @@ -317,13 +312,8 @@ where pub max_plotting_sectors_per_farm: NonZeroUsize, /// Disable farm locking, for example if file system doesn't support it pub disable_farm_locking: bool, - /// Explicit mode to use for reading of sector record chunks instead of doing internal - /// benchmarking - pub read_sector_record_chunks_mode: Option, - /// Barrier before internal benchmarking between different farms - pub faster_read_sector_record_chunks_mode_barrier: Arc, - /// Limit concurrency of internal benchmarking between different farms - pub faster_read_sector_record_chunks_mode_concurrency: Arc, + /// Mode to use for reading of sector record chunks instead + pub read_sector_record_chunks_mode: ReadSectorRecordChunksMode, /// Prometheus registry pub registry: Option<&'a Mutex<&'a mut Registry>>, /// Whether to create a farm if it doesn't yet exist @@ -862,8 +852,6 @@ impl SingleDiskFarm { max_plotting_sectors_per_farm, disable_farm_locking, read_sector_record_chunks_mode, - faster_read_sector_record_chunks_mode_barrier, - faster_read_sector_record_chunks_mode_concurrency, registry, create, } = options; @@ -983,40 +971,6 @@ impl SingleDiskFarm { let (farming_plot, farming_thread_pool) = AsyncJoinOnDrop::new(farming_plot_fut, false).await??; - faster_read_sector_record_chunks_mode_barrier.wait().await; - - let (read_sector_record_chunks_mode, farming_plot, farming_thread_pool) = - if let Some(mode) = read_sector_record_chunks_mode { - (mode, farming_plot, farming_thread_pool) - } else { - // Error doesn't matter here - let _permit = faster_read_sector_record_chunks_mode_concurrency - .acquire() - .await; - let span = span.clone(); - let plot_file = Arc::clone(&plot_file); - - let read_sector_record_chunks_mode_fut = task::spawn_blocking(move || { - farming_thread_pool - .install(move || { - let _span_guard = span.enter(); - - faster_read_sector_record_chunks_mode( - &*plot_file, - &farming_plot, - sector_size, - metadata_header.plotted_sector_count, - ) - .map(|mode| (mode, farming_plot)) - }) - .map(|(mode, farming_plot)| (mode, farming_plot, farming_thread_pool)) - }); - - AsyncJoinOnDrop::new(read_sector_record_chunks_mode_fut, false).await?? - }; - - faster_read_sector_record_chunks_mode_barrier.wait().await; - let plotting_join_handle = task::spawn_blocking({ let sectors_metadata = Arc::clone(§ors_metadata); let handlers = Arc::clone(&handlers); @@ -2423,80 +2377,3 @@ fn write_dummy_sector_metadata( error, }) } - -fn faster_read_sector_record_chunks_mode( - original_plot: &OP, - farming_plot: &FP, - sector_size: usize, - mut plotted_sector_count: SectorIndex, -) -> Result -where - OP: FileExt + Sync, - FP: ReadAtSync, -{ - info!("Benchmarking faster proving method"); - - let mut sector_bytes = vec![0u8; sector_size]; - - if plotted_sector_count == 0 { - thread_rng().fill_bytes(&mut sector_bytes); - original_plot.write_all_at(§or_bytes, 0)?; - - plotted_sector_count = 1; - } - - let mut fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks; - let mut fastest_time = Duration::MAX; - - for _ in 0..3 { - let sector_offset = - sector_size as u64 * thread_rng().gen_range(0..plotted_sector_count) as u64; - let farming_plot = farming_plot.offset(sector_offset); - - // Reading the whole sector at once - { - let start = Instant::now(); - farming_plot.read_at(&mut sector_bytes, 0)?; - let elapsed = start.elapsed(); - - debug!(?elapsed, "Whole sector"); - - if elapsed >= INTERNAL_BENCHMARK_READ_TIMEOUT { - debug!( - ?elapsed, - "Reading whole sector is too slow, using chunks instead" - ); - - fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks; - break; - } - - if fastest_time > elapsed { - fastest_mode = ReadSectorRecordChunksMode::WholeSector; - fastest_time = elapsed; - } - } - - // A lot simplified version of concurrent chunks - { - let start = Instant::now(); - (0..Record::NUM_CHUNKS).into_par_iter().try_for_each(|_| { - let offset = thread_rng().gen_range(0_usize..sector_size / ScalarBytes::FULL_BYTES) - * ScalarBytes::FULL_BYTES; - farming_plot.read_at(&mut [0; ScalarBytes::FULL_BYTES], offset as u64) - })?; - let elapsed = start.elapsed(); - - debug!(?elapsed, "Chunks"); - - if fastest_time > elapsed { - fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks; - fastest_time = elapsed; - } - } - } - - info!(?fastest_mode, "Faster proving method found"); - - Ok(fastest_mode) -}