Skip to content

Commit

Permalink
Create a BaseOptions class using pydantic (#184)
Browse files Browse the repository at this point in the history
* added initial BaseOptions pydantic

* added support for list/tuple/set

* fixed tests added _asdict

* attempt to handle optional[list]

* fixed field processing

* added docstrings

* fixed options regression

* added some tests

* using FieldOptions with BaseOptions

* added optional container path

* rejig cli around beam masks

* rejig of beam masks

* corrected argument name# Please enter the commit message for your changes. Lines starting

* added missing argument

* tweaks to archive cli

* added a tarball verification

* updated changelog / added error to raise

* added logs to verify tarballs

* added a few tests

* added more tests/fixed tests

* removed type ignore

* fixed use_beam_mask flag default

* added test for use-beam-mask / type ignore

* Added expect type hint

* added test for bandpass flow cli

* removed old test

---------

Co-authored-by: tgalvin <[email protected]>
  • Loading branch information
tjgalvin and tgalvin authored Oct 25, 2024
1 parent e1b8bfa commit c86c373
Show file tree
Hide file tree
Showing 16 changed files with 521 additions and 508 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
# dev

- added `wrapper_options_from_strategy` decorator helper function
- Created `BaseOptions` from a `pydantic.BaseModel` class.
- Added functions to
- create `ArgumentParser` options from a `BaseOptions` class
- load arguments into an `BaseOptions` class from a `ArgumentParser`
- starting to move some classes over to this approaches (including some CLIs)
- Added a verify tarball function to verify archives

# 0.2.7

Expand Down
96 changes: 69 additions & 27 deletions flint/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@

import re
import shutil
import subprocess
import shlex
import tarfile
from argparse import ArgumentParser
from pathlib import Path
from typing import Any, Collection, Dict, List, Tuple

from flint.configuration import get_options_from_strategy
from flint.exceptions import TarArchiveError
from flint.logging import logger
from flint.options import (
DEFAULT_COPY_RE_PATTERNS,
DEFAULT_TAR_RE_PATTERNS,
ArchiveOptions,
add_options_to_parser,
)


Expand Down Expand Up @@ -89,13 +91,46 @@ def copy_files_into(copy_out_path: Path, files_to_copy: Collection[Path]) -> Pat
return copy_out_path


def verify_tarball(
tarball: Path,
) -> bool:
"""Verify that a tarball was created properly by examining its
table. Internally this calls ``tar`` through a subprocess call.
Hence, ``tar`` needs to be available on the system PATH.
Args:
tarball (Path): The tarball to examine
Returns:
bool: True if the ``tar``s exit code is 0, False otherwise
"""
tarball = Path(tarball) # trust nothing
assert (
tarball.exists() and tarball.is_file()
), f"{tarball} is not a file or does not exist"
assert tarball.suffix == ".tar", f"{tarball=} appears to not have a .tar extension"

cmd = f"tar -tvf {str(tarball)}"
logger.info(f"Verifying {tarball=}")
popen = subprocess.Popen(shlex.split(cmd), stderr=subprocess.PIPE)
with popen.stderr: # type: ignore
for line in iter(popen.stderr.readline, b""): # type: ignore
logger.error(line.decode().strip())
exitcode = popen.wait()

return exitcode == 0


# TODO: Add a clobber option
def tar_files_into(tar_out_path: Path, files_to_tar: Collection[Path]) -> Path:
def tar_files_into(
tar_out_path: Path, files_to_tar: Collection[Path], verify: bool = True
) -> Path:
"""Create a tar file given a desired output path and list of files to tar.
Args:
tar_out_path (Path): The output path of the tarball. The parent directory will be created if necessary.
files_to_tar (Collection[Path]): All the files to tarball up
verify (bool, optional): Verify that the tarball was correctly formed. Defaults to True.
Raises:
FileExistsError: The path of the tarball created
Expand All @@ -121,6 +156,14 @@ def tar_files_into(tar_out_path: Path, files_to_tar: Collection[Path]) -> Path:
tar.add(file, arcname=file.name)

logger.info(f"Created {tar_out_path}")

if verify:
tar_success = verify_tarball(tarball=tar_out_path)
if not tar_success:
raise TarArchiveError(f"Failed to verify {tar_out_path=}")

logger.info(f"{tar_out_path=} appears to be correctly formed")

return tar_out_path


Expand Down Expand Up @@ -200,28 +243,30 @@ def get_parser() -> ArgumentParser:
dest="mode", help="Operation mode of flint_archive"
)

list_parser = subparser.add_parser("list")
list_parser = subparser.add_parser(
"list", help="List the files that would be copied"
)
list_parser.add_argument(
"--base-path",
type=Path,
default=Path("."),
help="Base directory to perform glob expressions",
)

list_parser.add_argument(
"--file-patterns",
nargs="+",
default=DEFAULT_TAR_RE_PATTERNS,
type=str,
help="The regular expression patterns to evaluate",
)
list_parser.add_argument(
"--strategy-yaml-path",
type=Path,
default=None,
help="Path to a strategy file with a archive section. Overrides any --file-patterns. ",
)

list_parser.add_argument(
"--mode",
choices=("create", "copy"),
default="copy",
help="Which set of RE patterns to present, those for the tarball (create) or those for copy",
)
list_parser = add_options_to_parser(
parser=list_parser, options_class=ArchiveOptions
)
create_parser = subparser.add_parser("create", help="Create a tarfile archive")
create_parser.add_argument(
"tar_out_path", type=Path, help="Path of the output tar file to be created"
Expand All @@ -233,12 +278,8 @@ def get_parser() -> ArgumentParser:
help="Base directory to perform glob expressions",
)

create_parser.add_argument(
"--tar-file-patterns",
nargs="+",
default=DEFAULT_TAR_RE_PATTERNS,
type=str,
help="The regular expression patterns to evaluate inside the base path directory",
create_parser = add_options_to_parser(
parser=create_parser, options_class=ArchiveOptions
)
create_parser.add_argument(
"--strategy-yaml-path",
Expand All @@ -262,12 +303,8 @@ def get_parser() -> ArgumentParser:
help="Base directory to perform glob expressions",
)

copy_parser.add_argument(
"--copy-file-patterns",
nargs="+",
default=DEFAULT_COPY_RE_PATTERNS,
type=str,
help="The regular expression patterns to evaluate inside the base path directory",
copy_parser = add_options_to_parser(
parser=copy_parser, options_class=ArchiveOptions
)
copy_parser.add_argument(
"--strategy-yaml-path",
Expand All @@ -294,11 +331,16 @@ def cli() -> None:

files = resolve_glob_expressions(
base_path=args.base_path,
file_re_patterns=archive_options.tar_file_re_patterns,
file_re_patterns=(
archive_options.tar_file_re_patterns
if args.mode == "create"
else archive_options.copy_file_re_patterns
),
)

for count, file in enumerate(sorted(files)):
logger.info(f"{count} of {len(files)}, {file}")
logger.info(f"{len(files)} for mode={args.mode}")

elif args.mode == "create":
update_options_create: Dict[str, Any] = (
get_archive_options_from_yaml(strategy_yaml_path=args.strategy_yaml_path)
Expand Down
3 changes: 2 additions & 1 deletion flint/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, Callable, Dict, ParamSpec, Optional, TypeVar, Union

from click import MissingParameter
from pydantic import ValidationError
import yaml

from flint.imager.wsclean import WSCleanOptions
Expand Down Expand Up @@ -423,7 +424,7 @@ def verify_configuration(input_strategy: Strategy, raise_on_error: bool = True)
)
try:
_ = MODE_OPTIONS_MAPPING[key](**options)
except TypeError as typeerror:
except (ValidationError, TypeError) as typeerror:
errors.append(
f"{key} mode in initial round incorrectly formed. {typeerror} "
)
Expand Down
4 changes: 4 additions & 0 deletions flint/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ class CleanDivergenceError(Exception):
"""Raised if it is detected that cleaning has diverged."""

pass


class TarArchiveError(Exception):
"""Raised it the flint tarball is not created successfullty"""
36 changes: 19 additions & 17 deletions flint/imager/wsclean.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@
from flint.logging import logger
from flint.ms import MS
from flint.naming import create_image_cube_name, create_imaging_name_prefix
from flint.options import options_to_dict
from flint.options import (
options_to_dict,
BaseOptions,
add_options_to_parser,
create_options_from_parser,
)
from flint.sclient import run_singularity_command
from flint.utils import (
get_environment_variable,
Expand All @@ -57,7 +62,7 @@ class ImageSet(NamedTuple):
"""Residual images."""


class WSCleanOptions(NamedTuple):
class WSCleanOptions(BaseOptions):
"""A basic container to handle WSClean options. These attributes should
conform to the same option name in the calling signature of wsclean
Expand Down Expand Up @@ -98,7 +103,7 @@ class WSCleanOptions(NamedTuple):
"""Enable multiscale deconvolution"""
multiscale_scale_bias: float = 0.75
"""Multiscale bias term"""
multiscale_scales: Optional[Collection[int]] = (
multiscale_scales: Tuple[int, ...] = (
0,
15,
25,
Expand Down Expand Up @@ -150,13 +155,6 @@ class WSCleanOptions(NamedTuple):
pol: str = "i"
"""The polarisation to be imaged"""

def with_options(self, **kwargs) -> WSCleanOptions:
"""Return a new instance of WSCleanOptions with updated components"""
_dict = self._asdict()
_dict.update(**kwargs)

return WSCleanOptions(**_dict)


class WSCleanCommand(NamedTuple):
"""Simple container for a wsclean command."""
Expand Down Expand Up @@ -834,11 +832,8 @@ def get_parser() -> ArgumentParser:
default=None,
help="Path to a singularity container with wsclean installed. ",
)
wsclean_parser.add_argument(
"--data-column",
type=str,
default="CORRECTED_DATA",
help="The column name to image. ",
wsclean_parser = add_options_to_parser(
parser=wsclean_parser, options_class=WSCleanOptions
)

return parser
Expand All @@ -856,8 +851,15 @@ def cli() -> None:
logger.setLevel(logging.DEBUG)

ms = MS(path=args.ms, column=args.data_column)

wsclean_imager(ms=ms, wsclean_container=args.wsclean_container)
wsclean_options: WSCleanOptions = create_options_from_parser(
parser_namespace=args,
options_class=WSCleanOptions, # type: ignore
)
wsclean_imager(
ms=ms,
wsclean_container=args.wsclean_container,
update_wsclean_options=wsclean_options._asdict(),
)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit c86c373

Please sign in to comment.