Skip to content

Commit

Permalink
bane callback error
Browse files Browse the repository at this point in the history
  • Loading branch information
tgalvin committed Jan 2, 2025
1 parent 427088e commit 4435d67
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 1 deletion.
6 changes: 6 additions & 0 deletions flint/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ class FlintException(Exception):
pass


class AttemptRerunException(FlintException):
    """Signal that the operation being run should be attempted again.

    Intended to be raised from stream callback functions to capture
    strange errors and signify that they should be rerun. For instance,
    strange BANE deadlocking errors.
    """


class TimeLimitException(FlintException):
    """Raised when a function has taken too long to execute."""

Expand Down
17 changes: 17 additions & 0 deletions flint/sclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from spython.main import Client as sclient

from flint.exceptions import AttemptRerunException
from flint.logging import logger
from flint.utils import get_job_info, log_job_environment

Expand All @@ -17,6 +18,7 @@ def run_singularity_command(
bind_dirs: Optional[Union[Path, Collection[Path]]] = None,
stream_callback_func: Optional[Callable] = None,
ignore_logging_output: bool = False,
max_retries: int = 2,
) -> None:
"""Executes a command within the context of a nominated singularity
container
Expand All @@ -27,11 +29,14 @@ def run_singularity_command(
bind_dirs (Optional[Union[Path,Collection[Path]]], optional): Specifies a Path, or list of Paths, to bind to in the container. Defaults to None.
stream_callback_func (Optional[Callable], optional): Provide a function that is applied to each line of output text when singularity is running and `stream=True`. If provided it should accept a single (string) parameter. If None, nothing happens. Defaults to None.
ignore_logging_output (bool, optional): If `True` output from the executed singularity command is not logged. Defaults to False.
max_retries (int, optional): If a callback handler is specified and raises an `AttemptRerunException`, this signifies how many attempts should be made. Defaults to 2.
Raises:
FileNotFoundError: Thrown when container image not found
CalledProcessError: Thrown when the command into the container was not successful
"""
if max_retries <= 0:
raise ValueError("Too many retries")

if not image.exists():
raise FileNotFoundError(f"The singularity container {image} was not found. ")
Expand Down Expand Up @@ -75,6 +80,18 @@ def run_singularity_command(
# Sleep for a few moments. If the command created files (often they do), give the lustre a moment
# to properly register them. You dirty sea dog.
sleep(2.0)
except AttemptRerunException as e:
logger.info("A callback handler has raised an error. Attempting to rerun.")
logger.info(f"{e=}")
run_singularity_command(
image=image,
command=command,
bind_dirs=bind_dirs,
stream_callback_func=stream_callback_func,
ignore_logging_output=ignore_logging_output,
max_retries=max_retries - 1,
)

except CalledProcessError as e:
logger.error(f"Failed to run command: {command}")
logger.error(f"Stdout: {e.stdout}")
Expand Down
16 changes: 15 additions & 1 deletion flint/source_finding/aegean.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from astropy.io import fits

from flint.exceptions import AttemptRerunException
from flint.logging import logger
from flint.naming import create_aegean_names
from flint.sclient import run_singularity_command
Expand Down Expand Up @@ -69,6 +70,16 @@ def _get_bane_command(image: Path, cores: int, bane_options: BANEOptions) -> str
return bane_command_str


def _bane_output_callback(line: str) -> None:
"""Callback handler for the BANE program. Will raise an error
on the 'deadlock' issue."""

assert isinstance(line, str)

if "must be strictly ascending or descending" in line:
raise AttemptRerunException("BANE deadlock detected. ")


def _get_aegean_command(
image: Path, base_output: str, aegean_options: AegeanOptions
) -> str:
Expand Down Expand Up @@ -131,7 +142,10 @@ def run_bane_and_aegean(

bind_dir = [image.absolute().parent]
run_singularity_command(
image=aegean_container, command=bane_command_str, bind_dirs=bind_dir
image=aegean_container,
command=bane_command_str,
stream_callback_func=_bane_output_callback,
bind_dirs=bind_dir,
)

aegean_command = _get_aegean_command(
Expand Down
20 changes: 20 additions & 0 deletions tests/test_aegean.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,36 @@
feed the clean mask creation.
"""

import pytest
from pathlib import Path

from flint.exceptions import AttemptRerunException
from flint.source_finding.aegean import (
AegeanOptions,
BANEOptions,
_bane_output_callback,
_get_aegean_command,
_get_bane_command,
)


def test_bane_deadlock_callback():
    """BANE sometimes goes into a 'deadlock' state, and when it does it
    seemingly always issues a log message (from scipy, presumably)
    about interpolation points not being strictly ascending. Ordinary
    lines should pass through the callback quietly, while that message
    should raise the rerun exception."""
    benign_lines = [
        "30146:INFO using 8 cores",
        "30146:INFO using 7 stripes",
    ]
    deadlock_line = "30343:WARNING The points in dimension 0 must be strictly ascending or descending"

    # Normal progress output should not trigger anything
    for benign in benign_lines:
        _bane_output_callback(line=benign)

    # The scipy interpolation warning is the deadlock signature
    with pytest.raises(AttemptRerunException):
        _bane_output_callback(line=deadlock_line)


def test_bane_options():
bane_opts = BANEOptions(box_size=(2, 1), grid_size=(20, 10))

Expand Down

0 comments on commit 4435d67

Please sign in to comment.