Address stream hanging, ensure num_rows_remaining decrements during reading
H-Plus-Time committed Jan 13, 2025
1 parent 0ea54e3 commit e88e14b
Showing 5 changed files with 127 additions and 78 deletions.
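The hang addressed here comes from the previous `poll_next` implementation (visible in the async.rs hunk below): it built a fresh `next_batch()` future on every poll and pinned it on the stack, so whenever that future returned `Pending` it was dropped at the end of the call, the in-flight read was discarded, and its wake-up was lost. The fix stores the boxed in-flight future in an explicit `Init`/`Reading` state machine and decrements `num_rows_remaining` as batches are emitted. The snippet below is a minimal sketch of that pattern only; the names (`State`, `Numbers`) and the stand-in async fetch are illustrative, not part of the geoarrow API.

```rust
// Minimal sketch of the pattern this commit adopts: keep the in-flight future
// inside the stream's own state so a Pending poll resumes the *same* future
// instead of rebuilding (and dropping) it on every wake-up.
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::future::BoxFuture;
use futures::{ready, FutureExt, Stream};

enum State {
    // Ready to start the next fetch; `None` once the source is exhausted.
    Idle(Option<u32>),
    // A fetch is in flight; the boxed future survives across polls.
    Reading(BoxFuture<'static, Option<u32>>),
}

// Numbers { state: State::Idle(Some(0)) } yields 0, 1, 2 and then ends.
struct Numbers {
    state: State,
}

impl Stream for Numbers {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match &mut self.state {
            State::Idle(next) => {
                let Some(n) = next.take() else {
                    return Poll::Ready(None);
                };
                // Stand-in for an async read (e.g. an HTTP range request).
                let fut = async move { if n < 3 { Some(n) } else { None } }.boxed();
                self.state = State::Reading(fut);
                self.poll_next(cx)
            }
            State::Reading(fut) => match ready!(fut.poll_unpin(cx)) {
                Some(n) => {
                    // Record progress, then go back to Idle for the next item.
                    self.state = State::Idle(Some(n + 1));
                    Poll::Ready(Some(n))
                }
                None => {
                    // Exhausted: remember it so later polls keep returning None.
                    self.state = State::Idle(None);
                    Poll::Ready(None)
                }
            },
        }
    }
}
```

Because the boxed future held in the `Reading` state is kept across polls, a consumer driving the stream (for example with `futures::StreamExt`) resumes the same read after a `Pending` result instead of restarting it, which is what stops the stream from hanging.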
11 changes: 1 addition & 10 deletions Cargo.lock


1 change: 1 addition & 0 deletions rust/geoarrow/Cargo.toml
@@ -19,6 +19,7 @@ flatgeobuf_async = [
"dep:bytes",
"dep:http-range-client",
"dep:object_store",
"dep:futures"
]
gdal = ["dep:gdal"]
geos = ["dep:geos"]
2 changes: 1 addition & 1 deletion rust/geoarrow/src/io/flatgeobuf/mod.rs
@@ -5,5 +5,5 @@ mod writer;

pub use reader::{FlatGeobufReader, FlatGeobufReaderBuilder, FlatGeobufReaderOptions};
#[cfg(feature = "flatgeobuf_async")]
pub use reader::{FlatGeobufStreamBuilder, FlatGeobufStreamReader};
pub use reader::{FlatGeobufStreamBuilder, FlatGeobufStream};
pub use writer::{write_flatgeobuf, write_flatgeobuf_with_options, FlatGeobufWriterOptions};
189 changes: 123 additions & 66 deletions rust/geoarrow/src/io/flatgeobuf/reader/async.rs
@@ -1,12 +1,12 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use flatgeobuf::{AsyncFeatureIter, HttpFgbReader};
use futures::task::{Context, Poll};
use futures::Stream;
use futures::{ready, FutureExt, Stream};
use futures::future::BoxFuture;
use geozero::{FeatureProcessor, FeatureProperties};
use http_range_client::{AsyncBufferedHttpRangeClient, AsyncHttpRangeClient};
use object_store::path::Path;
@@ -40,31 +40,31 @@ impl<T: AsyncHttpRangeClient> FlatGeobufStreamBuilder<T> {
}

/// Read from the FlatGeobuf file
pub async fn read(self, options: FlatGeobufReaderOptions) -> Result<FlatGeobufStreamReader<T>> {
pub async fn read(self, options: FlatGeobufReaderOptions) -> Result<FlatGeobufStream<T>> {
let (data_type, properties_schema, array_metadata) =
infer_from_header(self.reader.header())?;
if let Some((min_x, min_y, max_x, max_y)) = options.bbox {
let selection = self.reader.select_bbox(min_x, min_y, max_x, max_y).await?;
let num_rows = selection.features_count();
Ok(FlatGeobufStreamReader {
Ok(FlatGeobufStream::new(
selection,
data_type,
batch_size: options.batch_size.unwrap_or(65_536),
options.batch_size.unwrap_or(65_536),
properties_schema,
num_rows_remaining: num_rows,
num_rows,
array_metadata,
})
))
} else {
let selection = self.reader.select_all().await?;
let num_rows = selection.features_count();
Ok(FlatGeobufStreamReader {
Ok(FlatGeobufStream::new(
selection,
data_type,
batch_size: options.batch_size.unwrap_or(65_536),
options.batch_size.unwrap_or(65_536),
properties_schema,
num_rows_remaining: num_rows,
num_rows,
array_metadata,
})
))
}
}
}
@@ -83,50 +83,22 @@ impl FlatGeobufStreamBuilder<ObjectStoreWrapper> {
}
}

/// An iterator over record batches from a FlatGeobuf file.
///
/// This implements [arrow_array::RecordBatchReader], which you can use to access data.
pub struct FlatGeobufStreamReader<T: AsyncHttpRangeClient> {
selection: AsyncFeatureIter<T>,
data_type: NativeType,
batch_size: usize,
properties_schema: SchemaRef,
num_rows_remaining: Option<usize>,
array_metadata: Arc<ArrayMetadata>,
}

impl<T: AsyncHttpRangeClient> FlatGeobufStreamReader<T> {
/// Access the schema of the batches emitted from this stream.
pub fn schema(&self) -> SchemaRef {
let geom_field =
self.data_type
.to_field_with_metadata("geometry", true, &self.array_metadata);
let mut fields = self.properties_schema.fields().to_vec();
fields.push(Arc::new(geom_field));
Arc::new(Schema::new_with_metadata(
fields,
self.properties_schema.metadata().clone(),
))
}
enum StreamState<T: AsyncHttpRangeClient> {
Init(Option<FlatGeobufStreamReader<T>>),
Reading(BoxFuture<'static, Result<(FlatGeobufStreamReader<T>, Option<RecordBatch>)>>)
}

fn construct_options(&self) -> GeoTableBuilderOptions {
let coord_type = self.data_type.coord_type();
let mut batch_size = self.batch_size;
if let Some(num_rows_remaining) = self.num_rows_remaining {
batch_size = batch_size.min(num_rows_remaining);
}
GeoTableBuilderOptions::new(
coord_type,
false,
Some(batch_size),
Some(self.properties_schema.clone()),
self.num_rows_remaining,
self.array_metadata.clone(),
)
}
struct FlatGeobufStreamReader<T: AsyncHttpRangeClient> {
selection: AsyncFeatureIter<T>,
data_type: NativeType
}

async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
let options = self.construct_options();
impl<T> FlatGeobufStreamReader<T>
where
T: AsyncHttpRangeClient
{
async fn next_batch(mut self, options: GeoTableBuilderOptions) -> Result<(Self, Option<RecordBatch>)> {
let batch_size = options.batch_size;

macro_rules! impl_read {
@@ -136,7 +108,7 @@ impl<T: AsyncHttpRangeClient> FlatGeobufStreamReader<T> {
if row_count >= batch_size {
let (batches, _schema) = $builder.finish()?.into_inner();
assert_eq!(batches.len(), 1);
return Ok(Some(batches.into_iter().next().unwrap()));
return Ok((self, Some(batches.into_iter().next().unwrap())));
}

if let Some(feature) = self.selection.next().await? {
@@ -148,7 +120,7 @@ impl<T: AsyncHttpRangeClient> FlatGeobufStreamReader<T> {
$builder.feature_end(0)?;
row_count += 1;
} else {
return Ok(None);
return Ok((self, None));
}
}
}};
@@ -199,23 +171,108 @@ }
}
}

impl<T> Stream for FlatGeobufStreamReader<T>
/// An iterator over record batches from a FlatGeobuf file.
///
/// This implements [arrow_array::RecordBatchReader], which you can use to access data.
pub struct FlatGeobufStream<T: AsyncHttpRangeClient> {
data_type: NativeType,
batch_size: usize,
properties_schema: SchemaRef,
num_rows_remaining: Option<usize>,
array_metadata: Arc<ArrayMetadata>,
state: StreamState<T>,
}


impl<T: AsyncHttpRangeClient> FlatGeobufStream<T> {
/// Access the schema of the batches emitted from this stream.
pub fn schema(&self) -> SchemaRef {
let geom_field =
self.data_type
.to_field_with_metadata("geometry", true, &self.array_metadata);
let mut fields = self.properties_schema.fields().to_vec();
fields.push(Arc::new(geom_field));
Arc::new(Schema::new_with_metadata(
fields,
self.properties_schema.metadata().clone(),
))
}

fn construct_options(&self) -> GeoTableBuilderOptions {
let coord_type = self.data_type.coord_type();
let mut batch_size = self.batch_size;
if let Some(num_rows_remaining) = self.num_rows_remaining {
batch_size = batch_size.min(num_rows_remaining);
}
GeoTableBuilderOptions::new(
coord_type,
false,
Some(batch_size),
Some(self.properties_schema.clone()),
self.num_rows_remaining,
self.array_metadata.clone(),
)
}

fn new(
selection: AsyncFeatureIter<T>,
data_type: NativeType,
batch_size: usize,
properties_schema: SchemaRef,
num_rows_remaining: Option<usize>,
array_metadata: Arc<ArrayMetadata>
) -> Self {
Self {
data_type,
batch_size,
properties_schema,
num_rows_remaining,
array_metadata,
state: StreamState::Init(Some(FlatGeobufStreamReader { data_type, selection})),
}
}

}

impl<T> Stream for FlatGeobufStream<T>
where
T: AsyncHttpRangeClient + Unpin + Send,
T: AsyncHttpRangeClient + Unpin + Send + 'static,
{
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let future = self.next_batch();
futures::pin_mut!(future);

match future.poll(cx) {
Poll::Ready(Ok(Some(feature))) => Poll::Ready(Some(Ok(feature))),
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
// End of stream
Poll::Ready(Ok(None)) => Poll::Ready(None),
// Still waiting
Poll::Pending => Poll::Pending,
match &mut self.state {
StreamState::Init(reader) => {
let reader = reader.take().expect("lost reader");
if let Some(num_rows) = self.num_rows_remaining {
if num_rows == 0 {
return Poll::Ready(None);
}
}
let fut = reader.next_batch(self.construct_options()).boxed();
self.state = StreamState::Reading(fut);
self.poll_next(cx)
},
StreamState::Reading(f) => {
match ready!(f.poll_unpin(cx)) {
Ok((reader, maybe_batch)) => {
match maybe_batch {
Some(batch) => {
if let Some(num_rows) = self.num_rows_remaining {
self.num_rows_remaining = Some(num_rows - batch.num_rows());
}
self.state = StreamState::Init(Some(reader));
return Poll::Ready(Some(Ok(batch)))
},
// no more record batches
None => return Poll::Ready(None)
}
},
Err(err) => {
return Poll::Ready(Some(Err(err)))
}
}
},
}
}
}
2 changes: 1 addition & 1 deletion rust/geoarrow/src/io/flatgeobuf/reader/mod.rs
@@ -7,5 +7,5 @@ mod sync;

pub use common::FlatGeobufReaderOptions;
#[cfg(feature = "flatgeobuf_async")]
pub use r#async::{FlatGeobufStreamBuilder, FlatGeobufStreamReader};
pub use r#async::{FlatGeobufStreamBuilder, FlatGeobufStream};
pub use sync::{FlatGeobufReader, FlatGeobufReaderBuilder};
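
For downstream code, the visible change in these re-exports is the rename from `FlatGeobufStreamReader` to `FlatGeobufStream`. The sketch below shows how a caller might drive the renamed stream; it assumes the paths `geoarrow::io::flatgeobuf::{FlatGeobufReaderOptions, FlatGeobufStreamBuilder}` and `geoarrow::error::Result` are publicly re-exported as the `mod.rs` hunks above suggest, and it deliberately takes an already-constructed builder and options, since their construction is outside this commit.

```rust
// Hedged usage sketch, not code from this commit: drain a FlatGeobuf source
// into record batches via the renamed FlatGeobufStream.
use arrow_array::RecordBatch;
use futures::TryStreamExt;
use geoarrow::error::Result;
use geoarrow::io::flatgeobuf::{FlatGeobufReaderOptions, FlatGeobufStreamBuilder};
use http_range_client::AsyncHttpRangeClient;

async fn collect_batches<T>(
    builder: FlatGeobufStreamBuilder<T>,
    options: FlatGeobufReaderOptions,
) -> Result<Vec<RecordBatch>>
where
    // Bounds taken from the `Stream` impl in this commit.
    T: AsyncHttpRangeClient + Unpin + Send + 'static,
{
    // `read` applies the optional bbox/batch_size and returns the
    // FlatGeobufStream state machine introduced above.
    let stream = builder.read(options).await?;
    // Each item is a Result<RecordBatch>; once num_rows_remaining reaches
    // zero the stream now ends instead of hanging.
    stream.try_collect().await
}
```

Constructing the builder itself (for example over HTTP or an object store) is not changed by this commit, so it is left out of the sketch.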
