Merge pull request #3271 from autonomys/improve-cache-sync
Skip writing to problematic piece caches, add request retry timeout
nazar-pc authored Dec 2, 2024
2 parents a8ce875 + 58778d5 commit 2be1461
Showing 2 changed files with 36 additions and 16 deletions.

@@ -18,11 +18,14 @@ use prometheus_client::registry::Registry;
 use std::env::current_exe;
 use std::mem;
 use std::net::SocketAddr;
+use std::time::Duration;
 use subspace_farmer::cluster::nats_client::NatsClient;
 use subspace_farmer::utils::AsyncJoinOnDrop;
 use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
 use subspace_proof_of_space::Table;
 
+const REQUEST_RETRY_MAX_ELAPSED_TIME: Duration = Duration::from_mins(1);
+
 /// Arguments for cluster
 #[derive(Debug, Parser)]
 pub(crate) struct ClusterArgs {

@@ -101,7 +104,7 @@ where
     let nats_client = NatsClient::new(
         nats_servers,
         ExponentialBackoff {
-            max_elapsed_time: None,
+            max_elapsed_time: Some(REQUEST_RETRY_MAX_ELAPSED_TIME),
             ..ExponentialBackoff::default()
         },
     )
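
The retry-timeout half of the change caps how long NATS requests are retried instead of retrying forever. Below is a minimal, self-contained sketch of the same `ExponentialBackoff` behavior, assuming the `backoff` crate (which matches the struct-update syntax in the diff); the failing operation and the two-second bound are illustrative stand-ins for the commit's one-minute `REQUEST_RETRY_MAX_ELAPSED_TIME` (note that `Duration::from_mins`, used above, is a nightly-only constructor at the time of this commit, so the sketch sticks to `from_secs`):

```rust
use std::time::Duration;

use backoff::{Error, ExponentialBackoff};

fn main() {
    // Same shape as the change above, but with a two-second bound so the
    // demo terminates quickly; the commit itself uses one minute.
    let backoff = ExponentialBackoff {
        max_elapsed_time: Some(Duration::from_secs(2)),
        ..ExponentialBackoff::default()
    };

    let mut attempts = 0_u32;
    // A request that never succeeds, standing in for an unreachable server.
    // `Error::transient` tells `retry` that the failure is worth retrying.
    let outcome: Result<(), Error<&str>> = backoff::retry(backoff, || {
        attempts += 1;
        Err(Error::transient("server unavailable"))
    });

    // With `max_elapsed_time: None` this would retry forever; with a bound,
    // `retry` stops once the total elapsed time exceeds it and surfaces the
    // last error.
    println!("gave up after {attempts} attempts: {outcome:?}");
}
```
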
47 changes: 32 additions & 15 deletions crates/subspace-farmer/src/farmer_cache.rs

@@ -19,7 +19,7 @@ use futures::channel::mpsc;
 use futures::future::FusedFuture;
 use futures::stream::{FuturesOrdered, FuturesUnordered};
 use futures::{select, stream, FutureExt, SinkExt, Stream, StreamExt};
-use parking_lot::Mutex;
+use parking_lot::{Mutex, RwLock};
 use prometheus_client::registry::Registry;
 use rayon::prelude::*;
 use std::collections::hash_map::Entry;

@@ -541,6 +541,7 @@ where
         let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();
 
         let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
+        let ignored_cache_indices = &RwLock::new(HashSet::new());
 
         let downloading_pieces_stream =
             stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {

@@ -571,15 +572,16 @@
 
                         let piece = match result {
                             Ok(Some(piece)) => {
-                                trace!(%piece_index, "Downloaded piece successfully");
+                                trace!(%batch, %piece_index, "Downloaded piece successfully");
                                 piece
                             }
                             Ok(None) => {
-                                debug!(%piece_index, "Couldn't find piece");
+                                debug!(%batch, %piece_index, "Couldn't find piece");
                                 continue;
                             }
                             Err(error) => {
                                 debug!(
+                                    %batch,
                                     %error,
                                     %piece_index,
                                     "Failed to get piece for piece cache"

@@ -596,6 +598,7 @@ where
                         // Find plot in which there is a place for new piece to be stored
                         let Some(offset) = caches.pop_free_offset() else {
                             error!(
+                                %batch,
                                 %piece_index,
                                 "Failed to store piece in cache, there was no space"
                             );
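
A side note on the logging changes above: the added `%batch` fields use tracing's field syntax, where the `%` sigil records a value through its `Display` implementation, so each event carries the batch number as structured data rather than interpolated text. A small sketch, assuming the `tracing` and `tracing-subscriber` crates with illustrative values:

```rust
use tracing::{debug, trace};

fn main() {
    // A formatting subscriber so the events below are visible; TRACE level
    // enables the `trace!` call as well.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    let batch = 3_usize;
    let piece_index = 42_u64;

    // `%value` records the field via `Display`, so the emitted events carry
    // `batch=3 piece_index=42` alongside the message.
    trace!(%batch, %piece_index, "Downloaded piece successfully");
    debug!(%batch, %piece_index, "Couldn't find piece");
}
```
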

@@ -608,23 +611,37 @@
 
                         let cache_index = offset.cache_index;
                         let piece_offset = offset.piece_offset;
 
-                        if let Some(backend) = maybe_backend
-                            && let Err(error) =
-                                backend.write_piece(piece_offset, piece_index, &piece).await
-                        {
-                            // TODO: Will likely need to cache problematic backend indices to avoid hitting it over and over again repeatedly
-                            error!(
-                                %error,
-                                %piece_index,
-                                %piece_offset,
-                                "Failed to write piece into cache"
-                            );
-                            continue;
-                        }
+                        let skip_write = ignored_cache_indices.read().contains(&cache_index);
+                        if skip_write {
+                            trace!(
+                                %batch,
+                                %cache_index,
+                                %piece_index,
+                                %piece_offset,
+                                "Skipping known problematic cache index"
+                            );
+                        } else {
+                            if let Some(backend) = maybe_backend
+                                && let Err(error) =
+                                    backend.write_piece(piece_offset, piece_index, &piece).await
+                            {
+                                error!(
+                                    %error,
+                                    %batch,
+                                    %cache_index,
+                                    %piece_index,
+                                    %piece_offset,
+                                    "Failed to write piece into cache, ignoring this cache going \
+                                    forward"
+                                );
+                                ignored_cache_indices.write().insert(cache_index);
+                                continue;
+                            }
 
-                        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
-                        caches.lock().push_stored_piece(key, offset);
+                            let key =
+                                KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
+                            caches.lock().push_stored_piece(key, offset);
+                        }
 
                         let prev_downloaded_pieces_count =
                             downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
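
The core of the cache-sync half is the `ignored_cache_indices` set guarded by a `parking_lot::RwLock`: the first failed write marks a cache index as problematic, and every later piece skips it instead of hammering a bad disk (resolving the old TODO in the deleted block). A minimal sketch of that pattern, with a hypothetical `try_write` closure standing in for `backend.write_piece(...)` and `CacheIndex` for the farmer's real index type:

```rust
use std::collections::HashSet;

use parking_lot::RwLock;

// Stand-in for the farmer's real cache index type.
type CacheIndex = usize;

// Returns `true` if the piece was stored, `false` if the cache was skipped
// or the write failed (in which case the cache is ignored going forward).
fn write_or_ignore(
    ignored: &RwLock<HashSet<CacheIndex>>,
    cache_index: CacheIndex,
    try_write: impl FnOnce() -> Result<(), String>,
) -> bool {
    // Hot path: a cheap shared read lock for the membership check.
    if ignored.read().contains(&cache_index) {
        return false; // Known problematic cache; skip the write entirely.
    }
    if let Err(_error) = try_write() {
        // First failure: take the exclusive lock once and remember the cache.
        ignored.write().insert(cache_index);
        return false;
    }
    true
}

fn main() {
    let ignored = RwLock::new(HashSet::new());
    // The first write to cache 0 fails, which gets the cache ignored...
    assert!(!write_or_ignore(&ignored, 0, || Err("disk error".to_string())));
    // ...so later writes to cache 0 are skipped without being attempted.
    assert!(!write_or_ignore(&ignored, 0, || Ok(())));
    // Other caches are unaffected.
    assert!(write_or_ignore(&ignored, 1, || Ok(())));
    println!("ignored caches: {:?}", ignored.read());
}
```

The read/write split mirrors the diff's design choice: the per-piece check only takes the shared lock, so the exclusive lock is contended at most once per misbehaving cache.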
