Skip to content

Commit

Permalink
Add try_stream_fn (#6)
Browse files Browse the repository at this point in the history
* Add try_stream_fn

* Deduplicate

* Remove duplicate functions
  • Loading branch information
aumetra authored Oct 9, 2024
1 parent 0d64232 commit 2c51192
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 28 deletions.
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ Features:
- one dependency (besides `futures-core` which I don't count since it provides the `Stream` definition)
- `no_std`-compatible, zero allocations

> [!IMPORTANT]
> This crate adds a wrapper around the wakers that contains data and pointers needed to yield items.
> Crates like [`embassy`](https://embassy.dev) use a similar approach and will therefore clash with us.
>
> If you run into this issue (which will manifest as a runtime panic), you can use the `unwrap_waker` function.
> This function will wrap a future and remove the waker wrapper.
>
> While you can't use the yielder inside the unwrapped future, stuff like `embassy` should work again.
### ⚠ Important

This crate adds a wrapper around the wakers that contains data and pointers needed to yield items.
Crates like [`embassy`](https://embassy.dev) use a similar approach and will therefore clash with us.

If you run into this issue (which will manifest as a runtime panic), you can use the `unwrap_waker` function.
This function will wrap a future and remove the waker wrapper.

While you can't use the yielder inside the unwrapped future, stuff like `embassy` should work again.

## Example

Expand Down
56 changes: 56 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use core::{future::Future, pin::pin, task};
use futures_core::Stream;

mod stream;
mod try_yielder;
mod waker;
mod yielder;

pub use self::try_yielder::TryYielder;
pub use self::yielder::Yielder;

/// Unwrap the waker
Expand Down Expand Up @@ -85,3 +87,57 @@ where
{
stream_fn(func)
}

/// Create a new try stream
///
/// # Example
///
/// Let's yield some lyrics (Song: "Archbombe" by Systemabsturz):
///
/// ```
/// # use futures_lite::StreamExt;
/// # use std::pin::pin;
/// # use std::convert::Infallible;
/// # futures_lite::future::block_on(async {
/// let stream = asynk_strim::try_stream_fn(|mut yielder| async move {
/// yielder.yield_ok("Meine Programme habe ich mal ausgecheckt").await;
/// yielder.yield_ok("Dass ich mit Zündern reden kann finde ich suspekt").await;
/// yielder.yield_ok("Meine Codezeilen haben anfangs Hippies geschrieben").await;
/// yielder.yield_error("Von ihrem Pazifismus ist nicht viel geblieben").await;
/// yielder.yield_ok("Ich bin echt nicht glücklich und nicht einverstanden").await;
///
/// Err("Ich als Bombensteuerung soll auf Menschen landen")
/// });
///
/// let mut stream = pin!(stream);
/// while let Some(item) = stream.next().await {
/// println!("{item:?}");
/// }
/// # });
/// ```
#[inline]
pub fn try_stream_fn<F, Ok, Error, Fut>(func: F) -> impl Stream<Item = Result<Ok, Error>>
where
F: FnOnce(TryYielder<Ok, Error>) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
crate::stream::init(|mut yielder: TryYielder<_, _>| async move {
// trivially copyable. bit-wise copy is fine.
#[allow(unsafe_code)]
if let Err(err) = func(unsafe { core::ptr::read(&yielder) }).await {
yielder.yield_error(err).await;
}
})
}

/// Jokey alias for [`try_stream_fn`]
///
/// For more elaborate documentation, see [`try_stream_fn`]
#[inline]
pub fn try_strim_fn<F, Ok, Error, Fut>(func: F) -> impl Stream<Item = Result<Ok, Error>>
where
F: FnOnce(TryYielder<Ok, Error>) -> Fut,
Fut: Future<Output = Result<(), Error>>,
{
try_stream_fn(func)
}
20 changes: 12 additions & 8 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use futures_core::{FusedStream, Stream};
use pin_project_lite::pin_project;

#[inline]
pub fn init<F, Fut, Item>(func: F) -> impl Stream<Item = Item>
pub fn init<F, Fut, Yieldr, Item>(func: F) -> impl Stream<Item = Item>
where
F: FnOnce(Yielder<Item>) -> Fut,
F: FnOnce(Yieldr) -> Fut,
Fut: Future<Output = ()>,
Yieldr: From<Yielder<Item>>,
{
AsynkStrim::Initial { func: Some(func) }
}
Expand All @@ -27,7 +28,7 @@ pin_project! {
/// Doing this will trigger undefined behaviour.
#[project = AsynkStrimProj]
#[project(!Unpin)]
enum AsynkStrim<F, Fut, Item> {
enum AsynkStrim<F, Fut, Yieldr, Item> {
Initial {
func: Option<F>,
},
Expand All @@ -38,14 +39,16 @@ pin_project! {
Done,
MarkerStuff {
_item: PhantomData<Item>,
_yieldr: PhantomData<Yieldr>,
}
}
}

impl<F, Fut, Item> Stream for AsynkStrim<F, Fut, Item>
impl<F, Fut, Yieldr, Item> Stream for AsynkStrim<F, Fut, Yieldr, Item>
where
F: FnOnce(Yielder<Item>) -> Fut,
F: FnOnce(Yieldr) -> Fut,
Fut: Future<Output = ()>,
Yieldr: From<Yielder<Item>>,
{
type Item = Item;

Expand All @@ -61,7 +64,7 @@ where
// we actually only do this to be able to use `.take()` to remove the function from the future.
#[allow(unsafe_code)]
let func = unsafe { func.take().unwrap_unchecked() };
let fut = func(Yielder::new(stream_address));
let fut = func(<_>::from(Yielder::new(stream_address)));

self.set(Self::Progress { fut });
}
Expand Down Expand Up @@ -92,10 +95,11 @@ where
}
}

impl<F, Fut, Item> FusedStream for AsynkStrim<F, Fut, Item>
impl<F, Fut, Y, Item> FusedStream for AsynkStrim<F, Fut, Y, Item>
where
F: FnOnce(Yielder<Item>) -> Fut,
F: FnOnce(Y) -> Fut,
Fut: Future<Output = ()>,
Y: From<Yielder<Item>>,
{
#[inline]
fn is_terminated(&self) -> bool {
Expand Down
28 changes: 28 additions & 0 deletions src/try_yielder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::yielder::Yielder;

/// Handle to allow you to yield something from the stream
pub struct TryYielder<Ok, Error> {
yielder: Yielder<Result<Ok, Error>>,
}

impl<Ok, Error> TryYielder<Ok, Error> {
/// Yield a success value from the stream
#[inline]
pub async fn yield_ok(&mut self, item: Ok) {
self.yielder.yield_item(Ok(item)).await;
}

/// Yield an error value from the stream
#[inline]
pub async fn yield_error(&mut self, item: Error) {
self.yielder.yield_item(Err(item)).await;
}
}

#[doc(hidden)]
impl<Ok, Error> From<Yielder<Result<Ok, Error>>> for TryYielder<Ok, Error> {
#[inline]
fn from(yielder: Yielder<Result<Ok, Error>>) -> Self {
Self { yielder }
}
}
18 changes: 7 additions & 11 deletions src/yielder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@ use core::{
task::{self, Poll},
};

struct YieldFuture<'a, Item> {
struct YieldFuture<Item> {
item: Option<Item>,
stream_address: usize,

// Here to catch some API misuse
// Nothing safety relevant. It would just panic at runtime which isn't ideal.
_lifetime_invariant: PhantomData<&'a mut ()>,
}

impl<Item> Future for YieldFuture<'_, Item> {
impl<Item> Future for YieldFuture<Item> {
type Output = ();

#[inline]
Expand Down Expand Up @@ -50,7 +46,7 @@ impl<Item> Future for YieldFuture<'_, Item> {
}
}

impl<Item> Unpin for YieldFuture<'_, Item> {}
impl<Item> Unpin for YieldFuture<Item> {}

/// Handle to allow you to yield something from the stream
pub struct Yielder<Item> {
Expand All @@ -69,12 +65,12 @@ impl<Item> Yielder<Item> {

/// Yield an item from the stream
#[inline]
pub fn yield_item(&mut self, item: Item) -> impl Future<Output = ()> + '_ {
YieldFuture {
pub async fn yield_item(&mut self, item: Item) {
let future = YieldFuture {
item: Some(item),
stream_address: self.stream_address,
};

_lifetime_invariant: PhantomData,
}
future.await;
}
}
20 changes: 20 additions & 0 deletions tests/try_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use futures_lite::stream;
use std::pin::pin;

#[test]
fn exits_early() {
let stream = pin!(asynk_strim::try_stream_fn(|mut yielder| async move {
yielder.yield_ok(42).await;
return Err("oh no");

#[allow(unreachable_code)]
yielder.yield_ok(1337).await;

Ok(())
}));

let mut stream = stream::block_on(stream);
assert_eq!(stream.next(), Some(Ok(42)));
assert_eq!(stream.next(), Some(Err("oh no")));
assert_eq!(stream.next(), None);
}
2 changes: 1 addition & 1 deletion tests/ui/double_yield.stderr
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ error[E0499]: cannot borrow `yielder` as mutable more than once at a time
4 | let _second = yielder.yield_item(141);
| ^^^^^^^ second mutable borrow occurs here
5 | });
| - first borrow might be used here, when `_first` is dropped and runs the destructor for type `impl Future<Output = ()> + '_`
| - first borrow might be used here, when `_first` is dropped and runs the destructor for type `impl Future<Output = ()>`

0 comments on commit 2c51192

Please sign in to comment.