Skip to content

Commit

Permalink
Async object store support (#6)
Browse files Browse the repository at this point in the history
* feat: Async support

* fix: Fix Bytes output for returned get

* Update object-store/src/lib.rs

* fix: fix visibility
  • Loading branch information
kylebarron authored Jun 9, 2024
1 parent 445e9d7 commit 8ed1da1
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 12 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 30 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ implementation, with some slight adjustments for ease of use in python programs.
### `ObjectStore` api

```py
from object_store import ObjectStore, ObjectMeta
from object_store import ObjectStore, ObjectMeta, Path

# we use an in-memory store for demonstration purposes.
# data will not be persisted and is not shared across store instances
store = ObjectStore("memory://")

store.put("data", b"some data")
store.put(Path("data"), b"some data")

data = store.get("data")
assert data == b"some data"

blobs = store.list()

meta: ObjectMeta = store.head("data")
meta = store.head("data")

range = store.get_range("data", start=0, length=4)
assert range == b"some"
Expand All @@ -64,6 +64,33 @@ copied = store.get("copied")
assert copied == data
```

#### Async api

```py
from object_store import ObjectStore, ObjectMeta, Path

# we use an in-memory store for demonstration purposes.
# data will not be persisted and is not shared across store instances
store = ObjectStore("memory://")

path = Path("data")
await store.put_async(path, b"some data")

data = await store.get_async(path)
assert data == b"some data"

blobs = await store.list_async()

meta = await store.head_async(path)

range = await store.get_range_async(path, start=0, length=4)
assert range == b"some"

await store.copy_async(Path("data"), Path("copied"))
copied = await store.get_async(Path("copied"))
assert copied == data
```

### Configuration

As much as possible we aim to make access to various storage backends dependent
Expand Down
1 change: 1 addition & 0 deletions object-store-internal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ once_cell = "1.12.0"
object_store = { version = "0.9", features = ["azure", "aws", "gcp"] }
percent-encoding = "2"
pyo3 = { version = "0.20", default-features = false, features = ["macros"] }
pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] }
thiserror = "1.0.34"
tokio = { version = "1.0", features = [
"macros",
Expand Down
208 changes: 201 additions & 7 deletions object-store-internal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod builder;
mod file;
mod utils;

use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
Expand All @@ -19,7 +20,7 @@ use pyo3::exceptions::{
PyException, PyFileExistsError, PyFileNotFoundError, PyNotImplementedError,
};
use pyo3::prelude::*;
use pyo3::{types::PyBytes, PyErr};
use pyo3::PyErr;
use tokio::runtime::Runtime;

pub use builder::ObjectStoreBuilder;
Expand Down Expand Up @@ -514,29 +515,82 @@ impl PyObjectStore {
Ok(())
}

/// Save the provided bytes to the specified location.
#[pyo3(text_signature = "($self, location, bytes)")]
fn put_async<'a>(
&'a self,
py: Python<'a>,
location: PyPath,
bytes: Vec<u8>,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.put(&location.into(), bytes.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Return the bytes that are stored at the specified location.
#[pyo3(text_signature = "($self, location)")]
fn get(&self, location: PyPath) -> PyResult<Py<PyBytes>> {
fn get(&self, location: PyPath) -> PyResult<Cow<[u8]>> {
let obj = self
.rt
.block_on(get_bytes(self.inner.as_ref(), &location.into()))
.map_err(ObjectStoreError::from)?;
Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py)))
Ok(Cow::Owned(obj.to_vec()))
}

/// Return the bytes that are stored at the specified location.
#[pyo3(text_signature = "($self, location)")]
fn get_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let obj = get_bytes(inner.as_ref(), &location.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(Cow::<[u8]>::Owned(obj.to_vec()))
})
}

/// Return the bytes that are stored at the specified location in the given byte range
#[pyo3(text_signature = "($self, location, start, length)")]
fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult<Py<PyBytes>> {
fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult<Cow<[u8]>> {
let range = std::ops::Range {
start,
end: start + length,
};
let obj = self
.rt
.block_on(self.inner.get_range(&location.into(), range))
.map_err(ObjectStoreError::from)?
.to_vec();
Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py)))
.map_err(ObjectStoreError::from)?;
Ok(Cow::Owned(obj.to_vec()))
}

/// Return the bytes that are stored at the specified location in the given byte range
#[pyo3(text_signature = "($self, location, start, length)")]
fn get_range_async<'a>(
&'a self,
py: Python<'a>,
location: PyPath,
start: usize,
length: usize,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
let range = std::ops::Range {
start,
end: start + length,
};

pyo3_asyncio::tokio::future_into_py(py, async move {
let obj = inner
.get_range(&location.into(), range)
.await
.map_err(ObjectStoreError::from)?;
Ok(Cow::<[u8]>::Owned(obj.to_vec()))
})
}

/// Return the metadata for the specified location
Expand All @@ -549,6 +603,19 @@ impl PyObjectStore {
Ok(meta.into())
}

/// Return the metadata for the specified location
#[pyo3(text_signature = "($self, location)")]
fn head_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let meta = inner
.head(&location.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(PyObjectMeta::from(meta))
})
}

/// Delete the object at the specified location.
#[pyo3(text_signature = "($self, location)")]
fn delete(&self, location: PyPath) -> PyResult<()> {
Expand All @@ -558,6 +625,19 @@ impl PyObjectStore {
Ok(())
}

/// Delete the object at the specified location.
#[pyo3(text_signature = "($self, location)")]
fn delete_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.delete(&location.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// List all the objects with the given prefix.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
Expand All @@ -576,6 +656,25 @@ impl PyObjectStore {
.collect())
}

/// List all the objects with the given prefix.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
/// of `foo/bar/x` but not of `foo/bar_baz/x`.
#[pyo3(text_signature = "($self, prefix)")]
fn list_async<'a>(&'a self, py: Python<'a>, prefix: Option<PyPath>) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let object_metas = flatten_list_stream(inner.as_ref(), prefix.map(Path::from).as_ref())
.await
.map_err(ObjectStoreError::from)?;
let py_object_metas = object_metas
.into_iter()
.map(PyObjectMeta::from)
.collect::<Vec<_>>();
Ok(py_object_metas)
})
}

/// List objects with the given prefix and an implementation specific
/// delimiter. Returns common prefixes (directories) in addition to object
/// metadata.
Expand All @@ -594,6 +693,28 @@ impl PyObjectStore {
Ok(list.into())
}

/// List objects with the given prefix and an implementation specific
/// delimiter. Returns common prefixes (directories) in addition to object
/// metadata.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
/// of `foo/bar/x` but not of `foo/bar_baz/x`.
#[pyo3(text_signature = "($self, prefix)")]
fn list_with_delimiter_async<'a>(
&'a self,
py: Python<'a>,
prefix: Option<PyPath>,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let list_result = inner
.list_with_delimiter(prefix.map(Path::from).as_ref())
.await
.map_err(ObjectStoreError::from)?;
Ok(PyListResult::from(list_result))
})
}

/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
Expand All @@ -605,6 +726,21 @@ impl PyObjectStore {
Ok(())
}

/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
#[pyo3(text_signature = "($self, from, to)")]
fn copy_async<'a>(&'a self, py: Python<'a>, from: PyPath, to: PyPath) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.copy(&from.into(), &to.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Copy an object from one path to another, only if destination is empty.
///
/// Will return an error if the destination already has an object.
Expand All @@ -616,6 +752,26 @@ impl PyObjectStore {
Ok(())
}

/// Copy an object from one path to another, only if destination is empty.
///
/// Will return an error if the destination already has an object.
#[pyo3(text_signature = "($self, from, to)")]
fn copy_if_not_exists_async<'a>(
&'a self,
py: Python<'a>,
from: PyPath,
to: PyPath,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.copy_if_not_exists(&from.into(), &to.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Move an object from one path to another in the same object store.
///
/// By default, this is implemented as a copy and then delete source. It may not
Expand All @@ -630,6 +786,24 @@ impl PyObjectStore {
Ok(())
}

/// Move an object from one path to another in the same object store.
///
/// By default, this is implemented as a copy and then delete source. It may not
/// check when deleting source that it was the same object that was originally copied.
///
/// If there exists an object at the destination, it will be overwritten.
#[pyo3(text_signature = "($self, from, to)")]
fn rename_async<'a>(&'a self, py: Python<'a>, from: PyPath, to: PyPath) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.rename(&from.into(), &to.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Move an object from one path to another in the same object store.
///
/// Will return an error if the destination already has an object.
Expand All @@ -641,6 +815,26 @@ impl PyObjectStore {
Ok(())
}

/// Move an object from one path to another in the same object store.
///
/// Will return an error if the destination already has an object.
#[pyo3(text_signature = "($self, from, to)")]
fn rename_if_not_exists_async<'a>(
&'a self,
py: Python<'a>,
from: PyPath,
to: PyPath,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.rename_if_not_exists(&from.into(), &to.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

pub fn __getnewargs__(&self) -> PyResult<(String, Option<HashMap<String, String>>)> {
Ok((self.root_url.clone(), self.options.clone()))
}
Expand Down
Loading

0 comments on commit 8ed1da1

Please sign in to comment.