diff --git a/CHANGELOG.md b/CHANGELOG.md index e5e5e1fc..00b98694 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ - load arguments into an `BaseOptions` class from a `ArgumentParser` - starting to move some classes over to this approaches (including some CLIs) - Added a verify tarball function to verify archives +- Removed `suppress_artefact` and `minimum_absolute_clip` functions from + `flint.masking` +- Added an adaptive box selection mode to the minimum absolute algorithm # 0.2.7 diff --git a/flint/data/tests/test_config.yaml b/flint/data/tests/test_config.yaml index ef61a764..0c48fc8b 100644 --- a/flint/data/tests/test_config.yaml +++ b/flint/data/tests/test_config.yaml @@ -54,9 +54,6 @@ defaults: flood_fill: true flood_fill_positive_seed_clip: 4.5 flood_fill_positive_flood_clip: 1.5 - suppress_artefacts: true - suppress_artefacts_negative_seed_clip: 5 - suppress_artefacts_guard_negative_dilation: 40 grow_low_snr_island: true grow_low_snr_island_clip: 1.75 grow_low_snr_island_size: 768 diff --git a/flint/data/tests/test_config_2.yaml b/flint/data/tests/test_config_2.yaml index e0158662..d74b05a2 100644 --- a/flint/data/tests/test_config_2.yaml +++ b/flint/data/tests/test_config_2.yaml @@ -54,9 +54,6 @@ defaults: flood_fill: true flood_fill_positive_seed_clip: 4.5 flood_fill_positive_flood_clip: 1.5 - suppress_artefacts: true - suppress_artefacts_negative_seed_clip: 5 - suppress_artefacts_guard_negative_dilation: 40 grow_low_snr_island: true grow_low_snr_island_clip: 1.75 grow_low_snr_island_size: 768 diff --git a/flint/masking.py b/flint/masking.py index 388f0b0f..8865844c 100644 --- a/flint/masking.py +++ b/flint/masking.py @@ -6,7 +6,7 @@ from argparse import ArgumentParser from pathlib import Path -from typing import Collection, Optional, Union +from typing import Collection, Optional, Union, NamedTuple import astropy.units as u import numpy as np @@ -20,6 +20,7 @@ binary_erosion as scipy_binary_erosion, # Rename to distinguish from skimage ) from scipy.ndimage import label, minimum_filter +from scipy.signal import fftconvolve from radio_beam import Beam from flint.logging import logger @@ -48,26 +49,18 @@ class MaskingOptions(BaseOptions): """If True, the clipping levels are used as the `increase_factor` when using a minimum absolute clip""" flood_fill_use_mbc_box_size: int = 75 """The size of the mbc box size should mbc be used""" - suppress_artefacts: bool = False - """Whether to attempt artefacts based on the presence of significant negatives""" - suppress_artefacts_negative_seed_clip: float = 5 - """The significance level of a negative island for the sidelobe suppression to be activated. This should be a positive number (the signal map is internally inverted)""" - suppress_artefacts_guard_negative_dilation: float = 40 - """The minimum positive significance pixels should have to be guarded when attempting to suppress artefacts around bright sources""" - suppress_artefacts_large_island_threshold: float = 1.0 - """Threshold in units of beams for an island of negative pixels to be considered large""" + flood_fill_use_mbc_adaptive_step_factor: float = 2.0 + """Stepping size used to increase box by should adaptive detect poor boxcar statistics""" + flood_fill_use_mbc_adaptive_skew_delta: float = 0.2 + """A box is consider too small for a pixel if the fractional proportion of positive pixels is larger than the deviation away of (0.5 + frac). This threshold is therefore 0 to 0.5""" + flood_fill_use_mbc_adaptive_max_depth: Optional[int] = None + """Determines the number of adaptive boxcar scales to use when constructing seed mask. If None no adaptive boxcar sizes""" grow_low_snr_island: bool = False """Whether to attempt to grow a mask to capture islands of low SNR (e.g. diffuse emission)""" grow_low_snr_island_clip: float = 1.75 """The minimum significance levels of pixels to be to seed low SNR islands for consideration""" grow_low_snr_island_size: int = 768 """The number of pixels an island has to be for it to be accepted""" - minimum_boxcar: bool = False - """Use the boxcar minimum threshold to compare to remove artefacts""" - minimum_boxcar_size: int = 100 - """Size of the boxcar filter""" - minimum_boxcar_increase_factor: float = 1.2 - """The factor used to construct minimum positive signal threshold for an island """ beam_shape_erode: bool = False """Erode the mask using the shape of the restoring beam""" beam_shape_erode_minimum_response: float = 0.6 @@ -333,171 +326,165 @@ def grow_low_snr_mask( return low_snr_mask -# TODO> Allow box car size to be scaled in proportion to beam size -def minimum_boxcar_artefact_mask( - signal: np.ndarray, - island_mask: np.ndarray, - boxcar_size: int, - increase_factor: float = 2.0, +class SkewResult(NamedTuple): + positive_pixel_frac: np.ndarray + """The fraction of positive pixels in a boxcar function""" + skew_mask: np.ndarray + """Mask of pixel positions indicating which positions failed the skew test""" + box_size: int + """Size of the boxcar window applies""" + skew_delta: float + """The test threshold for skew""" + + +def create_boxcar_skew_mask( + image: np.ndarray, skew_delta: float, box_size: int ) -> np.ndarray: - """Attempt to remove islands from a potential clean mask by - examining surrounding pixels. A boxcar is applied to find the - minimum signal to noise in a small localised region. For each - island the maximum signal is considered. + assert 0.0 < skew_delta < 0.5, f"{skew_delta=}, but should be 0.0 to 0.5" + assert ( + len(image.shape) == 2 + ), f"Expected two dimensions, got image shape of {image.shape}" + logger.info(f"Computing boxcar skew with {box_size=} and {skew_delta=}") + positive_pixels = (image > 0.0).astype(np.float32) + + # Counting positive pixel fraction here. The su + window_shape = (box_size, box_size) + positive_pixel_fraction = fftconvolve( + in1=positive_pixels, in2=np.ones(window_shape, dtype=np.float32), mode="same" + ) / np.prod(window_shape) + positive_pixel_fraction = np.clip( + positive_pixel_fraction, 0.0, 1.0 + ) # trust nothing + + skew_mask = positive_pixel_fraction > (0.5 + skew_delta) + logger.info(f"{np.sum(skew_mask)} pixels above {skew_delta=} with {box_size=}") + + return SkewResult( + positive_pixel_frac=positive_pixel_fraction, + skew_mask=skew_mask, + skew_delta=skew_delta, + box_size=box_size, + ) - If the absolute minimum signal increased by a factor in an - island region is larger to the maximum signal then that island - is omitted. - This is planned to be deprecated. +def _minimum_absolute_clip( + image: np.ndarray, increase_factor: float = 2.0, box_size: int = 100 +) -> np.ndarray: + """Given an input image or signal array, construct a simple image mask by applying a + rolling boxcar minimum filter, and then selecting pixels above a cut of + the absolute value value scaled by `increase_factor`. This is a pixel-wise operation. Args: - signal (np.ndarray): The input signl to use - island_mask (np.ndarray): The current island mask derived by other methods - boxcar_size (int): Size of the minimum boxcar size - increase_factor (float, optional): Factor to increase the minimum signal by. Defaults to 2.0. + image (np.ndarray): The input array to consider + increase_factor (float, optional): How large to scale the absolute minimum by. Defaults to 2.0. + box_size (int, optional): Size of the rolling boxcar minimum filtr. Defaults to 100. Returns: - np.ndarray: _description_ + np.ndarray: The mask of pixels above the locally varying threshold """ - import warnings - - warnings.warn("minimum_boxcar_artefacts to be removed. ", DeprecationWarning) - return island_mask - - logger.info( - f"Running boxcar minimum island clip with {boxcar_size=} {increase_factor=}" - ) + logger.info(f"Minimum absolute clip, {increase_factor=} {box_size=}") + rolling_box_min = minimum_filter(image, box_size) - # Make a copy of the original island mask to avoid unintended persistence - mask = island_mask.copy() + image_mask = image > (increase_factor * np.abs(rolling_box_min)) + # NOTE: This used to attempt to select pixels should that belong to an island of positive pixels with a box that was too small + # | ( + # (image > 0.0) & (rolling_box_min > 0.0) + # ) - # label each of the islands with a id - mask_labels, _ = label(island_mask, structure=np.ones((3, 3))) # type: ignore - uniq_labels = np.unique(mask_labels) - logger.info(f"Number of unique islands: {len(uniq_labels)}") + return image_mask - rolling_min = minimum_filter(signal, boxcar_size) - # For each island work out the maximum signal in the island and the minimum signal - # at the island in the output of the boxcar. - island_min, island_max = {}, {} - for island_id in uniq_labels: - if island_id == 0: - continue +def _adaptive_minimum_absolute_clip( + image: np.ndarray, + increase_factor: float = 2.0, + box_size: int = 100, + adaptive_max_depth: int = 3, + adaptive_box_step: float = 2.0, + adaptive_skew_delta: float = 0.2, +) -> np.ndarray: + logger.info( + f"Using adaptive minimum absolute clip with {box_size=} {adaptive_skew_delta=}" + ) + min_value = minimum_filter(input=image, size=box_size) - # compute the mask once. These could be a dixt comprehension - # but then this mask is computed twice - island_id_mask = mask_labels == island_id + for box_round in range(adaptive_max_depth, 0, -1): + skew_results = create_boxcar_skew_mask( + image=image, skew_delta=adaptive_skew_delta, box_size=box_size + ) + if np.all(~skew_results.skew_mask): + logger.info("No skewed islands detected") + break + if any([box_size > dim for dim in image.shape]): + logger.info(f"{box_size=} larger than a dimension in {image.shape=}") + break - island_max[island_id] = np.max(signal[island_id_mask]) - island_min[island_id] = np.min(rolling_min[island_id_mask]) + logger.info(f"({box_round}) Growing {box_size=} {adaptive_box_step=}") + box_size = int(box_size * adaptive_box_step) + _min_value = minimum_filter(input=image, size=box_size) + logger.debug("Slicing minimum values into place") - # Nuke the weak ones, mask and report - eliminate = [ - k - for k in island_max - if island_max[k] < np.abs(increase_factor * island_min[k]) - and island_min[k] < 0.0 - ] - # Walk the plank - mask[np.isin(mask_labels, eliminate)] = False + min_value[skew_results.skew_mask] = _min_value[skew_results.skew_mask] - logger.info(f"Eliminated {len(eliminate)} islands") + mask = image > (np.abs(min_value) * increase_factor) return mask -def suppress_artefact_mask( - signal: np.ndarray, - negative_seed_clip: float, - guard_negative_dilation: float, - pixels_per_beam: Optional[float] = None, - large_island_threshold: float = 1.0, +def minimum_absolute_clip( + image: np.ndarray, + increase_factor: float = 2.0, + box_size: int = 100, + adaptive_max_depth: Optional[int] = None, + adaptive_box_step: float = 2.0, + adaptive_skew_delta: float = 0.2, ) -> np.ndarray: - """Attempt to grow mask that suppresses artefacts around bright sources. Small islands - of negative emission seed pixels, which then grow out. Bright positive pixels are not - allowed to change (which presumably are the source of negative artetfacts). + """Implements minimum absolute clip method. A minimum filter of a particular + boxc size is applied to the input image. The absolute of the output is taken + and increased by a guard factor, which forms the clipping level used to construct + a clean mask: - The assumption here is that: + >>> image > (absolute(minimum_filter(image, box)) * factor) - - no genuine source of negative sky emission - - negative islands are around bright sources with deconvolution/calibration errors - - if there are brightish negative islands there is also positive brightish arteefact islands nearby + The idea is only valid for zero mean and normally distributed pixels, with + positive definite flux, making it appropriate for Stokes I. - For this reason the guard mask should be sufficiently high to protect the main source but nuke the fask positive islands + Larger box sizes and guard factors will make the mask more conservative. Should + the boxcar be too small relative to some feature it is aligned it is possible + that an excess of positive pixels will produce an less than optimal clipping + level. An adaptive box size mode, if activated, attempts to use a larger box + around these regions. - Consider using ``minimum_absolute_clip``, which has a simpler set of options and has - largely the same intended result. + The basic idea being detecting regions where the boxcar is too small is around + the idea that there should be a similar number of positive to negative pixels. + Should there be too many positive pixels in a region it is likely there is an Args: - signal (np.ndarray): The signal mask, - negative_seed_clip (float): The minimum significance level to seed. This is a positive number (as it is applied to the inverted signal). - guard_negative_dilation (float): Regions of positive emission above this are protected. This is positive. - pixels_per_beam (Optional[float], optional): The number of pixels per beam. If not None, seed islands larger than this many pixels are removed. Defaults to None. - large_island_threshold (float, optional): The number of beams required for a large island of negative pixels to be dropped as an artefact seed. Only used if `pixels_per_beam` is set. Defaults to 1.0. + image (np.ndarray): Image to create a mask for + increase_factor (float, optional): The guard factor used to inflate the absolute of the minimum filter. Defaults to 2.0. + box_size (int, optional): Size of the box car of the minimum filter. Defaults to 100. + adaptive_max_depth (Optional[int], optional): The maximum number of rounds that the adaptive mode is allowed to perform when rescaling boxcar results in certain directions. Defaults to None. + adaptive_box_step (float, optional): A multiplicative factor to increase the boxcar size by each round. Defaults to 2.0. + adaptive_skew_delta (float, optional): Minimum deviation from 0.5 that needs to be met to classify a region as skewed. Defaults to 0.2. Returns: - np.ndarray: The artefact suppression mask + np.ndarray: Final mask """ - # This Pirate thinks provided the background is handled - # that taking the inverse is correct - negative_signal = -1 * signal - negative_mask = negative_signal > negative_seed_clip - - if pixels_per_beam: - mask_labels, no_labels = label(negative_mask, structure=np.ones((3, 3))) - _, counts = np.unique(mask_labels.flatten(), return_counts=True) - - clip_pixels_threshold = large_island_threshold * pixels_per_beam - logger.info( - f"Removing negative islands larger than {clip_pixels_threshold} pixels with {large_island_threshold=}, {pixels_per_beam=}" + if adaptive_max_depth is None: + return _minimum_absolute_clip( + image=image, box_size=box_size, increase_factor=increase_factor ) - large_islands = [ - idx - for idx, count in enumerate(counts) - if count > clip_pixels_threshold and idx > 0 - ] - logger.info(f"Removing islands with labels: {large_islands=}") - negative_mask[np.isin(mask_labels, large_islands)] = False - - negative_dilated_mask = scipy_binary_dilation( - input=negative_mask, - mask=signal < guard_negative_dilation, - iterations=10, - structure=np.ones((3, 3)), - ) + adaptive_max_depth = int(adaptive_max_depth) - return negative_dilated_mask - - -def minimum_absolute_clip( - image: np.ndarray, increase_factor: float = 2.0, box_size: int = 100 -) -> np.ndarray: - """Given an input image or signal array, construct a simple image mask by applying a - rolling boxcar minimum filter, and then selecting pixels above a cut of - the absolute value value scaled by `increase_factor`. This is a pixel-wise operation. - - Args: - image (np.ndarray): The input array to consider - increase_factor (float, optional): How large to scale the absolute minimum by. Defaults to 2.0. - box_size (int, optional): Size of the rolling boxcar minimum filtr. Defaults to 100. - - Returns: - np.ndarray: The mask of pixels above the locally varying threshold - """ - logger.info(f"Minimum absolute clip, {increase_factor=} {box_size=}") - rolling_box_min = minimum_filter(image, box_size) - - image_mask = image > (increase_factor * np.abs(rolling_box_min)) - # NOTE: This used to attempt to select pixels should that belong to an island of positive pixels with a box that was too small - # | ( - # (image > 0.0) & (rolling_box_min > 0.0) - # ) - - return image_mask + return _adaptive_minimum_absolute_clip( + image=image, + increase_factor=increase_factor, + box_size=box_size, + adaptive_max_depth=adaptive_max_depth, + adaptive_box_step=adaptive_box_step, + adaptive_skew_delta=adaptive_skew_delta, + ) def _verify_set_positive_seed_clip( @@ -567,11 +554,17 @@ def reverse_negative_flood_fill( image=base_image, increase_factor=masking_options.flood_fill_positive_seed_clip, box_size=masking_options.flood_fill_use_mbc_box_size, + adaptive_max_depth=masking_options.flood_fill_use_mbc_adaptive_max_depth, + adaptive_box_step=masking_options.flood_fill_use_mbc_adaptive_step_factor, + adaptive_skew_delta=masking_options.flood_fill_use_mbc_adaptive_skew_delta, ) flood_floor_mask = minimum_absolute_clip( image=base_image, increase_factor=masking_options.flood_fill_positive_flood_clip, box_size=masking_options.flood_fill_use_mbc_box_size, + adaptive_max_depth=masking_options.flood_fill_use_mbc_adaptive_max_depth, + adaptive_box_step=masking_options.flood_fill_use_mbc_adaptive_step_factor, + adaptive_skew_delta=masking_options.flood_fill_use_mbc_adaptive_skew_delta, ) else: # Sanity check the upper clip level, you rotten seadog @@ -592,33 +585,11 @@ def reverse_negative_flood_fill( structure=np.ones((3, 3)), ) - if masking_options.minimum_boxcar: - positive_dilated_mask = minimum_boxcar_artefact_mask( - signal=base_image, - island_mask=positive_dilated_mask, - boxcar_size=masking_options.minimum_boxcar_size, - increase_factor=masking_options.minimum_boxcar_increase_factor, - ) - - negative_dilated_mask = None - if masking_options.suppress_artefacts: - negative_dilated_mask = suppress_artefact_mask( - signal=base_image, - negative_seed_clip=masking_options.suppress_artefacts_negative_seed_clip, - guard_negative_dilation=masking_options.suppress_artefacts_guard_negative_dilation, - pixels_per_beam=pixels_per_beam, - large_island_threshold=masking_options.suppress_artefacts_large_island_threshold, - ) - - # and here we set the presumable nasty islands to False - positive_dilated_mask[negative_dilated_mask] = False - if masking_options.grow_low_snr_island: low_snr_mask = grow_low_snr_mask( signal=base_image, grow_low_snr=masking_options.grow_low_snr_island_clip, grow_low_island_size=masking_options.grow_low_snr_island_size, - region_mask=negative_dilated_mask, ) positive_dilated_mask[low_snr_mask] = True @@ -788,6 +759,11 @@ def get_parser() -> ArgumentParser: fits_parser.add_argument( "--bkg-fits", type=Path, help="Path to the BKG of the input image. " ) + fits_parser.add_argument( + "--save-signal", + action="store_true", + help="Save the signal image internally generated (should it be generated)", + ) extract_parser = subparser.add_parser( "extractmask", diff --git a/tests/test_masking.py b/tests/test_masking.py index d4270bf3..13f841e7 100644 --- a/tests/test_masking.py +++ b/tests/test_masking.py @@ -15,13 +15,74 @@ create_options_from_parser, create_snr_mask_from_fits, get_parser, - minimum_boxcar_artefact_mask, + _minimum_absolute_clip, + minimum_absolute_clip, ) from flint.naming import FITSMaskNames SHAPE = (100, 100) +def test_adaptive_minimum_absolute_clip(): + """Activate the adaptive mode of the mac""" + + image = np.ones((2000, 2000)) * -1.2 + image[100:200, 100:200] = 1.0 + mask = minimum_absolute_clip( + image=image, + box_size=50, + increase_factor=1.1, + adaptive_max_depth=1, + adaptive_box_step=4.0, + adaptive_skew_delta=0.2, + ) + assert np.all(~mask) + + image = np.ones((2000, 2000)) * -1.2 + image[100:200, 100:200] = 1.0 + image[110, 130] = 2000 + mask = minimum_absolute_clip( + image=image, + box_size=50, + increase_factor=1.1, + adaptive_max_depth=1, + adaptive_box_step=4.0, + adaptive_skew_delta=0.2, + ) + assert np.sum(mask) == 1 + + +def test_private_minimum_absolute_clip(): + """The minimum absolute clipping process accepts an image + and returns a mask. At its heart it applies a ``minimum_filter`` + from scipy.""" + + image = np.ones(SHAPE) * -1.0 + image[10, 10] = -2 + private_mbc_mask = _minimum_absolute_clip(image=image, box_size=10) + mbc_mask = minimum_absolute_clip(image=image, box_size=10) + assert np.all(~mbc_mask) + assert np.all(private_mbc_mask == mbc_mask) + + image[12, 12] = 5 + private_mbc_mask = _minimum_absolute_clip(image=image) + mbc_mask = minimum_absolute_clip(image=image) + assert np.sum(mbc_mask) == 1 + assert np.all(private_mbc_mask == mbc_mask) + + image[12, 12] = 0 + private_mbc_mask = _minimum_absolute_clip(image=image) + mbc_mask = minimum_absolute_clip(image=image) + assert np.all(~mbc_mask) + assert np.all(private_mbc_mask == mbc_mask) + + image[12, 12] = 5 + private_mbc_mask = _minimum_absolute_clip(image=image, increase_factor=3) + mbc_mask = minimum_absolute_clip(image=image, increase_factor=3) + assert np.all(~mbc_mask) + assert np.all(private_mbc_mask == mbc_mask) + + def test_create_signal_from_rmsbkg(): """Make sure the operations around the creation of a signal image from rms / bkg images follows and handles paths vs numpy arrays""" @@ -198,66 +259,6 @@ def test_consider_beam_masking_round(): assert consider_beam_mask_round(current_round=1, mask_rounds=1) -def test_minimum_boxcar_artefact(): - """See if the minimum box care artefact suppressor can suppress the - bright artefact when a bright negative artefact - """ - img = np.zeros((SHAPE)) - - img[30:40, 30:40] = 10 - img_mask = img > 5 - - out_mask = minimum_boxcar_artefact_mask( - signal=img, island_mask=img_mask, boxcar_size=10 - ) - assert np.all(img_mask == out_mask) - assert out_mask is img_mask # minimum boxcar artefact mask to be deprecated - # assert img_mask is not out_mask - - img[41:45, 30:40] = -20 - out_mask = minimum_boxcar_artefact_mask( - signal=img, island_mask=img_mask, boxcar_size=10 - ) - assert out_mask is img_mask # minimum boxcar artefact mask to be deprecated - - # assert not np.all(img_mask == out_mask) - - -def test_minimum_boxcar_artefact_blanked(): - """See if the minimum box care artefact suppressor can suppress the - bright artefact when a bright negative artefact - """ - img = np.zeros((SHAPE)) - - img[30:40, 30:40] = 10 - img[41:45, 30:40] = -20 - - img_mask = img > 5 - - out_mask = minimum_boxcar_artefact_mask( - signal=img, island_mask=img_mask, boxcar_size=10, increase_factor=1000 - ) - assert out_mask is img_mask # minimum boxcar artefact mask will be deprecated - - # assert out_mask is not img_mask - # assert np.all(out_mask[30:40, 30:40] == False) # noqa - - -def test_minimum_boxcar_large_bright_island(): - """This one checks to make sure that if the boxcar is smaller than - a positive islane that the island is not the island_min - """ - - img = np.zeros(SHAPE) - img[30:40, 30:40] = 10 - img_mask = img > 5 - - out_mask = minimum_boxcar_artefact_mask( - signal=img, island_mask=img_mask, boxcar_size=2 - ) - assert np.all(img_mask == out_mask) - - @pytest.fixture def fits_dir(tmpdir): fits_dir = Path(tmpdir) / "fits"