Python: Use pyo3-object_store for reading data from remote object stores (#849)
kylebarron authored Nov 13, 2024
1 parent 8bad0f6 commit 9c99a1f
Showing 15 changed files with 125 additions and 767 deletions.
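
The user-facing change on the Python side: reader functions and the ParquetFile/ParquetDataset constructors now take the object store as a `store=` argument instead of `fs=`, with the store machinery provided by the pyo3-object_store crate. A minimal sketch of the renamed call pattern, mirroring the docstring examples in this diff (bucket, credentials, and path are placeholders; note the docstrings still construct the `ObjectStore` wrapper that this commit removes from the type stubs):

```py
from geoarrow.rust.io import read_flatgeobuf

# Hypothetical S3 credentials, as in the docstring examples below.
options = {
    "aws_access_key_id": "...",
    "aws_secret_access_key": "...",
    "aws_region": "...",
}
store = ObjectStore("s3://bucket", options=options)

# The keyword is now `store=`; before this commit it was `fs=`.
table = read_flatgeobuf("path/in/bucket.fgb", store=store)
```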
15 changes: 15 additions & 0 deletions python/Cargo.lock

Generated lockfile; diff not rendered.

2 changes: 2 additions & 0 deletions python/geoarrow-io/Cargo.toml
@@ -22,6 +22,7 @@ default = ["async"]
 async = [
     "dep:futures",
     "dep:object_store",
+    "dep:pyo3-object_store",
     "parquet/object_store",
     "dep:pyo3-async-runtimes",
     "geoarrow/flatgeobuf_async",
@@ -48,6 +49,7 @@ pyo3-arrow = { workspace = true }
 pyo3-async-runtimes = { version = "0.22", features = [
     "tokio-runtime",
 ], optional = true }
+pyo3-object_store = { version = "0.1.0-beta.1", optional = true }
 pythonize = "0.22"
 geo = "0.28"
 geo-traits = { workspace = true }
46 changes: 21 additions & 25 deletions python/geoarrow-io/python/geoarrow/rust/io/_io.pyi
@@ -3,7 +3,6 @@ from __future__ import annotations
 from pathlib import Path
 from typing import (
     BinaryIO,
-    Dict,
     List,
     Optional,
     Sequence,
@@ -24,15 +23,15 @@ from .enums import GeoParquetEncoding
 from .types import BboxCovering, GeoParquetEncodingT
 
 class ParquetFile:
-    def __init__(self, path: str, fs: ObjectStore) -> None:
+    def __init__(self, path: str, store: ObjectStore) -> None:
         """
         Construct a new ParquetFile
 
         This will synchronously fetch metadata from the provided path
 
         Args:
             path: a string URL to read from.
-            fs: the file system interface to read from.
+            store: the file system interface to read from.
 
         Returns:
             A new ParquetFile object.
@@ -133,15 +132,15 @@ class ParquetFile:
         """
 
 class ParquetDataset:
-    def __init__(self, paths: Sequence[str], fs: ObjectStore) -> None:
+    def __init__(self, paths: Sequence[str], store: ObjectStore) -> None:
         """
         Construct a new ParquetDataset
 
         This will synchronously fetch metadata from all listed files.
 
         Args:
             paths: a list of string URLs to read from.
-            fs: the file system interface to read from.
+            store: the file system interface to read from.
 
         Returns:
             A new ParquetDataset object.
@@ -241,9 +240,6 @@ class ParquetWriter:
             table: _description_
         """
 
-class ObjectStore:
-    def __init__(self, root: str, options: Optional[Dict[str, str]] = None) -> None: ...
-
 def read_csv(
     file: str | Path | BinaryIO,
     geometry_column_name: str,
@@ -265,7 +261,7 @@ def read_csv(
 def read_flatgeobuf(
     file: Union[str, Path, BinaryIO],
     *,
-    fs: Optional[ObjectStore] = None,
+    store: Optional[ObjectStore] = None,
     batch_size: int = 65536,
     bbox: Tuple[float, float, float, float] | None = None,
 ) -> Table:
@@ -309,15 +305,15 @@ def read_flatgeobuf(
         "aws_secret_access_key": "...",
         "aws_region": "..."
     }
-    fs = ObjectStore('s3://bucket', options=options)
-    table = read_flatgeobuf("path/in/bucket.fgb", fs=fs)
+    store = ObjectStore('s3://bucket', options=options)
+    table = read_flatgeobuf("path/in/bucket.fgb", store=store)
     ```
 
     Args:
         file: the path to the file or a Python file object in binary read mode.
 
     Other args:
-        fs: an ObjectStore instance for this url. This is required only if the file is at a remote
+        store: an ObjectStore instance for this url. This is required only if the file is at a remote
             location.
         batch_size: the number of rows to include in each internal batch of the table.
         bbox: A spatial filter for reading rows, of the format (minx, miny, maxx, maxy). If set to
@@ -330,7 +326,7 @@
 async def read_flatgeobuf_async(
     path: str,
     *,
-    fs: Optional[ObjectStore] = None,
+    store: Optional[ObjectStore] = None,
     batch_size: int = 65536,
     bbox: Tuple[float, float, float, float] | None = None,
 ) -> Table:
@@ -358,17 +354,17 @@ async def read_flatgeobuf_async(
         "aws_secret_access_key": "...",
         "aws_region": "..."
     }
-    fs = ObjectStore('s3://bucket', options=options)
-    table = await read_flatgeobuf_async("path/in/bucket.fgb", fs=fs)
+    store = ObjectStore('s3://bucket', options=options)
+    table = await read_flatgeobuf_async("path/in/bucket.fgb", store=store)
     ```
 
     Args:
         path: the url or relative path to a remote FlatGeobuf file. If an argument is passed for
-            `fs`, this should be a path fragment relative to the root passed to the `ObjectStore`
+            `store`, this should be a path fragment relative to the root passed to the `ObjectStore`
             constructor.
 
     Other args:
-        fs: an ObjectStore instance for this url. This is required for non-HTTP urls.
+        store: an ObjectStore instance for this url. This is required for non-HTTP urls.
         batch_size: the number of rows to include in each internal batch of the table.
         bbox: A spatial filter for reading rows, of the format (minx, miny, maxx, maxy). If set to
             `None`, no spatial filtering will be performed.
@@ -409,7 +405,7 @@ def read_geojson_lines(
 def read_parquet(
     path: Union[str, Path, BinaryIO],
     *,
-    fs: Optional[ObjectStore] = None,
+    store: Optional[ObjectStore] = None,
     batch_size: int = 65536,
 ) -> Table:
     """
@@ -443,13 +439,13 @@ def read_parquet(
         "aws_secret_access_key": "...",
         "aws_region": "..."
     }
-    fs = ObjectStore('s3://bucket', options=options)
-    table = read_parquet("path/in/bucket.parquet", fs=fs)
+    store = ObjectStore('s3://bucket', options=options)
+    table = read_parquet("path/in/bucket.parquet", store=store)
     ```
 
     Args:
         path: the path to the file
-        fs: the ObjectStore to read from. Defaults to None.
+        store: the ObjectStore to read from. Defaults to None.
         batch_size: the number of rows to include in each internal batch of the table.
 
     Returns:
@@ -459,7 +455,7 @@
 async def read_parquet_async(
     path: Union[str, Path, BinaryIO],
     *,
-    fs: Optional[ObjectStore] = None,
+    store: Optional[ObjectStore] = None,
     batch_size: int = 65536,
 ) -> Table:
     """
@@ -486,13 +482,13 @@ async def read_parquet_async(
         "aws_secret_access_key": "...",
         "aws_region": "..."
     }
-    fs = ObjectStore('s3://bucket', options=options)
-    table = await read_parquet_async("path/in/bucket.parquet", fs=fs)
+    store = ObjectStore('s3://bucket', options=options)
+    table = await read_parquet_async("path/in/bucket.parquet", store=store)
     ```
 
     Args:
         path: the path to the file
-        fs: the ObjectStore to read from. Defaults to None.
+        store: the ObjectStore to read from. Defaults to None.
         batch_size: the number of rows to include in each internal batch of the table.
 
     Returns:
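The same rename applies to the dataset classes above; a short sketch under the same assumptions (`store` constructed as in the earlier example, path is a placeholder):

```py
from geoarrow.rust.io import ParquetFile

# Per the docstring, metadata is fetched synchronously at construction time.
parquet_file = ParquetFile("path/in/bucket.parquet", store=store)
```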
6 changes: 3 additions & 3 deletions python/geoarrow-io/src/io/flatgeobuf/async.rs
@@ -8,15 +8,15 @@ use pyo3::prelude::*;
 use pyo3_async_runtimes::tokio::future_into_py;
 
 #[pyfunction]
-#[pyo3(signature = (path, *, fs=None, batch_size=65536, bbox=None))]
+#[pyo3(signature = (path, *, store=None, batch_size=65536, bbox=None))]
 pub fn read_flatgeobuf_async(
     py: Python,
     path: PyObject,
-    fs: Option<PyObject>,
+    store: Option<PyObject>,
     batch_size: usize,
     bbox: Option<(f64, f64, f64, f64)>,
 ) -> PyGeoArrowResult<PyObject> {
-    let reader = construct_reader(py, path, fs)?;
+    let reader = construct_reader(py, path, store)?;
     match reader {
         AnyFileReader::Async(async_reader) => {
             let fut = future_into_py(py, async move {
36 changes: 21 additions & 15 deletions python/geoarrow-io/src/io/flatgeobuf/sync.rs
@@ -9,31 +9,37 @@ use pyo3::prelude::*;
 use pyo3_arrow::input::AnyRecordBatch;
 
 #[pyfunction]
-#[pyo3(signature = (file, *, fs=None, batch_size=65536, bbox=None))]
+#[pyo3(signature = (file, *, store=None, batch_size=65536, bbox=None))]
 pub fn read_flatgeobuf(
     py: Python,
     file: PyObject,
-    fs: Option<PyObject>,
+    store: Option<PyObject>,
     batch_size: usize,
     bbox: Option<(f64, f64, f64, f64)>,
 ) -> PyGeoArrowResult<PyObject> {
-    let reader = construct_reader(py, file, fs)?;
+    let reader = construct_reader(py, file, store)?;
     match reader {
         #[cfg(feature = "async")]
-        AnyFileReader::Async(async_reader) => async_reader.runtime.block_on(async move {
-            use geoarrow::io::flatgeobuf::read_flatgeobuf_async as _read_flatgeobuf_async;
-
-            let options = FlatGeobufReaderOptions {
-                batch_size: Some(batch_size),
-                bbox,
-                ..Default::default()
-            };
-            let table = _read_flatgeobuf_async(async_reader.store, async_reader.path, options)
-                .await
-                .map_err(PyGeoArrowError::GeoArrowError)?;
-
-            Ok(table_to_pytable(table).to_arro3(py)?)
-        }),
+        AnyFileReader::Async(async_reader) => {
+            use crate::runtime::get_runtime;
+
+            let runtime = get_runtime(py)?;
+
+            runtime.block_on(async move {
+                use geoarrow::io::flatgeobuf::read_flatgeobuf_async as _read_flatgeobuf_async;
+
+                let options = FlatGeobufReaderOptions {
+                    batch_size: Some(batch_size),
+                    bbox,
+                    ..Default::default()
+                };
+                let table = _read_flatgeobuf_async(async_reader.store, async_reader.path, options)
+                    .await
+                    .map_err(PyGeoArrowError::GeoArrowError)?;
+
+                Ok(table_to_pytable(table).to_arro3(py)?)
+            })
+        }
         AnyFileReader::Sync(mut sync_reader) => {
             let options = FlatGeobufReaderOptions {
                 batch_size: Some(batch_size),
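A design note on the sync wrapper above: rather than every store carrying its own Tokio runtime (the removed `async_reader.runtime` field), `read_flatgeobuf` now obtains a shared runtime through `get_runtime(py)` and blocks on the async reader. From Python this is transparent; as the docstrings note, a plain HTTP(S) URL needs no explicit store at all (URL is a placeholder):

```py
from geoarrow.rust.io import read_flatgeobuf

# For http(s) URLs, construct_reader builds an HTTP object store internally;
# the sync wrapper then blocks on the async read via the shared runtime.
table = read_flatgeobuf("https://example.com/data.fgb")
```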
22 changes: 6 additions & 16 deletions python/geoarrow-io/src/io/input/mod.rs
@@ -4,26 +4,22 @@ use std::sync::Arc;
 
 use crate::error::PyGeoArrowResult;
 #[cfg(feature = "async")]
-use crate::io::object_store::PyObjectStore;
-#[cfg(feature = "async")]
 use object_store::http::HttpBuilder;
 #[cfg(feature = "async")]
 use object_store::path::Path;
 #[cfg(feature = "async")]
 use object_store::{ClientOptions, ObjectStore};
-use pyo3::exceptions::PyValueError;
+#[cfg(feature = "async")]
+use pyo3_object_store::PyObjectStore;
 use sync::FileReader;
 
 use pyo3::prelude::*;
-#[cfg(feature = "async")]
-use tokio::runtime::Runtime;
 use url::Url;
 
 #[cfg(feature = "async")]
 pub struct AsyncFileReader {
     pub store: Arc<dyn ObjectStore>,
     pub path: Path,
-    pub runtime: Arc<Runtime>,
 }
 
 pub enum AnyFileReader {
@@ -39,16 +35,15 @@ pub enum AnyFileReader {
 pub fn construct_reader(
     py: Python,
     file: PyObject,
-    fs: Option<PyObject>,
+    store: Option<PyObject>,
 ) -> PyGeoArrowResult<AnyFileReader> {
     // If the user passed an object store instance, use that
     #[cfg(feature = "async")]
-    if let Some(fs) = fs {
-        let fs = fs.extract::<PyObjectStore>(py)?;
+    if let Some(store) = store {
+        let store = store.extract::<PyObjectStore>(py)?;
         let path = file.extract::<String>(py)?;
         let async_reader = AsyncFileReader {
-            store: fs.inner,
-            runtime: fs.rt,
+            store: store.into_inner(),
             path: path.into(),
         };
         return Ok(AnyFileReader::Async(async_reader));
@@ -70,13 +65,8 @@ pub fn construct_reader(
                 .build()?;
             let path = url.path().trim_start_matches('/');
 
-            let runtime = Arc::new(
-                tokio::runtime::Runtime::new()
-                    .map_err(|err| PyValueError::new_err(err.to_string()))?,
-            );
             let async_reader = AsyncFileReader {
                 store: Arc::new(store),
-                runtime,
                 path: path.into(),
             };
             return Ok(AnyFileReader::Async(async_reader));
2 changes: 0 additions & 2 deletions python/geoarrow-io/src/io/mod.rs
@@ -5,8 +5,6 @@ pub mod flatgeobuf;
 pub mod geojson;
 pub mod geojson_lines;
 pub mod input;
-#[cfg(feature = "async")]
-pub mod object_store;
 pub mod parquet;
 #[cfg(feature = "async")]
 pub mod postgis;
(Diffs for the remaining 8 changed files did not load.)
