Skip to content

Commit

Permalink
Allow threads (#15)
Browse files Browse the repository at this point in the history
* Allow threads

* Revert changes
  • Loading branch information
adriangb authored Aug 5, 2024
1 parent 6144879 commit 1a2f6bc
Showing 1 changed file with 99 additions and 74 deletions.
173 changes: 99 additions & 74 deletions object-store-internal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,11 +508,13 @@ impl PyObjectStore {

/// Save the provided bytes to the specified location.
#[pyo3(text_signature = "($self, location, bytes)")]
fn put(&self, location: PyPath, bytes: Vec<u8>) -> PyResult<()> {
self.rt
.block_on(self.inner.put(&location.into(), bytes.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
fn put(&self, py: Python, location: PyPath, bytes: Vec<u8>) -> PyResult<()> {
py.allow_threads(|| {
self.rt
.block_on(self.inner.put(&location.into(), bytes.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Save the provided bytes to the specified location.
Expand All @@ -535,13 +537,13 @@ impl PyObjectStore {

/// Return the bytes that are stored at the specified location.
#[pyo3(text_signature = "($self, location)")]
fn get(&self, location: PyPath) -> PyResult<Cow<[u8]>> {
let obj = self
.rt
.block_on(get_bytes(self.inner.as_ref(), &location.into()))
.map_err(ObjectStoreError::from)?;
Ok(Cow::Owned(obj.to_vec()))
}
fn get(&self, py: Python, location: PyPath) -> PyResult<Cow<[u8]>> {
py.allow_threads(|| {
let obj = self
.rt
.block_on(get_bytes(self.inner.as_ref(), &location.into()))
.map_err(ObjectStoreError::from)?;
Ok(Cow::Owned(obj.to_vec()))

/// Return the bytes that are stored at the specified location.
#[pyo3(text_signature = "($self, location)")]
Expand All @@ -557,17 +559,24 @@ impl PyObjectStore {

/// Return the bytes that are stored at the specified location in the given byte range
#[pyo3(text_signature = "($self, location, start, length)")]
fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult<Cow<[u8]>> {
let range = std::ops::Range {
start,
end: start + length,
};
let obj = self
.rt
.block_on(self.inner.get_range(&location.into(), range))
.map_err(ObjectStoreError::from)?;
Ok(Cow::Owned(obj.to_vec()))
}
fn get_range(
&self,
py: Python,
location: PyPath,
start: usize,
length: usize,
) -> PyResult<Cow<[u8]>> {
py.allow_threads(|| {
let range = std::ops::Range {
start,
end: start + length,
};
let obj = self
.rt
.block_on(self.inner.get_range(&location.into(), range))
.map_err(ObjectStoreError::from)?
.to_vec();
Ok(Cow::Owned(obj.to_vec()))

/// Return the bytes that are stored at the specified location in the given byte range
#[pyo3(text_signature = "($self, location, start, length)")]
Expand Down Expand Up @@ -595,12 +604,14 @@ impl PyObjectStore {

/// Return the metadata for the specified location
#[pyo3(text_signature = "($self, location)")]
fn head(&self, location: PyPath) -> PyResult<PyObjectMeta> {
let meta = self
.rt
.block_on(self.inner.head(&location.into()))
.map_err(ObjectStoreError::from)?;
Ok(meta.into())
fn head(&self, py: Python, location: PyPath) -> PyResult<PyObjectMeta> {
py.allow_threads(|| {
let meta = self
.rt
.block_on(self.inner.head(&location.into()))
.map_err(ObjectStoreError::from)?;
Ok(meta.into())
})
}

/// Return the metadata for the specified location
Expand All @@ -618,11 +629,13 @@ impl PyObjectStore {

/// Delete the object at the specified location.
#[pyo3(text_signature = "($self, location)")]
fn delete(&self, location: PyPath) -> PyResult<()> {
self.rt
.block_on(self.inner.delete(&location.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
fn delete(&self, py: Python, location: PyPath) -> PyResult<()> {
py.allow_threads(|| {
self.rt
.block_on(self.inner.delete(&location.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Delete the object at the specified location.
Expand All @@ -643,17 +656,19 @@ impl PyObjectStore {
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
/// of `foo/bar/x` but not of `foo/bar_baz/x`.
#[pyo3(text_signature = "($self, prefix)")]
fn list(&self, prefix: Option<PyPath>) -> PyResult<Vec<PyObjectMeta>> {
Ok(self
.rt
.block_on(flatten_list_stream(
self.inner.as_ref(),
prefix.map(Path::from).as_ref(),
))
.map_err(ObjectStoreError::from)?
.into_iter()
.map(PyObjectMeta::from)
.collect())
fn list(&self, py: Python, prefix: Option<PyPath>) -> PyResult<Vec<PyObjectMeta>> {
py.allow_threads(|| {
Ok(self
.rt
.block_on(flatten_list_stream(
self.inner.as_ref(),
prefix.map(Path::from).as_ref(),
))
.map_err(ObjectStoreError::from)?
.into_iter()
.map(PyObjectMeta::from)
.collect())
})
}

/// List all the objects with the given prefix.
Expand Down Expand Up @@ -682,15 +697,17 @@ impl PyObjectStore {
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
/// of `foo/bar/x` but not of `foo/bar_baz/x`.
#[pyo3(text_signature = "($self, prefix)")]
fn list_with_delimiter(&self, prefix: Option<PyPath>) -> PyResult<PyListResult> {
let list = self
.rt
.block_on(
self.inner
.list_with_delimiter(prefix.map(Path::from).as_ref()),
)
.map_err(ObjectStoreError::from)?;
Ok(list.into())
fn list_with_delimiter(&self, py: Python, prefix: Option<PyPath>) -> PyResult<PyListResult> {
py.allow_threads(|| {
let list = self
.rt
.block_on(
self.inner
.list_with_delimiter(prefix.map(Path::from).as_ref()),
)
.map_err(ObjectStoreError::from)?;
Ok(list.into())
})
}

/// List objects with the given prefix and an implementation specific
Expand Down Expand Up @@ -719,11 +736,13 @@ impl PyObjectStore {
///
/// If there exists an object at the destination, it will be overwritten.
#[pyo3(text_signature = "($self, from, to)")]
fn copy(&self, from: PyPath, to: PyPath) -> PyResult<()> {
self.rt
.block_on(self.inner.copy(&from.into(), &to.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
fn copy(&self, py: Python, from: PyPath, to: PyPath) -> PyResult<()> {
py.allow_threads(|| {
self.rt
.block_on(self.inner.copy(&from.into(), &to.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Copy an object from one path to another in the same object store.
Expand All @@ -745,11 +764,13 @@ impl PyObjectStore {
///
/// Will return an error if the destination already has an object.
#[pyo3(text_signature = "($self, from, to)")]
fn copy_if_not_exists(&self, from: PyPath, to: PyPath) -> PyResult<()> {
self.rt
.block_on(self.inner.copy_if_not_exists(&from.into(), &to.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
fn copy_if_not_exists(&self, py: Python, from: PyPath, to: PyPath) -> PyResult<()> {
py.allow_threads(|| {
self.rt
.block_on(self.inner.copy_if_not_exists(&from.into(), &to.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Copy an object from one path to another, only if destination is empty.
Expand Down Expand Up @@ -779,11 +800,13 @@ impl PyObjectStore {
///
/// If there exists an object at the destination, it will be overwritten.
#[pyo3(text_signature = "($self, from, to)")]
fn rename(&self, from: PyPath, to: PyPath) -> PyResult<()> {
self.rt
.block_on(self.inner.rename(&from.into(), &to.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
fn rename(&self, py: Python, from: PyPath, to: PyPath) -> PyResult<()> {
py.allow_threads(|| {
self.rt
.block_on(self.inner.rename(&from.into(), &to.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Move an object from one path to another in the same object store.
Expand All @@ -808,11 +831,13 @@ impl PyObjectStore {
///
/// Will return an error if the destination already has an object.
#[pyo3(text_signature = "($self, from, to)")]
fn rename_if_not_exists(&self, from: PyPath, to: PyPath) -> PyResult<()> {
self.rt
.block_on(self.inner.rename_if_not_exists(&from.into(), &to.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
fn rename_if_not_exists(&self, py: Python, from: PyPath, to: PyPath) -> PyResult<()> {
py.allow_threads(|| {
self.rt
.block_on(self.inner.rename_if_not_exists(&from.into(), &to.into()))
.map_err(ObjectStoreError::from)?;
Ok(())
})
}

/// Move an object from one path to another in the same object store.
Expand Down

0 comments on commit 1a2f6bc

Please sign in to comment.