Skip to content

Commit

Permalink
Merge pull request #2935 from subspace/piece-cache-scrub-warning
Browse files: browse the repository at this point in the history
Print warning on missing piece cache during scrubbing instead of error
  • Loading branch information
nazar-pc authored Jul 22, 2024
2 parents 69dda16 + 2cc8740 commit 9a20564
Showing 1 changed file with 99 additions and 96 deletions.
195 changes: 99 additions & 96 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,12 +562,6 @@ pub enum SingleDiskFarmScrubError {
/// Unexpected metadata version
#[error("Unexpected metadata version {0}")]
UnexpectedMetadataVersion(u8),
/// Cache file does not exist
#[error("Cache file does not exist at {file}")]
CacheFileDoesNotExist {
/// Cache file
file: PathBuf,
},
/// Cache can't be opened
#[error("Cache at {file} can't be opened: {error}")]
CacheCantBeOpened {
Expand Down Expand Up @@ -2284,114 +2278,123 @@ impl SingleDiskFarm {
}

if target.cache() {
let file = directory.join(DiskPieceCache::FILE_NAME);
info!(path = %file.display(), "Checking cache file");
Self::scrub_cache(directory, dry_run)?;
}

let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
Ok(plot_file) => plot_file,
Err(error) => {
return Err(if error.kind() == io::ErrorKind::NotFound {
SingleDiskFarmScrubError::CacheFileDoesNotExist { file }
} else {
SingleDiskFarmScrubError::CacheCantBeOpened { file, error }
});
}
};
info!("Farm check completed");

// Error doesn't matter here
let _ = cache_file.advise_sequential_access();
Ok(())
}

let cache_size = match cache_file.size() {
Ok(metadata_size) => metadata_size,
Err(error) => {
return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
file,
error,
});
}
};
fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
let span = Span::current();

let element_size = DiskPieceCache::element_size();
let number_of_cached_elements = cache_size / u64::from(element_size);
let dummy_element = vec![0; element_size as usize];
(0..number_of_cached_elements)
.into_par_iter()
.map_with(vec![0; element_size as usize], |element, cache_offset| {
let _span_guard = span.enter();
let file = directory.join(DiskPieceCache::FILE_NAME);
info!(path = %file.display(), "Checking cache file");

let offset = cache_offset * u64::from(element_size);
if let Err(error) = cache_file.read_exact_at(element, offset) {
warn!(
path = %file.display(),
%cache_offset,
size = %element.len() as u64,
%offset,
%error,
"Failed to read cached piece, replacing with dummy element"
);
let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
Ok(plot_file) => plot_file,
Err(error) => {
return if error.kind() == io::ErrorKind::NotFound {
warn!(
file = %file.display(),
"Cache file does not exist, this is expected in farming cluster"
);
Ok(())
} else {
Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
};
}
};

if !dry_run {
if let Err(error) = cache_file.write_all_at(&dummy_element, offset) {
return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
file: file.clone(),
size: u64::from(element_size),
offset,
error,
});
}
}
// Error doesn't matter here
let _ = cache_file.advise_sequential_access();

return Ok(());
}
let cache_size = match cache_file.size() {
Ok(metadata_size) => metadata_size,
Err(error) => {
return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
}
};

let (index_and_piece_bytes, expected_checksum) =
element.split_at(element_size as usize - mem::size_of::<Blake3Hash>());
let actual_checksum = blake3_hash(index_and_piece_bytes);
if actual_checksum != expected_checksum && element != &dummy_element {
warn!(
%cache_offset,
actual_checksum = %hex::encode(actual_checksum),
expected_checksum = %hex::encode(expected_checksum),
"Cached piece checksum mismatch, replacing with dummy element"
);
let element_size = DiskPieceCache::element_size();
let number_of_cached_elements = cache_size / u64::from(element_size);
let dummy_element = vec![0; element_size as usize];
(0..number_of_cached_elements)
.into_par_iter()
.map_with(vec![0; element_size as usize], |element, cache_offset| {
let _span_guard = span.enter();

if !dry_run {
if let Err(error) = cache_file.write_all_at(&dummy_element, offset) {
return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
file: file.clone(),
size: u64::from(element_size),
offset,
error,
});
}
}
let offset = cache_offset * u64::from(element_size);
if let Err(error) = cache_file.read_exact_at(element, offset) {
warn!(
path = %file.display(),
%cache_offset,
size = %element.len() as u64,
%offset,
%error,
"Failed to read cached piece, replacing with dummy element"
);

return Ok(());
if !dry_run {
if let Err(error) = cache_file.write_all_at(&dummy_element, offset) {
return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
file: file.clone(),
size: u64::from(element_size),
offset,
error,
});
}
}

Ok(())
})
.try_for_each({
let span = &span;
let checked_elements = AtomicUsize::new(0);
return Ok(());
}

move |result| {
let _span_guard = span.enter();
let (index_and_piece_bytes, expected_checksum) =
element.split_at(element_size as usize - mem::size_of::<Blake3Hash>());
let actual_checksum = blake3_hash(index_and_piece_bytes);
if actual_checksum != expected_checksum && element != &dummy_element {
warn!(
%cache_offset,
actual_checksum = %hex::encode(actual_checksum),
expected_checksum = %hex::encode(expected_checksum),
"Cached piece checksum mismatch, replacing with dummy element"
);

let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
if checked_elements > 1 && checked_elements % 1000 == 0 {
info!(
"Checked {}/{} cache elements",
checked_elements, number_of_cached_elements
);
if !dry_run {
if let Err(error) = cache_file.write_all_at(&dummy_element, offset) {
return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
file: file.clone(),
size: u64::from(element_size),
offset,
error,
});
}
}

result
return Ok(());
}

Ok(())
})
.try_for_each({
let span = &span;
let checked_elements = AtomicUsize::new(0);

move |result| {
let _span_guard = span.enter();

let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
if checked_elements > 1 && checked_elements % 1000 == 0 {
info!(
"Checked {}/{} cache elements",
checked_elements, number_of_cached_elements
);
}
})?;
}

info!("Farm check completed");
result
}
})?;

Ok(())
}
Expand Down

0 comments on commit 9a20564

Please sign in to comment.