Skip to content

Commit

Permalink
Merge pull request #58 from AlecThomson/tempdir
Browse files Browse the repository at this point in the history
Tempdir
  • Loading branch information
AlecThomson authored Mar 27, 2024
2 parents 8d77477 + 1f9d774 commit a55e4cb
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 15 deletions.
4 changes: 2 additions & 2 deletions arrakis/configs/petrichor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ cluster_kwargs:
local_directory: $LOCALDIR
silence_logs: 'info'
adapt_kwargs:
minimum: 1
maximum: 36
minimum_jobs: 1
maximum_jobs: 36
wait_count: 20
target_duration: "300s"
interval: "30s"
66 changes: 54 additions & 12 deletions arrakis/imager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import pickle
import shutil
from glob import glob
from pathlib import Path
from subprocess import CalledProcessError
Expand All @@ -27,6 +28,7 @@
from tqdm.auto import tqdm

from arrakis.logger import TqdmToLogger, logger
from arrakis.utils.io import parse_env_path
from arrakis.utils.msutils import (
beam_from_ms,
field_idx_from_ms,
Expand Down Expand Up @@ -128,7 +130,8 @@ def image_beam(
ms: Path,
field_idx: int,
out_dir: Path,
prefix: str,
temp_dir: Path,
prefix: Path,
simage: Path,
pols: str = "IQU",
nchan: int = 36,
Expand Down Expand Up @@ -157,18 +160,31 @@ def image_beam(
data_column: str = "CORRECTED_DATA",
no_mf_weighting: bool = False,
no_update_model_required: bool = True,
beam_fitting_size: Optional[float] = 1.25,
) -> ImageSet:
"""Image a single beam"""
logger = get_run_logger()
# Evaluate the temp directory if a ENV variable is used
temp_dir = parse_env_path(temp_dir)
if out_dir != temp_dir:
# Copy the MS to the temp directory
ms_temp = temp_dir / ms.name
logger.info(f"Copying {ms} to {ms_temp}")
ms_temp = ms_temp.resolve(strict=False)
shutil.copytree(ms, ms_temp)
ms = ms_temp
# Update the prefix
prefix = temp_dir / prefix.name

commands = []
# Do any I cleaning separately
do_stokes_I = "I" in pols
if do_stokes_I:
command = wsclean(
mslist=[ms.resolve(strict=True).as_posix()],
temp_dir=temp_dir.resolve(strict=True).as_posix(),
use_mpi=False,
name=prefix,
name=prefix.resolve().as_posix(),
pol="I",
verbose=True,
channels_out=nchan,
Expand Down Expand Up @@ -199,6 +215,7 @@ def image_beam(
data_column=data_column,
no_mf_weighting=no_mf_weighting,
no_update_model_required=no_update_model_required,
beam_fitting_size=beam_fitting_size,
)
commands.append(command)
pols = pols.replace("I", "")
Expand All @@ -220,8 +237,9 @@ def image_beam(

command = wsclean(
mslist=[ms.resolve(strict=True).as_posix()],
temp_dir=temp_dir.resolve(strict=True).as_posix(),
use_mpi=False,
name=prefix,
name=prefix.resolve().as_posix(),
pol=pols,
verbose=True,
channels_out=nchan,
Expand All @@ -247,11 +265,12 @@ def image_beam(
nmiter=nmiter,
local_rms=local_rms,
local_rms_window=local_rms_window,
multiscale=multiscale,
multiscale=multiscale if not squared_channel_joining else False,
multiscale_scale_bias=multiscale_scale_bias,
data_column=data_column,
no_mf_weighting=no_mf_weighting,
no_update_model_required=no_update_model_required,
beam_fitting_size=beam_fitting_size,
)
commands.append(command)

Expand Down Expand Up @@ -282,14 +301,26 @@ def image_beam(
logger.error(f"{e=}")
raise e

if out_dir != temp_dir:
# Copy the images to the output directory
logger.info(f"Copying images to {out_dir}")
all_fits_files = list(temp_dir.glob(f"{prefix.name}*.fits"))
for fits_file in tqdm(all_fits_files, desc="Copying images", file=TQDM_OUT):
shutil.copy(fits_file, out_dir)

# Update the prefix
prefix = out_dir / prefix.name

prefix_str = prefix.resolve().as_posix()

# Check rms of image to check for divergence
if do_stokes_I:
pols += "I"
for pol in pols:
mfs_image = (
f"{prefix}-MFS-image.fits"
f"{prefix_str}-MFS-image.fits"
if pol == "I"
else f"{prefix}-MFS-{pol}-image.fits"
else f"{prefix_str}-MFS-{pol}-image.fits"
)
rms = mad_std(fits.getdata(mfs_image), ignore_nan=True)
if rms > 1:
Expand All @@ -303,9 +334,9 @@ def image_beam(
aux_lists = {}
for pol in pols:
imglob = (
f"{prefix}-*[0-9]-image.fits"
f"{prefix_str}-*[0-9]-image.fits"
if pol == "I"
else f"{prefix}-*[0-9]-{pol}-image.fits"
else f"{prefix_str}-*[0-9]-{pol}-image.fits"
)
image_list = sorted(glob(imglob))
image_lists[pol] = image_list
Expand All @@ -314,17 +345,17 @@ def image_beam(

for aux in ["model", "psf", "residual", "dirty"]:
aux_list = (
sorted(glob(f"{prefix}-*[0-9]-{aux}.fits"))
sorted(glob(f"{prefix_str}-*[0-9]-{aux}.fits"))
if pol == "I" or aux == "psf"
else sorted(glob(f"{prefix}-*[0-9]-{pol}-{aux}.fits"))
else sorted(glob(f"{prefix_str}-*[0-9]-{pol}-{aux}.fits"))
)
aux_lists[(pol, aux)] = aux_list

logger.info(f"Found {len(aux_list)} images for {pol=} {aux=} {ms}.")

logger.info("Constructing ImageSet")
image_set = ImageSet(
ms=ms, prefix=prefix, image_lists=image_lists, aux_lists=aux_lists
ms=ms, prefix=prefix_str, image_lists=image_lists, aux_lists=aux_lists
)

logger.debug(f"{image_set=}")
Expand Down Expand Up @@ -572,6 +603,7 @@ def fix_ms_askap_corrs(ms: Path, *args, **kwargs) -> Path:
def main(
msdir: Path,
out_dir: Path,
temp_dir: Optional[Path] = None,
cutoff: Optional[float] = None,
robust: float = -0.5,
pols: str = "IQU",
Expand Down Expand Up @@ -612,6 +644,9 @@ def main(

logger.info(f"Will image {len(mslist)} MS files in {msdir} to {out_dir}")
cleans = []
if temp_dir is None:
temp_dir = out_dir
logger.info(f"Using {temp_dir} as temp directory")

# Do this in serial since CASA gets upset
prefixs = {}
Expand Down Expand Up @@ -640,7 +675,8 @@ def main(
ms=ms_fix,
field_idx=field_idxs[ms],
out_dir=out_dir,
prefix=prefixs[ms].resolve(strict=False).as_posix(),
temp_dir=temp_dir,
prefix=prefixs[ms],
simage=simage.resolve(strict=True),
robust=robust,
pols=pols,
Expand Down Expand Up @@ -759,6 +795,11 @@ def imager_parser(parent_parser: bool = False) -> argparse.ArgumentParser:
type=Path,
help="Directory to output images",
)
parser.add_argument(
"--temp_dir",
type=Path,
help="Temporary directory to store intermediate files",
)
parser.add_argument(
"--psf_cutoff",
type=float,
Expand Down Expand Up @@ -929,6 +970,7 @@ def cli():
main(
msdir=args.msdir,
out_dir=args.outdir,
temp_dir=args.temp_dir,
cutoff=args.psf_cutoff,
robust=args.robust,
pols=args.pols,
Expand Down
1 change: 1 addition & 0 deletions arrakis/process_spice.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ def main(args: configargparse.Namespace) -> None:
)(
msdir=args.msdir,
out_dir=args.outdir,
temp_dir=args.temp_dir,
cutoff=args.psf_cutoff,
robust=args.robust,
pols=args.pols,
Expand Down
16 changes: 16 additions & 0 deletions arrakis/utils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import stat
import warnings
from glob import glob
from pathlib import Path
from typing import Tuple

from astropy.table import Table
Expand All @@ -15,13 +16,28 @@

from arrakis.logger import TqdmToLogger, logger
from arrakis.utils.exceptions import SameFileError, SpecialFileError
from arrakis.utils.typing import PathLike

warnings.filterwarnings(action="ignore", category=SpectralCubeWarning, append=True)
warnings.simplefilter("ignore", category=AstropyWarning)

TQDM_OUT = TqdmToLogger(logger, level=logging.INFO)


def parse_env_path(env_path: PathLike) -> Path:
"""Parse an environment path.
Args:
env_path (str): Environment path.
Returns:
Path: Parsed path.
"""
if isinstance(env_path, Path):
env_path = env_path.as_posix()
return Path(os.path.expandvars(env_path))


def rsync(src, tgt):
os.system(f"rsync -rPvh {src} {tgt}")

Expand Down
2 changes: 1 addition & 1 deletion arrakis/utils/msutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ def wsclean(

# Check for square channels and multiscale
if arguments["squared_channel_joining"] and arguments["multiscale"]:
logger.info("CAUTION - square channel joining and multiscale is unstable!")
logger.warning("CAUTION - square channel joining and multiscale is unstable!")

for key, value in arguments.items():
if type(value) is bool:
Expand Down
2 changes: 2 additions & 0 deletions arrakis/utils/typing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
"""Typing utilities"""

from pathlib import Path
from typing import TypeVar

import numpy as np
Expand All @@ -14,3 +15,4 @@
"ArrayLike", np.ndarray, pd.Series, pd.DataFrame, SkyCoord, Quantity
)
TableLike = TypeVar("TableLike", RMTable, Table)
PathLike = TypeVar("PathLike", str, Path)

0 comments on commit a55e4cb

Please sign in to comment.