Merge remote-tracking branch 'origin/master' into benc-prototype-proxystore

Conflicts:
	parsl/serialize/base.py
	parsl/serialize/facade.py
benclifford committed Jul 19, 2023
2 parents 759ff4d + c39700b commit 2e8bdea
Showing 20 changed files with 123 additions and 263 deletions.
19 changes: 10 additions & 9 deletions parsl/dataflow/dflow.py
@@ -1199,15 +1199,16 @@ def cleanup(self) -> None:
             if not executor.bad_state_is_set:
                 if isinstance(executor, BlockProviderExecutor):
                     logger.info(f"Scaling in executor {executor.label}")
-                    job_ids = executor.provider.resources.keys()
-                    block_ids = executor.scale_in(len(job_ids))
-                    if self.monitoring and block_ids:
-                        new_status = {}
-                        for bid in block_ids:
-                            new_status[bid] = JobStatus(JobState.CANCELLED)
-                        msg = executor.create_monitoring_info(new_status)
-                        logger.debug("Sending message {} to hub from DFK".format(msg))
-                        self.monitoring.send(MessageType.BLOCK_INFO, msg)
+                    if executor.provider:
+                        job_ids = executor.provider.resources.keys()
+                        block_ids = executor.scale_in(len(job_ids))
+                        if self.monitoring and block_ids:
+                            new_status = {}
+                            for bid in block_ids:
+                                new_status[bid] = JobStatus(JobState.CANCELLED)
+                            msg = executor.create_monitoring_info(new_status)
+                            logger.debug("Sending message {} to hub from DFK".format(msg))
+                            self.monitoring.send(MessageType.BLOCK_INFO, msg)
                 logger.info(f"Shutting down executor {executor.label}")
                 executor.shutdown()
                 logger.info(f"Shut down executor {executor.label}")
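
The new `if executor.provider:` guard pairs with the Optional[ExecutionProvider] change in parsl/executors/status_handling.py further down: once a provider may be None, cleanup() has to narrow it before touching provider.resources. A minimal sketch of the pattern, using toy stand-in classes rather than Parsl's real ones:

# Toy sketch: why scale-in must be guarded once `provider` may be None.
from typing import Dict, List, Optional

class Provider:
    def __init__(self) -> None:
        # Stand-in for provider.resources: maps block id -> job id.
        self.resources: Dict[str, str] = {"block-0": "job-1234"}

class Executor:
    def __init__(self, provider: Optional[Provider]) -> None:
        self.provider = provider

    def scale_in(self, n: int) -> List[str]:
        assert self.provider is not None
        return list(self.provider.resources)[:n]

for executor in (Executor(Provider()), Executor(None)):
    # Without this check, executor.provider.resources raises AttributeError
    # for the provider-less executor.
    if executor.provider:
        job_ids = executor.provider.resources.keys()
        print(executor.scale_in(len(job_ids)))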
8 changes: 5 additions & 3 deletions parsl/executors/high_throughput/executor.py
@@ -97,9 +97,10 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin):
     address : string
         An address to connect to the main Parsl process which is reachable from the network in which
-        workers will be running. This can be either a hostname as returned by ``hostname`` or an
-        IP address. Most login nodes on clusters have several network interfaces available, only
-        some of which can be reached from the compute nodes.
+        workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx).
+        Most login nodes on clusters have several network interfaces available, only some of which
+        can be reached from the compute nodes. This field can be used to limit the executor to listen
+        only on a specific interface, limiting connections to the internal network.
         By default, the executor will attempt to enumerate and connect through all possible addresses.
         Setting an address here overrides the default behavior.
         default=None
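
A sketch of how the documented field might be pinned to an internal interface. The helper address_by_interface does exist in parsl.addresses; the interface name "eno1" is an illustrative assumption, not a universal default:

# Hedged configuration sketch: restrict HTEX to one NIC's IPv4 address.
from parsl.addresses import address_by_interface
from parsl.executors import HighThroughputExecutor

htex = HighThroughputExecutor(
    label="htex_internal",
    # Resolves the IPv4 address of the named interface; substitute your
    # cluster's internal-facing NIC for "eno1".
    address=address_by_interface("eno1"),
)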
@@ -473,6 +474,7 @@ def _start_local_interchange_process(self):
            kwargs={"client_ports": (self.outgoing_q.port,
                                     self.incoming_q.port,
                                     self.command_client.port),
+                   "interchange_address": self.address,
                    "worker_ports": self.worker_ports,
                    "worker_port_range": self.worker_port_range,
                    "hub_address": self.hub_address,
82 changes: 17 additions & 65 deletions parsl/executors/high_throughput/interchange.py
@@ -1,5 +1,4 @@
 #!/usr/bin/env python
-import argparse
 import zmq
 import os
 import sys
@@ -14,7 +13,7 @@
 import threading
 import json
 
-from typing import cast, Any, Dict, Set
+from typing import cast, Any, Dict, Set, Optional
 
 from parsl.utils import setproctitle
 from parsl.version import VERSION as PARSL_VERSION
@@ -29,6 +28,9 @@
 HEARTBEAT_CODE = (2 ** 32) - 1
 PKL_HEARTBEAT_CODE = pickle.dumps((2 ** 32) - 1)
 
+LOGGER_NAME = "interchange"
+logger = logging.getLogger(LOGGER_NAME)
+
 
 class ManagerLost(Exception):
     ''' Task lost due to manager loss. Manager is considered lost when multiple heartbeats
@@ -39,11 +41,8 @@ def __init__(self, manager_id, hostname):
         self.tstamp = time.time()
         self.hostname = hostname
 
-    def __repr__(self):
-        return "Task failure due to loss of manager {} on host {}".format(self.manager_id.decode(), self.hostname)
-
     def __str__(self):
-        return self.__repr__()
+        return "Task failure due to loss of manager {} on host {}".format(self.manager_id.decode(), self.hostname)
 
 
 class VersionMismatch(Exception):
@@ -53,14 +52,11 @@ def __init__(self, interchange_version, manager_version):
         self.interchange_version = interchange_version
         self.manager_version = manager_version
 
-    def __repr__(self):
+    def __str__(self):
         return "Manager version info {} does not match interchange version info {}, causing a critical failure".format(
             self.manager_version,
             self.interchange_version)
-
-    def __str__(self):
-        return self.__repr__()


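Both exception cleanups rest on the same observation: once __str__ carries the message, a mirroring __repr__ is redundant, and repr() falls back to the default. A self-contained illustration:

# Defining only __str__ is enough for readable error messages.
class VersionMismatch(Exception):
    def __init__(self, interchange_version, manager_version):
        self.interchange_version = interchange_version
        self.manager_version = manager_version

    def __str__(self):
        return "Manager version info {} does not match interchange version info {}, causing a critical failure".format(
            self.manager_version, self.interchange_version)

e = VersionMismatch("v1.23", "v1.24")
print(str(e))   # the formatted message above
print(repr(e))  # default Exception repr: VersionMismatch()
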
 class Interchange:
     """ Interchange is a task orchestrator for distributed systems.
@@ -72,7 +68,7 @@ class Interchange:
     """
     def __init__(self,
                  client_address="127.0.0.1",
-                 interchange_address="127.0.0.1",
+                 interchange_address: Optional[str] = None,
                  client_ports=(50055, 50056, 50057),
                  worker_ports=None,
                  worker_port_range=(54000, 55000),
@@ -89,8 +85,9 @@ def __init__(self,
         client_address : str
             The ip address at which the parsl client can be reached. Default: "127.0.0.1"
 
-        interchange_address : str
-            The ip address at which the workers will be able to reach the Interchange. Default: "127.0.0.1"
+        interchange_address : Optional str
+            If specified, the interchange will listen only on this address for connections from workers;
+            otherwise it binds to all addresses.
 
         client_ports : triple(int, int, int)
             The ports at which the client can be reached
@@ -131,7 +128,7 @@ def __init__(self,
         logger.debug("Initializing Interchange process")
 
         self.client_address = client_address
-        self.interchange_address = interchange_address
+        self.interchange_address: str = interchange_address or "*"
         self.poll_period = poll_period
 
         logger.info("Attempting connection to client at {} on ports: {},{},{}".format(
@@ -166,14 +163,14 @@ def __init__(self,
             self.worker_task_port = self.worker_ports[0]
             self.worker_result_port = self.worker_ports[1]
 
-            self.task_outgoing.bind("tcp://*:{}".format(self.worker_task_port))
-            self.results_incoming.bind("tcp://*:{}".format(self.worker_result_port))
+            self.task_outgoing.bind(f"tcp://{self.interchange_address}:{self.worker_task_port}")
+            self.results_incoming.bind(f"tcp://{self.interchange_address}:{self.worker_result_port}")
 
         else:
-            self.worker_task_port = self.task_outgoing.bind_to_random_port('tcp://*',
+            self.worker_task_port = self.task_outgoing.bind_to_random_port(f"tcp://{self.interchange_address}",
                                                                            min_port=worker_port_range[0],
                                                                            max_port=worker_port_range[1], max_tries=100)
-            self.worker_result_port = self.results_incoming.bind_to_random_port('tcp://*',
+            self.worker_result_port = self.results_incoming.bind_to_random_port(f"tcp://{self.interchange_address}",
                                                                                 min_port=worker_port_range[0],
                                                                                 max_port=worker_port_range[1], max_tries=100)
@@ -580,16 +577,14 @@ def expire_bad_managers(self, interesting_managers, hub_channel):
             interesting_managers.remove(manager_id)
 
 
-def start_file_logger(filename, name='interchange', level=logging.DEBUG, format_string=None):
+def start_file_logger(filename, level=logging.DEBUG, format_string=None):
     """Add a stream log handler.
 
     Parameters
     ---------
 
     filename: string
        Name of the file to write logs to. Required.
-    name: string
-       Logger name. Default="parsl.executors.interchange"
     level: logging.LEVEL
        Set the logging level. Default=logging.DEBUG
     - format_string (string): Set the format string
@@ -604,7 +599,7 @@ def start_file_logger(filename, name='interchange', level=logging.DEBUG, format_string=None):
         format_string = "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d %(processName)s(%(process)d) %(threadName)s %(funcName)s [%(levelname)s] %(message)s"
 
     global logger
-    logger = logging.getLogger(name)
+    logger = logging.getLogger(LOGGER_NAME)
    logger.setLevel(level)
    handler = logging.FileHandler(filename)
    handler.setLevel(level)
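
The dropped name parameter and the new module-level LOGGER_NAME express one invariant: the module's logger and the file handler configured in start_file_logger must name the same logger object. A condensed sketch of the pattern, writing to a temporary file for illustration:

# Module-level logger and its file configuration share one name.
import logging
import tempfile

LOGGER_NAME = "interchange"
logger = logging.getLogger(LOGGER_NAME)

def start_file_logger(filename, level=logging.DEBUG):
    handler = logging.FileHandler(filename)
    handler.setLevel(level)
    logger.setLevel(level)
    logger.addHandler(handler)

logfile = tempfile.NamedTemporaryFile(suffix=".log", delete=False).name
start_file_logger(logfile)
logger.info("reaches the file handler - same logger object by construction")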
@@ -625,46 +620,3 @@ def starter(comm_q, *args, **kwargs):
     comm_q.put((ic.worker_task_port,
                 ic.worker_result_port))
     ic.start()
-
-
-if __name__ == '__main__':
-
-    parser = argparse.ArgumentParser()
-    parser.add_argument("-c", "--client_address",
-                        help="Client address")
-    parser.add_argument("-l", "--logdir", default="parsl_worker_logs",
-                        help="Parsl worker log directory")
-    parser.add_argument("-t", "--task_url",
-                        help="REQUIRED: ZMQ url for receiving tasks")
-    parser.add_argument("-r", "--result_url",
-                        help="REQUIRED: ZMQ url for posting results")
-    parser.add_argument("-p", "--poll_period",
-                        help="REQUIRED: poll period used for main thread")
-    parser.add_argument("--worker_ports", default=None,
-                        help="OPTIONAL, pair of workers ports to listen on, eg --worker_ports=50001,50005")
-    parser.add_argument("-d", "--debug", action='store_true',
-                        help="Count of apps to launch")
-
-    args = parser.parse_args()
-
-    # Setup logging
-    global logger
-    format_string = "%(asctime)s %(name)s:%(lineno)d [%(levelname)s] %(message)s"
-
-    logger = logging.getLogger("interchange")
-    logger.setLevel(logging.DEBUG)
-    handler = logging.StreamHandler()
-    handler.setLevel('DEBUG' if args.debug is True else 'INFO')
-    formatter = logging.Formatter(format_string, datefmt='%Y-%m-%d %H:%M:%S')
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-
-    logger.debug("Starting Interchange")
-
-    optionals = {}
-
-    if args.worker_ports:
-        optionals['worker_ports'] = [int(i) for i in args.worker_ports.split(',')]
-
-    ic = Interchange(**optionals)
-    ic.start()
2 changes: 1 addition & 1 deletion parsl/executors/status_handling.py
@@ -44,7 +44,7 @@ class BlockProviderExecutor(ParslExecutor):
     I'm not sure of use cases for switchability at the moment beyond "yes or no"
     """
     def __init__(self, *,
-                 provider: ExecutionProvider,
+                 provider: Optional[ExecutionProvider],
                  block_error_handler: bool):
         super().__init__()
         self._provider = provider
16 changes: 9 additions & 7 deletions parsl/executors/taskvine/install-taskvine.sh
@@ -2,15 +2,17 @@
 
 set -xe
 
-if [ "$CCTOOLS_VERSION" == "" ] ; then
+if [[ -z $CCTOOLS_VERSION ]]; then
     echo Environment variable CCTOOLS_VERSION must be set
     exit 1
 fi
 
-if [ -f "/etc/redhat-release" ]; then
-    wget -O /tmp/cctools.tar.gz "https://github.com/cooperative-computing-lab/cctools/releases/download/release/$CCTOOLS_VERSION/cctools-$CCTOOLS_VERSION-x86_64-centos8.tar.gz"
-else
-    wget -O /tmp/cctools.tar.gz "https://github.com/cooperative-computing-lab/cctools/releases/download/release/$CCTOOLS_VERSION/cctools-$CCTOOLS_VERSION-x86_64-ubuntu20.04.tar.gz"
-fi
+TARBALL="cctools-$CCTOOLS_VERSION-x86_64-ubuntu20.04.tar.gz"
+[[ -f "/etc/redhat-release" ]] && TARBALL="cctools-$CCTOOLS_VERSION-x86_64-centos8.tar.gz"
+
+# If stdout is *not* a TTY, then disable progress bar and show HTTP response headers
+[[ ! -t 1 ]] && NO_VERBOSE="--no-verbose" SHOW_HEADERS="-S"
+wget "$NO_VERBOSE" "$SHOW_HEADERS" -O /tmp/cctools.tar.gz "https://github.com/cooperative-computing-lab/cctools/releases/download/release/$CCTOOLS_VERSION/$TARBALL"
 
 mkdir -p /tmp/cctools
-tar -C /tmp/cctools -zxvf /tmp/cctools.tar.gz --strip-components=1
+tar -C /tmp/cctools -zxf /tmp/cctools.tar.gz --strip-components=1
4 changes: 1 addition & 3 deletions parsl/monitoring/monitoring.py
@@ -20,12 +20,10 @@
 
 from parsl.monitoring.message_type import MessageType
 from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage
-from typing import cast, Any, Callable, Dict, Optional, Sequence, Union
+from typing import cast, Any, Callable, Dict, Optional, Sequence, Tuple, Union
 
 _db_manager_excepts: Optional[Exception]
 
-from typing import Optional, Tuple
-
 
 try:
     from parsl.monitoring.db_manager import dbm_starter
4 changes: 0 additions & 4 deletions parsl/serialize/base.py
@@ -10,10 +10,6 @@ class SerializerBase(metaclass=ABCMeta):
     # For deserializer
     _identifier: bytes
 
-    # For serializer
-    _for_code: bool
-    _for_data: bool
-
     # For deserializer
     @property
     def identifier(self) -> bytes:
6 changes: 0 additions & 6 deletions parsl/serialize/concretes.py
@@ -18,8 +18,6 @@ class PickleSerializer(SerializerBase):
     """
 
     _identifier = b'01'
-    _for_code = False
-    _for_data = True
 
     def serialize(self, data: Any) -> bytes:
         return pickle.dumps(data)
@@ -41,8 +39,6 @@ class DillSerializer(SerializerBase):
     """
 
     _identifier = b'02'
-    _for_code = False
-    _for_data = True
 
     def serialize(self, data: Any) -> bytes:
         return dill.dumps(data)
@@ -58,8 +54,6 @@ class DillCallableSerializer(SerializerBase):
     """
 
     _identifier = b'C2'
-    _for_code = True
-    _for_data = False
 
     @functools.lru_cache
     def serialize(self, data: Any) -> bytes:
42 changes: 19 additions & 23 deletions parsl/serialize/facade.py
@@ -1,19 +1,12 @@
 import logging
-import parsl.serialize.concretes as c
 from parsl.serialize.plugin_codeprotector import CodeProtectorSerializer
+from typing import Any, Dict, List, Union
 
+import parsl.serialize.concretes as concretes
 from parsl.serialize.base import SerializerBase
-from typing import Any, List, Union
 
 logger = logging.getLogger(__name__)
 
-# these are used for two directions:
-
-# 1. to iterate over to find a valid serializer
-# for that, the ordering is important
-
-# 2. to perform an ID -> deserializer lookup
-
 # These must be registered in reverse order of
 # importance: later registered serializers
 # will take priority over earlier ones. This is
@@ -25,6 +18,7 @@
 methods_for_data: List[SerializerBase]
 methods_for_data = []
 
+deserializers: Dict[bytes, SerializerBase]
 deserializers = {}
 
 
@@ -38,15 +32,6 @@ def clear_serializers() -> None:
     methods_for_data = []
 
 
-def register_serializer(serializer: SerializerBase) -> None:
-    deserializers[serializer._identifier] = serializer
-
-    if serializer._for_code:
-        methods_for_code.insert(0, serializer)
-    if serializer._for_data:
-        methods_for_data.insert(0, serializer)
-
-
 def unregister_serializer(serializer: SerializerBase) -> None:
     logger.info(f"BENC: deserializers {deserializers}, serializer {serializer}")
     logger.info(f"BENC: unregistering serializer {serializer}")
@@ -66,16 +51,27 @@ def unregister_serializer(serializer: SerializerBase) -> None:
             logger.warning("BENC: not found in methods for data")
 
 
-register_serializer(c.DillSerializer())
-
-# register_serializer(c.DillCallableSerializer())
-register_serializer(CodeProtectorSerializer())
+# structuring it this way is probably wrong - should perhaps be a single
+# Pickle-variant (or dill-variant) that is used, with all pluggables hooked
+# in - eg proxystore should hook into the same Pickle/Dill subclass as
+# CodeProtector?
 
-register_serializer(c.PickleSerializer())
+
+def register_method_for_code(s: SerializerBase) -> None:
+    deserializers[s.identifier] = s
+    methods_for_code.insert(0, s)
+
+
+# register_method_for_code(concretes.DillCallableSerializer())
+register_method_for_code(CodeProtectorSerializer())
+
+
+def register_method_for_data(s: SerializerBase) -> None:
+    deserializers[s.identifier] = s
+    methods_for_data.insert(0, s)
+
+
+register_method_for_data(concretes.PickleSerializer())
+register_method_for_data(concretes.DillSerializer())
 
 
 def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes:
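
The split registration helpers encode two behaviours worth spelling out: insert(0, ...) means the most recently registered method is tried first, and every registration also populates the identifier-to-deserializer map. A stripped-down sketch with a toy Serializer class standing in for SerializerBase:

# Registration order gives priority; identifiers give deserializer lookup.
from typing import Dict, List

class Serializer:
    def __init__(self, identifier: bytes) -> None:
        self.identifier = identifier

methods_for_data: List[Serializer] = []
deserializers: Dict[bytes, Serializer] = {}

def register_method_for_data(s: Serializer) -> None:
    deserializers[s.identifier] = s
    methods_for_data.insert(0, s)

register_method_for_data(Serializer(b"01"))  # pickle-like, registered first
register_method_for_data(Serializer(b"02"))  # dill-like, registered later

assert methods_for_data[0].identifier == b"02"   # later registration tried first
assert deserializers[b"01"].identifier == b"01"  # wire id -> deserializer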