feat(python): Implement bindings to IPC writer (#586)
This PR implements bindings to the IPC writer in the nanoarrow C
library. This adds:

- An `ipc.StreamWriter()` class roughly mirroring pyarrow's
`ipc.Stream()`
- `Schema.serialize()` and `Array.serialize()` to match pyarrow's
`serialize()` methods (see the sketch after the example below).

```python
import io
import nanoarrow as na 
from nanoarrow.ipc import StreamWriter, InputStream

out = io.BytesIO()
writer = StreamWriter.from_writable(out)
writer.write_stream(InputStream.example())

na.Array(InputStream.from_readable(out.getvalue()))
#> nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
#> {'some_col': 1}
#> {'some_col': 2}
#> {'some_col': 3}
```
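
The new `serialize()` methods are not exercised in the example above, so here is a minimal sketch of how they compose. It assumes that `Schema.serialize()` takes no arguments and returns the schema message as bytes (only `Array.serialize()` appears in the files shown below), and that the reader accepts a stream that ends without an end-of-stream marker, as pyarrow's reader does for the equivalent `serialize()` round trip.

```python
import nanoarrow as na
from nanoarrow.ipc import InputStream

# Use the example stream so the array is struct-typed, i.e. shaped like
# the record batches an IPC stream carries.
array = na.Array(InputStream.example())

# With no destination, Array.serialize() returns the encapsulated IPC
# batch messages as bytes; the schema message is not included (see the
# write_schema=False call in array.py below).
batch_bytes = array.serialize()

# Prepending the serialized schema message should reconstruct a readable
# stream, mirroring how pyarrow's serialize() methods are used together.
stream_bytes = array.schema.serialize() + batch_bytes
na.ArrayStream.from_readable(stream_bytes).read_all()
```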

---------

Co-authored-by: Joris Van den Bossche <[email protected]>
paleolimbot and jorisvandenbossche authored Sep 14, 2024
1 parent 43001a9 commit 3aa0ec1
Showing 10 changed files with 699 additions and 75 deletions.
4 changes: 2 additions & 2 deletions dev/benchmarks/python/ipc.py
@@ -43,12 +43,12 @@ def fixture_path(self, name):
return os.path.join(self.fixtures_dir, name)

def read_fixture_file(self, name):
with ipc.Stream.from_path(self.fixture_path(name)) as in_stream:
with ipc.InputStream.from_path(self.fixture_path(name)) as in_stream:
list(na.c_array_stream(in_stream))

def read_fixture_buffer(self, name):
f = io.BytesIO(self.fixture_buffer[name])
with ipc.Stream.from_readable(f) as in_stream:
with ipc.InputStream.from_readable(f) as in_stream:
list(na.c_array_stream(in_stream))

def time_read_float64_basic_file(self):
195 changes: 183 additions & 12 deletions python/src/nanoarrow/_ipc_lib.pyx
@@ -19,18 +19,24 @@
# cython: linetrace=True

from libc.stdint cimport uint8_t, int64_t, uintptr_t
from libc.errno cimport EIO
from libc.errno cimport EIO, EAGAIN
from libc.stdio cimport snprintf
from cpython.ref cimport PyObject, Py_INCREF, Py_DECREF
from cpython cimport Py_buffer, PyBuffer_FillInfo

from nanoarrow_c cimport (
ArrowArrayStream,
ArrowArrayView,
ArrowSchema,
ArrowErrorCode,
ArrowError,
NANOARROW_OK,
ArrowArrayStream,
)

from nanoarrow._schema cimport CSchema
from nanoarrow._array cimport CArrayView
from nanoarrow._utils cimport Error


cdef extern from "nanoarrow_ipc.h" nogil:
struct ArrowIpcInputStream:
@@ -48,18 +48,43 @@ cdef extern from "nanoarrow_ipc.h" nogil:
ArrowArrayStream* out, ArrowIpcInputStream* input_stream,
ArrowIpcArrayStreamReaderOptions* options)

struct ArrowIpcOutputStream:
ArrowErrorCode (*write)(ArrowIpcOutputStream* stream, const void* buf,
int64_t buf_size_bytes, int64_t* size_written_out,
ArrowError* error)
void (*release)(ArrowIpcOutputStream* stream)
void* private_data

cdef class PyInputStreamPrivate:
struct ArrowIpcWriter:
void* private_data

ArrowErrorCode ArrowIpcWriterInit(ArrowIpcWriter* writer,
ArrowIpcOutputStream* output_stream)
void ArrowIpcWriterReset(ArrowIpcWriter* writer)
ArrowErrorCode ArrowIpcWriterWriteSchema(ArrowIpcWriter* writer,
const ArrowSchema* in_,
ArrowError* error)
ArrowErrorCode ArrowIpcWriterWriteArrayView(ArrowIpcWriter* writer,
const ArrowArrayView* in_,
ArrowError* error)

ArrowErrorCode ArrowIpcWriterWriteArrayStream(ArrowIpcWriter* writer,
ArrowArrayStream* in_,
ArrowError* error)

cdef class PyStreamPrivate:
cdef object _obj
cdef bint _close_obj
cdef void* _addr
cdef Py_ssize_t _size_bytes
cdef bint _buffer_readonly

def __cinit__(self, obj, close_obj=False):
def __cinit__(self, obj, bint buffer_readonly, bint close_obj=False):
self._obj = obj
self._close_obj = close_obj
self._addr = NULL
self._size_bytes = 0
self._buffer_readonly = buffer_readonly

@property
def obj(self):
@@ -78,14 +78,16 @@ cdef class PyInputStreamPrivate:
return self._size_bytes

# Implement the buffer protocol so that this object can be used as
# the argument to xxx.readinto(). This ensures that no extra copies
# (beyond any buffering done by the upstream file-like object) are held
# since the upstream object has access to the preallocated output buffer.
# In this case, the preallocation is done by the ArrowArrayStream
# the argument to xxx.readinto() or xxx.write(). This ensures that
# no extra copies (beyond any buffering done by the upstream file-like object)
# are held since the upstream object has access to the preallocated output buffer.
# In the read case, the preallocation is done by the ArrowArrayStream
# implementation before issuing each read call (two per message, with
# an extra call for a RecordBatch message to get the actual buffer data).
# In the write case, this will be a view of whatever information was provided to
# the write callback.
def __getbuffer__(self, Py_buffer* buffer, int flags):
PyBuffer_FillInfo(buffer, self, self._addr, self._size_bytes, 0, flags)
PyBuffer_FillInfo(buffer, self, self._addr, self._size_bytes, self._buffer_readonly, flags)

def __releasebuffer__(self, Py_buffer* buffer):
pass
@@ -100,8 +100,16 @@ cdef ArrowErrorCode py_input_stream_read(ArrowIpcInputStream* stream, uint8_t* b
stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)

try:
size_read_out[0] = stream_private.obj.readinto(stream_private)
return NANOARROW_OK
# Non-blocking streams may return None here, or buffered
# wrappers of them may raise BufferedIOError
read_result = stream_private.obj.readinto(stream_private)

if read_result is None:
size_read_out[0] = 0
return EAGAIN
else:
size_read_out[0] = read_result
return NANOARROW_OK
except Exception as e:
cls = type(e).__name__.encode()
msg = str(e).encode()
@@ -126,6 +126,51 @@ cdef void py_input_stream_release(ArrowIpcInputStream* stream) noexcept nogil:
stream.release = NULL



cdef ArrowErrorCode py_output_stream_write(ArrowIpcOutputStream* stream, const void* buf,
int64_t buf_size_bytes, int64_t* size_written_out,
ArrowError* error) noexcept nogil:

with gil:
stream_private = <object>stream.private_data
stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)

try:
# Non-blocking streams may return None here, or buffered
# wrappers of them may raise BufferedIOError
write_result = stream_private.obj.write(stream_private)

# Non-blocking streams may return None here
if write_result is None:
size_written_out[0] = 0
return EAGAIN
else:
size_written_out[0] = write_result
return NANOARROW_OK
except Exception as e:
cls = type(e).__name__.encode()
msg = str(e).encode()
snprintf(
error.message,
sizeof(error.message),
"%s: %s",
<const char*>cls,
<const char*>msg
)
return EIO

cdef void py_output_stream_release(ArrowIpcOutputStream* stream) noexcept nogil:
with gil:
stream_private = <object>stream.private_data
if stream_private.close_obj:
stream_private.obj.close()

Py_DECREF(stream_private)

stream.private_data = NULL
stream.release = NULL


cdef class CIpcInputStream:
cdef ArrowIpcInputStream _stream

@@ -150,7 +150,11 @@ cdef class CIpcInputStream:
@staticmethod
def from_readable(obj, close_obj=False):
cdef CIpcInputStream stream = CIpcInputStream()
cdef PyInputStreamPrivate private_data = PyInputStreamPrivate(obj, close_obj)
cdef PyStreamPrivate private_data = PyStreamPrivate(
obj,
buffer_readonly=False,
close_obj=close_obj
)

stream._stream.private_data = <PyObject*>private_data
Py_INCREF(private_data)
@@ -166,3 +166,84 @@ def init_array_stream(CIpcInputStream input_stream, uintptr_t out):
cdef int code = ArrowIpcArrayStreamReaderInit(out_ptr, &input_stream._stream, NULL)
if code != NANOARROW_OK:
raise RuntimeError(f"ArrowIpcArrayStreamReaderInit() failed with code [{code}]")


cdef class CIpcOutputStream:
cdef ArrowIpcOutputStream _stream

def __cinit__(self):
self._stream.release = NULL

def is_valid(self):
return self._stream.release != NULL

def __dealloc__(self):
# Duplicating release() to avoid Python API calls in the deallocator
if self._stream.release != NULL:
self._stream.release(&self._stream)

def release(self):
if self._stream.release != NULL:
self._stream.release(&self._stream)
return True
else:
return False

@staticmethod
def from_writable(obj, close_obj=False):
cdef CIpcOutputStream stream = CIpcOutputStream()
cdef PyStreamPrivate private_data = PyStreamPrivate(
obj,
buffer_readonly=True,
close_obj=close_obj
)

stream._stream.private_data = <PyObject*>private_data
Py_INCREF(private_data)
stream._stream.write = &py_output_stream_write
stream._stream.release = &py_output_stream_release
return stream


cdef class CIpcWriter:
cdef ArrowIpcWriter _writer

def __cinit__(self, CIpcOutputStream stream):
self._writer.private_data = NULL
if not stream.is_valid():
raise ValueError("Can't create writer from released stream")

cdef int code = ArrowIpcWriterInit(&self._writer, &stream._stream)
Error.raise_error_not_ok("ArrowIpcWriterInit()", code)

def is_valid(self):
return self._writer.private_data != NULL

def __dealloc__(self):
if self._writer.private_data != NULL:
ArrowIpcWriterReset(&self._writer)

def release(self):
if self._writer.private_data != NULL:
ArrowIpcWriterReset(&self._writer)

def write_schema(self, CSchema schema):
cdef Error error = Error()
cdef int code = ArrowIpcWriterWriteSchema(&self._writer, schema._ptr, &error.c_error)
error.raise_message_not_ok("ArrowIpcWriterWriteSchema()", code)

def write_array_view(self, CArrayView array_view):
cdef Error error = Error()
cdef int code = ArrowIpcWriterWriteArrayView(&self._writer, array_view._ptr, &error.c_error)
error.raise_message_not_ok("ArrowIpcWriterWriteArrayView()", code)

def write_array_stream(self, uintptr_t stream_addr):
cdef ArrowArrayStream* array_stream = <ArrowArrayStream*>stream_addr
cdef Error error = Error()
cdef int code = ArrowIpcWriterWriteArrayStream(&self._writer, array_stream, &error.c_error)
error.raise_message_not_ok("ArrowIpcWriterWriteArrayStream()", code)

def write_end_of_stream(self):
cdef Error error = Error()
cdef int code = ArrowIpcWriterWriteArrayView(&self._writer, NULL, &error.c_error)
error.raise_message_not_ok("ArrowIpcWriterWriteArrayView()", code)
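
The `PyStreamPrivate` object above serves both directions: the read and write callbacks point it at nanoarrow-owned memory with `set_buffer()` and then pass the object itself to `readinto()` or `write()`, so the Python file object reads from or fills that memory without an intermediate `bytes` copy. Below is a pure-Python sketch of that zero-copy pattern using `memoryview` stand-ins; it is illustrative only, since the real object exposes a C pointer through the buffer protocol in Cython.

```python
import io

# Read direction: the stream implementation preallocates the destination
# buffer and the file-like object fills it in place via readinto().
source = io.BytesIO(b"\x01\x02\x03\x04")
preallocated = bytearray(4)                        # stands in for the C buffer
n_read = source.readinto(memoryview(preallocated))
assert n_read == 4 and bytes(preallocated) == b"\x01\x02\x03\x04"

# Write direction: the writer hands write() a read-only view of the bytes
# it wants written, which is consumed without copying. Raw non-blocking
# streams may return None from either call; the callbacks above translate
# that case into EAGAIN so the C layer can retry.
sink = io.BytesIO()
n_written = sink.write(memoryview(b"\x05\x06\x07\x08"))
assert n_written == 4 and sink.getvalue() == b"\x05\x06\x07\x08"
```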
25 changes: 24 additions & 1 deletion python/src/nanoarrow/array.py
@@ -17,7 +17,7 @@

import itertools
from functools import cached_property
from typing import Iterable, Tuple
from typing import Iterable, Tuple, Union

from nanoarrow._array import CArray, CArrayView
from nanoarrow._array_stream import CMaterializedArrayStream
@@ -542,6 +542,29 @@ def __iter__(self):
"to iterate over elements of this Array"
)

def serialize(self, dst=None) -> Union[bytes, None]:
"""Write this Array into dst as zero or more encapsulated IPC messages
Parameters
----------
dst : file-like, optional
If present, a file-like object into which the chunks of this array
should be serialized. If omitted, this will create a ``io.BytesIO()``
and return the serialized result.
"""
from nanoarrow.ipc import StreamWriter

if dst is None:
import io

with io.BytesIO() as dst:
writer = StreamWriter.from_writable(dst)
writer.write_stream(self, write_schema=False)
return dst.getvalue()
else:
writer = StreamWriter.from_writable(dst)
writer.write_stream(self, write_schema=False)

def to_string(self, width_hint=80, items_hint=10) -> str:
cls_name = _repr_utils.make_class_label(self, module="nanoarrow")
len_text = f"[{len(self)}]"
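
A short usage sketch of the `dst` parameter documented above: both branches go through the same `StreamWriter`, so the bytes written into `dst` should match the value returned when `dst` is omitted.

```python
import io

import nanoarrow as na
from nanoarrow.ipc import InputStream

array = na.Array(InputStream.example())

# With a destination, the batch messages are written into dst and the
# method returns None.
buf = io.BytesIO()
array.serialize(buf)

# Without a destination, the same encapsulated messages come back as bytes.
assert array.serialize() == buf.getvalue()
```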
24 changes: 12 additions & 12 deletions python/src/nanoarrow/array_stream.py
@@ -211,17 +211,17 @@ def from_readable(obj):
Examples
--------
>>> import nanoarrow as na
>>> from nanoarrow.ipc import Stream
>>> with na.ArrayStream.from_readable(Stream.example_bytes()) as stream:
>>> from nanoarrow.ipc import InputStream
>>> with na.ArrayStream.from_readable(InputStream.example_bytes()) as stream:
... stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
"""
from nanoarrow.ipc import Stream
from nanoarrow.ipc import InputStream

with Stream.from_readable(obj) as ipc_stream:
with InputStream.from_readable(obj) as ipc_stream:
return ArrayStream(ipc_stream)

@staticmethod
@@ -233,11 +233,11 @@ def from_path(obj, *args, **kwargs):
>>> import tempfile
>>> import os
>>> import nanoarrow as na
>>> from nanoarrow.ipc import Stream
>>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
... path = os.path.join(td, "test.arrows")
... with open(path, "wb") as f:
... nbytes = f.write(Stream.example_bytes())
... nbytes = f.write(InputStream.example_bytes())
...
... with na.ArrayStream.from_path(path) as stream:
... stream.read_all()
Expand All @@ -246,9 +246,9 @@ def from_path(obj, *args, **kwargs):
{'some_col': 2}
{'some_col': 3}
"""
from nanoarrow.ipc import Stream
from nanoarrow.ipc import InputStream

with Stream.from_path(obj, *args, **kwargs) as ipc_stream:
with InputStream.from_path(obj, *args, **kwargs) as ipc_stream:
return ArrayStream(ipc_stream)

@staticmethod
@@ -261,11 +261,11 @@ def from_url(obj, *args, **kwargs):
>>> import tempfile
>>> import os
>>> import nanoarrow as na
>>> from nanoarrow.ipc import Stream
>>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
... path = os.path.join(td, "test.arrows")
... with open(path, "wb") as f:
... nbytes = f.write(Stream.example_bytes())
... nbytes = f.write(InputStream.example_bytes())
...
... uri = pathlib.Path(path).as_uri()
... with na.ArrayStream.from_url(uri) as stream:
Expand All @@ -275,7 +275,7 @@ def from_url(obj, *args, **kwargs):
{'some_col': 2}
{'some_col': 3}
"""
from nanoarrow.ipc import Stream
from nanoarrow.ipc import InputStream

with Stream.from_url(obj, *args, **kwargs) as ipc_stream:
with InputStream.from_url(obj, *args, **kwargs) as ipc_stream:
return ArrayStream(ipc_stream)