Allow user pluggable serializers and add a demonstration ProxyStore serializer #2842

Merged
merged 22 commits into from
Jul 27, 2023
Commits
4cb6544
Remove assumption that _identifier attribute exists; compute identifi…
benclifford Jul 21, 2023
d5eaf87
ongoing work
benclifford Jul 21, 2023
0c329bb
Move serialization error definition into parsl.serialize
benclifford Jul 21, 2023
9360907
Merge branch 'benc-serializer-exception' into benc-serializer-pluggab…
benclifford Jul 21, 2023
e57b6c6
Add a bit of error handling and make deserialize call in only one place.
benclifford Jul 21, 2023
e3865e4
fix f-string
benclifford Jul 21, 2023
77123ee
Correct f-string
benclifford Jul 21, 2023
fe76699
Add proxystore dependency
benclifford Jul 21, 2023
6926910
add basic proxystore serializer with raw test
benclifford Jul 21, 2023
e07cc4b
rename test
benclifford Jul 21, 2023
23da73d
Use unique store names
benclifford Jul 21, 2023
70910e8
Remove spurious debug variable
benclifford Jul 21, 2023
5d827de
Move another serialization exception; fix docs build
benclifford Jul 21, 2023
c316957
Fix f-string
benclifford Jul 21, 2023
d2efa60
bugfix exception attributes
benclifford Jul 21, 2023
385d39a
check types of serialized values
benclifford Jul 21, 2023
6e99769
Merge remote-tracking branch 'origin/master' into benc-serializer-plu…
benclifford Jul 21, 2023
6c21740
more work to make tests pass
benclifford Jul 21, 2023
fe28418
Rearrange imports to only import proxystore when actually executing p…
benclifford Jul 21, 2023
3b16da8
Install proxystore for --config local tests
benclifford Jul 21, 2023
f95eceb
fixup docstring
benclifford Jul 21, 2023
4f4ee5a
Merge branch 'master' into benc-serializer-pluggable-identifier
benclifford Jul 26, 2023
2 changes: 1 addition & 1 deletion Makefile
@@ -74,7 +74,7 @@ wqex_local_test: $(CCTOOLS_INSTALL) ## run all tests with workqueue_ex config

.PHONY: config_local_test
config_local_test:
pip3 install ".[monitoring]"
pip3 install ".[monitoring,proxystore]"
pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10

.PHONY: site_test
7 changes: 7 additions & 0 deletions mypy.ini
@@ -73,6 +73,10 @@ disallow_subclassing_any = True
warn_unreachable = True
disallow_untyped_defs = True

[mypy-parsl.serialize.proxystore.*]
# parsl/serialize/proxystore.py:9: error: Class cannot subclass "Pickler" (has type "Any")
disallow_subclassing_any = False

[mypy-parsl.executors.base.*]
disallow_untyped_defs = True
disallow_any_expr = True
@@ -198,3 +202,6 @@ ignore_missing_imports = True

[mypy-setproctitle.*]
ignore_missing_imports = True

[mypy-proxystore.*]
ignore_missing_imports = True
21 changes: 14 additions & 7 deletions parsl/serialize/base.py
@@ -1,4 +1,5 @@
from abc import abstractmethod
from functools import cached_property
import logging

from typing import Any
@@ -10,18 +11,24 @@ class SerializerBase:
""" Adds shared functionality for all serializer implementations
"""

_identifier: bytes

@property
@cached_property
def identifier(self) -> bytes:
"""Get that identifier that will be used to indicate in byte streams
that this class should be used for deserialization.

"""Compute identifier used in serialization header.
This will be used to indicate in byte streams that this class should
be used for deserialization.
 Serializers that use identifiers that don't align with the way this is
computed (such as the default concretes.py implementations) should
override this property with their own identifier.
Returns
-------
identifier : bytes
"""
return self._identifier
t = type(self)
m = bytes(t.__module__, encoding="utf-8")
c = bytes(t.__name__, encoding="utf-8")
return m + b' ' + c
Comment on lines +29 to +31
Collaborator

Just to double-check: __name__ and not __qualname__ is intended?

Collaborator Author

I think in the cases where __qualname__ and __name__ differ, this is often going to break: the module + name needs to be something importable.

One case that could work is a class defined directly inside another class, without any enclosing function/closure; I would have to poke at the import side of things to see how that would work.
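
The trade-off in this thread can be seen in a small standalone sketch. It builds an identifier in the same module-plus-__name__ shape as the diff above, then shows why a nested class is not found by a plain attribute lookup on its module. The names `Outer`, `Inner`, `make_local_class`, `compute_identifier`, `resolve_dotted` and `fake_module` are hypothetical and exist only for illustration; none of this is Parsl code.

```python
import types


class Outer:
    class Inner:
        pass


def make_local_class():
    # A class created inside a function gets "<locals>" in its __qualname__.
    class Local:
        pass
    return Local


def compute_identifier(cls: type) -> bytes:
    # Same shape as the default identifier in the diff: b"<module> <name>".
    return cls.__module__.encode("utf-8") + b" " + cls.__name__.encode("utf-8")


def resolve_dotted(namespace, dotted_name: str):
    # Hypothetical helper: walk a dotted name one attribute at a time.
    obj = namespace
    for part in dotted_name.split("."):
        obj = getattr(obj, part)
    return obj


# Stand-in for an imported module that defines Outer at its top level.
fake_module = types.ModuleType("fake_module")
fake_module.Outer = Outer

# A single getattr on the module with __name__ cannot see the nested class...
inner_found_by_name = hasattr(fake_module, Outer.Inner.__name__)
# ...but walking __qualname__ component by component can.
inner_via_qualname = resolve_dotted(fake_module, Outer.Inner.__qualname__)
# For a function-local class neither attribute names something importable.
local_qualname = make_local_class().__qualname__
```

With the loader as written in this PR (one `getattr` on the imported module), only top-level classes resolve. Supporting nested classes would need both `__qualname__` on the serializing side and an attribute walk like `resolve_dotted` on the loading side, which is an assumption about a possible design, not something this PR does.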


@abstractmethod
def serialize(self, data: Any) -> bytes:
6 changes: 3 additions & 3 deletions parsl/serialize/concretes.py
@@ -17,7 +17,7 @@ class PickleSerializer(SerializerBase):
* closures, generators and coroutines
"""

_identifier = b'01'
identifier = b'01'

def serialize(self, data: Any) -> bytes:
return pickle.dumps(data)
@@ -38,7 +38,7 @@ class DillSerializer(SerializerBase):
* closures
"""

_identifier = b'02'
identifier = b'02'

def serialize(self, data: Any) -> bytes:
return dill.dumps(data)
@@ -53,7 +53,7 @@ class DillCallableSerializer(SerializerBase):
assumption that callables are immutable and so can be cached.
"""

_identifier = b'C2'
identifier = b'C2'

@functools.lru_cache
def serialize(self, data: Any) -> bytes:
11 changes: 11 additions & 0 deletions parsl/serialize/errors.py
@@ -22,3 +22,14 @@ def __init__(self, fname: str) -> None:

def __str__(self) -> str:
return f"Failed to serialize objects for an invocation of function {self.fname}. Refer {self.troubleshooting}"


class DeserializerPluginError(ParslError):
"""Failure to dynamically load a deserializer plugin.
"""

def __init__(self, header: bytes) -> None:
self.header = header

def __str__(self) -> str:
return f"Failed to load deserializer plugin for header {self.header!r}"
30 changes: 26 additions & 4 deletions parsl/serialize/facade.py
@@ -1,8 +1,10 @@
import importlib
import logging
from typing import Any, Dict, List, Union

import parsl.serialize.concretes as concretes
from parsl.serialize.base import SerializerBase
from parsl.serialize.errors import DeserializerPluginError

logger = logging.getLogger(__name__)

@@ -28,6 +30,12 @@ def register_method_for_data(s: SerializerBase) -> None:
register_method_for_data(concretes.DillSerializer())


# When deserialize dynamically loads a deserializer, it will be stored here,
# rather than in the methods_for_* dictionaries, so that loading does not
# cause it to be used for future serializations.
additional_methods_for_deserialization: Dict[bytes, SerializerBase] = {}


def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes:
"""Serialize and pack function and parameters

@@ -75,7 +83,7 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes:

for method in methods.values():
try:
result = method._identifier + b'\n' + method.serialize(obj)
result = method.identifier + b'\n' + method.serialize(obj)
except Exception as e:
result = e
continue
@@ -101,11 +109,25 @@ def deserialize(payload: bytes) -> Any:
header, body = payload.split(b'\n', 1)

if header in methods_for_code:
result = methods_for_code[header].deserialize(body)
deserializer = methods_for_code[header]
elif header in methods_for_data:
result = methods_for_data[header].deserialize(body)
deserializer = methods_for_data[header]
elif header in additional_methods_for_deserialization:
deserializer = additional_methods_for_deserialization[header]
else:
raise TypeError("Invalid header: {!r} in data payload. Buffer is either corrupt or not created by ParslSerializer".format(header))
logger.info("Trying to dynamically load deserializer: {!r}".format(header))
# This is a user plugin point, so expect exceptions to happen.
try:
module_name, class_name = header.split(b' ', 1)
decoded_module_name = module_name.decode('utf-8')
module = importlib.import_module(decoded_module_name)
deserializer_class = getattr(module, class_name.decode('utf-8'))
deserializer = deserializer_class()
additional_methods_for_deserialization[header] = deserializer
except Exception as e:
raise DeserializerPluginError(header) from e

result = deserializer.deserialize(body)

return result
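
The dynamic-loading branch above can be exercised in isolation with a minimal sketch. It assumes nothing from Parsl: `PluginLoadError` is a hypothetical stand-in for `DeserializerPluginError`, `JsonSerializer` is a made-up plugin, and `demo_plugin` is a fake module registered in `sys.modules` so that `importlib.import_module` can find it without a file on disk.

```python
import importlib
import json
import sys
import types
from typing import Any, Dict


class PluginLoadError(Exception):
    """Hypothetical stand-in for DeserializerPluginError."""

    def __init__(self, header: bytes) -> None:
        self.header = header

    def __str__(self) -> str:
        return f"Failed to load deserializer plugin for header {self.header!r}"


class JsonSerializer:
    """Made-up plugin; the identifier has the b'<module> <class>' shape."""

    @property
    def identifier(self) -> bytes:
        t = type(self)
        return t.__module__.encode("utf-8") + b" " + t.__name__.encode("utf-8")

    def serialize(self, data: Any) -> bytes:
        return json.dumps(data).encode("utf-8")

    def deserialize(self, body: bytes) -> Any:
        return json.loads(body.decode("utf-8"))


# Make the class importable as demo_plugin.JsonSerializer (illustration only).
demo_plugin = types.ModuleType("demo_plugin")
JsonSerializer.__module__ = "demo_plugin"
demo_plugin.JsonSerializer = JsonSerializer
sys.modules["demo_plugin"] = demo_plugin

# Cache of dynamically loaded deserializers, keyed by header.
loaded: Dict[bytes, Any] = {}


def deserialize(payload: bytes) -> Any:
    header, body = payload.split(b"\n", 1)
    if header not in loaded:
        try:
            module_name, class_name = header.split(b" ", 1)
            module = importlib.import_module(module_name.decode("utf-8"))
            loaded[header] = getattr(module, class_name.decode("utf-8"))()
        except Exception as e:
            # Keep the original failure reachable via __cause__.
            raise PluginLoadError(header) from e
    return loaded[header].deserialize(body)


serializer = JsonSerializer()
payload = serializer.identifier + b"\n" + serializer.serialize({"a": [1, 2]})
result = deserialize(payload)
```

A header with no space, or one naming a module that cannot be imported, ends up as `PluginLoadError` with the underlying exception attached as `__cause__`. Note that, as in the PR, the receiving side imports whatever module the header names, so payloads should only come from trusted sources.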

49 changes: 49 additions & 0 deletions parsl/serialize/proxystore.py
@@ -0,0 +1,49 @@
import dill
import io
import typing as t

from parsl.serialize.base import SerializerBase
from proxystore.store import Store


class ProxyStoreDeepPickler(dill.Pickler):
"""This class extends dill so that certain objects will be stored into
ProxyStore rather than serialized directly. The selection of objects is
made by a user-specified policy.
"""

def __init__(self, *args: t.Any, should_proxy: t.Callable[[t.Any], bool], store: Store, **kwargs: t.Any) -> None:
super().__init__(*args, **kwargs)
self._store = store
self._should_proxy = should_proxy

def reducer_override(self, o: t.Any) -> t.Any:
if self._should_proxy(o):
proxy = self._store.proxy(o)
return proxy.__reduce__()
else:
# fall through to dill
return NotImplemented


class ProxyStoreSerializer(SerializerBase):

def __init__(self, *, should_proxy: t.Optional[t.Callable[[t.Any], bool]] = None, store: t.Optional[Store] = None) -> None:
self._store = store
self._should_proxy = should_proxy

def serialize(self, data: t.Any) -> bytes:
assert self._store is not None
assert self._should_proxy is not None

assert data is not None

f = io.BytesIO()
pickler = ProxyStoreDeepPickler(file=f, store=self._store, should_proxy=self._should_proxy)
pickler.dump(data)
return f.getvalue()

def deserialize(self, body: bytes) -> t.Any:
# because we aren't customising deserialization, use regular
# dill for deserialization
return dill.loads(body)
82 changes: 82 additions & 0 deletions parsl/tests/test_serialization/test_proxystore_configured.py
@@ -0,0 +1,82 @@
import logging
import pytest
import uuid

import parsl
from parsl.serialize.facade import additional_methods_for_deserialization, methods_for_data, register_method_for_data
from parsl.tests.configs.htex_local import fresh_config


logger = logging.getLogger(__name__)


def local_setup():
from parsl.serialize.proxystore import ProxyStoreSerializer
from proxystore.store import Store, register_store
from proxystore.connectors.file import FileConnector

parsl.load(fresh_config())

store = Store(name='parsl_store_' + str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp"))
register_store(store)

s = ProxyStoreSerializer(store=store, should_proxy=policy_example)

global previous_methods
previous_methods = methods_for_data.copy()

# get rid of all data serialization methods, in preparation for using only
# proxystore. put all the old methods as additional methods used only for
# deserialization, because those will be needed to deserialize the results,
# which will be serialized using the default serializer set.
additional_methods_for_deserialization.update(previous_methods)
methods_for_data.clear()

register_method_for_data(s)
logger.info(f"BENC: methods for data: {methods_for_data}")


def local_teardown():
parsl.dfk().cleanup()
parsl.clear()

methods_for_data.clear()
methods_for_data.update(previous_methods)

additional_methods_for_deserialization.clear()


@parsl.python_app
def identity(o):
return o


@parsl.python_app
def is_proxy(o) -> bool:
from proxystore.proxy import Proxy
return isinstance(o, Proxy)


def policy_example(o):
"""Example policy will proxy only frozensets."""
return isinstance(o, frozenset)


@pytest.mark.local
def test_proxystore_via_apps():
from proxystore.proxy import Proxy

# check roundtrip for an int, which should not be proxystored according
# to policy_example()
roundtripped_7 = identity(7).result()
assert roundtripped_7 == 7
assert not isinstance(roundtripped_7, Proxy)

# check roundtrip for a frozenset, which should be proxystored according
# to policy_example()
v = frozenset([1, 2, 3])

assert is_proxy(v).result()

roundtripped_v = identity(v).result()
assert roundtripped_v == v
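
The setup and teardown in this test swap the contents of module-level registries and later put them back. A minimal sketch of the same pattern as a context manager, using plain dicts, is shown here; `only_registered` and the sample registry contents are hypothetical and are not part of Parsl or its test suite.

```python
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def only_registered(registry: Dict[bytes, object],
                    fallback: Dict[bytes, object],
                    replacement: Dict[bytes, object]) -> Iterator[None]:
    # Remember what was registered, and keep it available for decoding only.
    saved = registry.copy()
    fallback.update(saved)
    # Mutate in place so other holders of the same dict see the change.
    registry.clear()
    registry.update(replacement)
    try:
        yield
    finally:
        # Restore even if the body raised.
        registry.clear()
        registry.update(saved)
        fallback.clear()


registry: Dict[bytes, object] = {b"01": "pickle", b"02": "dill"}
fallback: Dict[bytes, object] = {}

with only_registered(registry, fallback, {b"px": "proxystore"}):
    inside_registry = dict(registry)
    inside_fallback = dict(fallback)
```

Mutating the dicts in place, rather than rebinding the names, matters here for the same reason it does in the test: other modules hold references to the same dict objects.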
38 changes: 38 additions & 0 deletions parsl/tests/test_serialization/test_proxystore_impl.py
@@ -0,0 +1,38 @@
import pytest
import uuid


def policy_example(o):
"""Example policy will proxy only lists."""
return isinstance(o, list)


@pytest.mark.local
def test_proxystore_nonglobal():
"""Check that values are roundtripped, for both proxied and non-proxied types.
"""
# import in function, because proxystore is not importable in base parsl
# installation.
from parsl.serialize.proxystore import ProxyStoreSerializer
from proxystore.proxy import Proxy
from proxystore.store import Store, register_store
from proxystore.connectors.file import FileConnector

store = Store(name='parsl_store_' + str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp"))
register_store(store)

s = ProxyStoreSerializer(store=store, should_proxy=policy_example)

# check roundtrip for an int, which will not be proxystored
s_7 = s.serialize(7)
assert isinstance(s_7, bytes)
roundtripped_7 = s.deserialize(s_7)
assert roundtripped_7 == 7
assert not isinstance(roundtripped_7, Proxy)

v = [1, 2, 3]
s_v = s.serialize(v)
assert isinstance(s_v, bytes)
roundtripped_v = s.deserialize(s_v)
assert roundtripped_v == v
assert isinstance(roundtripped_v, Proxy)
1 change: 1 addition & 0 deletions setup.py
@@ -26,6 +26,7 @@
'azure' : ['azure<=4', 'msrestazure'],
'workqueue': ['work_queue'],
'flux': ['pyyaml', 'cffi', 'jsonschema'],
'proxystore': ['proxystore'],
# Disabling psi-j since github direct links are not allowed by pypi
# 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl']
}