From 817d45b79cc1b292bdd80de3abe3361591a7d5c2 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 13 Jun 2024 20:46:32 +0300 Subject: [PATCH] Make sure metadata for plotted sectors is written sequentially --- .../subspace-farmer/src/single_disk_farm.rs | 4 +- .../src/single_disk_farm/plotting.rs | 61 ++++++++++++------- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 445d5cc143d..26966c492c6 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -814,6 +814,8 @@ impl SingleDiskFarm { let plotting_options = PlottingOptions { metadata_header, + sectors_metadata: §ors_metadata, + sectors_being_modified: §ors_being_modified, sectors_to_plot_receiver, sector_plotting_options: SectorPlottingOptions { public_key, @@ -822,9 +824,7 @@ impl SingleDiskFarm { sector_size, plot_file: &plot_file, metadata_file, - sectors_metadata: §ors_metadata, handlers: &handlers, - sectors_being_modified: §ors_being_modified, global_mutex: &global_mutex, plotter, }, diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index c66b106c347..a46a12fdea7 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -96,15 +96,15 @@ pub(super) struct SectorPlottingOptions<'a, NC, P> { pub(super) metadata_file: File, #[cfg(windows)] pub(super) metadata_file: UnbufferedIoFileWindows, - pub(super) sectors_metadata: &'a AsyncRwLock>, pub(super) handlers: &'a Handlers, - pub(super) sectors_being_modified: &'a AsyncRwLock>, pub(super) global_mutex: &'a AsyncMutex<()>, pub(super) plotter: P, } pub(super) struct PlottingOptions<'a, NC, P> { pub(super) metadata_header: PlotMetadataHeader, + pub(super) sectors_metadata: &'a AsyncRwLock>, + pub(super) sectors_being_modified: &'a AsyncRwLock>, pub(super) sectors_to_plot_receiver: mpsc::Receiver, pub(super) sector_plotting_options: SectorPlottingOptions<'a, NC, P>, } @@ -122,6 +122,8 @@ where { let PlottingOptions { mut metadata_header, + sectors_metadata, + sectors_being_modified, mut sectors_to_plot_receiver, sector_plotting_options, } = plotting_options; @@ -138,7 +140,12 @@ where }; let sector_index = sector_to_plot.sector_index; - let sector_plotting_init_fut = plot_single_sector(sector_to_plot, §or_plotting_options) + let sector_plotting_init_fut = plot_single_sector( + sector_to_plot, + §or_plotting_options, + sectors_metadata, + sectors_being_modified, + ) .instrument(info_span!("", %sector_index)) .fuse(); let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut); @@ -158,8 +165,10 @@ where process_plotting_result( maybe_sector_plotting_result?, &mut metadata_header, + sectors_metadata, + sectors_being_modified, §or_plotting_options.metadata_file - )?; + ).await?; } } } @@ -168,8 +177,10 @@ where process_plotting_result( maybe_sector_plotting_result?, &mut metadata_header, + sectors_metadata, + sectors_being_modified, §or_plotting_options.metadata_file - )?; + ).await?; } } } @@ -177,14 +188,17 @@ where Ok(()) } -fn process_plotting_result( +async fn process_plotting_result( sector_plotting_result: SectorPlottingResult, metadata_header: &mut PlotMetadataHeader, + sectors_metadata: &AsyncRwLock>, + sectors_being_modified: &AsyncRwLock>, #[cfg(not(windows))] metadata_file: &File, #[cfg(windows)] metadata_file: &UnbufferedIoFileWindows, ) -> Result<(), PlottingError> { let SectorPlottingResult { sector_index, + sector_metadata, replotting, last_queued, } = sector_plotting_result; @@ -194,6 +208,19 @@ fn process_plotting_result( metadata_file.write_all_at(&metadata_header.encode(), 0)?; } + { + let mut sectors_metadata = sectors_metadata.write().await; + // If exists then we're replotting, otherwise we create sector for the first time + if let Some(existing_sector_metadata) = sectors_metadata.get_mut(sector_index as usize) { + *existing_sector_metadata = sector_metadata; + } else { + sectors_metadata.push(sector_metadata); + } + } + + // Inform others that this sector is no longer being modified + sectors_being_modified.write().await.remove(§or_index); + if last_queued { if replotting { info!("Replotting complete"); @@ -220,6 +247,7 @@ where struct SectorPlottingResult { sector_index: SectorIndex, + sector_metadata: SectorMetadataChecksummed, replotting: bool, last_queued: bool, } @@ -227,6 +255,8 @@ struct SectorPlottingResult { async fn plot_single_sector<'a, NC, P>( sector_to_plot: SectorToPlot, sector_plotting_options: &'a SectorPlottingOptions<'a, NC, P>, + sectors_metadata: &'a AsyncRwLock>, + sectors_being_modified: &'a AsyncRwLock>, ) -> Result> + 'a, PlottingError> where NC: NodeClient, @@ -239,9 +269,7 @@ where sector_size, plot_file, metadata_file, - sectors_metadata, handlers, - sectors_being_modified, global_mutex, plotter, } = sector_plotting_options; @@ -368,17 +396,6 @@ where } }; - { - let mut sectors_metadata = sectors_metadata.write().await; - // If exists then we're replotting, otherwise we create sector for the first time - if let Some(existing_sector_metadata) = sectors_metadata.get_mut(sector_index as usize) - { - *existing_sector_metadata = plotted_sector.sector_metadata.clone(); - } else { - sectors_metadata.push(plotted_sector.sector_metadata.clone()); - } - } - let maybe_old_plotted_sector = maybe_old_sector_metadata.map(|old_sector_metadata| { let old_history_size = old_sector_metadata.history_size; @@ -405,15 +422,14 @@ where } }); - // Inform others that this sector is no longer being modified - sectors_being_modified.write().await.remove(§or_index); - if replotting { debug!("Sector replotted successfully"); } else { debug!("Sector plotted successfully"); } + let sector_metadata = plotted_sector.sector_metadata.clone(); + let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished { plotted_sector, old_plotted_sector: maybe_old_plotted_sector, @@ -425,6 +441,7 @@ where Ok(SectorPlottingResult { sector_index, + sector_metadata, replotting, last_queued, })