Merge pull request #49 from AlecThomson/prefect_all
Migrate the whole shebang to Prefect
tjgalvin authored Dec 14, 2023
2 parents b3938d2 + c2132bf commit d8e725f
Showing 23 changed files with 896 additions and 1,183 deletions.
2 changes: 0 additions & 2 deletions arrakis/.default_config.txt
@@ -3,8 +3,6 @@
 # host: # Host of mongodb.
 # username: # Username of mongodb.
 # password: # Password of mongodb.
-port: 8787 # Port to run Dask dashboard on.
-# port_forward: # Platform to fowards dask port [None].
 # dask_config: # Config file for Dask SlurmCLUSTER.
 # holofile:
 yanda: "1.3.0"
46 changes: 16 additions & 30 deletions arrakis/cleanup.py
@@ -6,35 +6,38 @@
 from pathlib import Path
 from typing import List, Union
 
-from dask import delayed
-from dask.distributed import Client, LocalCluster
+from prefect import flow, task, unmapped
 
 from arrakis.logger import logger
-from arrakis.utils.pipeline import chunk_dask, logo_str
+from arrakis.utils.pipeline import logo_str
 
 logger.setLevel(logging.INFO)
 
 
-@delayed
-def cleanup(workdir: str, stoke: str) -> None:
+@task(name="Cleanup directory")
+def cleanup(workdir: str, stokeslist: List[str]) -> None:
     """Clean up beam images
     Args:
         workdir (str): Directory containing images
         stoke (str): Stokes parameter
     """
-    # Clean up beam images
-    # old_files = glob(f"{workdir}/*.cutout.*.{stoke.lower()}.*beam[00-36]*.fits")
-    # for old in old_files:
-    #     os.remove(old)
+    if os.path.basename(workdir) == "slurmFiles":
+        return
+    for stoke in stokeslist:
+        # Clean up beam images
+        # old_files = glob(f"{workdir}/*.cutout.*.{stoke.lower()}.*beam[00-36]*.fits")
+        # for old in old_files:
+        #     os.remove(old)
 
-    pass
+        ...
 
 
+@flow(name="Cleanup")
 def main(
     datadir: Path,
     stokeslist: Union[List[str], None] = None,
     verbose=True,
 ) -> None:
     """Clean up beam images
@@ -55,20 +58,9 @@ def main(
             if os.path.isdir(os.path.join(cutdir, name))
         ]
     )
-
-    outputs = []
-    for file in files:
-        if os.path.basename(file) == "slurmFiles":
-            continue
-        for stoke in stokeslist:
-            output = cleanup(file, stoke)
-            outputs.append(output)
-
-    futures = chunk_dask(
-        outputs=outputs,
-        task_name="cleanup",
-        progress_text="Running cleanup",
-        verbose=verbose,
+    outputs = cleanup.map(
+        workdir=files,
+        stokeslist=unmapped(stokeslist),
     )
 
     logger.info("Cleanup done!")
@@ -107,14 +99,8 @@ def cli():
     if verbose:
         logger.setLevel(logging.DEBUG)
 
-    cluster = LocalCluster(n_workers=20)
-    client = Client(cluster)
-
     main(datadir=Path(args.outdir), stokeslist=None, verbose=verbose)
 
-    client.close()
-    cluster.close()
-
 
 if __name__ == "__main__":
     cli()
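
The new cleanup.py leans on Prefect's task mapping: calling .map() on a @task submits one task run per element of the mapped iterable, while unmapped() marks an argument that should be passed whole to every run instead of being iterated. A minimal, self-contained sketch of that pattern (the directory names and Stokes list below are illustrative, not taken from the repository):

from typing import List

from prefect import flow, task, unmapped


@task
def cleanup(workdir: str, stokeslist: List[str]) -> None:
    # One task run per work directory; each run receives the full Stokes list.
    for stoke in stokeslist:
        print(f"would clean {workdir} for Stokes {stoke}")


@flow
def main(dirs: List[str]) -> None:
    # .map fans cleanup out over `dirs`; unmapped() keeps the list intact per run.
    cleanup.map(workdir=dirs, stokeslist=unmapped(["I", "Q", "U"]))


if __name__ == "__main__":
    main(["cutouts/beam00", "cutouts/beam01"])  # hypothetical paths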
38 changes: 13 additions & 25 deletions arrakis/configs/default.yaml
@@ -1,25 +1,13 @@
-# Set up for Magnus
-cores: 24
-processes: 12
-name: 'spice-worker'
-memory: "60GB"
-project: 'ja3'
-queue: 'workq'
-n_workers: 1000
-walltime: '6:00:00'
-job_extra: ['-M magnus']
-# interface for the workers
-interface: "ipogif0"
-log_directory: 'spice_logs'
-env_extra: [
-  'export OMP_NUM_THREADS=1',
-  'source /home/$(whoami)/.bashrc',
-  'conda activate spice'
-]
-python: 'srun -n 1 -c 24 python'
-extra: [
-  "--lifetime", "11h",
-  "--lifetime-stagger", "5m",
-]
-death_timeout: 300
-local_directory: '/dev/shm'
+# Set up for local mahine
+cluster_class: "distributed.LocalCluster"
+cluster_kwargs:
+  cores: 1
+  processes: 1
+  name: 'spice-worker'
+  memory: "8GB"
+adapt_kwargs:
+  minimum: 1
+  maximum: 8
+  wait_count: 20
+  target_duration: "300s"
+  interval: "30s"
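
The keys in the rewritten config (cluster_class, cluster_kwargs, adapt_kwargs) mirror the constructor arguments of prefect-dask's DaskTaskRunner, so a file like this can be loaded and passed straight through. A hedged sketch of that wiring, assuming the repository consumes the config this way (the loader below is illustrative, not code from this commit):

from pathlib import Path

import yaml
from prefect_dask import DaskTaskRunner

# Read the cluster description, e.g. arrakis/configs/default.yaml.
config = yaml.safe_load(Path("arrakis/configs/default.yaml").read_text())

# DaskTaskRunner accepts the same three keys: the cluster class to instantiate
# (as an import-path string), its constructor kwargs, and adaptive-scaling kwargs.
task_runner = DaskTaskRunner(
    cluster_class=config["cluster_class"],
    cluster_kwargs=config["cluster_kwargs"],
    adapt_kwargs=config["adapt_kwargs"],
)

# A flow can then run on it, e.g. my_flow.with_options(task_runner=task_runner).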
25 changes: 0 additions & 25 deletions arrakis/configs/galaxy.yaml

This file was deleted.

25 changes: 0 additions & 25 deletions arrakis/configs/magnus.yaml

This file was deleted.

52 changes: 25 additions & 27 deletions arrakis/configs/petrichor.yaml
@@ -1,28 +1,26 @@
 # Set up for Petrichor
-cores: 8
-processes: 8
-name: 'spice-worker'
-memory: "64GiB"
-account: 'OD-217087'
-#queue: 'workq'
-walltime: '0-8:00:00'
-job_extra_directives: ['--qos express']
-# interface for the workers
-interface: "ib0"
-log_directory: 'spice_logs'
-job_script_prologue: [
-  'module load singularity',
-]
-# job_script_prologue: [
-# 'export OMP_NUM_THREADS=1',
-# 'source /home/$(whoami)/.bashrc',
-# 'conda activate spice'
-# ]
-# python: 'srun -n 1 -c 64 python'
-#worker_extra_args: [
-# "--lifetime", "23h",
-# "--lifetime-stagger", "5m",
-#]
-death_timeout: 1000
-local_directory: $LOCALDIR
-silence_logs: 'info'
+cluster_class: "dask_jobqueue.SLURMCluster"
+cluster_kwargs:
+  cores: 8
+  processes: 8
+  name: 'spice-worker'
+  memory: "64GiB"
+  account: 'OD-217087'
+  #queue: 'workq'
+  walltime: '0-01:00:00'
+  job_extra_directives: ['--qos express']
+  # interface for the workers
+  interface: "ib0"
+  log_directory: 'spice_logs'
+  job_script_prologue: [
+    'module load singularity',
+    'unset SINGULARITY_BINDPATH'
+  ]
+  local_directory: $LOCALDIR
+  silence_logs: 'info'
+adapt_kwargs:
+  minimum: 1
+  maximum: 36
+  wait_count: 20
+  target_duration: "300s"
+  interval: "30s"
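
For the SLURM setup, the nested cluster_kwargs correspond to dask_jobqueue.SLURMCluster's constructor (dask-jobqueue >= 0.8 argument names) and adapt_kwargs to Dask's adaptive scaling. Building the equivalent cluster by hand would look roughly like the sketch below; it simply restates the values from the config above and is not code from this commit:

from dask_jobqueue import SLURMCluster

# Equivalent of petrichor.yaml's cluster_kwargs (local_directory omitted here,
# since $LOCALDIR is resolved by the batch environment).
cluster = SLURMCluster(
    cores=8,
    processes=8,
    name="spice-worker",
    memory="64GiB",
    account="OD-217087",
    walltime="0-01:00:00",
    job_extra_directives=["--qos express"],
    interface="ib0",
    log_directory="spice_logs",
    job_script_prologue=["module load singularity", "unset SINGULARITY_BINDPATH"],
    silence_logs="info",
)

# adapt_kwargs: scale between 1 and 36 workers, re-evaluating every 30 s.
cluster.adapt(
    minimum=1,
    maximum=36,
    wait_count=20,
    target_duration="300s",
    interval="30s",
)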