Skip to content

Commit

Permalink
Merge branch 'master' into benc-docs-polaris
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Jul 27, 2023
2 parents 5b1da7c + 11a0ded commit 41a5174
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 44 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ wqex_local_test: $(CCTOOLS_INSTALL) ## run all tests with workqueue_ex config

.PHONY: config_local_test
config_local_test:
pip3 install ".[monitoring]"
pip3 install ".[monitoring,proxystore]"
pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10

.PHONY: site_test
Expand Down
4 changes: 2 additions & 2 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ Exceptions
parsl.errors.OptionalModuleMissing
parsl.executors.errors.ExecutorError
parsl.executors.errors.ScalingFailed
parsl.executors.errors.SerializationError
parsl.executors.errors.DeserializationError
parsl.executors.errors.BadMessage
parsl.dataflow.errors.DataFlowException
parsl.dataflow.errors.BadCheckpoint
Expand All @@ -162,6 +160,8 @@ Exceptions
parsl.channels.errors.FileCopyException
parsl.executors.high_throughput.errors.WorkerLost
parsl.executors.high_throughput.interchange.ManagerLost
parsl.serialize.errors.DeserializationError
parsl.serialize.errors.SerializationError

Internal
========
Expand Down
7 changes: 7 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ disallow_subclassing_any = True
warn_unreachable = True
disallow_untyped_defs = True

[mypy-parsl.serialize.proxystore.*]
# parsl/serialize/proxystore.py:9: error: Class cannot subclass "Pickler" (has type "Any")
disallow_subclassing_any = False

[mypy-parsl.executors.base.*]
disallow_untyped_defs = True
disallow_any_expr = True
Expand Down Expand Up @@ -198,3 +202,6 @@ ignore_missing_imports = True

[mypy-setproctitle.*]
ignore_missing_imports = True

[mypy-proxystore.*]
ignore_missing_imports = True
24 changes: 0 additions & 24 deletions parsl/executors/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,6 @@ def __str__(self):
return f"Executor {self.executor.label} failed to scale due to: {self.reason}"


class DeserializationError(ParslError):
    """Raised when results/exceptions from remote workers cannot be deserialized.

    Parameters
    ----------
    reason
        Description of the failure; stored on ``self.reason`` and included
        in the string form of the error.
    """

    def __init__(self, reason):
        self.reason = reason

    def __str__(self):
        template = "Failed to deserialize return objects. Reason:{}"
        return template.format(self.reason)


class SerializationError(ParslError):
    """Raised when data arguments for a task cannot be serialized.

    Parameters
    ----------
    fname
        Name reported in the error message; stored on ``self.fname``.
    """

    def __init__(self, fname):
        # FAQ entry that the error message points the user at.
        self.troubleshooting = "https://parsl.readthedocs.io/en/latest/faq.html#addressing-serializationerror"
        self.fname = fname

    def __str__(self):
        template = "Failed to serialize data objects for {}. Refer {} "
        return template.format(self.fname, self.troubleshooting)


class BadMessage(ParslError):
""" Mangled/Poorly formatted/Unsupported message received
"""
Expand Down
3 changes: 2 additions & 1 deletion parsl/executors/flux/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
from parsl.executors.status_handling import NoStatusHandlingExecutor
from parsl.executors.flux.execute_parsl_task import __file__ as _WORKER_PATH
from parsl.executors.flux.flux_instance_manager import __file__ as _MANAGER_PATH
from parsl.executors.errors import SerializationError, ScalingFailed
from parsl.executors.errors import ScalingFailed
from parsl.providers import LocalProvider
from parsl.providers.base import ExecutionProvider
from parsl.serialize import pack_apply_message, deserialize
from parsl.serialize.errors import SerializationError
from parsl.app.errors import AppException


Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
import math

from parsl.serialize import pack_apply_message, deserialize
from parsl.serialize.errors import SerializationError, DeserializationError
from parsl.app.errors import RemoteExceptionWrapper
from parsl.executors.high_throughput import zmq_pipes
from parsl.executors.high_throughput import interchange
from parsl.executors.errors import (
BadMessage, ScalingFailed,
DeserializationError, SerializationError,
UnsupportedFeatureError
)

Expand Down
21 changes: 14 additions & 7 deletions parsl/serialize/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import abstractmethod
from functools import cached_property
import logging

from typing import Any
Expand All @@ -10,18 +11,24 @@ class SerializerBase:
""" Adds shared functionality for all serializer implementations
"""

_identifier: bytes

@property
@cached_property
def identifier(self) -> bytes:
"""Get that identifier that will be used to indicate in byte streams
that this class should be used for deserialization.
"""Compute identifier used in serialization header.
This will be used to indicate in byte streams that this class should
be used for deserialization.
 Serializers that use identifiers that don't align with the way this is
computed (such as the default concretes.py implementations) should
override this property with their own identifier.
Returns
-------
identifier : bytes
"""
return self._identifier
t = type(self)
m = bytes(t.__module__, encoding="utf-8")
c = bytes(t.__name__, encoding="utf-8")
return m + b' ' + c

@abstractmethod
def serialize(self, data: Any) -> bytes:
Expand Down
6 changes: 3 additions & 3 deletions parsl/serialize/concretes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PickleSerializer(SerializerBase):
* closures, generators and coroutines
"""

_identifier = b'01'
identifier = b'01'

def serialize(self, data: Any) -> bytes:
return pickle.dumps(data)
Expand All @@ -38,7 +38,7 @@ class DillSerializer(SerializerBase):
* closures
"""

_identifier = b'02'
identifier = b'02'

def serialize(self, data: Any) -> bytes:
return dill.dumps(data)
Expand All @@ -53,7 +53,7 @@ class DillCallableSerializer(SerializerBase):
assumption that callables are immutable and so can be cached.
"""

_identifier = b'C2'
identifier = b'C2'

@functools.lru_cache
def serialize(self, data: Any) -> bytes:
Expand Down
35 changes: 35 additions & 0 deletions parsl/serialize/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from parsl.errors import ParslError


class DeserializationError(ParslError):
    """Raised when results/exceptions from remote workers cannot be deserialized.

    Parameters
    ----------
    reason : str
        Description of the failure; stored on ``self.reason`` and included
        in the string form of the error.
    """

    def __init__(self, reason: str) -> None:
        self.reason = reason

    def __str__(self) -> str:
        message = f"Failed to deserialize objects. Reason: {self.reason}"
        return message


class SerializationError(ParslError):
    """Raised when the objects for a task invocation cannot be serialized.

    Parameters
    ----------
    fname : str
        Name of the function whose invocation failed to serialize; stored
        on ``self.fname`` and included in the string form of the error.
    """

    def __init__(self, fname: str) -> None:
        # FAQ entry that the error message points the user at.
        self.troubleshooting = "https://parsl.readthedocs.io/en/latest/faq.html#addressing-serializationerror"
        self.fname = fname

    def __str__(self) -> str:
        message = f"Failed to serialize objects for an invocation of function {self.fname}. Refer {self.troubleshooting}"
        return message


class DeserializerPluginError(ParslError):
    """Raised when a deserializer plugin cannot be loaded dynamically.

    Parameters
    ----------
    header : bytes
        The payload header for which no deserializer could be loaded;
        stored on ``self.header`` and shown (via ``repr``) in the message.
    """

    def __init__(self, header: bytes) -> None:
        self.header = header

    def __str__(self) -> str:
        message = f"Failed to load deserializer plugin for header {self.header!r}"
        return message
30 changes: 26 additions & 4 deletions parsl/serialize/facade.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import importlib
import logging
from typing import Any, Dict, List, Union

import parsl.serialize.concretes as concretes
from parsl.serialize.base import SerializerBase
from parsl.serialize.errors import DeserializerPluginError

logger = logging.getLogger(__name__)

Expand All @@ -28,6 +30,12 @@ def register_method_for_data(s: SerializerBase) -> None:
register_method_for_data(concretes.DillSerializer())


# When deserialize dynamically loads a deserializer, it will be stored here,
# rather than in the methods_for_* dictionaries, so that loading does not
# cause it to be used for future serializations.
additional_methods_for_deserialization: Dict[bytes, SerializerBase] = {}


def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes:
"""Serialize and pack function and parameters
Expand Down Expand Up @@ -75,7 +83,7 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes:

for method in methods.values():
try:
result = method._identifier + b'\n' + method.serialize(obj)
result = method.identifier + b'\n' + method.serialize(obj)
except Exception as e:
result = e
continue
Expand All @@ -101,11 +109,25 @@ def deserialize(payload: bytes) -> Any:
header, body = payload.split(b'\n', 1)

if header in methods_for_code:
result = methods_for_code[header].deserialize(body)
deserializer = methods_for_code[header]
elif header in methods_for_data:
result = methods_for_data[header].deserialize(body)
deserializer = methods_for_data[header]
elif header in additional_methods_for_deserialization:
deserializer = additional_methods_for_deserialization[header]
else:
raise TypeError("Invalid header: {!r} in data payload. Buffer is either corrupt or not created by ParslSerializer".format(header))
logger.info("Trying to dynamically load deserializer: {!r}".format(header))
# This is a user plugin point, so expect exceptions to happen.
try:
module_name, class_name = header.split(b' ', 1)
decoded_module_name = module_name.decode('utf-8')
module = importlib.import_module(decoded_module_name)
deserializer_class = getattr(module, class_name.decode('utf-8'))
deserializer = deserializer_class()
additional_methods_for_deserialization[header] = deserializer
except Exception as e:
raise DeserializerPluginError(header) from e

result = deserializer.deserialize(body)

return result

Expand Down
49 changes: 49 additions & 0 deletions parsl/serialize/proxystore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import dill
import io
import typing as t

from parsl.serialize.base import SerializerBase
from proxystore.store import Store


class ProxyStoreDeepPickler(dill.Pickler):
    """A dill pickler that diverts selected objects into ProxyStore.

    Each object offered to ``reducer_override`` is passed to the
    user-supplied ``should_proxy`` policy. Objects the policy accepts are
    replaced by a proxy obtained from ``store``; everything else is left
    to dill's normal handling.

    Parameters
    ----------
    should_proxy : callable
        Policy taking an object and returning True if it should be proxied.
    store : Store
        The ProxyStore store used to create proxies.
    *args, **kwargs
        Passed through unchanged to ``dill.Pickler``.
    """

    def __init__(self, *args: t.Any, should_proxy: t.Callable[[t.Any], bool], store: Store, **kwargs: t.Any) -> None:
        super().__init__(*args, **kwargs)
        self._should_proxy = should_proxy
        self._store = store

    def reducer_override(self, o: t.Any) -> t.Any:
        """Return the reduction of a proxy for ``o`` if the policy selects it.

        Returns ``NotImplemented`` otherwise, which falls through to dill.
        """
        if not self._should_proxy(o):
            # fall through to dill
            return NotImplemented

        return self._store.proxy(o).__reduce__()


class ProxyStoreSerializer(SerializerBase):
    """Serializer that pickles with ``ProxyStoreDeepPickler``.

    Both constructor arguments are optional and default to ``None``;
    ``serialize`` asserts that both have been supplied, while
    ``deserialize`` uses neither.

    Parameters
    ----------
    should_proxy : callable, optional
        Policy taking an object and returning True if it should be proxied.
    store : Store, optional
        The ProxyStore store handed to the pickler.
    """

    def __init__(self, *, should_proxy: t.Optional[t.Callable[[t.Any], bool]] = None, store: t.Optional[Store] = None) -> None:
        self._should_proxy = should_proxy
        self._store = store

    def serialize(self, data: t.Any) -> bytes:
        """Pickle ``data`` into bytes using ``ProxyStoreDeepPickler``."""
        assert self._store is not None
        assert self._should_proxy is not None

        assert data is not None

        buffer = io.BytesIO()
        ProxyStoreDeepPickler(
            file=buffer,
            store=self._store,
            should_proxy=self._should_proxy,
        ).dump(data)
        return buffer.getvalue()

    def deserialize(self, body: bytes) -> t.Any:
        """Reconstruct an object from ``body``."""
        # because we aren't customising deserialization, use regular
        # dill for deserialization
        return dill.loads(body)
2 changes: 1 addition & 1 deletion parsl/tests/test_error_handling/test_serialization_fail.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from parsl import python_app
from parsl.tests.configs.htex_local import fresh_config
from parsl.executors.errors import SerializationError
from parsl.serialize.errors import SerializationError


def local_config():
Expand Down
Loading

0 comments on commit 41a5174

Please sign in to comment.