Skip to content

Commit

Permalink
Merge pull request #131 from tjgalvin/archiveopts
Browse files Browse the repository at this point in the history
Pull ArchiveOptions from strategy file in CLI of flint_archive and continuum pipeline
  • Loading branch information
tjgalvin authored Jul 2, 2024
2 parents 3ac69df + b31732e commit 0b669e1
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 56 deletions.
107 changes: 64 additions & 43 deletions flint/archive.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,19 @@
"""Operations around preserving files and products from an flint run"""

from __future__ import ( # Used for mypy/pylance to like the return type of MS.with_options
annotations,
)

import re
import shutil
import tarfile
from argparse import ArgumentParser
from pathlib import Path
from typing import Collection, List, NamedTuple, Tuple
from typing import Any, Collection, Dict, List, Tuple

from flint.configuration import get_options_from_strategy
from flint.logging import logger

# TODO: Perhaps move these to flint.naming, and can be built up
# based on rules, e.g. imager used, source finder etc.
DEFAULT_TAR_RE_PATTERNS = (
r".*MFS.*image\.fits",
r".*linmos.*",
r".*weight\.fits",
r".*yaml",
r".*\.txt",
r".*png",
r".*beam[0-9]+\.ms\.zip",
r".*beam[0-9]+\.ms",
r".*\.caltable",
r".*\.tar",
r".*\.csv",
from flint.options import (
DEFAULT_COPY_RE_PATTERNS,
DEFAULT_TAR_RE_PATTERNS,
ArchiveOptions,
)
DEFAULT_COPY_RE_PATTERNS = (r".*linmos.*fits", r".*weight\.fits", r".*png", r".*csv")


class ArchiveOptions(NamedTuple):
"""Container for options related to archiving products from flint workflows"""

tar_file_re_patterns: Collection[str] = DEFAULT_TAR_RE_PATTERNS
"""Regular-expressions to use to collect files that should be tarballed"""
copy_file_re_patterns: Collection[str] = DEFAULT_COPY_RE_PATTERNS
"""Regular-expressions used to identify files to copy into a final location (not tarred)"""

def with_options(self, **kwargs) -> ArchiveOptions:
opts = self._asdict()
opts.update(**kwargs)

return ArchiveOptions(**opts)


def resolve_glob_expressions(
Expand Down Expand Up @@ -200,6 +170,24 @@ def copy_sbid_files_archive(
return copy_out_path


def get_archive_options_from_yaml(strategy_yaml_path: Path) -> Dict[str, Any]:
"""Load the archive options from a specified strategy file
Args:
strategy_yaml_path (Path): The path to the strategy yaml file containing archive options
Returns:
Dict[str, Any]: Loaded options for ArchiveOptions
"""
archive_options = get_options_from_strategy(
strategy=strategy_yaml_path, mode="archive", round="initial"
)

logger.info(f"{archive_options=}")

return archive_options


def get_parser() -> ArgumentParser:
parser = ArgumentParser(
description="Operations around archiving. Patterns are specified as regular expressions (not globs). "
Expand All @@ -224,6 +212,12 @@ def get_parser() -> ArgumentParser:
type=str,
help="The regular expression patterns to evaluate",
)
list_parser.add_argument(
"--strategy-yaml-path",
type=Path,
default=None,
help="Path to a strategy file with a archive section. Overrides any --file-patterns. ",
)

create_parser = subparser.add_parser("create", help="Create a tarfile archive")
create_parser.add_argument(
Expand All @@ -243,29 +237,41 @@ def get_parser() -> ArgumentParser:
type=str,
help="The regular expression patterns to evaluate inside the base path directory",
)
create_parser.add_argument(
"--strategy-yaml-path",
type=Path,
default=None,
help="Path to a strategy file with a archive section. Overrides any --file-patterns. ",
)

create_parser = subparser.add_parser(
copy_parser = subparser.add_parser(
"copy", help="Copy a set of files into a output directory"
)
create_parser.add_argument(
copy_parser.add_argument(
"copy_out_path",
type=Path,
help="Path of the output folder that files will be copied into",
)
create_parser.add_argument(
copy_parser.add_argument(
"--base-path",
type=Path,
default=Path("."),
help="Base directory to perform glob expressions",
)

create_parser.add_argument(
copy_parser.add_argument(
"--copy-file-patterns",
nargs="+",
default=DEFAULT_COPY_RE_PATTERNS,
type=str,
help="The regular expression patterns to evaluate inside the base path directory",
)
copy_parser.add_argument(
"--strategy-yaml-path",
type=Path,
default=None,
help="Path to a strategy file with a archive section. Overrides any --file-patterns. ",
)

return parser

Expand All @@ -276,7 +282,12 @@ def cli() -> None:
args = parser.parse_args()

if args.mode == "list":
archive_options = ArchiveOptions(tar_file_re_patterns=args.file_patterns)
update_options: Dict[str, Any] = (
get_archive_options_from_yaml(strategy_yaml_path=args.strategy_yaml_path)
if args.strategy_yaml_path
else dict(tar_file_re_patterns=args.file_patterns)
)
archive_options = ArchiveOptions(**update_options)

files = resolve_glob_expressions(
base_path=args.base_path,
Expand All @@ -286,15 +297,25 @@ def cli() -> None:
for count, file in enumerate(sorted(files)):
logger.info(f"{count} of {len(files)}, {file}")
elif args.mode == "create":
archive_options = ArchiveOptions(tar_file_re_patterns=args.tar_file_patterns)
update_options: Dict[str, Any] = (
get_archive_options_from_yaml(strategy_yaml_path=args.strategy_yaml_path)
if args.strategy_yaml_path
else dict(tar_file_re_patterhs=args.file_patterns)
)
archive_options = ArchiveOptions(**update_options)

create_sbid_tar_archive(
tar_out_path=args.tar_out_path,
base_path=args.base_path,
archive_options=archive_options,
)
elif args.mode == "copy":
archive_options = ArchiveOptions(copy_file_re_patterns=args.copy_file_globs)
update_options: Dict[str, Any] = (
get_archive_options_from_yaml(strategy_yaml_path=args.strategy_yaml_path)
if args.strategy_yaml_path
else dict(copy_file_re_patterhs=args.copy_file_patterns)
)
archive_options = ArchiveOptions(**update_options)

copy_sbid_files_archive(
copy_out_path=args.copy_out_path,
Expand Down
8 changes: 5 additions & 3 deletions flint/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

import yaml

from flint.archive import ArchiveOptions
from flint.imager.wsclean import WSCleanOptions
from flint.logging import logger
from flint.masking import MaskingOptions
from flint.naming import add_timestamp_to_path
from flint.options import ArchiveOptions
from flint.selfcal.casa import GainCalOptions
from flint.source_finding.aegean import AegeanOptions, BANEOptions

Expand Down Expand Up @@ -220,7 +220,7 @@ def get_image_options_from_yaml(


def get_options_from_strategy(
strategy: Union[Strategy, None],
strategy: Union[Strategy, None, Path],
mode: str = "wsclean",
round: Union[str, int] = "initial",
max_round_override: bool = True,
Expand All @@ -232,7 +232,7 @@ def get_options_from_strategy(
round are used to update the defaults.
Args:
strategy (Union[Strategy,None]): A loaded instance of a strategy file. If `None` is provided then an empty dictionary is returned.
strategy (Union[Strategy,None,Path]): A loaded instance of a strategy file. If `None` is provided then an empty dictionary is returned. If `Path` attempt to load the strategy file.
mode (str, optional): Which set of options to load. Typical values are `wsclean`, `gaincal` and `masking`. Defaults to "wsclean".
round (Union[str, int], optional): Which round to load options for. May be `initial` or an `int` (which indicated a self-calibration round). Defaults to "initial".
max_round_override (bool, optional): Check whether an integer round number is recorded. If it is higher than the largest self-cal round specified, set it to the last self-cal round. If False this is not performed. Defaults to True.
Expand All @@ -247,6 +247,8 @@ def get_options_from_strategy(

if strategy is None:
return {}
elif isinstance(strategy, Path):
strategy = load_strategy_yaml(input_yaml=strategy)

# Some sanity checks
assert isinstance(
Expand Down
14 changes: 14 additions & 0 deletions flint/data/tests/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ defaults:
grow_low_snr_island: true
grow_low_snr_island_clip: 1.75
grow_low_snr_island_size: 768
archive:
tar_file_re_patterns:
- .*MFS.*image\.fits
- .*linmos.*
- .*weight\.fits
- .*yaml
- .*\.csv
- testing_for_jack.txt
copy_file_re_patterns:
- .*linmos.*fits
- .*weight\.fits
- .*png
- .*csv
- testing_for_sparrow.csv
initial:
wsclean: {}
selfcal:
Expand Down
39 changes: 38 additions & 1 deletion flint/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
set of flint processing related options.
"""

from __future__ import ( # Used for mypy/pylance to like the return type of MS.with_options
annotations,
)

from pathlib import Path
from typing import List, NamedTuple, Optional, Union
from typing import Collection, List, NamedTuple, Optional, Union


class BandpassOptions(NamedTuple):
Expand Down Expand Up @@ -108,3 +112,36 @@ class FieldOptions(NamedTuple):
"""Path that final processed products will be copied into. If None no copying of file products is performed. See ArchiveOptions. """
rename_ms: bool = False
"""Rename MSs throught rounds of imaging and self-cal instead of creating copies. This will delete data-columns throughout. """


# TODO: Perhaps move these to flint.naming, and can be built up
# based on rules, e.g. imager used, source finder etc.
DEFAULT_TAR_RE_PATTERNS = (
r".*MFS.*image\.fits",
r".*linmos.*",
r".*weight\.fits",
r".*yaml",
r".*\.txt",
r".*png",
r".*beam[0-9]+\.ms\.zip",
r".*beam[0-9]+\.ms",
r".*\.caltable",
r".*\.tar",
r".*\.csv",
)
DEFAULT_COPY_RE_PATTERNS = (r".*linmos.*fits", r".*weight\.fits", r".*png", r".*csv")


class ArchiveOptions(NamedTuple):
"""Container for options related to archiving products from flint workflows"""

tar_file_re_patterns: Collection[str] = DEFAULT_TAR_RE_PATTERNS
"""Regular-expressions to use to collect files that should be tarballed"""
copy_file_re_patterns: Collection[str] = DEFAULT_COPY_RE_PATTERNS
"""Regular-expressions used to identify files to copy into a final location (not tarred)"""

def with_options(self, **kwargs) -> ArchiveOptions:
opts = self._asdict()
opts.update(**kwargs)

return ArchiveOptions(**opts)
17 changes: 10 additions & 7 deletions flint/prefect/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@

import base64
from pathlib import Path
from typing import Any, List, Optional, TypeVar
from typing import Any, Dict, List, Optional, TypeVar
from uuid import UUID

from prefect import task
from prefect.artifacts import create_markdown_artifact

from flint.archive import (
ArchiveOptions,
copy_sbid_files_archive,
create_sbid_tar_archive,
)
from flint.archive import copy_sbid_files_archive, create_sbid_tar_archive
from flint.logging import logger
from flint.naming import add_timestamp_to_path, get_sbid_from_path
from flint.options import ArchiveOptions
from flint.summary import (
create_beam_summary,
create_field_summary,
Expand Down Expand Up @@ -77,6 +74,7 @@ def task_archive_sbid(
archive_path: Optional[Path] = None,
copy_path: Optional[Path] = None,
max_round: Optional[int] = None,
update_archive_options: Optional[Dict[str, Any]] = None,
) -> Path:
"""Create a tarbal of files, or copy files, from a processing folder.
Expand All @@ -85,6 +83,7 @@ def task_archive_sbid(
archive_path (Optional[Path], optional): Location to create and store the tar ball at. If None no tarball is created. Defaults to None.
copy_path (Optional[Path], optional): Location to copy selected files into. If None no files are copied. Defaults to None.
max_round (Optional[int], optional): The last self-calibration round peformed. If provied some files form this round are copied (assuming wsclean imaging). If None, the default file patterns in ArchiveOptions are used. Defaults to None.
update_archive_options (Optional[Dict[str, Any]], optional): Additional options to provide to ArchiveOptions. Defaults to None.
Returns:
Path: The science folder files were copied from
Expand All @@ -94,12 +93,16 @@ def task_archive_sbid(

archive_options = ArchiveOptions()

if update_archive_options:
logger.info(f"Updating archive options with {update_archive_options=}")
archive_options = archive_options.with_options(**update_archive_options)

# TODO: What should this be? Just general new regexs passed through,
# or is this fine?
if max_round:
updated_file_patterns = tuple(archive_options.tar_file_re_patterns) + (
rf".*beam[0-9]+\.round{max_round}-.*-image\.fits",
rf".*beam[0-9]+\.round{max_round}\.ms\.zip",
rf".*beam[0-9]+\.round{max_round}\.ms\.(zip|tar)",
)
archive_options = archive_options.with_options(
tar_file_re_patterns=updated_file_patterns
Expand Down
4 changes: 4 additions & 0 deletions flint/prefect/flows/continuum_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,15 @@ def process_science_fields(
task_zip_ms.map(in_item=wsclean_cmds)

if field_options.sbid_archive_path or field_options.sbid_copy_path and run_aegean:
update_archive_options = get_options_from_strategy(
strategy=strategy, mode="archive"
)
task_archive_sbid.submit(
science_folder_path=output_split_science_path,
archive_path=field_options.sbid_archive_path,
copy_path=field_options.sbid_copy_path,
max_round=field_options.rounds if field_options.rounds else None,
update_archive_options=update_archive_options,
wait_for=archive_wait_for,
)

Expand Down
Loading

0 comments on commit 0b669e1

Please sign in to comment.