Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async object store support #6

Merged
merged 5 commits into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 30 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ implementation, with some slight adjustments for ease of use in python programs.
### `ObjectStore` api

```py
from object_store import ObjectStore, ObjectMeta
from object_store import ObjectStore, ObjectMeta, Path

# we use an in-memory store for demonstration purposes.
# data will not be persisted and is not shared across store instances
store = ObjectStore("memory://")

store.put("data", b"some data")
store.put(Path("data"), b"some data")

data = store.get("data")
assert data == b"some data"

blobs = store.list()

meta: ObjectMeta = store.head("data")
meta = store.head("data")

range = store.get_range("data", start=0, length=4)
assert range == b"some"
Expand All @@ -64,6 +64,33 @@ copied = store.get("copied")
assert copied == data
```

#### Async api

```py
from object_store import ObjectStore, ObjectMeta, Path

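# we use an in-memory store for demonstration purposes.
# data will not be persisted and is not shared across store instances
# note: the `await` expressions below must run inside a coroutine (e.g. one started via `asyncio.run`)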
store = ObjectStore("memory://")

path = Path("data")
await store.put_async(path, b"some data")

data = await store.get_async(path)
assert data == b"some data"

blobs = await store.list_async()

meta = await store.head_async(path)

range = await store.get_range_async(path, start=0, length=4)
assert range == b"some"

await store.copy_async(Path("data"), Path("copied"))
copied = await store.get_async(Path("copied"))
assert copied == data
```

### Configuration

As much as possible we aim to make access to various storage backends dependent
Expand Down
1 change: 1 addition & 0 deletions object-store-internal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ once_cell = "1.12.0"
object_store = { version = "0.9", features = ["azure", "aws", "gcp"] }
percent-encoding = "2"
pyo3 = { version = "0.20", default-features = false, features = ["macros"] }
pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] }
thiserror = "1.0.34"
tokio = { version = "1.0", features = [
"macros",
Expand Down
208 changes: 201 additions & 7 deletions object-store-internal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod builder;
mod file;
mod utils;

use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
Expand All @@ -19,7 +20,7 @@ use pyo3::exceptions::{
PyException, PyFileExistsError, PyFileNotFoundError, PyNotImplementedError,
};
use pyo3::prelude::*;
use pyo3::{types::PyBytes, PyErr};
use pyo3::PyErr;
use tokio::runtime::Runtime;

pub use builder::ObjectStoreBuilder;
Expand Down Expand Up @@ -514,29 +515,82 @@ impl PyObjectStore {
Ok(())
}

/// Save the provided bytes to the specified location.
#[pyo3(text_signature = "($self, location, bytes)")]
fn put_async<'a>(
&'a self,
py: Python<'a>,
location: PyPath,
bytes: Vec<u8>,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.put(&location.into(), bytes.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Return the bytes that are stored at the specified location.
#[pyo3(text_signature = "($self, location)")]
fn get(&self, location: PyPath) -> PyResult<Py<PyBytes>> {
fn get(&self, location: PyPath) -> PyResult<Cow<[u8]>> {
let obj = self
.rt
.block_on(get_bytes(self.inner.as_ref(), &location.into()))
.map_err(ObjectStoreError::from)?;
Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py)))
Ok(Cow::Owned(obj.to_vec()))
}

/// Return the bytes that are stored at the specified location.
#[pyo3(text_signature = "($self, location)")]
fn get_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let obj = get_bytes(inner.as_ref(), &location.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(Cow::<[u8]>::Owned(obj.to_vec()))
})
}

/// Return the bytes that are stored at the specified location in the given byte range
#[pyo3(text_signature = "($self, location, start, length)")]
fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult<Py<PyBytes>> {
fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult<Cow<[u8]>> {
let range = std::ops::Range {
start,
end: start + length,
};
let obj = self
.rt
.block_on(self.inner.get_range(&location.into(), range))
.map_err(ObjectStoreError::from)?
.to_vec();
Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py)))
.map_err(ObjectStoreError::from)?;
Ok(Cow::Owned(obj.to_vec()))
}

/// Return the bytes that are stored at the specified location in the given byte range
#[pyo3(text_signature = "($self, location, start, length)")]
fn get_range_async<'a>(
&'a self,
py: Python<'a>,
location: PyPath,
start: usize,
length: usize,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
let range = std::ops::Range {
start,
end: start + length,
};

pyo3_asyncio::tokio::future_into_py(py, async move {
let obj = inner
.get_range(&location.into(), range)
.await
.map_err(ObjectStoreError::from)?;
Ok(Cow::<[u8]>::Owned(obj.to_vec()))
})
}

/// Return the metadata for the specified location
Expand All @@ -549,6 +603,19 @@ impl PyObjectStore {
Ok(meta.into())
}

/// Return the metadata for the specified location
#[pyo3(text_signature = "($self, location)")]
fn head_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let meta = inner
.head(&location.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(PyObjectMeta::from(meta))
})
}

/// Delete the object at the specified location.
#[pyo3(text_signature = "($self, location)")]
fn delete(&self, location: PyPath) -> PyResult<()> {
Expand All @@ -558,6 +625,19 @@ impl PyObjectStore {
Ok(())
}

/// Delete the object at the specified location.
#[pyo3(text_signature = "($self, location)")]
fn delete_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.delete(&location.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// List all the objects with the given prefix.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
Expand All @@ -576,6 +656,25 @@ impl PyObjectStore {
.collect())
}

/// List all the objects with the given prefix.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
/// of `foo/bar/x` but not of `foo/bar_baz/x`.
///
/// `prefix` is optional; when it is omitted (or `None`) no prefix filter is
/// passed to the store. Returns an awaitable; awaiting it yields a list of
/// object metadata entries, or raises the exception that the store error maps to.
#[pyo3(text_signature = "($self, prefix=None)")]
fn list_async<'a>(&'a self, py: Python<'a>, prefix: Option<PyPath>) -> PyResult<&PyAny> {
    // Owned handle so the `async move` future does not borrow from `self`.
    let inner = self.inner.clone();
    pyo3_asyncio::tokio::future_into_py(py, async move {
        // Convert the optional Python path once, then lend it to the listing helper.
        let prefix = prefix.map(Path::from);
        let object_metas = flatten_list_stream(inner.as_ref(), prefix.as_ref())
            .await
            .map_err(ObjectStoreError::from)?;
        let py_object_metas = object_metas
            .into_iter()
            .map(PyObjectMeta::from)
            .collect::<Vec<_>>();
        Ok(py_object_metas)
    })
}

/// List objects with the given prefix and an implementation specific
/// delimiter. Returns common prefixes (directories) in addition to object
/// metadata.
Expand All @@ -594,6 +693,28 @@ impl PyObjectStore {
Ok(list.into())
}

/// List objects with the given prefix and an implementation specific
/// delimiter. Returns common prefixes (directories) in addition to object
/// metadata.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
/// of `foo/bar/x` but not of `foo/bar_baz/x`.
///
/// `prefix` is optional; when it is omitted (or `None`) no prefix filter is
/// passed to the store. Returns an awaitable; awaiting it yields the listing
/// result, or raises the exception that the store error maps to.
#[pyo3(text_signature = "($self, prefix=None)")]
fn list_with_delimiter_async<'a>(
    &'a self,
    py: Python<'a>,
    prefix: Option<PyPath>,
) -> PyResult<&PyAny> {
    // Owned handle so the `async move` future does not borrow from `self`.
    let inner = self.inner.clone();
    pyo3_asyncio::tokio::future_into_py(py, async move {
        // Convert the optional Python path once, then lend it to the store.
        let prefix = prefix.map(Path::from);
        let list_result = inner
            .list_with_delimiter(prefix.as_ref())
            .await
            .map_err(ObjectStoreError::from)?;
        Ok(PyListResult::from(list_result))
    })
}

/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
Expand All @@ -605,6 +726,21 @@ impl PyObjectStore {
Ok(())
}

/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
#[pyo3(text_signature = "($self, from, to)")]
fn copy_async<'a>(&'a self, py: Python<'a>, from: PyPath, to: PyPath) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.copy(&from.into(), &to.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Copy an object from one path to another, only if destination is empty.
///
/// Will return an error if the destination already has an object.
Expand All @@ -616,6 +752,26 @@ impl PyObjectStore {
Ok(())
}

/// Copy an object from one path to another, only if destination is empty.
///
/// Will return an error if the destination already has an object.
#[pyo3(text_signature = "($self, from, to)")]
fn copy_if_not_exists_async<'a>(
&'a self,
py: Python<'a>,
from: PyPath,
to: PyPath,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.copy_if_not_exists(&from.into(), &to.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Move an object from one path to another in the same object store.
///
/// By default, this is implemented as a copy and then delete source. It may not
Expand All @@ -630,6 +786,24 @@ impl PyObjectStore {
Ok(())
}

/// Move an object from one path to another in the same object store.
///
/// By default, this is implemented as a copy and then delete source. It may not
/// check when deleting source that it was the same object that was originally copied.
///
/// If there exists an object at the destination, it will be overwritten.
#[pyo3(text_signature = "($self, from, to)")]
fn rename_async<'a>(&'a self, py: Python<'a>, from: PyPath, to: PyPath) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.rename(&from.into(), &to.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Move an object from one path to another in the same object store.
///
/// Will return an error if the destination already has an object.
Expand All @@ -641,6 +815,26 @@ impl PyObjectStore {
Ok(())
}

/// Move an object from one path to another in the same object store.
///
/// Will return an error if the destination already has an object.
#[pyo3(text_signature = "($self, from, to)")]
fn rename_if_not_exists_async<'a>(
&'a self,
py: Python<'a>,
from: PyPath,
to: PyPath,
) -> PyResult<&PyAny> {
let inner = self.inner.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
inner
.rename_if_not_exists(&from.into(), &to.into())
.await
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Return copies of the root URL and the options held by this instance, as a
/// `(root_url, options)` tuple.
///
/// `__getnewargs__` is the hook Python's pickle protocol consults for the
/// arguments to pass when re-creating an object.
pub fn __getnewargs__(&self) -> PyResult<(String, Option<HashMap<String, String>>)> {
    let root_url = self.root_url.clone();
    let options = self.options.clone();
    Ok((root_url, options))
}
Expand Down
Loading
Loading