From 2fa5e87e87ee477785b735c8005003668e034101 Mon Sep 17 00:00:00 2001 From: tjgalvin Date: Fri, 27 Dec 2024 18:04:06 +1100 Subject: [PATCH] Add a timelimit context handler, eye towards BANE deadlocks (#200) * added a timelimit context manager * expanded test * added a list for collections cast * added to changelog * fixed a test --------- Co-authored-by: tgalvin --- CHANGELOG.md | 2 ++ flint/coadd/linmos.py | 4 +++- flint/exceptions.py | 24 ++++++++++++++++------ flint/prefect/common/imaging.py | 24 ++++++++++++++++------ flint/utils.py | 35 +++++++++++++++++++++++++++++++++ tests/test_linmos_coadd.py | 10 +++++++++- tests/test_utils.py | 25 +++++++++++++++++++++++ 7 files changed, 110 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c9d35bd..ddc8e02e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ - subtract flow will remove files whenever possible (remove original files after convolving, removing convolved files after linmos, remove channel linmos images after combining into a cube, removing the weight text files) +- Added a `timelimit_on_context` helper to raise an error after some specified + length of time. Looking at you, BANE and issue #186. Arrrr. # 0.2.8 diff --git a/flint/coadd/linmos.py b/flint/coadd/linmos.py index 423fd73c..31f4ec33 100644 --- a/flint/coadd/linmos.py +++ b/flint/coadd/linmos.py @@ -526,7 +526,9 @@ def generate_linmos_parameter_set( ) # Construct the holography section of the linmos parset parset += _get_holography_linmos_options( - holofile=holofile, pol_axis=pol_axis, remove_leakage=".i." not in str(images[0]) + holofile=holofile, + pol_axis=pol_axis, + remove_leakage=".i." not in str(list(images)[0]), ) # Now write the file, me hearty diff --git a/flint/exceptions.py b/flint/exceptions.py index c07252a2..60989675 100644 --- a/flint/exceptions.py +++ b/flint/exceptions.py @@ -1,30 +1,42 @@ -class MSError(Exception): +class FlintException(Exception): + """Base exception for Flint""" + + pass + + +class TimeLimitException(FlintException): + """A function has taken too long to execute""" + + pass + + +class MSError(FlintException): """An error for MS related things""" pass -class FrequencyMismatchError(Exception): +class FrequencyMismatchError(FlintException): """Raised when there are differences in frequencies""" -class PhaseOutlierFitError(Exception): +class PhaseOutlierFitError(FlintException): """Raised when the phase outlier fit routine fails.""" pass -class GainCalError(Exception): +class GainCalError(FlintException): """Raised when it appears like the casa gaincal task fails.""" pass -class CleanDivergenceError(Exception): +class CleanDivergenceError(FlintException): """Raised if it is detected that cleaning has diverged.""" pass -class TarArchiveError(Exception): +class TarArchiveError(FlintException): """Raised it the flint tarball is not created successfullty""" diff --git a/flint/prefect/common/imaging.py b/flint/prefect/common/imaging.py index aedc1a85..f3fdb004 100644 --- a/flint/prefect/common/imaging.py +++ b/flint/prefect/common/imaging.py @@ -163,15 +163,24 @@ def task_extract_solution_path(calibrate_cmd: CalibrateCommand) -> Path: # BANE sometimes gets cauht in some stalled staTE -@task(timeout_seconds=60 * 45, retries=3) +@task(retries=3) def task_run_bane_and_aegean( - image: Union[WSCleanCommand, LinmosCommand], aegean_container: Path + image: Union[WSCleanCommand, LinmosCommand], + aegean_container: Path, + timelimit_seconds: Union[int, float] = 60 * 45, ) -> AegeanOutputs: - """Run BANE and Aegean against a FITS image + """Run BANE and Aegean against a FITS image. + + Notes: + It has been noted that BANE can sometimes get caught in a interpolation error which haults execution. + The ``timelimit_seconds`` will attempt to detect long runnings BANE processes and raise an error. The + retry functionality of prefect should then restart the task. Since this task is pure (e.g. no last + dataproducts, modification to data etc) simply restarting should be fine. Args: image (Union[WSCleanCommand, LinmosCommand]): The image that will be searched aegean_container (Path): Path to a singularity container containing BANE and aegean + timelimit_seconds (Union[int,float], optional): The maximum amount of time, in seconds, before an exception is raised. Defaults to 45*60. Raises: ValueError: Raised when ``image`` is not a supported type @@ -200,9 +209,12 @@ def task_run_bane_and_aegean( else: raise ValueError(f"Unexpected type, have received {type(image)} for {image=}. ") - aegean_outputs = run_bane_and_aegean( - image=image_path, aegean_container=aegean_container - ) + from flint.utils import timelimit_on_context + + with timelimit_on_context(timelimit_seconds=timelimit_seconds): + aegean_outputs = run_bane_and_aegean( + image=image_path, aegean_container=aegean_container + ) return aegean_outputs diff --git a/flint/utils.py b/flint/utils.py index 3487f00f..c7fc0cc6 100644 --- a/flint/utils.py +++ b/flint/utils.py @@ -5,6 +5,7 @@ import datetime import os import shutil +import signal import subprocess from contextlib import contextmanager from pathlib import Path @@ -18,6 +19,7 @@ from astropy.wcs import WCS from flint.convol import BeamShape +from flint.exceptions import TimeLimitException from flint.logging import logger # TODO: This Captain is aware that there is a common fits getheader between @@ -27,6 +29,39 @@ # that only opens the FITS file once and places things into common field names. +def _signal_timelimit_handler(*args): + raise TimeLimitException + + +@contextmanager +def timelimit_on_context( + timelimit_seconds: Union[int, float], +) -> Generator[None, None, None]: + """Creates a context manager that will raise ``flint.exceptions.TimelimitException`` + should the control not leave the ``with`` context within an specified amount of time. + + Args: + timelimit_seconds (Union[int,float]): The maximum time allowed for the with context to be escaped + + Raises: + TimeLimitException: Raised should the maximum timelimit be violated. + + Yields: + Generator[None, None, None]: A generating function that returns nothing + """ + signal.signal(signal.SIGALRM, _signal_timelimit_handler) + signal.alarm(int(timelimit_seconds)) + logger.info(f"Setting a timelimit of {timelimit_seconds=}") + + try: + yield + except TimeLimitException: + logger.info(f"Timeout limit of {timelimit_seconds=} reached") + raise TimeLimitException + + signal.alarm(0) + + @contextmanager def hold_then_move_into( move_directory: Path, diff --git a/tests/test_linmos_coadd.py b/tests/test_linmos_coadd.py index 15aa2cee..b37265a1 100644 --- a/tests/test_linmos_coadd.py +++ b/tests/test_linmos_coadd.py @@ -148,12 +148,20 @@ def test_linmos_holo_options(tmpdir): parset = _get_holography_linmos_options(holofile=holofile, pol_axis=None) assert "linmos.primarybeam = ASKAP_PB\n" in parset - assert "linmos.removeleakage = true\n" in parset + assert "linmos.removeleakage = false\n" in parset assert f"linmos.primarybeam.ASKAP_PB.image = {str(holofile.absolute())}\n" in parset assert "linmos.primarybeam.ASKAP_PB.alpha" not in parset parset = _get_holography_linmos_options(holofile=holofile, pol_axis=np.deg2rad(-45)) assert "linmos.primarybeam = ASKAP_PB\n" in parset + assert "linmos.removeleakage = false\n" in parset + assert f"linmos.primarybeam.ASKAP_PB.image = {str(holofile.absolute())}\n" in parset + assert "linmos.primarybeam.ASKAP_PB.alpha" in parset + + parset = _get_holography_linmos_options( + holofile=holofile, remove_leakage=True, pol_axis=np.deg2rad(-45) + ) + assert "linmos.primarybeam = ASKAP_PB\n" in parset assert "linmos.removeleakage = true\n" in parset assert f"linmos.primarybeam.ASKAP_PB.image = {str(holofile.absolute())}\n" in parset assert "linmos.primarybeam.ASKAP_PB.alpha" in parset diff --git a/tests/test_utils.py b/tests/test_utils.py index 0200ba46..378b143d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,6 +3,7 @@ import math import os import shutil +import time from pathlib import Path from typing import Any @@ -14,6 +15,7 @@ from astropy.wcs import WCS from flint.convol import BeamShape +from flint.exceptions import TimeLimitException from flint.logging import logger from flint.utils import ( SlurmInfo, @@ -29,9 +31,32 @@ hold_then_move_into, log_job_environment, temporarily_move_into, + timelimit_on_context, ) +def some_long_function(minimum_time=5): + t1 = time.time() + while time.time() - t1 < minimum_time: + sum = 0 + for i in range(1000): + sum = sum + 1 + print(f"Time taken: {time.time() - t1} seconds") + + +def test_timelimit_on_context(): + """Raise an error should a function take longer than expected to run""" + with pytest.raises(TimeLimitException): + with timelimit_on_context(timelimit_seconds=1): + some_long_function(minimum_time=20) + + with timelimit_on_context(timelimit_seconds=5): + some_long_function(minimum_time=1) + + # This should make sure that the signal is not raised after the context left + some_long_function(minimum_time=5) + + @pytest.fixture(scope="session", autouse=True) def set_slurm_env(): """Set up variables for a specific test"""