Merge remote-tracking branch 'origin/main' into remove-extra-boxing
# Conflicts:
#	crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
nazar-pc committed Dec 11, 2023
2 parents 8440b5e + 21faadb commit 0a2b00d
Showing 9 changed files with 703 additions and 114 deletions.
8 changes: 7 additions & 1 deletion crates/pallet-domains/src/staking.rs
@@ -552,7 +552,13 @@ where
{
for (operator_id, reason) in operator_ids {
Operators::<T>::try_mutate(operator_id, |maybe_operator| {
let operator = maybe_operator.as_mut().ok_or(Error::UnknownOperator)?;
let operator = match maybe_operator.as_mut() {
// If the operator was already slashed and removed due to a fraud proof, a later slash for
// an invalid bundle (which happens after the ER is confirmed) will not find the operator
// here, so just return.
None => return Ok(()),
Some(operator) => operator,
};
let mut pending_slashes =
PendingSlashes::<T>::get(operator.current_domain_id).unwrap_or_default();

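The new match arm makes slashing idempotent: if the operator entry was already removed by an earlier fraud-proof slash, a repeated slash for the same operator succeeds as a no-op. A minimal sketch of that pattern, using a plain HashMap and hypothetical types in place of the pallet's Operators storage map (not the pallet-domains code itself):

use std::collections::HashMap;

struct Operator;

// Sketch only: slashing an operator whose entry is already gone is treated as a
// successful no-op instead of failing with an `UnknownOperator`-style error.
fn slash_operator(
    operators: &mut HashMap<u64, Operator>,
    operator_id: u64,
) -> Result<(), &'static str> {
    let Some(_operator) = operators.get_mut(&operator_id) else {
        // Already slashed and removed (e.g. via an earlier fraud proof);
        // treat the repeated slash as success.
        return Ok(());
    };
    // ... record the operator in the pending slashes for its domain here ...
    Ok(())
}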
12 changes: 8 additions & 4 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -407,7 +407,11 @@ where
));

let _piece_cache_worker = run_future_in_dedicated_thread(
Box::pin(piece_cache_worker.run(piece_getter.clone())),
{
let future = piece_cache_worker.run(piece_getter.clone());

move || future
},
"cache-worker".to_string(),
);

@@ -600,19 +604,19 @@ where
drop(readers_and_pieces);

let farm_fut = pin!(run_future_in_dedicated_thread(
Box::pin(async move {
move || async move {
while let Some(result) = single_disk_farms_stream.next().await {
let id = result?;

info!(%id, "Farm exited successfully");
}
anyhow::Ok(())
}),
},
"farmer-farm".to_string(),
)?);

let networking_fut = pin!(run_future_in_dedicated_thread(
Box::pin(async move { node_runner.run().await }),
move || async move { node_runner.run().await },
"farmer-networking".to_string(),
)?);

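These farm.rs changes are the point of the remove-extra-boxing branch: call sites now hand `run_future_in_dedicated_thread` a closure that creates the future, instead of a `Box::pin`-ned future, so the future is built on the dedicated thread itself. A minimal sketch of a helper with that argument shape (assumed, simplified signature; the real helper in `subspace-farmer` returns a cancellable handle future rather than a bare `JoinHandle`):

use futures::executor::block_on;
use std::future::Future;
use std::{io, thread};

// Sketch only: taking `FnOnce() -> Fut` means the caller no longer needs
// `Box::pin(...)`, and the created future does not even have to be `Send`,
// because it is constructed and driven entirely on the spawned thread.
fn run_future_in_dedicated_thread_sketch<CreateFut, Fut, T>(
    create_future: CreateFut,
    thread_name: String,
) -> io::Result<thread::JoinHandle<T>>
where
    CreateFut: (FnOnce() -> Fut) + Send + 'static,
    Fut: Future<Output = T>,
    T: Send + 'static,
{
    thread::Builder::new().name(thread_name).spawn(move || {
        // Build the future here, on the dedicated thread, then drive it to completion.
        block_on(create_future())
    })
}

A caller can then write run_future_in_dedicated_thread_sketch(move || async move { /* ... */ }, "name".to_string()), mirroring the updated call sites in the diff above.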
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/lib.rs
@@ -1,5 +1,6 @@
#![feature(
array_chunks,
assert_matches,
const_option,
hash_extract_if,
impl_trait_in_assoc_type,
111 changes: 62 additions & 49 deletions crates/subspace-farmer/src/piece_cache.rs
@@ -1,3 +1,6 @@
#[cfg(test)]
mod tests;

use crate::node_client::NodeClient;
use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset};
use crate::utils::AsyncJoinOnDrop;
@@ -11,7 +14,7 @@ use std::collections::HashMap;
use std::num::NonZeroU16;
use std::sync::Arc;
use std::{fmt, mem};
use subspace_core_primitives::{Piece, PieceIndex, SegmentIndex};
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::kad::{ProviderRecord, RecordKey};
use subspace_networking::libp2p::PeerId;
@@ -108,6 +111,21 @@ where
return;
}

let mut segment_headers_notifications =
match self.node_client.subscribe_archived_segment_headers().await {
Ok(segment_headers_notifications) => segment_headers_notifications,
Err(error) => {
error!(%error, "Failed to subscribe to archived segments notifications");
return;
}
};

// Keep up with segment indices that were potentially created since reinitialization;
// depending on the size of the diff, this may pause block production for a while (due to
// the subscription we have created above)
self.keep_up_after_initial_sync(&piece_getter, &mut worker_state)
.await;

loop {
select! {
maybe_command = worker_receiver.recv().fuse() => {
@@ -118,10 +136,14 @@

self.handle_command(command, &piece_getter, &mut worker_state).await;
}
_ = self.keep_up_sync(&piece_getter, &mut worker_state).fuse() => {
// Keep-up sync only ends with the subscription, which lasts for the duration of the
// instance
return;
maybe_segment_header = segment_headers_notifications.next().fuse() => {
if let Some(segment_header) = maybe_segment_header {
self.process_segment_header(segment_header, &mut worker_state).await;
} else {
// Keep-up sync only ends with the subscription, which lasts for the duration of the
// instance
return;
}
}
}
}
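With this restructuring, `run` subscribes to archived segment headers once up front and handles them in the same `select!` loop as worker commands, instead of delegating to a separate `keep_up_sync` future (removed further down). A rough sketch of that loop shape, with hypothetical placeholder channel types standing in for the worker's command receiver and the node subscription:

use futures::channel::mpsc;
use futures::{select, FutureExt, StreamExt};

enum Command {}
struct SegmentHeader;

// Placeholder worker loop: multiplex the command channel and the archived
// segment header notifications in a single `select!` loop.
async fn worker_loop(
    mut commands: mpsc::UnboundedReceiver<Command>,
    mut segment_headers: mpsc::UnboundedReceiver<SegmentHeader>,
) {
    loop {
        select! {
            maybe_command = commands.next().fuse() => {
                let Some(_command) = maybe_command else {
                    // All command senders are gone, nothing else will arrive.
                    return;
                };
                // handle_command(_command).await;
            }
            maybe_segment_header = segment_headers.next().fuse() => {
                if let Some(_segment_header) = maybe_segment_header {
                    // process_segment_header(_segment_header).await;
                } else {
                    // The subscription lasts for the lifetime of the instance,
                    // so its end means shutdown.
                    return;
                }
            }
        }
    }
}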
@@ -158,17 +180,26 @@ where
// Mark the offset as unoccupied and remove the corresponding key from the heap
cache.free_offsets.push(offset);
match cache.backend.read_piece_index(offset) {
Some(piece_index) => {
Ok(Some(piece_index)) => {
worker_state.heap.remove(KeyWrapper(piece_index));
}
None => {
Ok(None) => {
warn!(
%disk_farm_index,
%offset,
"Piece index out of range, this is likely an implementation bug, \
not freeing heap element"
);
}
Err(error) => {
error!(
%error,
%disk_farm_index,
?key,
%offset,
"Error while reading piece from cache, might be a disk corruption"
);
}
}
return;
}
@@ -392,33 +423,15 @@ where
info!("Finished piece cache synchronization");
}

async fn keep_up_sync<PG>(&self, piece_getter: &PG, worker_state: &mut CacheWorkerState)
where
PG: PieceGetter,
{
let mut segment_headers_notifications =
match self.node_client.subscribe_archived_segment_headers().await {
Ok(segment_headers_notifications) => segment_headers_notifications,
Err(error) => {
error!(%error, "Failed to subscribe to archived segments notifications");
return;
}
};

// Keep up with segment indices that were potentially created since reinitialization;
// depending on the size of the diff, this may pause block production for a while (due to
// the subscription we have created above)
self.keep_up_after_initial_sync(piece_getter, worker_state)
.await;

while let Some(segment_header) = segment_headers_notifications.next().await {
let segment_index = segment_header.segment_index();
debug!(%segment_index, "Starting to process newly archived segment");

if worker_state.last_segment_index >= segment_index {
continue;
}
async fn process_segment_header(
&self,
segment_header: SegmentHeader,
worker_state: &mut CacheWorkerState,
) {
let segment_index = segment_header.segment_index();
debug!(%segment_index, "Starting to process newly archived segment");

if worker_state.last_segment_index < segment_index {
// TODO: Can probably do concurrency here
for piece_index in segment_index.segment_piece_indexes() {
if !worker_state
Expand Down Expand Up @@ -460,22 +473,22 @@ where
}

worker_state.last_segment_index = segment_index;
}

match self
.node_client
.acknowledge_archived_segment_header(segment_index)
.await
{
Ok(()) => {
debug!(%segment_index, "Acknowledged archived segment");
}
Err(error) => {
error!(%segment_index, ?error, "Failed to acknowledge archived segment");
}
};
match self
.node_client
.acknowledge_archived_segment_header(segment_index)
.await
{
Ok(()) => {
debug!(%segment_index, "Acknowledged archived segment");
}
Err(error) => {
error!(%segment_index, ?error, "Failed to acknowledge archived segment");
}
};

debug!(%segment_index, "Finished processing newly archived segment");
}
debug!(%segment_index, "Finished processing newly archived segment");
}

async fn keep_up_after_initial_sync<PG>(
@@ -514,12 +527,12 @@ where
for piece_index in piece_indices {
let key = KeyWrapper(piece_index);
if !worker_state.heap.should_include_key(key) {
trace!(%piece_index, "Piece doesn't need to be cached #1");
trace!(%piece_index, "Piece doesn't need to be cached #2");

continue;
}

trace!(%piece_index, "Piece needs to be cached #1");
trace!(%piece_index, "Piece needs to be cached #2");

let result = piece_getter
.get_piece(