Skip to content

Commit

Permalink
Add try_stream_fn
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Oct 8, 2024
1 parent 0d64232 commit fa8de9b
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 12 deletions.
54 changes: 54 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ use core::{future::Future, pin::pin, task};
use futures_core::Stream;

mod stream;
mod try_stream;
mod try_yielder;
mod waker;
mod yielder;

pub use self::try_yielder::TryYielder;
pub use self::yielder::Yielder;

/// Unwrap the waker
Expand Down Expand Up @@ -85,3 +88,54 @@ where
{
stream_fn(func)
}

/// Create a new try stream
///
/// # Example
///
/// Let's yield some lyrics (Song: "Verdächtig" by Systemabsturz):
///
/// ```
/// # use futures_lite::StreamExt;
/// # use std::pin::pin;
/// # use std::convert::Infallible;
/// # futures_lite::future::block_on(async {
/// let stream = asynk_strim::try_stream_fn(|mut yielder| async move {
/// yielder.yield_ok("Fahr den Imsi-Catcher hoch").await;
/// yielder.yield_ok("Mach das Richtmikro an").await;
/// yielder.yield_ok("Bring Alexa auf den Markt").await;
/// yielder.yield_ok("Zapf den Netzknoten an").await;
/// yielder.yield_ok("Fahr den Ü-Wagen vor").await;
/// yielder.yield_ok("Kauf den Staatstrojaner ein").await;
/// yielder.yield_ok("Fake die Exit-Nodes bei Tor").await;
/// yielder.yield_ok("Ihr wollt doch alle sicher sein").await;
///
/// Ok::<_, Infallible>(())
/// });
///
/// let mut stream = pin!(stream);
/// while let Some(item) = stream.next().await {
/// println!("{item:?}");
/// }
/// # });
/// ```
#[inline]
pub fn try_stream_fn<F, Ok, Error, Fut>(func: F) -> impl Stream<Item = Result<Ok, Error>>
where
F: FnOnce(TryYielder<Ok, Error>) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
crate::try_stream::init(func)
}

/// Jokey alias for [`try_stream_fn`]
///
/// For more elaborate documentation, see [`try_stream_fn`]
#[inline]
pub fn try_strim_fn<F, Ok, Error, Fut>(func: F) -> impl Stream<Item = Result<Ok, Error>>
where
F: FnOnce(TryYielder<Ok, Error>) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
try_stream_fn(func)
}
109 changes: 109 additions & 0 deletions src/try_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use crate::{try_yielder::TryYielder, yielder::Yielder};
use core::{
future::Future,
hint::unreachable_unchecked,
marker::PhantomData,
pin::Pin,
ptr,
task::{self, Poll},
};
use futures_core::{FusedStream, Stream};
use pin_project_lite::pin_project;

#[inline]
pub fn init<F, Fut, Ok, Error>(func: F) -> impl Stream<Item = Result<Ok, Error>>
where
F: FnOnce(TryYielder<Ok, Error>) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
AsynkStrim::Initial { func: Some(func) }
}

pin_project! {
/// IMPORTANT: Never EVER EVER create this stream in the state `Initial` with the `func` parameter set to `None`
/// Doing this will trigger undefined behaviour.
///
/// IMPORTANT: Never EVER EVER construct a stream in the `MarkerStuff` state.
/// Doing this will trigger undefined behaviour.
#[project = AsynkStrimProj]
#[project(!Unpin)]
enum AsynkStrim<F, Fut, Item> {
Initial {
func: Option<F>,
},
Progress {
#[pin]
fut: Fut,
},
Done,
MarkerStuff {
_item: PhantomData<Item>,
}
}
}

impl<F, Fut, Ok, Error> Stream for AsynkStrim<F, Fut, Result<Ok, Error>>
where
F: FnOnce(TryYielder<Ok, Error>) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
type Item = Result<Ok, Error>;

#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let stream_address = ptr::from_ref(self.as_ref().get_ref()) as usize;
loop {
match self.as_mut().project() {
AsynkStrimProj::Initial { func } => {
// at the end of the function we transition into the progress state.
// this state is never initialized with `func` set to `None`.
//
// we actually only do this to be able to use `.take()` to remove the function from the future.
#[allow(unsafe_code)]
let func = unsafe { func.take().unwrap_unchecked() };
let fut = func(TryYielder::new(Yielder::new(stream_address)));

self.set(Self::Progress { fut });
}
AsynkStrimProj::Progress { fut, .. } => {
let mut out = None;
let poll_output =
crate::waker::with_context(cx.waker(), stream_address, &mut out, |cx| {
fut.poll(cx)
});

match (poll_output, out) {
(Poll::Ready(result), ..) => {
self.set(AsynkStrim::Done);
if let Err(err) = result {
break Poll::Ready(Some(Err(err)));
}
}
(Poll::Pending, Some(item)) => break Poll::Ready(Some(item)),
(Poll::Pending, None) => break Poll::Pending,
}
}
AsynkStrimProj::Done => break Poll::Ready(None),
AsynkStrimProj::MarkerStuff { .. } => {
// the state machine will never enter this state.
// documented on the state machine level.
#[allow(unsafe_code)]
unsafe {
unreachable_unchecked()
}
}
}
}
}
}

impl<F, Fut, Ok, Error> FusedStream for AsynkStrim<F, Fut, Result<Ok, Error>>
where
F: FnOnce(TryYielder<Ok, Error>) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
#[inline]
fn is_terminated(&self) -> bool {
matches!(self, Self::Done)
}
}
31 changes: 31 additions & 0 deletions src/try_yielder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use crate::yielder::Yielder;

/// Handle to allow you to yield something from the stream
pub struct TryYielder<Ok, Error> {
yielder: Yielder<Result<Ok, Error>>,
}

impl<Ok, Error> TryYielder<Ok, Error> {
#[inline]
pub(crate) fn new(yielder: Yielder<Result<Ok, Error>>) -> Self {
Self { yielder }
}

/// Yield a result from the stream
#[inline]
pub async fn yield_result(&mut self, item: Result<Ok, Error>) {
self.yielder.yield_item(item).await;
}

/// Yield a success value from the stream
#[inline]
pub async fn yield_ok(&mut self, item: Ok) {
self.yield_result(Ok(item)).await;
}

/// Yield an error value from the stream
#[inline]
pub async fn yield_error(&mut self, item: Error) {
self.yield_result(Err(item)).await;
}
}
18 changes: 7 additions & 11 deletions src/yielder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@ use core::{
task::{self, Poll},
};

struct YieldFuture<'a, Item> {
struct YieldFuture<Item> {
item: Option<Item>,
stream_address: usize,

// Here to catch some API misuse
// Nothing safety relevant. It would just panic at runtime which isn't ideal.
_lifetime_invariant: PhantomData<&'a mut ()>,
}

impl<Item> Future for YieldFuture<'_, Item> {
impl<Item> Future for YieldFuture<Item> {
type Output = ();

#[inline]
Expand Down Expand Up @@ -50,7 +46,7 @@ impl<Item> Future for YieldFuture<'_, Item> {
}
}

impl<Item> Unpin for YieldFuture<'_, Item> {}
impl<Item> Unpin for YieldFuture<Item> {}

/// Handle to allow you to yield something from the stream
pub struct Yielder<Item> {
Expand All @@ -69,12 +65,12 @@ impl<Item> Yielder<Item> {

/// Yield an item from the stream
#[inline]
pub fn yield_item(&mut self, item: Item) -> impl Future<Output = ()> + '_ {
YieldFuture {
pub async fn yield_item(&mut self, item: Item) {
let future = YieldFuture {
item: Some(item),
stream_address: self.stream_address,
};

_lifetime_invariant: PhantomData,
}
future.await;
}
}
20 changes: 20 additions & 0 deletions tests/try_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use futures_lite::stream;
use std::pin::pin;

#[test]
fn exits_early() {
let stream = pin!(asynk_strim::try_stream_fn(|mut yielder| async move {
yielder.yield_ok(42).await;
return Err("oh no");

#[allow(unreachable_code)]
yielder.yield_ok(1337).await;

Ok(())
}));

let mut stream = stream::block_on(stream);
assert_eq!(stream.next(), Some(Ok(42)));
assert_eq!(stream.next(), Some(Err("oh no")));
assert_eq!(stream.next(), None);
}
2 changes: 1 addition & 1 deletion tests/ui/double_yield.stderr
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ error[E0499]: cannot borrow `yielder` as mutable more than once at a time
4 | let _second = yielder.yield_item(141);
| ^^^^^^^ second mutable borrow occurs here
5 | });
| - first borrow might be used here, when `_first` is dropped and runs the destructor for type `impl Future<Output = ()> + '_`
| - first borrow might be used here, when `_first` is dropped and runs the destructor for type `impl Future<Output = ()>`

0 comments on commit fa8de9b

Please sign in to comment.