From e0fe46d0aa35ea3b7108df59b7fd8a84c5dd37af Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 20 Jan 2024 18:22:42 +0200 Subject: [PATCH 1/4] Replace `SingleDiskFarm::on_sector_[plotting|plotted]()` with `SingleDiskFarm::on_sector_update()` that will be extended later --- .../src/plotting.rs | 4 +- .../src/bin/subspace-farmer/commands/farm.rs | 20 +++++--- .../subspace-farmer/src/single_disk_farm.rs | 51 +++++++++++-------- .../src/single_disk_farm/plotting.rs | 24 +++++---- 4 files changed, 58 insertions(+), 41 deletions(-) diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index c650aacb34..d827636938 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -10,7 +10,7 @@ use backoff::future::retry; use backoff::{Error as BackoffError, ExponentialBackoff}; use futures::stream::FuturesUnordered; use futures::StreamExt; -use parity_scale_codec::Encode; +use parity_scale_codec::{Decode, Encode}; use rayon::prelude::*; use std::error::Error; use std::mem; @@ -97,7 +97,7 @@ impl PieceGetter for ArchivedHistorySegment { } /// Information about sector that was plotted -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Encode, Decode)] pub struct PlottedSector { /// Sector ID pub sector_id: SectorId, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 80ccd827c4..0a5516c782 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -23,7 +23,7 @@ use subspace_core_primitives::{PublicKey, Record, SectorIndex}; use subspace_erasure_coding::ErasureCoding; use subspace_farmer::piece_cache::PieceCache; use subspace_farmer::single_disk_farm::{ - SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions, + SectorPlottingDetails, SectorUpdate, SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions, }; use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter; use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; @@ -633,10 +633,8 @@ where // Collect newly plotted pieces let on_plotted_sector_callback = - move |(plotted_sector, maybe_old_plotted_sector): &( - PlottedSector, - Option, - )| { + move |plotted_sector: &PlottedSector, + maybe_old_plotted_sector: &Option| { let _span_guard = span.enter(); { @@ -645,7 +643,7 @@ where .as_mut() .expect("Initial value was populated above; qed"); - if let Some(old_plotted_sector) = maybe_old_plotted_sector { + if let Some(old_plotted_sector) = &maybe_old_plotted_sector { readers_and_pieces.delete_sector(disk_farm_index, old_plotted_sector); } readers_and_pieces.add_sector(disk_farm_index, plotted_sector); @@ -659,7 +657,15 @@ where }; single_disk_farm - .on_sector_plotted(Arc::new(on_plotted_sector_callback)) + .on_sector_update(Arc::new(move |(_sector_index, sector_state)| { + if let SectorUpdate::Plotting(SectorPlottingDetails::Finished { + plotted_sector, + old_plotted_sector, + }) = sector_state + { + on_plotted_sector_callback(plotted_sector, old_plotted_sector); + } + })) .detach(); single_disk_farm diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 6baa4f4208..e9c5e4544d 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -539,21 +539,36 @@ type HandlerFn = Arc; type Handler = Bag, A>; /// Details about sector currently being plotted -pub struct SectorPlottingDetails { - /// Sector index - pub sector_index: SectorIndex, - /// Progress so far in % (not including this sector) - pub progress: f32, - /// Whether sector is being replotted - pub replotting: bool, - /// Whether this is the last sector queued so far - pub last_queued: bool, +#[derive(Debug, Clone, Encode, Decode)] +pub enum SectorPlottingDetails { + /// Starting plotting of a sector + Starting { + /// Progress so far in % (not including this sector) + progress: f32, + /// Whether sector is being replotted + replotting: bool, + /// Whether this is the last sector queued so far + last_queued: bool, + }, + /// Finished plotting + Finished { + /// Information about plotted sector + plotted_sector: PlottedSector, + /// Information about old plotted sector that was replaced + old_plotted_sector: Option, + }, +} + +/// Various sector updates +#[derive(Debug, Clone, Encode, Decode)] +pub enum SectorUpdate { + /// Sector is is being plotted + Plotting(SectorPlottingDetails), } #[derive(Default, Debug)] struct Handlers { - sector_plotting: Handler, - sector_plotted: Handler<(PlottedSector, Option)>, + sector_update: Handler<(SectorIndex, SectorUpdate)>, solution: Handler, plot_audited: Handler, } @@ -1316,17 +1331,9 @@ impl SingleDiskFarm { self.piece_reader.clone() } - /// Subscribe to sector plotting notification - pub fn on_sector_plotting(&self, callback: HandlerFn) -> HandlerId { - self.handlers.sector_plotting.add(callback) - } - - /// Subscribe to notification about plotted sectors - pub fn on_sector_plotted( - &self, - callback: HandlerFn<(PlottedSector, Option)>, - ) -> HandlerId { - self.handlers.sector_plotted.add(callback) + /// Subscribe to sector updates + pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId { + self.handlers.sector_update.add(callback) } /// Subscribe to notification about audited plots diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index ee8523fb4a..7a1abd2efe 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -1,5 +1,5 @@ use crate::single_disk_farm::{ - BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails, + BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails, SectorUpdate, RESERVED_PLOT_METADATA, }; use crate::thread_pool_manager::PlottingThreadPoolManager; @@ -184,14 +184,14 @@ where info!(%sector_index, "Plotting sector ({progress:.2}% complete)"); } + let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Starting { + progress, + replotting, + last_queued, + }); handlers - .sector_plotting - .call_simple(&SectorPlottingDetails { - sector_index, - progress, - replotting, - last_queued, - }); + .sector_update + .call_simple(&(sector_index, sector_state)); // This `loop` is a workaround for edge-case in local setup if expiration is configured to // 1. In that scenario we get replotting notification essentially straight from block import @@ -403,9 +403,13 @@ where } } + let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished { + plotted_sector, + old_plotted_sector: maybe_old_plotted_sector, + }); handlers - .sector_plotted - .call_simple(&(plotted_sector, maybe_old_plotted_sector)); + .sector_update + .call_simple(&(sector_index, sector_state)); } Ok(()) From 84502003b015f090083ad4a611471470d56a3f80 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 20 Jan 2024 19:03:46 +0200 Subject: [PATCH 2/4] Expose information about plot updates (downloading/encoding/writing) --- .../src/bin/subspace-farmer/commands/farm.rs | 1 + .../subspace-farmer/src/single_disk_farm.rs | 14 ++++ .../src/single_disk_farm/plotting.rs | 76 +++++++++++++++++-- 3 files changed, 83 insertions(+), 8 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 0a5516c782..97155ae5e5 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -661,6 +661,7 @@ where if let SectorUpdate::Plotting(SectorPlottingDetails::Finished { plotted_sector, old_plotted_sector, + .. }) = sector_state { on_plotted_sector_callback(plotted_sector, old_plotted_sector); diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index e9c5e4544d..64f205e91c 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -550,12 +550,26 @@ pub enum SectorPlottingDetails { /// Whether this is the last sector queued so far last_queued: bool, }, + /// Downloading sector pieces + Downloading, + /// Downloaded sector pieces + Downloaded(Duration), + /// Encoding sector pieces + Encoding, + /// Encoded sector pieces + Encoded(Duration), + /// Writing sector + Writing, + /// Wrote sector + Wrote(Duration), /// Finished plotting Finished { /// Information about plotted sector plotted_sector: PlottedSector, /// Information about old plotted sector that was replaced old_plotted_sector: Option, + /// How much time it took to plot a sector + time: Duration, }, } diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 7a1abd2efe..55cc63da9f 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -19,7 +19,7 @@ use std::ops::Range; use std::pin::pin; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{ Blake3Hash, HistorySize, PieceOffset, PublicKey, SectorId, SectorIndex, SegmentHeader, @@ -193,6 +193,8 @@ where .sector_update .call_simple(&(sector_index, sector_state)); + let start = Instant::now(); + // This `loop` is a workaround for edge-case in local setup if expiration is configured to // 1. In that scenario we get replotting notification essentially straight from block import // pipeline of the node, before block is imported. This can result in subsequent request for @@ -231,6 +233,13 @@ where .await .map_err(plotting::PlottingError::from)?; + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Plotting(SectorPlottingDetails::Downloading), + )); + + let start = Instant::now(); + let downloaded_sector_fut = download_sector(DownloadSectorOptions { public_key: &public_key, sector_index, @@ -243,13 +252,21 @@ where pieces_in_sector, }); - (downloading_permit, downloaded_sector_fut.await?) + let downloaded_sector = downloaded_sector_fut.await?; + + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(start.elapsed())), + )); + + (downloading_permit, downloaded_sector) }; // Initiate downloading of pieces for the next segment index if already known if let Some(sector_index) = next_segment_index_hint { let piece_getter = piece_getter.clone(); let downloading_semaphore = Arc::clone(&downloading_semaphore); + let handlers = Arc::clone(&handlers); let kzg = kzg.clone(); maybe_next_downloaded_sector_fut.replace(AsyncJoinOnDrop::new( @@ -260,6 +277,13 @@ where .await .map_err(plotting::PlottingError::from)?; + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Plotting(SectorPlottingDetails::Downloading), + )); + + let start = Instant::now(); + let downloaded_sector_fut = download_sector(DownloadSectorOptions { public_key: &public_key, sector_index, @@ -272,7 +296,16 @@ where pieces_in_sector, }); - Ok((downloading_permit, downloaded_sector_fut.await?)) + let downloaded_sector = downloaded_sector_fut.await?; + + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Plotting(SectorPlottingDetails::Downloaded( + start.elapsed(), + )), + )); + + Ok((downloading_permit, downloaded_sector)) } .in_current_span(), ), @@ -293,6 +326,13 @@ where let mut sector = Vec::new(); let mut sector_metadata = Vec::new(); + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Plotting(SectorPlottingDetails::Encoding), + )); + + let start = Instant::now(); + let plotted_sector = { let plot_sector_fut = pin!(encode_sector::( downloaded_sector, @@ -318,6 +358,11 @@ where })? }; + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Plotting(SectorPlottingDetails::Encoded(start.elapsed())), + )); + Ok((sector, sector_metadata, table_generator, plotted_sector)) }) }; @@ -341,11 +386,25 @@ where plotting_result? }; - plot_file.write_all_at(§or, (sector_index as usize * sector_size) as u64)?; - metadata_file.write_all_at( - §or_metadata, - RESERVED_PLOT_METADATA + (u64::from(sector_index) * sector_metadata_size as u64), - )?; + { + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Plotting(SectorPlottingDetails::Writing), + )); + + let start = Instant::now(); + + plot_file.write_all_at(§or, (sector_index as usize * sector_size) as u64)?; + metadata_file.write_all_at( + §or_metadata, + RESERVED_PLOT_METADATA + (u64::from(sector_index) * sector_metadata_size as u64), + )?; + + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Plotting(SectorPlottingDetails::Wrote(start.elapsed())), + )); + } if sector_index + 1 > metadata_header.plotted_sector_count { metadata_header.plotted_sector_count = sector_index + 1; @@ -406,6 +465,7 @@ where let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished { plotted_sector, old_plotted_sector: maybe_old_plotted_sector, + time: start.elapsed(), }); handlers .sector_update From 3cdc6b318596c7bccc7358fb529bdd944c457a98 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 20 Jan 2024 19:04:36 +0200 Subject: [PATCH 3/4] Move sector modifying write to later stage since farmer no longer writes directly into the sector on disk --- crates/subspace-farmer/src/single_disk_farm/plotting.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 55cc63da9f..4f6f834cc0 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -313,9 +313,6 @@ where )); } - // Inform others that this sector is being modified - modifying_sector_index.write().await.replace(sector_index); - let sector; let sector_metadata; let plotted_sector; @@ -386,6 +383,9 @@ where plotting_result? }; + // Inform others that this sector is being modified + modifying_sector_index.write().await.replace(sector_index); + { handlers.sector_update.call_simple(&( sector_index, From 1a45992bc66c6aeea01c976a7447879c675f0ace Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 20 Jan 2024 19:33:01 +0200 Subject: [PATCH 4/4] Add sector expiration updates --- .../subspace-farmer/src/single_disk_farm.rs | 17 ++++++ .../src/single_disk_farm/plotting.rs | 54 +++++++++++++++---- 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 64f205e91c..31c013b7fe 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -573,11 +573,27 @@ pub enum SectorPlottingDetails { }, } +/// Details about sector expiration +#[derive(Debug, Clone, Encode, Decode)] +pub enum SectorExpirationDetails { + /// Sector expiration became known + Determined { + /// Segment index at which sector expires + expires_at: SegmentIndex, + }, + /// Sector will expire at the next segment index and should be replotted + AboutToExpire, + /// Sector already expired + Expired, +} + /// Various sector updates #[derive(Debug, Clone, Encode, Decode)] pub enum SectorUpdate { /// Sector is is being plotted Plotting(SectorPlottingDetails), + /// Sector expiration information updated + Expiration(SectorExpirationDetails), } #[derive(Default, Debug)] @@ -1013,6 +1029,7 @@ impl SingleDiskFarm { last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(), min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime, node_client: node_client.clone(), + handlers: Arc::clone(&handlers), sectors_metadata: Arc::clone(§ors_metadata), sectors_to_plot_sender, initial_plotting_finished: farming_delay_sender, diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 4f6f834cc0..75c04dc0e5 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -1,6 +1,6 @@ use crate::single_disk_farm::{ - BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails, SectorUpdate, - RESERVED_PLOT_METADATA, + BackgroundTaskError, Handlers, PlotMetadataHeader, SectorExpirationDetails, + SectorPlottingDetails, SectorUpdate, RESERVED_PLOT_METADATA, }; use crate::thread_pool_manager::PlottingThreadPoolManager; use crate::utils::AsyncJoinOnDrop; @@ -482,6 +482,7 @@ pub(super) struct PlottingSchedulerOptions { pub(super) last_archived_segment_index: SegmentIndex, pub(super) min_sector_lifetime: HistorySize, pub(super) node_client: NC, + pub(super) handlers: Arc, pub(super) sectors_metadata: Arc>>, pub(super) sectors_to_plot_sender: mpsc::Sender, pub(super) initial_plotting_finished: Option>, @@ -503,6 +504,7 @@ where last_archived_segment_index, min_sector_lifetime, node_client, + handlers, sectors_metadata, sectors_to_plot_sender, initial_plotting_finished, @@ -550,6 +552,7 @@ where target_sector_count, min_sector_lifetime, &node_client, + &handlers, sectors_metadata, &last_archived_segment, archived_segments_receiver, @@ -695,6 +698,7 @@ async fn send_plotting_notifications( target_sector_count: SectorIndex, min_sector_lifetime: HistorySize, node_client: &NC, + handlers: &Handlers, sectors_metadata: Arc>>, last_archived_segment: &Atomic, mut archived_segments_receiver: mpsc::Receiver<()>, @@ -771,6 +775,18 @@ where %expires_at, "Sector expires soon #1, scheduling replotting" ); + + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Expiration( + if expires_at <= archived_segment_header.segment_index() { + SectorExpirationDetails::Expired + } else { + SectorExpirationDetails::AboutToExpire + }, + ), + )); + // Time to replot sectors_to_replot.push(SectorToReplot { sector_index, @@ -827,24 +843,35 @@ where metadata; qed", ); + let expires_at = expiration_history_size.segment_index(); + trace!( %sector_index, %history_size, - sector_expire_at = %expiration_history_size.segment_index(), + sector_expire_at = %expires_at, "Determined sector expiration segment index" ); // +1 means we will start replotting a bit before it actually expires to avoid // storing expired sectors - if expiration_history_size.segment_index() - <= (archived_segment_header.segment_index() + SegmentIndex::ONE) - { - let expires_at = expiration_history_size.segment_index(); + if expires_at <= (archived_segment_header.segment_index() + SegmentIndex::ONE) { debug!( %sector_index, %history_size, %expires_at, "Sector expires soon #2, scheduling replotting" ); + + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Expiration( + if expires_at <= archived_segment_header.segment_index() { + SectorExpirationDetails::Expired + } else { + SectorExpirationDetails::AboutToExpire + }, + ), + )); + // Time to replot sectors_to_replot.push(SectorToReplot { sector_index, @@ -854,12 +881,19 @@ where trace!( %sector_index, %history_size, - sector_expire_at = %expiration_history_size.segment_index(), + sector_expire_at = %expires_at, "Sector expires later, remembering sector expiration" ); + + handlers.sector_update.call_simple(&( + sector_index, + SectorUpdate::Expiration(SectorExpirationDetails::Determined { + expires_at, + }), + )); + // Store expiration so we don't have to recalculate it later - sectors_expire_at - .insert(sector_index, expiration_history_size.segment_index()); + sectors_expire_at.insert(sector_index, expires_at); } } }