Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: FlatGeobuf async stream #966

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft

Conversation

kylebarron
Copy link
Member

@kylebarron kylebarron commented Dec 23, 2024

I don't really know how to implement this.

I'd like to implement something like RecordBatchStream, a struct that implements Stream<Item = Result<RecordBatch>>. But I don't understand the low-level implementation of Stream and poll_next well enough to do that.

FlatGeobuf exposes the AsyncFeatureIter struct, which implements next but does not implement Stream.

Ideally I want a high level API to transform the AsyncFeatureIter into a stream. I tried to use https://github.com/tokio-rs/async-stream but got stuck because AsyncFeatureIter does not implement Stream.

Additionally, even if I'm able to implement this method using async-stream, I really want the struct itself to implement Stream rather than having a method that returns an opaque impl Stream.

@H-Plus-Time maybe you have some thoughts on this? Ideally we would be able to plumb this through to JS like we have with the GeoParquet reader as well.

@H-Plus-Time
Copy link
Contributor

H-Plus-Time commented Dec 24, 2024

Hmm, ok, I think it's not too difficult to impl Stream<FgbFeature> (it's a matter of calling poll on the fut returned by next, matching on that and so on), but rolling that up to a RecordBatch is somewhat unclear to me 🤔.

This'd be ok-ish for a Stream<Result<FgbFeature, Error>> (supposedly, though it does look correct):

use flatgeobuf::{FgbFeature, Error, HttpFgbReader, AsyncFeatureIter};
use futures::stream::Stream;
use std::pin::Pin;
use futures::task::{Context, Poll};
use futures::Future;

pub struct FgbStream<T>
where
    T: AsyncRead + AsyncSeek + Unpin + Send,
{
    iter: AsyncFeatureIter<T>,
}

impl<T> Stream for FgbStream<T>
where
    T: AsyncRead + AsyncSeek + Unpin + Send,
{
    type Item = Result<FgbFeature, Error>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Use the async next method of AsyncFeatureIter
        let future = self.iter.next();
        futures::pin_mut!(future);

        match future.poll(cx) {
            Poll::Ready(Some(Ok(feature))) => Poll::Ready(Some(Ok(feature))),
            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
            Poll::Ready(None) => Poll::Ready(None), // End of stream
            Poll::Pending => Poll::Pending, // Still waiting
        }
    }
}

@kylebarron
Copy link
Member Author

Thanks for the input! That's really cool to see how to use pin and poll.

I think in theory it's not too hard to wrap up into a record batch. You just continue calling next until you've read batch_size rows. async-stream might be able to help with that too

@H-Plus-Time
Copy link
Contributor

Yeah, it looks like a fresh GeoTableBuilder per batch, and more or less what you're doing in read_flatgeobuf_async.

No need for the stream of FgbFeatures I think:

impl<T> Stream for FgbStream<T>
where
    T: AsyncRead + AsyncSeek + Unpin + Send,
{
    type Item = Result<RecordBatch, Error>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
      let fut = self.read_batch();
      // Rest of the original
    }
    async fn read_batch(&mut self) {
      // Construct GeoTableBuilder, conventional loop call of next on the iter, stopping at batch_size.
    }

I'd say read_batch would be 95% source identical to read_flatgeobuf_async, it's really just the batch limiting, and grabbing the 0th record batch vs returning the GeoTable. Really depends if you want to avoid doubling up on stream interfaces 🤷 (that or one of the approaches runs into lifetime/Send issues).

@kylebarron
Copy link
Member Author

Ok cool that's a big help to get me unblocked. Hopefully the rest can be very similar to #933

@kylebarron
Copy link
Member Author

It compiles at least! Now to see if it works!

@kylebarron
Copy link
Member Author

kylebarron commented Dec 24, 2024

I tried running the test locally but it just hangs forever 🥲 ; I must be doing something wrong

@H-Plus-Time
Copy link
Contributor

Ahah! I bet it's the else case in next_batch - if the batch size is never reached (which I think it isn't with US counties, there's something like 30k of them if I recall correctly), at some point, iter.next() will get you None, but you'll still have a non-zero number of rows, in which case you should do the same routine as when you hit the threshold (the next call to next_batch will get you row_count=0, so Ok(None) is good in that case).

That being said... The hanging is a bit weird 🤔, handling short (and trailing, I'm fairly certain it'd truncate to exactly a multiple of the batch size with the current code) batches might not completely solve it.

@H-Plus-Time
Copy link
Contributor

Ah, ok, this makes sense now - the fut of next_batch needs to be long-lived (rather than constructed in poll_next repeatedly), and /that/ needs to be polled (going by examples in the parquet stream implementation, ready!(fut.poll_unpin(cx)) replaces the thing you're matching against).

So you pretty much have either:

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) {
  // Initialise the fut for the next batch, loop poll it (the approach used in the parquet stream reader)
 let fut = self.next_batch();
 // Probably necessary
 futures::pin_mut!(fut);
 loop {
   match ready!(fut.poll_unpin(cx)) {
     Some(Ok(batch)) => {
       // Break out of the loop
       return Poll::Ready(Some(Ok(batch)));
     },
     // Error case as per usual
     None => {
       // Close out the stream
       return Poll::Ready(None);
     }
     // No need for an explicit pending case
   }
 }

Or:

  1. Store the fut, .boxed() on the stream struct, in an Option (so you can start it out as None).
  2. On the first poll, initialise the fut and just return Poll:Pending immediately.
  3. Borrow it, ready!(fut.poll_unpin(cx)), if it happens to be ready and the result is non-None, swap in a new fut.
  4. Ready and None, same deal as above, break out of the stream

@H-Plus-Time
Copy link
Contributor

Alright, update on this - I (finally) have a functional version of this (lodging a PR shortly). Indirecting via an Option (to avoid constant simultaneous borrowing problems), and dropping the next_batch method into a child struct (with no references or shared lifetime with the parent) seems to be what makes this work.

@kylebarron kylebarron mentioned this pull request Jan 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants