Skip to content

Commit

Permalink
Merge pull request #66 from AlecThomson/submit
Browse files Browse the repository at this point in the history
Submit
  • Loading branch information
AlecThomson authored Apr 30, 2024
2 parents 3a3efde + 4843be0 commit 1af1389
Show file tree
Hide file tree
Showing 22 changed files with 275 additions and 228 deletions.
32 changes: 16 additions & 16 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/psf/black
rev: 24.4.0
hooks:
- id: black
- repo: https://github.com/PyCQA/isort
rev: 5.13.2
hooks:
- id: isort
args: ["--profile=black"]
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.4.1
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
# Run the formatter.
- id: ruff-format
# - repo: https://github.com/pre-commit/pre-commit-hooks
# rev: v4.6.0
# hooks:
# - id: trailing-whitespace
# - id: end-of-file-fixer
# - id: check-yaml
# - id: check-added-large-files

ci:
autofix_commit_msg: |
Expand Down
1 change: 1 addition & 0 deletions arrakis/cleanup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
"""DANGER ZONE: Purge directories of un-needed FITS files."""

import argparse
import logging
import shutil
Expand Down
4 changes: 2 additions & 2 deletions arrakis/configs/petrichor.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Set up for Petrichor
cluster_class: "dask_jobqueue.SLURMCluster"
cluster_kwargs:
cores: 16
cores: 20
processes: 1
name: 'spice-worker'
memory: "128GiB"
memory: "160GiB"
account: 'OD-217087'
#queue: 'workq'
walltime: '0-12:00:00'
Expand Down
10 changes: 2 additions & 8 deletions arrakis/cutout.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
#!/usr/bin/env python
"""Produce cutouts from RACS cubes"""

import argparse
import logging
import os
import pickle
import warnings
from concurrent.futures import ThreadPoolExecutor
from glob import glob
from pathlib import Path
from pprint import pformat
from shutil import copyfile
from typing import Dict, List
from typing import List, Optional, Set, TypeVar
from typing import NamedTuple as Struct
from typing import Optional, Set, TypeVar, Union

import astropy.units as u
import numpy as np
Expand All @@ -36,7 +33,6 @@
validate_sbid_field_pair,
)
from arrakis.utils.fitsutils import fix_header
from arrakis.utils.io import try_mkdir
from arrakis.utils.pipeline import generic_parser, logo_str, workdir_arg_parser

iers.conf.auto_download = False
Expand Down Expand Up @@ -79,7 +75,6 @@ def cutout_weight(
beam_num: int,
dryrun=False,
) -> pymongo.UpdateOne:

# Update database
myquery = {"Source_ID": source_id}

Expand Down Expand Up @@ -337,7 +332,6 @@ def big_cutout(
password: Optional[str] = None,
limit: Optional[int] = None,
) -> List[pymongo.UpdateOne]:

wild = f"image.restored.{stoke.lower()}*contcube*beam{beam_num:02}.conv.fits"
images = list(datadir.glob(wild))
if len(images) == 0:
Expand Down
100 changes: 49 additions & 51 deletions arrakis/frion.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
#!/usr/bin/env python3
"""Correct for the ionosphere in parallel"""

import argparse
import logging
import os
from glob import glob
from pathlib import Path
from pprint import pformat
from shutil import copyfile
from typing import Callable, Dict, List
from typing import Callable, Dict, List, Optional, Union
from typing import NamedTuple as Struct
from typing import Optional, Union

import astropy.units as u
import numpy as np
import pymongo
from astropy.time import Time, TimeDelta
from FRion import correct, predict
from prefect import flow, task, unmapped
from prefect import flow, task
from tqdm.auto import tqdm

from arrakis.logger import UltimateHelpFormatter, logger
from arrakis.logger import TqdmToLogger, UltimateHelpFormatter, logger
from arrakis.utils.database import (
get_db,
get_field_db,
test_db,
validate_sbid_field_pair,
)
from arrakis.utils.fitsutils import getfreq
from arrakis.utils.io import try_mkdir
from arrakis.utils.pipeline import generic_parser, logo_str
from arrakis.utils.pipeline import generic_parser, logo_str, workdir_arg_parser

logger.setLevel(logging.INFO)
TQDM_OUT = TqdmToLogger(logger, level=logging.INFO)


class Prediction(Struct):
Expand Down Expand Up @@ -89,8 +88,8 @@ def predict_worker(
start_time: Time,
end_time: Time,
freq: np.ndarray,
cutdir: str,
plotdir: str,
cutdir: Path,
plotdir: Path,
server: str = "ftp://ftp.aiub.unibe.ch/CODE/",
prefix: str = "",
formatter: Optional[Union[str, Callable]] = None,
Expand All @@ -114,8 +113,8 @@ def predict_worker(
"""
logger.setLevel(logging.INFO)

ifile = os.path.join(cutdir, beam["beams"][field]["i_file"])
i_dir = os.path.dirname(ifile)
ifile: Path = cutdir / beam["beams"][field]["i_file"]
i_dir = ifile.parent
iname = island["Source_ID"]
ra = island["RA"]
dec = island["Dec"]
Expand All @@ -137,7 +136,7 @@ def predict_worker(
ra=ra,
dec=dec,
timestep=300.0,
ionexPath=os.path.join(os.path.dirname(cutdir), "IONEXdata"),
ionexPath=cutdir.parent / "IONEXdata",
server=server,
proxy_server=proxy_server,
use_proxy=True, # Always use proxy - forces urllib
Expand All @@ -146,23 +145,10 @@ def predict_worker(
pre_download=pre_download,
**proxy_args,
)
logger.info(f"Predicted modulation for {iname}.")
predict_file = os.path.join(i_dir, f"{iname}_ion.txt")
predict.write_modulation(freq_array=freq, theta=theta, filename=predict_file)

plot_file = os.path.join(i_dir, f"{iname}_ion.png")
try:
predict.generate_plots(
times, RMs, theta, freq, position=[ra, dec], savename=plot_file
)
except Exception as e:
logger.error(f"Failed to generate plot: {e}")

plot_files = glob(os.path.join(i_dir, "*ion.png"))
logger.info(f"Plotting files: {plot_files=}")
for src in plot_files:
base = os.path.basename(src)
dst = os.path.join(plotdir, base)
copyfile(src, dst)
logger.info(f"Prediction file: {predict_file}")

myquery = {"Source_ID": iname}

Expand Down Expand Up @@ -324,29 +310,40 @@ def main(
formatter=ionex_formatter,
pre_download=ionex_predownload,
)
predictions = predict_worker.map(
island=islands,
field=unmapped(field),
beam=beams_cor,
start_time=unmapped(start_time),
end_time=unmapped(end_time),
freq=unmapped(freq.to(u.Hz).value),
cutdir=unmapped(cutdir),
plotdir=unmapped(plotdir),
server=unmapped(ionex_server),
prefix=unmapped(ionex_prefix),
proxy_server=unmapped(ionex_proxy_server),
formatter=unmapped(ionex_formatter),
pre_download=unmapped(ionex_predownload),
)

corrections = correct_worker.map(
beam=beams_cor,
outdir=unmapped(cutdir),
field=unmapped(field),
prediction=predictions,
island=islands,
)
predictions = []
corrections = []
assert len(islands) == len(beams_cor), "Islands and beams must be the same length"
for island, beam in tqdm(
zip(islands, beams_cor),
desc="Submitting tasks",
file=TQDM_OUT,
total=len(islands),
):
prediction = predict_worker.submit(
island=island,
field=field,
beam=beam,
start_time=start_time,
end_time=end_time,
freq=freq.to(u.Hz).value,
cutdir=cutdir,
plotdir=plotdir,
server=ionex_server,
prefix=ionex_prefix,
proxy_server=ionex_proxy_server,
formatter=ionex_formatter,
pre_download=ionex_predownload,
)
predictions.append(prediction)
correction = correct_worker.submit(
beam=beam,
outdir=cutdir,
field=field,
prediction=prediction,
island=island,
)
corrections.append(correction)

updates_arrays = [p.result().update for p in predictions]
updates = [c.result() for c in corrections]
Expand Down Expand Up @@ -426,9 +423,10 @@ def cli():
warnings.simplefilter("ignore", category=RuntimeWarning)

gen_parser = generic_parser(parent_parser=True)
work_parser = workdir_arg_parser(parent_parser=True)
f_parser = frion_parser(parent_parser=True)
parser = argparse.ArgumentParser(
parents=[gen_parser, f_parser],
parents=[gen_parser, work_parser, f_parser],
formatter_class=UltimateHelpFormatter,
description=f_parser.description,
)
Expand Down
8 changes: 4 additions & 4 deletions arrakis/imager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
"""Arrkis imager"""

import argparse
import hashlib
import logging
Expand All @@ -9,9 +10,8 @@
from glob import glob
from pathlib import Path
from subprocess import CalledProcessError
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import NamedTuple as Struct
from typing import Optional, Tuple, Union

import numpy as np
from astropy.io import fits
Expand Down Expand Up @@ -692,8 +692,8 @@ def main(
logger.info(f"Searching {msdir} for MS matching {ms_glob_pattern}.")
mslist = sorted(msdir.glob(ms_glob_pattern))

assert (len(mslist) > 0) & (
len(mslist) == num_beams
assert (
(len(mslist) > 0) & (len(mslist) == num_beams)
), f"Incorrect number of MS files found: {len(mslist)} / {num_beams} - glob pattern: {ms_glob_pattern}"

logger.info(f"Will image {len(mslist)} MS files in {msdir} to {out_dir}")
Expand Down
1 change: 1 addition & 0 deletions arrakis/init_database.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
"""Create the Arrakis database"""

import json
import logging
import time
Expand Down
Loading

0 comments on commit 1af1389

Please sign in to comment.