From 46c58bf2af256ec9df00af644bd5c9eb40fe6bb3 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Mon, 18 Nov 2024 14:27:13 +1100 Subject: [PATCH 01/16] added the wsclan save source list opion --- flint/imager/wsclean.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index ce53f761..9187452d 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -156,6 +156,8 @@ class WSCleanOptions(BaseOptions): """The path to a temporary directory where files will be wrritten. """ pol: str = "i" """The polarisation to be imaged""" + save_source_list: bool = False + """Saves the found clean components as a BBS/DP3 text sky model""" class WSCleanCommand(NamedTuple): From d897c125c9fc8687ad180bb4b83ee3b2eb320274 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Tue, 19 Nov 2024 11:25:17 +1100 Subject: [PATCH 02/16] added wait for before archiving --- flint/prefect/flows/continuum_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flint/prefect/flows/continuum_pipeline.py b/flint/prefect/flows/continuum_pipeline.py index c7d3cd37..7b236992 100644 --- a/flint/prefect/flows/continuum_pipeline.py +++ b/flint/prefect/flows/continuum_pipeline.py @@ -307,7 +307,7 @@ def process_science_fields( beam_summaries = task_create_beam_summary.map( ms=preprocess_science_mss, imageset=wsclean_cmds ) - + archive_wait_for.extend(beam_summaries) archive_wait_for.extend(wsclean_cmds) beam_aegean_outputs = None From 3cf274b495bfd08f536969bbb621ccb435ed43e5 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Tue, 19 Nov 2024 11:57:39 +1100 Subject: [PATCH 03/16] wait_for=archive_wait_for for tarring --- flint/prefect/flows/continuum_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flint/prefect/flows/continuum_pipeline.py b/flint/prefect/flows/continuum_pipeline.py index 7b236992..2cc1ec1f 100644 --- a/flint/prefect/flows/continuum_pipeline.py +++ b/flint/prefect/flows/continuum_pipeline.py @@ -497,7 +497,7 @@ def process_science_fields( # zip up the final measurement set, which is not included in the above loop if field_options.zip_ms: - task_zip_ms.map(in_item=wsclean_cmds) + task_zip_ms.map(in_item=wsclean_cmds, wait_for=archive_wait_for) if field_options.sbid_archive_path or field_options.sbid_copy_path and run_aegean: update_archive_options = get_options_from_strategy( From f2b318835704b237246b53deab638b1214b45e85 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Tue, 19 Nov 2024 17:52:35 +1100 Subject: [PATCH 04/16] added function to derive expected source list text name / tests --- flint/imager/wsclean.py | 34 ++++++++++++++++++++++++++++++++++ tests/test_wsclean.py | 26 ++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index 9187452d..80018829 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -181,6 +181,40 @@ def with_options(self, **kwargs) -> WSCleanCommand: return WSCleanCommand(**_dict) +def get_wsclean_output_source_list_path(name_path: Path, pol: str = "i") -> Path: + """WSClean can produce a text file that describes the components + that it cleaned, their type, scale and brightness. These are + placed in a file that is: + + >> {name}.{pol}-sources.txt + + where ``name`` represented the `-name` component. Given + an input measurement set path or this `-name` value return + the expected source list text file. ``pol`` is the stokes + that the source is expected. + + Args: + name_path (Path): Value of the ``-name`` option. If `str` converted to a ``Path`` + pol (str, optional): The polarisation to add to the name. Defaults to "i". + + Returns: + Path: Path to the source list text file + """ + + # ye not be trusted + name_path = Path(name_path) + base_name = name_path.name + if ".ms" == Path(base_name).suffix: + base_name = Path(base_name).stem + + logger.info(f"{base_name=} extracted from {name_path=}") + + source_list_name = f"{base_name}.{pol}-sources.txt" + source_list_path = name_path.parent / source_list_name + + return source_list_path + + def _rename_wsclean_title(name_str: str) -> str: """Construct an apply a regular expression that aims to identify the wsclean appended properties string within a file and replace diff --git a/tests/test_wsclean.py b/tests/test_wsclean.py index 7a2ace49..aca99c32 100644 --- a/tests/test_wsclean.py +++ b/tests/test_wsclean.py @@ -20,6 +20,7 @@ create_wsclean_cmd, create_wsclean_name_argument, get_wsclean_output_names, + get_wsclean_output_source_list_path, rename_wsclean_prefix_in_imageset, ) from flint.ms import MS @@ -27,6 +28,31 @@ from flint.utils import get_packaged_resource_path +def test_get_wsclean_output_source_list_path(): + """Wsclean can be configured out output a source list of the + components, their brightness and relative size that were placed + throughout cleaning. Here we be testing whether we can + generate the expected name""" + + example = Path("/flint/pirates/SB58992.RACS_1726-73.beam22.ms") + source_path = Path("/flint/pirates/SB58992.RACS_1726-73.beam22.i-sources.txt") + + test_source_path = get_wsclean_output_source_list_path(name_path=example) + assert source_path == test_source_path + + example = Path("/flint/pirates/SB58992.RACS_1726-73.beam22") + source_path = Path("/flint/pirates/SB58992.RACS_1726-73.beam22.i-sources.txt") + + test_source_path = get_wsclean_output_source_list_path(name_path=example) + assert source_path == test_source_path + + example = "SB58992.RACS_1726-73.beam22" + source_path = Path("SB58992.RACS_1726-73.beam22.i-sources.txt") + + test_source_path = get_wsclean_output_source_list_path(name_path=example) + assert source_path == test_source_path + + @pytest.fixture def ms_example(tmpdir): ms_zip = Path( From 8a520388b746d621d01289a6c2f528fb35f51465 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Wed, 20 Nov 2024 15:16:47 +1100 Subject: [PATCH 05/16] initial attempt at using addmodel --- flint/calibrate/aocalibrate.py | 76 ++++++++++++++++++----- flint/imager/wsclean.py | 45 ++++++++++---- flint/ms.py | 30 +++++++++ flint/prefect/common/imaging.py | 4 ++ flint/prefect/flows/continuum_pipeline.py | 2 + tests/test_aocalibrate.py | 22 ++++++- tests/test_ms.py | 59 ++++++++++++++++-- 7 files changed, 205 insertions(+), 33 deletions(-) diff --git a/flint/calibrate/aocalibrate.py b/flint/calibrate/aocalibrate.py index 9c625863..319aa6dd 100644 --- a/flint/calibrate/aocalibrate.py +++ b/flint/calibrate/aocalibrate.py @@ -11,6 +11,7 @@ Dict, Iterable, List, + Literal, NamedTuple, Optional, Tuple, @@ -34,13 +35,28 @@ ) from flint.exceptions import PhaseOutlierFitError from flint.logging import logger -from flint.ms import MS, consistent_ms, get_beam_from_ms +from flint.ms import MS, consistent_ms, get_beam_from_ms, remove_columns_from_ms from flint.naming import get_aocalibrate_output_path +from flint.options import BaseOptions from flint.sclient import run_singularity_command from flint.utils import create_directory -class CalibrateOptions(NamedTuple): +class AddModelOptions(BaseOptions): + """Container for options into the ``addmodel`` program packaged + with ``aocalibrate``""" + + model_path: Path + """Path to the sky-model that will be inserted""" + ms_path: Path + """Path to the measurement set that will be interacted with""" + mode: Literal["a", "s", "c", "v"] + """The mode ``addmodel`` will be operating under, where where a=add model to visibilities (default), s=subtract model from visibilities, c=copy model to visibilities, z=zero visibilities""" + datacolumn: str + """The column that will be operated against""" + + +class CalibrateOptions(BaseOptions): """Structure used to represent options into the `calibrate` program These attributes have the same names as options into the `calibrate` @@ -60,14 +76,8 @@ class CalibrateOptions(NamedTuple): p: Optional[Tuple[Path, Path]] = None """Plot output names for the amplitude gain and phases""" - def with_options(self, **kwargs) -> CalibrateOptions: - options = self._asdict() - options.update(**kwargs) - return CalibrateOptions(**options) - - -class CalibrateCommand(NamedTuple): +class CalibrateCommand(BaseOptions): """The AO Calibrate command and output path of the corresponding solutions file""" cmd: str @@ -83,14 +93,8 @@ class CalibrateCommand(NamedTuple): preflagged: bool = False """Indicates whether the solution file has gone through preflagging routines. """ - def with_options(self, **kwargs) -> CalibrateCommand: - _dict = self._asdict() - _dict.update(**kwargs) - - return CalibrateCommand(**_dict) - -class ApplySolutions(NamedTuple): +class ApplySolutions(BaseOptions): """The applysolutions command to execute""" cmd: str @@ -855,6 +859,10 @@ class FlaggedAOSolution(NamedTuple): """The bandpass solutions after flagging, as saved in the solutions file""" +# TODO: These options are too much and should be placed +# into a BaseOptions + + def flag_aosolutions( solutions_path: Path, ref_ant: int = -1, @@ -1081,6 +1089,42 @@ def flag_aosolutions( return flagged_aosolutions +def add_model_options_to_command(add_model_options: AddModelOptions) -> str: + """Generate the command to execute ``addmodel`` + + Args: + add_model_options (AddModelOptions): The collection of supported options used to generate the command + + Returns: + str: The generated addmodel command + """ + logger.info("Generating addmodel command") + command = f"addmodel -datacolumn {add_model_options.datacolumn} -m {add_model_options.mode} " + command += f"{str(add_model_options.model_path)} {str(add_model_options.ms_path)}" + + return command + + +def add_model( + add_model_options: AddModelOptions, container: Path, remove_datacolumn: bool = False +) -> AddModelOptions: + if remove_datacolumn: + remove_columns_from_ms( + ms=add_model_options.ms_path, columns_to_remove=add_model_options.datacolumn + ) + add_model_command = add_model_options_to_command( + add_model_options=add_model_options + ) + + run_singularity_command( + image=container, + command=add_model_command, + bind_dirs=[add_model_options.ms_path, add_model_options.model_path], + ) + + return add_model_options + + def get_parser() -> ArgumentParser: parser = ArgumentParser( description="Run calibrate and apply the solutions given a measurement set and sky-model." diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index 80018829..906b5240 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -26,6 +26,7 @@ import numpy as np from fitscube.combine_fits import combine_fits +from flint.calibrate.aocalibrate import AddModelOptions, add_model from flint.exceptions import CleanDivergenceError from flint.logging import logger from flint.ms import MS @@ -160,7 +161,7 @@ class WSCleanOptions(BaseOptions): """Saves the found clean components as a BBS/DP3 text sky model""" -class WSCleanCommand(NamedTuple): +class WSCleanCommand(BaseOptions): """Simple container for a wsclean command.""" cmd: str @@ -174,14 +175,10 @@ class WSCleanCommand(NamedTuple): cleanup: bool = True """Will clean up the dirty images/psfs/residuals/models when the imaging has completed""" - def with_options(self, **kwargs) -> WSCleanCommand: - _dict = self._asdict() - _dict.update(**kwargs) - return WSCleanCommand(**_dict) - - -def get_wsclean_output_source_list_path(name_path: Path, pol: str = "i") -> Path: +def get_wsclean_output_source_list_path( + name_path: Union[str, Path], pol: str = "i" +) -> Path: """WSClean can produce a text file that describes the components that it cleaned, their type, scale and brightness. These are placed in a file that is: @@ -194,7 +191,7 @@ def get_wsclean_output_source_list_path(name_path: Path, pol: str = "i") -> Path that the source is expected. Args: - name_path (Path): Value of the ``-name`` option. If `str` converted to a ``Path`` + name_path (Union[str,Path]): Value of the ``-name`` option. If `str` converted to a ``Path`` pol (str, optional): The polarisation to add to the name. Defaults to "i". Returns: @@ -521,7 +518,10 @@ def _resolve_wsclean_key_value_to_cli_str(key: str, value: Any) -> ResolvedCLIRe def create_wsclean_cmd( - ms: MS, wsclean_options: WSCleanOptions, container: Optional[Path] = None + ms: MS, + wsclean_options: WSCleanOptions, + container: Optional[Path] = None, + calibrate_container: Optional[Path] = None, ) -> WSCleanCommand: """Create a wsclean command from a WSCleanOptions container @@ -538,6 +538,7 @@ def create_wsclean_cmd( ms (MS): The measurement set to be imaged wsclean_options (WSCleanOptions): WSClean options to image with container (Optional[Path], optional): If a path to a container is provided the command is executed immediately. Defaults to None. + calibrate_container (Optional[Path], optional): Patht to the aocalibrate container with ``addmodel``. If not None and ``wsclean -save-source-list` is used the model will be predicted at full channel resolution Raises: ValueError: Raised when a option has not been successfully processed @@ -599,6 +600,7 @@ def create_wsclean_cmd( bind_dirs=tuple(bind_dir_paths), move_hold_directories=(move_directory, hold_directory), image_prefix_str=str(name_argument_path), + calibrate_container=calibrate_container, ) return wsclean_cmd @@ -728,6 +730,7 @@ def run_wsclean_imager( move_hold_directories: Optional[Tuple[Path, Optional[Path]]] = None, make_cube_from_subbands: bool = True, image_prefix_str: Optional[str] = None, + calibrate_container: Optional[Path] = None, ) -> WSCleanCommand: """Run a provided wsclean command. Optionally will clean up files, including the dirty beams, psfs and other assorted things. @@ -744,6 +747,7 @@ def run_wsclean_imager( move_hold_directories (Optional[Tuple[Path,Optional[Path]]], optional): The `move_directory` and `hold_directory` passed to the temporary context manager. If None no `hold_then_move_into` manager is used. Defaults to None. make_cube_from_subbands (bool, optional): Form a single FITS cube from the set of sub-band images wsclean produces. Defaults to False. image_prefix_str (Optional[str], optional): The name used to search for wsclean outputs. If None, it is guessed from the name and location of the MS. Defaults to None. + calibrate_container (Optional[Path], optional): Path to the ``aocalibrate`` container with the ``addmodel`` program. Should this be provided and the ``wsclean -save-source-list`` be used, then the model will be predicted at full channel resolution Returns: WSCleanCommand: The executed wsclean command with a populated imageset properter. @@ -797,6 +801,20 @@ def run_wsclean_imager( # prefix=prefix, output_type=output_type, ignore_mfs=False # ) + if calibrate_container and wsclean_cmd.options.save_source_list: + logger.info("Predicting the wsclean clean components SEDs") + source_list_path = get_wsclean_output_source_list_path( + name_path=prefix, pol="i" + ) + assert source_list_path.exists(), f"{source_list_path=} does not exist" + add_model_options = AddModelOptions( + model_path=source_list_path, + ms_path=wsclean_cmd.ms.path, + mode="c", + datacolumn="MODEL_DATA", + ) + add_model(add_model_options=add_model_options, container=calibrate_container) + imageset = get_wsclean_output_names( prefix=prefix, subbands=wsclean_cmd.options.channels_out, @@ -821,6 +839,7 @@ def wsclean_imager( ms: Union[Path, MS], wsclean_container: Path, update_wsclean_options: Optional[Dict[str, Any]] = None, + calibrate_container: Optional[Path] = None, ) -> WSCleanCommand: """Create and run a wsclean imager command against a measurement set. @@ -828,6 +847,7 @@ def wsclean_imager( ms (Union[Path,MS]): Path to the measurement set that will be imaged wsclean_container (Path): Path to the container with wsclean installed update_wsclean_options (Optional[Dict[str, Any]], optional): Additional options to update the generated WscleanOptions with. Keys should be attributes of WscleanOptions. Defaults to None. + calibrate_container (Optional[Path], optional): Patht to the aocalibrate container with ``addmodel``. If not None and ``wsclean -save-source-list` is used the model will be predicted at full channel resolution Returns: WSCleanCommand: _description_ @@ -844,7 +864,10 @@ def wsclean_imager( assert ms.column is not None, "A MS column needs to be elected for imaging. " wsclean_options = wsclean_options.with_options(data_column=ms.column) wsclean_cmd = create_wsclean_cmd( - ms=ms, wsclean_options=wsclean_options, container=wsclean_container + ms=ms, + wsclean_options=wsclean_options, + container=wsclean_container, + calibrate_container=calibrate_container, ) return wsclean_cmd diff --git a/flint/ms.py b/flint/ms.py index 879f0345..1c81c2c5 100644 --- a/flint/ms.py +++ b/flint/ms.py @@ -543,6 +543,36 @@ def rename_column_in_ms( return ms +def remove_columns_from_ms( + ms: Union[MS, Path], columns_to_remove: Union[str, List[str]] +) -> List[str]: + """Attempt to remove a collection of columns from a measurement set. + If any of the provided columns do not exist they are ignored. + + Args: + ms (Union[MS, Path]): The measurement set to inspect and remove columns from + columns_to_remove (Union[str, List[str]]): Collection of column names to remove. If a single column internally it is cast to a list of length 1. + + Returns: + List[str]: Collection of column names removed + """ + + if isinstance(columns_to_remove, str): + columns_to_remove = [columns_to_remove] + + ms = MS.cast(ms=ms) + with table(tablename=str(ms.path), readonly=False, ack=False) as tab: + colnames = tab.colnames() + columns_to_remove = [c for c in columns_to_remove if c in colnames] + if len(columns_to_remove) == 0: + logger.info(f"All columns provided do not exist in {ms.path}") + else: + logger.info(f"Removing {columns_to_remove=} from {ms.path}") + tab.removecols(columnnames=columns_to_remove) + + return columns_to_remove + + def preprocess_askap_ms( ms: Union[MS, Path], data_column: str = "DATA", diff --git a/flint/prefect/common/imaging.py b/flint/prefect/common/imaging.py index cf00533a..a9c80f03 100644 --- a/flint/prefect/common/imaging.py +++ b/flint/prefect/common/imaging.py @@ -283,6 +283,7 @@ def task_wsclean_imager( wsclean_container: Path, update_wsclean_options: Optional[Dict[str, Any]] = None, fits_mask: Optional[FITSMaskNames] = None, + calibrate_container: Optional[Path] = None, ) -> WSCleanCommand: """Run the wsclean imager against an input measurement set @@ -291,6 +292,7 @@ def task_wsclean_imager( wsclean_container (Path): Path to a singularity container with wsclean packages update_wsclean_options (Optional[Dict[str, Any]], optional): Options to update from the default wsclean options. Defaults to None. fits_mask (Optional[FITSMaskNames], optional): A path to a clean guard mask. Defaults to None. + calibrate_container (Optional[Path], optional): The aocalibrate container that has the ``addmodel`` program. is ``wsclean -save-source-list` is used then the model will be predicted at full resolution Returns: WSCleanCommand: A resulting wsclean command and resulting meta-data @@ -312,6 +314,7 @@ def task_wsclean_imager( ms=ms, wsclean_container=wsclean_container, update_wsclean_options=update_wsclean_options, + calibrate_container=calibrate_container, ) except CleanDivergenceError: # NOTE: If the cleaning failed retry with some larger images @@ -340,6 +343,7 @@ def task_wsclean_imager( ms=ms, wsclean_container=wsclean_container, update_wsclean_options=update_wsclean_options, + calibrate_container=calibrate_container, ) diff --git a/flint/prefect/flows/continuum_pipeline.py b/flint/prefect/flows/continuum_pipeline.py index 2cc1ec1f..79ff25e5 100644 --- a/flint/prefect/flows/continuum_pipeline.py +++ b/flint/prefect/flows/continuum_pipeline.py @@ -301,6 +301,7 @@ def process_science_fields( strategy=unmapped(strategy), mode="wsclean", round_info="initial", + calibrate_container=field_options.calibrate_container, ) # type: ignore # TODO: This should be waited! @@ -419,6 +420,7 @@ def process_science_fields( strategy=unmapped(strategy), mode="wsclean", round_info=current_round, + calibrate_container=field_options.calibrate_container, ) # type: ignore archive_wait_for.extend(wsclean_cmds) diff --git a/tests/test_aocalibrate.py b/tests/test_aocalibrate.py index 6f149be5..bbfc6208 100644 --- a/tests/test_aocalibrate.py +++ b/tests/test_aocalibrate.py @@ -13,17 +13,37 @@ smooth_data, ) from flint.calibrate.aocalibrate import ( + AddModelOptions, AOSolutions, CalibrateOptions, FlaggedAOSolution, calibrate_options_to_command, flag_aosolutions, + add_model_options_to_command, plot_solutions, select_refant, ) from flint.utils import get_packaged_resource_path +def test_generate_add_model_command(): + """Ensure we can actually generate the expected addmodel cli command""" + + add_model_options = AddModelOptions( + model_path=Path("/jack/sparrow/be/here/SB-sources.txt"), + ms_path=Path("/jack/sparrow/be/here/SB.ms"), + mode="c", + datacolumn="MODEL_DATA", + ) + + add_model_command = add_model_options_to_command( + add_model_options=add_model_options + ) + + expected_command = "addmodel -datacolumn MODEL_DATA -m c /jack/sparrow/be/here/SB-sources.txt /jack/sparrow/be/here/SB.ms" + assert add_model_command == expected_command + + def test_calibrate_options_to_command(): default_cal = CalibrateOptions(datacolumn="DATA", m=Path("/example/1934.model")) ex_ms_path = Path("/example/data.ms") @@ -83,7 +103,7 @@ def test_calibrate_options_to_command3(): assert ( cmd - == "calibrate -datacolumn DATA -m /example/1934.model -minuv 300 -maxuv 5000 -i 40 -p amps.plot phase.plot /example/data.ms /example/sols.calibrate" + == "calibrate -datacolumn DATA -m /example/1934.model -minuv 300.0 -maxuv 5000.0 -i 40 -p amps.plot phase.plot /example/data.ms /example/sols.calibrate" ) diff --git a/tests/test_ms.py b/tests/test_ms.py index 53bc5f00..9fba67f4 100644 --- a/tests/test_ms.py +++ b/tests/test_ms.py @@ -6,17 +6,18 @@ from pathlib import Path import numpy as np +from pydantic import ValidationError import pytest from casacore.tables import table from flint.calibrate.aocalibrate import ApplySolutions -from flint.exceptions import MSError from flint.ms import ( MS, check_column_in_ms, copy_and_preprocess_casda_askap_ms, find_mss, get_phase_dir_from_ms, + remove_columns_from_ms, rename_ms_and_columns_for_selfcal, ) from flint.utils import get_packaged_resource_path @@ -213,10 +214,58 @@ def test_ms_from_options(): def test_raise_error_ms_from_options(): + """Ensure validation error is raised when not casting correct + ms path type""" + # TODO: remove this test as pydantic prevents this from happening path = Path("example.ms") - solutions = ApplySolutions( - cmd="none", solution_path=Path("example_sols.bin"), ms=path + with pytest.raises(ValidationError): + _ = ApplySolutions(cmd="none", solution_path=Path("example_sols.bin"), ms=path) + + # with pytest.raises((MSError, ValidationError)): + # MS.cast(solutions) + + +@pytest.fixture +def ms_remove_example(tmpdir): + """Create a copy of the MS that will be modified.""" + ms_zip = Path( + get_packaged_resource_path( + package="flint.data.tests", + filename="SB39400.RACS_0635-31.beam0.small.ms.zip", + ) ) + outpath = Path(tmpdir) / "Removecols_39400" - with pytest.raises(MSError): - MS.cast(solutions) + shutil.unpack_archive(ms_zip, outpath) + + ms_path = Path(outpath) / "SB39400.RACS_0635-31.beam0.small.ms" + + return ms_path + + +def _get_column_names(ms_path): + with table(str(ms_path)) as tab: + column_names = tab.colnames() + + return column_names + + +def test_remove_columns_from_ms(ms_remove_example): + """Load an example MS to remove columns from""" + original_columns = _get_column_names(ms_path=ms_remove_example) + + removed_columns = remove_columns_from_ms( + ms=ms_remove_example, columns_to_remove="DATA" + ) + + updated_columns = _get_column_names(ms_path=ms_remove_example) + diff = set(original_columns) - set(updated_columns) + assert len(diff) == 1 + assert list(diff)[0] == "DATA" + assert removed_columns[0] == "DATA" + assert len(removed_columns) == 1 + + removed_columns = remove_columns_from_ms( + ms=ms_remove_example, columns_to_remove="DATA" + ) + assert len(removed_columns) == 0 From 6898283164f76f78540607ae2fe3c96e12057234 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Wed, 20 Nov 2024 22:15:16 +1100 Subject: [PATCH 06/16] Removed collection from BaseOption WSCleanCommand --- flint/imager/wsclean.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index 906b5240..1a1e2ba0 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -51,15 +51,15 @@ class ImageSet(NamedTuple): prefix: str """Prefix of the images and other output products. This should correspond to the -name argument from wsclean""" - image: Collection[Path] + image: List[Path] """Images produced. """ - psf: Optional[Collection[Path]] = None + psf: Optional[List[Path]] = None """References to the PSFs produced by wsclean. """ - dirty: Optional[Collection[Path]] = None + dirty: Optional[List[Path]] = None """Dirty images. """ - model: Optional[Collection[Path]] = None + model: Optional[List[Path]] = None """Model images. """ - residual: Optional[Collection[Path]] = None + residual: Optional[List[Path]] = None """Residual images.""" From 5ade5be303c8aa1a78f68e301972f576e71912e4 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Thu, 21 Nov 2024 10:05:34 +1100 Subject: [PATCH 07/16] Updated source list name logic --- flint/imager/wsclean.py | 10 ++++++---- tests/test_wsclean.py | 12 +++++++++--- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index 1a1e2ba0..2195a72b 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -177,7 +177,7 @@ class WSCleanCommand(BaseOptions): def get_wsclean_output_source_list_path( - name_path: Union[str, Path], pol: str = "i" + name_path: Union[str, Path], pol: Optional[str] = None ) -> Path: """WSClean can produce a text file that describes the components that it cleaned, their type, scale and brightness. These are @@ -192,7 +192,7 @@ def get_wsclean_output_source_list_path( Args: name_path (Union[str,Path]): Value of the ``-name`` option. If `str` converted to a ``Path`` - pol (str, optional): The polarisation to add to the name. Defaults to "i". + pol (Optional[str], optional): The polarisation to add to the name. If None the -source.txt suffix is simply appended. Defaults to None. Returns: Path: Path to the source list text file @@ -206,7 +206,9 @@ def get_wsclean_output_source_list_path( logger.info(f"{base_name=} extracted from {name_path=}") - source_list_name = f"{base_name}.{pol}-sources.txt" + source_list_name = ( + f"{base_name}.{pol}-sources.txt" if pol else f"{base_name}-sources.txt" + ) source_list_path = name_path.parent / source_list_name return source_list_path @@ -330,7 +332,7 @@ def get_wsclean_output_names( if isinstance(output_types, str): output_types = (output_types,) - images: Dict[str, Collection[Path]] = {} + images: Dict[str, List[Path]] = {} for image_type in ("image", "dirty", "model", "residual"): if image_type not in output_types: continue diff --git a/tests/test_wsclean.py b/tests/test_wsclean.py index aca99c32..7fcdc943 100644 --- a/tests/test_wsclean.py +++ b/tests/test_wsclean.py @@ -37,19 +37,25 @@ def test_get_wsclean_output_source_list_path(): example = Path("/flint/pirates/SB58992.RACS_1726-73.beam22.ms") source_path = Path("/flint/pirates/SB58992.RACS_1726-73.beam22.i-sources.txt") - test_source_path = get_wsclean_output_source_list_path(name_path=example) + test_source_path = get_wsclean_output_source_list_path(name_path=example, pol="i") assert source_path == test_source_path example = Path("/flint/pirates/SB58992.RACS_1726-73.beam22") source_path = Path("/flint/pirates/SB58992.RACS_1726-73.beam22.i-sources.txt") - test_source_path = get_wsclean_output_source_list_path(name_path=example) + test_source_path = get_wsclean_output_source_list_path(name_path=example, pol="i") assert source_path == test_source_path example = "SB58992.RACS_1726-73.beam22" source_path = Path("SB58992.RACS_1726-73.beam22.i-sources.txt") - test_source_path = get_wsclean_output_source_list_path(name_path=example) + test_source_path = get_wsclean_output_source_list_path(name_path=example, pol="i") + assert source_path == test_source_path + + example = "SB58992.RACS_1726-73.beam22" + source_path = Path("SB58992.RACS_1726-73.beam22-sources.txt") + + test_source_path = get_wsclean_output_source_list_path(name_path=example, pol=None) assert source_path == test_source_path From f0aa81976c8a402871ef2c8fc1121b0e6027b5fb Mon Sep 17 00:00:00 2001 From: tgalvin Date: Thu, 21 Nov 2024 11:03:07 +1100 Subject: [PATCH 08/16] removing default pol=i in call get wsclean suorce list path --- flint/imager/wsclean.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index 2195a72b..43b7ee3b 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -806,7 +806,7 @@ def run_wsclean_imager( if calibrate_container and wsclean_cmd.options.save_source_list: logger.info("Predicting the wsclean clean components SEDs") source_list_path = get_wsclean_output_source_list_path( - name_path=prefix, pol="i" + name_path=prefix, pol=None ) assert source_list_path.exists(), f"{source_list_path=} does not exist" add_model_options = AddModelOptions( From 3ee2d5bb5b95fe15f84f98d08898956abad7c598 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Thu, 21 Nov 2024 15:24:15 +1100 Subject: [PATCH 09/16] femove model data column --- flint/imager/wsclean.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index 43b7ee3b..9a24b7f4 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -815,7 +815,11 @@ def run_wsclean_imager( mode="c", datacolumn="MODEL_DATA", ) - add_model(add_model_options=add_model_options, container=calibrate_container) + add_model( + add_model_options=add_model_options, + container=calibrate_container, + remove_datacolumn="MODEL_DATA", + ) imageset = get_wsclean_output_names( prefix=prefix, From 9712dbb1e664ab99d0b55452fc66083b9b53a50e Mon Sep 17 00:00:00 2001 From: tgalvin Date: Thu, 21 Nov 2024 16:34:58 +1100 Subject: [PATCH 10/16] removed addmodel from wsclean and added as actual task --- CHANGELOG.md | 2 ++ flint/imager/wsclean.py | 35 +++++++---------------- flint/options.py | 2 ++ flint/prefect/common/imaging.py | 4 --- flint/prefect/flows/continuum_pipeline.py | 20 +++++++++++-- 5 files changed, 33 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00b98694..41e4ae3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ - Removed `suppress_artefact` and `minimum_absolute_clip` functions from `flint.masking` - Added an adaptive box selection mode to the minimum absolute algorithm +- Update a MSs `MODEL_DATA` column using `addmodel` and a source list (see + `wsclean -save-source-list`) # 0.2.7 diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index 9a24b7f4..6cd860af 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -26,7 +26,6 @@ import numpy as np from fitscube.combine_fits import combine_fits -from flint.calibrate.aocalibrate import AddModelOptions, add_model from flint.exceptions import CleanDivergenceError from flint.logging import logger from flint.ms import MS @@ -61,6 +60,8 @@ class ImageSet(NamedTuple): """Model images. """ residual: Optional[List[Path]] = None """Residual images.""" + source_list: Optional[Path] = None + """Path to a source list that accompanies the image data""" class WSCleanOptions(BaseOptions): @@ -277,6 +278,7 @@ def _wsclean_output_callback(line: str) -> None: raise CleanDivergenceError(f"Clean divergence detected: {line}") +# TODO: Update this function to also add int the source list def get_wsclean_output_names( prefix: str, subbands: int, @@ -732,7 +734,6 @@ def run_wsclean_imager( move_hold_directories: Optional[Tuple[Path, Optional[Path]]] = None, make_cube_from_subbands: bool = True, image_prefix_str: Optional[str] = None, - calibrate_container: Optional[Path] = None, ) -> WSCleanCommand: """Run a provided wsclean command. Optionally will clean up files, including the dirty beams, psfs and other assorted things. @@ -749,7 +750,6 @@ def run_wsclean_imager( move_hold_directories (Optional[Tuple[Path,Optional[Path]]], optional): The `move_directory` and `hold_directory` passed to the temporary context manager. If None no `hold_then_move_into` manager is used. Defaults to None. make_cube_from_subbands (bool, optional): Form a single FITS cube from the set of sub-band images wsclean produces. Defaults to False. image_prefix_str (Optional[str], optional): The name used to search for wsclean outputs. If None, it is guessed from the name and location of the MS. Defaults to None. - calibrate_container (Optional[Path], optional): Path to the ``aocalibrate`` container with the ``addmodel`` program. Should this be provided and the ``wsclean -save-source-list`` be used, then the model will be predicted at full channel resolution Returns: WSCleanCommand: The executed wsclean command with a populated imageset properter. @@ -803,24 +803,6 @@ def run_wsclean_imager( # prefix=prefix, output_type=output_type, ignore_mfs=False # ) - if calibrate_container and wsclean_cmd.options.save_source_list: - logger.info("Predicting the wsclean clean components SEDs") - source_list_path = get_wsclean_output_source_list_path( - name_path=prefix, pol=None - ) - assert source_list_path.exists(), f"{source_list_path=} does not exist" - add_model_options = AddModelOptions( - model_path=source_list_path, - ms_path=wsclean_cmd.ms.path, - mode="c", - datacolumn="MODEL_DATA", - ) - add_model( - add_model_options=add_model_options, - container=calibrate_container, - remove_datacolumn="MODEL_DATA", - ) - imageset = get_wsclean_output_names( prefix=prefix, subbands=wsclean_cmd.options.channels_out, @@ -829,6 +811,14 @@ def run_wsclean_imager( check_exists_when_adding=True, ) + if wsclean_cmd.options.save_source_list: + logger.info("Attaching the wsclean clean components SEDs") + source_list_path = get_wsclean_output_source_list_path( + name_path=prefix, pol=None + ) + assert source_list_path.exists(), f"{source_list_path=} does not exist" + imageset = imageset.with_options(source_list=source_list_path) + if make_cube_from_subbands: imageset = combine_subbands_to_cube( imageset=imageset, remove_original_images=True @@ -845,7 +835,6 @@ def wsclean_imager( ms: Union[Path, MS], wsclean_container: Path, update_wsclean_options: Optional[Dict[str, Any]] = None, - calibrate_container: Optional[Path] = None, ) -> WSCleanCommand: """Create and run a wsclean imager command against a measurement set. @@ -853,7 +842,6 @@ def wsclean_imager( ms (Union[Path,MS]): Path to the measurement set that will be imaged wsclean_container (Path): Path to the container with wsclean installed update_wsclean_options (Optional[Dict[str, Any]], optional): Additional options to update the generated WscleanOptions with. Keys should be attributes of WscleanOptions. Defaults to None. - calibrate_container (Optional[Path], optional): Patht to the aocalibrate container with ``addmodel``. If not None and ``wsclean -save-source-list` is used the model will be predicted at full channel resolution Returns: WSCleanCommand: _description_ @@ -873,7 +861,6 @@ def wsclean_imager( ms=ms, wsclean_options=wsclean_options, container=wsclean_container, - calibrate_container=calibrate_container, ) return wsclean_cmd diff --git a/flint/options.py b/flint/options.py index 44a2d3db..4cab7c8a 100644 --- a/flint/options.py +++ b/flint/options.py @@ -290,6 +290,8 @@ class FieldOptions(BaseOptions): """Specifies whether Stokes-V imaging will be carried out after the final round of imagine (whether or not self-calibration is enabled). """ coadd_cubes: bool = False """Co-add cubes formed throughout imaging together. Cubes will be smoothed channel-wise to a common resolution. Only performed on final set of images""" + update_model_data_with_source_list: bool = False + """Attempt to update a MSs MODEL_DATA column with a source list (e.g. source list output from wsclean)""" def dump_field_options_to_yaml( diff --git a/flint/prefect/common/imaging.py b/flint/prefect/common/imaging.py index a9c80f03..cf00533a 100644 --- a/flint/prefect/common/imaging.py +++ b/flint/prefect/common/imaging.py @@ -283,7 +283,6 @@ def task_wsclean_imager( wsclean_container: Path, update_wsclean_options: Optional[Dict[str, Any]] = None, fits_mask: Optional[FITSMaskNames] = None, - calibrate_container: Optional[Path] = None, ) -> WSCleanCommand: """Run the wsclean imager against an input measurement set @@ -292,7 +291,6 @@ def task_wsclean_imager( wsclean_container (Path): Path to a singularity container with wsclean packages update_wsclean_options (Optional[Dict[str, Any]], optional): Options to update from the default wsclean options. Defaults to None. fits_mask (Optional[FITSMaskNames], optional): A path to a clean guard mask. Defaults to None. - calibrate_container (Optional[Path], optional): The aocalibrate container that has the ``addmodel`` program. is ``wsclean -save-source-list` is used then the model will be predicted at full resolution Returns: WSCleanCommand: A resulting wsclean command and resulting meta-data @@ -314,7 +312,6 @@ def task_wsclean_imager( ms=ms, wsclean_container=wsclean_container, update_wsclean_options=update_wsclean_options, - calibrate_container=calibrate_container, ) except CleanDivergenceError: # NOTE: If the cleaning failed retry with some larger images @@ -343,7 +340,6 @@ def task_wsclean_imager( ms=ms, wsclean_container=wsclean_container, update_wsclean_options=update_wsclean_options, - calibrate_container=calibrate_container, ) diff --git a/flint/prefect/flows/continuum_pipeline.py b/flint/prefect/flows/continuum_pipeline.py index 79ff25e5..4e6dbe6d 100644 --- a/flint/prefect/flows/continuum_pipeline.py +++ b/flint/prefect/flows/continuum_pipeline.py @@ -54,6 +54,7 @@ task_wsclean_imager, task_zip_ms, ) +from flint.prefect.common.ms import task_add_model_source_list_to_ms from flint.prefect.common.utils import ( task_archive_sbid, task_create_beam_summary, @@ -301,9 +302,17 @@ def process_science_fields( strategy=unmapped(strategy), mode="wsclean", round_info="initial", - calibrate_container=field_options.calibrate_container, ) # type: ignore + wsclean_cmds = ( + task_add_model_source_list_to_ms.map( + wsclean_command=wsclean_cmds, + calibrate_container=field_options.calibrate_container, + ) + if field_options.update_model_data_with_source_list + else wsclean_cmds + ) + # TODO: This should be waited! beam_summaries = task_create_beam_summary.map( ms=preprocess_science_mss, imageset=wsclean_cmds @@ -420,8 +429,15 @@ def process_science_fields( strategy=unmapped(strategy), mode="wsclean", round_info=current_round, - calibrate_container=field_options.calibrate_container, ) # type: ignore + wsclean_cmds = ( + task_add_model_source_list_to_ms.map( + wsclean_command=wsclean_cmds, + calibrate_container=field_options.calibrate_container, + ) + if field_options.update_model_data_with_source_list + else wsclean_cmds + ) archive_wait_for.extend(wsclean_cmds) # Do source finding on the last round of self-cal'ed images From 3a4290eb199de5df1ce097c9dd89785b71ba1826 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Thu, 21 Nov 2024 19:40:29 +1100 Subject: [PATCH 11/16] added ms common prefect --- flint/prefect/common/ms.py | 46 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 flint/prefect/common/ms.py diff --git a/flint/prefect/common/ms.py b/flint/prefect/common/ms.py new file mode 100644 index 00000000..ad8fd243 --- /dev/null +++ b/flint/prefect/common/ms.py @@ -0,0 +1,46 @@ +"""Common prefect tasks around interacting with measurement sets""" + +from pathlib import Path +from typing import Optional + +from prefect import task + +from flint.calibrate.aocalibrate import AddModelOptions, add_model +from flint.logging import logger +from flint.imager.wsclean import WSCleanCommand + + +# TODO: This can be a dispatcher type function should +# other modes be added +def add_model_source_list_to_ms( + wsclean_command: WSCleanCommand, calibrate_container: Optional[Path] = None +) -> WSCleanCommand: + logger.info("Updating MODEL_DATA with source list") + ms = wsclean_command.ms + + source_list_path = wsclean_command.imageset.source_list + if source_list_path is None: + logger.info(f"{source_list_path=}, so not updating") + return ms + assert source_list_path.exists(), f"{source_list_path=} does not exist" + + if calibrate_container is None: + logger.info(f"{calibrate_container=}, so not updating") + return ms + assert calibrate_container.exists(), f"{calibrate_container=} does not exist" + + add_model_options = AddModelOptions( + model_path=source_list_path, + ms_path=ms.path, + mode="c", + datacolumn="MODEL_DATA", + ) + add_model( + add_model_options=add_model_options, + container=calibrate_container, + remove_datacolumn="MODEL_DATA", + ) + return wsclean_command + + +task_add_model_source_list_to_ms = task(add_model_source_list_to_ms) From 4bd20b7648e3899960e39a44bef7e8346eb6069f Mon Sep 17 00:00:00 2001 From: tgalvin Date: Thu, 21 Nov 2024 20:01:19 +1100 Subject: [PATCH 12/16] removed old calibrate --- flint/imager/wsclean.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index 6cd860af..04908a19 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -525,7 +525,6 @@ def create_wsclean_cmd( ms: MS, wsclean_options: WSCleanOptions, container: Optional[Path] = None, - calibrate_container: Optional[Path] = None, ) -> WSCleanCommand: """Create a wsclean command from a WSCleanOptions container @@ -542,7 +541,6 @@ def create_wsclean_cmd( ms (MS): The measurement set to be imaged wsclean_options (WSCleanOptions): WSClean options to image with container (Optional[Path], optional): If a path to a container is provided the command is executed immediately. Defaults to None. - calibrate_container (Optional[Path], optional): Patht to the aocalibrate container with ``addmodel``. If not None and ``wsclean -save-source-list` is used the model will be predicted at full channel resolution Raises: ValueError: Raised when a option has not been successfully processed @@ -604,7 +602,6 @@ def create_wsclean_cmd( bind_dirs=tuple(bind_dir_paths), move_hold_directories=(move_directory, hold_directory), image_prefix_str=str(name_argument_path), - calibrate_container=calibrate_container, ) return wsclean_cmd From cb48baec2f904f416bb122b77c5c475a840b3c25 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Thu, 21 Nov 2024 20:56:30 +1100 Subject: [PATCH 13/16] Imageset to use BaseOptions --- flint/imager/wsclean.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flint/imager/wsclean.py b/flint/imager/wsclean.py index 04908a19..a27dc8e8 100644 --- a/flint/imager/wsclean.py +++ b/flint/imager/wsclean.py @@ -44,7 +44,7 @@ ) -class ImageSet(NamedTuple): +class ImageSet(BaseOptions): """A structure to represent the images and auxiliary products produced by wsclean""" From 7ff16d7fa7f651391f2dd35953a3b3671ba1c629 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Fri, 22 Nov 2024 13:53:36 +1100 Subject: [PATCH 14/16] added tests / subtract_model_from_data_column --- CHANGELOG.md | 2 + flint/ms.py | 35 ++++++++++++++++- tests/test_ms.py | 98 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41e4ae3c..f7e4df1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ - Added an adaptive box selection mode to the minimum absolute algorithm - Update a MSs `MODEL_DATA` column using `addmodel` and a source list (see `wsclean -save-source-list`) +- Added a `taql` based function intended to be used to subtract model data from + nominated data, `flint.ms.subtract_model_from_data_column` # 0.2.7 diff --git a/flint/ms.py b/flint/ms.py index 1c81c2c5..2fdf08ba 100644 --- a/flint/ms.py +++ b/flint/ms.py @@ -94,7 +94,7 @@ def critical_ms_interaction( assert ( not output_ms.exists() ), f"The output measurement set {output_ms} already exists. " - + logger.info(f"Critical section for {input_ms=}") if copy: rsync_copy_directory(target_path=input_ms, out_path=output_ms) else: @@ -573,6 +573,39 @@ def remove_columns_from_ms( return columns_to_remove +def subtract_model_from_data_column( + ms: MS, model_column: str = "MODEL_DATA", data_column: Optional[str] = None +) -> MS: + """Execute a ``taql`` query to subtract the MODEL_DATA from a nominated data column. + This requires the ``model_column`` to already be inserted into the MS. Internally + the ``critical_ms_interaction`` context manager is used to highlight that the MS + is being modified should things fail when subtracting. + + Args: + ms (MS): The measurement set instance being considered + model_column (str, optional): The column with representing the model. Defaults to "MODEL_DATA". + data_column (Optional[str], optional): The column where the column will be subtracted. If ``None`` it is taken from the ``column`` nominated by the input ``MS`` instance. Defaults to None. + + Returns: + MS: The updated MS + """ + ms = MS.cast(ms) + data_column = data_column if data_column else ms.column + assert data_column is not None, f"{data_column=}, which is not allowed" + with critical_ms_interaction(input_ms=ms.path) as critical_ms: + with table(str(critical_ms), readonly=False) as tab: + logger.info("Extracting columns") + colnames = tab.colnames() + assert all( + [d in colnames for d in (model_column, data_column)] + ), f"{model_column=} or {data_column=} missing from {colnames=}" + + logger.info(f"Subtracting {model_column=} from {data_column=}") + taql(f"UPDATE $tab SET {data_column}={data_column}-{model_column}") + + return ms + + def preprocess_askap_ms( ms: Union[MS, Path], data_column: str = "DATA", diff --git a/tests/test_ms.py b/tests/test_ms.py index 9fba67f4..7a980a93 100644 --- a/tests/test_ms.py +++ b/tests/test_ms.py @@ -19,6 +19,7 @@ get_phase_dir_from_ms, remove_columns_from_ms, rename_ms_and_columns_for_selfcal, + subtract_model_from_data_column, ) from flint.utils import get_packaged_resource_path @@ -269,3 +270,100 @@ def test_remove_columns_from_ms(ms_remove_example): ms=ms_remove_example, columns_to_remove="DATA" ) assert len(removed_columns) == 0 + + +@pytest.fixture +def casda_taql_example(tmpdir): + ms_zip = Path( + get_packaged_resource_path( + package="flint.data.tests", + filename="scienceData.EMU_0529-60.SB50538.EMU_0529-60.beam08_averaged_cal.leakage.ms.zip", + ) + ) + outpath = Path(tmpdir) / "taqlsubtract" + + shutil.unpack_archive(ms_zip, outpath) + + ms_path = ( + Path(outpath) + / "scienceData.EMU_0529-60.SB50538.EMU_0529-60.beam08_averaged_cal.leakage.ms" + ) + + return ms_path + + +def test_subtract_model_from_data_column(casda_taql_example): + """Ensure we can subtact the model from the data via taql""" + ms = Path(casda_taql_example) + assert ms.exists() + ms = MS(path=ms) + + from casacore.tables import maketabdesc, makearrcoldesc + + with table(str(ms.path), readonly=False) as tab: + data = tab.getcol("DATA") + ones = np.ones_like(data, dtype=data.dtype) + + tab.putcol(columnname="DATA", value=ones) + + if "MODEL_DATA" not in tab.colnames(): + coldesc = tab.getdminfo("DATA") + coldesc["NAME"] = "MODEL_DATA" + tab.addcols( + maketabdesc(makearrcoldesc("MODEL_DATA", 0.0 + 0j, ndim=2)), coldesc + ) + tab.flush() + tab.putcol(columnname="MODEL_DATA", value=ones) + tab.flush() + + ms = subtract_model_from_data_column( + ms=ms, model_column="MODEL_DATA", data_column="DATA" + ) + with table(str(ms.path)) as tab: + data = tab.getcol("DATA") + assert np.all(data == 0 + 0j) + + +def test_subtract_model_from_data_column_ms_column(tmpdir): + """Ensure we can subtact the model from the data via taql""" + ms_zip = Path( + get_packaged_resource_path( + package="flint.data.tests", + filename="scienceData.EMU_0529-60.SB50538.EMU_0529-60.beam08_averaged_cal.leakage.ms.zip", + ) + ) + outpath = Path(tmpdir) / "taqlsubtract2" + + shutil.unpack_archive(ms_zip, outpath) + + ms_path = ( + Path(outpath) + / "scienceData.EMU_0529-60.SB50538.EMU_0529-60.beam08_averaged_cal.leakage.ms" + ) + + ms = Path(ms_path) + assert ms.exists() + ms = MS(path=ms, column="DATA") + + from casacore.tables import maketabdesc, makearrcoldesc + + with table(str(ms.path), readonly=False) as tab: + data = tab.getcol("DATA") + ones = np.ones_like(data, dtype=data.dtype) + + tab.putcol(columnname="DATA", value=ones) + + if "MODEL_DATA" not in tab.colnames(): + coldesc = tab.getdminfo("DATA") + coldesc["NAME"] = "MODEL_DATA" + tab.addcols( + maketabdesc(makearrcoldesc("MODEL_DATA", 0.0 + 0j, ndim=2)), coldesc + ) + tab.flush() + tab.putcol(columnname="MODEL_DATA", value=ones) + tab.flush() + + ms = subtract_model_from_data_column(ms=ms, model_column="MODEL_DATA") + with table(str(ms.path)) as tab: + data = tab.getcol("DATA") + assert np.all(data == 0 + 0j) From 6c0c3ddf5b493ac089d8abeb63cf74bc4a04883c Mon Sep 17 00:00:00 2001 From: tgalvin Date: Fri, 22 Nov 2024 15:22:41 +1100 Subject: [PATCH 15/16] fixed type mismatches, added asserts to ensure values not None --- flint/prefect/common/ms.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/flint/prefect/common/ms.py b/flint/prefect/common/ms.py index ad8fd243..240aeb8a 100644 --- a/flint/prefect/common/ms.py +++ b/flint/prefect/common/ms.py @@ -18,15 +18,19 @@ def add_model_source_list_to_ms( logger.info("Updating MODEL_DATA with source list") ms = wsclean_command.ms + assert ( + wsclean_command.imageset is not None + ), f"{wsclean_command.imageset=}, which is not allowed" + source_list_path = wsclean_command.imageset.source_list if source_list_path is None: logger.info(f"{source_list_path=}, so not updating") - return ms + return wsclean_command assert source_list_path.exists(), f"{source_list_path=} does not exist" if calibrate_container is None: logger.info(f"{calibrate_container=}, so not updating") - return ms + return wsclean_command assert calibrate_container.exists(), f"{calibrate_container=} does not exist" add_model_options = AddModelOptions( @@ -38,7 +42,7 @@ def add_model_source_list_to_ms( add_model( add_model_options=add_model_options, container=calibrate_container, - remove_datacolumn="MODEL_DATA", + remove_datacolumn=True, ) return wsclean_command From 899bc6424ea15bcc232a30ee335f5b170d01f9e4 Mon Sep 17 00:00:00 2001 From: tgalvin Date: Sat, 23 Nov 2024 20:46:59 +1100 Subject: [PATCH 16/16] docstring --- flint/calibrate/aocalibrate.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/flint/calibrate/aocalibrate.py b/flint/calibrate/aocalibrate.py index 319aa6dd..608d4adc 100644 --- a/flint/calibrate/aocalibrate.py +++ b/flint/calibrate/aocalibrate.py @@ -1108,6 +1108,17 @@ def add_model_options_to_command(add_model_options: AddModelOptions) -> str: def add_model( add_model_options: AddModelOptions, container: Path, remove_datacolumn: bool = False ) -> AddModelOptions: + """Use the ``addmodel`` program to predict the sky-model visibilities + from a compatible source list (e.g. ``wsclean -save-source-list``) + + Args: + add_model_options (AddModelOptions): The set of supported options to be supplied to ``addmodel`` + container (Path): The calibrate container that contains the ``addmodel`` program + remove_datacolumn (bool, optional): Whether to first removed the ``datacolumn`` specified in ``add_model_options`` before predicting. If False it should be overwritten. Defaults to False. + + Returns: + AddModelOptions: The options used to run ``addmodel`` (same as input) + """ if remove_datacolumn: remove_columns_from_ms( ms=add_model_options.ms_path, columns_to_remove=add_model_options.datacolumn