Skip to content

Commit

Permalink
Deduplicate
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Oct 9, 2024
1 parent fa8de9b commit 9233d77
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 130 deletions.
9 changes: 7 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use core::{future::Future, pin::pin, task};
use futures_core::Stream;

mod stream;
mod try_stream;
mod try_yielder;
mod waker;
mod yielder;
Expand Down Expand Up @@ -125,7 +124,13 @@ where
F: FnOnce(TryYielder<Ok, Error>) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
crate::try_stream::init(func)
let func = |mut yielder: TryYielder<_, _>| async move {
if let Err(err) = func(yielder.duplicate()).await {
yielder.yield_error(err).await;
}
};

crate::stream::init(func)
}

/// Jokey alias for [`try_stream_fn`]
Expand Down
20 changes: 12 additions & 8 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use futures_core::{FusedStream, Stream};
use pin_project_lite::pin_project;

#[inline]
pub fn init<F, Fut, Item>(func: F) -> impl Stream<Item = Item>
pub fn init<F, Fut, Yieldr, Item>(func: F) -> impl Stream<Item = Item>
where
F: FnOnce(Yielder<Item>) -> Fut,
F: FnOnce(Yieldr) -> Fut,
Fut: Future<Output = ()>,
Yieldr: From<Yielder<Item>>,
{
AsynkStrim::Initial { func: Some(func) }
}
Expand All @@ -27,7 +28,7 @@ pin_project! {
/// Doing this will trigger undefined behaviour.
#[project = AsynkStrimProj]
#[project(!Unpin)]
enum AsynkStrim<F, Fut, Item> {
enum AsynkStrim<F, Fut, Yieldr, Item> {
Initial {
func: Option<F>,
},
Expand All @@ -38,14 +39,16 @@ pin_project! {
Done,
MarkerStuff {
_item: PhantomData<Item>,
_yieldr: PhantomData<Yieldr>,
}
}
}

impl<F, Fut, Item> Stream for AsynkStrim<F, Fut, Item>
impl<F, Fut, Yieldr, Item> Stream for AsynkStrim<F, Fut, Yieldr, Item>
where
F: FnOnce(Yielder<Item>) -> Fut,
F: FnOnce(Yieldr) -> Fut,
Fut: Future<Output = ()>,
Yieldr: From<Yielder<Item>>,
{
type Item = Item;

Expand All @@ -61,7 +64,7 @@ where
// we actually only do this to be able to use `.take()` to remove the function from the future.
#[allow(unsafe_code)]
let func = unsafe { func.take().unwrap_unchecked() };
let fut = func(Yielder::new(stream_address));
let fut = func(<_>::from(Yielder::new(stream_address)));

self.set(Self::Progress { fut });
}
Expand Down Expand Up @@ -92,10 +95,11 @@ where
}
}

impl<F, Fut, Item> FusedStream for AsynkStrim<F, Fut, Item>
impl<F, Fut, Y, Item> FusedStream for AsynkStrim<F, Fut, Y, Item>
where
F: FnOnce(Yielder<Item>) -> Fut,
F: FnOnce(Y) -> Fut,
Fut: Future<Output = ()>,
Y: From<Yielder<Item>>,
{
#[inline]
fn is_terminated(&self) -> bool {
Expand Down
109 changes: 0 additions & 109 deletions src/try_stream.rs

This file was deleted.

25 changes: 14 additions & 11 deletions src/try_yielder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,29 @@ pub struct TryYielder<Ok, Error> {
}

impl<Ok, Error> TryYielder<Ok, Error> {
#[inline]
pub(crate) fn new(yielder: Yielder<Result<Ok, Error>>) -> Self {
Self { yielder }
}

/// Yield a result from the stream
#[inline]
pub async fn yield_result(&mut self, item: Result<Ok, Error>) {
self.yielder.yield_item(item).await;
pub(crate) fn duplicate(&self) -> Self {
Self {
yielder: self.yielder.duplicate(),
}
}

/// Yield a success value from the stream
#[inline]
pub async fn yield_ok(&mut self, item: Ok) {
self.yield_result(Ok(item)).await;
self.yielder.yield_item(Ok(item)).await;
}

/// Yield an error value from the stream
#[inline]
pub async fn yield_error(&mut self, item: Error) {
self.yield_result(Err(item)).await;
self.yielder.yield_item(Err(item)).await;
}
}

#[doc(hidden)]
impl<Ok, Error> From<Yielder<Result<Ok, Error>>> for TryYielder<Ok, Error> {
#[inline]
fn from(yielder: Yielder<Result<Ok, Error>>) -> Self {
Self { yielder }
}
}
7 changes: 7 additions & 0 deletions src/yielder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ impl<Item> Yielder<Item> {
}
}

pub(crate) fn duplicate(&self) -> Self {
Self {
_marker: PhantomData,
stream_address: self.stream_address,
}
}

/// Yield an item from the stream
#[inline]
pub async fn yield_item(&mut self, item: Item) {
Expand Down

0 comments on commit 9233d77

Please sign in to comment.