Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reordering applying solutions and fix ms visibility rotation #58

Merged
merged 13 commits into from
Jan 25, 2024
76 changes: 71 additions & 5 deletions flint/ms.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from fixms.fix_ms_corrs import fix_ms_corrs
from fixms.fix_ms_dir import fix_ms_dir

from flint.exceptions import MSError
from flint.logging import logger
from flint.naming import create_ms_name
from flint.utils import rsync_copy_directory
Expand Down Expand Up @@ -54,13 +55,28 @@ def ms(self) -> MS:
def cast(cls, ms: Union[MS, Path]) -> MS:
"""Create a MS instance, if necessary, given eith a Path or MS.

If the input is neither a MS instance or Path, the object will
be checked to see if it has a `.ms` attribute. If it does then
this will be used.

Args:
ms (Union[MS, Path]): The input type to consider

Raises:
MSError: Raised when the input ms can not be cast to an MS instance

Returns:
MS: A normalised MS
"""
ms = ms if isinstance(ms, MS) else MS(path=ms)
if isinstance(ms, MS):
# Nothing to do
pass
elif isinstance(ms, Path):
ms = MS(path=ms)
elif "ms" in dir(ms) and isinstance(ms.ms, MS):
ms = ms.ms
else:
raise MSError("Unable to convert to MS object. ")

return ms

Expand Down Expand Up @@ -314,7 +330,10 @@ def describe_ms(ms: Union[MS, Path], verbose: bool = True) -> MSSummary:


def split_by_field(
ms: Union[MS, Path], field: Optional[str] = None, out_dir: Optional[Path] = None
ms: Union[MS, Path],
field: Optional[str] = None,
out_dir: Optional[Path] = None,
column: Optional[str] = None,
) -> List[MS]:
"""Attempt to split an input measurement set up by the unique FIELDs recorded

Expand All @@ -323,6 +342,7 @@ def split_by_field(
field (Optional[str], optional): Desired field to extract. If None, all are split. Defaults to None.
out_dir (Optional[Path], optional): Output directory to write the fresh MSs to. If None, write to same directory as
parent MS. Defaults to None.
column (Optional[str], optional): If not None, set the column attribute of the output MS instance to this. Defaults to None.

Returns:
List[MS]: The output MSs split by their field name.
Expand Down Expand Up @@ -361,7 +381,9 @@ def split_by_field(
logger.info(f"Writing {str(out_path)} for {split_name}")
sub_ms.copy(str(out_path), deep=True)

out_mss.append(MS(path=out_path, beam=get_beam_from_ms(out_path)))
out_mss.append(
MS(path=out_path, beam=get_beam_from_ms(out_path), column=column)
)

return out_mss

Expand Down Expand Up @@ -456,12 +478,51 @@ def consistent_ms(ms1: MS, ms2: MS) -> bool:
return result


def rename_column_in_ms(
ms: MS,
original_column_name: str,
new_column_name: str,
update_tracked_column: bool = False,
) -> MS:
"""Rename a column in a measurement set. Optionally update the tracked
`data` column attribute of the input measurement set.

Args:
ms (MS): Measurement set with the column to rename
original_column_name (str): The name of the column that will be changed
new_column_name (str): The new name of the column set in `original_column_name`
update_tracked_column (bool, optional): Whether the `data` attribute of `ms` will be updated to `new_column_name`. Defaults to False.

Returns:
MS: The measurement set operated on
"""
ms = MS.cast(ms=ms)

with table(tablename=str(ms.path), readonly=False, ack=False) as tab:
colnames = tab.colnames()
assert (
original_column_name in colnames
), f"{original_column_name=} missing from {ms}"
assert (
new_column_name not in colnames
), f"{new_column_name=} already exists in {ms}"

logger.info(f"Renaming {original_column_name} to {new_column_name}")
tab.renamecol(oldname=original_column_name, newname=new_column_name)

if update_tracked_column:
ms = ms.with_options(column=new_column_name)

return ms


def preprocess_askap_ms(
ms: Union[MS, Path],
data_column: str = "DATA",
instrument_column: str = "INSTRUMENT_DATA",
overwrite: bool = True,
skip_rotation: bool = False,
fix_stokes_factor: bool = False,
) -> MS:
"""The ASKAP MS stores its data in a way that is not immediatedly accessible
to other astronomical software, like wsclean or casa. For each measurement set
Expand All @@ -482,6 +543,7 @@ def preprocess_askap_ms(
instrument_column (str, optional): The name of the column that will hold the original `data_column` data. Defaults to 'INSTRUMENT_DATA'
overwrite (bool, optional): If the `instrument_column` and `data_column` both exist and `overwrite=True` the `data_column` will be overwritten. Otherwise, a `ValueError` is raised. Defaults to True.
skip_rotation (bool, optional): If true, the visibilities are not rotated Defaults to False.
fix_stokes_factor (bool, optional): Apply the stokes scaling factor (aruses in different definition of Stokes between Ynadasoft and other applications) when rotation visibilities. This should be set to False is the bandpass solutions have already absorded this scaling term. Defaults to False.

Returns:
MS: An updated measurement set with the corrections applied.
Expand Down Expand Up @@ -522,15 +584,19 @@ def preprocess_askap_ms(
if skip_rotation:
# TODO: Should we copy the DATA to INSTRUMENT_DATA?
logger.info("Skipping the rotation of the visibilities. ")
ms = ms.with_options(column=data_column)
logger.info(f"Returning {ms=}.")
return ms.with_options(column=data_column)
return ms

logger.info("Applying roation matrix to correlations. ")
logger.info(
f"Rotating visibilities for {ms.path} with data_column={instrument_column} amd corrected_data_column={data_column}"
)
fix_ms_corrs(
ms=ms.path, data_column=instrument_column, corrected_data_column=data_column
ms=ms.path,
data_column=instrument_column,
corrected_data_column=data_column,
fix_stokes_factor=fix_stokes_factor,
)

return ms.with_options(column=data_column)
Expand Down
4 changes: 2 additions & 2 deletions flint/prefect/common/imaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
create_snr_mask_wbutter_from_fits,
extract_beam_mask_from_mosaic,
)
from flint.ms import MS, preprocess_askap_ms, split_by_field
from flint.ms import MS, preprocess_askap_ms, rename_column_in_ms, split_by_field
from flint.naming import FITSMaskNames, processed_ms_format
from flint.options import FieldOptions
from flint.prefect.common.utils import upload_image_as_artifact
Expand All @@ -46,7 +46,7 @@
task_split_by_field = task(split_by_field)
task_select_solution_for_ms = task(select_aosolution_for_ms)
task_create_apply_solutions_cmd = task(create_apply_solutions_cmd)

task_rename_column_in_ms = task(rename_column_in_ms)

# Tasks below are extracting componented from earlier stages, or are
# otherwise doing something important
Expand Down
1 change: 1 addition & 0 deletions flint/prefect/flows/bandpass_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def calibrate_bandpass_flow(
flagger_container=flagger_container,
model_path=model_path,
source_name_prefix=source_name_prefix,
skip_rotation=True,
)

return output_split_bandpass_path
Expand Down
40 changes: 27 additions & 13 deletions flint/prefect/flows/continuum_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
task_get_common_beam,
task_linmos_images,
task_preprocess_askap_ms,
task_rename_column_in_ms,
task_run_bane_and_aegean,
task_select_solution_for_ms,
task_split_by_field,
Expand Down Expand Up @@ -64,9 +65,14 @@ def process_science_fields(
Path(split_path / science_folder_name).absolute().resolve()
)

if not output_split_science_path.exists():
logger.info(f"Creating {str(output_split_science_path)}")
output_split_science_path.mkdir(parents=True)
if output_split_science_path.exists():
logger.critical(
f"{output_split_science_path=} already exists. It should not. Exiting. "
)
raise ValueError("Output science directory already exists. ")

logger.info(f"Creating {str(output_split_science_path)}")
output_split_science_path.mkdir(parents=True)

logger.info(f"{field_options=}")

Expand All @@ -84,27 +90,35 @@ def process_science_fields(
logger.info(f"Constructed the following {calibrate_cmds=}")

split_science_mss = task_split_by_field.map(
ms=science_mss, field=None, out_dir=unmapped(output_split_science_path)
ms=science_mss,
field=None,
out_dir=unmapped(output_split_science_path),
column=unmapped("DATA"),
)
flat_science_mss = task_flatten.submit(split_science_mss)

preprocess_science_mss = task_preprocess_askap_ms.map(
ms=flat_science_mss,
data_column=unmapped("DATA"),
instrument_column=unmapped("INSTRUMENT_DATA"),
overwrite=True,
)
solutions_paths = task_select_solution_for_ms.map(
calibrate_cmds=unmapped(calibrate_cmds), ms=preprocess_science_mss
calibrate_cmds=unmapped(calibrate_cmds), ms=flat_science_mss
)
apply_solutions_cmds = task_create_apply_solutions_cmd.map(
ms=preprocess_science_mss,
ms=flat_science_mss,
solutions_file=solutions_paths,
container=field_options.calibrate_container,
)
column_rename_mss = task_rename_column_in_ms.map(
ms=apply_solutions_cmds,
original_column_name=unmapped("DATA"),
new_column_name=unmapped("INSTRUMENT_DATA"),
)
preprocess_science_mss = task_preprocess_askap_ms.map(
ms=column_rename_mss,
data_column=unmapped("CORRECTED_DATA"),
instrument_column=unmapped("DATA"),
overwrite=True,
)

flagged_mss = task_flag_ms_aoflagger.map(
ms=apply_solutions_cmds, container=field_options.flagger_container, rounds=1
ms=preprocess_science_mss, container=field_options.flagger_container, rounds=1
)

if field_options.no_imaging:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ numpy = "<1.24"
python-casacore = "*"
scipy = "*"
spython = "^0.3.1"
fixms = "^0.1.1"
fixms = "^0.1.5"
matplotlib = "*"
prefect = "^2.10.0"
prefect-dask = "^0.2.4"
Expand Down
27 changes: 27 additions & 0 deletions tests/test_ms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
"""
from pathlib import Path

import pytest

from flint.calibrate.aocalibrate import ApplySolutions
from flint.exceptions import MSError
from flint.ms import MS


Expand All @@ -11,3 +15,26 @@ def test_ms_self_attribute():
ms = MS(path=ex)

assert ms.ms.path == ex


def test_ms_from_options():
path = Path("example.ms")
solutions = ApplySolutions(
cmd="none", solution_path=Path("example_sols.bin"), ms=MS(path=path)
)

example_ms = MS.cast(solutions)
ms = MS(path=path)

assert isinstance(example_ms, MS)
assert example_ms == ms


def test_raise_error_ms_from_options():
path = Path("example.ms")
solutions = ApplySolutions(
cmd="none", solution_path=Path("example_sols.bin"), ms=path
)

with pytest.raises(MSError):
MS.cast(solutions)
Loading