diff --git a/Cargo.lock b/Cargo.lock index d111385..9fe8dd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -656,6 +656,7 @@ dependencies = [ "once_cell", "percent-encoding", "pyo3", + "pyo3-asyncio", "reqwest", "thiserror", "tokio", @@ -839,6 +840,19 @@ dependencies = [ "unindent", ] +[[package]] +name = "pyo3-asyncio" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea6b68e93db3622f3bb3bf363246cf948ed5375afe7abff98ccbdd50b184995" +dependencies = [ + "futures", + "once_cell", + "pin-project-lite", + "pyo3", + "tokio", +] + [[package]] name = "pyo3-build-config" version = "0.20.2" diff --git a/README.md b/README.md index 252c5f9..f484b7c 100644 --- a/README.md +++ b/README.md @@ -41,20 +41,20 @@ implementation, with some slight adjustments for ease of use in python programs. ### `ObjectStore` api ```py -from object_store import ObjectStore, ObjectMeta +from object_store import ObjectStore, ObjectMeta, Path # we use an in-memory store for demonstration purposes. # data will not be persisted and is not shared across store instances store = ObjectStore("memory://") -store.put("data", b"some data") +store.put(Path("data"), b"some data") data = store.get("data") assert data == b"some data" blobs = store.list() -meta: ObjectMeta = store.head("data") +meta = store.head("data") range = store.get_range("data", start=0, length=4) assert range == b"some" @@ -64,6 +64,33 @@ copied = store.get("copied") assert copied == data ``` +#### Async api + +```py +from object_store import ObjectStore, ObjectMeta, Path + +# we use an in-memory store for demonstration purposes. +# data will not be persisted and is not shared across store instances +store = ObjectStore("memory://") + +path = Path("data") +await store.put_async(path, b"some data") + +data = await store.get_async(path) +assert data == b"some data" + +blobs = await store.list_async() + +meta = await store.head_async(path) + +range = await store.get_range_async(path, start=0, length=4) +assert range == b"some" + +await store.copy_async(Path("data"), Path("copied")) +copied = await store.get_async(Path("copied")) +assert copied == data +``` + ### Configuration As much as possible we aim to make access to various storage backends dependent diff --git a/object-store-internal/Cargo.toml b/object-store-internal/Cargo.toml index 8f6ae81..013c28b 100644 --- a/object-store-internal/Cargo.toml +++ b/object-store-internal/Cargo.toml @@ -13,6 +13,7 @@ once_cell = "1.12.0" object_store = { version = "0.9", features = ["azure", "aws", "gcp"] } percent-encoding = "2" pyo3 = { version = "0.20", default-features = false, features = ["macros"] } +pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] } thiserror = "1.0.34" tokio = { version = "1.0", features = [ "macros", diff --git a/object-store-internal/src/lib.rs b/object-store-internal/src/lib.rs index 78e6c63..ec3e5e5 100644 --- a/object-store-internal/src/lib.rs +++ b/object-store-internal/src/lib.rs @@ -2,6 +2,7 @@ mod builder; mod file; mod utils; +use std::borrow::Cow; use std::collections::HashMap; use std::fmt; use std::sync::Arc; @@ -19,7 +20,7 @@ use pyo3::exceptions::{ PyException, PyFileExistsError, PyFileNotFoundError, PyNotImplementedError, }; use pyo3::prelude::*; -use pyo3::{types::PyBytes, PyErr}; +use pyo3::PyErr; use tokio::runtime::Runtime; pub use builder::ObjectStoreBuilder; @@ -514,19 +515,49 @@ impl PyObjectStore { Ok(()) } + /// Save the provided bytes to the specified location. + #[pyo3(text_signature = "($self, location, bytes)")] + fn put_async<'a>( + &'a self, + py: Python<'a>, + location: PyPath, + bytes: Vec, + ) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + inner + .put(&location.into(), bytes.into()) + .await + .map_err(ObjectStoreError::from)?; + Ok(()) + }) + } + /// Return the bytes that are stored at the specified location. #[pyo3(text_signature = "($self, location)")] - fn get(&self, location: PyPath) -> PyResult> { + fn get(&self, location: PyPath) -> PyResult> { let obj = self .rt .block_on(get_bytes(self.inner.as_ref(), &location.into())) .map_err(ObjectStoreError::from)?; - Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py))) + Ok(Cow::Owned(obj.to_vec())) + } + + /// Return the bytes that are stored at the specified location. + #[pyo3(text_signature = "($self, location)")] + fn get_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + let obj = get_bytes(inner.as_ref(), &location.into()) + .await + .map_err(ObjectStoreError::from)?; + Ok(Cow::<[u8]>::Owned(obj.to_vec())) + }) } /// Return the bytes that are stored at the specified location in the given byte range #[pyo3(text_signature = "($self, location, start, length)")] - fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult> { + fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult> { let range = std::ops::Range { start, end: start + length, @@ -534,9 +565,32 @@ impl PyObjectStore { let obj = self .rt .block_on(self.inner.get_range(&location.into(), range)) - .map_err(ObjectStoreError::from)? - .to_vec(); - Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py))) + .map_err(ObjectStoreError::from)?; + Ok(Cow::Owned(obj.to_vec())) + } + + /// Return the bytes that are stored at the specified location in the given byte range + #[pyo3(text_signature = "($self, location, start, length)")] + fn get_range_async<'a>( + &'a self, + py: Python<'a>, + location: PyPath, + start: usize, + length: usize, + ) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + let range = std::ops::Range { + start, + end: start + length, + }; + + pyo3_asyncio::tokio::future_into_py(py, async move { + let obj = inner + .get_range(&location.into(), range) + .await + .map_err(ObjectStoreError::from)?; + Ok(Cow::<[u8]>::Owned(obj.to_vec())) + }) } /// Return the metadata for the specified location @@ -549,6 +603,19 @@ impl PyObjectStore { Ok(meta.into()) } + /// Return the metadata for the specified location + #[pyo3(text_signature = "($self, location)")] + fn head_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + let meta = inner + .head(&location.into()) + .await + .map_err(ObjectStoreError::from)?; + Ok(PyObjectMeta::from(meta)) + }) + } + /// Delete the object at the specified location. #[pyo3(text_signature = "($self, location)")] fn delete(&self, location: PyPath) -> PyResult<()> { @@ -558,6 +625,19 @@ impl PyObjectStore { Ok(()) } + /// Delete the object at the specified location. + #[pyo3(text_signature = "($self, location)")] + fn delete_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + inner + .delete(&location.into()) + .await + .map_err(ObjectStoreError::from)?; + Ok(()) + }) + } + /// List all the objects with the given prefix. /// /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix @@ -576,6 +656,25 @@ impl PyObjectStore { .collect()) } + /// List all the objects with the given prefix. + /// + /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix + /// of `foo/bar/x` but not of `foo/bar_baz/x`. + #[pyo3(text_signature = "($self, prefix)")] + fn list_async<'a>(&'a self, py: Python<'a>, prefix: Option) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + let object_metas = flatten_list_stream(inner.as_ref(), prefix.map(Path::from).as_ref()) + .await + .map_err(ObjectStoreError::from)?; + let py_object_metas = object_metas + .into_iter() + .map(PyObjectMeta::from) + .collect::>(); + Ok(py_object_metas) + }) + } + /// List objects with the given prefix and an implementation specific /// delimiter. Returns common prefixes (directories) in addition to object /// metadata. @@ -594,6 +693,28 @@ impl PyObjectStore { Ok(list.into()) } + /// List objects with the given prefix and an implementation specific + /// delimiter. Returns common prefixes (directories) in addition to object + /// metadata. + /// + /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix + /// of `foo/bar/x` but not of `foo/bar_baz/x`. + #[pyo3(text_signature = "($self, prefix)")] + fn list_with_delimiter_async<'a>( + &'a self, + py: Python<'a>, + prefix: Option, + ) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + let list_result = inner + .list_with_delimiter(prefix.map(Path::from).as_ref()) + .await + .map_err(ObjectStoreError::from)?; + Ok(PyListResult::from(list_result)) + }) + } + /// Copy an object from one path to another in the same object store. /// /// If there exists an object at the destination, it will be overwritten. @@ -605,6 +726,21 @@ impl PyObjectStore { Ok(()) } + /// Copy an object from one path to another in the same object store. + /// + /// If there exists an object at the destination, it will be overwritten. + #[pyo3(text_signature = "($self, from, to)")] + fn copy_async<'a>(&'a self, py: Python<'a>, from: PyPath, to: PyPath) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + inner + .copy(&from.into(), &to.into()) + .await + .map_err(ObjectStoreError::from)?; + Ok(()) + }) + } + /// Copy an object from one path to another, only if destination is empty. /// /// Will return an error if the destination already has an object. @@ -616,6 +752,26 @@ impl PyObjectStore { Ok(()) } + /// Copy an object from one path to another, only if destination is empty. + /// + /// Will return an error if the destination already has an object. + #[pyo3(text_signature = "($self, from, to)")] + fn copy_if_not_exists_async<'a>( + &'a self, + py: Python<'a>, + from: PyPath, + to: PyPath, + ) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + inner + .copy_if_not_exists(&from.into(), &to.into()) + .await + .map_err(ObjectStoreError::from)?; + Ok(()) + }) + } + /// Move an object from one path to another in the same object store. /// /// By default, this is implemented as a copy and then delete source. It may not @@ -630,6 +786,24 @@ impl PyObjectStore { Ok(()) } + /// Move an object from one path to another in the same object store. + /// + /// By default, this is implemented as a copy and then delete source. It may not + /// check when deleting source that it was the same object that was originally copied. + /// + /// If there exists an object at the destination, it will be overwritten. + #[pyo3(text_signature = "($self, from, to)")] + fn rename_async<'a>(&'a self, py: Python<'a>, from: PyPath, to: PyPath) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + inner + .rename(&from.into(), &to.into()) + .await + .map_err(ObjectStoreError::from)?; + Ok(()) + }) + } + /// Move an object from one path to another in the same object store. /// /// Will return an error if the destination already has an object. @@ -641,6 +815,26 @@ impl PyObjectStore { Ok(()) } + /// Move an object from one path to another in the same object store. + /// + /// Will return an error if the destination already has an object. + #[pyo3(text_signature = "($self, from, to)")] + fn rename_if_not_exists_async<'a>( + &'a self, + py: Python<'a>, + from: PyPath, + to: PyPath, + ) -> PyResult<&PyAny> { + let inner = self.inner.clone(); + pyo3_asyncio::tokio::future_into_py(py, async move { + inner + .rename_if_not_exists(&from.into(), &to.into()) + .await + .map_err(ObjectStoreError::from)?; + Ok(()) + }) + } + pub fn __getnewargs__(&self) -> PyResult<(String, Option>)> { Ok((self.root_url.clone(), self.options.clone())) } diff --git a/object-store-internal/src/utils.rs b/object-store-internal/src/utils.rs index 8d841ea..08cd48f 100644 --- a/object-store-internal/src/utils.rs +++ b/object-store-internal/src/utils.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use bytes::Bytes; use futures::future::{join_all, BoxFuture, FutureExt}; use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; @@ -82,6 +83,6 @@ pub async fn delete_dir(storage: &DynObjectStore, prefix: &Path) -> ObjectStoreR } /// get bytes from a location -pub async fn get_bytes(storage: &DynObjectStore, path: &Path) -> ObjectStoreResult> { - Ok(storage.get(path).await?.bytes().await?.into()) +pub async fn get_bytes(storage: &DynObjectStore, path: &Path) -> ObjectStoreResult { + storage.get(path).await?.bytes().await } diff --git a/object-store/python/object_store/_internal.pyi b/object-store/python/object_store/_internal.pyi index c3d6bd3..aad72e1 100644 --- a/object-store/python/object_store/_internal.pyi +++ b/object-store/python/object_store/_internal.pyi @@ -131,36 +131,70 @@ class ObjectStore: ) -> None: ... def get(self, location: Path) -> bytes: """Return the bytes that are stored at the specified location.""" + async def get_async(self, location: Path) -> bytes: + """Return the bytes that are stored at the specified location.""" def get_range(self, location: Path, start: int, length: int) -> bytes: """Return the bytes that are stored at the specified location in the given byte range.""" + async def get_range_async(self, location: Path, start: int, length: int) -> bytes: + """Return the bytes that are stored at the specified location in the given byte range.""" def put(self, location: Path, bytes: bytes) -> None: """Save the provided bytes to the specified location.""" + async def put_async(self, location: Path, bytes: bytes) -> None: + """Save the provided bytes to the specified location.""" def list(self, prefix: Path | None) -> list[ObjectMeta]: """List all the objects with the given prefix. + Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix + of `foo/bar/x` but not of `foo/bar_baz/x`. + """ + async def list_async(self, prefix: Path | None) -> list[ObjectMeta]: + """List all the objects with the given prefix. + Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of `foo/bar_baz/x`. """ def head(self, location: Path) -> ObjectMeta: """Return the metadata for the specified location""" + async def head_async(self, location: Path) -> ObjectMeta: + """Return the metadata for the specified location""" def list_with_delimiter(self, prefix: Path | None) -> ListResult: """List objects with the given prefix and an implementation specific delimiter. Returns common prefixes (directories) in addition to object metadata. + Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix + of `foo/bar/x` but not of `foo/bar_baz/x`. + """ + async def list_with_delimiter_async(self, prefix: Path | None) -> ListResult: + """List objects with the given prefix and an implementation specific + delimiter. Returns common prefixes (directories) in addition to object + metadata. + Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of `foo/bar_baz/x`. """ def delete(self, location: Path) -> None: """Delete the object at the specified location.""" + async def delete_async(self, location: Path) -> None: + """Delete the object at the specified location.""" def copy(self, src: Path, dst: Path) -> None: """Copy an object from one path to another in the same object store. + If there exists an object at the destination, it will be overwritten. + """ + async def copy_async(self, src: Path, dst: Path) -> None: + """Copy an object from one path to another in the same object store. + If there exists an object at the destination, it will be overwritten. """ def copy_if_not_exists(self, src: Path, dst: Path) -> None: """Copy an object from one path to another, only if destination is empty. + Will return an error if the destination already has an object. + """ + async def copy_if_not_exists_async(self, src: Path, dst: Path) -> None: + """Copy an object from one path to another, only if destination is empty. + Will return an error if the destination already has an object. """ def rename(self, src: Path, dst: Path) -> None: @@ -169,11 +203,24 @@ class ObjectStore: By default, this is implemented as a copy and then delete source. It may not check when deleting source that it was the same object that was originally copied. + If there exists an object at the destination, it will be overwritten. + """ + async def rename_async(self, src: Path, dst: Path) -> None: + """Move an object from one path to another in the same object store. + + By default, this is implemented as a copy and then delete source. It may not + check when deleting source that it was the same object that was originally copied. + If there exists an object at the destination, it will be overwritten. """ def rename_if_not_exists(self, src: Path, dst: Path) -> None: """Move an object from one path to another in the same object store. + Will return an error if the destination already has an object. + """ + async def rename_if_not_exists_async(self, src: Path, dst: Path) -> None: + """Move an object from one path to another in the same object store. + Will return an error if the destination already has an object. """