Skip to content

Commit

Permalink
Remove monitoring vs serialization cache as causes space explosion
Browse files Browse the repository at this point in the history
See issue #2848 for a bunch of description and benchmarking.
  • Loading branch information
benclifford committed Aug 3, 2023
1 parent 9783b79 commit 7dc8810
Showing 1 changed file with 56 additions and 72 deletions.
128 changes: 56 additions & 72 deletions parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@

logger = logging.getLogger(__name__)

monitoring_wrapper_cache: Dict
monitoring_wrapper_cache = {}


def monitor_wrapper(f: Any, # per app
args: Sequence, # per invocation
Expand All @@ -33,76 +30,63 @@ def monitor_wrapper(f: Any, # per app
"""Wrap the Parsl app with a function that will call the monitor function and point it at the correct pid when the task begins.
"""

# this makes assumptions that when subsequently executed with the same
# cache key, then the relevant parameters will not have changed from the
# first invocation with that cache key (otherwise, the resulting cached
# closure will be incorrectly cached)
cache_key = (run_id, f, radio_mode)

if cache_key in monitoring_wrapper_cache:
wrapped = monitoring_wrapper_cache[cache_key]

else:
@wraps(f)
def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
task_id = kwargs.pop('_parsl_monitoring_task_id')
try_id = kwargs.pop('_parsl_monitoring_try_id')
terminate_event = Event()
# Send first message to monitoring router
send_first_message(try_id,
task_id,
monitoring_hub_url,
run_id,
radio_mode,
run_dir)

if monitor_resources:
# create the monitor process and start
pp = ForkProcess(target=monitor,
args=(os.getpid(),
try_id,
task_id,
monitoring_hub_url,
run_id,
radio_mode,
logging_level,
sleep_dur,
run_dir,
terminate_event),
daemon=True,
name="Monitor-Wrapper-{}".format(task_id))
pp.start()
p = pp
# TODO: awkwardness because ForkProcess is not directly a constructor
# and type-checking is expecting p to be optional and cannot
# narrow down the type of p in this block.

else:
p = None

@wraps(f)
def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
task_id = kwargs.pop('_parsl_monitoring_task_id')
try_id = kwargs.pop('_parsl_monitoring_try_id')
terminate_event = Event()
# Send first message to monitoring router
send_first_message(try_id,
task_id,
monitoring_hub_url,
run_id,
radio_mode,
run_dir)

if monitor_resources:
# create the monitor process and start
pp = ForkProcess(target=monitor,
args=(os.getpid(),
try_id,
task_id,
monitoring_hub_url,
run_id,
radio_mode,
logging_level,
sleep_dur,
run_dir,
terminate_event),
daemon=True,
name="Monitor-Wrapper-{}".format(task_id))
pp.start()
p = pp
# TODO: awkwardness because ForkProcess is not directly a constructor
# and type-checking is expecting p to be optional and cannot
# narrow down the type of p in this block.

else:
p = None

try:
return f(*args, **kwargs)
finally:
# There's a chance of zombification if the workers are killed by some signals (?)
if p:
terminate_event.set()
p.join(30) # 30 second delay for this -- this timeout will be hit in the case of an unusually long end-of-loop
if p.exitcode is None:
logger.warn("Event-based termination of monitoring helper took too long. Using process-based termination.")
p.terminate()
# DANGER: this can corrupt shared queues according to docs.
# So, better that the above termination event worked.
# This is why this log message is a warning
p.join()

send_last_message(try_id,
task_id,
monitoring_hub_url,
run_id,
radio_mode, run_dir)

monitoring_wrapper_cache[cache_key] = wrapped
try:
return f(*args, **kwargs)
finally:
# There's a chance of zombification if the workers are killed by some signals (?)
if p:
terminate_event.set()
p.join(30) # 30 second delay for this -- this timeout will be hit in the case of an unusually long end-of-loop
if p.exitcode is None:
logger.warn("Event-based termination of monitoring helper took too long. Using process-based termination.")
p.terminate()
# DANGER: this can corrupt shared queues according to docs.
# So, better that the above termination event worked.
# This is why this log message is a warning
p.join()

send_last_message(try_id,
task_id,
monitoring_hub_url,
run_id,
radio_mode, run_dir)

new_kwargs = kwargs.copy()
new_kwargs['_parsl_monitoring_task_id'] = x_task_id
Expand Down

0 comments on commit 7dc8810

Please sign in to comment.