Merge pull request #2438 from subspace/graceful-piece-cache-init-shutdown

Graceful piece cache initialization shutdown
nazar-pc authored Jan 24, 2024
2 parents 58f5dd8 + 9356f9a commit 4543684
Showing 2 changed files with 70 additions and 28 deletions.
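
In short, this change moves piece cache initialization off rayon's parallel iterator and onto dedicated threads via run_future_in_dedicated_thread, makes each per-disk initialization future yield between cache entries so that it can be aborted, and collects the results in order with FuturesOrdered, turning spawn failures and reader panics into a clean early return. Below is a minimal sketch of the cooperative-cancellation idea using plain tokio rather than the crate's own helper; the loop body, the 10 ms "shutdown" delay, and the iteration count are illustrative assumptions, not the farmer's actual code (assumes the tokio crate with its default multi-thread runtime and the time feature).

    // Hypothetical sketch: a long-running loop with a cooperative yield point
    // on every iteration, so aborting the task takes effect promptly instead
    // of only after the whole loop has finished.
    use std::time::Duration;

    use tokio::task::yield_now;

    #[tokio::main]
    async fn main() {
        let handle = tokio::spawn(async {
            for _offset in 0..u64::MAX {
                // Stand-in for reading and indexing one cache element.

                // Allow the task to be aborted between elements
                yield_now().await;
            }
        });

        // Simulated shutdown: abort() is only observed at an .await point,
        // which the yield_now() above provides on every iteration.
        tokio::time::sleep(Duration::from_millis(10)).await;
        handle.abort();
        assert!(handle.await.unwrap_err().is_cancelled());
    }

The added yield_now().await in the diff below serves the same purpose: without an .await point inside the per-entry loop, an aborted initialization task would keep reading until the entire cache had been scanned.
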
97 changes: 69 additions & 28 deletions crates/subspace-farmer/src/piece_cache.rs
@@ -3,13 +3,12 @@ mod tests;

 use crate::node_client::NodeClient;
 use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset};
-use crate::utils::AsyncJoinOnDrop;
+use crate::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
 use event_listener_primitives::{Bag, HandlerId};
 use futures::channel::oneshot;
-use futures::stream::FuturesUnordered;
+use futures::stream::{FuturesOrdered, FuturesUnordered};
 use futures::{select, FutureExt, StreamExt};
 use parking_lot::RwLock;
-use rayon::prelude::*;
 use std::collections::HashMap;
 use std::num::NonZeroU16;
 use std::sync::Arc;
@@ -21,6 +20,7 @@ use subspace_networking::libp2p::PeerId;
 use subspace_networking::utils::multihash::ToMultihash;
 use subspace_networking::{KeyWrapper, LocalRecordProvider, UniqueRecordBinaryHeap};
 use tokio::sync::mpsc;
+use tokio::task::yield_now;
 use tracing::{debug, error, info, trace, warn};

 const WORKER_CHANNEL_CAPACITY: usize = 100;
@@ -220,8 +220,10 @@ where
         let cache_state = mem::take(&mut *self.caches.write());
         let mut stored_pieces = Vec::with_capacity(new_caches.len());
         let mut free_offsets = Vec::with_capacity(new_caches.len());
-        for state in cache_state {
+        for mut state in cache_state {
+            state.stored_pieces.clear();
             stored_pieces.push(state.stored_pieces);
+            state.free_offsets.clear();
             free_offsets.push(state.free_offsets);
         }
         stored_pieces.resize(new_caches.len(), HashMap::default());
@@ -230,35 +232,74 @@
         debug!("Collecting pieces that were in the cache before");

         // Build cache state of all backends
-        let mut caches = stored_pieces
-            .into_par_iter()
+        let maybe_caches_futures = stored_pieces
+            .into_iter()
             .zip(free_offsets)
             .zip(new_caches)
-            .map(|((mut stored_pieces, mut free_offsets), new_cache)| {
-                let contents = new_cache.contents();
-                stored_pieces.clear();
-                stored_pieces.reserve(contents.len());
-                free_offsets.clear();
-
-                for (offset, maybe_piece_index) in contents {
-                    match maybe_piece_index {
-                        Some(piece_index) => {
-                            stored_pieces
-                                .insert(RecordKey::from(piece_index.to_multihash()), offset);
-                        }
-                        None => {
-                            free_offsets.push(offset);
-                        }
-                    }
-                }
-
-                DiskPieceCacheState {
-                    stored_pieces,
-                    free_offsets,
-                    backend: new_cache,
-                }
-            })
-            .collect::<Vec<_>>();
+            .enumerate()
+            .map(
+                |(index, ((mut stored_pieces, mut free_offsets), new_cache))| {
+                    run_future_in_dedicated_thread(
+                        move || async {
+                            let contents = new_cache.contents();
+                            stored_pieces.reserve(contents.len());
+
+                            for (offset, maybe_piece_index) in contents {
+                                match maybe_piece_index {
+                                    Some(piece_index) => {
+                                        stored_pieces.insert(
+                                            RecordKey::from(piece_index.to_multihash()),
+                                            offset,
+                                        );
+                                    }
+                                    None => {
+                                        free_offsets.push(offset);
+                                    }
+                                }
+
+                                // Allow for task to be aborted
+                                yield_now().await;
+                            }
+
+                            DiskPieceCacheState {
+                                stored_pieces,
+                                free_offsets,
+                                backend: new_cache,
+                            }
+                        },
+                        format!("piece-cache.{index}"),
+                    )
+                },
+            )
+            .collect::<Result<Vec<_>, _>>();
+
+        let caches_futures = match maybe_caches_futures {
+            Ok(caches_futures) => caches_futures,
+            Err(error) => {
+                error!(
+                    %error,
+                    "Failed to spawn piece cache reading thread"
+                );
+
+                return;
+            }
+        };
+
+        let mut caches = Vec::with_capacity(caches_futures.len());
+        let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();
+
+        while let Some(maybe_cache) = caches_futures.next().await {
+            match maybe_cache {
+                Ok(cache) => {
+                    caches.push(cache);
+                }
+                Err(_cancelled) => {
+                    error!("Piece cache reading thread panicked");
+
+                    return;
+                }
+            };
+        }

         info!("Synchronizing piece cache");

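The hunk above splits initialization into two stages: first it spawns one dedicated-thread future per disk cache and collects the spawn Results (any spawn error aborts initialization), then it drains the spawned futures through a FuturesOrdered so per-disk state comes back in the same order the caches were supplied, with a panicked or cancelled reader likewise causing an early return. The following is a small, self-contained sketch of that ordered-collection pattern; init_disk and the numeric "state" it returns are placeholders standing in for reading a DiskPieceCache's contents (assumes the futures and tokio crates).

    // Hypothetical sketch: per-disk initialization futures collected through
    // FuturesOrdered, so results arrive in submission order and any failure
    // stops initialization early.
    use futures::stream::{FuturesOrdered, StreamExt};

    async fn init_disk(index: usize) -> Result<usize, String> {
        // Stand-in for reading one disk cache's contents on a dedicated thread.
        Ok(index * 10)
    }

    #[tokio::main]
    async fn main() {
        let mut init_futures = (0..3).map(init_disk).collect::<FuturesOrdered<_>>();

        let mut states = Vec::new();
        while let Some(result) = init_futures.next().await {
            match result {
                Ok(state) => states.push(state),
                Err(error) => {
                    eprintln!("Disk cache initialization failed: {error}");
                    return;
                }
            }
        }

        assert_eq!(states, vec![0, 10, 20]);
    }

Note that FuturesOrdered still polls its futures concurrently; it only buffers completed results so they are yielded in submission order, which keeps the resulting state list aligned with the original disk cache order.
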
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
@@ -114,6 +114,7 @@ impl DiskPieceCache
         let file = &self.inner.file;
         let mut element = vec![0; Self::element_size()];

+        // TODO: Stop early on first missing entry
         (0..self.inner.num_elements).map(move |offset| {
             match Self::read_piece_internal(file, offset, &mut element) {
                 Ok(maybe_piece_index) => (Offset(offset), maybe_piece_index),
