From c2cc3722a796357120507193002be267e4e569a9 Mon Sep 17 00:00:00 2001 From: Alan Chen Date: Thu, 31 Oct 2024 15:33:03 -0700 Subject: [PATCH 1/3] fix: producer async_wait error return (#500) * chore: python 3.8 EOL * fix: produce output result misconversion always resulted in None break out modules as first refactor out of giant file monoliths * chore: make lint refresh and fmt fixes * chore: update CHANGELOG.md --- .github/workflows/ci.yml | 6 +- .github/workflows/cloud.yml | 2 +- CHANGELOG.md | 81 ++++++++++++ Cargo.toml | 3 +- README.md | 8 +- fluvio/__init__.py | 6 +- integration-tests/test_fluvio_python.py | 40 ------ integration-tests/test_produce.py | 159 ++++++++++++++++++++++++ requirements.txt | 4 +- setup.py | 3 +- src/lib.rs | 51 ++------ src/produce_output.rs | 62 +++++++++ 12 files changed, 324 insertions(+), 101 deletions(-) create mode 100644 integration-tests/test_produce.py create mode 100644 src/produce_output.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a97cae9a..b949d798 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macOS-latest] # TODO: add windows-2019, - python-version: ["38", "39", "310", "311", "312"] + python-version: ["39", "310", "311", "312"] env: CIBW_SKIP: "cp36-* pp* *-win32" CIBW_ARCHS_MACOS: x86_64 universal2 @@ -132,7 +132,7 @@ jobs: matrix: os: [ubuntu-latest] rust: [stable] - python-version: ["3.11"] + python-version: ["3.12"] steps: - uses: actions/checkout@v4 - uses: actions/cache@v4 @@ -231,7 +231,7 @@ jobs: matrix: os: [macos-latest] rust: [stable] - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v4 - name: Install rust ${{ matrix.rust }} diff --git a/.github/workflows/cloud.yml b/.github/workflows/cloud.yml index f224092c..fe4ab63a 100644 --- a/.github/workflows/cloud.yml +++ b/.github/workflows/cloud.yml @@ -18,7 +18,7 @@ jobs: matrix: os: [ubuntu-latest, macos-latest] rust: [stable] - python-version: ["3.8", "3.12"] + python-version: ["3.9", "3.12"] steps: - uses: actions/checkout@v4 - uses: actions/cache@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 20fd0292..1c5be5a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,87 @@ All notable changes to this project will be documented in this file. +## [0.16.5] + +### Bug Fixes + +- Produce output async_wait result misconversion always resulted in None + +### Miscellaneous Tasks + +- Python 3.8 EOL +- Update CHANGELOG.md +- README.md update +- Make lint refresh and fmt fixes + +## [0.16.4] - 2024-10-11 + +### Miscellaneous Tasks + +- Update to fluvio 0.12.0 (#495) +- Fix readme examples (#465) +- Bump pypa/gh-action-pypi-publish from 1.9.0 to 1.10.0 (#469) +- Update cc requirement from =1.1.12 to =1.1.15 (#467) + +## [0.16.3] - 2024-08-21 + +### Bug Fixes + +- Switch to rustls (#376) +- Wrap multiple-partition-consumer correctly +- Change to recommended dev build (#396) +- Bindgen dependency when cross compilation (#463) +- Publish build wheels (#464) + +### Features + +- Release GIL for most "run_block_on" calls +- Add python realization of ProduceOutput and RecordMetadata + +### Miscellaneous Tasks + +- Ci update outdated deps (#372) +- Use fixed version of cc (#379) +- Fluvio-cloud-ci fix profile cfg +- Update to fluvio 0.11.5 +- Bump cargo.toml minor ver +- Bump black from 24.1.1 to 24.3.0 (#395) +- Bump setuptools-rust from 1.8.1 to 1.9.0 (#374) +- Bump cibuildwheel from 2.16.0 to 2.17.0 (#391) +- Bump pypa/cibuildwheel from 2.16.5 to 2.17.0 (#392) +- Update async-lock requirement from 2.4.0 to 3.3.0 (#387) +- Bump black from 24.3.0 to 24.4.0 (#401) +- Bump fluvio-types from v0.11.5 to v0.11.6 (#402) +- Update webbrowser requirement from 0.8.2 to 1.0.0 (#405) +- Update to fluvio 0.11.8 +- Ci, publish, add toolchain +- Bump pip version +- Bump cibuildwheel from 2.18.1 to 2.19.0 (#428) +- Bump pypa/cibuildwheel from 2.18.1 to 2.19.0 (#427) +- Bump fluvio-types from v0.11.8 to v0.11.9 (#425) +- Update fluvio (#432) +- Update cc requirement from =1.0.83 to =1.1.0 (#433) +- Refresh docs +- Ci, push docs on tag event +- Bump black from 24.4.0 to 24.8.0 (#450) +- Bump setuptools-rust from 1.9.0 to 1.10.1 (#449) +- Bump cibuildwheel from 2.19.0 to 2.20.0 (#446) +- Bump pypa/cibuildwheel from 2.19.0 to 2.20.0 (#451) +- Fluvio update to 0.11.11 (#445) +- Ci cloud fix (#456) +- Update cc requirement from =1.1.5 to =1.1.11 (#459) +- Update cc requirement from =1.1.11 to =1.1.12 (#460) +- Use admin module instead of cli in tests (#457) + +### Build + +- Bump pypa/gh-action-pypi-publish from 1.8.11 to 1.9.0 (#431) +- Update cc requirement from =1.1.0 to =1.1.5 (#434) + +## [0.16.1] [v0.16.2] + +- publishing fix releases + ## [0.16.0] - 2023-01-31 ### Miscellaneous Tasks diff --git a/Cargo.toml b/Cargo.toml index efc5c7f0..fc113f87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,4 +44,5 @@ fluvio-sc-schema = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.1 fluvio-controlplane-metadata = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.0" } # transitive version selection -parking_lot = "0.12.3" \ No newline at end of file +parking_lot = "0.12.3" +bytes = { version = "1.8.0" } diff --git a/README.md b/README.md index 1131ed5f..3915c555 100644 --- a/README.md +++ b/README.md @@ -46,11 +46,11 @@ for i in stream: # Developer Notes -This project uses [flapigen](https://github.com/Dushistov/flapigen-rs) to -genate the C static library and -[setuptools-rust](https://github.com/PyO3/setuptools-rust) to bundle it into a +This project uses [PyO3](https://pyo3.rs) to wrap the fluvio crate. + +[setuptools-rust](https://github.com/PyO3/setuptools-rust) bundles it into a python package. For cross platform builds, - [cibuildwheel](https://github.com/joerick/cibuildwheel) is used. +[cibuildwheel](https://github.com/joerick/cibuildwheel) is used. Running the tests locally require having already setup a [fluvio locally](https://www.fluvio.io/docs/getting-started/fluvio-local/) or on diff --git a/fluvio/__init__.py b/fluvio/__init__.py index 0c67115c..c48c7a5f 100644 --- a/fluvio/__init__.py +++ b/fluvio/__init__.py @@ -97,6 +97,7 @@ def wait(self) -> typing.Optional[RecordMetadata]: This is a blocking call and may only return a `RecordMetadata` once. Any subsequent call to `wait` will return a `None` value. + Errors will be raised as exceptions of type `FluvioError`. """ res = self._inner.wait() if res is None: @@ -109,10 +110,7 @@ async def async_wait(self) -> typing.Optional[RecordMetadata]: This may only return a `RecordMetadata` once. Any subsequent call to `wait` will return a `None` value. """ - res = await self._inner.async_wait() - if res is None: - return None - return RecordMetadata(res) + return await self._inner.async_wait() class Offset: diff --git a/integration-tests/test_fluvio_python.py b/integration-tests/test_fluvio_python.py index 5aaaf2ba..cb9aabd7 100644 --- a/integration-tests/test_fluvio_python.py +++ b/integration-tests/test_fluvio_python.py @@ -390,46 +390,6 @@ def test_consume_with_array_map_example(self): class TestAsyncFluvioMethods(CommonAsyncFluvioSmartModuleTestCase): - async def test_async_produce(self): - fluvio = Fluvio.connect() - - producer = fluvio.topic_producer(self.topic) - for i in range(10): - await producer.async_send_string("FOOBAR %s " % i) - - async def test_async_produce_record_metadata(self): - fluvio = Fluvio.connect() - - producer = fluvio.topic_producer(self.topic) - - msg_strings = ["Foobar %s" % i for i in range(10)] - produce_outputs = [await producer.async_send_string(msg) for msg in msg_strings] - - records = [ - (("%s" % i).encode(), msg_string.encode()) - for i, msg_string in enumerate(msg_strings) - ] - produce_outputs.extend(await producer.async_send_all(records)) - - record_metadata = [ - await produce_output.async_wait() for produce_output in produce_outputs - ] - - partition_id = 0 - consumer = fluvio.partition_consumer(self.topic, partition_id) - messages = list( - itertools.islice(consumer.stream(Offset.beginning()), len(produce_outputs)) - ) - - for metadata, msg in zip(record_metadata, messages): - self.assertNotEqual(metadata, None) - self.assertEqual(metadata.partition_id(), partition_id) - self.assertEqual(metadata.offset(), msg.offset()) - - for produce_output in produce_outputs: - # subsequent calls to po.wait shall return None - self.assertEqual(produce_output.wait(), None) - async def test_consumer_with_interator(self): fluvio = Fluvio.connect() producer = fluvio.topic_producer(self.topic) diff --git a/integration-tests/test_produce.py b/integration-tests/test_produce.py new file mode 100644 index 00000000..cb17bcdd --- /dev/null +++ b/integration-tests/test_produce.py @@ -0,0 +1,159 @@ +from fluvio import Fluvio, Offset, FluvioConfig +from fluvio import FluvioAdmin +import unittest +import uuid +import itertools +import time + + +def create_smartmodule(sm_name, sm_path): + # Normally it would be this code, but bare wasm smartmodules + # are used in the python client testing, & only the cli supports it so far + # dry_run = False + # fluvio_admin = FluvioAdmin.connect() + # fluvio_admin.create_smartmodule(sm_name, sm_path, dry_run) + import subprocess + + subprocess.run( + f"fluvio smartmodule create {sm_name} --wasm-file {sm_path}", shell=True + ).check_returncode() + + +class CommonFluvioSetup(unittest.TestCase): + def common_setup(self, sm_path=None): + """Optionally create a smartmodule if sm_path is provided""" + self.admin = FluvioAdmin.connect() + self.topic = str(uuid.uuid4()) + self.sm_name = str(uuid.uuid4()) + self.sm_path = sm_path + + if sm_path is not None: + create_smartmodule(self.sm_name, sm_path) + + try: + self.admin.create_topic(self.topic) + except Exception as err: + print("Retrying after create_topic error {}", err) + time.sleep(5) + self.admin.create_topic(self.topic) + + # list topics to verify topic was created + max_retries = 100 + while max_retries > 0: + topic = self.admin.list_topics([self.topic]) + if len(topic) > 0: + break + max_retries -= 1 + if max_retries == 0: + self.fail("setup: Failed to create topic") + time.sleep(0.1) + + def setUp(self): + self.common_setup() + + def tearDown(self): + self.admin.delete_topic(self.topic) + time.sleep(1) + if self.sm_path is not None: + self.admin.delete_smartmodule(self.sm_name) + + +class TestFluvioProduce(CommonFluvioSetup): + def test_connect(self): + # A very simple test + Fluvio.connect() + + def test_connect_with_config(self): + config = FluvioConfig.load() + Fluvio.connect_with_config(config) + + def test_produce(self): + fluvio = Fluvio.connect() + + producer = fluvio.topic_producer(self.topic) + for i in range(10): + producer.send_string("FOOBAR %s " % i) + + def test_produce_record_metadata(self): + fluvio = Fluvio.connect() + + producer = fluvio.topic_producer(self.topic) + + msg_strings = ["Foobar %s" % i for i in range(10)] + produce_outputs = [producer.send_string(msg) for msg in msg_strings] + + records = [ + (("%s" % i).encode(), msg_string.encode()) + for i, msg_string in enumerate(msg_strings) + ] + produce_outputs.extend(producer.send_all(records)) + + record_metadata = [produce_output.wait() for produce_output in produce_outputs] + + partition_id = 0 + consumer = fluvio.partition_consumer(self.topic, partition_id) + messages = list( + itertools.islice(consumer.stream(Offset.beginning()), len(produce_outputs)) + ) + + for metadata, msg in zip(record_metadata, messages): + self.assertNotEqual(metadata, None) + self.assertEqual(metadata.partition_id(), partition_id) + self.assertEqual(metadata.offset(), msg.offset()) + + for produce_output in produce_outputs: + # subsequent calls to po.wait shall return None + self.assertEqual(produce_output.wait(), None) + + +class TestFluvioProduceAsync(unittest.IsolatedAsyncioTestCase, CommonFluvioSetup): + async def test_async_produce(self): + fluvio = Fluvio.connect() + + producer = fluvio.topic_producer(self.topic) + for i in range(10): + ret = await producer.async_send_string("FOOBAR %s " % i) + ret.wait() + + async def test_async_produce_async_wait(self): + """simple test, test_async_produce_record_metadata is more comprehensive""" + fluvio = Fluvio.connect() + + producer = fluvio.topic_producer(self.topic) + future = await producer.async_send_string("FOOBAR async async") + out_future = future.async_wait() + result = await out_future + self.assertNotEqual(result, None) + + async def test_async_produce_record_metadata(self): + fluvio = Fluvio.connect() + + producer = fluvio.topic_producer(self.topic) + + msg_strings = ["Foobar %s" % i for i in range(10)] + produce_outputs = [await producer.async_send_string(msg) for msg in msg_strings] + + records = [ + (("%s" % i).encode(), msg_string.encode()) + for i, msg_string in enumerate(msg_strings) + ] + produce_outputs.extend(await producer.async_send_all(records)) + + record_metadata = [ + await produce_output.async_wait() for produce_output in produce_outputs + ] + + partition_id = 0 + consumer = fluvio.partition_consumer(self.topic, partition_id) + messages = list( + itertools.islice(consumer.stream(Offset.beginning()), len(produce_outputs)) + ) + + for metadata, msg in zip(record_metadata, messages): + self.assertNotEqual(metadata, None) + self.assertEqual(metadata.partition_id(), partition_id) + self.assertEqual(metadata.offset(), msg.offset()) + + for produce_output in produce_outputs: + # subsequent calls to po.wait shall return None + self.assertEqual(produce_output.wait(), None) diff --git a/requirements.txt b/requirements.txt index 18e52ce1..0791429a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ -flake8==7.0.0 +flake8==7.1.1 mccabe==0.7.0 msgpack==1.0.4 -pycodestyle==2.11.1 +pycodestyle==2.12.1 pyflakes==3.2.0 black==24.8.0 semantic-version==2.10.0 diff --git a/setup.py b/setup.py index 937ecc1e..3aa453d6 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name="fluvio", - version="0.16.4", + version="0.16.5", long_description=open("README.md").read(), long_description_content_type="text/markdown", author="Fluvio Contributors", @@ -28,7 +28,6 @@ # that you indicate you support Python 3. These classifiers are *not* # checked by 'pip install'. See instead 'python_requires' below. "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", diff --git a/src/lib.rs b/src/lib.rs index 3e4d2eed..8946aea2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,10 +52,16 @@ use std::sync::Arc; use tracing::info; use url::Host; mod cloud; + // use crate::error::FluvioError; mod error; +mod produce_output; + +pub use produce_output::ProduceOutput; + use cloud::{CloudClient, CloudLoginError}; use error::FluvioError; + use pyo3::exceptions::{PyException, PyValueError}; use pyo3::prelude::*; use pyo3::types::PyList; @@ -638,50 +644,7 @@ impl ProducerBatchRecord { } } -#[pyclass] -pub struct ProduceOutput { - // inner is placed into an Option because the native ProduceOutput - // is consumed by the `wait` method, which is not possible on the python interface - // (only `&self` or `&mut self` is allowed) - pub inner: Option, -} - -#[pymethods] -impl ProduceOutput { - fn wait(&mut self, py: Python) -> Result, FluvioError> { - // wait on `inner` consumes `self`, but we only have a `&mut self` reference - // so we take it out of the `Option` and consume it that way - // a subsequent call to `wait` will return `None` - let inner = self.inner.take(); - Ok(inner - .map(|produce_output| { - run_block_on(produce_output.wait()) - .map(|metadata| RecordMetadata { inner: metadata }) - }) - .transpose()?) - } - - fn async_wait<'b>(&'b mut self, py: Python<'b>) -> PyResult<&'b PyAny> { - let inner = self.inner.take(); - pyo3_asyncio::async_std::future_into_py(py, async move { - let record_metadata = match inner { - Some(produce_output) => Some( - produce_output - .wait() - .await - .map(|metadata| RecordMetadata { inner: metadata }) - .map_err(FluvioError::FluvioErr)?, - ), - None => None, - }; - Ok(match record_metadata { - Some(record_metadata) => Python::with_gil(|py| record_metadata.into_py(py)), - None => Python::with_gil(|py| py.None()), - }) - }) - } -} - +#[derive(Debug)] #[pyclass] pub struct RecordMetadata { pub inner: NativeRecordMetadata, diff --git a/src/produce_output.rs b/src/produce_output.rs new file mode 100644 index 00000000..6cdda0fd --- /dev/null +++ b/src/produce_output.rs @@ -0,0 +1,62 @@ +use pyo3::prelude::*; + +use pyo3::exceptions::{PyException, PyValueError}; +use pyo3::types::PyList; + +use fluvio::{ProduceOutput as NativeProduceOutput, RecordMetadata as NativeRecordMetadata}; +use fluvio_future::{ + // io::{Stream, StreamExt}, + task::run_block_on, +}; + +use crate::error::FluvioError; +use crate::RecordMetadata; + +#[pyclass] +pub struct ProduceOutput { + // inner is placed into an Option because the native ProduceOutput + // is consumed by the `wait` method, which is not possible on the python interface + // (only `&self` or `&mut self` is allowed) + pub inner: Option, +} + +#[pymethods] +impl ProduceOutput { + fn wait(&mut self, py: Python) -> Result, FluvioError> { + // wait on `inner` consumes `self`, but we only have a `&mut self` reference + // so we take it out of the `Option` and consume it that way + // a subsequent call to `wait` will return `None` + let inner = self.inner.take(); + inner + .map(|produce_output| { + run_block_on(produce_output.wait()) + .map(|metadata| RecordMetadata { inner: metadata }) + .map_err(|err| FluvioError::FluvioErr(err)) + }) + .transpose() + } + + fn async_wait<'b>(&'b mut self, py: Python<'b>) -> PyResult<&'b PyAny> { + let inner = self.inner.take(); + pyo3_asyncio::async_std::future_into_py(py, async move { + let record_metadata = match inner { + Some(produce_output) => { + let pout = produce_output + .wait() + .await + .map(|metadata| RecordMetadata { inner: metadata }) + .map_err(|err| FluvioError::FluvioErr(err))?; + Some(pout) + } + None => None, + }; + Python::with_gil(|py| { + let out = match record_metadata { + Some(record_metadata) => record_metadata.into_py(py), + None => py.None(), + }; + Ok(out) + }) + }) + } +} From ed77a13f50ae9e96881d5e0c0c3b9759e4797ebe Mon Sep 17 00:00:00 2001 From: Alan Chen Date: Thu, 31 Oct 2024 16:34:56 -0700 Subject: [PATCH 2/3] chore: cloud ci, add more robust sync points (#501) --- integration-tests/test_fluvio_python.py | 15 ++++++++++++--- integration-tests/test_produce.py | 4 +--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/integration-tests/test_fluvio_python.py b/integration-tests/test_fluvio_python.py index cb9aabd7..b0364c25 100644 --- a/integration-tests/test_fluvio_python.py +++ b/integration-tests/test_fluvio_python.py @@ -37,9 +37,18 @@ def common_setup(self, sm_path=None): try: self.admin.create_topic(self.topic) except Exception as err: - print("Retrying after create_topic error {}", err) - time.sleep(5) - self.admin.create_topic(self.topic) + print("create_topic error {}, will try to verify", err) + + # list topics to verify topic was created + max_retries = 100 + while max_retries > 0: + topic = self.admin.list_topics([self.topic]) + if len(topic) > 0: + break + max_retries -= 1 + if max_retries == 0: + self.fail("setup: Failed to create topic") + time.sleep(0.1) def setUp(self): self.common_setup() diff --git a/integration-tests/test_produce.py b/integration-tests/test_produce.py index cb17bcdd..4ef5921d 100644 --- a/integration-tests/test_produce.py +++ b/integration-tests/test_produce.py @@ -33,9 +33,7 @@ def common_setup(self, sm_path=None): try: self.admin.create_topic(self.topic) except Exception as err: - print("Retrying after create_topic error {}", err) - time.sleep(5) - self.admin.create_topic(self.topic) + print("create_topic error {}, will try to verify", err) # list topics to verify topic was created max_retries = 100 From 9a27066dbba4dde90aa1efc6eaca9f568e87bdfd Mon Sep 17 00:00:00 2001 From: Alan Chen Date: Thu, 31 Oct 2024 17:12:37 -0700 Subject: [PATCH 3/3] chore: bump fluvio to 0.12.1 and rev crate version (#502) --- Cargo.toml | 10 +++++----- src/produce_output.rs | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fc113f87..674389d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "_fluvio_python" -version = "0.16.4" +version = "0.16.5" edition = "2021" authors = ["Fluvio Contributors "] @@ -38,10 +38,10 @@ url = "2.5.0" webbrowser = "1.0.0" fluvio-future = { version = "0.7.0", features = ["task", "io", "native_tls", "subscriber"] } -fluvio = { features = ["admin", "rustls"], git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.0" } -fluvio-types = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.0" } -fluvio-sc-schema = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.0" } -fluvio-controlplane-metadata = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.0" } +fluvio = { features = ["admin", "rustls"], git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.1" } +fluvio-types = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.1" } +fluvio-sc-schema = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.1" } +fluvio-controlplane-metadata = { git = "https://github.com/infinyon/fluvio.git", tag = "v0.12.1" } # transitive version selection parking_lot = "0.12.3" diff --git a/src/produce_output.rs b/src/produce_output.rs index 6cdda0fd..8a84d41c 100644 --- a/src/produce_output.rs +++ b/src/produce_output.rs @@ -31,7 +31,7 @@ impl ProduceOutput { .map(|produce_output| { run_block_on(produce_output.wait()) .map(|metadata| RecordMetadata { inner: metadata }) - .map_err(|err| FluvioError::FluvioErr(err)) + .map_err(FluvioError::FluvioErr) }) .transpose() } @@ -45,7 +45,7 @@ impl ProduceOutput { .wait() .await .map(|metadata| RecordMetadata { inner: metadata }) - .map_err(|err| FluvioError::FluvioErr(err))?; + .map_err(FluvioError::FluvioErr)?; Some(pout) } None => None,