Skip to content

Commit

Permalink
Python: Update flatgeobuf reader and add object-store-python integr…
Browse files Browse the repository at this point in the history
…ation (#536)

Fixes first problem in
#534. Uses my own branch
pending roeap/object-store-python#11

Todo:

- [x] Add minimal type hinting for `ObjectStore` class.
  • Loading branch information
kylebarron authored Mar 6, 2024
1 parent b7e07be commit 7ab771d
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 32 deletions.
19 changes: 19 additions & 0 deletions python/core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions python/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ arrow = { version = "50", features = ["ffi"] }
bytes = "1"
flatgeobuf = { version = "4.1.0", default-features = false }
object_store = { version = "0.9.0", features = ["aws", "azure", "gcp", "http"] }
object_store_python = { git = "https://github.com/kylebarron/object-store-python", branch = "kyle/expose-inner", package = "object-store-internal" }
# object_store_python = { git = "https://github.com/roeap/object-store-python", rev = "445e9d7fa238fc3cd31cc2820caee0d8e10fedb8", package = "object-store-internal" }
parquet = "50"
pyo3 = { version = "0.20.0", features = [
"abi3-py38",
Expand Down
12 changes: 8 additions & 4 deletions python/core/python/geoarrow/rust/core/_rust.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1354,15 +1354,16 @@ def read_csv(
def read_flatgeobuf(
file: Union[str, Path, BinaryIO],
*,
fs: Optional[ObjectStore] = None,
batch_size: int = 65536,
bbox: Tuple[IntFloat, IntFloat, IntFloat, IntFloat] | None = None,
bbox: Tuple[float, float, float, float] | None = None,
) -> GeoTable: ...
async def read_flatgeobuf_async(
url: str,
path: str,
fs: ObjectStore,
*,
batch_size: int = 65536,
options: Dict[str, str] | None = None,
bbox: Tuple[IntFloat, IntFloat, IntFloat, IntFloat] | None = None,
bbox: Tuple[float, float, float, float] | None = None,
) -> GeoTable: ...
def read_geojson(
file: Union[str, Path, BinaryIO], *, batch_size: int = 65536
Expand Down Expand Up @@ -1415,6 +1416,9 @@ def write_ipc_stream(
) -> None: ...
def write_parquet(table: ArrowStreamExportable, file: str) -> None: ...

class ObjectStore:
def __init__(self, root: str, options: Optional[Dict[str, str]] = None) -> None: ...

# Interop
def from_ewkb(
input: ArrowArrayExportable,
Expand Down
94 changes: 66 additions & 28 deletions python/core/src/io/flatgeobuf.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,99 @@
use std::collections::HashMap;

use crate::error::{PyGeoArrowError, PyGeoArrowResult};
use crate::io::file::{BinaryFileReader, BinaryFileWriter};
use crate::table::GeoTable;
use flatgeobuf::FgbWriterOptions;
use geoarrow::io::flatgeobuf::read_flatgeobuf_async as _read_flatgeobuf_async;
use geoarrow::io::flatgeobuf::write_flatgeobuf_with_options as _write_flatgeobuf;
use geoarrow::io::flatgeobuf::{read_flatgeobuf as _read_flatgeobuf, FlatGeobufReaderOptions};
use object_store::{parse_url, parse_url_opts};
use pyo3::exceptions::PyValueError;
use object_store_python::PyObjectStore;
use pyo3::prelude::*;
use url::Url;

/// Read a FlatGeobuf file from a path on disk, a Python file object, or an object store into a GeoTable.
///
/// Example:
///
/// Reading a remote file.
///
/// ```py
/// from geoarrow.rust.core import ObjectStore, read_flatgeobuf
///
/// options = {
/// "aws_access_key_id": "...",
/// "aws_secret_access_key": "...",
/// }
/// fs = ObjectStore('s3://bucket', options=options)
/// table = read_flatgeobuf("path/in/bucket.fgb", fs=fs)
/// ```
///
/// Args:
/// file: the path to the file or a Python file object in binary read mode. When `fs` is
/// passed, this must be a string path inside that object store.
///
/// Other args:
/// fs: an ObjectStore instance that `file` is resolved against. This is required only if the
/// file is at a remote location.
/// batch_size: the number of rows to include in each internal batch of the table.
/// bbox: A spatial filter for reading rows, of the format (minx, miny, maxx, maxy). If set to
/// `None`, no spatial filtering will be performed.
///
/// Returns:
/// Table from FlatGeobuf file.
#[pyfunction]
#[pyo3(signature = (file, *, batch_size=65536, bbox=None))]
#[pyo3(signature = (file, *, fs=None, batch_size=65536, bbox=None))]
pub fn read_flatgeobuf(
py: Python,
file: PyObject,
fs: Option<PyObjectStore>,
batch_size: usize,
bbox: Option<(f64, f64, f64, f64)>,
) -> PyGeoArrowResult<GeoTable> {
let mut reader = file.extract::<BinaryFileReader>(py)?;
let options = FlatGeobufReaderOptions {
batch_size: Some(batch_size),
bbox,
..Default::default()
};
let table = _read_flatgeobuf(&mut reader, options)?;
Ok(GeoTable(table))
if let Some(fs) = fs {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
let options = FlatGeobufReaderOptions {
batch_size: Some(batch_size),
bbox,
..Default::default()
};
let path = file.extract::<String>(py)?;
let table = _read_flatgeobuf_async(fs.inner, path.into(), options)
.await
.map_err(PyGeoArrowError::GeoArrowError)?;

Ok(GeoTable(table))
})
} else {
let mut reader = file.extract::<BinaryFileReader>(py)?;
let options = FlatGeobufReaderOptions {
batch_size: Some(batch_size),
bbox,
..Default::default()
};
let table = _read_flatgeobuf(&mut reader, options)?;
Ok(GeoTable(table))
}
}

/// Asynchronously read a FlatGeobuf file from an object store into a GeoTable.
///
/// Example:
///
/// ```py
/// from geoarrow.rust.core import ObjectStore, read_flatgeobuf_async
///
/// options = {
/// "aws_access_key_id": "...",
/// "aws_secret_access_key": "...",
/// }
/// fs = ObjectStore('s3://bucket', options=options)
/// table = await read_flatgeobuf_async("path/in/bucket.fgb", fs)
/// ```
///
/// Args:
/// path: the path to the FlatGeobuf file within the object store
/// fs: an ObjectStore instance that `path` is resolved against.
///
/// Other args:
/// batch_size: the number of rows to include in each internal batch of the table.
Expand All @@ -55,31 +103,21 @@ pub fn read_flatgeobuf(
/// Returns:
/// Table from FlatGeobuf file.
#[pyfunction]
#[pyo3(signature = (url, *, batch_size=65536, options=None, bbox=None))]
#[pyo3(signature = (path, fs, *, batch_size=65536, bbox=None))]
pub fn read_flatgeobuf_async(
py: Python,
url: String,
path: String,
fs: PyObjectStore,
batch_size: usize,
options: Option<HashMap<String, String>>,
bbox: Option<(f64, f64, f64, f64)>,
) -> PyGeoArrowResult<PyObject> {
let fut = pyo3_asyncio::tokio::future_into_py(py, async move {
let url = Url::parse(&url).map_err(|err| PyValueError::new_err(err.to_string()))?;
let (reader, location) = if let Some(options) = options {
parse_url_opts(&url, options)
} else {
parse_url(&url)
}
.map_err(|err| PyValueError::new_err(err.to_string()))?;
// dbg!(&reader);
// dbg!(&location);

let options = FlatGeobufReaderOptions {
batch_size: Some(batch_size),
bbox,
..Default::default()
};
let table = _read_flatgeobuf_async(reader, location, options)
let table = _read_flatgeobuf_async(fs.inner, path.into(), options)
.await
.map_err(PyGeoArrowError::GeoArrowError)?;

Expand Down
2 changes: 2 additions & 0 deletions python/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ fn _rust(_py: Python, m: &PyModule) -> PyResult<()> {

// IO

m.add_class::<object_store_python::PyObjectStore>()?;

m.add_function(wrap_pyfunction!(crate::io::csv::read_csv, m)?)?;
m.add_function(wrap_pyfunction!(crate::io::flatgeobuf::read_flatgeobuf, m)?)?;
m.add_function(wrap_pyfunction!(
Expand Down

0 comments on commit 7ab771d

Please sign in to comment.