From 56d3fd39bf744f07bab58017f5736d50ef4b169b Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Tue, 30 Jan 2024 03:20:58 +0200
Subject: [PATCH 1/4] Early exit from piece cache reading if hit vacant piece
 cache entry

---
 .../src/single_disk_farm/piece_cache.rs | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
index c658e5ee44..45d763f770 100644
--- a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
+++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
@@ -113,11 +113,22 @@ impl DiskPieceCache {
     ) -> impl ExactSizeIterator<Item = (Offset, Option<PieceIndex>)> + '_ {
         let file = &self.inner.file;
         let mut element = vec![0; Self::element_size()];
+        let mut early_exit = false;
 
-        // TODO: Stop early on first missing entry
         (0..self.inner.num_elements).map(move |offset| {
+            if early_exit {
+                return (Offset(offset), None);
+            }
+
             match Self::read_piece_internal(file, offset, &mut element) {
-                Ok(maybe_piece_index) => (Offset(offset), maybe_piece_index),
+                Ok(maybe_piece_index) => {
+                    if maybe_piece_index.is_none() {
+                        // End of stored pieces, no need to read further
+                        early_exit = true;
+                    }
+
+                    (Offset(offset), maybe_piece_index)
+                }
                 Err(error) => {
                     warn!(%error, %offset, "Failed to read cache element");
                     (Offset(offset), None)

From 59d84447e2156ee67433e92947601c9889e89943 Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Tue, 30 Jan 2024 05:17:05 +0200
Subject: [PATCH 2/4] Allow networking stack to shut down gracefully when
 fully occupied with work

---
 crates/subspace-networking/src/node_runner.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs
index 22bcdbbbe4..7e7b918960 100644
--- a/crates/subspace-networking/src/node_runner.rs
+++ b/crates/subspace-networking/src/node_runner.rs
@@ -42,6 +42,7 @@ use std::pin::Pin;
 use std::sync::atomic::Ordering;
 use std::sync::{Arc, Weak};
 use std::time::Duration;
+use tokio::task::yield_now;
 use tokio::time::Sleep;
 use tracing::{debug, error, trace, warn};
 
@@ -295,6 +296,9 @@ where
                     self.handle_removed_address_event(event);
                 },
             }
+
+            // Allow to exit from busy loop during graceful shutdown
+            yield_now().await;
         }
     }

From 421f938f05d988410effe04285cd9c8d4b4e3778 Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Tue, 30 Jan 2024 05:17:43 +0200
Subject: [PATCH 3/4] More graceful farmer shutdown by dropping things in a
 specific order

---
 .../src/bin/subspace-farmer/commands/farm.rs | 32 +++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
index b2eea42756..4c3c6610eb 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -432,14 +432,14 @@ where
         Arc::clone(&readers_and_pieces),
     ));
 
-    let _piece_cache_worker = run_future_in_dedicated_thread(
+    let piece_cache_worker_fut = run_future_in_dedicated_thread(
         {
             let future = piece_cache_worker.run(piece_getter.clone());
 
             move || future
         },
         "cache-worker".to_string(),
-    );
+    )?;
 
     let mut single_disk_farms = Vec::with_capacity(disk_farms.len());
     let max_pieces_in_sector = match max_pieces_in_sector {
@@ -774,7 +774,7 @@ where
     // event handlers
     drop(readers_and_pieces);
 
-    let farm_fut = pin!(run_future_in_dedicated_thread(
+    let farm_fut = run_future_in_dedicated_thread(
         move || async move {
             while let Some(result) = single_disk_farms_stream.next().await {
                 let id = result?;
@@ -784,25 +784,39 @@ where
             anyhow::Ok(())
         },
         "farmer-farm".to_string(),
-    )?);
+    )?;
 
-    let networking_fut = pin!(run_future_in_dedicated_thread(
+    let networking_fut = run_future_in_dedicated_thread(
         move || async move { node_runner.run().await },
         "farmer-networking".to_string(),
-    )?);
+    )?;
+
+    // This defines order in which things are dropped
+    let networking_fut = networking_fut;
+    let farm_fut = farm_fut;
+    let piece_cache_worker_fut = piece_cache_worker_fut;
+
+    let networking_fut = pin!(networking_fut);
+    let farm_fut = pin!(farm_fut);
+    let piece_cache_worker_fut = pin!(piece_cache_worker_fut);
 
     futures::select!(
         // Signal future
         _ = signal.fuse() => {},
 
+        // Networking future
+        _ = networking_fut.fuse() => {
+            info!("Node runner exited.")
+        },
+
         // Farm future
         result = farm_fut.fuse() => {
             result??;
         },
 
-        // Node runner future
-        _ = networking_fut.fuse() => {
-            info!("Node runner exited.")
+        // Piece cache worker future
+        _ = piece_cache_worker_fut.fuse() => {
+            info!("Piece cache worker exited.")
         },
     );

From 2f732be93eadee28db5019f4c9af73f25af0dd76 Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Tue, 30 Jan 2024 05:27:54 +0200
Subject: [PATCH 4/4] Run piece verification in blocking tasks rather than
 directly in async context

---
 .../src/utils/piece_validator.rs         | 91 ++++++++++---------
 .../src/sync_from_dsn/piece_validator.rs | 45 +++++----
 2 files changed, 75 insertions(+), 61 deletions(-)

diff --git a/crates/subspace-farmer/src/utils/piece_validator.rs b/crates/subspace-farmer/src/utils/piece_validator.rs
index 446acf612e..824a02e0e1 100644
--- a/crates/subspace-farmer/src/utils/piece_validator.rs
+++ b/crates/subspace-farmer/src/utils/piece_validator.rs
@@ -46,56 +46,65 @@ where
         piece_index: PieceIndex,
         piece: Piece,
     ) -> Option<Piece> {
-        if source_peer_id != self.dsn_node.id() {
-            let segment_index = piece_index.segment_index();
+        if source_peer_id == self.dsn_node.id() {
+            return Some(piece);
+        }
 
-            let maybe_segment_commitment = self
-                .segment_commitment_cache
-                .lock()
-                .get(&segment_index)
-                .copied();
-            let segment_commitment = match maybe_segment_commitment {
-                Some(segment_commitment) => segment_commitment,
-                None => {
-                    let segment_headers =
-                        match self.node_client.segment_headers(vec![segment_index]).await {
-                            Ok(segment_headers) => segment_headers,
-                            Err(error) => {
-                                error!(
-                                    %piece_index,
-                                    ?error,
-                                    "Failed tor retrieve segment headers from node"
-                                );
-                                return None;
-                            }
-                        };
+        let segment_index = piece_index.segment_index();
 
-                    let segment_commitment = match segment_headers.into_iter().next().flatten() {
-                        Some(segment_header) => segment_header.segment_commitment(),
-                        None => {
+        let maybe_segment_commitment = self
+            .segment_commitment_cache
+            .lock()
+            .get(&segment_index)
+            .copied();
+        let segment_commitment = match maybe_segment_commitment {
+            Some(segment_commitment) => segment_commitment,
+            None => {
+                let segment_headers =
+                    match self.node_client.segment_headers(vec![segment_index]).await {
+                        Ok(segment_headers) => segment_headers,
+                        Err(error) => {
                             error!(
                                 %piece_index,
-                                %segment_index,
-                                "Segment commitment for segment index wasn't found on node"
+                                ?error,
+                                "Failed tor retrieve segment headers from node"
                             );
                             return None;
                         }
                     };
 
-                    self.segment_commitment_cache
-                        .lock()
-                        .push(segment_index, segment_commitment);
+                let segment_commitment = match segment_headers.into_iter().next().flatten() {
+                    Some(segment_header) => segment_header.segment_commitment(),
+                    None => {
+                        error!(
+                            %piece_index,
+                            %segment_index,
+                            "Segment commitment for segment index wasn't found on node"
+                        );
+                        return None;
+                    }
+                };
+
+                self.segment_commitment_cache
+                    .lock()
+                    .push(segment_index, segment_commitment);
 
-                    segment_commitment
-                }
-            };
+                segment_commitment
+            }
+        };
 
-            if !is_piece_valid(
-                &self.kzg,
-                &piece,
-                &segment_commitment,
-                piece_index.position(),
-            ) {
+        let is_valid_fut = tokio::task::spawn_blocking({
+            let kzg = self.kzg.clone();
+
+            move || {
+                is_piece_valid(&kzg, &piece, &segment_commitment, piece_index.position())
+                    .then_some(piece)
+            }
+        });
+
+        match is_valid_fut.await.unwrap_or_default() {
+            Some(piece) => Some(piece),
+            None => {
                 warn!(
                     %piece_index,
                     %source_peer_id,
                     "Received invalid piece from peer"
                 );
 
                 // We don't care about result here
                 let _ = self.dsn_node.ban_peer(source_peer_id).await;
-                return None;
+
+                None
             }
         }
-
-        Some(piece)
     }
 }
diff --git a/crates/subspace-service/src/sync_from_dsn/piece_validator.rs b/crates/subspace-service/src/sync_from_dsn/piece_validator.rs
index fb5a569863..c79ecdba1d 100644
--- a/crates/subspace-service/src/sync_from_dsn/piece_validator.rs
+++ b/crates/subspace-service/src/sync_from_dsn/piece_validator.rs
@@ -44,25 +44,34 @@ where
         piece_index: PieceIndex,
         piece: Piece,
     ) -> Option<Piece> {
-        if source_peer_id != self.dsn_node.id() {
-            let segment_index = piece_index.segment_index();
+        if source_peer_id == self.dsn_node.id() {
+            return Some(piece);
+        }
+
+        let segment_index = piece_index.segment_index();
+
+        let maybe_segment_header = self.segment_headers_store.get_segment_header(segment_index);
+        let segment_commitment = match maybe_segment_header {
+            Some(segment_header) => segment_header.segment_commitment(),
+            None => {
+                error!(%segment_index, "No segment commitment in the cache.");
 
-            let maybe_segment_header = self.segment_headers_store.get_segment_header(segment_index);
-            let segment_commitment = match maybe_segment_header {
-                Some(segment_header) => segment_header.segment_commitment(),
-                None => {
-                    error!(%segment_index, "No segment commitment in the cache.");
+                return None;
+            }
+        };
 
-                    return None;
-                }
-            };
+        let is_valid_fut = tokio::task::spawn_blocking({
+            let kzg = self.kzg.clone();
 
-            if !is_piece_valid(
-                self.kzg,
-                &piece,
-                &segment_commitment,
-                piece_index.position(),
-            ) {
+            move || {
+                is_piece_valid(&kzg, &piece, &segment_commitment, piece_index.position())
+                    .then_some(piece)
+            }
+        });
+
+        match is_valid_fut.await.unwrap_or_default() {
+            Some(piece) => Some(piece),
+            None => {
                 warn!(
                     %piece_index,
                     %source_peer_id,
                     "Received invalid piece from peer"
                 );
 
                 // We don't care about result here
                 let _ = self.dsn_node.ban_peer(source_peer_id).await;
-                return None;
+
+                None
             }
         }
-
-        Some(piece)
     }
 }