Skip to content

Commit

Permalink
Merge pull request #80 from tjgalvin/template
Browse files Browse the repository at this point in the history
Added a template approach to specify imaging and self-calibration settings
  • Loading branch information
tjgalvin authored Apr 11, 2024
2 parents d541eb9 + 7a00b5d commit a4b64a2
Show file tree
Hide file tree
Showing 5 changed files with 546 additions and 40 deletions.
235 changes: 209 additions & 26 deletions flint/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,33 @@

from argparse import ArgumentParser
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union

import yaml

from flint.imager.wsclean import WSCleanOptions
from flint.logging import logger
from flint.masking import MaskingOptions
from flint.selfcal.casa import GainCalOptions

KNOWN_HEADERS = ("defaults", "initial", "selfcal", "version")
FORMAT_VERSION = 0.1
MODE_OPTIONS_MAPPING = {
"wsclean": WSCleanOptions,
"gaincal": GainCalOptions,
"masking": MaskingOptions,
}


# A simple representation to pass around. Will help the type
# analysis and future pirates be clear with their mutinous
# intentions
class Strategy(dict):
"""Base representation for handling a loaded flint
strategy"""

pass


def get_selfcal_options_from_yaml(input_yaml: Optional[Path] = None) -> Dict:
"""Stub to represent interaction with a configurationf ile
Expand Down Expand Up @@ -154,7 +173,147 @@ def get_image_options_from_yaml(
}


def load_yaml(input_yaml: Path) -> Any:
def get_options_from_strategy(
strategy: Union[Strategy, None],
mode: str = "wsclean",
round: Union[str, int] = "initial",
max_round_override: bool = True,
) -> Dict[Any, Any]:
"""Extract a set of options from a strategy file to use in a pipeline
run. If the mode exists in the default section, these are used as a base.
If the mode exists and a round is specified, the options listed in the
round are used to update the defaults.
Args:
strategy (Union[Strategy,None]): A loaded instance of a strategy file. If `None` is provided then an empty dictionary is returned.
mode (str, optional): Which set of options to load. Typical values are `wsclean`, `gaincal` and `masking`. Defaults to "wsclean".
round (Union[str, int], optional): Which round to load options for. May be `initial` or an `int` (which indicated a self-calibration round). Defaults to "initial".
max_round_override (bool, optional): Check whether an integer round number is recorded. If it is higher than the largest self-cal round specified, set it to the last self-cal round. If False this is not performed. Defaults to True.
Raises:
ValueError: An unrecongised value for `round`.
AssertError: An unrecongised value for `round`.
Returns:
Dict[Any, Any]: Options specific to the requested set
"""

if strategy is None:
return {}

# Some sanity checks
assert isinstance(
strategy, (Strategy, dict)
), f"Unknown input strategy type {type(strategy)}"
assert round == "initial" or isinstance(
round, int
), f"{round=} not a known value or type. "

# Override the round if requested
if isinstance(round, int) and max_round_override and "selfcal" in strategy.keys():
round = min(round, max(strategy["selfcal"].keys()))

# step one, get the defaults
options = (
dict(**strategy["defaults"][mode])
if mode in strategy["defaults"].keys()
else {}
)
logger.debug(f"Defaults for {mode=}, {options=}")

# Now get the updates
if round == "initial":
# separate function to avoid a missing mode from raising valu error
if mode in strategy["initial"].keys():
update_options = dict(**strategy["initial"][mode])
logger.debug(f"Updating options with {update_options=}")
options.update(update_options)
elif isinstance(round, int):
# separate function to avoid a missing mode from raising valu error
if (
round in strategy["selfcal"].keys()
and mode in strategy["selfcal"][round].keys()
):
update_options = dict(**strategy["selfcal"][round][mode])
logger.debug(f"Updating options with {update_options=}")
options.update(update_options)
else:
raise ValueError(f"{round=} not recognised.")

return options


def verify_configuration(input_strategy: Strategy, raise_on_error: bool = True) -> bool:
"""Perform basic checks on the configuration file
Args:
input_strategy (Strategy): The loaded configuraiton file structure
raise_on_error (bool, optional): Whether to raise an error should an issue in thew config file be found. Defaults to True.
Raises:
ValueError: Whether structure is valid
Returns:
bool: Config file is not valid. Raised only if `raise_on_error` is `True`
"""

errors = []

if "defaults" not in input_strategy.keys():
errors.append("Default section missing from input configuration. ")

# make sure the main components of the file are there
unknown_headers = [
header for header in input_strategy.keys() if header not in KNOWN_HEADERS
]
if unknown_headers:
errors.append(f"{unknown_headers=} found. Supported headers: {KNOWN_HEADERS}")

if "initial" not in input_strategy.keys():
errors.append("No initial imaging round parameters")

for key in input_strategy["initial"].keys():
options = get_options_from_strategy(
strategy=input_strategy, mode=key, round="initial"
)
try:
_ = MODE_OPTIONS_MAPPING[key](**options)
except TypeError as typeerror:
errors.append(
f"{key} mode in initial round incorrectly formed. {typeerror} "
)

if "selfcal" in input_strategy.keys():
round_keys = input_strategy["selfcal"].keys()

if not all([isinstance(i, int) for i in round_keys]):
errors.append("The keys into the self-calibration should be ints. ")

for round in round_keys:
for mode in input_strategy["selfcal"][round].keys():
options = get_options_from_strategy(
strategy=input_strategy, mode=mode, round=round
)
try:
_ = MODE_OPTIONS_MAPPING[mode](**options)
except TypeError as typeerror:
errors.append(
f"{key} mode in initial round incorrectly formed. {typeerror} "
)

valid_config = len(errors) == 0
if not valid_config:
for error in errors:
logger.warning(error)

if raise_on_error:
raise ValueError("Configuration file not valid. ")

return valid_config


def load_strategy_yaml(input_yaml: Path, verify: bool = True) -> Strategy:
"""Load in a flint based configuration file, which
will be used to form the strategy for imaging of
a field.
Expand All @@ -165,31 +324,40 @@ def load_yaml(input_yaml: Path) -> Any:
Args:
input_yaml (Path): The imaging strategy to use
verify (bool, optional): Apply some basic checks to ensure a correctly formed strategy. Defaults to True.
Returns:
Any: The parameters of the imaging and self-calibration to use.
Strategy: The parameters of the imaging and self-calibration to use.
"""

logger.info(f"Loading {input_yaml} file. ")

with open(input_yaml, "r") as in_file:
input_strategy = yaml.load(in_file, Loader=yaml.Loader)
input_strategy = Strategy(yaml.load(in_file, Loader=yaml.Loader))

logger.info("Loaded strategy is: ")
if verify:
verify_configuration(input_strategy=input_strategy)

init_wsclean = WSCleanOptions(**input_strategy["initial"])
logger.info(f"The initial wsclean options:\n {init_wsclean}")
return input_strategy

if "selfcal" in input_strategy.keys():
for selfcal_round, selfcal in enumerate(input_strategy["selfcal"]):
wsclean = WSCleanOptions(**selfcal["imager"])
casa = GainCalOptions(**selfcal["gaincal"])

logger.info(f"Self-calibration round {selfcal_round}: ")
logger.info(f"wsclean options: {wsclean}")
logger.info(f"casa gaincal options: {casa}")
def write_strategy_to_yaml(strategy: Strategy, output_path: Path) -> Path:
"""Write the contents of a current strategy to a yaml file
return input_strategy
Args:
strategy (Strategy): The strategy to write out
output_path (Path): Where to write the output YAML file to
Returns:
Path: The path the output YAML file was written to
"""

logger.info(f"Writing stategy to {output_path=}")

with open(output_path, "w") as out_file:
yaml.dump(data=strategy, stream=out_file, sort_keys=False)

return output_path


def create_default_yaml(
Expand All @@ -210,24 +378,31 @@ def create_default_yaml(
logger.info("Generating a default stategy. ")
strategy: Dict[Any, Any] = {}

initial_wsclean = WSCleanOptions()
strategy["initial"] = initial_wsclean._asdict()
strategy["version"] = FORMAT_VERSION

strategy["defaults"] = {
"wsclean": WSCleanOptions()._asdict(),
"gaincal": GainCalOptions()._asdict(),
"masking": MaskingOptions()._asdict(),
}

strategy["initial"] = {"wsclean": {}}

if selfcal_rounds:
logger.info(f"Creating {selfcal_rounds} self-calibration rounds. ")
selfcal = []
selfcal = {}
for round in range(1, selfcal_rounds + 1):
selfcal.append(
{
"imager": WSCleanOptions()._asdict(),
"gaincal": GainCalOptions()._asdict(),
}
)
selfcal[round] = {
"wsclean": {},
"gaincal": {},
"masking": {},
}

strategy["selfcal"] = selfcal

with open(output_yaml, "w") as out_file:
logger.info(f"Writing {output_yaml}.")
yaml.dump(data=strategy, stream=out_file)
yaml.dump(data=strategy, stream=out_file, sort_keys=False)

return output_yaml

Expand Down Expand Up @@ -261,6 +436,10 @@ def get_parser() -> ArgumentParser:
type=Path,
help="Path to a strategy yaml file to load and inspect. ",
)
verify_parser = subparser.add_parser(
"verify", help="Verify a yaml file is correct, as far as we can tell. "
)
verify_parser.add_argument("input_yaml", type=Path, help="Path to a strategy file")

return parser

Expand All @@ -275,7 +454,11 @@ def cli() -> None:
output_yaml=args.output_yaml, selfcal_rounds=args.selfcal_rounds
)
elif args.mode == "load":
load_yaml(input_yaml=args.input_yaml)
load_strategy_yaml(input_yaml=args.input_yaml)
elif args.mode == "verify":
input_strategy = load_strategy_yaml(input_yaml=args.input_yaml)
if verify_configuration(input_strategy=input_strategy):
logger.info(f"{args.input_yaml} appears valid")
else:
logger.error(f"{args.mode=} is not set or not known. Check --help. ")

Expand Down
81 changes: 81 additions & 0 deletions flint/data/tests/test_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
version: 0.1
defaults:
wsclean:
abs_mem: 100
local_rms_window: 65
size: 10128
local_rms: true
force_mask_rounds: 10
auto_mask: 3.5
auto_threshold: 0.5
threshold: null
channels_out: 4
mgain: 0.7
nmiter: 15
niter: 750000
multiscale: true
multiscale_scale_bias: 0.75
multiscale_scales:
- 0
- 15
- 25
- 50
- 75
- 100
- 250
- 400
fit_spectral_pol: 2
weight: briggs -0.5
data_column: CORRECTED_DATA
scale: 2.5asec
gridder: wgridder
nwlayers: null
wgridder_accuracy: 0.0001
join_channels: true
minuv_l: null
minuvw_m: null
maxw: null
no_update_model_required: false
no_small_inversion: false
name: null
beam_fitting_size: 1.25
fits_mask: null
deconvolution_channels: null
gaincal:
solint: 60s
calmode: p
round: 0
minsnr: 0.0
uvrange: '>200m'
selectdata: true
gaintype: G
nspw: 1
masking:
base_snr_clip: 4
flood_fill: true
flood_fill_positive_seed_clip: 4.5
flood_fill_positive_flood_clip: 1.5
suppress_artefacts: true
suppress_artefacts_negative_seed_clip: 5
suppress_artefacts_guard_negative_dilation: 40
grow_low_snr_islands: true
grow_low_snr_clip: 1.75
grow_low_snr_island_size: 768
initial:
wsclean: {}
selfcal:
1:
wsclean:
data_column: "EXAMPLE"
gaincal: {}
masking: {}
2:
wsclean:
multiscale: false
gaincal: {}
masking: {}
3:
wsclean:
data_column: "TheLastRoundIs3"
gaincal: {}
masking: {}
2 changes: 2 additions & 0 deletions flint/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,5 @@ class FieldOptions(NamedTuple):
"""Construct beam masks from MFS images to use for the next round of imaging. """
use_beam_masks_from: int = 2
"""If `use_beam_masks` is True, start using them from this round of self-calibration"""
imaging_strategy: Optional[Path] = None
"""Path to a FLINT imaging yaml file that contains settings to use throughout imaging"""
Loading

0 comments on commit a4b64a2

Please sign in to comment.