Skip to content

Commit

Permalink
Switch to async piece getter API
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Sep 9, 2024
1 parent c832e67 commit dbe8e35
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 41 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 8 additions & 17 deletions crates/subspace-object-fetching/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,19 @@ include = [
bench = false

[dependencies]
parity-scale-codec = { version = "3.6.12", default-features = false, features = ["derive"] }
subspace-archiving = { version = "0.1.0", path = "../subspace-archiving", default-features = false }
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives", default-features = false }
subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding", default-features = false }
thiserror = { version = "1.0.63", optional = true }
tracing = { version = "0.1.40", default-features = false }
async-trait = "0.1.81"
parity-scale-codec = { version = "3.6.12", features = ["derive"] }
subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" }
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" }
subspace-networking = { version = "0.1.0", path = "../subspace-networking" }
thiserror = { version = "1.0.63" }
tracing = { version = "0.1.40" }

[dev-dependencies]
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }

[features]
default = ["std"]
parallel = [
"subspace-archiving/parallel",
"subspace-core-primitives/parallel",
]
std = [
"parity-scale-codec/std",
"parallel",
"subspace-archiving/std",
"subspace-core-primitives/std",
"subspace-erasure-coding/std",
"thiserror",
"tracing/std",
]
3 changes: 2 additions & 1 deletion crates/subspace-object-fetching/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

//! Fetching archived objects from the Subspace Distributed Storage Network.
#![cfg_attr(not(feature = "std"), no_std)]
pub mod object_fetcher;

pub use object_fetcher::{Error, ObjectFetcher, ObjectPieceGetter};
76 changes: 53 additions & 23 deletions crates/subspace-object-fetching/src/object_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@

//! Fetching objects stored in the archived state of Subspace Network.
use async_trait::async_trait;
use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
use std::fmt;
use std::sync::Arc;
use subspace_archiving::archiver::{Segment, SegmentItem};
use subspace_core_primitives::objects::GlobalObject;
use subspace_core_primitives::{
ArchivedHistorySegment, Piece, PieceIndex, RawRecord, Record, RecordedHistorySegment,
SegmentIndex,
};
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use tracing::error;

/// Maximum expected size of one object in bytes
Expand All @@ -38,42 +41,68 @@ pub enum Error {
}

/// Something that can be used to get decoded pieces by index
pub trait PieceGetter {
/// Get piece
fn get_piece(&self, piece_index: PieceIndex) -> Option<Piece>;
#[async_trait]
pub trait ObjectPieceGetter: fmt::Debug {
/// Get piece by index
async fn get_piece(&self, piece_index: PieceIndex) -> Option<Piece>;
}

impl<PG> PieceGetter for Vec<PG>
#[async_trait]
impl<PG> ObjectPieceGetter for Vec<PG>
where
PG: PieceGetter,
PG: ObjectPieceGetter + Sync,
{
fn get_piece(&self, piece_index: PieceIndex) -> Option<Piece> {
self.iter()
.find_map(|piece_getter| piece_getter.get_piece(piece_index))
async fn get_piece(&self, piece_index: PieceIndex) -> Option<Piece> {
for piece_getter in self.iter() {
if let Some(piece) = piece_getter.get_piece(piece_index).await {
return Some(piece);
}
}

None
}
}

// Blanket impl so shared-ownership wrappers can be used as piece getters.
// `?Sized` means this also covers `Arc<dyn ObjectPieceGetter + ...>` trait
// objects, not just concrete getter types.
#[async_trait]
impl<PG> ObjectPieceGetter for Arc<PG>
where
    PG: ObjectPieceGetter + Send + Sync + ?Sized,
{
    async fn get_piece(&self, piece_index: PieceIndex) -> Option<Piece> {
        // Deref through the `Arc` and forward; the wrapper adds no behavior.
        let inner: &PG = self;
        inner.get_piece(piece_index).await
    }
}

// Adapter impl: lets a DSN `PieceProvider` be used directly wherever an
// `ObjectPieceGetter` is expected.
#[async_trait]
impl<PV> ObjectPieceGetter for PieceProvider<PV>
where
    PV: PieceValidator,
{
    async fn get_piece(&self, piece_index: PieceIndex) -> Option<Piece> {
        // Answers from the provider's cache; presumably returns `None` when
        // the piece isn't cached rather than fetching over the network —
        // TODO(review): confirm against `PieceProvider::get_piece_from_cache`.
        let lookup = self.get_piece_from_cache(piece_index);
        lookup.await
    }
}

/// Object fetcher from the Subspace DSN.
#[derive(Clone)]
pub struct ObjectFetcher {
pub piece_getter: Arc<dyn PieceGetter + Send + Sync + 'static>,
pub piece_getter: Arc<dyn ObjectPieceGetter + Send + Sync + 'static>,
}

impl ObjectFetcher {
/// Assemble the object in `mapping` by fetching necessary pieces using the piece getter and
/// putting the object's bytes together.
pub fn fetch_object(&self, mapping: GlobalObject) -> Result<Vec<u8>, Error> {
pub async fn fetch_object(&self, mapping: GlobalObject) -> Result<Vec<u8>, Error> {
// Try fast object assembling
if let Some(data) = self.fetch_object_fast(mapping)? {
if let Some(data) = self.fetch_object_fast(mapping).await? {
return Ok(data);
}

self.fetch_object_regular(mapping)
self.fetch_object_regular(mapping).await
}

/// Fast object fetching and assembling where the object doesn't cross piece (super fast) or
/// segment (just fast) boundaries, returns `Ok(None)` if fast retrieval is not guaranteed.
fn fetch_object_fast(&self, mapping: GlobalObject) -> Result<Option<Vec<u8>>, Error> {
async fn fetch_object_fast(&self, mapping: GlobalObject) -> Result<Option<Vec<u8>>, Error> {
// We care if the offset is before the last 2 bytes of a piece because if not we might be
// able to do very fast object retrieval without assembling and processing the whole
// segment. `-2` is because last 2 bytes might contain padding if a piece is the last piece
Expand Down Expand Up @@ -110,7 +139,7 @@ impl ObjectFetcher {
let mut read_records_data = Vec::<u8>::with_capacity(RawRecord::SIZE * 2);
let mut next_piece_index = mapping.piece_index;

let raw_piece_bytes = self.read_and_decode_piece(next_piece_index)?;
let raw_piece_bytes = self.read_and_decode_piece(next_piece_index).await?;
next_piece_index += PieceIndex::ONE;
read_records_data.extend_from_slice(&raw_piece_bytes);

Expand Down Expand Up @@ -143,7 +172,7 @@ impl ObjectFetcher {
} else if !last_data_piece_in_segment {
if !length_before_record_end {
// Need the next piece to read the length of data
let raw_piece_bytes = self.read_and_decode_piece(next_piece_index)?;
let raw_piece_bytes = self.read_and_decode_piece(next_piece_index).await?;
next_piece_index += PieceIndex::ONE;
read_records_data.extend_from_slice(&raw_piece_bytes);
}
Expand Down Expand Up @@ -174,7 +203,7 @@ impl ObjectFetcher {

// Read more pieces until we have enough data
while data.len() <= data_length as usize {
let raw_piece_bytes = self.read_and_decode_piece(next_piece_index)?;
let raw_piece_bytes = self.read_and_decode_piece(next_piece_index).await?;
next_piece_index += PieceIndex::ONE;
data.extend_from_slice(&raw_piece_bytes);
}
Expand All @@ -187,15 +216,15 @@ impl ObjectFetcher {

/// Fetch and assemble an object that can cross segment boundaries, which requires assembling
/// and iterating over full segments.
fn fetch_object_regular(&self, mapping: GlobalObject) -> Result<Vec<u8>, Error> {
async fn fetch_object_regular(&self, mapping: GlobalObject) -> Result<Vec<u8>, Error> {
let segment_index = mapping.piece_index.segment_index();
let piece_position_in_segment = mapping.piece_index.position();
// Used to access the data after it is converted to raw bytes
let offset_in_segment = (u64::from(piece_position_in_segment) * RawRecord::SIZE as u64
+ u64::from(mapping.offset)) as usize;

let mut data = {
let Segment::V0 { items } = self.read_segment(segment_index)?;
let Segment::V0 { items } = self.read_segment(segment_index).await?;
// Unconditional progress is enum variant + compact encoding of number of elements
let mut progress = 1 + Compact::compact_len(&(items.len() as u64));
let segment_item = items
Expand Down Expand Up @@ -249,7 +278,7 @@ impl ObjectFetcher {
}

for segment_index in segment_index + SegmentIndex::ONE.. {
let Segment::V0 { items } = self.read_segment(segment_index)?;
let Segment::V0 { items } = self.read_segment(segment_index).await?;
for segment_item in items {
if let SegmentItem::BlockContinuation { bytes, .. } = segment_item {
data.extend_from_slice(&bytes);
Expand All @@ -274,7 +303,7 @@ impl ObjectFetcher {

/// Read the whole segment by its index (just records, skipping witnesses)
// TODO: Reconstruction assumes that all source pieces are available, which is not always the case
fn read_segment(&self, segment_index: SegmentIndex) -> Result<Segment, Error> {
async fn read_segment(&self, segment_index: SegmentIndex) -> Result<Segment, Error> {
let mut segment_bytes =
Vec::<u8>::with_capacity(ArchivedHistorySegment::NUM_PIECES * RawRecord::SIZE);

Expand All @@ -283,7 +312,7 @@ impl ObjectFetcher {
.into_iter()
.take(RecordedHistorySegment::NUM_RAW_RECORDS)
{
let raw_piece_bytes = self.read_and_decode_piece(piece_index)?;
let raw_piece_bytes = self.read_and_decode_piece(piece_index).await?;
segment_bytes.extend_from_slice(&raw_piece_bytes);
}

Expand All @@ -303,10 +332,11 @@ impl ObjectFetcher {
}

/// Read and decode the whole piece, returning its raw record bytes.
fn read_and_decode_piece(&self, piece_index: PieceIndex) -> Result<Vec<u8>, Error> {
async fn read_and_decode_piece(&self, piece_index: PieceIndex) -> Result<Vec<u8>, Error> {
let piece_getter = self.piece_getter.clone();
let piece = piece_getter
.get_piece(piece_index)
.await
.ok_or_else(|| Error::StringError(format!("Fetching piece {piece_index} failed")))?;
Ok(piece.record().to_raw_record_bytes().collect())
}
Expand Down

0 comments on commit dbe8e35

Please sign in to comment.