diff --git a/src/lib.rs b/src/lib.rs index 1f47254..82ef375 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,9 +9,12 @@ use core::{future::Future, pin::pin, task}; use futures_core::Stream; mod stream; +mod try_stream; +mod try_yielder; mod waker; mod yielder; +pub use self::try_yielder::TryYielder; pub use self::yielder::Yielder; /// Unwrap the waker @@ -85,3 +88,54 @@ where { stream_fn(func) } + +/// Create a new try stream +/// +/// # Example +/// +/// Let's yield some lyrics (Song: "Verdächtig" by Systemabsturz): +/// +/// ``` +/// # use futures_lite::StreamExt; +/// # use std::pin::pin; +/// # use std::convert::Infallible; +/// # futures_lite::future::block_on(async { +/// let stream = asynk_strim::try_stream_fn(|mut yielder| async move { +/// yielder.yield_ok("Fahr den Imsi-Catcher hoch").await; +/// yielder.yield_ok("Mach das Richtmikro an").await; +/// yielder.yield_ok("Bring Alexa auf den Markt").await; +/// yielder.yield_ok("Zapf den Netzknoten an").await; +/// yielder.yield_ok("Fahr den Ü-Wagen vor").await; +/// yielder.yield_ok("Kauf den Staatstrojaner ein").await; +/// yielder.yield_ok("Fake die Exit-Nodes bei Tor").await; +/// yielder.yield_ok("Ihr wollt doch alle sicher sein").await; +/// +/// Ok::<_, Infallible>(()) +/// }); +/// +/// let mut stream = pin!(stream); +/// while let Some(item) = stream.next().await { +/// println!("{item:?}"); +/// } +/// # }); +/// ``` +#[inline] +pub fn try_stream_fn(func: F) -> impl Stream> +where + F: FnOnce(TryYielder) -> Fut, + Fut: Future>, +{ + crate::try_stream::init(func) +} + +/// Jokey alias for [`try_stream_fn`] +/// +/// For more elaborate documentation, see [`try_stream_fn`] +#[inline] +pub fn try_strim_fn(func: F) -> impl Stream> +where + F: FnOnce(TryYielder) -> Fut, + Fut: Future>, +{ + try_stream_fn(func) +} diff --git a/src/try_stream.rs b/src/try_stream.rs new file mode 100644 index 0000000..8908edf --- /dev/null +++ b/src/try_stream.rs @@ -0,0 +1,109 @@ +use crate::{try_yielder::TryYielder, yielder::Yielder}; +use core::{ + future::Future, + hint::unreachable_unchecked, + marker::PhantomData, + pin::Pin, + ptr, + task::{self, Poll}, +}; +use futures_core::{FusedStream, Stream}; +use pin_project_lite::pin_project; + +#[inline] +pub fn init(func: F) -> impl Stream> +where + F: FnOnce(TryYielder) -> Fut, + Fut: Future>, +{ + AsynkStrim::Initial { func: Some(func) } +} + +pin_project! { + /// IMPORTANT: Never EVER EVER create this stream in the state `Initial` with the `func` parameter set to `None` + /// Doing this will trigger undefined behaviour. + /// + /// IMPORTANT: Never EVER EVER construct a stream in the `MarkerStuff` state. + /// Doing this will trigger undefined behaviour. + #[project = AsynkStrimProj] + #[project(!Unpin)] + enum AsynkStrim { + Initial { + func: Option, + }, + Progress { + #[pin] + fut: Fut, + }, + Done, + MarkerStuff { + _item: PhantomData, + } + } +} + +impl Stream for AsynkStrim> +where + F: FnOnce(TryYielder) -> Fut, + Fut: Future>, +{ + type Item = Result; + + #[inline] + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let stream_address = ptr::from_ref(self.as_ref().get_ref()) as usize; + loop { + match self.as_mut().project() { + AsynkStrimProj::Initial { func } => { + // at the end of the function we transition into the progress state. + // this state is never initialized with `func` set to `None`. + // + // we actually only do this to be able to use `.take()` to remove the function from the future. + #[allow(unsafe_code)] + let func = unsafe { func.take().unwrap_unchecked() }; + let fut = func(TryYielder::new(Yielder::new(stream_address))); + + self.set(Self::Progress { fut }); + } + AsynkStrimProj::Progress { fut, .. } => { + let mut out = None; + let poll_output = + crate::waker::with_context(cx.waker(), stream_address, &mut out, |cx| { + fut.poll(cx) + }); + + match (poll_output, out) { + (Poll::Ready(result), ..) => { + self.set(AsynkStrim::Done); + if let Err(err) = result { + break Poll::Ready(Some(Err(err))); + } + } + (Poll::Pending, Some(item)) => break Poll::Ready(Some(item)), + (Poll::Pending, None) => break Poll::Pending, + } + } + AsynkStrimProj::Done => break Poll::Ready(None), + AsynkStrimProj::MarkerStuff { .. } => { + // the state machine will never enter this state. + // documented on the state machine level. + #[allow(unsafe_code)] + unsafe { + unreachable_unchecked() + } + } + } + } + } +} + +impl FusedStream for AsynkStrim> +where + F: FnOnce(TryYielder) -> Fut, + Fut: Future>, +{ + #[inline] + fn is_terminated(&self) -> bool { + matches!(self, Self::Done) + } +} diff --git a/src/try_yielder.rs b/src/try_yielder.rs new file mode 100644 index 0000000..84f77d4 --- /dev/null +++ b/src/try_yielder.rs @@ -0,0 +1,31 @@ +use crate::yielder::Yielder; + +/// Handle to allow you to yield something from the stream +pub struct TryYielder { + yielder: Yielder>, +} + +impl TryYielder { + #[inline] + pub(crate) fn new(yielder: Yielder>) -> Self { + Self { yielder } + } + + /// Yield a result from the stream + #[inline] + pub async fn yield_result(&mut self, item: Result) { + self.yielder.yield_item(item).await; + } + + /// Yield a success value from the stream + #[inline] + pub async fn yield_ok(&mut self, item: Ok) { + self.yield_result(Ok(item)).await; + } + + /// Yield an error value from the stream + #[inline] + pub async fn yield_error(&mut self, item: Error) { + self.yield_result(Err(item)).await; + } +} diff --git a/src/yielder.rs b/src/yielder.rs index 5632328..1ee1ba6 100644 --- a/src/yielder.rs +++ b/src/yielder.rs @@ -5,16 +5,12 @@ use core::{ task::{self, Poll}, }; -struct YieldFuture<'a, Item> { +struct YieldFuture { item: Option, stream_address: usize, - - // Here to catch some API misuse - // Nothing safety relevant. It would just panic at runtime which isn't ideal. - _lifetime_invariant: PhantomData<&'a mut ()>, } -impl Future for YieldFuture<'_, Item> { +impl Future for YieldFuture { type Output = (); #[inline] @@ -50,7 +46,7 @@ impl Future for YieldFuture<'_, Item> { } } -impl Unpin for YieldFuture<'_, Item> {} +impl Unpin for YieldFuture {} /// Handle to allow you to yield something from the stream pub struct Yielder { @@ -69,12 +65,12 @@ impl Yielder { /// Yield an item from the stream #[inline] - pub fn yield_item(&mut self, item: Item) -> impl Future + '_ { - YieldFuture { + pub async fn yield_item(&mut self, item: Item) { + let future = YieldFuture { item: Some(item), stream_address: self.stream_address, + }; - _lifetime_invariant: PhantomData, - } + future.await; } } diff --git a/tests/try_stream.rs b/tests/try_stream.rs new file mode 100644 index 0000000..69bc7a9 --- /dev/null +++ b/tests/try_stream.rs @@ -0,0 +1,20 @@ +use futures_lite::stream; +use std::pin::pin; + +#[test] +fn exits_early() { + let stream = pin!(asynk_strim::try_stream_fn(|mut yielder| async move { + yielder.yield_ok(42).await; + return Err("oh no"); + + #[allow(unreachable_code)] + yielder.yield_ok(1337).await; + + Ok(()) + })); + + let mut stream = stream::block_on(stream); + assert_eq!(stream.next(), Some(Ok(42))); + assert_eq!(stream.next(), Some(Err("oh no"))); + assert_eq!(stream.next(), None); +} diff --git a/tests/ui/double_yield.stderr b/tests/ui/double_yield.stderr index 4df0d69..83b7caa 100644 --- a/tests/ui/double_yield.stderr +++ b/tests/ui/double_yield.stderr @@ -6,4 +6,4 @@ error[E0499]: cannot borrow `yielder` as mutable more than once at a time 4 | let _second = yielder.yield_item(141); | ^^^^^^^ second mutable borrow occurs here 5 | }); - | - first borrow might be used here, when `_first` is dropped and runs the destructor for type `impl Future + '_` + | - first borrow might be used here, when `_first` is dropped and runs the destructor for type `impl Future`