From 4cb6544084ddb8d8d5732f2867cb5d85be814d3a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 10:53:44 +0000 Subject: [PATCH 01/18] Remove assumption that _identifier attribute exists; compute identifier dynamically --- parsl/serialize/base.py | 21 ++++++++++++++------- parsl/serialize/concretes.py | 6 +++--- parsl/serialize/facade.py | 2 +- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index 7553652077..db3a136379 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -1,4 +1,5 @@ from abc import abstractmethod +from functools import cached_property import logging from typing import Any @@ -10,18 +11,24 @@ class SerializerBase: """ Adds shared functionality for all serializer implementations """ - _identifier: bytes - - @property + @cached_property def identifier(self) -> bytes: - """Get that identifier that will be used to indicate in byte streams - that this class should be used for deserialization. - + """ + Compute the identifier that will be used to indicate in byte streams + that this class should be used for deserialization. + + Serializers that use identifiers that don't align with the way this is + computed (such as the default concretes.py implementations) should + override this property with their own identifier. + Returns ------- identifier : bytes """ - return self._identifier + t = type(self) + m = bytes(t.__module__, encoding="utf-8") + c = bytes(t.__name__, encoding="utf-8") + return m + b' ' + c @abstractmethod def serialize(self, data: Any) -> bytes: diff --git a/parsl/serialize/concretes.py b/parsl/serialize/concretes.py index 56ca9d7ddc..fb0363c046 100644 --- a/parsl/serialize/concretes.py +++ b/parsl/serialize/concretes.py @@ -17,7 +17,7 @@ class PickleSerializer(SerializerBase): * closures, generators and coroutines """ - _identifier = b'01' + identifier = b'01' def serialize(self, data: Any) -> bytes: return pickle.dumps(data) @@ -38,7 +38,7 @@ class DillSerializer(SerializerBase): * closures """ - _identifier = b'02' + identifier = b'02' def serialize(self, data: Any) -> bytes: return dill.dumps(data) @@ -53,7 +53,7 @@ class DillCallableSerializer(SerializerBase): assumption that callables are immutable and so can be cached. """ - _identifier = b'C2' + identifier = b'C2' @functools.lru_cache def serialize(self, data: Any) -> bytes: diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 7a14ecebb9..7341a0a4af 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -75,7 +75,7 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes: for method in methods.values(): try: - result = method._identifier + b'\n' + method.serialize(obj) + result = method.identifier + b'\n' + method.serialize(obj) except Exception as e: result = e continue From d5eaf870a992924045e18d935eef41a4ac36c6d2 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 11:11:04 +0000 Subject: [PATCH 02/18] ongoing work --- parsl/executors/errors.py | 13 ------------- parsl/serialize/errors.py | 0 parsl/serialize/facade.py | 23 ++++++++++++++++++++++- 3 files changed, 22 insertions(+), 14 deletions(-) create mode 100644 parsl/serialize/errors.py diff --git a/parsl/executors/errors.py b/parsl/executors/errors.py index d1f31aae1c..a92ef87e36 100644 --- a/parsl/executors/errors.py +++ b/parsl/executors/errors.py @@ -58,19 +58,6 @@ def __str__(self): return "Failed to deserialize return objects. Reason:{}".format(self.reason) -class SerializationError(ParslError): - """ Failure to serialize data arguments for the tasks - """ - - def __init__(self, fname): - self.fname = fname - self.troubleshooting = "https://parsl.readthedocs.io/en/latest/faq.html#addressing-serializationerror" - - def __str__(self): - return "Failed to serialize data objects for {}. Refer {} ".format(self.fname, - self.troubleshooting) - - class BadMessage(ParslError): """ Mangled/Poorly formatted/Unsupported message received """ diff --git a/parsl/serialize/errors.py b/parsl/serialize/errors.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 7341a0a4af..a8b13bff1b 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -1,3 +1,4 @@ +import importlib import logging from typing import Any, Dict, List, Union @@ -28,6 +29,12 @@ def register_method_for_data(s: SerializerBase) -> None: register_method_for_data(concretes.DillSerializer()) +# When deserialize dynamically loads a deserializer, it will be stored here, +# rather than in the methods_for_* dictionaries, so that loading does not +# cause it to be used for future serializations. +additional_methods_for_deserialization: Dict[bytes, SerializerBase] = {} + + def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes: """Serialize and pack function and parameters @@ -104,8 +111,22 @@ def deserialize(payload: bytes) -> Any: result = methods_for_code[header].deserialize(body) elif header in methods_for_data: result = methods_for_data[header].deserialize(body) + elif header in additional_methods_for_deserialization: + result = additional_methods_for_deserialization[header].deserialize(body) else: - raise TypeError("Invalid header: {!r} in data payload. Buffer is either corrupt or not created by ParslSerializer".format(header)) + logger.info("Trying to dynamically load deserializer: {!r}".format(header)) + # This is a user plugin point, so expect exceptions to happen. + try: + module_name, class_name = header.split(b' ', 1) + decoded_module_name = module_name.decode('utf-8') + module = importlib.import_module(decoded_module_name) + deserializer_class = getattr(module, class_name.decode('utf-8')) + deserializer = deserializer_class() + additional_methods_for_deserialization[header] = deserializer + except Exception as e: + raise RuntimeError("Could not dynamically load deserializer") from e + + result = deserializer.deserialize(body) return result From 0c329bb3de1d9b2acc1fed25b5e21b277c64f60a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 11:18:57 +0000 Subject: [PATCH 03/18] Move serialization error definition into parsl.serialize This is in expectation of at least one more exception being added related to serialization as part of ongoing serializer plugin work. --- parsl/executors/errors.py | 13 ------------- parsl/executors/flux/executor.py | 3 ++- parsl/executors/high_throughput/executor.py | 3 ++- parsl/serialize/errors.py | 14 ++++++++++++++ .../test_error_handling/test_serialization_fail.py | 2 +- 5 files changed, 19 insertions(+), 16 deletions(-) create mode 100644 parsl/serialize/errors.py diff --git a/parsl/executors/errors.py b/parsl/executors/errors.py index d1f31aae1c..a92ef87e36 100644 --- a/parsl/executors/errors.py +++ b/parsl/executors/errors.py @@ -58,19 +58,6 @@ def __str__(self): return "Failed to deserialize return objects. Reason:{}".format(self.reason) -class SerializationError(ParslError): - """ Failure to serialize data arguments for the tasks - """ - - def __init__(self, fname): - self.fname = fname - self.troubleshooting = "https://parsl.readthedocs.io/en/latest/faq.html#addressing-serializationerror" - - def __str__(self): - return "Failed to serialize data objects for {}. Refer {} ".format(self.fname, - self.troubleshooting) - - class BadMessage(ParslError): """ Mangled/Poorly formatted/Unsupported message received """ diff --git a/parsl/executors/flux/executor.py b/parsl/executors/flux/executor.py index f840b49750..65c7f701a9 100644 --- a/parsl/executors/flux/executor.py +++ b/parsl/executors/flux/executor.py @@ -21,10 +21,11 @@ from parsl.executors.status_handling import NoStatusHandlingExecutor from parsl.executors.flux.execute_parsl_task import __file__ as _WORKER_PATH from parsl.executors.flux.flux_instance_manager import __file__ as _MANAGER_PATH -from parsl.executors.errors import SerializationError, ScalingFailed +from parsl.executors.errors import ScalingFailed from parsl.providers import LocalProvider from parsl.providers.base import ExecutionProvider from parsl.serialize import pack_apply_message, deserialize +from parsl.serialize.errors import SerializationError from parsl.app.errors import AppException diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index fbb4004504..68df000c09 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -12,12 +12,13 @@ import math from parsl.serialize import pack_apply_message, deserialize +from parsl.serialize.errors import SerializationError from parsl.app.errors import RemoteExceptionWrapper from parsl.executors.high_throughput import zmq_pipes from parsl.executors.high_throughput import interchange from parsl.executors.errors import ( BadMessage, ScalingFailed, - DeserializationError, SerializationError, + DeserializationError, UnsupportedFeatureError ) diff --git a/parsl/serialize/errors.py b/parsl/serialize/errors.py new file mode 100644 index 0000000000..8878d2a987 --- /dev/null +++ b/parsl/serialize/errors.py @@ -0,0 +1,14 @@ +from parsl.errors import ParslError + + +class SerializationError(ParslError): + """Failure to serialize task objects. + """ + + def __init__(self, fname: str) -> None: + self.fname = fname + self.troubleshooting = "https://parsl.readthedocs.io/en/latest/faq.html#addressing-serializationerror" + + def __str__(self) -> str: + return "Failed to serialize objects for an invocation of function {}. Refer {} ".format(self.fname, + self.troubleshooting) diff --git a/parsl/tests/test_error_handling/test_serialization_fail.py b/parsl/tests/test_error_handling/test_serialization_fail.py index 9854d563e0..1f62c199bf 100644 --- a/parsl/tests/test_error_handling/test_serialization_fail.py +++ b/parsl/tests/test_error_handling/test_serialization_fail.py @@ -2,7 +2,7 @@ from parsl import python_app from parsl.tests.configs.htex_local import fresh_config -from parsl.executors.errors import SerializationError +from parsl.serialize.errors import SerializationError def local_config(): From e57b6c62df0923e7d748265b4b7a30329024c4c9 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 11:31:27 +0000 Subject: [PATCH 04/18] Add a bit of error handling and make deserialize call in only one place. --- parsl/serialize/errors.py | 11 +++++++++++ parsl/serialize/facade.py | 11 ++++++----- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/parsl/serialize/errors.py b/parsl/serialize/errors.py index 8878d2a987..22f6ee0cfc 100644 --- a/parsl/serialize/errors.py +++ b/parsl/serialize/errors.py @@ -12,3 +12,14 @@ def __init__(self, fname: str) -> None: def __str__(self) -> str: return "Failed to serialize objects for an invocation of function {}. Refer {} ".format(self.fname, self.troubleshooting) + + +class DeserializerPluginError(ParslError): + """Failure to dynamically load a deserializer plugin. + """ + + def __init__(self, header: bytes) -> None: + self.header = bytes + + def __str__(self) -> str: + return "Failed to load deserializer plugin for header {header!r}" diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index a8b13bff1b..bb7e360967 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -4,6 +4,7 @@ import parsl.serialize.concretes as concretes from parsl.serialize.base import SerializerBase +from parsl.serialize.errors import DeserializerPluginError logger = logging.getLogger(__name__) @@ -108,11 +109,11 @@ def deserialize(payload: bytes) -> Any: header, body = payload.split(b'\n', 1) if header in methods_for_code: - result = methods_for_code[header].deserialize(body) + deserializer = methods_for_code[header] elif header in methods_for_data: - result = methods_for_data[header].deserialize(body) + deserializer = methods_for_data[header] elif header in additional_methods_for_deserialization: - result = additional_methods_for_deserialization[header].deserialize(body) + deserializer = additional_methods_for_deserialization[header] else: logger.info("Trying to dynamically load deserializer: {!r}".format(header)) # This is a user plugin point, so expect exceptions to happen. @@ -124,9 +125,9 @@ def deserialize(payload: bytes) -> Any: deserializer = deserializer_class() additional_methods_for_deserialization[header] = deserializer except Exception as e: - raise RuntimeError("Could not dynamically load deserializer") from e + raise DeserializerPluginError(header) from e - result = deserializer.deserialize(body) + result = deserializer.deserialize(body) return result From e3865e4e4ac4d5fe5308b538613ba66f6693dcfc Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 11:44:48 +0000 Subject: [PATCH 05/18] fix f-string --- parsl/serialize/errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/serialize/errors.py b/parsl/serialize/errors.py index 22f6ee0cfc..8bc8197a11 100644 --- a/parsl/serialize/errors.py +++ b/parsl/serialize/errors.py @@ -22,4 +22,4 @@ def __init__(self, header: bytes) -> None: self.header = bytes def __str__(self) -> str: - return "Failed to load deserializer plugin for header {header!r}" + return f"Failed to load deserializer plugin for header {header!r}" From 77123eee412c5234ec1b662cbca7e44a8483e983 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 11:57:53 +0000 Subject: [PATCH 06/18] Correct f-string --- parsl/serialize/errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/serialize/errors.py b/parsl/serialize/errors.py index 8bc8197a11..76acd5e343 100644 --- a/parsl/serialize/errors.py +++ b/parsl/serialize/errors.py @@ -22,4 +22,4 @@ def __init__(self, header: bytes) -> None: self.header = bytes def __str__(self) -> str: - return f"Failed to load deserializer plugin for header {header!r}" + return f"Failed to load deserializer plugin for header {self.header!r}" From fe7669969cb5a76da9c4786e343567866c697488 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 11:58:01 +0000 Subject: [PATCH 07/18] Add proxystore dependency --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index b90806bfc6..aec30ccd7d 100755 --- a/setup.py +++ b/setup.py @@ -26,6 +26,7 @@ 'azure' : ['azure<=4', 'msrestazure'], 'workqueue': ['work_queue'], 'flux': ['pyyaml', 'cffi', 'jsonschema'], + 'proxystore': ['proxystore'], # Disabling psi-j since github direct links are not allowed by pypi # 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl'] } From 692691047d7af0b1dfc8dffa9088b77df5ebee38 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 12:54:14 +0000 Subject: [PATCH 08/18] add basic proxystore serializer with raw test --- mypy.ini | 7 +++ parsl/serialize/proxystore.py | 49 +++++++++++++++++++ .../test_serialization/test_proxystore.py | 36 ++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 parsl/serialize/proxystore.py create mode 100644 parsl/tests/test_serialization/test_proxystore.py diff --git a/mypy.ini b/mypy.ini index f8c3c05b8c..14272918b7 100644 --- a/mypy.ini +++ b/mypy.ini @@ -73,6 +73,10 @@ disallow_subclassing_any = True warn_unreachable = True disallow_untyped_defs = True +[mypy-parsl.serialize.proxystore.*] +# parsl/serialize/proxystore.py:9: error: Class cannot subclass "Pickler" (has type "Any") +disallow_subclassing_any = False + [mypy-parsl.executors.base.*] disallow_untyped_defs = True disallow_any_expr = True @@ -198,3 +202,6 @@ ignore_missing_imports = True [mypy-setproctitle.*] ignore_missing_imports = True + +[mypy-proxystore.*] +ignore_missing_imports = True diff --git a/parsl/serialize/proxystore.py b/parsl/serialize/proxystore.py new file mode 100644 index 0000000000..dcecfbc8d8 --- /dev/null +++ b/parsl/serialize/proxystore.py @@ -0,0 +1,49 @@ +import dill +import io +import typing as t + +from parsl.serialize.base import SerializerBase +from proxystore.store import Store + + +class ProxyStoreDeepPickler(dill.Pickler): + """This class extends dill so that certain objects will be stored into + ProxyStore rather than serialized directly. The selection of objects is + made by a user-specified policy. + """ + + def __init__(self, *args: t.Any, should_proxy: t.Callable[[t.Any], bool], store: Store, **kwargs: t.Any) -> None: + super().__init__(*args, **kwargs) + self._store = store + self._should_proxy = should_proxy + + def reducer_override(self, o: t.Any) -> t.Any: + if self._should_proxy(o): + proxy = self._store.proxy(o) + return proxy.__reduce__() + else: + # fall through to dill + return NotImplemented + + +class ProxyStoreSerializer(SerializerBase): + + def __init__(self, *, should_proxy: t.Optional[t.Callable[[t.Any], bool]] = None, store: t.Optional[Store] = None) -> None: + self._store = store + self._should_proxy = should_proxy + + def serialize(self, data: t.Any) -> bytes: + assert self._store is not None + assert self._should_proxy is not None + + assert data is not None + + f = io.BytesIO() + pickler = ProxyStoreDeepPickler(file=f, store=self._store, should_proxy=self._should_proxy) + pickler.dump(data) + return f.getvalue() + + def deserialize(self, body: bytes) -> t.Any: + # because we aren't customising deserialization, use regular + # dill for deserialization + return dill.loads(body) diff --git a/parsl/tests/test_serialization/test_proxystore.py b/parsl/tests/test_serialization/test_proxystore.py new file mode 100644 index 0000000000..1761c6a693 --- /dev/null +++ b/parsl/tests/test_serialization/test_proxystore.py @@ -0,0 +1,36 @@ +import pytest + +# from parsl.serialize.facade import methods_for_data +from parsl.serialize.proxystore import ProxyStoreSerializer + + +def policy_example(o): + """Example policy will proxy only lists.""" + return isinstance(o, list) + +@pytest.mark.local +def test_proxystore_nonglobal(): + """Check that non-proxy-store values are roundtripped. + """ + # import in function, because proxystore is not importable in base parsl + # installation. + from proxystore.proxy import Proxy + from proxystore.store import Store, register_store + from proxystore.connectors.file import FileConnector + + store = Store(name='parsl_store', connector=FileConnector(store_dir="/tmp")) + register_store(store) + + s = ProxyStoreSerializer(store=store, should_proxy=policy_example) + + # check roundtrip for an int, which will not be proxystored + roundtripped_7 = s.deserialize(s.serialize(7)) + assert roundtripped_7 == 7 + assert not isinstance(roundtripped_7, Proxy) + + + l = [1,2,3] + k = s.serialize(l) + roundtripped_l = s.deserialize(s.serialize(l)) + assert roundtripped_l == l + assert isinstance(roundtripped_l, Proxy) From e07cc4b9c7290ecd9fcbcac68b4a7240f2f2e52c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 12:58:07 +0000 Subject: [PATCH 09/18] rename test --- .../{test_proxystore.py => test_proxystore_impl.py} | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) rename parsl/tests/test_serialization/{test_proxystore.py => test_proxystore_impl.py} (90%) diff --git a/parsl/tests/test_serialization/test_proxystore.py b/parsl/tests/test_serialization/test_proxystore_impl.py similarity index 90% rename from parsl/tests/test_serialization/test_proxystore.py rename to parsl/tests/test_serialization/test_proxystore_impl.py index 1761c6a693..2ed312e3c9 100644 --- a/parsl/tests/test_serialization/test_proxystore.py +++ b/parsl/tests/test_serialization/test_proxystore_impl.py @@ -1,6 +1,5 @@ import pytest -# from parsl.serialize.facade import methods_for_data from parsl.serialize.proxystore import ProxyStoreSerializer @@ -10,7 +9,7 @@ def policy_example(o): @pytest.mark.local def test_proxystore_nonglobal(): - """Check that non-proxy-store values are roundtripped. + """Check that values are roundtripped, for both proxied and non-proxied types. """ # import in function, because proxystore is not importable in base parsl # installation. From 23da73dfe309a80721e967b1da43124179438c7b Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 13:11:20 +0000 Subject: [PATCH 10/18] Use unique store names --- parsl/tests/test_serialization/test_proxystore_impl.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parsl/tests/test_serialization/test_proxystore_impl.py b/parsl/tests/test_serialization/test_proxystore_impl.py index 2ed312e3c9..4a4a7ae71b 100644 --- a/parsl/tests/test_serialization/test_proxystore_impl.py +++ b/parsl/tests/test_serialization/test_proxystore_impl.py @@ -1,4 +1,5 @@ import pytest +import uuid from parsl.serialize.proxystore import ProxyStoreSerializer @@ -17,7 +18,7 @@ def test_proxystore_nonglobal(): from proxystore.store import Store, register_store from proxystore.connectors.file import FileConnector - store = Store(name='parsl_store', connector=FileConnector(store_dir="/tmp")) + store = Store(name='parsl_store_'+str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp")) register_store(store) s = ProxyStoreSerializer(store=store, should_proxy=policy_example) From 70910e802515230ff0258e279096e86979afeeec Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 13:21:57 +0000 Subject: [PATCH 11/18] Remove spurious debug variable --- parsl/tests/test_serialization/test_proxystore_impl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/parsl/tests/test_serialization/test_proxystore_impl.py b/parsl/tests/test_serialization/test_proxystore_impl.py index 4a4a7ae71b..b2a1021320 100644 --- a/parsl/tests/test_serialization/test_proxystore_impl.py +++ b/parsl/tests/test_serialization/test_proxystore_impl.py @@ -30,7 +30,6 @@ def test_proxystore_nonglobal(): l = [1,2,3] - k = s.serialize(l) roundtripped_l = s.deserialize(s.serialize(l)) assert roundtripped_l == l assert isinstance(roundtripped_l, Proxy) From 5d827de15f04a9491101af36d3267ae1e3bffabe Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 13:40:16 +0000 Subject: [PATCH 12/18] Move another serialization exception; fix docs build --- docs/reference.rst | 4 ++-- parsl/executors/errors.py | 11 ----------- parsl/executors/high_throughput/executor.py | 3 +-- parsl/serialize/errors.py | 14 ++++++++++++-- 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/docs/reference.rst b/docs/reference.rst index 8a16f6fd71..d98ed12cab 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -139,8 +139,6 @@ Exceptions parsl.errors.OptionalModuleMissing parsl.executors.errors.ExecutorError parsl.executors.errors.ScalingFailed - parsl.executors.errors.SerializationError - parsl.executors.errors.DeserializationError parsl.executors.errors.BadMessage parsl.dataflow.errors.DataFlowException parsl.dataflow.errors.BadCheckpoint @@ -161,6 +159,8 @@ Exceptions parsl.channels.errors.FileCopyException parsl.executors.high_throughput.errors.WorkerLost parsl.executors.high_throughput.interchange.ManagerLost + parsl.serialize.errors.DeserializationError + parsl.serialize.errors.SerializationError Internal ======== diff --git a/parsl/executors/errors.py b/parsl/executors/errors.py index a92ef87e36..5006494762 100644 --- a/parsl/executors/errors.py +++ b/parsl/executors/errors.py @@ -47,17 +47,6 @@ def __str__(self): return f"Executor {self.executor.label} failed to scale due to: {self.reason}" -class DeserializationError(ParslError): - """ Failure at the Deserialization of results/exceptions from remote workers - """ - - def __init__(self, reason): - self.reason = reason - - def __str__(self): - return "Failed to deserialize return objects. Reason:{}".format(self.reason) - - class BadMessage(ParslError): """ Mangled/Poorly formatted/Unsupported message received """ diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 68df000c09..7ca7b27d77 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -12,13 +12,12 @@ import math from parsl.serialize import pack_apply_message, deserialize -from parsl.serialize.errors import SerializationError +from parsl.serialize.errors import SerializationError, DeserializationError from parsl.app.errors import RemoteExceptionWrapper from parsl.executors.high_throughput import zmq_pipes from parsl.executors.high_throughput import interchange from parsl.executors.errors import ( BadMessage, ScalingFailed, - DeserializationError, UnsupportedFeatureError ) diff --git a/parsl/serialize/errors.py b/parsl/serialize/errors.py index 8878d2a987..527d13ba8f 100644 --- a/parsl/serialize/errors.py +++ b/parsl/serialize/errors.py @@ -1,6 +1,17 @@ from parsl.errors import ParslError +class DeserializationError(ParslError): + """Failure at the deserialization of results/exceptions from remote workers. + """ + + def __init__(self, reason: str) -> None: + self.reason = reason + + def __str__(self) -> str: + return f"Failed to deserialize objects. Reason: {self.reason}" + + class SerializationError(ParslError): """Failure to serialize task objects. """ @@ -10,5 +21,4 @@ def __init__(self, fname: str) -> None: self.troubleshooting = "https://parsl.readthedocs.io/en/latest/faq.html#addressing-serializationerror" def __str__(self) -> str: - return "Failed to serialize objects for an invocation of function {}. Refer {} ".format(self.fname, - self.troubleshooting) + return "Failed to serialize objects for an invocation of function {self.fname}. Refer {self.troubleshooting}" From c31695742ee3c63f52f38cbfa6e0a23d410cb4ae Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 13:40:56 +0000 Subject: [PATCH 13/18] Fix f-string --- parsl/serialize/errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/serialize/errors.py b/parsl/serialize/errors.py index 527d13ba8f..7d509dca75 100644 --- a/parsl/serialize/errors.py +++ b/parsl/serialize/errors.py @@ -21,4 +21,4 @@ def __init__(self, fname: str) -> None: self.troubleshooting = "https://parsl.readthedocs.io/en/latest/faq.html#addressing-serializationerror" def __str__(self) -> str: - return "Failed to serialize objects for an invocation of function {self.fname}. Refer {self.troubleshooting}" + return f"Failed to serialize objects for an invocation of function {self.fname}. Refer {self.troubleshooting}" From 385d39adaceac917631fa8f68acb1df03d25d414 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 14:25:55 +0000 Subject: [PATCH 14/18] check types of serialized values --- .../test_serialization/test_proxystore_impl.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/parsl/tests/test_serialization/test_proxystore_impl.py b/parsl/tests/test_serialization/test_proxystore_impl.py index b2a1021320..86932f9b68 100644 --- a/parsl/tests/test_serialization/test_proxystore_impl.py +++ b/parsl/tests/test_serialization/test_proxystore_impl.py @@ -8,6 +8,7 @@ def policy_example(o): """Example policy will proxy only lists.""" return isinstance(o, list) + @pytest.mark.local def test_proxystore_nonglobal(): """Check that values are roundtripped, for both proxied and non-proxied types. @@ -18,18 +19,21 @@ def test_proxystore_nonglobal(): from proxystore.store import Store, register_store from proxystore.connectors.file import FileConnector - store = Store(name='parsl_store_'+str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp")) + store = Store(name='parsl_store_' + str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp")) register_store(store) s = ProxyStoreSerializer(store=store, should_proxy=policy_example) # check roundtrip for an int, which will not be proxystored - roundtripped_7 = s.deserialize(s.serialize(7)) + s_7 = s.serialize(7) + assert isinstance(s_7, bytes) + roundtripped_7 = s.deserialize(s_7) assert roundtripped_7 == 7 assert not isinstance(roundtripped_7, Proxy) - - l = [1,2,3] - roundtripped_l = s.deserialize(s.serialize(l)) - assert roundtripped_l == l - assert isinstance(roundtripped_l, Proxy) + v = [1, 2, 3] + s_v = s.serialize(v) + assert isinstance(s_7, bytes) + roundtripped_v = s.deserialize(s_v) + assert roundtripped_v == v + assert isinstance(roundtripped_v, Proxy) From 6c21740bd8b8fd14e2170b71792e4c2ccba6e91f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 15:02:59 +0000 Subject: [PATCH 15/18] more work to make tests pass --- .../test_proxystore_configured.py | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/parsl/tests/test_serialization/test_proxystore_configured.py b/parsl/tests/test_serialization/test_proxystore_configured.py index dd9ac4ddff..45ea29e091 100644 --- a/parsl/tests/test_serialization/test_proxystore_configured.py +++ b/parsl/tests/test_serialization/test_proxystore_configured.py @@ -3,7 +3,7 @@ import uuid import parsl -from parsl.serialize.facade import methods_for_data, register_method_for_data +from parsl.serialize.facade import additional_methods_for_deserialization, methods_for_data, register_method_for_data from parsl.serialize.proxystore import ProxyStoreSerializer from parsl.tests.configs.htex_local import fresh_config @@ -16,13 +16,19 @@ def local_setup(): from proxystore.store import Store, register_store from proxystore.connectors.file import FileConnector - store = Store(name='parsl_store_'+str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp")) + store = Store(name='parsl_store_' + str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp")) register_store(store) s = ProxyStoreSerializer(store=store, should_proxy=policy_example) global previous_methods previous_methods = methods_for_data.copy() + + # get rid of all data serialization methods, in preparation for using only + # proxystore. put all the old methods as additional methods used only for + # deserialization, because those will be needed to deserialize the results, + # which will be serialized using the default serializer set. + additional_methods_for_deserialization.update(previous_methods) methods_for_data.clear() register_method_for_data(s) @@ -36,6 +42,7 @@ def local_teardown(): methods_for_data.clear() methods_for_data.update(previous_methods) + additional_methods_for_deserialization.clear() @parsl.python_app @@ -43,9 +50,15 @@ def identity(o): return o +@parsl.python_app +def is_proxy(o) -> bool: + from proxystore.proxy import Proxy + return isinstance(o, Proxy) + + def policy_example(o): """Example policy will proxy only lists.""" - return isinstance(o, list) + return isinstance(o, frozenset) @pytest.mark.local @@ -60,7 +73,9 @@ def test_proxystore_via_apps(): # check roundtrip for a list, which should be proxystored according # to example_policy() - l = [1,2,3] - roundtripped_l = identity(l).result() - assert roundtripped_l == l - assert isinstance(roundtripped_l, Proxy) + v = frozenset([1, 2, 3]) + + assert is_proxy(v).result() + + roundtripped_v = identity(v).result() + assert roundtripped_v == v From fe284184871b6d97a255810842d78c4957ef56b8 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 15:12:14 +0000 Subject: [PATCH 16/18] Rearrange imports to only import proxystore when actually executing proxystore test, as it cannot be imported when optional proxystore is not installed --- parsl/tests/test_serialization/test_proxystore_configured.py | 5 +++-- parsl/tests/test_serialization/test_proxystore_impl.py | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/parsl/tests/test_serialization/test_proxystore_configured.py b/parsl/tests/test_serialization/test_proxystore_configured.py index 45ea29e091..04486a62ce 100644 --- a/parsl/tests/test_serialization/test_proxystore_configured.py +++ b/parsl/tests/test_serialization/test_proxystore_configured.py @@ -4,7 +4,6 @@ import parsl from parsl.serialize.facade import additional_methods_for_deserialization, methods_for_data, register_method_for_data -from parsl.serialize.proxystore import ProxyStoreSerializer from parsl.tests.configs.htex_local import fresh_config @@ -12,10 +11,12 @@ def local_setup(): - parsl.load(fresh_config()) + from parsl.serialize.proxystore import ProxyStoreSerializer from proxystore.store import Store, register_store from proxystore.connectors.file import FileConnector + parsl.load(fresh_config()) + store = Store(name='parsl_store_' + str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp")) register_store(store) diff --git a/parsl/tests/test_serialization/test_proxystore_impl.py b/parsl/tests/test_serialization/test_proxystore_impl.py index 86932f9b68..fefc4e2a6e 100644 --- a/parsl/tests/test_serialization/test_proxystore_impl.py +++ b/parsl/tests/test_serialization/test_proxystore_impl.py @@ -1,8 +1,6 @@ import pytest import uuid -from parsl.serialize.proxystore import ProxyStoreSerializer - def policy_example(o): """Example policy will proxy only lists.""" @@ -15,6 +13,7 @@ def test_proxystore_nonglobal(): """ # import in function, because proxystore is not importable in base parsl # installation. + from parsl.serialize.proxystore import ProxyStoreSerializer from proxystore.proxy import Proxy from proxystore.store import Store, register_store from proxystore.connectors.file import FileConnector From 3b16da8bfc0930cf8c6a3826f3551cc17d4e29e1 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 15:14:53 +0000 Subject: [PATCH 17/18] Install proxystore for --config local tests --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 02c8aa7f9f..e75f2edc05 100644 --- a/Makefile +++ b/Makefile @@ -74,7 +74,7 @@ wqex_local_test: $(CCTOOLS_INSTALL) ## run all tests with wqex_local config .PHONY: config_local_test config_local_test: - pip3 install ".[monitoring]" + pip3 install ".[monitoring,proxystore]" pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10 .PHONY: site_test From f95eceb00dc9be5742552819356e86a157e99a95 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 15:25:28 +0000 Subject: [PATCH 18/18] fixup docstring --- parsl/serialize/base.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index db3a136379..77060d4acf 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -13,13 +13,13 @@ class SerializerBase: @cached_property def identifier(self) -> bytes: - """ - Compute the identifier that will be used to indicate in byte streams - that this class should be used for deserialization. + """Compute identifier used in serialization header. + This will be used to indicate in byte streams that this class should + be used for deserialization.  - Serializers that use identifiers that don't align with the way this is - computed (such as the default concretes.py implementations) should - override this property with their own identifier. + Serializers that use identifiers that don't align with the way this is + computed (such as the default concretes.py implementations) should + override this property with their own identifier.  Returns -------