Skip to content

Commit

Permalink
Merge pull request #120 from tjgalvin/renamems
Browse files Browse the repository at this point in the history
Rename MSs as they are going through self-calibration and imaging rounds
  • Loading branch information
tjgalvin authored Jun 20, 2024
2 parents ac3a9d9 + 6686ce8 commit 950b3b7
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- added in skip rounds for masking and selfcal
- Basic handling of CASDA measurement sets (preprocessing)
- Basic handling of environment variables in Options, only supported in WSCleanOptions (no need for others yet)
- Basic renaming of MSs and shuffling of column names through self-calibration and imaging rounds, instead of copying the whole MS each round

## 0.2.4

Expand Down
28 changes: 0 additions & 28 deletions flint/imager/wsclean.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,34 +488,6 @@ def wsclean_imager(
return wsclean_cmd


def create_template_wsclean_options(
    input_wsclean_options: WSCleanOptions,
) -> WSCleanOptions:
    """Construct a simple instance of WSClean options that will not
    actually clean. This is intended to be used to get a representative
    FITS header with appropriate WSC information.

    Only the image size, pixel scale, data column and output name are
    carried over from the input options; the output name has ``_template``
    appended so that template products are distinguishable from the
    products of the real imaging run.

    Args:
        input_wsclean_options (WSCleanOptions): The base set of wsclean options to use

    Returns:
        WSCleanOptions: Template options to use for the wsclean fits header creation
    """

    # The template must be an options container (as annotated and documented),
    # not a WSCleanCommand: the keywords below are imaging options.
    template_options = WSCleanOptions(
        size=input_wsclean_options.size,
        channels_out=1,
        nmiter=0,
        niter=1,
        data_column=input_wsclean_options.data_column,
        scale=input_wsclean_options.scale,
        name=f"{input_wsclean_options.name}_template",
    )
    logger.info(f"Template options are {template_options}")

    return template_options


def get_parser() -> ArgumentParser:
parser = ArgumentParser(description="Routines related to wsclean")

Expand Down
85 changes: 83 additions & 2 deletions flint/ms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
annotations,
)

import shutil
from argparse import ArgumentParser
from contextlib import contextmanager
from curses.ascii import controlnames
from os import PathLike
from pathlib import Path
from shutil import rmtree
from typing import List, NamedTuple, Optional, Tuple, Union

import astropy.units as u
Expand Down Expand Up @@ -174,7 +175,7 @@ def critical_ms_interaction(
# If we get to here, things worked successfully, and we
# should return things back to normal.
if copy:
rmtree(input_ms)
shutil.rmtree(input_ms)
output_ms.rename(input_ms)
else:
output_ms.rename(target=input_ms)
Expand Down Expand Up @@ -758,6 +759,86 @@ def copy_and_preprocess_casda_askap_ms(
return ms.with_options(column=data_column)


def rename_ms_and_columns_for_selfcal(
    ms: MS,
    target: Union[str, Path],
    corrected_data: str = "CORRECTED_DATA",
    data: str = "DATA",
) -> MS:
    """Take an existing measurement set, rename it, and appropriately
    rename the "DATA" and "CORRECTED_DATA" columns to support a new
    round of imaging and self-calibration.

    This could be considered for larger measurement sets where holding
    multiple copies throughout rounds of self-calibration is not advisable.

    Args:
        ms (MS): The subject measurement set to rename
        target (Union[str, Path]): The target path the measurement set will be renamed to. This should not already exist.
        corrected_data (str, optional): The name of the column with the latest calibrated data. This becomes the `data` column. Defaults to "CORRECTED_DATA".
        data (str, optional): The name of the column that will be subsequently processed. If it exists it will be removed. Defaults to "DATA".

    Raises:
        FileExistsError: Raised if `target` already exists
        FileNotFoundError: Raised if `ms.path` does not exist or is not a directory
        AssertionError: Raised if, after the column shuffling, the column to be processed is not named "DATA"

    Returns:
        MS: Updated MS container with new path and appropriate data column
    """

    ms = MS.cast(ms)
    target = Path(target)

    if target.exists():
        raise FileExistsError(f"{target} already exists!")
    if not ms.path.exists() or not ms.path.is_dir():
        raise FileNotFoundError(
            f"{ms.path} does not exists or is not a directory (hence measurement set)."
        )

    logger.info(f"Renaming {ms.path} to {target=}")
    ms.path.rename(target=target)

    # Just some sanity in case None is passed through for both column names
    if not (corrected_data or data):
        return ms.with_options(path=target)

    # Now move the corrected data column into the column to be imaged.
    # For casa it really needs to be DATA
    with table(str(target), readonly=False, ack=False) as tab:
        colnames = tab.colnames()
        has_corrected = corrected_data in colnames
        has_data = data in colnames

        if has_corrected and has_data and corrected_data != data:
            # Both columns exist: drop the stale data and promote the
            # calibrated column in its place.
            logger.info(f"Removing {data} and renaming {corrected_data}")
            tab.removecols(columnnames=data)
            tab.renamecol(oldname=corrected_data, newname=data)
        elif has_corrected and not has_data:
            # Membership must be tested against the table's own columns.
            # Testing against an unrelated name list would make this branch
            # fire whenever the corrected column exists, including the
            # same-name case handled further below.
            logger.info(f"Renaming {corrected_data=} to {data=}")
            tab.renamecol(oldname=corrected_data, newname=data)
        elif not has_corrected and has_data:
            logger.warning(f"No {corrected_data=}, and {data=} seems to be present")
        elif (
            has_corrected
            and has_data
            and corrected_data == data
            and corrected_data != "DATA"
        ):
            # Caller asked for the same non-standard name for both columns;
            # casa still needs the column to be called DATA.
            data = "DATA"
            logger.info(f"Renaming {corrected_data} to DATA")
            tab.renamecol(corrected_data, data)

    # This is a safe guard against my bad handling of the above / mutineers
    # There could be interplay with these columns when potato peel is used
    # as some MSs will have CORRECTED_DATA and others may not.
    assert (
        data == "DATA"
    ), f"Somehow data column is not DATA, instead {data=}. Likely a problem for casa."

    return ms.with_options(path=target, column=data)


def find_mss(
mss_parent_path: Path, expected_ms_count: Optional[int] = 36
) -> Tuple[MS, ...]:
Expand Down
2 changes: 2 additions & 0 deletions flint/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,5 @@ class FieldOptions(NamedTuple):
"""Path that SBID archive tarballs will be created under. If None no archive tarballs are created. See ArchiveOptions. """
sbid_copy_path: Optional[Path] = None
"""Path that final processed products will be copied into. If None no copying of file products is performed. See ArchiveOptions. """
rename_ms: bool = False
"""Rename MSs throught rounds of imaging and self-cal instead of creating copies. This will delete data-columns throughout. """
9 changes: 6 additions & 3 deletions flint/prefect/common/imaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,20 +213,22 @@ def task_zip_ms(in_item: WSCleanCommand) -> Path:

@task
def task_gaincal_applycal_ms(
wsclean_cmd: WSCleanCommand,
ms: Union[MS, WSCleanCommand],
round: int,
update_gain_cal_options: Optional[Dict[str, Any]] = None,
archive_input_ms: bool = False,
skip_selfcal: bool = False,
rename_ms: bool = False,
) -> MS:
"""Perform self-calibration using CASA gaincal and applycal.
Args:
wsclean_cmd (WSCleanCommand): A resulting wsclean output. This is used purely to extract the ``.ms`` attribute.
ms (Union[MS, WSCleanCommand]): A resulting wsclean output. This is used purely to extract the ``.ms`` attribute.
round (int): Counter indication which self-calibration round is being performed. A name is included based on this.
update_gain_cal_options (Optional[Dict[str, Any]], optional): Options used to overwrite the default ``gaincal`` options. Defaults to None.
archive_input_ms (bool, optional): If True the input measurement set is zipped. Defaults to False.
skip_selfcal (bool, optional): Should this self-cal be skipped. If `True`, a new MS is created with the appropriate new name and returned, but it is not calibrated. Defaults to False.
rename_ms (bool, optional): If `True` simply rename a MS and adjust columns appropriately (potentially deleting them) instead of copying the complete MS. If `True` `archive_input_ms` is ignored. Defaults to False.
Raises:
ValueError: Raised when a ``.ms`` attribute can not be obtained
Expand All @@ -236,7 +238,7 @@ def task_gaincal_applycal_ms(
"""
# TODO: Need to do a better type system to include the .ms
# TODO: This needs to be expanded to handle multiple MS
ms = wsclean_cmd.ms
ms = ms if isinstance(ms, MS) else ms.ms # type: ignore

if not isinstance(ms, MS):
raise ValueError(
Expand All @@ -249,6 +251,7 @@ def task_gaincal_applycal_ms(
update_gain_cal_options=update_gain_cal_options,
archive_input_ms=archive_input_ms,
skip_selfcal=skip_selfcal,
rename_ms=rename_ms,
)


Expand Down
10 changes: 9 additions & 1 deletion flint/prefect/flows/continuum_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,12 @@ def process_science_fields(
)

cal_mss = task_gaincal_applycal_ms.map(
wsclean_cmd=wsclean_cmds,
ms=wsclean_cmds,
round=current_round,
update_gain_cal_options=unmapped(gain_cal_options),
archive_input_ms=field_options.zip_ms,
skip_selfcal=skip_gaincal_current_round,
rename_ms=field_options.rename_ms,
wait_for=[
field_summary
], # To make sure field summary is created with unzipped MSs
Expand Down Expand Up @@ -672,6 +673,12 @@ def get_parser() -> ArgumentParser:
action="store_true",
help="Skip checking whether the path containing bandpass solutions exists (e.g. if solutions have already been applied)",
)
parser.add_argument(
"--rename-ms",
action="store_true",
default=False,
help="Rename MSs throught rounds of imaging and self-cal instead of creating copies. This will delete data-columns throughout. ",
)

return parser

Expand Down Expand Up @@ -714,6 +721,7 @@ def cli() -> None:
imaging_strategy=args.imaging_strategy,
sbid_archive_path=args.sbid_archive_path,
sbid_copy_path=args.sbid_copy_path,
rename_ms=args.rename_ms,
)

setup_run_process_science_field(
Expand Down
Loading

0 comments on commit 950b3b7

Please sign in to comment.