Skip to content

Commit

Permalink
Add a timelimit context handler, eye towards BANE deadlocks (#200)
Browse files Browse the repository at this point in the history
* added a timelimit context manager

* expanded test

* added a list for collections cast

* added to changelog

* fixed a test

---------

Co-authored-by: tgalvin <[email protected]>
  • Loading branch information
tjgalvin and tgalvin authored Dec 27, 2024
1 parent c03a5ce commit 2fa5e87
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
- subtract flow will remove files whenever possible (remove original files
after convolving, removing convolved files after linmos, remove channel
linmos images after combining into a cube, removing the weight text files)
- Added a `timelimit_on_context` helper to raise an error after some specified
length of time. Looking at you, BANE and issue #186. Arrrr.

# 0.2.8

Expand Down
4 changes: 3 additions & 1 deletion flint/coadd/linmos.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,9 @@ def generate_linmos_parameter_set(
)
# Construct the holography section of the linmos parset
parset += _get_holography_linmos_options(
holofile=holofile, pol_axis=pol_axis, remove_leakage=".i." not in str(images[0])
holofile=holofile,
pol_axis=pol_axis,
remove_leakage=".i." not in str(list(images)[0]),
)

# Now write the file, me hearty
Expand Down
24 changes: 18 additions & 6 deletions flint/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,42 @@
class MSError(Exception):
class FlintException(Exception):
"""Base exception for Flint"""

pass


class TimeLimitException(FlintException):
"""A function has taken too long to execute"""

pass


class MSError(FlintException):
"""An error for MS related things"""

pass


class FrequencyMismatchError(Exception):
class FrequencyMismatchError(FlintException):
"""Raised when there are differences in frequencies"""


class PhaseOutlierFitError(Exception):
class PhaseOutlierFitError(FlintException):
"""Raised when the phase outlier fit routine fails."""

pass


class GainCalError(Exception):
class GainCalError(FlintException):
"""Raised when it appears like the casa gaincal task fails."""

pass


class CleanDivergenceError(Exception):
class CleanDivergenceError(FlintException):
"""Raised if it is detected that cleaning has diverged."""

pass


class TarArchiveError(Exception):
class TarArchiveError(FlintException):
"""Raised it the flint tarball is not created successfullty"""
24 changes: 18 additions & 6 deletions flint/prefect/common/imaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,24 @@ def task_extract_solution_path(calibrate_cmd: CalibrateCommand) -> Path:


# BANE sometimes gets caught in some stalled state
@task(timeout_seconds=60 * 45, retries=3)
@task(retries=3)
def task_run_bane_and_aegean(
image: Union[WSCleanCommand, LinmosCommand], aegean_container: Path
image: Union[WSCleanCommand, LinmosCommand],
aegean_container: Path,
timelimit_seconds: Union[int, float] = 60 * 45,
) -> AegeanOutputs:
"""Run BANE and Aegean against a FITS image
"""Run BANE and Aegean against a FITS image.
Notes:
It has been noted that BANE can sometimes get caught in an interpolation error which halts execution.
The ``timelimit_seconds`` will attempt to detect long-running BANE processes and raise an error. The
retry functionality of prefect should then restart the task. Since this task is pure (e.g. no last
dataproducts, modification to data etc) simply restarting should be fine.
Args:
image (Union[WSCleanCommand, LinmosCommand]): The image that will be searched
aegean_container (Path): Path to a singularity container containing BANE and aegean
timelimit_seconds (Union[int,float], optional): The maximum amount of time, in seconds, before an exception is raised. Defaults to 45*60.
Raises:
ValueError: Raised when ``image`` is not a supported type
Expand Down Expand Up @@ -200,9 +209,12 @@ def task_run_bane_and_aegean(
else:
raise ValueError(f"Unexpected type, have received {type(image)} for {image=}. ")

aegean_outputs = run_bane_and_aegean(
image=image_path, aegean_container=aegean_container
)
from flint.utils import timelimit_on_context

with timelimit_on_context(timelimit_seconds=timelimit_seconds):
aegean_outputs = run_bane_and_aegean(
image=image_path, aegean_container=aegean_container
)

return aegean_outputs

Expand Down
35 changes: 35 additions & 0 deletions flint/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import datetime
import os
import shutil
import signal
import subprocess
from contextlib import contextmanager
from pathlib import Path
Expand All @@ -18,6 +19,7 @@
from astropy.wcs import WCS

from flint.convol import BeamShape
from flint.exceptions import TimeLimitException
from flint.logging import logger

# TODO: This Captain is aware that there is a common fits getheader between
Expand All @@ -27,6 +29,39 @@
# that only opens the FITS file once and places things into common field names.


def _signal_timelimit_handler(*_signal_args):
    """Signal handler that interrupts the running code with a ``TimeLimitException``.

    Whatever positional arguments the caller supplies are accepted and ignored.

    Raises:
        TimeLimitException: Always raised when the handler is invoked.
    """
    raise TimeLimitException()


@contextmanager
def timelimit_on_context(
    timelimit_seconds: Union[int, float],
) -> Generator[None, None, None]:
    """Creates a context manager that will raise ``flint.exceptions.TimeLimitException``
    should the control not leave the ``with`` context within a specified amount of time.

    Notes:
        The limit is enforced with ``SIGALRM``. Python only allows signal handlers
        to be installed from the main thread, and the alarm timer is only available
        on Unix platforms. Any previously armed ``ITIMER_REAL`` / ``signal.alarm``
        timer is replaced for the duration of the context and is not re-armed afterwards.

    Args:
        timelimit_seconds (Union[int,float]): The maximum time allowed for the with context to be escaped. Fractional seconds are honoured.

    Raises:
        TimeLimitException: Raised should the maximum timelimit be violated.

    Yields:
        Generator[None, None, None]: A generating function that returns nothing
    """
    # ``signal.signal`` hands back the handler that was installed before ours so
    # that it can be put back once the context is left.
    previous_handler = signal.signal(signal.SIGALRM, _signal_timelimit_handler)
    # ``setitimer`` accepts floats. ``signal.alarm(int(...))`` would truncate a
    # sub-second limit to 0, which silently disables the alarm altogether.
    signal.setitimer(signal.ITIMER_REAL, timelimit_seconds)
    logger.info(f"Setting a timelimit of {timelimit_seconds=}")

    try:
        yield
    except TimeLimitException:
        logger.info(f"Timeout limit of {timelimit_seconds=} reached")
        raise
    finally:
        # Always disarm the timer, including when the body raised some other
        # exception. Otherwise the pending alarm fires later in unrelated code.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # ``previous_handler`` is None when the earlier handler was not installed
        # from Python; ``signal.signal`` does not accept None, so leave ours in place.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)


@contextmanager
def hold_then_move_into(
move_directory: Path,
Expand Down
10 changes: 9 additions & 1 deletion tests/test_linmos_coadd.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,20 @@ def test_linmos_holo_options(tmpdir):

parset = _get_holography_linmos_options(holofile=holofile, pol_axis=None)
assert "linmos.primarybeam = ASKAP_PB\n" in parset
assert "linmos.removeleakage = true\n" in parset
assert "linmos.removeleakage = false\n" in parset
assert f"linmos.primarybeam.ASKAP_PB.image = {str(holofile.absolute())}\n" in parset
assert "linmos.primarybeam.ASKAP_PB.alpha" not in parset

parset = _get_holography_linmos_options(holofile=holofile, pol_axis=np.deg2rad(-45))
assert "linmos.primarybeam = ASKAP_PB\n" in parset
assert "linmos.removeleakage = false\n" in parset
assert f"linmos.primarybeam.ASKAP_PB.image = {str(holofile.absolute())}\n" in parset
assert "linmos.primarybeam.ASKAP_PB.alpha" in parset

parset = _get_holography_linmos_options(
holofile=holofile, remove_leakage=True, pol_axis=np.deg2rad(-45)
)
assert "linmos.primarybeam = ASKAP_PB\n" in parset
assert "linmos.removeleakage = true\n" in parset
assert f"linmos.primarybeam.ASKAP_PB.image = {str(holofile.absolute())}\n" in parset
assert "linmos.primarybeam.ASKAP_PB.alpha" in parset
Expand Down
25 changes: 25 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import math
import os
import shutil
import time
from pathlib import Path
from typing import Any

Expand All @@ -14,6 +15,7 @@
from astropy.wcs import WCS

from flint.convol import BeamShape
from flint.exceptions import TimeLimitException
from flint.logging import logger
from flint.utils import (
SlurmInfo,
Expand All @@ -29,9 +31,32 @@
hold_then_move_into,
log_job_environment,
temporarily_move_into,
timelimit_on_context,
)


def some_long_function(minimum_time=5):
    """Keep the CPU busy for at least ``minimum_time`` seconds.

    Used to emulate a long running, compute bound piece of work.

    Args:
        minimum_time (int, optional): The least amount of time, in seconds, to spin for. Defaults to 5.
    """
    # A monotonic clock is immune to wall-clock adjustments made while spinning.
    start = time.monotonic()
    while time.monotonic() - start < minimum_time:
        # Small unit of throw-away work. Named ``counter`` so the builtin ``sum``
        # is not shadowed.
        counter = 0
        for _ in range(1000):
            counter += 1
    print(f"Time taken: {time.monotonic() - start} seconds")


def test_timelimit_on_context():
    """Check that over-running work inside the context raises, and that work
    finishing in time (or running after the context) is left alone"""
    # The work needs far longer than the limit, so the time limit must trip
    with pytest.raises(TimeLimitException), timelimit_on_context(timelimit_seconds=1):
        some_long_function(minimum_time=20)

    # The work comfortably finishes inside the limit, so nothing should be raised
    with timelimit_on_context(timelimit_seconds=5):
        some_long_function(minimum_time=1)

    # Run past the point where the previous limit would have expired to make
    # sure the signal is not raised after the context has been left
    some_long_function(minimum_time=5)


@pytest.fixture(scope="session", autouse=True)
def set_slurm_env():
"""Set up variables for a specific test"""
Expand Down

0 comments on commit 2fa5e87

Please sign in to comment.