Skip to content

Commit

Permalink
Make sure metadata for plotted sectors is written sequentially
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Jun 13, 2024
1 parent 42e27aa commit 817d45b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 24 deletions.
4 changes: 2 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,8 @@ impl SingleDiskFarm {

let plotting_options = PlottingOptions {
metadata_header,
sectors_metadata: &sectors_metadata,
sectors_being_modified: &sectors_being_modified,
sectors_to_plot_receiver,
sector_plotting_options: SectorPlottingOptions {
public_key,
Expand All @@ -822,9 +824,7 @@ impl SingleDiskFarm {
sector_size,
plot_file: &plot_file,
metadata_file,
sectors_metadata: &sectors_metadata,
handlers: &handlers,
sectors_being_modified: &sectors_being_modified,
global_mutex: &global_mutex,
plotter,
},
Expand Down
61 changes: 39 additions & 22 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ pub(super) struct SectorPlottingOptions<'a, NC, P> {
pub(super) metadata_file: File,
#[cfg(windows)]
pub(super) metadata_file: UnbufferedIoFileWindows,
pub(super) sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
pub(super) handlers: &'a Handlers,
pub(super) sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
pub(super) global_mutex: &'a AsyncMutex<()>,
pub(super) plotter: P,
}

pub(super) struct PlottingOptions<'a, NC, P> {
pub(super) metadata_header: PlotMetadataHeader,
pub(super) sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
pub(super) sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
pub(super) sector_plotting_options: SectorPlottingOptions<'a, NC, P>,
}
Expand All @@ -122,6 +122,8 @@ where
{
let PlottingOptions {
mut metadata_header,
sectors_metadata,
sectors_being_modified,
mut sectors_to_plot_receiver,
sector_plotting_options,
} = plotting_options;
Expand All @@ -138,7 +140,12 @@ where
};

let sector_index = sector_to_plot.sector_index;
let sector_plotting_init_fut = plot_single_sector(sector_to_plot, &sector_plotting_options)
let sector_plotting_init_fut = plot_single_sector(
sector_to_plot,
&sector_plotting_options,
sectors_metadata,
sectors_being_modified,
)
.instrument(info_span!("", %sector_index))
.fuse();
let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut);
Expand All @@ -158,8 +165,10 @@ where
process_plotting_result(
maybe_sector_plotting_result?,
&mut metadata_header,
sectors_metadata,
sectors_being_modified,
&sector_plotting_options.metadata_file
)?;
).await?;
}
}
}
Expand All @@ -168,23 +177,28 @@ where
process_plotting_result(
maybe_sector_plotting_result?,
&mut metadata_header,
sectors_metadata,
sectors_being_modified,
&sector_plotting_options.metadata_file
)?;
).await?;
}
}
}

Ok(())
}

fn process_plotting_result(
async fn process_plotting_result(
sector_plotting_result: SectorPlottingResult,
metadata_header: &mut PlotMetadataHeader,
sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>,
sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
#[cfg(not(windows))] metadata_file: &File,
#[cfg(windows)] metadata_file: &UnbufferedIoFileWindows,
) -> Result<(), PlottingError> {
let SectorPlottingResult {
sector_index,
sector_metadata,
replotting,
last_queued,
} = sector_plotting_result;
Expand All @@ -194,6 +208,19 @@ fn process_plotting_result(
metadata_file.write_all_at(&metadata_header.encode(), 0)?;
}

{
let mut sectors_metadata = sectors_metadata.write().await;
// If exists then we're replotting, otherwise we create sector for the first time
if let Some(existing_sector_metadata) = sectors_metadata.get_mut(sector_index as usize) {
*existing_sector_metadata = sector_metadata;
} else {
sectors_metadata.push(sector_metadata);
}
}

// Inform others that this sector is no longer being modified
sectors_being_modified.write().await.remove(&sector_index);

if last_queued {
if replotting {
info!("Replotting complete");
Expand All @@ -220,13 +247,16 @@ where

struct SectorPlottingResult {
sector_index: SectorIndex,
sector_metadata: SectorMetadataChecksummed,
replotting: bool,
last_queued: bool,
}

async fn plot_single_sector<'a, NC, P>(
sector_to_plot: SectorToPlot,
sector_plotting_options: &'a SectorPlottingOptions<'a, NC, P>,
sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
) -> Result<impl Future<Output = Result<SectorPlottingResult, PlottingError>> + 'a, PlottingError>
where
NC: NodeClient,
Expand All @@ -239,9 +269,7 @@ where
sector_size,
plot_file,
metadata_file,
sectors_metadata,
handlers,
sectors_being_modified,
global_mutex,
plotter,
} = sector_plotting_options;
Expand Down Expand Up @@ -368,17 +396,6 @@ where
}
};

{
let mut sectors_metadata = sectors_metadata.write().await;
// If exists then we're replotting, otherwise we create sector for the first time
if let Some(existing_sector_metadata) = sectors_metadata.get_mut(sector_index as usize)
{
*existing_sector_metadata = plotted_sector.sector_metadata.clone();
} else {
sectors_metadata.push(plotted_sector.sector_metadata.clone());
}
}

let maybe_old_plotted_sector = maybe_old_sector_metadata.map(|old_sector_metadata| {
let old_history_size = old_sector_metadata.history_size;

Expand All @@ -405,15 +422,14 @@ where
}
});

// Inform others that this sector is no longer being modified
sectors_being_modified.write().await.remove(&sector_index);

if replotting {
debug!("Sector replotted successfully");
} else {
debug!("Sector plotted successfully");
}

let sector_metadata = plotted_sector.sector_metadata.clone();

let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished {
plotted_sector,
old_plotted_sector: maybe_old_plotted_sector,
Expand All @@ -425,6 +441,7 @@ where

Ok(SectorPlottingResult {
sector_index,
sector_metadata,
replotting,
last_queued,
})
Expand Down

0 comments on commit 817d45b

Please sign in to comment.