Skip to content

Commit

Permalink
Merge pull request #2479 from subspace/piece-cache-improvements
Browse files Browse the repository at this point in the history
Piece cache improvements
  • Loading branch information
nazar-pc authored Jan 30, 2024
2 parents 7471ac2 + 2f732be commit 37e0b93
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 72 deletions.
32 changes: 23 additions & 9 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,14 @@ where
Arc::clone(&readers_and_pieces),
));

let _piece_cache_worker = run_future_in_dedicated_thread(
let piece_cache_worker_fut = run_future_in_dedicated_thread(
{
let future = piece_cache_worker.run(piece_getter.clone());

move || future
},
"cache-worker".to_string(),
);
)?;

let mut single_disk_farms = Vec::with_capacity(disk_farms.len());
let max_pieces_in_sector = match max_pieces_in_sector {
Expand Down Expand Up @@ -774,7 +774,7 @@ where
// event handlers
drop(readers_and_pieces);

let farm_fut = pin!(run_future_in_dedicated_thread(
let farm_fut = run_future_in_dedicated_thread(
move || async move {
while let Some(result) = single_disk_farms_stream.next().await {
let id = result?;
Expand All @@ -784,25 +784,39 @@ where
anyhow::Ok(())
},
"farmer-farm".to_string(),
)?);
)?;

let networking_fut = pin!(run_future_in_dedicated_thread(
let networking_fut = run_future_in_dedicated_thread(
move || async move { node_runner.run().await },
"farmer-networking".to_string(),
)?);
)?;

// This defines order in which things are dropped
let networking_fut = networking_fut;
let farm_fut = farm_fut;
let piece_cache_worker_fut = piece_cache_worker_fut;

let networking_fut = pin!(networking_fut);
let farm_fut = pin!(farm_fut);
let piece_cache_worker_fut = pin!(piece_cache_worker_fut);

futures::select!(
// Signal future
_ = signal.fuse() => {},

// Networking future
_ = networking_fut.fuse() => {
info!("Node runner exited.")
},

// Farm future
result = farm_fut.fuse() => {
result??;
},

// Node runner future
_ = networking_fut.fuse() => {
info!("Node runner exited.")
// Piece cache worker future
_ = piece_cache_worker_fut.fuse() => {
info!("Piece cache worker exited.")
},
);

Expand Down
15 changes: 13 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,22 @@ impl DiskPieceCache {
) -> impl ExactSizeIterator<Item = (Offset, Option<PieceIndex>)> + '_ {
let file = &self.inner.file;
let mut element = vec![0; Self::element_size()];
let mut early_exit = false;

// TODO: Stop early on first missing entry
(0..self.inner.num_elements).map(move |offset| {
if early_exit {
return (Offset(offset), None);
}

match Self::read_piece_internal(file, offset, &mut element) {
Ok(maybe_piece_index) => (Offset(offset), maybe_piece_index),
Ok(maybe_piece_index) => {
if maybe_piece_index.is_none() {
// End of stored pieces, no need to read further
early_exit = true;
}

(Offset(offset), maybe_piece_index)
}
Err(error) => {
warn!(%error, %offset, "Failed to read cache element");
(Offset(offset), None)
Expand Down
91 changes: 49 additions & 42 deletions crates/subspace-farmer/src/utils/piece_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,56 +46,65 @@ where
piece_index: PieceIndex,
piece: Piece,
) -> Option<Piece> {
if source_peer_id != self.dsn_node.id() {
let segment_index = piece_index.segment_index();
if source_peer_id == self.dsn_node.id() {
return Some(piece);
}

let maybe_segment_commitment = self
.segment_commitment_cache
.lock()
.get(&segment_index)
.copied();
let segment_commitment = match maybe_segment_commitment {
Some(segment_commitment) => segment_commitment,
None => {
let segment_headers =
match self.node_client.segment_headers(vec![segment_index]).await {
Ok(segment_headers) => segment_headers,
Err(error) => {
error!(
%piece_index,
?error,
"Failed tor retrieve segment headers from node"
);
return None;
}
};
let segment_index = piece_index.segment_index();

let segment_commitment = match segment_headers.into_iter().next().flatten() {
Some(segment_header) => segment_header.segment_commitment(),
None => {
let maybe_segment_commitment = self
.segment_commitment_cache
.lock()
.get(&segment_index)
.copied();
let segment_commitment = match maybe_segment_commitment {
Some(segment_commitment) => segment_commitment,
None => {
let segment_headers =
match self.node_client.segment_headers(vec![segment_index]).await {
Ok(segment_headers) => segment_headers,
Err(error) => {
error!(
%piece_index,
%segment_index,
"Segment commitment for segment index wasn't found on node"
?error,
"Failed tor retrieve segment headers from node"
);
return None;
}
};

self.segment_commitment_cache
.lock()
.push(segment_index, segment_commitment);
let segment_commitment = match segment_headers.into_iter().next().flatten() {
Some(segment_header) => segment_header.segment_commitment(),
None => {
error!(
%piece_index,
%segment_index,
"Segment commitment for segment index wasn't found on node"
);
return None;
}
};

self.segment_commitment_cache
.lock()
.push(segment_index, segment_commitment);

segment_commitment
}
};

segment_commitment
}
};
let is_valid_fut = tokio::task::spawn_blocking({
let kzg = self.kzg.clone();

if !is_piece_valid(
&self.kzg,
&piece,
&segment_commitment,
piece_index.position(),
) {
move || {
is_piece_valid(&kzg, &piece, &segment_commitment, piece_index.position())
.then_some(piece)
}
});

match is_valid_fut.await.unwrap_or_default() {
Some(piece) => Some(piece),
None => {
warn!(
%piece_index,
%source_peer_id,
Expand All @@ -104,10 +113,8 @@ where

// We don't care about result here
let _ = self.dsn_node.ban_peer(source_peer_id).await;
return None;
None
}
}

Some(piece)
}
}
4 changes: 4 additions & 0 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::task::yield_now;
use tokio::time::Sleep;
use tracing::{debug, error, trace, warn};

Expand Down Expand Up @@ -295,6 +296,9 @@ where
self.handle_removed_address_event(event);
},
}

// Allow to exit from busy loop during graceful shutdown
yield_now().await;
}
}

Expand Down
45 changes: 26 additions & 19 deletions crates/subspace-service/src/sync_from_dsn/piece_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,34 @@ where
piece_index: PieceIndex,
piece: Piece,
) -> Option<Piece> {
if source_peer_id != self.dsn_node.id() {
let segment_index = piece_index.segment_index();
if source_peer_id == self.dsn_node.id() {
return Some(piece);
}

let segment_index = piece_index.segment_index();

let maybe_segment_header = self.segment_headers_store.get_segment_header(segment_index);
let segment_commitment = match maybe_segment_header {
Some(segment_header) => segment_header.segment_commitment(),
None => {
error!(%segment_index, "No segment commitment in the cache.");

let maybe_segment_header = self.segment_headers_store.get_segment_header(segment_index);
let segment_commitment = match maybe_segment_header {
Some(segment_header) => segment_header.segment_commitment(),
None => {
error!(%segment_index, "No segment commitment in the cache.");
return None;
}
};

return None;
}
};
let is_valid_fut = tokio::task::spawn_blocking({
let kzg = self.kzg.clone();

if !is_piece_valid(
self.kzg,
&piece,
&segment_commitment,
piece_index.position(),
) {
move || {
is_piece_valid(&kzg, &piece, &segment_commitment, piece_index.position())
.then_some(piece)
}
});

match is_valid_fut.await.unwrap_or_default() {
Some(piece) => Some(piece),
None => {
warn!(
%piece_index,
%source_peer_id,
Expand All @@ -71,10 +80,8 @@ where

// We don't care about result here
let _ = self.dsn_node.ban_peer(source_peer_id).await;
return None;
None
}
}

Some(piece)
}
}

0 comments on commit 37e0b93

Please sign in to comment.