From 1a2f6bc2bc624eb6e5833a88916da4ed2e3ba779 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 5 Aug 2024 14:58:22 -0500 Subject: [PATCH] Allow threads (#15) * Allow threads * Revert changes --- object-store-internal/src/lib.rs | 173 ++++++++++++++++++------------- 1 file changed, 99 insertions(+), 74 deletions(-) diff --git a/object-store-internal/src/lib.rs b/object-store-internal/src/lib.rs index 9639993..6b8b5a1 100644 --- a/object-store-internal/src/lib.rs +++ b/object-store-internal/src/lib.rs @@ -508,11 +508,13 @@ impl PyObjectStore { /// Save the provided bytes to the specified location. #[pyo3(text_signature = "($self, location, bytes)")] - fn put(&self, location: PyPath, bytes: Vec) -> PyResult<()> { - self.rt - .block_on(self.inner.put(&location.into(), bytes.into())) - .map_err(ObjectStoreError::from)?; - Ok(()) + fn put(&self, py: Python, location: PyPath, bytes: Vec) -> PyResult<()> { + py.allow_threads(|| { + self.rt + .block_on(self.inner.put(&location.into(), bytes.into())) + .map_err(ObjectStoreError::from)?; + Ok(()) + }) } /// Save the provided bytes to the specified location. @@ -535,13 +537,13 @@ impl PyObjectStore { /// Return the bytes that are stored at the specified location. #[pyo3(text_signature = "($self, location)")] - fn get(&self, location: PyPath) -> PyResult> { - let obj = self - .rt - .block_on(get_bytes(self.inner.as_ref(), &location.into())) - .map_err(ObjectStoreError::from)?; - Ok(Cow::Owned(obj.to_vec())) - } + fn get(&self, py: Python, location: PyPath) -> PyResult> { + py.allow_threads(|| { + let obj = self + .rt + .block_on(get_bytes(self.inner.as_ref(), &location.into())) + .map_err(ObjectStoreError::from)?; + Ok(Cow::Owned(obj.to_vec())) /// Return the bytes that are stored at the specified location. #[pyo3(text_signature = "($self, location)")] @@ -557,17 +559,24 @@ impl PyObjectStore { /// Return the bytes that are stored at the specified location in the given byte range #[pyo3(text_signature = "($self, location, start, length)")] - fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult> { - let range = std::ops::Range { - start, - end: start + length, - }; - let obj = self - .rt - .block_on(self.inner.get_range(&location.into(), range)) - .map_err(ObjectStoreError::from)?; - Ok(Cow::Owned(obj.to_vec())) - } + fn get_range( + &self, + py: Python, + location: PyPath, + start: usize, + length: usize, + ) -> PyResult> { + py.allow_threads(|| { + let range = std::ops::Range { + start, + end: start + length, + }; + let obj = self + .rt + .block_on(self.inner.get_range(&location.into(), range)) + .map_err(ObjectStoreError::from)? + .to_vec(); + Ok(Cow::Owned(obj.to_vec())) /// Return the bytes that are stored at the specified location in the given byte range #[pyo3(text_signature = "($self, location, start, length)")] @@ -595,12 +604,14 @@ impl PyObjectStore { /// Return the metadata for the specified location #[pyo3(text_signature = "($self, location)")] - fn head(&self, location: PyPath) -> PyResult { - let meta = self - .rt - .block_on(self.inner.head(&location.into())) - .map_err(ObjectStoreError::from)?; - Ok(meta.into()) + fn head(&self, py: Python, location: PyPath) -> PyResult { + py.allow_threads(|| { + let meta = self + .rt + .block_on(self.inner.head(&location.into())) + .map_err(ObjectStoreError::from)?; + Ok(meta.into()) + }) } /// Return the metadata for the specified location @@ -618,11 +629,13 @@ impl PyObjectStore { /// Delete the object at the specified location. #[pyo3(text_signature = "($self, location)")] - fn delete(&self, location: PyPath) -> PyResult<()> { - self.rt - .block_on(self.inner.delete(&location.into())) - .map_err(ObjectStoreError::from)?; - Ok(()) + fn delete(&self, py: Python, location: PyPath) -> PyResult<()> { + py.allow_threads(|| { + self.rt + .block_on(self.inner.delete(&location.into())) + .map_err(ObjectStoreError::from)?; + Ok(()) + }) } /// Delete the object at the specified location. @@ -643,17 +656,19 @@ impl PyObjectStore { /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix /// of `foo/bar/x` but not of `foo/bar_baz/x`. #[pyo3(text_signature = "($self, prefix)")] - fn list(&self, prefix: Option) -> PyResult> { - Ok(self - .rt - .block_on(flatten_list_stream( - self.inner.as_ref(), - prefix.map(Path::from).as_ref(), - )) - .map_err(ObjectStoreError::from)? - .into_iter() - .map(PyObjectMeta::from) - .collect()) + fn list(&self, py: Python, prefix: Option) -> PyResult> { + py.allow_threads(|| { + Ok(self + .rt + .block_on(flatten_list_stream( + self.inner.as_ref(), + prefix.map(Path::from).as_ref(), + )) + .map_err(ObjectStoreError::from)? + .into_iter() + .map(PyObjectMeta::from) + .collect()) + }) } /// List all the objects with the given prefix. @@ -682,15 +697,17 @@ impl PyObjectStore { /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix /// of `foo/bar/x` but not of `foo/bar_baz/x`. #[pyo3(text_signature = "($self, prefix)")] - fn list_with_delimiter(&self, prefix: Option) -> PyResult { - let list = self - .rt - .block_on( - self.inner - .list_with_delimiter(prefix.map(Path::from).as_ref()), - ) - .map_err(ObjectStoreError::from)?; - Ok(list.into()) + fn list_with_delimiter(&self, py: Python, prefix: Option) -> PyResult { + py.allow_threads(|| { + let list = self + .rt + .block_on( + self.inner + .list_with_delimiter(prefix.map(Path::from).as_ref()), + ) + .map_err(ObjectStoreError::from)?; + Ok(list.into()) + }) } /// List objects with the given prefix and an implementation specific @@ -719,11 +736,13 @@ impl PyObjectStore { /// /// If there exists an object at the destination, it will be overwritten. #[pyo3(text_signature = "($self, from, to)")] - fn copy(&self, from: PyPath, to: PyPath) -> PyResult<()> { - self.rt - .block_on(self.inner.copy(&from.into(), &to.into())) - .map_err(ObjectStoreError::from)?; - Ok(()) + fn copy(&self, py: Python, from: PyPath, to: PyPath) -> PyResult<()> { + py.allow_threads(|| { + self.rt + .block_on(self.inner.copy(&from.into(), &to.into())) + .map_err(ObjectStoreError::from)?; + Ok(()) + }) } /// Copy an object from one path to another in the same object store. @@ -745,11 +764,13 @@ impl PyObjectStore { /// /// Will return an error if the destination already has an object. #[pyo3(text_signature = "($self, from, to)")] - fn copy_if_not_exists(&self, from: PyPath, to: PyPath) -> PyResult<()> { - self.rt - .block_on(self.inner.copy_if_not_exists(&from.into(), &to.into())) - .map_err(ObjectStoreError::from)?; - Ok(()) + fn copy_if_not_exists(&self, py: Python, from: PyPath, to: PyPath) -> PyResult<()> { + py.allow_threads(|| { + self.rt + .block_on(self.inner.copy_if_not_exists(&from.into(), &to.into())) + .map_err(ObjectStoreError::from)?; + Ok(()) + }) } /// Copy an object from one path to another, only if destination is empty. @@ -779,11 +800,13 @@ impl PyObjectStore { /// /// If there exists an object at the destination, it will be overwritten. #[pyo3(text_signature = "($self, from, to)")] - fn rename(&self, from: PyPath, to: PyPath) -> PyResult<()> { - self.rt - .block_on(self.inner.rename(&from.into(), &to.into())) - .map_err(ObjectStoreError::from)?; - Ok(()) + fn rename(&self, py: Python, from: PyPath, to: PyPath) -> PyResult<()> { + py.allow_threads(|| { + self.rt + .block_on(self.inner.rename(&from.into(), &to.into())) + .map_err(ObjectStoreError::from)?; + Ok(()) + }) } /// Move an object from one path to another in the same object store. @@ -808,11 +831,13 @@ impl PyObjectStore { /// /// Will return an error if the destination already has an object. #[pyo3(text_signature = "($self, from, to)")] - fn rename_if_not_exists(&self, from: PyPath, to: PyPath) -> PyResult<()> { - self.rt - .block_on(self.inner.rename_if_not_exists(&from.into(), &to.into())) - .map_err(ObjectStoreError::from)?; - Ok(()) + fn rename_if_not_exists(&self, py: Python, from: PyPath, to: PyPath) -> PyResult<()> { + py.allow_threads(|| { + self.rt + .block_on(self.inner.rename_if_not_exists(&from.into(), &to.into())) + .map_err(ObjectStoreError::from)?; + Ok(()) + }) } /// Move an object from one path to another in the same object store.