diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index a8429ed354..19ecd519ab 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -747,11 +747,10 @@ def launch_task(self, task_record: TaskRecord) -> Future:
                                                        kwargs=kwargs,
                                                        x_try_id=try_id,
                                                        x_task_id=task_id,
-                                                       monitoring_hub_url=self.monitoring.monitoring_hub_url,
+                                                       radio_config=executor.remote_monitoring_radio_config,
                                                        run_id=self.run_id,
                                                        logging_level=wrapper_logging_level,
                                                        sleep_dur=self.monitoring.resource_monitoring_interval,
-                                                       radio_mode=executor.radio_mode,
                                                        monitor_resources=executor.monitor_resources(),
                                                        run_dir=self.run_dir)
@@ -1179,6 +1178,19 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
                 executor.hub_address = self.monitoring.hub_address
                 executor.hub_zmq_port = self.monitoring.hub_zmq_port
                 executor.submit_monitoring_radio = self.monitoring.radio
+                # this will modify the radio config object: it will add relevant parameters needed
+                # for the particular remote radio sender to communicate back
+                logger.info("starting monitoring receiver "
+                            f"for executor {executor} "
+                            f"with remote monitoring radio config {executor.remote_monitoring_radio_config}")
+                executor.monitoring_receiver = self.monitoring.start_receiver(executor.remote_monitoring_radio_config,
+                                                                              ip=self.monitoring.hub_address,
+                                                                              run_dir=self.run_dir)
+                # TODO: this is a weird place to start the receiver - it would
+                # fit better in executor.start, but there's a tangle here
+                # trying to make the executors usable in a non-pure-parsl
+                # context where there is no DFK to grab config out of
+                # (and no monitoring...)
             if hasattr(executor, 'provider'):
                 if hasattr(executor.provider, 'script_dir'):
                     executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')
diff --git a/parsl/executors/base.py b/parsl/executors/base.py
index fc97db89d3..94c485d4be 100644
--- a/parsl/executors/base.py
+++ b/parsl/executors/base.py
@@ -1,3 +1,4 @@
+import logging
 import os
 from abc import ABCMeta, abstractmethod
 from concurrent.futures import Future
@@ -5,7 +6,14 @@
 
 from typing_extensions import Literal, Self
 
-from parsl.monitoring.radios.base import MonitoringRadioSender
+from parsl.monitoring.radios.base import (
+    MonitoringRadioReceiver,
+    MonitoringRadioSender,
+    RadioConfig,
+)
+from parsl.monitoring.radios.udp import UDPRadio
+
+logger = logging.getLogger(__name__)
 
 
 class ParslExecutor(metaclass=ABCMeta):
@@ -19,15 +27,13 @@ class ParslExecutor(metaclass=ABCMeta):
     no arguments and re-raises any thrown exception.
 
     In addition to the listed methods, a ParslExecutor instance must always
-    have a member field:
+    have these member fields:
 
        label: str - a human readable label for the executor, unique
               with respect to other executors.
 
-    Per-executor monitoring behaviour can be influenced by exposing:
-
-       radio_mode: str - a string describing which radio mode should be used to
-              send task resource data back to the submit side.
+       remote_monitoring_radio_config: RadioConfig describing how tasks on this executor
+              should report task resource status
 
     An executor may optionally expose:
 
@@ -45,11 +51,16 @@ class ParslExecutor(metaclass=ABCMeta):
     """
 
     label: str = "undefined"
-    radio_mode: str = "udp"
 
     def __init__(
         self,
         *,
+
+        # TODO: I'd like these two to go away but they're needed right now
+        # to configure the interchange monitoring radio, that is
+        # in addition to the submit and worker monitoring radios (!). They
+        # are effectively a third monitoring radio config, though, so what
+        # should that look like for the interchange?
        hub_address: Optional[str] = None,
        hub_zmq_port: Optional[int] = None,
        submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
@@ -58,10 +69,19 @@ def __init__(
     ):
         self.hub_address = hub_address
         self.hub_zmq_port = hub_zmq_port
+
+        # these are parameters for the monitoring radio to be used on the remote side,
+        # eg. in workers - to send results back - and they should end up encapsulated
+        # inside a RadioConfig.
         self.submit_monitoring_radio = submit_monitoring_radio
+        self.remote_monitoring_radio_config: RadioConfig = UDPRadio()
+
         self.run_dir = os.path.abspath(run_dir)
         self.run_id = run_id
 
+        # will be set externally later, which is pretty ugly
+        self.monitoring_receiver: Optional[MonitoringRadioReceiver] = None
+
     def __enter__(self) -> Self:
         return self
 
@@ -94,7 +114,13 @@ def shutdown(self) -> None:
 
         This includes all attached resources such as workers and controllers.
         """
-        pass
+        logger.debug("Starting base executor shutdown")
+        # logger.error(f"BENC: monitoring receiver on {self} is {self.monitoring_receiver}")
+        if self.monitoring_receiver is not None:
+            logger.debug("Starting monitoring receiver shutdown")
+            self.monitoring_receiver.shutdown()
+            logger.debug("Done with monitoring receiver shutdown")
+        logger.debug("Done with base executor shutdown")
 
     def monitor_resources(self) -> bool:
         """Should resource monitoring happen for tasks on running on this executor?
diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index a1def0466a..1d6c474d3e 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -29,6 +29,8 @@
 )
 from parsl.executors.status_handling import BlockProviderExecutor
 from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
+from parsl.monitoring.radios.base import RadioConfig
+from parsl.monitoring.radios.htex import HTEXRadio
 from parsl.process_loggers import wrap_with_logs
 from parsl.providers import LocalProvider
 from parsl.providers.base import ExecutionProvider
@@ -253,11 +255,13 @@ def __init__(self,
                  worker_logdir_root: Optional[str] = None,
                  manager_selector: ManagerSelector = RandomManagerSelector(),
                  block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
-                 encrypted: bool = False):
+                 encrypted: bool = False,
+                 remote_monitoring_radio_config: Optional[RadioConfig] = None):
         logger.debug("Initializing HighThroughputExecutor")
 
         BlockProviderExecutor.__init__(self, provider=provider,
                                        block_error_handler=block_error_handler)
+
         self.label = label
         self.worker_debug = worker_debug
         self.storage_access = storage_access
@@ -300,6 +304,12 @@ def __init__(self,
         self._workers_per_node = 1  # our best guess-- we do not have any provider hints
 
         self._task_counter = 0
+
+        if remote_monitoring_radio_config is not None:
+            self.remote_monitoring_radio_config = remote_monitoring_radio_config
+        else:
+            self.remote_monitoring_radio_config = HTEXRadio()
+
         self.worker_ports = worker_ports
         self.worker_port_range = worker_port_range
         self.interchange_proc: Optional[subprocess.Popen] = None
@@ -322,7 +332,6 @@ def __init__(self,
             interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
         self.interchange_launch_cmd = interchange_launch_cmd
 
-    radio_mode = "htex"
     enable_mpi_mode: bool = False
     mpi_launcher: str = "mpiexec"
 
@@ -832,6 +841,9 @@ def shutdown(self, timeout: float = 10.0):
         logger.info("Closing command client")
         self.command_client.close()
 
+        # TODO: implement this across all executors
+        super().shutdown()
+
         logger.info("Finished HighThroughputExecutor shutdown attempt")
 
     def get_usage_information(self):
diff --git a/parsl/executors/high_throughput/mpi_executor.py b/parsl/executors/high_throughput/mpi_executor.py
index 04b8cf5197..919fcc066f 100644
--- a/parsl/executors/high_throughput/mpi_executor.py
+++ b/parsl/executors/high_throughput/mpi_executor.py
@@ -15,6 +15,7 @@
 from parsl.executors.status_handling import BlockProviderExecutor
 from parsl.jobs.states import JobStatus
 from parsl.launchers import SimpleLauncher
+from parsl.monitoring.radios.base import RadioConfig
 from parsl.providers import LocalProvider
 from parsl.providers.base import ExecutionProvider
 
@@ -66,7 +67,8 @@ def __init__(self,
                  worker_logdir_root: Optional[str] = None,
                  mpi_launcher: str = "mpiexec",
                  block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
-                 encrypted: bool = False):
+                 encrypted: bool = False,
+                 remote_monitoring_radio_config: Optional[RadioConfig] = None):
         super().__init__(
             # Hard-coded settings
             cores_per_worker=1e-9,  # Ensures there will be at least an absurd number of workers
@@ -92,7 +94,15 @@ def __init__(self,
             address_probe_timeout=address_probe_timeout,
             worker_logdir_root=worker_logdir_root,
             block_error_handler=block_error_handler,
-            encrypted=encrypted
+            encrypted=encrypted,
+
+            # TODO:
+            # worker-side monitoring in MPI-style code is probably going to be
+            # broken - resource monitoring won't see any worker processes
+            # most likely, so perhaps it should have worker resource
+            # monitoring disabled like the thread pool executor has?
+            # (for related but different reasons...)
+            remote_monitoring_radio_config=remote_monitoring_radio_config
         )
         self.enable_mpi_mode = True
         self.mpi_launcher = mpi_launcher
diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py
index a15a444d2c..efde5dfa15 100644
--- a/parsl/executors/taskvine/executor.py
+++ b/parsl/executors/taskvine/executor.py
@@ -605,6 +605,8 @@ def shutdown(self, *args, **kwargs):
         self._finished_task_queue.close()
         self._finished_task_queue.join_thread()
 
+        super().shutdown()
+
         logger.debug("TaskVine shutdown completed")
 
     @wrap_with_logs
diff --git a/parsl/executors/threads.py b/parsl/executors/threads.py
index 9b3b0df5ce..a532478f58 100644
--- a/parsl/executors/threads.py
+++ b/parsl/executors/threads.py
@@ -73,6 +73,7 @@ def shutdown(self, block=True):
         """
         logger.debug("Shutting down executor, which involves waiting for running tasks to complete")
         self.executor.shutdown(wait=block)
+        super().shutdown()
         logger.debug("Done with executor shutdown")
 
     def monitor_resources(self):
diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py
index 155c990ab5..e1fcd42deb 100644
--- a/parsl/executors/workqueue/executor.py
+++ b/parsl/executors/workqueue/executor.py
@@ -716,6 +716,8 @@ def shutdown(self, *args, **kwargs):
         self.collector_queue.close()
         self.collector_queue.join_thread()
 
+        super().shutdown()
+
         logger.debug("Work Queue shutdown completed")
 
     @wrap_with_logs
diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py
index 999ea84ab3..82aba51648 100644
--- a/parsl/monitoring/monitoring.py
+++ b/parsl/monitoring/monitoring.py
@@ -8,12 +8,13 @@
 import time
 from multiprocessing import Event
 from multiprocessing.queues import Queue
-from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast
 
 import typeguard
 
 from parsl.log_utils import set_file_logger
 from parsl.monitoring.errors import MonitoringHubStartError
+from parsl.monitoring.radios.base import RadioConfig
 from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
 from parsl.monitoring.router import router_starter
 from parsl.monitoring.types import TaggedMonitoringMessage
@@ -121,7 +122,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
 
         # in the future, Queue will allow runtime subscripts.
         if TYPE_CHECKING:
-            comm_q: Queue[Union[Tuple[int, int], str]]
+            comm_q: Queue[Union[int, str]]
         else:
             comm_q: Queue
 
@@ -142,7 +143,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
                                                "resource_msgs": self.resource_msgs,
                                                "exit_event": self.router_exit_event,
                                                "hub_address": self.hub_address,
-                                               "udp_port": self.hub_port,
                                                "zmq_port_range": self.hub_port_range,
                                                "run_dir": dfk_run_dir,
                                                "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
@@ -166,6 +166,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
 
         self.filesystem_proc = ForkProcess(target=filesystem_receiver,
                                            args=(self.resource_msgs, dfk_run_dir),
+                                           # tmp should be DFK run dir... but it's not wired in at the sender side...
                                            name="Monitoring-Filesystem-Process",
                                            daemon=True
                                            )
@@ -186,9 +187,23 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
             logger.error("MonitoringRouter sent an error message: %s", comm_q_result)
             raise RuntimeError(f"MonitoringRouter failed to start: {comm_q_result}")
 
-        udp_port, zmq_port = comm_q_result
+        zmq_port = comm_q_result
 
-        self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
+        self.zmq_port = zmq_port
+
+        # need to initialize radio configs, perhaps first time a radio config is used
+        # in each executor? (can't do that at startup because executor list is dynamic,
+        # don't know all the executors till later)
+        # self.radio_config.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
+        # How can this config be populated properly?
+        # There's a UDP port chosen right now by the monitoring router and
+        # sent back a line above...
+        # What does that look like for other radios? htexradio has no specific config at all,
+        # filesystem radio has a path (that should have been created?) for config, and a loop
+        # that needs to be running, started in this start method.
+        # so something like... radio_config.receive() generates the appropriate receiver object?
+        # which has a shutdown method on it for later. and also updates radio_config itself so
+        # it has the right info to send across the wire? or some state driving like that?
 
         logger.info("Monitoring Hub initialized")
 
@@ -218,7 +233,7 @@ def close(self) -> None:
                     )
                 self.router_proc.terminate()
                 self.dbm_proc.terminate()
-                self.filesystem_proc.terminate()
+                # self.filesystem_proc.terminate()
         logger.info("Setting router termination event")
         self.router_exit_event.set()
         logger.info("Waiting for router to terminate")
@@ -238,9 +253,9 @@ def close(self) -> None:
         # should this be message based? it probably doesn't need to be if
         # we believe we've received all messages
         logger.info("Terminating filesystem radio receiver process")
-        self.filesystem_proc.terminate()
-        self.filesystem_proc.join()
-        self.filesystem_proc.close()
+        # self.filesystem_proc.terminate()
+        # self.filesystem_proc.join()
+        # self.filesystem_proc.close()
 
         logger.info("Closing monitoring multiprocessing queues")
         self.exception_q.close()
@@ -249,6 +264,17 @@ def close(self) -> None:
         self.resource_msgs.join_thread()
         logger.info("Closed monitoring multiprocessing queues")
 
+    def start_receiver(self, radio_config: RadioConfig, ip: str, run_dir: str) -> Any:
+        """Somehow start a radio receiver here and update the radio config to be sent over
+        the wire, without losing the info we need to shut down that receiver later...
+        """
+        r = radio_config.create_receiver(ip=ip, run_dir=run_dir, resource_msgs=self.resource_msgs)
+        logger.info(f"BENC: created receiver {r}")
+        # assert r is not None
+        return r
+        # ... that is, a thing we need to do a shutdown call on at shutdown, a "shutdownable"? without
+        # expecting any more structure on it?
+
 
 @wrap_with_logs
 def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None:
diff --git a/parsl/monitoring/radios/base.py b/parsl/monitoring/radios/base.py
index 2bb799f256..f54a03b635 100644
--- a/parsl/monitoring/radios/base.py
+++ b/parsl/monitoring/radios/base.py
@@ -1,13 +1,34 @@
-import logging
 from abc import ABCMeta, abstractmethod
-from typing import Optional
+from multiprocessing.queues import Queue
+from typing import Any
 
-_db_manager_excepts: Optional[Exception]
-
-logger = logging.getLogger(__name__)
+
+class MonitoringRadioReceiver(metaclass=ABCMeta):
+    @abstractmethod
+    def shutdown(self) -> None:
+        pass
 
 
 class MonitoringRadioSender(metaclass=ABCMeta):
     @abstractmethod
     def send(self, message: object) -> None:
         pass
+
+
+class RadioConfig(metaclass=ABCMeta):
+    """Base class for radio plugin configuration.
+    """
+    @abstractmethod
+    def create_sender(self) -> MonitoringRadioSender:
+        pass
+
+    @abstractmethod
+    def create_receiver(self, *, ip: str, run_dir: str, resource_msgs: Queue) -> Any:
+        # TODO: return a shutdownable, and probably take some context to help in
+        # creation of the radio config? esp. the ZMQ endpoint to send messages to
+        # from the receiving process that might be created?
+        """Create a receiver for this config, and update this config as
+        appropriate so that create_sender will be able to connect back to that
+        receiver in whichever way is relevant. create_sender can assume
+        that create_receiver has been called."""
+        pass
diff --git a/parsl/monitoring/radios/filesystem.py b/parsl/monitoring/radios/filesystem.py
index accff87d36..3ecdd1ae63 100644
--- a/parsl/monitoring/radios/filesystem.py
+++ b/parsl/monitoring/radios/filesystem.py
@@ -2,12 +2,22 @@
 import os
 import pickle
 import uuid
+from multiprocessing import Queue
+from typing import Any
 
-from parsl.monitoring.radios.base import MonitoringRadioSender
+from parsl.monitoring.radios.base import MonitoringRadioSender, RadioConfig
 
 logger = logging.getLogger(__name__)
 
 
+class FilesystemRadio(RadioConfig):
+    def create_sender(self) -> MonitoringRadioSender:
+        return FilesystemRadioSender(run_dir=self.run_dir)
+
+    def create_receiver(self, *, ip: str, run_dir: str, resource_msgs: Queue) -> Any:
+        self.run_dir = run_dir
+
+
 class FilesystemRadioSender(MonitoringRadioSender):
     """A MonitoringRadioSender that sends messages over a shared filesystem.
@@ -26,7 +36,7 @@ class FilesystemRadioSender(MonitoringRadioSender):
     the UDP radio, but should be much more reliable.
     """
 
-    def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str):
+    def __init__(self, *, run_dir: str):
         logger.info("filesystem based monitoring channel initializing")
         self.base_path = f"{run_dir}/monitor-fs-radio/"
         self.tmp_path = f"{self.base_path}/tmp"
diff --git a/parsl/monitoring/radios/htex.py b/parsl/monitoring/radios/htex.py
index bdb893b303..30e26bd1bd 100644
--- a/parsl/monitoring/radios/htex.py
+++ b/parsl/monitoring/radios/htex.py
@@ -1,22 +1,29 @@
 import logging
 import pickle
+from multiprocessing.queues import Queue
 
-from parsl.monitoring.radios.base import MonitoringRadioSender
+from parsl.monitoring.radios.base import MonitoringRadioSender, RadioConfig
 
 logger = logging.getLogger(__name__)
 
 
+class HTEXRadio(RadioConfig):
+    def create_sender(self) -> MonitoringRadioSender:
+        return HTEXRadioSender()
+
+    def create_receiver(self, *, ip: str, run_dir: str, resource_msgs: Queue) -> None:
+        pass
+
+
 class HTEXRadioSender(MonitoringRadioSender):
 
-    def __init__(self, monitoring_url: str, timeout: int = 10):
+    def __init__(self) -> None:
         """
         Parameters
         ----------
 
         monitoring_url : str
             URL of the form <scheme>://<IP>:<PORT>
-        timeout : int
-            timeout, default=10s
         """
         logger.info("htex-based monitoring channel initialising")
diff --git a/parsl/monitoring/radios/udp.py b/parsl/monitoring/radios/udp.py
index f2a652e9ac..36e91d062e 100644
--- a/parsl/monitoring/radios/udp.py
+++ b/parsl/monitoring/radios/udp.py
@@ -1,29 +1,105 @@
 import logging
+import multiprocessing as mp
 import pickle
 import socket
+import threading
+import time
+from multiprocessing.queues import Queue
+from typing import Any, Optional, Union
 
-from parsl.monitoring.radios.base import MonitoringRadioSender
+from parsl.monitoring.radios.base import (
+    MonitoringRadioReceiver,
+    MonitoringRadioSender,
+    RadioConfig,
+)
+from parsl.process_loggers import wrap_with_logs
+
+logger = logging.getLogger(__name__)
+
+
+class UDPRadio(RadioConfig):
+    ip: str
+
+    # these two values need to be initialized by a create_receiver step...
+    # which is why an earlier patch needs to turn the UDP and zmq receivers
+    # into separate threads that can exist separately
+
+    # TODO: this atexit_timeout is now user exposed - in the prior UDP-in-router impl, I think maybe it wasn't (but I should check)
+    def __init__(self, *, port: Optional[int] = None, atexit_timeout: Union[int, float] = 3):
+        # TODO awkward: when a user creates this it can be None,
+        # but after create_receiver initialization it is always an int.
+        # perhaps this leads to the motivation of a serializable config being its
+        # own class distinct from the user-specified RadioConfig object?
+        # Right now, there would be a type-error in create_sender except
+        # for an assert that asserts this reasoning to mypy.
+        self.port = port
+        self.atexit_timeout = atexit_timeout
+
+    def create_sender(self) -> MonitoringRadioSender:
+        assert self.port is not None, "self.port should have been initialized by create_receiver"
+        return UDPRadioSender(self.ip, self.port)
+
+    def create_receiver(self, ip: str, run_dir: str, resource_msgs: Queue) -> Any:
+        """TODO: backwards compatibility would require a single one of these to
+        exist for all executors that want one, shut down when the last of its
+        users asks for shut down... in the case that udp_port is specified.
+
+        But maybe the right thing to do here is lose that configuration parameter
+        in that form? especially as I'd like UDPRadio to go away entirely because
+        UDP isn't reliable or secure and other code requires reliability of messaging?
+        """
+
+        # we could bind to this instead of 0.0.0.0 but that would change behaviour,
+        # possibly breaking if the IP address isn't bindable (for example, if it's
+        # a port forward). Otherwise, it isn't needed for creation of the listening
+        # port - only for creation of the sender.
+        self.ip = ip
+
+        udp_sock = socket.socket(socket.AF_INET,
+                                 socket.SOCK_DGRAM,
+                                 socket.IPPROTO_UDP)
+
+        # We are trying to bind to all interfaces with 0.0.0.0
+        if self.port is None:
+            udp_sock.bind(('0.0.0.0', 0))
+            self.port = udp_sock.getsockname()[1]
+        else:
+            try:
+                udp_sock.bind(('0.0.0.0', self.port))
+            except Exception as e:
+                # TODO: this can be its own patch to use 'from' notation?
+                raise RuntimeError(f"Could not bind to UDP port {self.port}") from e
+        udp_sock.settimeout(0.001)  # TODO: configurable loop_freq? it's hard-coded though...
+        logger.info(f"Initialized the UDP socket on port {self.port}")
+
+        # this is now in the submitting process, not the router process.
+        # I don't think this matters for UDP so much because it's on the
+        # way out - but how should this work for other things? compare with
+        # filesystem radio impl?
+        logger.info("Starting UDP listener thread")
+        udp_radio_receiver_thread = UDPRadioReceiverThread(udp_sock=udp_sock, resource_msgs=resource_msgs, atexit_timeout=self.atexit_timeout)
+        udp_radio_receiver_thread.start()
+
+        return udp_radio_receiver_thread
+        # TODO: wrap this with proper shutdown logic involving events etc?
 
 
 class UDPRadioSender(MonitoringRadioSender):
 
-    def __init__(self, monitoring_url: str, timeout: int = 10):
+    def __init__(self, ip: str, port: int, timeout: int = 10) -> None:
         """
         Parameters
         ----------
 
+        XXX TODO
         monitoring_url : str
             URL of the form <scheme>://<IP>:<PORT>
         timeout : int
             timeout, default=10s
         """
-        self.monitoring_url = monitoring_url
         self.sock_timeout = timeout
-        try:
-            self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':'))
-            self.port = int(port)
-        except Exception:
-            raise Exception("Failed to parse monitoring url: {}".format(monitoring_url))
+        self.ip = ip
+        self.port = port
 
         self.sock = socket.socket(socket.AF_INET,
                                   socket.SOCK_DGRAM,
@@ -42,6 +118,7 @@ def send(self, message: object) -> None:
         Returns:
             None
         """
+        logger.info("Starting UDP radio message send")
         try:
             buffer = pickle.dumps(message)
         except Exception:
@@ -53,4 +130,49 @@ def send(self, message: object) -> None:
         except socket.timeout:
             logging.error("Could not send message within timeout limit")
             return
+        logger.info("Normal ending for UDP radio message send")
         return
+
+
+class UDPRadioReceiverThread(threading.Thread, MonitoringRadioReceiver):
+    def __init__(self, udp_sock: socket.socket, resource_msgs: Queue, atexit_timeout: Union[int, float]):
+        self.exit_event = mp.Event()
+        self.udp_sock = udp_sock
+        self.resource_msgs = resource_msgs
+        self.atexit_timeout = atexit_timeout
+        super().__init__()
+
+    @wrap_with_logs
+    def run(self) -> None:
+        try:
+            while not self.exit_event.is_set():
+                try:
+                    data, addr = self.udp_sock.recvfrom(2048)
+                    resource_msg = pickle.loads(data)
+                    logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
+                    self.resource_msgs.put(resource_msg)
+                except socket.timeout:
+                    pass
+
+            logger.info("UDP listener draining")
+            last_msg_received_time = time.time()
+            while time.time() - last_msg_received_time < self.atexit_timeout:
+                try:
+                    data, addr = self.udp_sock.recvfrom(2048)
+                    msg = pickle.loads(data)
+                    logger.debug("Got UDP Message from {}: {}".format(addr, msg))
+                    self.resource_msgs.put((msg, addr))
+                    last_msg_received_time = time.time()
+                except socket.timeout:
+                    pass
+
+            logger.info("UDP listener finishing normally")
+        finally:
+            logger.info("UDP listener finished")
+
+    def shutdown(self) -> None:
+        logger.debug("Set exit event")
+        self.exit_event.set()
+        logger.debug("Joining")
+        self.join()
+        logger.debug("done")
diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py
index 530b39f935..fe1e62b153 100644
--- a/parsl/monitoring/remote.py
+++ b/parsl/monitoring/remote.py
@@ -7,10 +7,7 @@
 from typing import Any, Callable, Dict, List, Sequence, Tuple
 
 from parsl.monitoring.message_type import MessageType
-from parsl.monitoring.radios.base import MonitoringRadioSender
-from parsl.monitoring.radios.filesystem import FilesystemRadioSender
-from parsl.monitoring.radios.htex import HTEXRadioSender
-from parsl.monitoring.radios.udp import UDPRadioSender
+from parsl.monitoring.radios.base import MonitoringRadioSender, RadioConfig
 from parsl.multiprocessing import ForkProcess
 from parsl.process_loggers import wrap_with_logs
 
@@ -23,11 +20,10 @@ def monitor_wrapper(*,
                     kwargs: Dict,  # per invocation
                     x_try_id: int,  # per invocation
                     x_task_id: int,  # per invocation
-                    monitoring_hub_url: str,  # per workflow
+                    radio_config: RadioConfig,  # per executor
                     run_id: str,  # per workflow
                     logging_level: int,  # per workflow
                     sleep_dur: float,  # per workflow
-                    radio_mode: str,  # per executor
                     monitor_resources: bool,  # per workflow
                     run_dir: str) -> Tuple[Callable, Sequence, Dict]:
     """Wrap the Parsl app with a function that will call the monitor function and point it at the correct pid when the task begins.
@@ -41,9 +37,8 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
         # Send first message to monitoring router
         send_first_message(try_id,
                            task_id,
-                           monitoring_hub_url,
+                           radio_config,
                            run_id,
-                           radio_mode,
                            run_dir)
 
         if monitor_resources and sleep_dur > 0:
@@ -52,9 +47,8 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
                 args=(os.getpid(),
                       try_id,
                       task_id,
-                      monitoring_hub_url,
+                      radio_config,
                       run_id,
-                      radio_mode,
                       logging_level,
                       sleep_dur,
                       run_dir,
@@ -87,9 +81,9 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
 
             send_last_message(try_id,
                               task_id,
-                              monitoring_hub_url,
+                              radio_config,
                               run_id,
-                              radio_mode, run_dir)
+                              run_dir)
 
     new_kwargs = kwargs.copy()
     new_kwargs['_parsl_monitoring_task_id'] = x_task_id
@@ -98,47 +92,43 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
     return (wrapped, args, new_kwargs)
 
 
-def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender:
+def get_radio(radio_config: RadioConfig, task_id: int) -> MonitoringRadioSender:
+
+    # TODO: maybe this function will end up simple enough to eliminate
+
     radio: MonitoringRadioSender
-    if radio_mode == "udp":
-        radio = UDPRadioSender(monitoring_hub_url)
-    elif radio_mode == "htex":
-        radio = HTEXRadioSender(monitoring_hub_url)
-    elif radio_mode == "filesystem":
-        radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url,
-                                      run_dir=run_dir)
-    else:
-        raise RuntimeError(f"Unknown radio mode: {radio_mode}")
+
+    radio = radio_config.create_sender()
+
     return radio
 
 
 @wrap_with_logs
 def send_first_message(try_id: int,
                        task_id: int,
-                       monitoring_hub_url: str,
-                       run_id: str, radio_mode: str, run_dir: str) -> None:
-    send_first_last_message(try_id, task_id, monitoring_hub_url, run_id,
-                            radio_mode, run_dir, False)
+                       radio_config: RadioConfig,
+                       run_id: str, run_dir: str) -> None:
+    send_first_last_message(try_id, task_id, radio_config, run_id,
+                            run_dir, False)
 
 
 @wrap_with_logs
 def send_last_message(try_id: int,
                       task_id: int,
-                      monitoring_hub_url: str,
-                      run_id: str, radio_mode: str, run_dir: str) -> None:
-    send_first_last_message(try_id, task_id, monitoring_hub_url, run_id,
-                            radio_mode, run_dir, True)
+                      radio_config: RadioConfig,
+                      run_id: str, run_dir: str) -> None:
+    send_first_last_message(try_id, task_id, radio_config, run_id,
+                            run_dir, True)
 
 
 def send_first_last_message(try_id: int,
                             task_id: int,
-                            monitoring_hub_url: str,
-                            run_id: str, radio_mode: str, run_dir: str,
+                            radio_config: RadioConfig,
+                            run_id: str, run_dir: str,
                             is_last: bool) -> None:
     import os
     import platform
 
-    radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir)
+    radio = get_radio(radio_config, task_id)
 
     msg = (MessageType.RESOURCE_INFO,
            {'run_id': run_id,
@@ -158,9 +148,8 @@ def send_first_last_message(try_id: int,
 def monitor(pid: int,
             try_id: int,
             task_id: int,
-            monitoring_hub_url: str,
+            radio_config: RadioConfig,
            run_id: str,
-            radio_mode: str,
            logging_level: int,
            sleep_dur: float,
            run_dir: str,
@@ -184,7 +173,7 @@ def monitor(pid: int,
 
     setproctitle("parsl: task resource monitor")
 
-    radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir)
+    radio = get_radio(radio_config, task_id)
 
     logging.debug("start of monitor")
diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py
index d01913c2f7..dbc472407c 100644
--- a/parsl/monitoring/router.py
+++ b/parsl/monitoring/router.py
@@ -3,12 +3,10 @@
 import logging
 import multiprocessing.queues as mpq
 import os
-import pickle
-import socket
 import threading
 import time
 from multiprocessing.synchronize import Event
-from typing import Optional, Tuple
+from typing import Tuple
 
 import typeguard
 import zmq
@@ -27,7 +25,6 @@ class MonitoringRouter:
     def __init__(self,
                  *,
                  hub_address: str,
-                 udp_port: Optional[int] = None,
                  zmq_port_range: Tuple[int, int] = (55050, 56000),
 
                  monitoring_hub_address: str = "127.0.0.1",
@@ -43,8 +40,6 @@ def __init__(self,
         ----------
         hub_address : str
              The ip address at which the workers will be able to reach the Hub.
-        udp_port : int
-             The specific port at which workers will be able to reach the Hub via UDP. Default: None
         zmq_port_range : tuple(int, int)
             The MonitoringHub picks ports at random from the range which will be used by Hub.
             Default: (55050, 56000)
@@ -74,24 +69,6 @@ def __init__(self,
 
         self.loop_freq = 10.0  # milliseconds
 
-        # Initialize the UDP socket
-        self.udp_sock = socket.socket(socket.AF_INET,
-                                      socket.SOCK_DGRAM,
-                                      socket.IPPROTO_UDP)
-
-        # We are trying to bind to all interfaces with 0.0.0.0
-        if not udp_port:
-            self.udp_sock.bind(('0.0.0.0', 0))
-            self.udp_port = self.udp_sock.getsockname()[1]
-        else:
-            self.udp_port = udp_port
-            try:
-                self.udp_sock.bind(('0.0.0.0', self.udp_port))
-            except Exception as e:
-                raise RuntimeError(f"Could not bind to udp_port {udp_port} because: {e}")
-        self.udp_sock.settimeout(self.loop_freq / 1000)
-        self.logger.info("Initialized the UDP socket on 0.0.0.0:{}".format(self.udp_port))
-
         self._context = zmq.Context()
         self.zmq_receiver_channel = self._context.socket(zmq.DEALER)
         self.zmq_receiver_channel.setsockopt(zmq.LINGER, 0)
@@ -107,47 +84,13 @@ def __init__(self,
 
     @wrap_with_logs(target="monitoring_router")
     def start(self) -> None:
-        self.logger.info("Starting UDP listener thread")
-        udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener, daemon=True)
-        udp_radio_receiver_thread.start()
-
         self.logger.info("Starting ZMQ listener thread")
         zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener, daemon=True)
         zmq_radio_receiver_thread.start()
 
         self.logger.info("Joining on ZMQ listener thread")
         zmq_radio_receiver_thread.join()
-
-        self.logger.info("Joining on UDP listener thread")
-        udp_radio_receiver_thread.join()
-        self.logger.info("Joined on both ZMQ and UDP listener threads")
-
-    @wrap_with_logs(target="monitoring_router")
-    def start_udp_listener(self) -> None:
-        try:
-            while not self.exit_event.is_set():
-                try:
-                    data, addr = self.udp_sock.recvfrom(2048)
-                    resource_msg = pickle.loads(data)
-                    self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
-                    self.target_radio.send(resource_msg)
-                except socket.timeout:
-                    pass
-
-            self.logger.info("UDP listener draining")
-            last_msg_received_time = time.time()
-            while time.time() - last_msg_received_time < self.atexit_timeout:
-                try:
-                    data, addr = self.udp_sock.recvfrom(2048)
-                    msg = pickle.loads(data)
-                    self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
-                    self.target_radio.send(msg)
-                    last_msg_received_time = time.time()
-                except socket.timeout:
-                    pass
-
-            self.logger.info("UDP listener finishing normally")
-        finally:
-            self.logger.info("UDP listener finished")
+        self.logger.info("Joined on ZMQ listener thread")
 
     @wrap_with_logs(target="monitoring_router")
     def start_zmq_listener(self) -> None:
@@ -188,7 +131,6 @@ def router_starter(*,
                    exit_event: Event,
 
                    hub_address: str,
-                   udp_port: Optional[int],
                    zmq_port_range: Tuple[int, int],
 
                    run_dir: str,
@@ -196,7 +138,6 @@ def router_starter(*,
     setproctitle("parsl: monitoring router")
     try:
         router = MonitoringRouter(hub_address=hub_address,
-                                  udp_port=udp_port,
                                   zmq_port_range=zmq_port_range,
                                   run_dir=run_dir,
                                   logging_level=logging_level,
@@ -206,7 +147,7 @@ def router_starter(*,
         logger.error("MonitoringRouter construction failed.", exc_info=True)
         comm_q.put(f"Monitoring router construction failed: {e}")
     else:
-        comm_q.put((router.udp_port, router.zmq_receiver_port))
+        comm_q.put(router.zmq_receiver_port)
 
     router.logger.info("Starting MonitoringRouter in router_starter")
     try:
diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py
index 63e5c45201..18a339e5cc 100644
--- a/parsl/tests/conftest.py
+++ b/parsl/tests/conftest.py
@@ -263,6 +263,8 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
            dfk.cleanup()
         assert DataFlowKernelLoader._dfk is None
 
+        # assert [t for t in threading.enumerate() if "MainThread" not in t.name and "HTEX-Queue-Management-Thread" not in t.name] == []
+
     else:
         yield
diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py
index 9ffa21df01..8af899fb05 100644
--- a/parsl/tests/test_monitoring/test_basic.py
+++ b/parsl/tests/test_monitoring/test_basic.py
@@ -8,6 +8,9 @@
 from parsl.config import Config
 from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig
 from parsl.monitoring import MonitoringHub
+from parsl.monitoring.radios.filesystem import FilesystemRadio
+from parsl.monitoring.radios.htex import HTEXRadio
+from parsl.monitoring.radios.udp import UDPRadio
 
 
 @parsl.python_app
@@ -35,9 +38,10 @@ def htex_udp_config():
     from parsl.tests.configs.htex_local_alternate import fresh_config
     c = fresh_config()
     assert len(c.executors) == 1
+    ex = c.executors[0]
 
-    assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio"
-    c.executors[0].radio_mode = "udp"
+    assert isinstance(ex.remote_monitoring_radio_config, HTEXRadio), "precondition: htex is configured for the HTEXRadio"
+    ex.remote_monitoring_radio_config = UDPRadio()
     return c
 
 
@@ -47,9 +51,10 @@ def htex_filesystem_config():
     from parsl.tests.configs.htex_local_alternate import fresh_config
     c = fresh_config()
     assert len(c.executors) == 1
+    ex = c.executors[0]
 
-    assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio"
-    c.executors[0].radio_mode = "filesystem"
+    assert isinstance(ex.remote_monitoring_radio_config, HTEXRadio), "precondition: htex is configured for the HTEXRadio"
+    ex.remote_monitoring_radio_config = FilesystemRadio()
    return c
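
Illustrative note (not part of the patch): with these changes, a per-executor radio is selected through the new remote_monitoring_radio_config constructor parameter (or attribute) instead of the old radio_mode string, as the test changes above already exercise. A minimal sketch of a user configuration, assuming this patch is applied and otherwise default settings:

    # sketch only: choose the UDP radio for worker-side resource messages on HTEX;
    # omitting remote_monitoring_radio_config falls back to HTEXRadio (see __init__ above)
    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.monitoring import MonitoringHub
    from parsl.monitoring.radios.udp import UDPRadio

    config = Config(
        executors=[
            HighThroughputExecutor(
                label="htex_udp_monitoring",
                remote_monitoring_radio_config=UDPRadio(),
            )
        ],
        monitoring=MonitoringHub(hub_address="127.0.0.1"),
    )

The resulting config can then be passed to parsl.load() as usual; the DFK's add_executors (first hunk of this diff) calls start_receiver for the chosen RadioConfig and hands the updated config to the remote monitor wrapper.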