diff --git a/CHANGELOG.md b/CHANGELOG.md index 121e1bb9..99e3d1f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,22 @@ # Change log +# dev + +- Created `subtract_cube_pipeline.py`. Associated changes include: + - wsclean imaging will delete files while still in temporary storage (for + instance if a ramdisk is used) + - added option to prefect convol task to delete files once they have been + convolved + - predicting model produced by `wsclean -save-source-list` into measurement + set + - using `taql` to subtract model from nominated data column + - added a `flow_addmodel_to_mss` to allow different task runner to be + specified + - added `flint_no_log_wsclean_output` to `WSCleanOptions` to disable wsclean + logging + - added flag to disable logging singularity command output to the + `flint.logging.logger` + # 0.2.8 - added `wrapper_options_from_strategy` decorator helper function diff --git a/README.md b/README.md index ceff6bc6..5b15a7a6 100644 --- a/README.md +++ b/README.md @@ -112,12 +112,8 @@ tasks together (outlined above) into a single data-processing pipeline. observation sequence. - `flint_flow_continuum_pipeline`: Performs bandpass calibration, solution copying, imaging, self-calibration and mosaicing. -- `flint_flow_cointinuum_mask_pipeline`: Performs bandpass calibration, solution - copying, imaging, self-calibration and mosaicing. In this flow a process to - construct a robust clean mask is performed by exploiting an initial imaging - round. The field image is constructed across all beams, S/N clipping is - performed, then guard masks on a per-beam basis are extracted. This pipeline - has fallen out of use and could be removed. +- `flint_flow_subtract_cube_pipeline`: Subtract a continuum model and image the + residual data. 
## Sky-model catalogues diff --git a/flint/coadd/linmos.py b/flint/coadd/linmos.py index 30c4883b..f037e9d4 100644 --- a/flint/coadd/linmos.py +++ b/flint/coadd/linmos.py @@ -526,6 +526,7 @@ def linmos_images( container: Path = Path("yandasoft.sif"), cutoff: float = 0.001, pol_axis: Optional[float] = None, + trim_linmos_fits: bool = True, ) -> LinmosCommand: """Create a linmos parset file and execute it. @@ -538,6 +539,7 @@ def linmos_images( container (Path, optional): Path to the singularity container that has the yandasoft tools. Defaults to Path('yandasoft.sif'). cutoff (float, optional): Pixels whose primary beam attenuation is below this cutoff value are blanked. Defaults to 0.001. pol_axis (Optional[float], optional): The physical oritentation of the ASKAP third-axis in radians. Defaults to None. + trim_linmos_fits (bool, optional): Attempt to trim the output linmos files of as much empty space as possible. Defaults to True. Returns: LinmosCommand: The linmos command executed and the associated parset file @@ -577,11 +579,12 @@ def linmos_images( ) # Trim the fits image to remove empty pixels - image_trim_results = trim_fits_image(image_path=linmos_names.image_fits) - trim_fits_image( - image_path=linmos_names.weight_fits, - bounding_box=image_trim_results.bounding_box, - ) + if trim_linmos_fits: + image_trim_results = trim_fits_image(image_path=linmos_names.image_fits) + trim_fits_image( + image_path=linmos_names.weight_fits, + bounding_box=image_trim_results.bounding_box, + ) return linmos_cmd diff --git a/flint/configuration.py b/flint/configuration.py index 40d7fc5d..eea5a8ea 100644 --- a/flint/configuration.py +++ b/flint/configuration.py @@ -30,7 +30,7 @@ # into there KNOWN_HEADERS = ("defaults", "initial", "selfcal", "version") -KNOWN_OPERATIONS = ("stokesv",) +KNOWN_OPERATIONS = ("stokesv", "subtractcube") FORMAT_VERSION = 0.1 MODE_OPTIONS_MAPPING = { "wsclean": WSCleanOptions, @@ -81,6 +81,32 @@ def copy_and_timestamp_strategy_file(output_dir: Path, 
input_yaml: Path) -> Path return Path(stamped_imaging_strategy) +def _load_and_copy_strategy( + output_split_science_path: Path, imaging_strategy: Optional[Path] = None +) -> Union[Strategy, None]: + """Load a strategy file and copy a timestamped version into the output directory + that would contain the science processing. + + Args: + output_split_science_path (Path): Where the strategy file should be copied to (where the data would be processed) + imaging_strategy (Optional[Path], optional): Location of the strategy file. Defaults to None. + + Returns: + Union[Strategy, None]: The loaded strategy file if provided, `None` otherwise + """ + return ( + load_strategy_yaml( + input_yaml=copy_and_timestamp_strategy_file( + output_dir=output_split_science_path, + input_yaml=imaging_strategy, + ), + verify=True, + ) + if imaging_strategy + else None + ) + + def get_selfcal_options_from_yaml(input_yaml: Optional[Path] = None) -> Dict: """Stub to represent interaction with a configurationf ile diff --git a/flint/convol.py b/flint/convol.py index 4c116f8a..b7cced8a 100644 --- a/flint/convol.py +++ b/flint/convol.py @@ -7,6 +7,7 @@ import warnings from argparse import ArgumentParser from pathlib import Path +from shutil import copyfile from typing import Collection, List, Literal, NamedTuple, Optional import astropy.units as u @@ -198,10 +199,14 @@ def get_common_beam( if cutoff: logger.info(f"Setting beam cutoff to {cutoff} arcseconds. ") - beam, beams = beamcon_2D.get_common_beam(files=list(image_paths), cutoff=cutoff) + try: + beam, beams = beamcon_2D.get_common_beam(files=list(image_paths), cutoff=cutoff) - beam_shape = BeamShape.from_radio_beam(beam) - logger.info(f"Constructed {beam_shape=}") + beam_shape = BeamShape.from_radio_beam(beam) + logger.info(f"Constructed {beam_shape=}") + except ValueError: + logger.info("The beam was not constrained. 
Setting to NaNs") + beam_shape = BeamShape(bmaj_arcsec=np.nan, bmin_arcsec=np.nan, bpa_deg=np.nan) return beam_shape @@ -231,29 +236,50 @@ def convolve_images( if cutoff: logger.info(f"Supplied cutoff of {cutoff} arcsecond") + if not np.isfinite(beam_shape.bmaj_arcsec): + logger.info("Beam shape is not defined. Copying files into place. ") + + conv_image_paths = [ + Path(str(image_path).replace(".fits", f".{convol_suffix}.fits")) + for image_path in image_paths + ] + # If the beam is not defined, simply copy the file into place. Although + # this takes up more space, it is not more than otherwise + for original_path, copy_path in zip(image_paths, conv_image_paths): + logger.info(f"Copying {original_path=} {copy_path=}") + copyfile(original_path, copy_path) + + return conv_image_paths + radio_beam = Beam( major=beam_shape.bmaj_arcsec * u.arcsecond, minor=beam_shape.bmin_arcsec * u.arcsecond, pa=beam_shape.bpa_deg * u.deg, ) - conv_image_paths: List[Path] = [] + return_conv_image_paths: List[Path] = [] for image_path in image_paths: - logger.info(f"Convolving {str(image_path.name)}") - beamcon_2D.beamcon_2d_on_fits( - file=image_path, - outdir=None, - new_beam=radio_beam, - conv_mode="robust", - suffix=convol_suffix, - cutoff=cutoff, - ) - conv_image_paths.append( - Path(str(image_path).replace(".fits", f".{convol_suffix}.fits")) + convol_output_path = Path( + str(image_path).replace(".fits", f".{convol_suffix}.fits") ) + header = fits.getheader(image_path) + if header["BMAJ"] == 0.0: + logger.info(f"Copying {image_path} to {convol_output_path=} for empty beam") + copyfile(image_path, convol_output_path) + else: + logger.info(f"Convolving {str(image_path.name)}") + beamcon_2D.beamcon_2d_on_fits( + file=image_path, + outdir=None, + new_beam=radio_beam, + conv_mode="robust", + suffix=convol_suffix, + cutoff=cutoff, + ) + return_conv_image_paths.append(convol_output_path) - return conv_image_paths + return return_conv_image_paths def get_parser() -> ArgumentParser: diff 
--git a/flint/exceptions.py b/flint/exceptions.py index 219c217b..c07252a2 100644 --- a/flint/exceptions.py +++ b/flint/exceptions.py @@ -4,6 +4,10 @@ class MSError(Exception): pass +class FrequencyMismatchError(Exception): + """Raised when there are differences in frequencies""" + + class PhaseOutlierFitError(Exception): """Raised when the phase outlier fit routine fails.""" diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index a27dc8e8..3b3e1c77 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -160,6 +160,12 @@ class WSCleanOptions(BaseOptions): """The polarisation to be imaged""" save_source_list: bool = False """Saves the found clean components as a BBS/DP3 text sky model""" + channel_range: Optional[Tuple[int, int]] = None + """Image a channel range between a lower (inclusive) and upper (exclusive) bound""" + no_reorder: bool = False + """If True turn off the reordering of the MS at the beginning of wsclean""" + flint_no_log_wsclean_output: bool = False + """If True do not log the wsclean output""" class WSCleanCommand(BaseOptions): @@ -216,10 +222,13 @@ def get_wsclean_output_source_list_path( def _rename_wsclean_title(name_str: str) -> str: - """Construct an apply a regular expression that aims to identify + """Construct and apply a regular expression that aims to identify the wsclean appended properties string within a file and replace the `-` separator with a `.`. + A simple replace of all `-` with `.` may not be ideal if the + character has been used on purpose. 
+ Args: name_str (str): The name that will be extracted and modified @@ -229,7 +238,7 @@ def _rename_wsclean_title(name_str: str) -> str: search_re = r"(-(i|q|u|v|xx|xy|yx|yy))?-((MFS|[0-9]{4}))(-t[0-9]{5})?-(image|dirty|model|residual|psf)" match_re = re.compile(search_re) - logger.info(f"{name_str=} {type(name_str)=}") + logger.info(f"Searching {name_str=} for wsclean added components") result = match_re.search(str(name_str)) if result is None: @@ -281,7 +290,7 @@ def _wsclean_output_callback(line: str) -> None: # TODO: Update this function to also add int the source list def get_wsclean_output_names( prefix: str, - subbands: int, + subbands: Optional[int] = None, pols: Optional[Union[str, Tuple[str]]] = None, verify_exists: bool = False, include_mfs: bool = True, @@ -318,10 +327,15 @@ def get_wsclean_output_names( Returns: ImageSet: The file paths that wsclean should create/has created. """ + logger.info(f"Finding wsclean outputs, {prefix=}") # TODO: Use a regular expression for this - subband_strs = [f"{subband:04}" for subband in range(subbands)] - if include_mfs: - subband_strs.append("MFS") + subband_strs: List[Optional[str]] = [ + None, + ] + if subbands and subbands > 1: + subband_strs = [f"{subband:04}" for subband in range(subbands)] + if include_mfs: + subband_strs.append("MFS") in_pols: Tuple[Union[None, str]] if pols is None: @@ -342,10 +356,13 @@ def get_wsclean_output_names( paths: List[Path] = [] for pol in in_pols: for subband_str in subband_strs: + components = [prefix] + if subband_str: + components.append(subband_str) if pol: - path_str = f"{prefix}-{subband_str}-{pol}-{image_type}.fits" - else: - path_str = f"{prefix}-{subband_str}-{image_type}.fits" + components.append(pol) + components.append(image_type) + path_str = "-".join(components) + ".fits" if check_exists_when_adding and not Path(path_str).exists(): logger.debug(f"{path_str} does not existing. Not adding. 
") @@ -358,7 +375,12 @@ def get_wsclean_output_names( # The PSF is the same for all stokes if "psf" in output_types: psf_images = [ - Path(f"{prefix}-{subband_str}-psf.fits") for subband_str in subband_strs + ( + Path(f"{prefix}-{subband_str}-psf.fits") + if subband_str + else Path(f"{prefix}-psf.fits") + ) + for subband_str in subband_strs ] # Filter out files if they do not exists if check_exists_when_adding: @@ -379,17 +401,23 @@ def get_wsclean_output_names( def delete_wsclean_outputs( - prefix: str, output_type: str = "image", ignore_mfs: bool = True -) -> Collection[Path]: + prefix: str, + output_type: str = "image", + ignore_mfs: bool = True, + no_subbands: bool = False, +) -> List[Path]: """Attempt to remove elected wsclean output files + If ``no_subbands`` is True (as in ``channels_out`` is 1) then nothing is deleted. + Args: prefix (str): The prefix of the files to remove. This would correspond to the -name of wsclean. output_type (str, optional): What type of wsclean output to try to remove. Defaults to 'image'. ignore_mfs (bool, optional): If True, do not remove MFS outputs (attempt to, at least). Defaults to True. + no_subbands (bool, Optional): If True, nothing is deleted. Defaults to False. Returns: - Collection[Path]: The paths that were removed (or at least attempted to be removed)/ + List[Path]: The paths that were removed (or at least attempted to be removed)/ """ # TODO: This glob needs to be replaced with something more explicit paths = [Path(p) for p in glob(f"{prefix}-*{output_type}.fits")] @@ -397,6 +425,8 @@ def delete_wsclean_outputs( rm_paths: List[Path] = [] for path in paths: + if no_subbands: + continue if ignore_mfs and "-MFS-" in str(path.name): logger.info(f"{path=} appears to be an MFS product, not removing. 
") continue @@ -411,6 +441,38 @@ def delete_wsclean_outputs( return rm_paths +# TODO: This should be expanded to denote levels of things to delete +# TODO: Need to create a regex based mode, and better handlijng of -MFS-, +# which is only created when -join-channels is used +def wsclean_cleanup_files( + prefix: Union[str, Path], + output_types: Optional[Tuple[str, ...]] = ("dirty", "psf", "model", "residual"), + single_channel: bool = False, +) -> Tuple[Path, ...]: + """Clean up (i.e. delete) files from wsclean. + + Args: + prefix (Union[str, Path]): The prefix used to search for files. This is generally the -name + output_types (Optional[Tuple[str]], optional): Which type of output wsclean products to delete. Defaults to ("dirty", "psf", "model", "residual"). + single_channel (bool, optional): Whether there is the subband part of the wsclean file names to consider. Defaults to False. + + Returns: + Tuple[Path, ...]: Set of files that were deleted + """ + rm_files = [] + logger.info(f"Removing wsclean files with {prefix=} {output_types=}") + + if output_types is not None: + for output_type in output_types: + rm_files += delete_wsclean_outputs( + prefix=str(prefix), + output_type=output_type, + no_subbands=single_channel and output_type == "image", + ) + + return tuple(rm_files) + + def create_wsclean_name_argument(wsclean_options: WSCleanOptions, ms: MS) -> Path: """Create the value that will be provided to wsclean -name argument. This has to be generated. 
Among things to consider is the desired output directory of imaging @@ -428,8 +490,11 @@ def create_wsclean_name_argument(wsclean_options: WSCleanOptions, ms: MS) -> Pat # Prepare the name for the output wsclean command # Construct the name property of the string - pol = wsclean_options_dict["pol"] - name_prefix_str = create_imaging_name_prefix(ms=ms, pol=pol) + pol = wsclean_options.pol + channel_range = wsclean_options.channel_range + name_prefix_str = create_imaging_name_prefix( + ms=ms, pol=pol, channel_range=channel_range + ) # Now resolve the directory part name_dir: Union[Path, str, None] = ms.path.parent @@ -454,11 +519,13 @@ class ResolvedCLIResult(NamedTuple): """Mapping results to provide to wsclean""" cmd: Optional[str] = None - """The argument value pair to place on the CLI""" + """The argument value pair to place on the CLI. """ unknown: Optional[Any] = None """Unknown options that could not be converted""" bindpath: Optional[Path] = None """A path to bind to when called within a container""" + ignore: bool = False + """Ignore this CLIResult if True""" def _resolve_wsclean_key_value_to_cli_str(key: str, value: Any) -> ResolvedCLIResult: @@ -494,7 +561,10 @@ def _resolve_wsclean_key_value_to_cli_str(key: str, value: Any) -> ResolvedCLIRe original_key = key key = key.replace("_", "-") - if key == "size": + if original_key.startswith("flint_"): + # No need to do anything more + return ResolvedCLIResult(ignore=True) + elif key == "size": cmd = f"-size {value} {value}" elif isinstance(value, bool): if value: @@ -567,11 +637,17 @@ def create_wsclean_cmd( unknowns: List[Tuple[Any, Any]] = [] logger.info("Creating wsclean command.") - cli_results = map( - _resolve_wsclean_key_value_to_cli_str, - wsclean_options_dict.keys(), - wsclean_options_dict.values(), + cli_results = list( + map( + _resolve_wsclean_key_value_to_cli_str, + wsclean_options_dict.keys(), + wsclean_options_dict.values(), + ) ) + + # Ignore any CLIResult if it has been explicitly instructed to 
+ cli_results = [cli_result for cli_result in cli_results if not cli_result.ignore] + cmds = [cli_result.cmd for cli_result in cli_results if cli_result.cmd] unknowns = [cli_result.unknown for cli_result in cli_results if cli_result.unknown] bind_dir_paths += [ @@ -753,11 +829,18 @@ def run_wsclean_imager( """ ms = wsclean_cmd.ms + single_channel = wsclean_cmd.options.channels_out == 1 + wsclean_cleanup = wsclean_cmd.cleanup sclient_bind_dirs = [Path(ms.path).parent.absolute()] if bind_dirs: sclient_bind_dirs = sclient_bind_dirs + list(bind_dirs) + prefix = image_prefix_str if image_prefix_str else None + if prefix is None: + prefix = str(ms.path.parent / ms.path.name) + logger.warning(f"Setting prefix to {prefix}. Likely this is not correct. ") + if move_hold_directories: with hold_then_move_into( move_directory=move_hold_directories[0], @@ -769,11 +852,19 @@ def run_wsclean_imager( command=wsclean_cmd.cmd, bind_dirs=sclient_bind_dirs, stream_callback_func=_wsclean_output_callback, + ignore_logging_output=wsclean_cmd.options.flint_no_log_wsclean_output, ) - + if wsclean_cleanup: + rm_files = wsclean_cleanup_files( + prefix=prefix, single_channel=single_channel + ) + logger.info(f"Removed {len(rm_files)} files") + # No need to attempt to clean up again once files have been moved + + wsclean_cleanup = False # Update the prefix based on where the files will be moved to - image_prefix_str = ( - f"{str(move_hold_directories[0] / Path(image_prefix_str).name)}" + prefix = ( + f"{str(move_hold_directories[0] / Path(prefix).name)}" if image_prefix_str else None ) @@ -783,22 +874,15 @@ def run_wsclean_imager( command=wsclean_cmd.cmd, bind_dirs=sclient_bind_dirs, stream_callback_func=_wsclean_output_callback, + ignore_logging_output=wsclean_cmd.options.flint_no_log_wsclean_output, ) - prefix = image_prefix_str if image_prefix_str else None - if prefix is None: - prefix = str(ms.path.parent / ms.path.name) - logger.warning(f"Setting prefix to {prefix}. 
Likely this is not correct. ") + # prefix should be set at this point + assert prefix is not None, f"{prefix=}, which should not happen" - if wsclean_cmd.cleanup: + if wsclean_cleanup: logger.info("Will clean up files created by wsclean. ") - - for output_type in ("dirty", "psf", "model", "residual"): - delete_wsclean_outputs(prefix=prefix, output_type=output_type) - # for output_type in ("model", "residual"): - # delete_wsclean_outputs( - # prefix=prefix, output_type=output_type, ignore_mfs=False - # ) + rm_files = wsclean_cleanup_files(prefix=prefix, single_channel=single_channel) imageset = get_wsclean_output_names( prefix=prefix, diff --git a/flint/ms.py b/flint/ms.py index 2fdf08ba..473608f6 100644 --- a/flint/ms.py +++ b/flint/ms.py @@ -504,6 +504,55 @@ def consistent_ms(ms1: MS, ms2: MS) -> bool: return result +def consistent_channelwise_frequencies( + freqs: Union[List[np.ndarray], np.ndarray], +) -> np.ndarray: + """Given a collection of frequencies in the form of + (N, frequencies), inspect the frequencies channelwise + to ensure they are all the same. + + This does not operate on MSs, just the collection of frequencies + + Args: + freqs (Union[List[np.ndarray], np.ndarray]): The collection of frequencies to be inspected + + Returns: + np.ndarray: Same length as the frequencies. True if for a single channel all frequencies are the same. False otherwise. + """ + freqs = np.array(freqs) + assert ( + len(freqs.shape) == 2 + ), f"{freqs.shape=}, but was expecting something of rank 2" + + freqs_are_same = np.all(freqs - freqs[0, None] == 0, axis=1) + assert ( + len(freqs_are_same.shape) == 1 + ), f"Channelwise check should be rank 1, but have {freqs_are_same.shape=}" + return freqs_are_same + + +def consistent_ms_frequencies(mss: Tuple[MS, ...]) -> bool: + """Given a set of measurement sets, inspect the frequencies + to ensure they are all the same + + See the ``get_freqs_from_ms`` function, which is used + internally. 
+ + Args: + mss (Tuple[MS, ...]): Collection of MSs to inspect the frequencies of + + Returns: + bool: Whether all the frequencies and their order are the same + """ + + logger.info(f"Collection frequencies from {len(mss)} measurement sets") + freqs = [get_freqs_from_ms(ms=ms) for ms in mss] + + all_the_same = consistent_channelwise_frequencies(freqs=freqs) + + return np.all(all_the_same) + + def rename_column_in_ms( ms: MS, original_column_name: str, @@ -574,7 +623,11 @@ def remove_columns_from_ms( def subtract_model_from_data_column( - ms: MS, model_column: str = "MODEL_DATA", data_column: Optional[str] = None + ms: MS, + model_column: str = "MODEL_DATA", + data_column: Optional[str] = None, + output_column: Optional[str] = None, + update_tracked_column: bool = False, ) -> MS: """Execute a ``taql`` query to subtract the MODEL_DATA from a nominated data column. This requires the ``model_column`` to already be inserted into the MS. Internally @@ -585,6 +638,8 @@ def subtract_model_from_data_column( ms (MS): The measurement set instance being considered model_column (str, optional): The column with representing the model. Defaults to "MODEL_DATA". data_column (Optional[str], optional): The column where the column will be subtracted. If ``None`` it is taken from the ``column`` nominated by the input ``MS`` instance. Defaults to None. + output_column (Optional[str], optional): The output column that will be created. If ``None`` it defaults to ``data_column``. Defaults to None. + update_tracked_column (bool, optional): If True, update ``ms.column`` to the column with subtracted data. Defaults to False. 
Returns: MS: The updated MS @@ -592,6 +647,9 @@ def subtract_model_from_data_column( ms = MS.cast(ms) data_column = data_column if data_column else ms.column assert data_column is not None, f"{data_column=}, which is not allowed" + + output_column = output_column if output_column else data_column + assert output_column is not None, f"{output_column=}, which is not allowed" with critical_ms_interaction(input_ms=ms.path) as critical_ms: with table(str(critical_ms), readonly=False) as tab: logger.info("Extracting columns") @@ -600,12 +658,27 @@ def subtract_model_from_data_column( [d in colnames for d in (model_column, data_column)] ), f"{model_column=} or {data_column=} missing from {colnames=}" + if output_column not in colnames: + from casacore.tables import makecoldesc + + logger.info(f"Adding {output_column=}") + desc = makecoldesc(data_column, tab.getcoldesc(data_column)) + desc["name"] = output_column + tab.addcols(desc) + tab.flush() + logger.info(f"Subtracting {model_column=} from {data_column=}") - taql(f"UPDATE $tab SET {data_column}={data_column}-{model_column}") + taql(f"UPDATE $tab SET {output_column}={data_column}-{model_column}") + if update_tracked_column: + logger.info(f"Updating ms.column to {output_column=}") + ms = ms.with_options(column=output_column) return ms +# TODO: Clean up the usage and description of the argument `instrument_column` +# as it is currently being used in unclear ways. Specifically there is a renaming +# of the data_column to instrument_column before the rotation of things def preprocess_askap_ms( ms: Union[MS, Path], data_column: str = "DATA", @@ -848,7 +921,10 @@ def rename_ms_and_columns_for_selfcal( def find_mss( - mss_parent_path: Path, expected_ms_count: Optional[int] = 36 + mss_parent_path: Path, + expected_ms_count: Optional[int] = 36, + data_column: Optional[str] = None, + model_column: Optional[str] = None, ) -> Tuple[MS, ...]: """Search a directory to find measurement sets via a simple `*.ms` glob expression. 
An expected number of MSs can be enforced @@ -857,6 +933,8 @@ def find_mss( Args: mss_parent_path (Path): The parent directory that will be globbed to search for MSs. expected_ms_count (Optional[int], optional): The number of MSs that should be there. If None no check is performed. Defaults to 36. + data_column (Optional[str], optional): Set the column attribute of each MS to this (no checks to ensure it exists). If None use default of MS. Defaults to None. + model_column (Optional[str], optional): Set the model column attribute of each MS to this (no checks to ensure it exists). If None use default of MS. Defaults to None. Returns: Tuple[MS, ...]: Collection of found MSs @@ -874,6 +952,15 @@ def find_mss( len(found_mss) == expected_ms_count ), f"Expected to find {expected_ms_count} in {str(mss_parent_path)}, found {len(found_mss)}." + if data_column or model_column: + logger.info(f"Updating column attribute to {data_column=}") + found_mss = tuple( + [ + found_ms.with_options(column=data_column, model_column=model_column) + for found_ms in found_mss + ] + ) + return found_mss diff --git a/flint/naming.py b/flint/naming.py index 76a7bb5b..4de5bf17 100644 --- a/flint/naming.py +++ b/flint/naming.py @@ -5,7 +5,7 @@ import re from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, NamedTuple, Optional, Union +from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union from flint.logging import logger from flint.options import MS @@ -53,13 +53,18 @@ def create_image_cube_name( return Path(output_cube_name) -def create_imaging_name_prefix(ms: Union[MS, Path], pol: Optional[str] = None) -> str: +def create_imaging_name_prefix( + ms: Union[MS, Path], + pol: Optional[str] = None, + channel_range: Optional[Tuple[int, int]] = None, +) -> str: """Given a measurement set and a polarisation, create the naming prefix to be used by some imager Args: ms (Union[MS,Path]): The measurement set being considered pol (Optional[str], 
optional): Whether a polarsation is being considered. Defaults to None. + channel_range (Optional[Tuple[int,int]], optional): The channel range that is going to be imaged. Defaults to None. Returns: str: The constructed string name @@ -70,6 +75,8 @@ def create_imaging_name_prefix(ms: Union[MS, Path], pol: Optional[str] = None) - name = ms_path.stem if pol: name = f"{name}.{pol.lower()}" + if channel_range: + name = f"{name}.ch{channel_range[0]}-{channel_range[1]}" return name diff --git a/flint/options.py b/flint/options.py index 4cab7c8a..03da6357 100644 --- a/flint/options.py +++ b/flint/options.py @@ -217,6 +217,56 @@ class BandpassOptions(BaseOptions): """Flag Jones matrix if any amplitudes with a Jones are above this value""" +class AddModelSubtractFieldOptions(BaseOptions): + """Options related to predicting a continuum model during the SubtractFieldOptions workflow. + Specifically these options deal with identifying the wsclean produced source list model, which + may be used by ``addmodel`` to predict model visibilities. See utilities around the ``aocalibrate`` + functions and routines.""" + + attempt_addmodel: bool = False + """Invoke the ``addmodel`` visibility prediction, including the search for the ``wsclean`` source list""" + wsclean_pol_mode: List[str] = ["i"] + """The polarisation of the wsclean model that was generated""" + calibrate_container: Optional[Path] = None + """Path to the container with the calibrate software (including addmodel)""" + addmodel_cluster_config: Optional[Path] = None + """Specify a new cluster configuration file different to the preferred one. 
If None, drawn from preferred cluster config""" + + +class SubtractFieldOptions(BaseOptions): + """Container for options related to the + continuum-subtracted pipeline""" + + wsclean_container: Path + """Path to the container with wsclean""" + yandasoft_container: Path + """Path to the container with yandasoft""" + subtract_model_data: bool = False + """Subtract the MODEL_DATA column from the nominated data column""" + data_column: str = "CORRECTED_DATA" + """Describe the column that should be imaged and, if requested, have model subtracted from""" + expected_ms: int = 36 + """The number of measurement sets that should exist""" + imaging_strategy: Optional[Path] = None + """Path to a FLINT imaging yaml file that contains settings to use throughout imaging""" + holofile: Optional[Path] = None + """Path to the holography FITS cube that will be used when co-adding beams""" + linmos_residuals: bool = False + """Linmos the cleaning residuals together into a field image""" + beam_cutoff: float = 150 + """Cutoff in arcseconds to use when calculating the common beam to convolve to""" + pb_cutoff: float = 0.1 + """Primary beam attenuation cutoff to use during linmos""" + stagger_delay_seconds: Optional[float] = None + """The delay, in seconds, that should be used when submitting items in batches (e.g. 
looping over channels)""" + attempt_subtract: bool = False + """Attempt to subtract the model column from the nominated data column""" + subtract_data_column: str = "DATA" + """Should the continuum model be subtracted, where to store the output""" + predict_wsclean_model: bool = False + """Search for the continuum model produced by wsclean and subtract""" + + class FieldOptions(BaseOptions): """Container that represents the flint related options that might be used throughout components related to the actual diff --git a/flint/prefect/common/imaging.py b/flint/prefect/common/imaging.py index cf00533a..b7e1a3f6 100644 --- a/flint/prefect/common/imaging.py +++ b/flint/prefect/common/imaging.py @@ -8,6 +8,7 @@ from typing import Any, Collection, Dict, List, Literal, Optional, TypeVar, Union, Tuple import pandas as pd +import numpy as np from prefect import task, unmapped from prefect.artifacts import create_table_artifact @@ -52,7 +53,7 @@ get_fits_cube_from_paths, processed_ms_format, ) -from flint.options import FieldOptions +from flint.options import FieldOptions, SubtractFieldOptions from flint.peel.potato import potato_peel from flint.prefect.common.utils import upload_image_as_artifact from flint.selfcal.casa import gaincal_applycal_ms @@ -283,6 +284,7 @@ def task_wsclean_imager( wsclean_container: Path, update_wsclean_options: Optional[Dict[str, Any]] = None, fits_mask: Optional[FITSMaskNames] = None, + channel_range: Optional[Tuple[int, int]] = None, ) -> WSCleanCommand: """Run the wsclean imager against an input measurement set @@ -291,6 +293,7 @@ def task_wsclean_imager( wsclean_container (Path): Path to a singularity container with wsclean packages update_wsclean_options (Optional[Dict[str, Any]], optional): Options to update from the default wsclean options. Defaults to None. fits_mask (Optional[FITSMaskNames], optional): A path to a clean guard mask. Defaults to None. 
+ channel_range (Optional[Tuple[int,int]], optional): Add to the wsclean options the specific channel range to be imaged. Defaults to None. Returns: WSCleanCommand: A resulting wsclean command and resulting meta-data @@ -306,6 +309,9 @@ def task_wsclean_imager( if fits_mask: update_wsclean_options["fits_mask"] = fits_mask.mask_fits + if channel_range: + update_wsclean_options["channel_range"] = channel_range + logger.info(f"wsclean inager {ms=}") try: return wsclean_imager( @@ -394,6 +400,11 @@ def task_get_common_beam( ) beam_shape = get_common_beam(image_paths=images_to_consider, cutoff=cutoff) + if np.isnan(beam_shape.bmaj_arcsec): + logger.critical("Failed to get beam resolution for:") + logger.critical(f"{images_to_consider=}") + logger.critical(f"{cutoff=}") + logger.critical(f"{filter=}") return beam_shape @@ -495,6 +506,7 @@ def task_convolve_image( mode: str = "image", filter: Optional[str] = None, convol_suffix_str: str = "conv", + remove_original_images: bool = False, ) -> Collection[Path]: """Convolve images to a specified resolution @@ -504,6 +516,7 @@ def task_convolve_image( cutoff (float, optional): Maximum major beam axis an image is allowed to have before it will not be convolved. Defaults to 60. filter (Optional[str], optional): This string must be contained in the image path for it to be convolved. Defaults to None. convol_suffix_str (str, optional): The suffix added to the convolved images. Defaults to 'conv'. + remove_original_images (bool, optional): If True remove the original image after they have been convolved. Defaults to False. Returns: Collection[Path]: Path to the output images that have been convolved. 
@@ -557,13 +570,19 @@ def task_convolve_image( f"{str(image_path.name)}: {image_beam.major.to(u.arcsecond)} {image_beam.minor.to(u.arcsecond)} {image_beam.pa}" ) - return convolve_images( + convolved_images = convolve_images( image_paths=image_paths, beam_shape=beam_shape, cutoff=cutoff, convol_suffix=convol_suffix_str, ) + if remove_original_images: + logger.info(f"Removing {len(image_paths)} input images") + _ = [image_path.unlink() for image_path in image_paths] # type: ignore + + return convolved_images + @task def task_linmos_images( @@ -577,6 +596,7 @@ def task_linmos_images( parset_output_path: Optional[str] = None, cutoff: float = 0.05, field_summary: Optional[FieldSummary] = None, + trim_linmos_fits: bool = True, ) -> LinmosCommand: """Run the yandasoft linmos task against a set of input images @@ -591,6 +611,7 @@ def task_linmos_images( parset_output_path (Optional[str], optional): Location to write the linmos parset file to. Defaults to None. cutoff (float, optional): The primary beam attenuation cutoff supplied to linmos when coadding. Defaults to 0.05. field_summary (Optional[FieldSummary], optional): The summary of the field, including (importantly) to orientation of the third-axis. Defaults to None. + trim_linmos_fits (bool, optional): Attempt to trim the output linmos files of as much empty space as possible. Defaults to True. 
Returns: LinmosCommand: The linmos command and associated meta-data @@ -649,6 +670,7 @@ def task_linmos_images( holofile=holofile, cutoff=cutoff, pol_axis=pol_axis, + trim_linmos_fits=trim_linmos_fits, ) return linmos_cmd @@ -657,12 +679,14 @@ def task_linmos_images( def _convolve_linmos( wsclean_cmds: Collection[WSCleanCommand], beam_shape: BeamShape, - field_options: FieldOptions, + field_options: Union[FieldOptions, SubtractFieldOptions], linmos_suffix_str: str, field_summary: Optional[FieldSummary] = None, convol_mode: str = "image", convol_filter: str = ".MFS.", convol_suffix_str: str = "conv", + trim_linmos_fits: bool = True, + remove_original_images: bool = False, ) -> LinmosCommand: """An internal function that launches the convolution to a common resolution and subsequent linmos of the wsclean residual images. @@ -676,6 +700,8 @@ def _convolve_linmos( convol_mode (str, optional): The mode passed to the convol task to describe the images to extract. Support image or residual. Defaults to image. convol_filter (str, optional): A text file applied when assessing images to co-add. Defaults to '.MFS.'. convol_suffix_str (str, optional): The suffix added to the convolved images. Defaults to 'conv'. + trim_linmos_fits (bool, optional): Attempt to trim the output linmos files of as much empty space as possible. Defaults to True. + remove_original_images (bool, optional): If True remove the original image after they have been convolved. Defaults to False. 
Returns: LinmosCommand: Resulting linmos command parset @@ -688,6 +714,7 @@ def _convolve_linmos( mode=convol_mode, filter=convol_filter, convol_suffix_str=convol_suffix_str, + remove_original_images=remove_original_images, ) assert field_options.yandasoft_container is not None parset = task_linmos_images.submit( @@ -697,6 +724,8 @@ def _convolve_linmos( holofile=field_options.holofile, cutoff=field_options.pb_cutoff, field_summary=field_summary, + trim_linmos_fits=trim_linmos_fits, + filter=convol_filter, ) # type: ignore return parset diff --git a/flint/prefect/flows/continuum_pipeline.py b/flint/prefect/flows/continuum_pipeline.py index 4e6dbe6d..597926f2 100644 --- a/flint/prefect/flows/continuum_pipeline.py +++ b/flint/prefect/flows/continuum_pipeline.py @@ -15,10 +15,9 @@ from flint.catalogue import verify_reference_catalogues from flint.coadd.linmos import LinmosCommand from flint.configuration import ( + _load_and_copy_strategy, Strategy, - copy_and_timestamp_strategy_file, get_options_from_strategy, - load_strategy_yaml, ) from flint.logging import logger from flint.masking import consider_beam_mask_round @@ -140,32 +139,6 @@ def _check_create_output_split_science_path( return output_split_science_path -def _load_and_copy_strategy( - output_split_science_path: Path, imaging_strategy: Optional[Path] = None -) -> Union[Strategy, None]: - """Load a strategy file and copy a timestamped version into the output directory - that would contain the science processing. - - Args: - output_split_science_path (Path): Where the strategy file should be copied to (where the data would be processed) - imaging_strategy (Optional[Path], optional): Location of the strategy file. Defaults to None. 
- - Returns: - Union[Strategy, None]: The loadded strategy file if provided, `None` otherwise - """ - return ( - load_strategy_yaml( - input_yaml=copy_and_timestamp_strategy_file( - output_dir=output_split_science_path, - input_yaml=imaging_strategy, - ), - verify=True, - ) - if imaging_strategy - else None - ) - - @flow(name="Flint Continuum Pipeline") def process_science_fields( science_path: Path, @@ -199,7 +172,7 @@ def process_science_fields( archive_wait_for: List[Any] = [] - strategy = _load_and_copy_strategy( + strategy: Optional[Strategy] = _load_and_copy_strategy( output_split_science_path=output_split_science_path, imaging_strategy=field_options.imaging_strategy, ) diff --git a/flint/prefect/flows/subtract_cube_pipeline.py b/flint/prefect/flows/subtract_cube_pipeline.py new file mode 100644 index 00000000..62827b34 --- /dev/null +++ b/flint/prefect/flows/subtract_cube_pipeline.py @@ -0,0 +1,366 @@ +"""This is a workflow to subtract a continuum model and image the channel-wise data + +Unlike the continuum imaging and self-calibnration pipeline this flow currently +expects that all measurement sets are in the flint format, which means other than +the naming scheme that they have been been preprocessed to place them in the IAU +frame and have had their fields table updated. That is to say that they have +already been preprocessed and fixed. 
+""" + +from pathlib import Path +from time import sleep +from typing import Tuple, Optional, Union + +import numpy as np +from configargparse import ArgumentParser +from prefect import flow, unmapped, task + +from flint.configuration import _load_and_copy_strategy +from flint.exceptions import FrequencyMismatchError +from flint.prefect.clusters import get_dask_runner +from flint.logging import logger +from flint.ms import ( + MS, + find_mss, + consistent_ms_frequencies, + get_freqs_from_ms, + subtract_model_from_data_column, +) +from flint.options import ( + AddModelSubtractFieldOptions, + SubtractFieldOptions, + add_options_to_parser, + create_options_from_parser, +) +from flint.prefect.common.imaging import ( + task_wsclean_imager, + task_get_common_beam, + _convolve_linmos, +) +from flint.naming import get_sbid_from_path + + +def _check_and_verify_options( + options: Union[AddModelSubtractFieldOptions, SubtractFieldOptions], +) -> None: + """Verrify that the options supplied to run the subtract field options make sense""" + if isinstance(options, SubtractFieldOptions): + assert ( + options.wsclean_container.exists() and options.wsclean_container.is_file() + ), f"{options.wsclean_container=} does not exist or is not a file" + assert ( + options.yandasoft_container.exists() + and options.yandasoft_container.is_file() + ), f"{options.yandasoft_container=} does not exist or is not a file" + if isinstance(options, AddModelSubtractFieldOptions): + if options.attempt_addmodel: + assert ( + options.calibrate_container is not None + ), "Calibrate container path is needede for addmodel" + assert ( + options.calibrate_container.exists() + and options.calibrate_container.is_file() + ), f"Calibrate container {options.calibrate_container} is not a file" + + +def find_mss_to_image( + mss_parent_path: Path, + expected_ms_count: Optional[int] = None, + data_column: str = "CORRECTED_DATA", + model_column: str = "MODEL_DATA", +) -> Tuple[MS, ...]: + """Search for MSs to image. 
See ``flint.ms.find_mss`` for further details. + + Args: + mss_parent_path (Path): Path to search for MSs in + expected_ms_count (Optional[int], optional): Expected number of measurement sets to find. Defaults to None. + data_column (str, optional): The nominated data column that should eb set. Defaults to "CORRECTED_DATA". + model_column (str, optional): The nominated model data column that should be set. Defaults to "MODEL_DATA". + + Returns: + Tuple[MS, ...]: Collect of MSs + """ + science_mss = find_mss( + mss_parent_path=mss_parent_path, + expected_ms_count=expected_ms_count, + data_column=data_column, + model_column=model_column, + ) + logger.info(f"Found {science_mss=}") + return science_mss + + +def find_and_setup_mss( + science_path_or_mss: Union[Path, Tuple[MS, ...]], + expected_ms_count: int, + data_column: str, +) -> Tuple[MS, ...]: + """Search for MSs in a directory and, if necessary, perform checks around + their consistency. If the input data appear to be collection of MSs already + assume they have already been set and checked for consistency. + + Args: + science_path_or_mss (Union[Path, List[MS, ...]]): Path to search or existing MSs + expected_ms_count (int): Expected number of MSs to find + data_column (str): The data column to nominate if creating MSs after searching + + Raises: + FrequencyMismatchError: Raised when frequency information is not consistent + + Returns: + Tuple[MS, ...]: Collection of MSs + """ + + if isinstance(science_path_or_mss, (list, tuple)): + logger.info("Already loaded MSs") + return tuple(sms for sms in science_path_or_mss) + + # Find the MSs + # - optionally untar? 
+ science_mss = find_mss_to_image( + mss_parent_path=science_path_or_mss, + expected_ms_count=expected_ms_count, + data_column=data_column, + ) + + # 2 - ensure matchfing frequencies over channels + consistent_frequencies_across_mss = consistent_ms_frequencies(mss=science_mss) + if not consistent_frequencies_across_mss: + logger.critical("Mismatch in frequencies among provided MSs") + raise FrequencyMismatchError("There is a mismatch in frequencies") + + return science_mss + + +task_subtract_model_from_ms = task(subtract_model_from_data_column) + + +@task +def task_addmodel_to_ms( + ms: MS, addmodel_subtract_options: AddModelSubtractFieldOptions +) -> MS: + from flint.imager.wsclean import get_wsclean_output_source_list_path + from flint.calibrate.aocalibrate import AddModelOptions, add_model + + logger.info(f"Searching for wsclean source list for {ms.path}") + for idx, pol in enumerate(addmodel_subtract_options.wsclean_pol_mode): + wsclean_source_list_path = get_wsclean_output_source_list_path( + name_path=ms.path, pol=pol + ) + assert ( + wsclean_source_list_path.exists() + ), f"{wsclean_source_list_path=} was requested, but does not exist" + + # This should attempt to add model of different polarisations together. + # But to this point it is a future proof and is not tested. 
+ addmodel_options = AddModelOptions( + model_path=wsclean_source_list_path, + ms_path=ms.path, + mode="c" if idx == 0 else "a", + datacolumn="MODEL_DATA", + ) + assert ( + addmodel_subtract_options.calibrate_container is not None + ), f"{addmodel_subtract_options.calibrate_container=}, which should not happen" + add_model( + add_model_options=addmodel_options, + container=addmodel_subtract_options.calibrate_container, + remove_datacolumn=idx == 0, + ) + + return ms.with_options(model_column="MODEL_DATA") + + +@flow +def flow_addmodel_to_mss( + science_path_or_mss: Union[Path, Tuple[MS, ...]], + addmodel_subtract_field_options: AddModelSubtractFieldOptions, + expected_ms: int, + data_column: str, +) -> Tuple[MS, ...]: + """Separate flow to perform the potentially expensive model prediction + into MSs""" + _check_and_verify_options(options=addmodel_subtract_field_options) + + # Get the MSs that will have their model added to + science_mss = find_and_setup_mss( + science_path_or_mss=science_path_or_mss, + expected_ms_count=expected_ms, + data_column=data_column, + ) + science_mss = task_addmodel_to_ms.map( + ms=science_mss, + addmodel_subtract_options=unmapped(addmodel_subtract_field_options), + ) + + return science_mss + + +@flow +def flow_subtract_cube( + science_path: Path, + subtract_field_options: SubtractFieldOptions, + addmodel_subtract_field_options: AddModelSubtractFieldOptions, +) -> None: + strategy = _load_and_copy_strategy( + output_split_science_path=science_path, + imaging_strategy=subtract_field_options.imaging_strategy, + ) + _check_and_verify_options(options=subtract_field_options) + _check_and_verify_options(options=addmodel_subtract_field_options) + + # Find the MSs + # - optionally untar? 
+ science_mss = find_and_setup_mss( + science_path_or_mss=science_path, + expected_ms_count=subtract_field_options.expected_ms, + data_column=subtract_field_options.data_column, + ) + + # 2.5 - Continuum subtract if requested + + freqs_mhz = get_freqs_from_ms(ms=science_mss[0]) / 1e6 + logger.info( + f"Considering {len(freqs_mhz)} frequencies from {len(science_mss)} channels, minimum {np.min(freqs_mhz)}-{np.max(freqs_mhz)}" + ) + if len(freqs_mhz) > 20 and subtract_field_options.stagger_delay_seconds is None: + logger.critical( + f"{len(freqs_mhz)} channels and no stagger delay set! Consider setting a stagger delay" + ) + + if addmodel_subtract_field_options.attempt_addmodel: + # science_mss = task_addmodel_to_ms.map( + # ms=science_mss, + # addmodel_subtract_options=unmapped(addmodel_subtract_field_options), + # ) + assert ( + addmodel_subtract_field_options.addmodel_cluster_config is not None + ), f"{addmodel_subtract_field_options.addmodel_cluster_config=}, which should not happen" + addmodel_dask_runner = get_dask_runner( + cluster=addmodel_subtract_field_options.addmodel_cluster_config + ) + science_mss = flow_addmodel_to_mss.with_options( + task_runner=addmodel_dask_runner, name="Predict -- Addmodel" + )( + science_path_or_mss=science_mss, + addmodel_subtract_field_options=addmodel_subtract_field_options, + expected_ms=subtract_field_options.expected_ms, + data_column=subtract_field_options.data_column, + ) + + if subtract_field_options.attempt_subtract: + science_mss = task_subtract_model_from_ms.map( + ms=science_mss, + data_column=subtract_field_options.subtract_data_column, + update_tracked_column=True, + ) + + channel_parset_list = [] + for channel, freq_mhz in enumerate(freqs_mhz): + logger.info(f"Imaging {channel=} {freq_mhz=}") + channel_range = (channel, channel + 1) + channel_wsclean_cmds = task_wsclean_imager.map( + in_ms=science_mss, + wsclean_container=subtract_field_options.wsclean_container, + channel_range=unmapped(channel_range), + 
strategy=unmapped(strategy), + mode="wsclean", + operation="subtractcube", + ) + channel_beam_shape = task_get_common_beam.submit( + wsclean_cmds=channel_wsclean_cmds, + cutoff=subtract_field_options.beam_cutoff, + filter="image.", + ) + channel_parset = _convolve_linmos( + wsclean_cmds=channel_wsclean_cmds, + beam_shape=channel_beam_shape, + linmos_suffix_str=f"ch{channel_range[0]}-{channel_range[1]}", + field_options=subtract_field_options, + convol_mode="image", + convol_filter="image.", + convol_suffix_str="optimal.image", + trim_linmos_fits=False, + remove_original_images=True, + ) + channel_parset_list.append(channel_parset) + + if subtract_field_options.stagger_delay_seconds: + sleep(subtract_field_options.stagger_delay_seconds) + + # 4 - cube concatenated each linmos field together to single file + + return + + +def setup_run_subtract_flow( + science_path: Path, + subtract_field_options: SubtractFieldOptions, + addmodel_subtract_field_options: AddModelSubtractFieldOptions, + cluster_config: Path, +) -> None: + logger.info(f"Processing {science_path=}") + science_sbid = get_sbid_from_path(path=science_path) + + dask_runner = get_dask_runner(cluster=cluster_config) + + flow_subtract_cube.with_options( + task_runner=dask_runner, name=f"Subtract Cube Pipeline -- {science_sbid}" + )( + science_path=science_path, + subtract_field_options=subtract_field_options, + addmodel_subtract_field_options=addmodel_subtract_field_options, + ) + + +def get_parser() -> ArgumentParser: + parser = ArgumentParser(description=__doc__) + parser.add_argument( + "--cli-config", is_config_file=True, help="Path to configuration file" + ) + parser.add_argument( + "science_path", + type=Path, + help="Path to the directory containing the beam-wise measurement sets", + ) + parser.add_argument( + "--cluster-config", + type=str, + default="petrichor", + help="Path to a cluster configuration file, or a known cluster name. 
", + ) + + parser = add_options_to_parser(parser=parser, options_class=SubtractFieldOptions) + parser = add_options_to_parser( + parser=parser, options_class=AddModelSubtractFieldOptions + ) + + return parser + + +def cli() -> None: + parser = get_parser() + + args = parser.parse_args() + + subtract_field_options = create_options_from_parser( + parser_namespace=args, options_class=SubtractFieldOptions + ) + addmodel_subtract_field_options = create_options_from_parser( + parser_namespace=args, options_class=AddModelSubtractFieldOptions + ) + if addmodel_subtract_field_options.addmodel_cluster_config is None: + addmodel_subtract_field_options.with_options( + addmodel_cluster_config=args.cluster_config + ) + + setup_run_subtract_flow( + science_path=args.science_path, + subtract_field_options=subtract_field_options, + addmodel_subtract_field_options=addmodel_subtract_field_options, + cluster_config=args.cluster_config, + ) + + +if __name__ == "__main__": + cli() diff --git a/flint/sclient.py b/flint/sclient.py index fa0426c4..b337b965 100644 --- a/flint/sclient.py +++ b/flint/sclient.py @@ -16,6 +16,7 @@ def run_singularity_command( command: str, bind_dirs: Optional[Union[Path, Collection[Path]]] = None, stream_callback_func: Optional[Callable] = None, + ignore_logging_output: bool = False, ) -> None: """Executes a command within the context of a nominated singularity container @@ -25,6 +26,7 @@ def run_singularity_command( command (str): The command to execute bind_dirs (Optional[Union[Path,Collection[Path]]], optional): Specifies a Path, or list of Paths, to bind to in the container. Defaults to None. stream_callback_func (Optional[Callable], optional): Provide a function that is applied to each line of output text when singularity is running and `stream=True`. IF provide it should accept a single (string) parameter. If None, nothing happens. Defaultds to None. 
+ ignore_logging_output (bool, optional): If `True` output from the executed singularity command is not logged. Defaults to False. Raises: FileNotFoundError: Thrown when container image not found @@ -65,7 +67,8 @@ def run_singularity_command( ) for line in output: - logger.info(line.rstrip()) + if not ignore_logging_output: + logger.info(line.rstrip()) if stream_callback_func: stream_callback_func(line) @@ -103,6 +106,7 @@ def wrapper( container: Path, bind_dirs: Optional[Union[Path, Collection[Path]]] = None, stream_callback_func: Optional[Callable] = None, + ignore_logging_output: bool = False, **kwargs, ) -> str: """Function that can be used as a decorator on an input function. This function @@ -113,6 +117,7 @@ def wrapper( container (Path): Path to the container that will be usede to execute the generated command bind_dirs (Optional[Union[Path,Collection[Path]]], optional): Specifies a Path, or list of Paths, to bind to in the container. Defaults to None. stream_callback_func (Optional[Callable], optional): Provide a function that is applied to each line of output text when singularity is running and `stream=True`. IF provide it should accept a single (string) parameter. If None, nothing happens. Defaultds to None. + ignore_logging_output (bool, optional): If `True` output from the executed singularity command is not logged. Defaults to False. 
Returns: str: The command that was executed @@ -129,6 +134,7 @@ def wrapper( image=container, command=f"{task_str}", bind_dirs=bind_dirs, + ignore_logging_output=ignore_logging_output, stream_callback_func=stream_callback_func, ) diff --git a/pyproject.toml b/pyproject.toml index c2a6eb05..ea684120 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,7 +85,7 @@ flint_potato = "flint.peel.potato:cli" flint_leakage = "flint.leakage:cli" flint_flow_bandpass_calibrate = "flint.prefect.flows.bandpass_pipeline:cli" flint_flow_continuum_pipeline = "flint.prefect.flows.continuum_pipeline:cli" -flint_flow_continuum_mask_pipeline = "flint.prefect.flows.continuum_mask_pipeline:cli" +flint_flow_subtract_cube_pipeline = "flint.prefect.flows.subtract_cube_pipeline:cli" [[tool.mypy.overrides]] module = "astropy.*" diff --git a/tests/test_ms.py b/tests/test_ms.py index 7a980a93..0231a840 100644 --- a/tests/test_ms.py +++ b/tests/test_ms.py @@ -15,6 +15,7 @@ MS, check_column_in_ms, copy_and_preprocess_casda_askap_ms, + consistent_channelwise_frequencies, find_mss, get_phase_dir_from_ms, remove_columns_from_ms, @@ -24,7 +25,28 @@ from flint.utils import get_packaged_resource_path +def test_consistent_channelwise_frequencies(): + """Some steps the channels in a set of MSs all need + to be the same, in the same relative order""" + + # Make up some floating point numbners + freqs = [np.arange(100) * np.pi * 5 for _ in list(range(46))] + result = consistent_channelwise_frequencies(freqs=freqs) + assert np.all(result) + + result = consistent_channelwise_frequencies(freqs=np.array(freqs)) + assert np.all(result) + + freqs[10][4] = 4444444.0 + result = consistent_channelwise_frequencies(freqs=np.array(freqs)) + assert not result[10] + assert np.all(result[11:]) + assert np.all(result[:10]) + + def test_find_mss(tmpdir): + """Make sure that the glob finding of the MSs can actually find + the MSs. 
The expected count check is also assessed""" tmpdir = Path(tmpdir) for name in range(45): new_ms = tmpdir / f"SB1234.Pirate_1234+456.beam{name}.ms" @@ -40,6 +62,26 @@ def test_find_mss(tmpdir): _ = find_mss(mss_parent_path=tmpdir, expected_ms_count=49005) +def test_find_mss_withdatacolumn(tmpdir): + """Same as above but with setting a data column""" + tmpdir = Path(tmpdir) / "Another_Pirate" + for name in range(45): + new_ms: Path = tmpdir / f"SB1234.Pirate_1234+456.beam{name}.ms" + new_ms.mkdir(parents=True, exist_ok=True) + + new_folder = tmpdir / f"not_and_ms_{name}.folder" + new_folder.mkdir() + + res = find_mss( + mss_parent_path=tmpdir, expected_ms_count=45, data_column="BlackBeard" + ) + assert len(res) == 45 + assert all([ms.column == "BlackBeard" for ms in res]) + + with pytest.raises(AssertionError): + _ = find_mss(mss_parent_path=tmpdir, expected_ms_count=49005) + + @pytest.fixture def casda_example(tmpdir): ms_zip = Path( @@ -367,3 +409,61 @@ def test_subtract_model_from_data_column_ms_column(tmpdir): with table(str(ms.path)) as tab: data = tab.getcol("DATA") assert np.all(data == 0 + 0j) + + +def test_subtract_model_from_data_column_ms_column_new_column(tmpdir): + """Ensure we can subtact the model from the data via taql. This test will + add a new column via taql and will ensure the result is as expected. 
+ + >>> NEW_COLUMN=DATA-MODEL_DATA + """ + ms_zip = Path( + get_packaged_resource_path( + package="flint.data.tests", + filename="scienceData.EMU_0529-60.SB50538.EMU_0529-60.beam08_averaged_cal.leakage.ms.zip", + ) + ) + outpath = Path(tmpdir) / "taqlsubtract3" + + shutil.unpack_archive(ms_zip, outpath) + + ms_path = ( + Path(outpath) + / "scienceData.EMU_0529-60.SB50538.EMU_0529-60.beam08_averaged_cal.leakage.ms" + ) + + ms = Path(ms_path) + assert ms.exists() + ms = MS(path=ms, column="DATA") + + from casacore.tables import maketabdesc, makearrcoldesc + + with table(str(ms.path), readonly=False) as tab: + data = tab.getcol("DATA") + ones = np.ones_like(data, dtype=data.dtype) + + tab.putcol(columnname="DATA", value=ones) + + if "MODEL_DATA" not in tab.colnames(): + coldesc = tab.getdminfo("DATA") + coldesc["NAME"] = "MODEL_DATA" + tab.addcols( + maketabdesc(makearrcoldesc("MODEL_DATA", 0.0 + 0j, ndim=2)), coldesc + ) + tab.flush() + tab.putcol(columnname="MODEL_DATA", value=ones) + tab.flush() + + colnames = tab.colnames() + + assert "NEW_COLUMN" not in colnames, "Column already exists" + + ms = subtract_model_from_data_column( + ms=ms, model_column="MODEL_DATA", data_column="DATA", output_column="NEW_COLUMN" + ) + with table(str(ms.path)) as tab: + data = tab.getcol("NEW_COLUMN") + assert np.all(data == 0 + 0j) + + data = tab.getcol("DATA") + assert np.all(data == ones) diff --git a/tests/test_naming.py b/tests/test_naming.py index 948792ce..2c9f9328 100644 --- a/tests/test_naming.py +++ b/tests/test_naming.py @@ -5,6 +5,7 @@ import pytest +from flint.ms import MS from flint.naming import ( CASDANameComponents, FITSMaskNames, @@ -14,6 +15,7 @@ casda_ms_format, create_fits_mask_names, create_image_cube_name, + create_imaging_name_prefix, create_ms_name, extract_beam_from_name, extract_components_from_name, @@ -28,6 +30,25 @@ ) +def test_create_imaging_name_prefix(): + """Creates the name that will be used for output image + products""" + ms = 
MS.cast(ms=Path("/Jack/Sparrow/SB63789.EMU_1743-51.beam03.round4.ms")) + + name = create_imaging_name_prefix(ms=ms) + assert name == "SB63789.EMU_1743-51.beam03.round4" + + for pol in ("I", "i"): + name = create_imaging_name_prefix(ms=ms, pol=pol) + assert name == "SB63789.EMU_1743-51.beam03.round4.i" + + name = create_imaging_name_prefix(ms=ms, pol=pol, channel_range=(100, 108)) + assert name == "SB63789.EMU_1743-51.beam03.round4.i.ch100-108" + + name = create_imaging_name_prefix(ms=ms, channel_range=(100, 108)) + assert name == "SB63789.EMU_1743-51.beam03.round4.ch100-108" + + def test_get_cube_fits_from_paths(): """Identify the files that contain the cube field and are fits""" files = [ diff --git a/tests/test_prefect_subtractcube_flow.py b/tests/test_prefect_subtractcube_flow.py new file mode 100644 index 00000000..0e666e7d --- /dev/null +++ b/tests/test_prefect_subtractcube_flow.py @@ -0,0 +1,9 @@ +"""Tests around the subtract cube imaging flow""" + +from flint.prefect.flows.subtract_cube_pipeline import get_parser + + +def test_get_parser(): + """Make sure the parser can actually be builts""" + # This is a silly one but I was bitten by it not working, arrrrhh matey + _ = get_parser() diff --git a/tests/test_wsclean.py b/tests/test_wsclean.py index 7fcdc943..42d5c60f 100644 --- a/tests/test_wsclean.py +++ b/tests/test_wsclean.py @@ -181,6 +181,14 @@ def test_regex_rename_wsclean_title(): out_ex = "SB39400.RACS_0635-31.beam33.poli.i.MFS.image" assert _rename_wsclean_title(name_str=ex) == out_ex + ex = "SB39400.RACS_0635-31.beam33.ch109-110-i-MFS-image" + out_ex = "SB39400.RACS_0635-31.beam33.ch109-110.i.MFS.image" + assert _rename_wsclean_title(name_str=ex) == out_ex + + ex = "SB39400.RACS_0635-31.beam33.i.ch109-110-i-MFS-image" + out_ex = "SB39400.RACS_0635-31.beam33.i.ch109-110.i.MFS.image" + assert _rename_wsclean_title(name_str=ex) == out_ex + def test_regex_stokes_wsclean_title(): """Test whether all stokes values are picked up properly""" @@ -284,6 
+292,13 @@ def test_resolve_key_value_to_cli(): assert res.bindpath is None assert res.unknown == ("temp_dir", unknown) + ignore = WSCleanOptions + res = _resolve_wsclean_key_value_to_cli_str("flint_this_should_be_ignored", ignore) + assert res.cmd is None + assert res.bindpath is None + assert res.unknown is None + assert res.ignore + def test_create_wsclean_name(ms_example): """Test the creation of a wsclean name argument""" @@ -465,3 +480,22 @@ def test_wsclean_output_named_nomfs(): assert image_set.psf is not None assert len(image_set.psf) == 4 assert isinstance(image_set.psf[0], Path) + + +def test_wsclean_names_no_subbands(): + """The spectral line modes image per channel, so there is therefore no subband type + in the wsclean named output""" + image_set = get_wsclean_output_names( + prefix="JackSparrow", subbands=1, include_mfs=False + ) + + assert isinstance(image_set, ImageSet) + assert image_set.prefix == "JackSparrow" + + assert image_set.image + assert len(image_set.image) == 1 + assert image_set.image[0] == Path("JackSparrow-image.fits") + + assert image_set.psf + assert len(image_set.psf) == 1 + assert image_set.psf[0] == Path("JackSparrow-psf.fits")