Skip to content

Commit

Permalink
Use updated fitscube to corm larger than memory cubes (#196)
Browse files Browse the repository at this point in the history
* initial fitscube 0.5.0

* fixed the wsclean channel range naming extraction

* corrected data type expected

* max_workers set to 4

* attempting prefect sync

* corrected call into sync

* async runner finction

* another attempt

* function change

* calling coro combine fits

* attempt 938 for asyncio silly

* padded 0 channel range

* expanded regex to allow chNNNN-NNNN

* added a todo

* restructure naming prefix

* fixed the zero-padding

* create name from common fields

* using new common name format

* added round to shorthand

* added another test

* removing original images

* removing t he original images after linmos

* added the weights_fits attribute / combining weights

* added a test for long to short names

* managing output suffix

* removed unnecessary log

* adding cube to suffix string

* cube right place

* changed the cube create function

* added todo for create naming class

* corrected some types

---------

Co-authored-by: tgalvin <[email protected]>
  • Loading branch information
tjgalvin and tgalvin authored Dec 10, 2024
1 parent ff590bd commit 9946a0e
Show file tree
Hide file tree
Showing 10 changed files with 375 additions and 63 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
logging
- added flag to disable logging singularity command output to the
`flint.logging.logger`
- subtract flow will remove files whenever possible (remove original files
after convolving, removing convolved files after linmos, remove linmos after
combining)

# 0.2.8

Expand Down
4 changes: 4 additions & 0 deletions flint/coadd/linmos.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class LinmosCommand(NamedTuple):
"""The output location that the generated linmos parset has been written to"""
image_fits: Path
"""Path to the output linmos image created (or will be). """
weight_fits: Path
"""Path to the output weight image formed through the linmos process"""


class BoundingBox(NamedTuple):
Expand Down Expand Up @@ -500,6 +502,7 @@ def generate_linmos_parameter_set(
f"linmos.weightstate = Inherent\n"
f"linmos.cutoff = {cutoff}\n"
f"linmos.finalcutoff = 0.01\n"
"linmos.imageaccess.axis = 1\n" # WSClean outputs frequency as second dimension (so 1 in zero indexing)
)
# Construct the holography section of the linmos parset
parset += _get_holography_linmos_options(holofile=holofile, pol_axis=pol_axis)
Expand Down Expand Up @@ -576,6 +579,7 @@ def linmos_images(
cmd=linmos_cmd_str,
parset=linmos_parset,
image_fits=linmos_names.image_fits.absolute(),
weight_fits=linmos_names.weight_fits.absolute(),
)

# Trim the fits image to remove empty pixels
Expand Down
28 changes: 4 additions & 24 deletions flint/imager/wsclean.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def _rename_wsclean_title(name_str: str) -> str:
Returns:
str: The modified string if a wsclean string was matched, otherwise the input `name-str`
"""
search_re = r"(-(i|q|u|v|xx|xy|yx|yy))?-((MFS|[0-9]{4}))(-t[0-9]{5})?-(image|dirty|model|residual|psf)"
search_re = r"(-(i|q|u|v|xx|xy|yx|yy))?(-(MFS|[0-9]{4}))?(-t[0-9]{5})?-(image|dirty|model|residual|psf)"
match_re = re.compile(search_re)

logger.info(f"Searching {name_str=} for wsclean added components")
Expand Down Expand Up @@ -722,35 +722,15 @@ def combine_subbands_to_cube(
logger.info(f"Not enough subband images for {mode=}, not creating a cube")
continue

logger.info(f"Combining {len(subband_images)} images. {subband_images=}")
hdu1, freqs = combine_fits(file_list=subband_images)

# This changes the output cube to a shape of (chan, pol, dec, ra)
# which is what yandasoft linmos tasks like
new_header = hdu1[0].header # type: ignore
data_cube = hdu1[0].data # type: ignore

tmp_header = new_header.copy()
# Need to swap NAXIS 3 and 4 to make LINMOS happy - booo
for a, b in ((3, 4), (4, 3)):
new_header[f"CTYPE{a}"] = tmp_header[f"CTYPE{b}"]
new_header[f"CRPIX{a}"] = tmp_header[f"CRPIX{b}"]
new_header[f"CRVAL{a}"] = tmp_header[f"CRVAL{b}"]
new_header[f"CDELT{a}"] = tmp_header[f"CDELT{b}"]
new_header[f"CUNIT{a}"] = tmp_header[f"CUNIT{b}"]

# Cube is currently STOKES, FREQ, RA, DEC - needs to be FREQ, STOKES, RA, DEC
data_cube = np.moveaxis(data_cube, 1, 0)
hdu1[0].data = data_cube # type: ignore
hdu1[0].header = new_header # type: ignore

output_cube_name = create_image_cube_name(
image_prefix=Path(imageset.prefix), mode=mode
)

logger.info(f"Combining {len(subband_images)} images. {subband_images=}")
freqs = combine_fits(file_list=subband_images, out_cube=output_cube_name)

# Write out the hdu to preserve the beam table constructed in fitscube
logger.info(f"Writing {output_cube_name=}")
hdu1.writeto(output_cube_name)

output_freqs_name = Path(output_cube_name).with_suffix(".freqs_Hz.txt")
np.savetxt(output_freqs_name, freqs.to("Hz").value)
Expand Down
145 changes: 134 additions & 11 deletions flint/naming.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,101 @@ def get_fits_cube_from_paths(paths: List[Path]) -> List[Path]:
return cube_files


LONG_FIELD_TO_SHORTHAND = {"sbid": "SB", "beam": "beam", "ch": "ch", "round": "round"}
"""Name mapping between the longform of ProcessedFieldComponents and shorthands used"""


def _long_field_name_to_shorthand(long_name: str) -> str:
"""Name mapping between the longform of ProcessedFieldComponents and shorthands used"""
if long_name in LONG_FIELD_TO_SHORTHAND:
return LONG_FIELD_TO_SHORTHAND[long_name]

return ""


def create_name_from_common_fields(
in_paths: Tuple[Path, ...], additional_suffixes: Optional[str] = None
) -> Path:
"""Attempt to craft a base name using the field elements that are in common.
The expectation that these are paths that can be processed by the ``processed_name_format``
handler. Resulting fields that are common across all ``in_paths`` are preserved.
Only fields that are recognised as a known property are retained. Suffixes that do not
form a component are ignored. For example:
>>> "59058/SB59058.RACS_1626-84.ch0287-0288.linmos.fits"
the `linmos.fits` would be ignored.
All ``in_paths`` should be detected, otherwise an ValueError is raised
Args:
in_paths (Tuple[Path, ...]): Collection of input paths to consider
additional_suffixes (Optional[str], optional): Add an additional set of suffixes before returning. Defaults to None.
Raises:
ValueError: Raised if any of the ``in_paths`` fail to conform to ``flint`` processed name format
Returns:
Path: Common fields with the same base parent path
"""
from flint.options import options_to_dict

in_paths = tuple(Path(p) for p in in_paths)
parent = in_paths[0].parent
processed_components = list(map(processed_ms_format, in_paths))

if None in processed_components:
raise ValueError("Processed name format failed")
processed_components_dict = [options_to_dict(pc) for pc in processed_components]

keys_to_test = processed_components_dict[0].keys()
logger.info(f"{keys_to_test=}")
# One of the worst crimes on the seven seas I have ever done
# If a field is None, it was not detected. If a field is not constanst
# across all input paths, it is ignored. Should a field be considered
# common across all input paths, look up its short hand that
# would otherwise be usede and use it.
constant_fields = [
f"{_long_field_name_to_shorthand(long_name=key)}{processed_components_dict[0][key]}"
for key in keys_to_test
if len(set([pcd[key] for pcd in processed_components_dict])) == 1
and processed_components_dict[0][key] is not None
]
logger.info(f"Identified {constant_fields=}")

name = ".".join(constant_fields)
if additional_suffixes:
additional_suffixes = (
f".{additional_suffixes}"
if not additional_suffixes.startswith(".")
else additional_suffixes
)
name += additional_suffixes
return Path(parent) / Path(name)


# TODO: Need to assess the mode argument, and define literals that are accepted
def create_image_cube_name(
image_prefix: Path, mode: str, suffix: str = "cube.fits"
image_prefix: Path,
mode: Optional[Union[str, List[str]]] = None,
suffix: Optional[Union[str, List[str]]] = None,
) -> Path:
"""Create a consistent naming scheme when combining images into cube images. Intended to
be used when combining many subband images together into a single cube.
The name returned will be:
>> {image_prefix}.{mode}.{suffix}
>>> {image_prefix}.{mode}.{suffix}.cube.fits
Should ``mode`` or ``suffix`` be a list, they will be joined with '.' separators. Hence, no
'.' should be added.
This function will always output 'cube.fits' at the end of the returned file name.
Args:
image_prefix (Path): The unique path of the name. Generally this is the common part among the input planes
mode (str): Additional mode to add to the file name
suffix (str, optional): The final suffix to appended. Defaults to ".cube.fits".
mode (Optional[Union[str, List[str]]], optional): Additional mode/s to add to the file name. Defaults to None.
suffix (Optional[Union[str, List[str]]], optional): Additional suffix/s to add before the final 'cube.fits'. Defaults to None.
Returns:
Path: The final path and file name
Expand All @@ -50,6 +132,27 @@ def create_image_cube_name(
# it here for future proofing
output_cube_name = f"{str(Path(image_prefix))}.{mode}.{suffix}"

output_components = [str(Path(image_prefix))]
if mode:
# TODO: Assess what modes are actually allowed. Suggestion is to
# make a class of some sort with specified and known markers that
# are opted into. Hate this "everything and anything"
(
output_components.append(mode)
if isinstance(mode, str)
else output_components.extend(mode)
)
if suffix:
# TODO: See above. Need a class of acceptable suffixes to use
(
output_components.append(suffix)
if isinstance(suffix, str)
else output_components.extend(suffix)
)

output_components.append("cube.fits")

output_cube_name = ".".join(output_components)
return Path(output_cube_name)


Expand All @@ -72,13 +175,13 @@ def create_imaging_name_prefix(

ms_path = MS.cast(ms=ms).path

name = ms_path.stem
names = [ms_path.stem]
if pol:
name = f"{name}.{pol.lower()}"
names.append(f"{pol.lower()}")
if channel_range:
name = f"{name}.ch{channel_range[0]}-{channel_range[1]}"
names.append(f"ch{channel_range[0]:04}-{channel_range[1]:04}")

return name
return ".".join(names)


def get_beam_resolution_str(mode: str, marker: Optional[str] = None) -> str:
Expand Down Expand Up @@ -280,14 +383,16 @@ class ProcessedNameComponents(NamedTuple):
"""The sbid of the observation"""
field: str
"""The name of the field extracted"""
beam: str
beam: Optional[str] = None
"""The beam of the observation processed"""
spw: Optional[str] = None
"""The SPW of the observation. If there is only one spw this is None."""
round: Optional[str] = None
"""The self-calibration round detected. This might be represented as 'noselfcal' in some image products, e.g. linmos. """
pol: Optional[str] = None
"""The polarisation component, if it exists, in a filename. Examples are 'i','q','u','v'. Could be combinations in some cases depending on how it was created (e.g. based on wsclean pol option). """
channel_range: Optional[Tuple[int, int]] = None
"""The channel range encoded in an file name. Generally are zero-padded, and are two fields of the form ch1234-1235, where the upper bound is exclusive. Defaults to none."""


def processed_ms_format(
Expand All @@ -306,9 +411,19 @@ def processed_ms_format(
in_name = in_name.name if isinstance(in_name, Path) else in_name

logger.debug(f"Matching {in_name}")
# TODO: Should the Beam and field items be relaxed and allowed to be optional?
# TODOL At very least I think the beam should become options
# A raw string is used to avoid bad unicode escaping
regex = re.compile(
r"^SB(?P<sbid>[0-9]+)\.(?P<field>.+)\.beam(?P<beam>[0-9]+)((\.spw(?P<spw>[0-9]+))?)((\.round(?P<round>[0-9]+))?)((\.(?P<pol>(i|q|u|v|xx|yy|xy|yx)+))?)*"
(
r"^SB(?P<sbid>[0-9]+)"
r"\.(?P<field>[^.]+)"
r"((\.beam(?P<beam>[0-9]+))?)"
r"((\.spw(?P<spw>[0-9]+))?)"
r"((\.round(?P<round>[0-9]+))?)"
r"((\.(?P<pol>(i|q|u|v|xx|yy|xy|yx)+))?)"
r"((\.ch(?P<chl>([0-9]+))-(?P<chh>([0-9]+)))?)"
)
)
results = regex.match(in_name)

Expand All @@ -320,13 +435,16 @@ def processed_ms_format(

logger.debug(f"Matched groups are: {groups}")

channel_range = (int(groups["chl"]), int(groups["chh"])) if groups["chl"] else None

return ProcessedNameComponents(
sbid=groups["sbid"],
field=groups["field"],
beam=groups["beam"],
spw=groups["spw"],
round=groups["round"],
pol=groups["pol"],
channel_range=channel_range,
)


Expand Down Expand Up @@ -392,6 +510,10 @@ def extract_beam_from_name(name: Union[str, Path]) -> int:
"""

results = extract_components_from_name(name=name)
if results is None or results.beam is None:
raise ValueError(
f"Failed to convert to processed name format and find beam: {name=} {results=}"
)

return int(results.beam)

Expand Down Expand Up @@ -448,7 +570,8 @@ def create_ms_name(
ms_name_list.append(field)

if format_components:
ms_name_list.append(f"beam{int(format_components.beam):02d}")
if format_components.beam is not None:
ms_name_list.append(f"beam{int(format_components.beam):02d}")
if format_components.spw:
ms_name_list.append(f"spw{format_components.spw}")

Expand Down
31 changes: 11 additions & 20 deletions flint/prefect/common/imaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
FITSMaskNames,
get_beam_resolution_str,
get_fits_cube_from_paths,
processed_ms_format,
)
from flint.options import FieldOptions, SubtractFieldOptions
from flint.peel.potato import potato_peel
Expand Down Expand Up @@ -597,21 +596,21 @@ def task_linmos_images(
cutoff: float = 0.05,
field_summary: Optional[FieldSummary] = None,
trim_linmos_fits: bool = True,
remove_original_images: bool = False,
) -> LinmosCommand:
"""Run the yandasoft linmos task against a set of input images
Args:
images (Collection[Collection[Path]]): Images that will be co-added together
container (Path): Path to singularity container that contains yandasoft
filter (Optional[str], optional): Filter to extract the images that will be extracted from the set of input images. These will be co-added. If None all images are co-aded. Defaults to ".MFS.".
field_name (Optional[str], optional): Name of the field, which is included in the output images created. Defaults to None.
suffix_str (str, optional): Additional string added to the prefix of the output linmos image products. Defaults to "noselfcal".
holofile (Optional[Path], optional): The FITS cube with the beam corrections derived from ASKAP holography. Defaults to None.
sbid (Optional[Union[int,str]], optional): SBID of the data being imaged. Defaults to None.
parset_output_path (Optional[str], optional): Location to write the linmos parset file to. Defaults to None.
cutoff (float, optional): The primary beam attenuation cutoff supplied to linmos when coadding. Defaults to 0.05.
field_summary (Optional[FieldSummary], optional): The summary of the field, including (importantly) to orientation of the third-axis. Defaults to None.
trim_linmos_fits (bool, optional): Attempt to trim the output linmos files of as much empty space as possible. Defaults to True.
remove_original_images (bool, optional): If True remove the original image after they have been convolved. Defaults to False.
Returns:
LinmosCommand: The linmos command and associated meta-data
Expand All @@ -630,24 +629,12 @@ def task_linmos_images(
)
logger.info(f"Number of filtered images to linmos: {len(filter_images)}")

candidate_image = filter_images[0]
candidate_image_fields = processed_ms_format(in_name=candidate_image)
assert (
candidate_image_fields is not None
), f"{candidate_image=}, which should not happen"

if field_name is None:
field_name = candidate_image_fields.field
logger.info(f"Extracted {field_name=} from {candidate_image=}")

if sbid is None:
sbid = candidate_image_fields.sbid
logger.info(f"Extracted {sbid=} from {candidate_image=}")
from flint.naming import create_name_from_common_fields

base_name = f"SB{sbid}.{field_name}.{suffix_str}"

out_dir = Path(filter_images[0].parent)
out_name = out_dir / base_name
out_name = create_name_from_common_fields(
in_paths=tuple(filter_images), additional_suffixes=suffix_str
)
out_dir = out_name.parent
logger.info(f"Base output image name will be: {out_name}")

out_file_name = (
Expand All @@ -672,6 +659,9 @@ def task_linmos_images(
pol_axis=pol_axis,
trim_linmos_fits=trim_linmos_fits,
)
if remove_original_images:
logger.info(f"Removing {len(filter_images)} input images")
_ = [image_path.unlink() for image_path in filter_images] # type: ignore

return linmos_cmd

Expand Down Expand Up @@ -726,6 +716,7 @@ def _convolve_linmos(
field_summary=field_summary,
trim_linmos_fits=trim_linmos_fits,
filter=convol_filter,
remove_original_images=remove_original_images,
) # type: ignore

return parset
Expand Down
Loading

0 comments on commit 9946a0e

Please sign in to comment.