From 4435d676c05612c62f8a1dc69f4ebe7da13fae90 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Thu, 2 Jan 2025 15:28:51 +1100 Subject: [PATCH] bane callback error --- flint/exceptions.py | 6 ++++++ flint/sclient.py | 17 +++++++++++++++++ flint/source_finding/aegean.py | 16 +++++++++++++++- tests/test_aegean.py | 20 ++++++++++++++++++++ 4 files changed, 58 insertions(+), 1 deletion(-) diff --git a/flint/exceptions.py b/flint/exceptions.py index 60989675..630b0d52 100644 --- a/flint/exceptions.py +++ b/flint/exceptions.py @@ -4,6 +4,12 @@ class FlintException(Exception): pass +class AttemptRerunException(FlintException): + """Intended to be used in stream call back functions to + capture strange errors and signify that they should be + rerun. For instance, strange BANE deadlocking errors.""" + + class TimeLimitException(FlintException): """A function has taken too long to execute""" diff --git a/flint/sclient.py b/flint/sclient.py index b337b965..5e1ce524 100644 --- a/flint/sclient.py +++ b/flint/sclient.py @@ -7,6 +7,7 @@ from spython.main import Client as sclient +from flint.exceptions import AttemptRerunException from flint.logging import logger from flint.utils import get_job_info, log_job_environment @@ -17,6 +18,7 @@ def run_singularity_command( bind_dirs: Optional[Union[Path, Collection[Path]]] = None, stream_callback_func: Optional[Callable] = None, ignore_logging_output: bool = False, + max_retries: int = 2, ) -> None: """Executes a command within the context of a nominated singularity container @@ -27,11 +29,14 @@ def run_singularity_command( bind_dirs (Optional[Union[Path,Collection[Path]]], optional): Specifies a Path, or list of Paths, to bind to in the container. Defaults to None. stream_callback_func (Optional[Callable], optional): Provide a function that is applied to each line of output text when singularity is running and `stream=True`. IF provide it should accept a single (string) parameter. If None, nothing happens. Defaultds to None. ignore_logging_output (bool, optional): If `True` output from the executed singularity command is not logged. Defaults to False. + max_reties (int, optional): If a callback handler is specified which raised an `AttemptRerunException`, this signifies how many attempts should be made. Defaults to 2. Raises: FileNotFoundError: Thrown when container image not found CalledProcessError: Thrown when the command into the container was not successful """ + if max_retries <= 0: + raise ValueError("Too many retries") if not image.exists(): raise FileNotFoundError(f"The singularity container {image} was not found. ") @@ -75,6 +80,18 @@ def run_singularity_command( # Sleep for a few moments. If the command created files (often they do), give the lustre a moment # to properly register them. You dirty sea dog. sleep(2.0) + except AttemptRerunException as e: + logger.info("A callback handler has raised an error. Attempting to rerun.") + logger.info(f"{e=}") + run_singularity_command( + image=image, + command=command, + bind_dirs=bind_dirs, + stream_callback_func=stream_callback_func, + ignore_logging_output=ignore_logging_output, + max_retries=max_retries - 1, + ) + except CalledProcessError as e: logger.error(f"Failed to run command: {command}") logger.error(f"Stdout: {e.stdout}") diff --git a/flint/source_finding/aegean.py b/flint/source_finding/aegean.py index dda464c9..026c6ca5 100644 --- a/flint/source_finding/aegean.py +++ b/flint/source_finding/aegean.py @@ -6,6 +6,7 @@ from astropy.io import fits +from flint.exceptions import AttemptRerunException from flint.logging import logger from flint.naming import create_aegean_names from flint.sclient import run_singularity_command @@ -69,6 +70,16 @@ def _get_bane_command(image: Path, cores: int, bane_options: BANEOptions) -> str return bane_command_str +def _bane_output_callback(line: str) -> None: + """Callback handler for the BANE program. Will raise an error + on the 'deadlock' issue.""" + + assert isinstance(line, str) + + if "must be strictly ascending or descending" in line: + raise AttemptRerunException("BANE deadlock detected. ") + + def _get_aegean_command( image: Path, base_output: str, aegean_options: AegeanOptions ) -> str: @@ -131,7 +142,10 @@ def run_bane_and_aegean( bind_dir = [image.absolute().parent] run_singularity_command( - image=aegean_container, command=bane_command_str, bind_dirs=bind_dir + image=aegean_container, + command=bane_command_str, + stream_callback_func=_bane_output_callback, + bind_dirs=bind_dir, ) aegean_command = _get_aegean_command( diff --git a/tests/test_aegean.py b/tests/test_aegean.py index 0e94b6d3..0c02d3d3 100644 --- a/tests/test_aegean.py +++ b/tests/test_aegean.py @@ -3,16 +3,36 @@ feed the clean mask creation. """ +import pytest from pathlib import Path +from flint.exceptions import AttemptRerunException from flint.source_finding.aegean import ( AegeanOptions, BANEOptions, + _bane_output_callback, _get_aegean_command, _get_bane_command, ) +def test_bane_deadlock_callback(): + """Noticed that BANE sometimes goes into a 'deadlock' state, and will + seemingly always issue an log (from scipy, presumably, arrr) + around interpolation not being strictly ascending""" + + lines = ( + "30146:INFO using 8 cores", + "30146:INFO using 7 stripes", + "30343:WARNING The points in dimension 0 must be strictly ascending or descending", + ) + _bane_output_callback(line=lines[0]) + _bane_output_callback(line=lines[1]) + + with pytest.raises(AttemptRerunException): + _bane_output_callback(line=lines[2]) + + def test_bane_options(): bane_opts = BANEOptions(box_size=(2, 1), grid_size=(20, 10))