Skip to content

Commit

Permalink
Implement serialize/deserialize many objects (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
faph authored Sep 18, 2023
2 parents af0b431 + 6909f53 commit faece61
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 22 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/test-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ jobs:
fail-fast: false
matrix:
python-version:
- '3.7'
- '3.8'
- '3.9'
- '3.10'
- '3.11'
Expand Down
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,15 @@ keywords = [
]

# Minimum supported Python version
requires-python = ">=3.9"
# All runtime dependencies that must be packaged, pin major version only.
dependencies = [
"avro~=1.11",
"fastavro~=1.8", # TODO: consider moving Avro-related dependencies to optional dependencies
"importlib-metadata<4; python_version<'3.8'",
"memoization~=0.4",
"orjson~=3.0",
"pluggy~=1.2",
"py-avro-schema~=3.0",
"python-dateutil~=2.8",
]

Expand Down
45 changes: 35 additions & 10 deletions src/py_adapter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import datetime
import enum
import importlib
import importlib.metadata
import inspect
import logging
import uuid
from collections.abc import Iterable, Iterator
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union, cast

import avro.schema
Expand All @@ -33,15 +35,8 @@
import py_adapter._schema
import py_adapter.plugin

try:
from importlib import metadata
except ImportError: # pragma: no cover
# Python < 3.8
import importlib_metadata as metadata # type: ignore


#: Library version, e.g. 1.0.0, taken from Git tags
__version__ = metadata.version("py-adapter")
__version__ = importlib.metadata.version("py-adapter")


logger = logging.getLogger(__package__)
Expand Down Expand Up @@ -93,7 +88,7 @@ def serialize(obj: Any, *, format: str, writer_schema: bytes = b"") -> bytes:
Serialize an object using a serialization format supported by **py-adapter**
:param obj: Python object to serialize
:param format: Serialization format as supported by a **py-adpater** plugin, e.g. ``JSON``.
:param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
:param writer_schema: Data schema to serialize the data with, as JSON bytes.
"""
serialize_fn = py_adapter.plugin.plugin_hook(format, "serialize")
Expand All @@ -102,13 +97,27 @@ def serialize(obj: Any, *, format: str, writer_schema: bytes = b"") -> bytes:
return data


def serialize_many(objs: Iterable[Any], *, format: str, writer_schema: bytes = b"") -> bytes:
    """
    Serialize multiple objects using a serialization format supported by **py-adapter**

    :param objs: Python objects to serialize
    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
    :param writer_schema: Data schema to serialize the data with, as JSON bytes.
    """
    # Resolve the plugin hook for this format, then feed it a lazy stream of basic-type objects.
    hook_fn = py_adapter.plugin.plugin_hook(format, "serialize_many")
    basic_iter = (to_basic_type(o) for o in objs)
    return hook_fn(objs=basic_iter, writer_schema=writer_schema)


def deserialize(data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"") -> Obj:
"""
Deserialize bytes as a Python object of a given type from a serialization format supported by **py-adapter**
:param data: Serialized data
:param py_type: The Python class to create an instance from
:param format: Serialization format as supported by a **py-adpater** plugin, e.g. ``JSON``.
:param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
:param writer_schema: Data schema used to serialize the data with, as JSON bytes.
"""
deserialize_fn = py_adapter.plugin.plugin_hook(format, "deserialize")
Expand All @@ -117,6 +126,22 @@ def deserialize(data: bytes, py_type: Type[Obj], *, format: str, writer_schema:
return obj


def deserialize_many(data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"") -> Iterator[Obj]:
    """
    Deserialize bytes as an iterator over Python objects of a given type from a serialization format supported by
    **py-adapter**

    :param data: Serialized data
    :param py_type: The Python class to create an instance from
    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
    :param writer_schema: Data schema used to serialize the data with, as JSON bytes.
    """
    # The plugin hook yields basic-type objects; convert each one lazily into the requested class.
    hook_fn = py_adapter.plugin.plugin_hook(format, "deserialize_many")
    return (from_basic_type(basic, py_type) for basic in hook_fn(data=data, writer_schema=writer_schema))


class _Adapter(abc.ABC):
"""Interface for an adapter"""

Expand Down
24 changes: 24 additions & 0 deletions src/py_adapter/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import functools
import logging
import sys
from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING

import pluggy
Expand Down Expand Up @@ -111,6 +112,18 @@ def serialize(obj: "py_adapter.Basic", writer_schema: bytes) -> bytes:
raise NotImplementedError()


@_hookspec(firstresult=True)
def serialize_many(objs: Iterable["py_adapter.Basic"], writer_schema: bytes) -> bytes:
    """
    Hook specification. Serialize multiple Python objects of basic types to the format supported by the implementing
    plugin.

    This is a pluggy hook specification only: plugins provide the implementation and the first
    non-``None`` result is used (``firstresult=True``); the spec itself always raises.

    :param objs: Python objects to serialize
    :param writer_schema: Data schema to serialize the data with, as JSON bytes.
    """
    raise NotImplementedError()


@_hookspec(firstresult=True)
def deserialize(data: bytes, writer_schema: bytes) -> "py_adapter.Basic":
"""
Expand All @@ -120,3 +133,14 @@ def deserialize(data: bytes, writer_schema: bytes) -> "py_adapter.Basic":
:param writer_schema: Data schema used to serialize the data with, as JSON bytes.
"""
raise NotImplementedError()


@_hookspec(firstresult=True)
def deserialize_many(data: bytes, writer_schema: bytes) -> Iterator["py_adapter.Basic"]:
    """
    Hook specification. Deserialize data as an iterator over objects of basic Python types

    This is a pluggy hook specification only: plugins provide the implementation and the first
    non-``None`` result is used (``firstresult=True``); the spec itself always raises.

    :param data: Bytes to deserialize
    :param writer_schema: Data schema used to serialize the data with, as JSON bytes.
    """
    raise NotImplementedError()
40 changes: 39 additions & 1 deletion src/py_adapter/plugin/_avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""

import io
from collections.abc import Iterable, Iterator

import orjson

Expand Down Expand Up @@ -42,6 +43,26 @@ def serialize(obj: py_adapter.Basic, writer_schema: bytes) -> bytes:
return data


@py_adapter.plugin.hook
def serialize_many(objs: Iterable[py_adapter.Basic], writer_schema: bytes) -> bytes:
    """
    Serialize multiple Python objects of basic types as Avro container file format.

    :param objs: Python objects to serialize
    :param writer_schema: Avro schema to serialize the data with, as JSON bytes.
    """
    import fastavro.write

    # TODO: generate schema if not provided
    parsed_schema = fastavro.parse_schema(orjson.loads(writer_schema))
    buffer = io.BytesIO()
    fastavro.write.writer(buffer, schema=parsed_schema, records=objs)
    # getvalue() returns the full buffer contents without needing flush/seek/read.
    return buffer.getvalue()


@py_adapter.plugin.hook
def deserialize(data: bytes, writer_schema: bytes) -> py_adapter.Basic:
"""
Expand All @@ -57,5 +78,22 @@ def deserialize(data: bytes, writer_schema: bytes) -> py_adapter.Basic:
data_stream = io.BytesIO(data)
# TODO: add support for reader schema, if provided
# TODO: add support for reader of data with embedded (writer) schema
basic_obj = fastavro.read.schemaless_reader(data_stream, writer_schema=writer_schema_obj)
basic_obj = fastavro.read.schemaless_reader(data_stream, writer_schema=writer_schema_obj, reader_schema=None)
return basic_obj


@py_adapter.plugin.hook
def deserialize_many(data: bytes, writer_schema: bytes) -> Iterator[py_adapter.Basic]:
    """
    Deserialize Avro container file format data as an iterator over objects of basic Python types

    :param data: Bytes to deserialize
    :param writer_schema: Data schema used to serialize the data with, as JSON bytes.
    """
    import fastavro.read

    # TODO: make it fail if writer_schema is provided?
    # TODO: add support for reader schema, if provided
    # The container file embeds its writer schema, so the reader needs no explicit schema.
    stream = io.BytesIO(data)
    return fastavro.read.reader(stream, reader_schema=None)
27 changes: 27 additions & 0 deletions src/py_adapter/plugin/_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""
JSON serializer/deserializer **py-adapter** plugin
"""
from collections.abc import Iterable, Iterator

import py_adapter
import py_adapter.plugin
Expand All @@ -30,6 +31,19 @@ def serialize(obj: py_adapter.Basic, writer_schema: bytes) -> bytes:
return orjson.dumps(obj)


@py_adapter.plugin.hook
def serialize_many(objs: Iterable[py_adapter.Basic], writer_schema: bytes) -> bytes:
    """
    Serialize multiple Python objects of basic types as Newline Delimited JSON (NDJSON).

    :param objs: Python objects to serialize
    :param writer_schema: Schema to serialize the data with. Not used with JSON serialization.
    """
    import orjson

    # One JSON document per object, joined with newlines (NDJSON).
    documents = [orjson.dumps(basic_obj) for basic_obj in objs]
    return b"\n".join(documents)


@py_adapter.plugin.hook
def deserialize(data: bytes, writer_schema: bytes) -> py_adapter.Basic:
"""
Expand All @@ -41,3 +55,16 @@ def deserialize(data: bytes, writer_schema: bytes) -> py_adapter.Basic:
import orjson

return orjson.loads(data)


@py_adapter.plugin.hook
def deserialize_many(data: bytes, writer_schema: bytes) -> Iterator[py_adapter.Basic]:
    """
    Deserialize Newline Delimited JSON (NDJSON) data as an iterator over objects of basic Python types

    :param data: Bytes to deserialize
    :param writer_schema: Schema used to serialize the data with. Not used with JSON serialization.
    """
    import orjson

    # Each NDJSON line is an independent JSON document; map parses them lazily.
    return map(orjson.loads, data.splitlines())
5 changes: 2 additions & 3 deletions tests/test_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,17 @@ def test_serialize_stream_avro(ship_obj, ship_class):
assert obj_out == ship_obj


def test_serialize_many_json(ship_obj, ship_class):
    # Round-trip: serialize multiple objects to NDJSON and deserialize them back.
    # The former @pytest.mark.skip("TODO") marker is removed: serialize_many/deserialize_many
    # are implemented, so the test must run.
    ship_objs = [ship_obj, ship_obj]
    data = py_adapter.serialize_many(ship_objs, format="JSON")
    objs_out = list(py_adapter.deserialize_many(data, ship_class, format="JSON"))
    assert objs_out == ship_objs


def test_serialize_many_avro(ship_obj, ship_class):
    # Round-trip: serialize multiple objects to Avro container file format and back.
    # Removed the stale @pytest.mark.skip("TODO") marker and the duplicate serialize_many
    # call that omitted writer_schema — Avro serialization requires the schema here.
    writer_schema = pas.generate(ship_class, options=pas.Option.LOGICAL_JSON_STRING | pas.Option.MILLISECONDS)
    ship_objs = [ship_obj, ship_obj]
    data = py_adapter.serialize_many(ship_objs, format="Avro", writer_schema=writer_schema)
    objs_out = list(py_adapter.deserialize_many(data, ship_class, format="Avro"))
    assert objs_out == ship_objs

Expand Down
4 changes: 1 addition & 3 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@
# Tox virtual environment manager for testing and quality assurance

[tox]
envlist = py39, py310, py311, py312, linting, docs
isolated_build = True
# Developers may not have all Python versions
skip_missing_interpreters = true

[gh-actions]
# Mapping from GitHub Actions Python versions to Tox environments
python =
3.7: py37
3.8: py38
3.9: py39
3.10: py310
3.11: py311, linting, docs
Expand Down

0 comments on commit faece61

Please sign in to comment.