Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use updated fitscube to corm larger than memory cubes #196

Merged
merged 31 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
logging
- added flag to disable logging singularity command output to the
`flint.logging.logger`
- subtract flow will remove files whenever possible (remove original files
after convolving, removing convolved files after linmos, remove linmos after
combining)

# 0.2.8

Expand Down
4 changes: 4 additions & 0 deletions flint/coadd/linmos.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class LinmosCommand(NamedTuple):
"""The output location that the generated linmos parset has been written to"""
image_fits: Path
"""Path to the output linmos image created (or will be). """
weight_fits: Path
"""Path to the output weight image formed through the linmos process"""


class BoundingBox(NamedTuple):
Expand Down Expand Up @@ -500,6 +502,7 @@ def generate_linmos_parameter_set(
f"linmos.weightstate = Inherent\n"
f"linmos.cutoff = {cutoff}\n"
f"linmos.finalcutoff = 0.01\n"
"linmos.imageaccess.axis = 1\n" # WSClean outputs frequency as second dimension (so 1 in zero indexing)
)
# Construct the holography section of the linmos parset
parset += _get_holography_linmos_options(holofile=holofile, pol_axis=pol_axis)
Expand Down Expand Up @@ -576,6 +579,7 @@ def linmos_images(
cmd=linmos_cmd_str,
parset=linmos_parset,
image_fits=linmos_names.image_fits.absolute(),
weight_fits=linmos_names.weight_fits.absolute(),
)

# Trim the fits image to remove empty pixels
Expand Down
28 changes: 4 additions & 24 deletions flint/imager/wsclean.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def _rename_wsclean_title(name_str: str) -> str:
Returns:
str: The modified string if a wsclean string was matched, otherwise the input `name-str`
"""
search_re = r"(-(i|q|u|v|xx|xy|yx|yy))?-((MFS|[0-9]{4}))(-t[0-9]{5})?-(image|dirty|model|residual|psf)"
search_re = r"(-(i|q|u|v|xx|xy|yx|yy))?(-(MFS|[0-9]{4}))?(-t[0-9]{5})?-(image|dirty|model|residual|psf)"
match_re = re.compile(search_re)

logger.info(f"Searching {name_str=} for wsclean added components")
Expand Down Expand Up @@ -722,35 +722,15 @@ def combine_subbands_to_cube(
logger.info(f"Not enough subband images for {mode=}, not creating a cube")
continue

logger.info(f"Combining {len(subband_images)} images. {subband_images=}")
hdu1, freqs = combine_fits(file_list=subband_images)

# This changes the output cube to a shape of (chan, pol, dec, ra)
# which is what yandasoft linmos tasks like
new_header = hdu1[0].header # type: ignore
data_cube = hdu1[0].data # type: ignore

tmp_header = new_header.copy()
# Need to swap NAXIS 3 and 4 to make LINMOS happy - booo
for a, b in ((3, 4), (4, 3)):
new_header[f"CTYPE{a}"] = tmp_header[f"CTYPE{b}"]
new_header[f"CRPIX{a}"] = tmp_header[f"CRPIX{b}"]
new_header[f"CRVAL{a}"] = tmp_header[f"CRVAL{b}"]
new_header[f"CDELT{a}"] = tmp_header[f"CDELT{b}"]
new_header[f"CUNIT{a}"] = tmp_header[f"CUNIT{b}"]

# Cube is currently STOKES, FREQ, RA, DEC - needs to be FREQ, STOKES, RA, DEC
data_cube = np.moveaxis(data_cube, 1, 0)
hdu1[0].data = data_cube # type: ignore
hdu1[0].header = new_header # type: ignore

output_cube_name = create_image_cube_name(
image_prefix=Path(imageset.prefix), mode=mode
)

logger.info(f"Combining {len(subband_images)} images. {subband_images=}")
freqs = combine_fits(file_list=subband_images, out_cube=output_cube_name)

# Write out the hdu to preserve the beam table constructed in fitscube
logger.info(f"Writing {output_cube_name=}")
hdu1.writeto(output_cube_name)

output_freqs_name = Path(output_cube_name).with_suffix(".freqs_Hz.txt")
np.savetxt(output_freqs_name, freqs.to("Hz").value)
Expand Down
145 changes: 134 additions & 11 deletions flint/naming.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,101 @@ def get_fits_cube_from_paths(paths: List[Path]) -> List[Path]:
return cube_files


LONG_FIELD_TO_SHORTHAND = {"sbid": "SB", "beam": "beam", "ch": "ch", "round": "round"}
"""Name mapping between the longform of ProcessedFieldComponents and shorthands used"""


def _long_field_name_to_shorthand(long_name: str) -> str:
"""Name mapping between the longform of ProcessedFieldComponents and shorthands used"""
if long_name in LONG_FIELD_TO_SHORTHAND:
return LONG_FIELD_TO_SHORTHAND[long_name]

return ""


def create_name_from_common_fields(
in_paths: Tuple[Path, ...], additional_suffixes: Optional[str] = None
) -> Path:
"""Attempt to craft a base name using the field elements that are in common.
The expectation that these are paths that can be processed by the ``processed_name_format``
handler. Resulting fields that are common across all ``in_paths`` are preserved.

Only fields that are recognised as a known property are retained. Suffixes that do not
form a component are ignored. For example:

>>> "59058/SB59058.RACS_1626-84.ch0287-0288.linmos.fits"

the `linmos.fits` would be ignored.

All ``in_paths`` should be detected, otherwise an ValueError is raised

Args:
in_paths (Tuple[Path, ...]): Collection of input paths to consider
additional_suffixes (Optional[str], optional): Add an additional set of suffixes before returning. Defaults to None.

Raises:
ValueError: Raised if any of the ``in_paths`` fail to conform to ``flint`` processed name format

Returns:
Path: Common fields with the same base parent path
"""
from flint.options import options_to_dict

in_paths = tuple(Path(p) for p in in_paths)
parent = in_paths[0].parent
processed_components = list(map(processed_ms_format, in_paths))

if None in processed_components:
raise ValueError("Processed name format failed")
processed_components_dict = [options_to_dict(pc) for pc in processed_components]

keys_to_test = processed_components_dict[0].keys()
logger.info(f"{keys_to_test=}")
# One of the worst crimes on the seven seas I have ever done
# If a field is None, it was not detected. If a field is not constanst
# across all input paths, it is ignored. Should a field be considered
# common across all input paths, look up its short hand that
# would otherwise be usede and use it.
constant_fields = [
f"{_long_field_name_to_shorthand(long_name=key)}{processed_components_dict[0][key]}"
for key in keys_to_test
if len(set([pcd[key] for pcd in processed_components_dict])) == 1
and processed_components_dict[0][key] is not None
]
logger.info(f"Identified {constant_fields=}")

name = ".".join(constant_fields)
if additional_suffixes:
additional_suffixes = (
f".{additional_suffixes}"
if not additional_suffixes.startswith(".")
else additional_suffixes
)
name += additional_suffixes
return Path(parent) / Path(name)


# TODO: Need to assess the mode argument, and define literals that are accepted
def create_image_cube_name(
image_prefix: Path, mode: str, suffix: str = "cube.fits"
image_prefix: Path,
mode: Optional[Union[str, List[str]]] = None,
suffix: Optional[Union[str, List[str]]] = None,
) -> Path:
"""Create a consistent naming scheme when combining images into cube images. Intended to
be used when combining many subband images together into a single cube.

The name returned will be:
>> {image_prefix}.{mode}.{suffix}
>>> {image_prefix}.{mode}.{suffix}.cube.fits

Should ``mode`` or ``suffix`` be a list, they will be joined with '.' separators. Hence, no
'.' should be added.

This function will always output 'cube.fits' at the end of the returned file name.

Args:
image_prefix (Path): The unique path of the name. Generally this is the common part among the input planes
mode (str): Additional mode to add to the file name
suffix (str, optional): The final suffix to appended. Defaults to ".cube.fits".
mode (Optional[Union[str, List[str]]], optional): Additional mode/s to add to the file name. Defaults to None.
suffix (Optional[Union[str, List[str]]], optional): Additional suffix/s to add before the final 'cube.fits'. Defaults to None.

Returns:
Path: The final path and file name
Expand All @@ -50,6 +132,27 @@ def create_image_cube_name(
# it here for future proofing
output_cube_name = f"{str(Path(image_prefix))}.{mode}.{suffix}"

output_components = [str(Path(image_prefix))]
if mode:
# TODO: Assess what modes are actually allowed. Suggestion is to
# make a class of some sort with specified and known markers that
# are opted into. Hate this "everything and anything"
(
output_components.append(mode)
if isinstance(mode, str)
else output_components.extend(mode)
)
if suffix:
# TODO: See above. Need a class of acceptable suffixes to use
(
output_components.append(suffix)
if isinstance(suffix, str)
else output_components.extend(suffix)
)

output_components.append("cube.fits")

output_cube_name = ".".join(output_components)
return Path(output_cube_name)


Expand All @@ -72,13 +175,13 @@ def create_imaging_name_prefix(

ms_path = MS.cast(ms=ms).path

name = ms_path.stem
names = [ms_path.stem]
if pol:
name = f"{name}.{pol.lower()}"
names.append(f"{pol.lower()}")
if channel_range:
name = f"{name}.ch{channel_range[0]}-{channel_range[1]}"
names.append(f"ch{channel_range[0]:04}-{channel_range[1]:04}")

return name
return ".".join(names)


def get_beam_resolution_str(mode: str, marker: Optional[str] = None) -> str:
Expand Down Expand Up @@ -280,14 +383,16 @@ class ProcessedNameComponents(NamedTuple):
"""The sbid of the observation"""
field: str
"""The name of the field extracted"""
beam: str
beam: Optional[str] = None
"""The beam of the observation processed"""
spw: Optional[str] = None
"""The SPW of the observation. If there is only one spw this is None."""
round: Optional[str] = None
"""The self-calibration round detected. This might be represented as 'noselfcal' in some image products, e.g. linmos. """
pol: Optional[str] = None
"""The polarisation component, if it exists, in a filename. Examples are 'i','q','u','v'. Could be combinations in some cases depending on how it was created (e.g. based on wsclean pol option). """
channel_range: Optional[Tuple[int, int]] = None
"""The channel range encoded in an file name. Generally are zero-padded, and are two fields of the form ch1234-1235, where the upper bound is exclusive. Defaults to none."""


def processed_ms_format(
Expand All @@ -306,9 +411,19 @@ def processed_ms_format(
in_name = in_name.name if isinstance(in_name, Path) else in_name

logger.debug(f"Matching {in_name}")
# TODO: Should the Beam and field items be relaxed and allowed to be optional?
# TODOL At very least I think the beam should become options
# A raw string is used to avoid bad unicode escaping
regex = re.compile(
r"^SB(?P<sbid>[0-9]+)\.(?P<field>.+)\.beam(?P<beam>[0-9]+)((\.spw(?P<spw>[0-9]+))?)((\.round(?P<round>[0-9]+))?)((\.(?P<pol>(i|q|u|v|xx|yy|xy|yx)+))?)*"
(
r"^SB(?P<sbid>[0-9]+)"
r"\.(?P<field>[^.]+)"
r"((\.beam(?P<beam>[0-9]+))?)"
r"((\.spw(?P<spw>[0-9]+))?)"
r"((\.round(?P<round>[0-9]+))?)"
r"((\.(?P<pol>(i|q|u|v|xx|yy|xy|yx)+))?)"
r"((\.ch(?P<chl>([0-9]+))-(?P<chh>([0-9]+)))?)"
)
)
results = regex.match(in_name)

Expand All @@ -320,13 +435,16 @@ def processed_ms_format(

logger.debug(f"Matched groups are: {groups}")

channel_range = (int(groups["chl"]), int(groups["chh"])) if groups["chl"] else None

return ProcessedNameComponents(
sbid=groups["sbid"],
field=groups["field"],
beam=groups["beam"],
spw=groups["spw"],
round=groups["round"],
pol=groups["pol"],
channel_range=channel_range,
)


Expand Down Expand Up @@ -392,6 +510,10 @@ def extract_beam_from_name(name: Union[str, Path]) -> int:
"""

results = extract_components_from_name(name=name)
if results is None or results.beam is None:
raise ValueError(
f"Failed to convert to processed name format and find beam: {name=} {results=}"
)

return int(results.beam)

Expand Down Expand Up @@ -448,7 +570,8 @@ def create_ms_name(
ms_name_list.append(field)

if format_components:
ms_name_list.append(f"beam{int(format_components.beam):02d}")
if format_components.beam is not None:
ms_name_list.append(f"beam{int(format_components.beam):02d}")
if format_components.spw:
ms_name_list.append(f"spw{format_components.spw}")

Expand Down
31 changes: 11 additions & 20 deletions flint/prefect/common/imaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
FITSMaskNames,
get_beam_resolution_str,
get_fits_cube_from_paths,
processed_ms_format,
)
from flint.options import FieldOptions, SubtractFieldOptions
from flint.peel.potato import potato_peel
Expand Down Expand Up @@ -597,21 +596,21 @@ def task_linmos_images(
cutoff: float = 0.05,
field_summary: Optional[FieldSummary] = None,
trim_linmos_fits: bool = True,
remove_original_images: bool = False,
) -> LinmosCommand:
"""Run the yandasoft linmos task against a set of input images

Args:
images (Collection[Collection[Path]]): Images that will be co-added together
container (Path): Path to singularity container that contains yandasoft
filter (Optional[str], optional): Filter to extract the images that will be extracted from the set of input images. These will be co-added. If None all images are co-aded. Defaults to ".MFS.".
field_name (Optional[str], optional): Name of the field, which is included in the output images created. Defaults to None.
suffix_str (str, optional): Additional string added to the prefix of the output linmos image products. Defaults to "noselfcal".
holofile (Optional[Path], optional): The FITS cube with the beam corrections derived from ASKAP holography. Defaults to None.
sbid (Optional[Union[int,str]], optional): SBID of the data being imaged. Defaults to None.
parset_output_path (Optional[str], optional): Location to write the linmos parset file to. Defaults to None.
cutoff (float, optional): The primary beam attenuation cutoff supplied to linmos when coadding. Defaults to 0.05.
field_summary (Optional[FieldSummary], optional): The summary of the field, including (importantly) to orientation of the third-axis. Defaults to None.
trim_linmos_fits (bool, optional): Attempt to trim the output linmos files of as much empty space as possible. Defaults to True.
remove_original_images (bool, optional): If True remove the original image after they have been convolved. Defaults to False.

Returns:
LinmosCommand: The linmos command and associated meta-data
Expand All @@ -630,24 +629,12 @@ def task_linmos_images(
)
logger.info(f"Number of filtered images to linmos: {len(filter_images)}")

candidate_image = filter_images[0]
candidate_image_fields = processed_ms_format(in_name=candidate_image)
assert (
candidate_image_fields is not None
), f"{candidate_image=}, which should not happen"

if field_name is None:
field_name = candidate_image_fields.field
logger.info(f"Extracted {field_name=} from {candidate_image=}")

if sbid is None:
sbid = candidate_image_fields.sbid
logger.info(f"Extracted {sbid=} from {candidate_image=}")
from flint.naming import create_name_from_common_fields

base_name = f"SB{sbid}.{field_name}.{suffix_str}"

out_dir = Path(filter_images[0].parent)
out_name = out_dir / base_name
out_name = create_name_from_common_fields(
in_paths=tuple(filter_images), additional_suffixes=suffix_str
)
out_dir = out_name.parent
logger.info(f"Base output image name will be: {out_name}")

out_file_name = (
Expand All @@ -672,6 +659,9 @@ def task_linmos_images(
pol_axis=pol_axis,
trim_linmos_fits=trim_linmos_fits,
)
if remove_original_images:
logger.info(f"Removing {len(filter_images)} input images")
_ = [image_path.unlink() for image_path in filter_images] # type: ignore

return linmos_cmd

Expand Down Expand Up @@ -726,6 +716,7 @@ def _convolve_linmos(
field_summary=field_summary,
trim_linmos_fits=trim_linmos_fits,
filter=convol_filter,
remove_original_images=remove_original_images,
) # type: ignore

return parset
Expand Down
Loading