diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 61225fe329..c3549b3512 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1199,15 +1199,16 @@ def cleanup(self) -> None: if not executor.bad_state_is_set: if isinstance(executor, BlockProviderExecutor): logger.info(f"Scaling in executor {executor.label}") - job_ids = executor.provider.resources.keys() - block_ids = executor.scale_in(len(job_ids)) - if self.monitoring and block_ids: - new_status = {} - for bid in block_ids: - new_status[bid] = JobStatus(JobState.CANCELLED) - msg = executor.create_monitoring_info(new_status) - logger.debug("Sending message {} to hub from DFK".format(msg)) - self.monitoring.send(MessageType.BLOCK_INFO, msg) + if executor.provider: + job_ids = executor.provider.resources.keys() + block_ids = executor.scale_in(len(job_ids)) + if self.monitoring and block_ids: + new_status = {} + for bid in block_ids: + new_status[bid] = JobStatus(JobState.CANCELLED) + msg = executor.create_monitoring_info(new_status) + logger.debug("Sending message {} to hub from DFK".format(msg)) + self.monitoring.send(MessageType.BLOCK_INFO, msg) logger.info(f"Shutting down executor {executor.label}") executor.shutdown() logger.info(f"Shut down executor {executor.label}") diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 8eb2c79799..987cf19b0a 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -97,9 +97,10 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin): address : string An address to connect to the main Parsl process which is reachable from the network in which - workers will be running. This can be either a hostname as returned by ``hostname`` or an - IP address. Most login nodes on clusters have several network interfaces available, only - some of which can be reached from the compute nodes. + workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx). + Most login nodes on clusters have several network interfaces available, only some of which + can be reached from the compute nodes. This field can be used to restrict the executor to listen + only on a specific interface, thereby limiting connections to the internal network. By default, the executor will attempt to enumerate and connect through all possible addresses. Setting an address here overrides the default behavior. 
default=None @@ -473,6 +474,7 @@ def _start_local_interchange_process(self): kwargs={"client_ports": (self.outgoing_q.port, self.incoming_q.port, self.command_client.port), + "interchange_address": self.address, "worker_ports": self.worker_ports, "worker_port_range": self.worker_port_range, "hub_address": self.hub_address, diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index a12141f02d..d032144655 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -import argparse import zmq import os import sys @@ -14,7 +13,7 @@ import threading import json -from typing import cast, Any, Dict, Set +from typing import cast, Any, Dict, Set, Optional from parsl.utils import setproctitle from parsl.version import VERSION as PARSL_VERSION @@ -29,6 +28,9 @@ HEARTBEAT_CODE = (2 ** 32) - 1 PKL_HEARTBEAT_CODE = pickle.dumps((2 ** 32) - 1) +LOGGER_NAME = "interchange" +logger = logging.getLogger(LOGGER_NAME) + class ManagerLost(Exception): ''' Task lost due to manager loss. Manager is considered lost when multiple heartbeats @@ -39,11 +41,8 @@ def __init__(self, manager_id, hostname): self.tstamp = time.time() self.hostname = hostname - def __repr__(self): - return "Task failure due to loss of manager {} on host {}".format(self.manager_id.decode(), self.hostname) - def __str__(self): - return self.__repr__() + return "Task failure due to loss of manager {} on host {}".format(self.manager_id.decode(), self.hostname) class VersionMismatch(Exception): @@ -53,14 +52,11 @@ def __init__(self, interchange_version, manager_version): self.interchange_version = interchange_version self.manager_version = manager_version - def __repr__(self): + def __str__(self): return "Manager version info {} does not match interchange version info {}, causing a critical failure".format( self.manager_version, self.interchange_version) - def __str__(self): - return self.__repr__() - class Interchange: """ Interchange is a task orchestrator for distributed systems. @@ -72,7 +68,7 @@ class Interchange: """ def __init__(self, client_address="127.0.0.1", - interchange_address="127.0.0.1", + interchange_address: Optional[str] = None, client_ports=(50055, 50056, 50057), worker_ports=None, worker_port_range=(54000, 55000), @@ -89,8 +85,9 @@ def __init__(self, client_address : str The ip address at which the parsl client can be reached. Default: "127.0.0.1" - interchange_address : str - The ip address at which the workers will be able to reach the Interchange. Default: "127.0.0.1" + interchange_address : Optional str + If specified, the interchange will only listen on this address for connections from workers; + otherwise, it binds to all addresses. 
client_ports : triple(int, int, int) The ports at which the client can be reached @@ -131,7 +128,7 @@ def __init__(self, logger.debug("Initializing Interchange process") self.client_address = client_address - self.interchange_address = interchange_address + self.interchange_address: str = interchange_address or "*" self.poll_period = poll_period logger.info("Attempting connection to client at {} on ports: {},{},{}".format( @@ -166,14 +163,14 @@ def __init__(self, self.worker_task_port = self.worker_ports[0] self.worker_result_port = self.worker_ports[1] - self.task_outgoing.bind("tcp://*:{}".format(self.worker_task_port)) - self.results_incoming.bind("tcp://*:{}".format(self.worker_result_port)) + self.task_outgoing.bind(f"tcp://{self.interchange_address}:{self.worker_task_port}") + self.results_incoming.bind(f"tcp://{self.interchange_address}:{self.worker_result_port}") else: - self.worker_task_port = self.task_outgoing.bind_to_random_port('tcp://*', + self.worker_task_port = self.task_outgoing.bind_to_random_port(f"tcp://{self.interchange_address}", min_port=worker_port_range[0], max_port=worker_port_range[1], max_tries=100) - self.worker_result_port = self.results_incoming.bind_to_random_port('tcp://*', + self.worker_result_port = self.results_incoming.bind_to_random_port(f"tcp://{self.interchange_address}", min_port=worker_port_range[0], max_port=worker_port_range[1], max_tries=100) @@ -580,7 +577,7 @@ def expire_bad_managers(self, interesting_managers, hub_channel): interesting_managers.remove(manager_id) -def start_file_logger(filename, name='interchange', level=logging.DEBUG, format_string=None): +def start_file_logger(filename, level=logging.DEBUG, format_string=None): """Add a stream log handler. Parameters @@ -588,8 +585,6 @@ def start_file_logger(filename, name='interchange', level=logging.DEBUG, format_ filename: string Name of the file to write logs to. Required. - name: string - Logger name. Default="parsl.executors.interchange" level: logging.LEVEL Set the logging level. 
Default=logging.DEBUG - format_string (string): Set the format string @@ -604,7 +599,7 @@ def start_file_logger(filename, name='interchange', level=logging.DEBUG, format_ format_string = "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d %(processName)s(%(process)d) %(threadName)s %(funcName)s [%(levelname)s] %(message)s" global logger - logger = logging.getLogger(name) + logger = logging.getLogger(LOGGER_NAME) logger.setLevel(level) handler = logging.FileHandler(filename) handler.setLevel(level) @@ -625,46 +620,3 @@ def starter(comm_q, *args, **kwargs): comm_q.put((ic.worker_task_port, ic.worker_result_port)) ic.start() - - -if __name__ == '__main__': - - parser = argparse.ArgumentParser() - parser.add_argument("-c", "--client_address", - help="Client address") - parser.add_argument("-l", "--logdir", default="parsl_worker_logs", - help="Parsl worker log directory") - parser.add_argument("-t", "--task_url", - help="REQUIRED: ZMQ url for receiving tasks") - parser.add_argument("-r", "--result_url", - help="REQUIRED: ZMQ url for posting results") - parser.add_argument("-p", "--poll_period", - help="REQUIRED: poll period used for main thread") - parser.add_argument("--worker_ports", default=None, - help="OPTIONAL, pair of workers ports to listen on, eg --worker_ports=50001,50005") - parser.add_argument("-d", "--debug", action='store_true', - help="Count of apps to launch") - - args = parser.parse_args() - - # Setup logging - global logger - format_string = "%(asctime)s %(name)s:%(lineno)d [%(levelname)s] %(message)s" - - logger = logging.getLogger("interchange") - logger.setLevel(logging.DEBUG) - handler = logging.StreamHandler() - handler.setLevel('DEBUG' if args.debug is True else 'INFO') - formatter = logging.Formatter(format_string, datefmt='%Y-%m-%d %H:%M:%S') - handler.setFormatter(formatter) - logger.addHandler(handler) - - logger.debug("Starting Interchange") - - optionals = {} - - if args.worker_ports: - optionals['worker_ports'] = [int(i) for i in args.worker_ports.split(',')] - - ic = Interchange(**optionals) - ic.start() diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 7998ceb582..835390e33f 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -44,7 +44,7 @@ class BlockProviderExecutor(ParslExecutor): I'm not sure of use cases for switchability at the moment beyond "yes or no" """ def __init__(self, *, - provider: ExecutionProvider, + provider: Optional[ExecutionProvider], block_error_handler: bool): super().__init__() self._provider = provider diff --git a/parsl/executors/taskvine/install-taskvine.sh b/parsl/executors/taskvine/install-taskvine.sh index 98ecdd3683..c253753fd4 100755 --- a/parsl/executors/taskvine/install-taskvine.sh +++ b/parsl/executors/taskvine/install-taskvine.sh @@ -2,15 +2,17 @@ set -xe -if [ "$CCTOOLS_VERSION" == "" ] ; then +if [[ -z $CCTOOLS_VERSION ]]; then echo Environment variable CCTOOLS_VERSION must be set exit 1 fi -if [ -f "/etc/redhat-release" ]; then - wget -O /tmp/cctools.tar.gz "https://github.com/cooperative-computing-lab/cctools/releases/download/release/$CCTOOLS_VERSION/cctools-$CCTOOLS_VERSION-x86_64-centos8.tar.gz" -else - wget -O /tmp/cctools.tar.gz "https://github.com/cooperative-computing-lab/cctools/releases/download/release/$CCTOOLS_VERSION/cctools-$CCTOOLS_VERSION-x86_64-ubuntu20.04.tar.gz" -fi +TARBALL="cctools-$CCTOOLS_VERSION-x86_64-ubuntu20.04.tar.gz" +[[ -f "/etc/redhat-release" ]] && TARBALL="cctools-$CCTOOLS_VERSION-x86_64-centos8.tar.gz" + +# If stdout 
is *not* a TTY, then disable progress bar and show HTTP response headers +[[ ! -t 1 ]] && NO_VERBOSE="--no-verbose" SHOW_HEADERS="-S" +wget $NO_VERBOSE $SHOW_HEADERS -O /tmp/cctools.tar.gz "https://github.com/cooperative-computing-lab/cctools/releases/download/release/$CCTOOLS_VERSION/$TARBALL" + mkdir -p /tmp/cctools -tar -C /tmp/cctools -zxvf /tmp/cctools.tar.gz --strip-components=1 +tar -C /tmp/cctools -zxf /tmp/cctools.tar.gz --strip-components=1 diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 10c886e6bc..fc937f1720 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -20,12 +20,10 @@ from parsl.monitoring.message_type import MessageType from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage -from typing import cast, Any, Callable, Dict, Optional, Sequence, Union +from typing import cast, Any, Callable, Dict, Optional, Sequence, Tuple, Union _db_manager_excepts: Optional[Exception] -from typing import Optional, Tuple - try: from parsl.monitoring.db_manager import dbm_starter diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index 0dea791cfc..bc4a881abc 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -10,10 +10,6 @@ class SerializerBase(metaclass=ABCMeta): # For deserializer _identifier: bytes - # For serializer - _for_code: bool - _for_data: bool - # For deserializer @property def identifier(self) -> bytes: diff --git a/parsl/serialize/concretes.py b/parsl/serialize/concretes.py index e7a83bd411..56ca9d7ddc 100644 --- a/parsl/serialize/concretes.py +++ b/parsl/serialize/concretes.py @@ -18,8 +18,6 @@ class PickleSerializer(SerializerBase): """ _identifier = b'01' - _for_code = False - _for_data = True def serialize(self, data: Any) -> bytes: return pickle.dumps(data) @@ -41,8 +39,6 @@ class DillSerializer(SerializerBase): """ _identifier = b'02' - _for_code = False - _for_data = True def serialize(self, data: Any) -> bytes: return dill.dumps(data) @@ -58,8 +54,6 @@ class DillCallableSerializer(SerializerBase): """ _identifier = b'C2' - _for_code = True - _for_data = False @functools.lru_cache def serialize(self, data: Any) -> bytes: diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index d1cb7657bf..d586cb2960 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -1,19 +1,12 @@ import logging -import parsl.serialize.concretes as c from parsl.serialize.plugin_codeprotector import CodeProtectorSerializer +from typing import Any, Dict, List, Union +import parsl.serialize.concretes as concretes from parsl.serialize.base import SerializerBase -from typing import Any, List, Union logger = logging.getLogger(__name__) -# these are used for two directions: - -# 1. to iterate over to find a valid serializer -# for that, the ordering is important - -# 2. to perform an ID -> deserializer lookup - # These must be registered in reverse order of # importance: later registered serializers # will take priority over earlier ones. 
This is @@ -25,6 +18,7 @@ methods_for_data: List[SerializerBase] methods_for_data = [] +deserializers: Dict[bytes, SerializerBase] deserializers = {} @@ -38,15 +32,6 @@ def clear_serializers() -> None: methods_for_data = [] -def register_serializer(serializer: SerializerBase) -> None: - deserializers[serializer._identifier] = serializer - - if serializer._for_code: - methods_for_code.insert(0, serializer) - if serializer._for_data: - methods_for_data.insert(0, serializer) - - def unregister_serializer(serializer: SerializerBase) -> None: logger.info(f"BENC: deserializers {deserializers}, serializer {serializer}") logger.info(f"BENC: unregistering serializer {serializer}") @@ -66,16 +51,27 @@ def unregister_serializer(serializer: SerializerBase) -> None: logger.warning("BENC: not found in methods for data") -register_serializer(c.DillSerializer()) - -# register_serializer(c.DillCallableSerializer()) -register_serializer(CodeProtectorSerializer()) # structuring it this way is probably wrong - should perhaps be a single # Pickle-variant (or dill-variant) that is used, with all pluggable hooked # in - eg proxystore should hook into the same Pickle/Dill subclass as # CodeProtector? -register_serializer(c.PickleSerializer()) +def register_method_for_code(s: SerializerBase) -> None: + deserializers[s.identifier] = s + methods_for_code.insert(0, s) + + +# register_method_for_code(concretes.DillCallableSerializer()) +register_method_for_code(CodeProtectorSerializer()) + + +def register_method_for_data(s: SerializerBase) -> None: + deserializers[s.identifier] = s + methods_for_data.insert(0, s) + + +register_method_for_data(concretes.PickleSerializer()) +register_method_for_data(concretes.DillSerializer()) def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes: diff --git a/parsl/serialize/plugin_proxystore.py b/parsl/serialize/plugin_proxystore.py index 604c9a017f..8659d24136 100644 --- a/parsl/serialize/plugin_proxystore.py +++ b/parsl/serialize/plugin_proxystore.py @@ -1,7 +1,7 @@ # parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] from proxystore.store import Store, register_store from proxystore.connectors.file import FileConnector -from parsl.serialize.facade import register_serializer +from parsl.serialize.facade import register_method_for_data from parsl.serialize.base import SerializerBase @@ -37,7 +37,7 @@ def deserialize(self, body: bytes) -> Any: def register_proxystore_serializer() -> None: """Initializes proxystore and registers it as a serializer with parsl""" serializer = create_proxystore_serializer() - register_serializer(serializer) + register_method_for_data(serializer) def create_proxystore_serializer() -> ProxyStoreSerializer: diff --git a/parsl/tests/configs/workqueue_ex.py b/parsl/tests/configs/workqueue_ex.py index a2d62ff6da..c64be32295 100644 --- a/parsl/tests/configs/workqueue_ex.py +++ b/parsl/tests/configs/workqueue_ex.py @@ -8,4 +8,5 @@ def fresh_config(): return Config(executors=[WorkQueueExecutor(port=9000, + coprocess=True, storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()])]) diff --git a/parsl/tests/test_htex/__init__.py b/parsl/tests/test_htex/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/parsl/tests/test_htex/test_htex_zmq_binding.py b/parsl/tests/test_htex/test_htex_zmq_binding.py new file mode 100644 index 0000000000..442f0431b6 --- /dev/null +++ 
b/parsl/tests/test_htex/test_htex_zmq_binding.py @@ -0,0 +1,46 @@ +import logging + +import psutil +import pytest +import zmq + +from parsl.executors.high_throughput.interchange import Interchange + + +def test_interchange_binding_no_address(): + ix = Interchange() + assert ix.interchange_address == "*" + + +def test_interchange_binding_with_address(): + # Using loopback address + address = "127.0.0.1" + ix = Interchange(interchange_address=address) + assert ix.interchange_address == address + + +def test_interchange_binding_with_non_ipv4_address(): + # Confirm that an IPv4 address is required + address = "localhost" + with pytest.raises(zmq.error.ZMQError): + Interchange(interchange_address=address) + + +def test_interchange_binding_bad_address(): + """ Confirm that we raise a ZMQError when a bad address is supplied""" + address = "550.0.0.0" + with pytest.raises(zmq.error.ZMQError): + Interchange(interchange_address=address) + + +def test_limited_interface_binding(): + """ When an address is specified, the worker ports should be bound to it rather than to 0.0.0.0""" + address = "127.0.0.1" + ix = Interchange(interchange_address=address) + ix.worker_result_port + proc = psutil.Process() + conns = proc.connections(kind="tcp") + + matched_conns = [conn for conn in conns if conn.laddr.port == ix.worker_result_port] + assert len(matched_conns) == 1 + assert matched_conns[0].laddr.ip == address diff --git a/parsl/tests/test_regression/test_69b.py b/parsl/tests/test_regression/test_69b.py deleted file mode 100644 index a898ab2d9b..0000000000 --- a/parsl/tests/test_regression/test_69b.py +++ /dev/null @@ -1,125 +0,0 @@ -''' -Regression tests for issue #69. -''' -import time - -import pytest - -import parsl -from parsl.app.app import bash_app, python_app -from parsl.data_provider.files import File -from parsl.tests.configs.local_threads import config - - -@python_app -def double(x): - import time - time.sleep(1) - return x * 2 - - -def test_1(): - - x = double(5) - print(x.done()) - - # Testing. Adding explicit block - x.result() - - -@python_app -def sleep_double(x): - import time - time.sleep(0.2) - return x * 2 - - -def test_2(): - - # doubled_x is an AppFuture - doubled_x = sleep_double(10) - - # The result() waits till the sleep_double() app is done (2s wait) and then prints - # the result from the app *10* - print(doubled_x.result()) - - -@python_app -def wait_sleep_double(x, fu_1, fu_2): - import time - time.sleep(0.2) - return x * 2 - - -@pytest.mark.skip('fails intermittently; too sensitive to machine load') -def test_3(): - - start = time.time() - - # Launch two apps, which will execute in parallel - doubled_x = wait_sleep_double(10, None, None) - doubled_y = wait_sleep_double(10, None, None) - - # The third depends on the first two : - # doubled_x doubled_y (2 s) - # \ / - # doublex_z (2 s) - doubled_z = wait_sleep_double(10, doubled_x, doubled_y) - - # doubled_z will be done in ~4s - print(doubled_z.result()) - - end = time.time() - - delta = (end - start) * 10 - print("delta : ", delta) - assert delta > 4, "Took too little time" - assert delta < 5, "Took too much time" - - -@python_app -def bad_divide(x): - return 6 / x - - -def test_4(): - - doubled_x = bad_divide(0) - - try: - doubled_x.result() - - except ZeroDivisionError: - print("Oops! You tried to divide by 0 ") - except Exception: - print("Oops! 
Something really bad happened") - - -@bash_app -def echo(message, outputs=[]): - return 'echo {0} &> {outputs[0]}'.format(message, outputs=outputs) - -# This app *cat*sthe contents ofthe first file in its inputs[] kwargs to -# the first file in its outputs[] kwargs - - -@bash_app -def cat(inputs=[], outputs=[], stdout='cat.out', stderr='cat.err'): - return 'cat {inputs[0]} > {outputs[0]}'.format(inputs=inputs, outputs=outputs) - - -@pytest.mark.staging_required -def test_5(): - """Testing behavior of outputs """ - # Call echo specifying the outputfile - hello = echo("Hello World!", outputs=[File('hello1.txt')]) - - # the outputs attribute of the AppFuture is a list of DataFutures - print(hello.outputs) - - # This step *cat*s hello1.txt to hello2.txt - hello2 = cat(inputs=[hello.outputs[0]], outputs=[File('hello2.txt')]) - - hello2.result() - with open(hello2.outputs[0].result().filepath, 'r') as f: - print(f.read()) diff --git a/parsl/tests/test_serialization/config_serpent.py b/parsl/tests/test_serialization/config_serpent.py index 5175fa03c8..5fcd0d2e4d 100644 --- a/parsl/tests/test_serialization/config_serpent.py +++ b/parsl/tests/test_serialization/config_serpent.py @@ -22,7 +22,7 @@ from parsl.data_provider.ftp import FTPInTaskStaging from parsl.data_provider.file_noop import NoOpFileStaging -from parsl.serialize.facade import register_serializer # TODO: move this into parsl.serialize root as its user exposed +from parsl.serialize.facade import register_method_for_data # TODO: move this into parsl.serialize root as it's user-exposed from parsl.serialize.plugin_serpent import SerpentSerializer @@ -30,7 +30,7 @@ def fresh_config(): - register_serializer(SerpentSerializer()) + register_method_for_data(SerpentSerializer()) return Config( executors=[ diff --git a/parsl/tests/test_serialization/test_plugin.py b/parsl/tests/test_serialization/test_plugin.py index bd546f5a5d..900aa1b494 100644 --- a/parsl/tests/test_serialization/test_plugin.py +++ b/parsl/tests/test_serialization/test_plugin.py @@ -3,7 +3,7 @@ import pytest from parsl.serialize.base import SerializerBase -from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer +from parsl.serialize.facade import serialize, deserialize, register_method_for_data, unregister_serializer B_MAGIC = b'3626874628368432' # arbitrary const bytestring V_MAGIC = 777 # arbitrary const object @@ -15,9 +15,6 @@ class ConstSerializer(SerializerBase): deserialization. """ - _for_code = True - _for_data = True - # TODO: should be enforcing/defaulting this to class name so that by default we can dynamically load the serializer remotely? 
_identifier = b'parsl.tests.test_serializer.test_plugin ConstSerializer' # note a space in the name here not final dot, to distinguish modules vs attribute in module @@ -40,7 +37,7 @@ def deserialize(self, b): @pytest.mark.local def test_const_inprocess(): s = ConstSerializer() - register_serializer(s) + register_method_for_data(s) try: diff --git a/parsl/tests/test_serialization/test_plugin_htex.py b/parsl/tests/test_serialization/test_plugin_htex.py index f486c4ae3c..5dcede2e05 100644 --- a/parsl/tests/test_serialization/test_plugin_htex.py +++ b/parsl/tests/test_serialization/test_plugin_htex.py @@ -6,7 +6,7 @@ from parsl.tests.configs.htex_local import fresh_config as local_config from parsl.serialize.base import SerializerBase -from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer +from parsl.serialize.facade import serialize, deserialize, register_method_for_data, unregister_serializer logger = logging.getLogger(__name__) @@ -50,7 +50,7 @@ def func(x): @pytest.mark.local def test_const_inprocess(): s = XXXXSerializer() - register_serializer(s) + register_method_for_data(s) try: assert func(100).result() == 100 # but how do we know this went through XXXXSerializer? (or not) diff --git a/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py index 7dce270298..324eddd249 100644 --- a/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py +++ b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py @@ -6,7 +6,7 @@ from parsl.tests.configs.htex_local import fresh_config as local_config from parsl.serialize.base import SerializerBase -from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer, clear_serializers +from parsl.serialize.facade import serialize, deserialize, register_method_for_data, unregister_serializer, clear_serializers from parsl.serialize.plugin_proxystore_deep_pickle import create_deep_proxystore_serializer @@ -45,7 +45,7 @@ def test_proxystore_single_call(): clear_serializers() s = create_deep_proxystore_serializer(policy=MyDemo) - register_serializer(s) + register_method_for_data(s) try: assert func(100, 4).result() == 105 # but how do we know this went through proxystore? diff --git a/parsl/tests/test_serialization/test_proxystore_htex.py b/parsl/tests/test_serialization/test_proxystore_htex.py index 1efa596b5f..18a60e0774 100644 --- a/parsl/tests/test_serialization/test_proxystore_htex.py +++ b/parsl/tests/test_serialization/test_proxystore_htex.py @@ -6,7 +6,7 @@ from parsl.tests.configs.htex_local import fresh_config as local_config from parsl.serialize.base import SerializerBase -from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer +from parsl.serialize.facade import serialize, deserialize, register_method_for_data, unregister_serializer from parsl.serialize.plugin_proxystore import create_proxystore_serializer @@ -28,7 +28,7 @@ def func(x, y): @pytest.mark.local def test_proxystore_single_call(): s = create_proxystore_serializer() # TODO this isnt' testing the register_proxystore_serializer function... - register_serializer(s) + register_method_for_data(s) try: assert func(100, 4).result() == 105 # but how do we know this went through proxystore? 
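Editor's note on the serialization changes above: this patch replaces the single register_serializer() entry point (and the _for_code/_for_data class flags it consulted) with the explicit register_method_for_code() and register_method_for_data() functions. A minimal sketch of the new plugin flow, assuming only the API visible in this patch; the serializer below mirrors ConstSerializer from test_plugin.py, and its identifier value is illustrative:

from parsl.serialize.base import SerializerBase
from parsl.serialize.facade import register_method_for_data, unregister_serializer


class ConstSerializer(SerializerBase):
    # The identifier is the wire tag used for the ID -> deserializer lookup,
    # so it must be unique among all registered serializers.
    _identifier = b'myplugin ConstSerializer'

    def serialize(self, o):
        return b'3626874628368432'  # always emit the magic bytestring

    def deserialize(self, b):
        assert b == b'3626874628368432'
        return 777  # always return the magic object


s = ConstSerializer()
register_method_for_data(s)  # most recently registered takes priority for data
try:
    pass  # run apps here; data arguments now round-trip through ConstSerializer
finally:
    unregister_serializer(s)  # restore the default Dill/Pickle stack

Because register_method_for_data() inserts at the front of methods_for_data, the last serializer registered is the first one tried; that ordering is why the patch registers PickleSerializer before DillSerializer.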
diff --git a/parsl/tests/test_serialization/test_serpent_htex.py b/parsl/tests/test_serialization/test_serpent_htex.py index c69b53dbd2..54377aa94b 100644 --- a/parsl/tests/test_serialization/test_serpent_htex.py +++ b/parsl/tests/test_serialization/test_serpent_htex.py @@ -6,7 +6,7 @@ from parsl.tests.configs.htex_local import fresh_config as local_config from parsl.serialize.base import SerializerBase -from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer +from parsl.serialize.facade import serialize, deserialize, register_method_for_data, unregister_serializer from parsl.serialize.plugin_serpent import SerpentSerializer @@ -21,7 +21,7 @@ def func(x): @pytest.mark.local def test_serpent_single_call(): s = SerpentSerializer() - register_serializer(s) + register_method_for_data(s) try: assert func(100).result() == 101 # but how do we know this went through serpent? TODO
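Editor's note on the address plumbing that ties this patch together: HighThroughputExecutor now forwards its address field to the interchange as interchange_address, and the interchange binds its worker-facing ZMQ sockets to that address instead of tcp://*. A minimal usage sketch, assuming a single-node test setup; the label and provider choices are illustrative and not part of this patch:

from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider

config = Config(
    executors=[
        HighThroughputExecutor(
            label="htex_local",  # illustrative
            # Must be an IPv4 address. It is passed through to the interchange
            # as interchange_address, so the task and result sockets bind only
            # to this interface rather than to all interfaces.
            address="127.0.0.1",
            provider=LocalProvider(),  # illustrative
        )
    ]
)

Leaving address unset keeps the old behavior: the interchange receives interchange_address=None and binds to all addresses ("*"), as exercised by test_interchange_binding_no_address above.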