Skip to content

Commit

Permalink
update test
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Dec 24, 2024
1 parent abdd4ac commit 0ea54e3
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 161 deletions.
4 changes: 2 additions & 2 deletions rust/geoarrow/src/io/flatgeobuf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod reader;
mod writer;

#[cfg(feature = "flatgeobuf_async")]
pub use reader::{read_flatgeobuf_async, FlatGeobufStreamBuilder, FlatGeobufStreamReader};
pub use reader::{FlatGeobufReader, FlatGeobufReaderBuilder, FlatGeobufReaderOptions};
#[cfg(feature = "flatgeobuf_async")]
pub use reader::{FlatGeobufStreamBuilder, FlatGeobufStreamReader};
pub use writer::{write_flatgeobuf, write_flatgeobuf_with_options, FlatGeobufWriterOptions};
201 changes: 43 additions & 158 deletions rust/geoarrow/src/io/flatgeobuf/reader/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,22 @@ use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use flatgeobuf::{AsyncFeatureIter, GeometryType, HttpFgbReader};
use flatgeobuf::{AsyncFeatureIter, HttpFgbReader};
use futures::task::{Context, Poll};
use futures::Stream;
use geozero::{FeatureProcessor, FeatureProperties};
use http_range_client::{AsyncBufferedHttpRangeClient, AsyncHttpRangeClient};
use object_store::path::Path;
use object_store::ObjectStore;

use crate::algorithm::native::DowncastTable;
use crate::array::metadata::ArrayMetadata;
use crate::array::*;
use crate::datatypes::{Dimension, NativeType};
use crate::error::{GeoArrowError, Result};
use crate::io::flatgeobuf::reader::common::{
infer_from_header, infer_schema, parse_crs, FlatGeobufReaderOptions,
};
use crate::io::flatgeobuf::reader::common::{infer_from_header, FlatGeobufReaderOptions};
use crate::io::flatgeobuf::reader::object_store_reader::ObjectStoreWrapper;
use crate::io::geozero::array::GeometryStreamBuilder;
use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions};
use crate::table::Table;

/// A builder for [FlatGeobufStreamReader]
pub struct FlatGeobufStreamBuilder<T: AsyncHttpRangeClient> {
Expand All @@ -38,7 +34,7 @@ impl<T: AsyncHttpRangeClient> FlatGeobufStreamBuilder<T> {
}

/// Create a new [FlatGeobufStreamBuilder] directly from a client.
pub async fn open_with(reader: T, url: &str) -> Result<Self> {
pub async fn new_from_client(reader: T, url: &str) -> Result<Self> {
let client = AsyncBufferedHttpRangeClient::with(reader, url);
Self::new(client).await
}
Expand Down Expand Up @@ -75,7 +71,7 @@ impl<T: AsyncHttpRangeClient> FlatGeobufStreamBuilder<T> {

impl FlatGeobufStreamBuilder<ObjectStoreWrapper> {
/// Create a [FlatGeobufStreamBuilder] from an [ObjectStore] instance.
pub async fn open_store(store: Arc<dyn ObjectStore>, location: Path) -> Result<Self> {
pub async fn new_from_store(store: Arc<dyn ObjectStore>, location: Path) -> Result<Self> {
let head = store.head(&location).await?;
let object_store_wrapper = ObjectStoreWrapper {
reader: store,
Expand Down Expand Up @@ -224,168 +220,57 @@ where
}
}

// impl<T: AsyncHttpRangeClient + Unpin + Send + 'static> Stream for FlatGeobufStreamReader<T> {
// type Item = Result<RecordBatch>;
// }

// fn fgb_to_stream<T: AsyncHttpRangeClient>(selection: AsyncFeatureIter<T>) ->

/// Read a FlatGeobuf file to a [`Table`] asynchronously from object storage.
///
/// The object at `location` is looked up with `head` to learn its size, wrapped in an
/// [`ObjectStoreWrapper`] and read through a buffered range client. If `options.bbox` is
/// set, only features selected by `select_bbox` for `(min_x, min_y, max_x, max_y)` are
/// read; otherwise every feature is read.
///
/// # Errors
///
/// Returns an error if:
/// - the `head` request against the object store fails;
/// - the FlatGeobuf reader cannot be opened (previously this case panicked);
/// - the header declares M, T or TM dimensions (only XY and XYZ are supported);
/// - the header's geometry type is not one of Point, LineString, Polygon, MultiPoint,
///   MultiLineString, MultiPolygon or Unknown;
/// - selecting or processing features fails.
pub async fn read_flatgeobuf_async(
    reader: Arc<dyn ObjectStore>,
    location: Path,
    options: FlatGeobufReaderOptions,
) -> Result<Table> {
    let head = reader.head(&location).await?;

    let object_store_wrapper = ObjectStoreWrapper {
        reader,
        location,
        size: head.size,
    };
    // The URL argument is left empty; the wrapper carries the object location itself.
    let async_client: AsyncBufferedHttpRangeClient<ObjectStoreWrapper> =
        AsyncBufferedHttpRangeClient::with(object_store_wrapper, "");

    // Propagate open failures to the caller instead of panicking: this is I/O on
    // caller-supplied data, and the same error type is already converted via `?` below.
    let reader = HttpFgbReader::new(async_client).await?;

    let header = reader.header();
    if header.has_m() || header.has_t() || header.has_tm() {
        return Err(GeoArrowError::General(
            "Only XY and XYZ dimensions are supported".to_string(),
        ));
    }
    let has_z = header.has_z();

    let schema = infer_schema(header);
    let geometry_type = header.geometry_type();
    let array_metadata = parse_crs(header.crs());

    let mut selection = if let Some((min_x, min_y, max_x, max_y)) = options.bbox {
        reader.select_bbox(min_x, min_y, max_x, max_y).await?
    } else {
        reader.select_all().await?
    };

    let features_count = selection.features_count();

    let options = GeoTableBuilderOptions::new(
        options.coord_type,
        true,
        options.batch_size,
        Some(schema),
        features_count,
        array_metadata,
    );

    // Builds a table with a concrete geometry builder type and dimension by pushing each
    // feature's properties and geometry into the builder one at a time.
    macro_rules! impl_read {
        ($builder:ty, $dim:expr) => {{
            let mut builder = GeoTableBuilder::<$builder>::new_with_options($dim, options);
            while let Some(feature) = selection.next().await? {
                feature.process_properties(&mut builder)?;
                builder.properties_end()?;

                builder.push_geometry(feature.geometry_trait()?.as_ref())?;

                builder.feature_end(0)?;
            }
            // NOTE(review): the loop above has already drained `selection`, so this call
            // presumably sees no further features; kept unchanged -- confirm whether the
            // builder still relies on it.
            selection.process_features(&mut builder).await?;
            builder.finish()
        }};
    }

    match (geometry_type, has_z) {
        (GeometryType::Point, false) => {
            impl_read!(PointBuilder, Dimension::XY)
        }
        (GeometryType::LineString, false) => {
            impl_read!(LineStringBuilder, Dimension::XY)
        }
        (GeometryType::Polygon, false) => {
            impl_read!(PolygonBuilder, Dimension::XY)
        }
        (GeometryType::MultiPoint, false) => {
            impl_read!(MultiPointBuilder, Dimension::XY)
        }
        (GeometryType::MultiLineString, false) => impl_read!(MultiLineStringBuilder, Dimension::XY),
        (GeometryType::MultiPolygon, false) => impl_read!(MultiPolygonBuilder, Dimension::XY),
        (GeometryType::Unknown, false) => {
            // Unknown geometry type: read through the generic stream builder, then
            // downcast the resulting table.
            let mut builder =
                GeoTableBuilder::<GeometryStreamBuilder>::new_with_options(Dimension::XY, options);
            selection.process_features(&mut builder).await?;
            let table = builder.finish()?;
            table.downcast()
        }
        (GeometryType::Point, true) => {
            impl_read!(PointBuilder, Dimension::XYZ)
        }
        (GeometryType::LineString, true) => {
            impl_read!(LineStringBuilder, Dimension::XYZ)
        }
        (GeometryType::Polygon, true) => {
            impl_read!(PolygonBuilder, Dimension::XYZ)
        }
        (GeometryType::MultiPoint, true) => {
            impl_read!(MultiPointBuilder, Dimension::XYZ)
        }
        (GeometryType::MultiLineString, true) => impl_read!(MultiLineStringBuilder, Dimension::XYZ),
        (GeometryType::MultiPolygon, true) => impl_read!(MultiPolygonBuilder, Dimension::XYZ),
        (GeometryType::Unknown, true) => {
            let mut builder =
                GeoTableBuilder::<GeometryStreamBuilder>::new_with_options(Dimension::XYZ, options);
            selection.process_features(&mut builder).await?;
            let table = builder.finish()?;
            table.downcast()
        }
        // TODO: Parse into a GeometryCollection array and then downcast to a single-typed array if possible.
        geom_type => Err(GeoArrowError::NotYetImplemented(format!(
            "Parsing FlatGeobuf from {:?} geometry type not yet supported",
            geom_type
        ))),
    }
}

#[cfg(test)]
mod test {
use std::env::current_dir;

use crate::table::Table;

use super::*;
use futures::TryStreamExt;
use object_store::local::LocalFileSystem;

#[tokio::test]
async fn test_countries() {
let fs = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap());
let options = FlatGeobufReaderOptions::default();
let table =
read_flatgeobuf_async(fs, Path::from("fixtures/flatgeobuf/countries.fgb"), options)
.await
.unwrap();
assert_eq!(table.len(), 179);
}

#[tokio::test]
async fn test_countries_bbox() {
let fs = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap());
let options = FlatGeobufReaderOptions {
bbox: Some((0., -90., 180., 90.)),
..Default::default()
};
let table =
read_flatgeobuf_async(fs, Path::from("fixtures/flatgeobuf/countries.fgb"), options)
.await
.unwrap();
assert_eq!(table.len(), 133);
}

#[tokio::test]
async fn test_nz_buildings() {
let fs = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap());
let store = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap());
let options = FlatGeobufReaderOptions::default();
let _table = read_flatgeobuf_async(
fs,
Path::from("fixtures/flatgeobuf/nz-building-outlines-small.fgb"),
options,
let builder = FlatGeobufStreamBuilder::new_from_store(
store,
Path::from("fixtures/flatgeobuf/countries.fgb"),
)
.await
.unwrap();
let reader = builder.read(options).await.unwrap();
let schema = reader.schema();
let batches = reader.try_collect::<Vec<_>>().await.unwrap();
let table = Table::try_new(batches, schema).unwrap();
assert_eq!(table.len(), 179);
}

// #[tokio::test]
// async fn test_countries_bbox() {
// let fs = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap());
// let options = FlatGeobufReaderOptions {
// bbox: Some((0., -90., 180., 90.)),
// ..Default::default()
// };
// let table =
// read_flatgeobuf_async(fs, Path::from("fixtures/flatgeobuf/countries.fgb"), options)
// .await
// .unwrap();
// assert_eq!(table.len(), 133);
// }

// #[tokio::test]
// async fn test_nz_buildings() {
// let fs = Arc::new(LocalFileSystem::new_with_prefix(current_dir().unwrap()).unwrap());
// let options = FlatGeobufReaderOptions::default();
// let _table = read_flatgeobuf_async(
// fs,
// Path::from("fixtures/flatgeobuf/nz-building-outlines-small.fgb"),
// options,
// )
// .await
// .unwrap();
// }
}
2 changes: 1 addition & 1 deletion rust/geoarrow/src/io/flatgeobuf/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod sync;

pub use common::FlatGeobufReaderOptions;
#[cfg(feature = "flatgeobuf_async")]
pub use r#async::{read_flatgeobuf_async, FlatGeobufStreamBuilder, FlatGeobufStreamReader};
pub use r#async::{FlatGeobufStreamBuilder, FlatGeobufStreamReader};
pub use sync::{FlatGeobufReader, FlatGeobufReaderBuilder};

0 comments on commit 0ea54e3

Please sign in to comment.