do not write u64_le size prefix
This makes the encoded format fully incompatible with 0.12 and requires a new version!
rklaehn committed Apr 8, 2024
1 parent 40a030e commit a69452d
Showing 8 changed files with 73 additions and 98 deletions.
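
Since this change, neither the sync nor the async encoders write the 8-byte little-endian size header any more; the updated tests in src/rec.rs splice it back in only to compare against the reference bao implementation. An application that wants to keep the old 0.12-style framing now has to prepend the size itself. A minimal caller-side sketch, assuming `ob` and `to_client` are the outboard and encoded buffer from the examples below and that `BaoTree::size()` returns the content size as a `u64`:

```rust
// Hypothetical caller-side framing: prepend the u64_le size that the encoder
// used to write itself. The library no longer emits this prefix.
let mut framed = Vec::with_capacity(8 + to_client.len());
framed.extend_from_slice(&ob.tree.size().to_le_bytes());
framed.extend_from_slice(&to_client);
// `framed` now matches the pre-change wire format, including the size prefix.
```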
3 changes: 1 addition & 2 deletions examples/encode_decode_async.rs
@@ -16,15 +16,14 @@ const BLOCK_SIZE: BlockSize = BlockSize::from_chunk_log(4);
#[tokio::main]
async fn main() -> io::Result<()> {
// The file we want to serve
let mut file = tokio::fs::File::open("video.mp4").await?;
let mut file = iroh_io::File::open("video.mp4".into()).await?;
// Create an outboard for the file, using the current size
let mut ob = PreOrderOutboard::<BytesMut>::create(&mut file, BLOCK_SIZE).await?;
// Encode the first 100000 bytes of the file
let ranges = ByteRanges::from(0..100000);
let ranges = round_up_to_chunks(&ranges);
// Stream of data to client. Needs to implement `io::Write`. We just use a vec here.
let mut to_client = BytesMut::new();
let file = iroh_io::File::open("video.mp4".into()).await?;
encode_ranges_validated(file, &mut ob, &ranges, &mut to_client).await?;

// Stream of data from client. Needs to implement `io::Read`. We just wrap the vec in a cursor.
2 changes: 1 addition & 1 deletion examples/encode_decode_sync.rs
@@ -35,7 +35,7 @@ fn main() -> io::Result<()> {
root,
data: vec![],
};
decode_ranges(&ranges, from_server, &mut decoded, &mut ob)?;
decode_ranges(from_server, &ranges, &mut decoded, &mut ob)?;

// the first 100000 bytes of the file should now be in `decoded`
// in addition, the required part of the tree to validate that the data is
38 changes: 15 additions & 23 deletions src/io/fsm.rs
@@ -20,7 +20,7 @@ use blake3::guts::parent_cv;
use bytes::{Bytes, BytesMut};
use iroh_io::AsyncStreamWriter;
use smallvec::SmallVec;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

pub use super::BaoContentItem;
use crate::{
@@ -104,24 +104,19 @@ pub trait CreateOutboard {
/// This requires the outboard to have a default implementation, which is
/// the case for the memory implementations.
#[allow(async_fn_in_trait)]
async fn create(
mut data: impl AsyncRead + AsyncSeek + Unpin,
block_size: BlockSize,
) -> io::Result<Self>
async fn create(mut data: impl AsyncSliceReader, block_size: BlockSize) -> io::Result<Self>
where
Self: Default + Sized,
{
use tokio::io::AsyncSeekExt;
let size = data.seek(io::SeekFrom::End(0)).await?;
data.rewind().await?;
let size = data.len().await?;
Self::create_sized(data, size, block_size).await
}

/// create an outboard from a data source. This requires the outboard to
/// have a default implementation, which is the case for the memory
/// implementations.
fn create_sized(
data: impl AsyncRead + Unpin,
data: impl AsyncSliceReader,
size: u64,
block_size: BlockSize,
) -> impl Future<Output = io::Result<Self>>
@@ -135,7 +130,7 @@ pub trait CreateOutboard {
/// such as a file based one. It also does not require [AsyncSeek] on the data.
///
/// It will however only include data up to the current tree size.
fn init_from(&mut self, data: impl AsyncRead + Unpin) -> impl Future<Output = io::Result<()>>;
fn init_from(&mut self, data: impl AsyncSliceReader) -> impl Future<Output = io::Result<()>>;
}

impl<'b, O: Outboard> Outboard for &'b mut O {
@@ -235,7 +230,7 @@ impl<W: AsyncSliceWriter> OutboardMut for PostOrderOutboard<W> {

impl<W: AsyncSliceWriter> CreateOutboard for PreOrderOutboard<W> {
async fn create_sized(
data: impl AsyncRead + Unpin,
data: impl AsyncSliceReader,
size: u64,
block_size: BlockSize,
) -> io::Result<Self>
@@ -250,7 +245,7 @@ impl<W: AsyncSliceWriter> CreateOutboard for PreOrderOutboard<W> {
Ok(res)
}

async fn init_from(&mut self, data: impl AsyncRead + Unpin) -> io::Result<()> {
async fn init_from(&mut self, data: impl AsyncSliceReader) -> io::Result<()> {
let mut this = self;
let root = outboard(data, this.tree, &mut this).await?;
this.root = root;
@@ -261,7 +256,7 @@ impl<W: AsyncSliceWriter> CreateOutboard for PreOrderOutboard<W> {

impl<W: AsyncSliceWriter> CreateOutboard for PostOrderOutboard<W> {
async fn create_sized(
data: impl AsyncRead + Unpin,
data: impl AsyncSliceReader,
size: u64,
block_size: BlockSize,
) -> io::Result<Self>
@@ -276,7 +271,7 @@ impl<W: AsyncSliceWriter> CreateOutboard for PostOrderOutboard<W> {
Ok(res)
}

async fn init_from(&mut self, data: impl AsyncRead + Unpin) -> io::Result<()> {
async fn init_from(&mut self, data: impl AsyncSliceReader) -> io::Result<()> {
let mut this = self;
let root = outboard(data, this.tree, &mut this).await?;
this.root = root;
@@ -477,8 +472,6 @@ where
{
let mut encoded = encoded;
let tree = outboard.tree();
// write header
encoded.write(tree.size.to_le_bytes().as_slice()).await?;
for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
match item {
BaoChunk::Parent { node, .. } => {
@@ -530,8 +523,6 @@ where
let mut encoded = encoded;
let tree = outboard.tree();
let ranges = truncate_ranges(ranges, tree.size());
// write header
encoded.write(tree.size.to_le_bytes().as_slice()).await?;
for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
match item {
BaoChunk::Parent {
@@ -648,7 +639,7 @@ fn read_parent(buf: &[u8]) -> (blake3::Hash, blake3::Hash) {
/// Unlike [outboard_post_order], this will work with any outboard
/// implementation, but it is not guaranteed that writes are sequential.
pub async fn outboard(
data: impl AsyncRead + Unpin,
data: impl AsyncSliceReader,
tree: BaoTree,
mut outboard: impl OutboardMut,
) -> io::Result<blake3::Hash> {
@@ -660,11 +651,12 @@ pub async fn outboard(
/// Internal helper for [outboard_post_order]. This takes a buffer of the chunk group size.
async fn outboard_impl(
tree: BaoTree,
mut data: impl AsyncRead + Unpin,
mut data: impl AsyncSliceReader,
mut outboard: impl OutboardMut,
buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
// do not allocate for small trees
let mut offset: u64 = 0;
let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
debug_assert!(buffer.len() == tree.chunk_group_bytes());
for item in tree.post_order_chunks_iter() {
@@ -682,9 +674,9 @@ async fn outboard_impl(
start_chunk,
..
} => {
let buf = &mut buffer[..size];
data.read_exact(buf).await?;
let hash = hash_subtree(start_chunk.0, buf, is_root);
let buf = data.read_at(offset, size).await?;
offset += size as u64;
let hash = hash_subtree(start_chunk.0, &buf, is_root);
stack.push(hash);
}
}
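
For reference, the async API now takes an `AsyncSliceReader` everywhere, so the same `iroh_io::File` handle can be used both to build the outboard and to encode, without an `AsyncSeek` bound. A condensed sketch based on the updated examples/encode_decode_async.rs above (imports and `BLOCK_SIZE` as in that file; note the resulting stream no longer starts with a size header):

```rust
// Open the data source once; AsyncSliceReader gives positioned reads,
// so no seeking is needed to measure the size.
let mut file = iroh_io::File::open("video.mp4".into()).await?;
// Build the outboard directly from the positioned reader.
let mut ob = PreOrderOutboard::<BytesMut>::create(&mut file, BLOCK_SIZE).await?;
// Encode the first 100000 bytes; byte ranges are rounded up to chunk boundaries.
let ranges = ByteRanges::from(0..100000);
let ranges = round_up_to_chunks(&ranges);
let mut to_client = BytesMut::new();
encode_ranges_validated(file, &mut ob, &ranges, &mut to_client).await?;
```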
38 changes: 20 additions & 18 deletions src/io/sync.rs
@@ -3,7 +3,7 @@
//! The traits to perform positioned io are re-exported from
//! [positioned-io](https://crates.io/crates/positioned-io).
use std::{
io::{self, Read, Seek, Write},
io::{self, Read, Write},
result,
};

@@ -73,19 +73,23 @@ pub trait OutboardMut: Sized {
/// In complex real applications, you might want to do this manually.
pub trait CreateOutboard {
/// Create an outboard from a data source.
fn create(mut data: impl Read + Seek, block_size: BlockSize) -> io::Result<Self>
fn create(data: impl ReadAt + Size, block_size: BlockSize) -> io::Result<Self>
where
Self: Default + Sized,
{
let size = data.seek(io::SeekFrom::End(0))?;
data.rewind()?;
let Some(size) = data.size()? else {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unable to measure the size",
));
};
Self::create_sized(data, size, block_size)
}

/// create an outboard from a data source. This requires the outboard to
/// have a default implementation, which is the case for the memory
/// implementations.
fn create_sized(data: impl Read, size: u64, block_size: BlockSize) -> io::Result<Self>
fn create_sized(data: impl ReadAt, size: u64, block_size: BlockSize) -> io::Result<Self>
where
Self: Default + Sized;

@@ -96,7 +100,7 @@ pub trait CreateOutboard {
/// such as a file based one. It also does not require [Seek] on the data.
///
/// It will however only include data up to the current tree size.
fn init_from(&mut self, data: impl Read) -> io::Result<()>;
fn init_from(&mut self, data: impl ReadAt) -> io::Result<()>;
}

impl<O: OutboardMut> OutboardMut for &mut O {
@@ -172,7 +176,7 @@ impl<W: WriteAt> OutboardMut for PreOrderOutboard<W> {
}

impl<W: WriteAt> CreateOutboard for PreOrderOutboard<W> {
fn create_sized(data: impl Read, size: u64, block_size: BlockSize) -> io::Result<Self>
fn create_sized(data: impl ReadAt, size: u64, block_size: BlockSize) -> io::Result<Self>
where
Self: Default + Sized,
{
@@ -186,7 +190,7 @@ impl<W: WriteAt> CreateOutboard for PreOrderOutboard<W> {
Ok(res)
}

fn init_from(&mut self, data: impl Read) -> io::Result<()> {
fn init_from(&mut self, data: impl ReadAt) -> io::Result<()> {
let mut this = self;
let root = outboard(data, this.tree, &mut this)?;
this.root = root;
@@ -196,7 +200,7 @@ impl<W: WriteAt> CreateOutboard for PreOrderOutboard<W> {
}

impl<W: WriteAt> CreateOutboard for PostOrderOutboard<W> {
fn create_sized(data: impl Read, size: u64, block_size: BlockSize) -> io::Result<Self>
fn create_sized(data: impl ReadAt, size: u64, block_size: BlockSize) -> io::Result<Self>
where
Self: Default + Sized,
{
@@ -210,7 +214,7 @@ impl<W: WriteAt> CreateOutboard for PostOrderOutboard<W> {
Ok(res)
}

fn init_from(&mut self, data: impl Read) -> io::Result<()> {
fn init_from(&mut self, data: impl ReadAt) -> io::Result<()> {
let mut this = self;
let root = outboard(data, this.tree, &mut this)?;
this.root = root;
@@ -387,8 +391,6 @@ pub fn encode_ranges<D: ReadAt + Size, O: Outboard, W: Write>(
let mut encoded = encoded;
let tree = outboard.tree();
let mut buffer = vec![0u8; tree.chunk_group_bytes()];
// write header
encoded.write_all(tree.size.to_le_bytes().as_slice())?;
for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
match item {
BaoChunk::Parent { node, .. } => {
@@ -431,8 +433,6 @@ pub fn encode_ranges_validated<D: ReadAt + Size, O: Outboard, W: Write>(
let mut out_buf = Vec::new();
// canonicalize ranges
let ranges = truncate_ranges(ranges, tree.size());
// write header
encoded.write_all(tree.size.to_le_bytes().as_slice())?;
for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
match item {
BaoChunk::Parent {
@@ -504,8 +504,8 @@ pub fn encode_ranges_validated<D: ReadAt + Size, O: Outboard, W: Write>(
/// If you do not want to update an outboard, use [super::outboard::EmptyOutboard] as
/// the outboard.
pub fn decode_ranges<R, O, W>(
ranges: &ChunkRangesRef,
encoded: R,
ranges: &ChunkRangesRef,
mut target: W,
mut outboard: O,
) -> std::result::Result<(), DecodeError>
Expand Down Expand Up @@ -533,7 +533,7 @@ where
/// Unlike [outboard_post_order], this will work with any outboard
/// implementation, but it is not guaranteed that writes are sequential.
pub fn outboard(
data: impl Read,
data: impl ReadAt,
tree: BaoTree,
mut outboard: impl OutboardMut,
) -> io::Result<blake3::Hash> {
@@ -545,12 +545,13 @@ pub fn outboard(
/// Internal helper for [outboard_post_order]. This takes a buffer of the chunk group size.
fn outboard_impl(
tree: BaoTree,
mut data: impl Read,
data: impl ReadAt,
mut outboard: impl OutboardMut,
buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
// do not allocate for small trees
let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
let mut offset = 0;
debug_assert!(buffer.len() == tree.chunk_group_bytes());
for item in tree.post_order_chunks_iter() {
match item {
@@ -568,7 +569,8 @@ fn outboard_impl(
..
} => {
let buf = &mut buffer[..size];
data.read_exact(buf)?;
data.read_exact_at(offset, buf)?;
offset += size as u64;
let hash = hash_subtree(start_chunk.0, buf, is_root);
stack.push(hash);
}
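
On the sync side, `CreateOutboard::create` now takes `impl ReadAt + Size` (re-exported from positioned-io) instead of `Read + Seek`, and returns an `UnexpectedEof` error if the source cannot report its size. A minimal sketch, assuming the module paths `bao_tree::io::sync::CreateOutboard` and `bao_tree::io::outboard::PreOrderOutboard`, and that `std::fs::File` provides the positioned-io `ReadAt + Size` impls:

```rust
use bao_tree::io::outboard::PreOrderOutboard;
use bao_tree::io::sync::CreateOutboard;
use bao_tree::BlockSize;

fn main() -> std::io::Result<()> {
    // std::fs::File implements the re-exported ReadAt + Size traits, so no
    // seeking is required to measure the size or to read the chunks.
    let file = std::fs::File::open("video.mp4")?;
    let ob = PreOrderOutboard::<Vec<u8>>::create(file, BlockSize::from_chunk_log(4))?;
    println!("root: {}", ob.root);
    Ok(())
}
```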
5 changes: 2 additions & 3 deletions src/lib.rs
@@ -65,13 +65,12 @@
//! knowledge of the tree geometry (total data size and block size). A common
//! way to get this information is to have the block size as a common parameter
//! of both sides, and send the total data size as a prefix of the encoded data.
//! E.g. the original bao crate uses a little endian u64 as the prefix.
//!
//! This function will perform validation in any case, there is no variant
//! that skips validation since that would defeat the purpose of verified
//! streaming.
//!
//! The original bao crate uses a little endian u64 as the prefix.
//!
//! ## Simple end to end example
//!
//! ```no_run
@@ -112,7 +111,7 @@
//! root,
//! data: vec![],
//! };
//! decode_ranges(&ranges, from_server, &mut decoded, &mut ob)?;
//! decode_ranges(from_server, &ranges, &mut decoded, &mut ob)?;
//!
//! // the first 100000 bytes of the file should now be in `decoded`
//! // in addition, the required part of the tree to validate that the data is
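
On the receiving side, the size prefix (if the application keeps that framing) is now consumed by the caller before decoding, and `decode_ranges` takes the encoded stream first and the ranges second. A sketch of that call site, assuming `from_server` (an `impl Read` over the encoded stream), `root`, `ranges`, and `block_size` are in scope as in the doc example above, and assuming the tree geometry is constructed with `BaoTree::new(size, block_size)`:

```rust
// Read the u64_le size that the encoder no longer writes for us.
let mut size_buf = [0u8; 8];
from_server.read_exact(&mut size_buf)?;
let size = u64::from_le_bytes(size_buf);
// Rebuild the tree geometry from the transmitted size and the shared block
// size (constructor name assumed).
let tree = BaoTree::new(size, block_size);
// Outboard that collects the validated tree nodes, as in the doc example.
let mut ob = PreOrderOutboard { tree, root, data: vec![] };
let mut decoded = Vec::new();
// New argument order: encoded stream first, then the requested ranges.
decode_ranges(from_server, &ranges, &mut decoded, &mut ob)?;
```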
7 changes: 4 additions & 3 deletions src/rec.rs
@@ -425,7 +425,6 @@ mod test_support {
) -> (Vec<u8>, blake3::Hash) {
let mut res = Vec::new();
let size = data.len() as u64;
res.extend_from_slice(&size.to_le_bytes());
// canonicalize the ranges
let ranges = truncate_ranges(ranges, size);
let hash = encode_selected_rec(
@@ -534,7 +533,8 @@ mod tests {
let chunk_start = ChunkNum::full_chunks(start as u64);
let chunk_end = ChunkNum::chunks(end as u64).max(chunk_start + 1);
let ranges = ChunkRanges::from(chunk_start..chunk_end);
let actual_encoded = encode_ranges_reference(&data, &ranges, BlockSize::ZERO).0;
let mut actual_encoded = encode_ranges_reference(&data, &ranges, BlockSize::ZERO).0;
actual_encoded.splice(..0, size.to_le_bytes().into_iter());
prop_assert_eq!(expected_encoded, actual_encoded);
}

@@ -549,7 +549,8 @@
let chunk_start = ChunkNum::full_chunks(start as u64);
let chunk_end = ChunkNum::chunks(end as u64).max(chunk_start + 1);
let ranges = ChunkRanges::from(chunk_start..chunk_end);
let (encoded, hash) = encode_ranges_reference(&data, &ranges, BlockSize::ZERO);
let (mut encoded, hash) = encode_ranges_reference(&data, &ranges, BlockSize::ZERO);
encoded.splice(..0, size.to_le_bytes().into_iter());
let bao_hash = bao::Hash::from(*hash.as_bytes());
let mut decoder =
bao::decode::SliceDecoder::new(Cursor::new(&encoded), &bao_hash, start as u64, 1);