Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set up and use -save-source-list and addmodel for predicting at full channel resolution #190

Merged
merged 16 commits into from
Nov 25, 2024
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
- Removed `suppress_artefact` and `minimum_absolute_clip` functions from
`flint.masking`
- Added an adaptive box selection mode to the minimum absolute algorithm
- Update a MSs `MODEL_DATA` column using `addmodel` and a source list (see
`wsclean -save-source-list`)
- Added a `taql` based function intended to be used to subtract model data from
nominated data, `flint.ms.subtract_model_from_data_column`

# 0.2.7

Expand Down
87 changes: 71 additions & 16 deletions flint/calibrate/aocalibrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Dict,
Iterable,
List,
Literal,
NamedTuple,
Optional,
Tuple,
Expand All @@ -34,13 +35,28 @@
)
from flint.exceptions import PhaseOutlierFitError
from flint.logging import logger
from flint.ms import MS, consistent_ms, get_beam_from_ms
from flint.ms import MS, consistent_ms, get_beam_from_ms, remove_columns_from_ms
from flint.naming import get_aocalibrate_output_path
from flint.options import BaseOptions
from flint.sclient import run_singularity_command
from flint.utils import create_directory


class CalibrateOptions(NamedTuple):
class AddModelOptions(BaseOptions):
"""Container for options into the ``addmodel`` program packaged
with ``aocalibrate``"""

model_path: Path
"""Path to the sky-model that will be inserted"""
ms_path: Path
"""Path to the measurement set that will be interacted with"""
mode: Literal["a", "s", "c", "v"]
"""The mode ``addmodel`` will be operating under, where where a=add model to visibilities (default), s=subtract model from visibilities, c=copy model to visibilities, z=zero visibilities"""
datacolumn: str
"""The column that will be operated against"""


class CalibrateOptions(BaseOptions):
"""Structure used to represent options into the `calibrate` program

These attributes have the same names as options into the `calibrate`
Expand All @@ -60,14 +76,8 @@ class CalibrateOptions(NamedTuple):
p: Optional[Tuple[Path, Path]] = None
"""Plot output names for the amplitude gain and phases"""

def with_options(self, **kwargs) -> CalibrateOptions:
options = self._asdict()
options.update(**kwargs)

return CalibrateOptions(**options)


class CalibrateCommand(NamedTuple):
class CalibrateCommand(BaseOptions):
"""The AO Calibrate command and output path of the corresponding solutions file"""

cmd: str
Expand All @@ -83,14 +93,8 @@ class CalibrateCommand(NamedTuple):
preflagged: bool = False
"""Indicates whether the solution file has gone through preflagging routines. """

def with_options(self, **kwargs) -> CalibrateCommand:
_dict = self._asdict()
_dict.update(**kwargs)

return CalibrateCommand(**_dict)


class ApplySolutions(NamedTuple):
class ApplySolutions(BaseOptions):
"""The applysolutions command to execute"""

cmd: str
Expand Down Expand Up @@ -855,6 +859,10 @@ class FlaggedAOSolution(NamedTuple):
"""The bandpass solutions after flagging, as saved in the solutions file"""


# TODO: These options are too much and should be placed
# into a BaseOptions


def flag_aosolutions(
solutions_path: Path,
ref_ant: int = -1,
Expand Down Expand Up @@ -1081,6 +1089,53 @@ def flag_aosolutions(
return flagged_aosolutions


def add_model_options_to_command(add_model_options: AddModelOptions) -> str:
"""Generate the command to execute ``addmodel``

Args:
add_model_options (AddModelOptions): The collection of supported options used to generate the command

Returns:
str: The generated addmodel command
"""
logger.info("Generating addmodel command")
command = f"addmodel -datacolumn {add_model_options.datacolumn} -m {add_model_options.mode} "
command += f"{str(add_model_options.model_path)} {str(add_model_options.ms_path)}"

return command


def add_model(
add_model_options: AddModelOptions, container: Path, remove_datacolumn: bool = False
) -> AddModelOptions:
"""Use the ``addmodel`` program to predict the sky-model visibilities
from a compatible source list (e.g. ``wsclean -save-source-list``)

Args:
add_model_options (AddModelOptions): The set of supported options to be supplied to ``addmodel``
container (Path): The calibrate container that contains the ``addmodel`` program
remove_datacolumn (bool, optional): Whether to first removed the ``datacolumn`` specified in ``add_model_options`` before predicting. If False it should be overwritten. Defaults to False.

Returns:
AddModelOptions: The options used to run ``addmodel`` (same as input)
"""
if remove_datacolumn:
remove_columns_from_ms(
ms=add_model_options.ms_path, columns_to_remove=add_model_options.datacolumn
)
add_model_command = add_model_options_to_command(
add_model_options=add_model_options
)

run_singularity_command(
image=container,
command=add_model_command,
bind_dirs=[add_model_options.ms_path, add_model_options.model_path],
)

return add_model_options


def get_parser() -> ArgumentParser:
parser = ArgumentParser(
description="Run calibrate and apply the solutions given a measurement set and sky-model."
Expand Down
77 changes: 63 additions & 14 deletions flint/imager/wsclean.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,24 @@
)


class ImageSet(NamedTuple):
class ImageSet(BaseOptions):
"""A structure to represent the images and auxiliary products produced by
wsclean"""

prefix: str
"""Prefix of the images and other output products. This should correspond to the -name argument from wsclean"""
image: Collection[Path]
image: List[Path]
"""Images produced. """
psf: Optional[Collection[Path]] = None
psf: Optional[List[Path]] = None
"""References to the PSFs produced by wsclean. """
dirty: Optional[Collection[Path]] = None
dirty: Optional[List[Path]] = None
"""Dirty images. """
model: Optional[Collection[Path]] = None
model: Optional[List[Path]] = None
"""Model images. """
residual: Optional[Collection[Path]] = None
residual: Optional[List[Path]] = None
"""Residual images."""
source_list: Optional[Path] = None
"""Path to a source list that accompanies the image data"""


class WSCleanOptions(BaseOptions):
Expand Down Expand Up @@ -156,9 +158,11 @@ class WSCleanOptions(BaseOptions):
"""The path to a temporary directory where files will be wrritten. """
pol: str = "i"
"""The polarisation to be imaged"""
save_source_list: bool = False
"""Saves the found clean components as a BBS/DP3 text sky model"""


class WSCleanCommand(NamedTuple):
class WSCleanCommand(BaseOptions):
"""Simple container for a wsclean command."""

cmd: str
Expand All @@ -172,11 +176,43 @@ class WSCleanCommand(NamedTuple):
cleanup: bool = True
"""Will clean up the dirty images/psfs/residuals/models when the imaging has completed"""

def with_options(self, **kwargs) -> WSCleanCommand:
_dict = self._asdict()
_dict.update(**kwargs)

return WSCleanCommand(**_dict)
def get_wsclean_output_source_list_path(
name_path: Union[str, Path], pol: Optional[str] = None
) -> Path:
"""WSClean can produce a text file that describes the components
that it cleaned, their type, scale and brightness. These are
placed in a file that is:

>> {name}.{pol}-sources.txt

where ``name`` represented the `-name` component. Given
an input measurement set path or this `-name` value return
the expected source list text file. ``pol`` is the stokes
that the source is expected.

Args:
name_path (Union[str,Path]): Value of the ``-name`` option. If `str` converted to a ``Path``
pol (Optional[str], optional): The polarisation to add to the name. If None the -source.txt suffix is simply appended. Defaults to None.

Returns:
Path: Path to the source list text file
"""

# ye not be trusted
name_path = Path(name_path)
base_name = name_path.name
if ".ms" == Path(base_name).suffix:
base_name = Path(base_name).stem

logger.info(f"{base_name=} extracted from {name_path=}")

source_list_name = (
f"{base_name}.{pol}-sources.txt" if pol else f"{base_name}-sources.txt"
)
source_list_path = name_path.parent / source_list_name

return source_list_path


def _rename_wsclean_title(name_str: str) -> str:
Expand Down Expand Up @@ -242,6 +278,7 @@ def _wsclean_output_callback(line: str) -> None:
raise CleanDivergenceError(f"Clean divergence detected: {line}")


# TODO: Update this function to also add int the source list
def get_wsclean_output_names(
prefix: str,
subbands: int,
Expand Down Expand Up @@ -297,7 +334,7 @@ def get_wsclean_output_names(
if isinstance(output_types, str):
output_types = (output_types,)

images: Dict[str, Collection[Path]] = {}
images: Dict[str, List[Path]] = {}
for image_type in ("image", "dirty", "model", "residual"):
if image_type not in output_types:
continue
Expand Down Expand Up @@ -485,7 +522,9 @@ def _resolve_wsclean_key_value_to_cli_str(key: str, value: Any) -> ResolvedCLIRe


def create_wsclean_cmd(
ms: MS, wsclean_options: WSCleanOptions, container: Optional[Path] = None
ms: MS,
wsclean_options: WSCleanOptions,
container: Optional[Path] = None,
) -> WSCleanCommand:
"""Create a wsclean command from a WSCleanOptions container

Expand Down Expand Up @@ -769,6 +808,14 @@ def run_wsclean_imager(
check_exists_when_adding=True,
)

if wsclean_cmd.options.save_source_list:
logger.info("Attaching the wsclean clean components SEDs")
source_list_path = get_wsclean_output_source_list_path(
name_path=prefix, pol=None
)
assert source_list_path.exists(), f"{source_list_path=} does not exist"
imageset = imageset.with_options(source_list=source_list_path)

if make_cube_from_subbands:
imageset = combine_subbands_to_cube(
imageset=imageset, remove_original_images=True
Expand Down Expand Up @@ -808,7 +855,9 @@ def wsclean_imager(
assert ms.column is not None, "A MS column needs to be elected for imaging. "
wsclean_options = wsclean_options.with_options(data_column=ms.column)
wsclean_cmd = create_wsclean_cmd(
ms=ms, wsclean_options=wsclean_options, container=wsclean_container
ms=ms,
wsclean_options=wsclean_options,
container=wsclean_container,
)

return wsclean_cmd
Expand Down
65 changes: 64 additions & 1 deletion flint/ms.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def critical_ms_interaction(
assert (
not output_ms.exists()
), f"The output measurement set {output_ms} already exists. "

logger.info(f"Critical section for {input_ms=}")
if copy:
rsync_copy_directory(target_path=input_ms, out_path=output_ms)
else:
Expand Down Expand Up @@ -543,6 +543,69 @@ def rename_column_in_ms(
return ms


def remove_columns_from_ms(
ms: Union[MS, Path], columns_to_remove: Union[str, List[str]]
) -> List[str]:
"""Attempt to remove a collection of columns from a measurement set.
If any of the provided columns do not exist they are ignored.

Args:
ms (Union[MS, Path]): The measurement set to inspect and remove columns from
columns_to_remove (Union[str, List[str]]): Collection of column names to remove. If a single column internally it is cast to a list of length 1.

Returns:
List[str]: Collection of column names removed
"""

if isinstance(columns_to_remove, str):
columns_to_remove = [columns_to_remove]

ms = MS.cast(ms=ms)
with table(tablename=str(ms.path), readonly=False, ack=False) as tab:
colnames = tab.colnames()
columns_to_remove = [c for c in columns_to_remove if c in colnames]
if len(columns_to_remove) == 0:
logger.info(f"All columns provided do not exist in {ms.path}")
else:
logger.info(f"Removing {columns_to_remove=} from {ms.path}")
tab.removecols(columnnames=columns_to_remove)

return columns_to_remove


def subtract_model_from_data_column(
ms: MS, model_column: str = "MODEL_DATA", data_column: Optional[str] = None
) -> MS:
"""Execute a ``taql`` query to subtract the MODEL_DATA from a nominated data column.
This requires the ``model_column`` to already be inserted into the MS. Internally
the ``critical_ms_interaction`` context manager is used to highlight that the MS
is being modified should things fail when subtracting.

Args:
ms (MS): The measurement set instance being considered
model_column (str, optional): The column with representing the model. Defaults to "MODEL_DATA".
data_column (Optional[str], optional): The column where the column will be subtracted. If ``None`` it is taken from the ``column`` nominated by the input ``MS`` instance. Defaults to None.

Returns:
MS: The updated MS
"""
ms = MS.cast(ms)
data_column = data_column if data_column else ms.column
assert data_column is not None, f"{data_column=}, which is not allowed"
with critical_ms_interaction(input_ms=ms.path) as critical_ms:
with table(str(critical_ms), readonly=False) as tab:
logger.info("Extracting columns")
colnames = tab.colnames()
assert all(
[d in colnames for d in (model_column, data_column)]
), f"{model_column=} or {data_column=} missing from {colnames=}"

logger.info(f"Subtracting {model_column=} from {data_column=}")
taql(f"UPDATE $tab SET {data_column}={data_column}-{model_column}")

return ms


def preprocess_askap_ms(
ms: Union[MS, Path],
data_column: str = "DATA",
Expand Down
2 changes: 2 additions & 0 deletions flint/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ class FieldOptions(BaseOptions):
"""Specifies whether Stokes-V imaging will be carried out after the final round of imagine (whether or not self-calibration is enabled). """
coadd_cubes: bool = False
"""Co-add cubes formed throughout imaging together. Cubes will be smoothed channel-wise to a common resolution. Only performed on final set of images"""
update_model_data_with_source_list: bool = False
"""Attempt to update a MSs MODEL_DATA column with a source list (e.g. source list output from wsclean)"""


def dump_field_options_to_yaml(
Expand Down
Loading