diff --git a/flint/data/tests/test_config.yaml b/flint/data/tests/test_config.yaml index 946f7e83..96c193d5 100644 --- a/flint/data/tests/test_config.yaml +++ b/flint/data/tests/test_config.yaml @@ -58,8 +58,8 @@ defaults: suppress_artefacts: true suppress_artefacts_negative_seed_clip: 5 suppress_artefacts_guard_negative_dilation: 40 - grow_low_snr_islands: true - grow_low_snr_clip: 1.75 + grow_low_snr_island: true + grow_low_snr_island_clip: 1.75 grow_low_snr_island_size: 768 initial: wsclean: {} @@ -68,7 +68,9 @@ selfcal: wsclean: data_column: "EXAMPLE" gaincal: {} - masking: {} + masking: + flood_fill_positive_seed_clip: 40 + flood_fill_positive_flood_clip: 1.5 2: wsclean: multiscale: false diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index 60178e45..008ca382 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -110,6 +110,10 @@ class WSCleanOptions(NamedTuple): """Path to a FITS file that encodes a cleaning mask""" deconvolution_channels: Optional[int] = None """The channels out will be averaged down to this many sub-band images during deconvolution""" + parallel_deconvolution: Optional[int] = None + """If not none, then this is the number of sub-regions wsclean will attempt to divide and clean""" + parallel_gridding: Optional[int] = None + """If not none, then this is the number of channel images that will be gridded in parallel""" def with_options(self, **kwargs) -> WSCleanOptions: """Return a new instance of WSCleanOptions with updated components""" diff --git a/flint/masking.py b/flint/masking.py index cf376136..d39bf646 100644 --- a/flint/masking.py +++ b/flint/masking.py @@ -44,9 +44,9 @@ class MaskingOptions(NamedTuple): """The significance level of a negative island for the sidelobe suppresion to be activated. This should be a positive number (the signal map is internally inverted)""" suppress_artefacts_guard_negative_dilation: float = 40 """The minimum positive signifance pixels should have to be guarded when attempting to suppress artefacts around bright sources""" - grow_low_snr_islands: bool = True + grow_low_snr_island: bool = False """Whether to attempt to grow a mask to capture islands of low SNR (e.g. diffuse emission)""" - grow_low_snr_clip: float = 1.75 + grow_low_snr_island_clip: float = 1.75 """The minimum signifance levels of pixels to be to seed low SNR islands for consideration""" grow_low_snr_island_size: int = 768 """The number of pixels an island has to be for it to be accepted""" @@ -220,7 +220,8 @@ def reverse_negative_flood_fill( suppress_artefacts: bool = False, negative_seed_clip: float = 5, guard_negative_dilation: float = 50, - grow_low_snr: Optional[float] = 2, + grow_low_island: bool = False, + grow_low_island_snr: float = 2, grow_low_island_size: int = 512, ) -> np.ndarray: """Attempt to: @@ -261,7 +262,8 @@ def reverse_negative_flood_fill( suppress_artefacts (boo, optional): Attempt to suppress regions around presumed artefacts. Defaults to False. negative_seed_clip (Optional[float], optional): Initial clip of negative pixels. This operation is on the inverted signal mask (so this value should be a positive number). If None this second operation is not performed. Defaults to 5. guard_negative_dilation (float, optional): Positive pixels from the computed signal mask will be above this threshold to be protect from the negative island mask dilation. Defaults to 50. - grow_low__snr (Optional[float], optional): Attempt to grow islands of contigous pixels above thius low SNR ration. If None this is not performed. Defaults to 2. + grow_low_island (bool, optional): Whether to grow islands of pixels with low SNR. Defaults to False. + grow_low_island_snr (float, optional): The minimum SNR of contigous pixels for an island ot be grown from. Defaults to 2. grow_low_island_size (int, optional): The number of pixels a low SNR should be in order to be considered valid. Defaults to 512. Returns: @@ -319,10 +321,10 @@ def reverse_negative_flood_fill( # and here we set the presumable nasty islands to False positive_dilated_mask[negative_dilated_mask] = False - if grow_low_snr: + if grow_low_island: low_snr_mask = grow_low_snr_mask( signal=signal, - grow_low_snr=grow_low_snr, + grow_low_snr=grow_low_island_snr, grow_low_island_size=grow_low_island_size, region_mask=negative_dilated_mask, ) @@ -400,7 +402,8 @@ def create_snr_mask_from_fits( suppress_artefacts=masking_options.suppress_artefacts, negative_seed_clip=masking_options.suppress_artefacts_negative_seed_clip, guard_negative_dilation=masking_options.suppress_artefacts_guard_negative_dilation, - grow_low_snr=masking_options.grow_low_snr_clip, + grow_low_island=masking_options.grow_low_snr_island, + grow_low_island_snr=masking_options.grow_low_snr_island_clip, grow_low_island_size=masking_options.grow_low_snr_island_size, ) mask_data = mask_data.reshape(signal_data.shape) diff --git a/flint/prefect/flows/continuum_pipeline.py b/flint/prefect/flows/continuum_pipeline.py index 2de02797..afe6903c 100644 --- a/flint/prefect/flows/continuum_pipeline.py +++ b/flint/prefect/flows/continuum_pipeline.py @@ -187,6 +187,7 @@ def process_science_fields( ) beam_summaries = task_create_beam_summary.map(ms=flagged_mss, imageset=wsclean_cmds) + beam_aegean_outputs = None if run_aegean: beam_aegean_outputs = task_run_bane_and_aegean.map( image=wsclean_cmds, @@ -277,6 +278,10 @@ def process_science_fields( field_options.use_beam_masks and current_round >= field_options.use_beam_masks_from ): + masking_options = get_options_from_strategy( + strategy=strategy, mode="masking", round=current_round + ) + # NOTE: This might be run twice against the first set of images created beam_aegean_outputs = task_run_bane_and_aegean.map( image=wsclean_cmds, aegean_container=unmapped(field_options.aegean_container), @@ -285,6 +290,7 @@ def process_science_fields( image=wsclean_cmds, image_products=beam_aegean_outputs, min_snr=3.5, + update_masking_options=unmapped(masking_options), ) wsclean_cmds = task_wsclean_imager.map( diff --git a/flint/source_finding/aegean.py b/flint/source_finding/aegean.py index 03ea7987..637dd707 100644 --- a/flint/source_finding/aegean.py +++ b/flint/source_finding/aegean.py @@ -2,7 +2,7 @@ from argparse import ArgumentParser from pathlib import Path -from typing import NamedTuple, Tuple +from typing import NamedTuple, Optional, Tuple from AegeanTools import BANE from AegeanTools.catalogs import save_catalog @@ -14,6 +14,31 @@ from flint.sclient import run_singularity_command +class BANEOptions(NamedTuple): + """Container for basic BANE related options. Only a subclass of BANE options are supported.""" + + grid_size: Optional[Tuple[int, int]] = None # (8, 8) + """The step interval of each box, in pixels""" + box_size: Optional[Tuple[int, int]] = None # (196, 196) + """The size of the box in pixels""" + + +class AegeanOptions(NamedTuple): + """Container for basic aegean options. Only a subclass of aegean options are supported. + + Of note is the lack of a tables option (corresponding to --tables). This is dependent on knowing the base output name + and relying on aegean to also append a suffic of sorts to the outputs. For that reason + the aegean command generated will always create the table option. + """ + + nocov: bool = True + """Whether aegean should attempt to model the co-variance of pixels. If true aegean does not. """ + maxsummits: int = 4 + """The maximum number of components an island is allowed to have before it is ignored. """ + autoload: bool = True + """Attempt to load precomputed background and rms maps. """ + + class AegeanOutputs(NamedTuple): """Somple structure to represent output aegean products""" @@ -29,8 +54,55 @@ class AegeanOutputs(NamedTuple): """The input image that was used to source find against""" +def _get_bane_command(image: Path, cores: int, bane_options: BANEOptions) -> str: + """Create the BANE command to run""" + # The stripes is purposely set lower than the cores due to an outstanding bane bug that can cause a deadlock. + bane_command_str = f"BANE {str(image)} --cores {cores} --stripes {cores//2} " + if bane_options.grid_size: + bane_command_str += ( + f"--grid {bane_options.grid_size[0]} {bane_options.grid_size[1]} " + ) + if bane_options.box_size: + bane_command_str += ( + f"--box {bane_options.box_size[0]} {bane_options.box_size[1]}" + ) + bane_command_str = bane_command_str.rstrip() + logger.info("Constructed bane command.") + + return bane_command_str + + +def _get_aegean_command( + image: Path, base_output: str, aegean_options: AegeanOptions +) -> str: + """Create the aegean command to run""" + aegean_command = f"aegean {str(image)} " + if aegean_options.autoload: + aegean_command += "--autoload " + if aegean_options.nocov: + aegean_command += "--nocov " + + # NOTE: Aegean will add the '_comp' component to the output tables. So, if we want + # basename_comp.fits + # to be the output component table then we want to pass + # --table basename.fits + # and have to rely on aegean doing the right thing. + aegean_command += ( + f"--maxsummits {aegean_options.maxsummits} --table {base_output}.fits" + ) + + logger.info("Constructed aegean command. ") + logger.debug(f"{aegean_command=}") + + return aegean_command + + def run_bane_and_aegean( - image: Path, aegean_container: Path, cores: int = 8 + image: Path, + aegean_container: Path, + cores: int = 8, + bane_options: Optional[BANEOptions] = None, + aegean_options: Optional[AegeanOptions] = None, ) -> AegeanOutputs: """Run BANE, the background and noise estimator, and aegean, the source finder, against an input image. This function attempts to hook into the AegeanTools @@ -40,10 +112,15 @@ def run_bane_and_aegean( image (Path): The input image that BANE will calculate a background and RMS map for aegean_container (Path): Path to a singularity container that was the AegeanTools packages installed. cores (int, optional): The number of cores to allow BANE to use. Internally BANE will create a number of sub-processes. Defaults to 8. + bane_options (Optional[BANEOptions], optional): The options that are provided to BANE. If None defaults of BANEOptions are used. Defaults to None. + aegean_options (Optional[AegeanOptions], optional): The options that are provided to Aegean. if None defaults of AegeanOptions are used. Defaults to None. Returns: AegeanOutputs: The newly created BANE products """ + bane_options = bane_options if bane_options else BANEOptions() + aegean_options = aegean_options if aegean_options else AegeanOptions() + image = image.absolute() base_output = str(image.parent / image.stem) logger.info(f"Using base output name of: {base_output}") @@ -51,23 +128,18 @@ def run_bane_and_aegean( aegean_names = create_aegean_names(base_output=base_output) logger.debug(f"{aegean_names=}") - bane_command_str = f"BANE {str(image)} --cores {cores} --stripes {cores//2}" - logger.info("Constructed BANE command. ") + bane_command_str = _get_bane_command( + image=image, cores=cores, bane_options=bane_options + ) bind_dir = [image.absolute().parent] run_singularity_command( image=aegean_container, command=bane_command_str, bind_dirs=bind_dir ) - # NOTE: Aegean will add the '_comp' component to the output tables. So, if we want - # basename_comp.fits - # to be the output component table then we want to pass - # --table basename.fits - # and have to rely on aegean doing the right thing. - aegean_command = f"aegean {str(image)} --autoload --nocov --maxsummits 4 --table {base_output}.fits" - logger.info("Constructed aegean command. ") - logger.debug(f"{aegean_command=}") - + aegean_command = _get_aegean_command( + image=image, base_output=base_output, aegean_options=aegean_options + ) run_singularity_command( image=aegean_container, command=aegean_command, bind_dirs=bind_dir ) diff --git a/tests/test_aegean.py b/tests/test_aegean.py new file mode 100644 index 00000000..b2871868 --- /dev/null +++ b/tests/test_aegean.py @@ -0,0 +1,75 @@ +"""Tests around BANE and aegean, which are one of the source finding +tools used in flint. BANE is also used to derive signal maps that ultimately +feed the clean mask creation. +""" + +from pathlib import Path + +from flint.source_finding.aegean import ( + AegeanOptions, + BANEOptions, + _get_aegean_command, + _get_bane_command, +) + + +def test_bane_options(): + bane_opts = BANEOptions(box_size=(2, 1), grid_size=(20, 10)) + + assert isinstance(bane_opts, BANEOptions) + + bane_str = _get_bane_command( + image=Path("this/is/a/test.fits"), cores=8, bane_options=bane_opts + ) + + assert isinstance(bane_str, str) + + expected = "BANE this/is/a/test.fits --cores 8 --stripes 4 --grid 20 10 --box 2 1" + assert expected == bane_str + + +def test_bane_options_with_defaults(): + bane_opts = BANEOptions() + + assert isinstance(bane_opts, BANEOptions) + + bane_str = _get_bane_command( + image=Path("this/is/a/test.fits"), cores=8, bane_options=bane_opts + ) + + assert isinstance(bane_str, str) + + expected = "BANE this/is/a/test.fits --cores 8 --stripes 4" + assert expected == bane_str + + bane_opts = BANEOptions(box_size=(3, 5)) + bane_str = _get_bane_command( + image=Path("this/is/a/test.fits"), cores=8, bane_options=bane_opts + ) + expected = "BANE this/is/a/test.fits --cores 8 --stripes 4 --box 3 5" + assert expected == bane_str + + +def test_aegean_options(): + """Testing basic aegean options creation and expected command""" + aegean_options = AegeanOptions(nocov=True, maxsummits=4, autoload=True) + + # This are silly tests + assert isinstance(aegean_options, AegeanOptions) + assert aegean_options.maxsummits == 4 + + ex_path = Path("this/is/a/test.fits") + aegean_command = _get_aegean_command( + image=ex_path, base_output="example", aegean_options=aegean_options + ) + + expected_cmd = "aegean this/is/a/test.fits --autoload --nocov --maxsummits 4 --table example.fits" + assert aegean_command == expected_cmd + + aegean_options2 = AegeanOptions(nocov=False, maxsummits=40, autoload=False) + aegean_command2 = _get_aegean_command( + image=ex_path, base_output="example", aegean_options=aegean_options2 + ) + + expected_cmd2 = "aegean this/is/a/test.fits --maxsummits 40 --table example.fits" + assert aegean_command2 == expected_cmd2 diff --git a/tests/test_configuration.py b/tests/test_configuration.py index 740a95fb..a5fd0429 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -136,6 +136,31 @@ def test_get_options(strategy): assert wsclean["data_column"] == "CORRECTED_DATA" +def test_get_mask_options(package_strategy): + """Basic test to prove masking operation behaves well""" + masking = get_options_from_strategy( + strategy=package_strategy, mode="masking", round="initial" + ) + + assert isinstance(masking, dict) + assert masking["flood_fill_positive_seed_clip"] == 4.5 + + masking2 = get_options_from_strategy( + strategy=package_strategy, mode="masking", round=1 + ) + + print(strategy) + + assert isinstance(masking2, dict) + assert masking2["flood_fill_positive_seed_clip"] == 40 + + for k in masking.keys(): + if k == "flood_fill_positive_seed_clip": + continue + + assert masking[k] == masking2[k] + + def test_get_modes(package_strategy): # makes sure defaults for these modes are return when reuestion options # on a self-cal round without them set