Skip to content

Commit

Permalink
Use dataframe
Browse files Browse the repository at this point in the history
  • Loading branch information
AlecThomson committed Apr 10, 2024
1 parent 6cdd8e4 commit e4806b4
Showing 1 changed file with 21 additions and 25 deletions.
46 changes: 21 additions & 25 deletions arrakis/linmos.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
from pprint import pformat
from typing import Dict, List
from typing import NamedTuple as Struct
from typing import Optional
from typing import Optional, Tuple

import pandas as pd
import pymongo
from astropy.utils.exceptions import AstropyWarning
from prefect import flow, task, unmapped
Expand Down Expand Up @@ -43,7 +44,7 @@ class ImagePaths(Struct):
@task(name="Find images")
def find_images(
field: str,
beams: dict,
beams_row: Tuple[int, pd.Series],
stoke: str,
datadir: Path,
) -> ImagePaths:
Expand All @@ -62,15 +63,17 @@ def find_images(
ImagePaths: List of images and weights.
"""
logger.setLevel(logging.INFO)

src_name = beams["Source_ID"]
field_beams = beams["beams"][field]
beams = beams_row[1]
src_name = beams.Source_ID
field_beams = beams.beams[field]

# First check that the images exist
image_list: List[Path] = []
for bm in list(set(field_beams["beam_list"])): # Ensure list of beams is unique!
imfile = Path(field_beams[f"{stoke.lower()}_beam{bm}_image_file"])
assert imfile.parent.name == src_name, "Looking in wrong directory!"
assert (
imfile.parent.name == src_name
), f"Looking in wrong directory! '{imfile.parent.name}'"
new_imfile = datadir.resolve() / imfile
image_list.append(new_imfile)
image_list = sorted(image_list)
Expand All @@ -81,7 +84,9 @@ def find_images(
weight_list: List[Path] = []
for bm in list(set(field_beams["beam_list"])): # Ensure list of beams is unique!
wgtsfile = Path(field_beams[f"{stoke.lower()}_beam{bm}_weight_file"])
assert wgtsfile.parent.name == src_name, "Looking in wrong directory!"
assert (
wgtsfile.parent.name == src_name
), f"Looking in wrong directory! '{wgtsfile.parent.name}'"
new_wgtsfile = datadir.resolve() / wgtsfile
weight_list.append(new_wgtsfile)
weight_list = sorted(weight_list)
Expand Down Expand Up @@ -278,6 +283,7 @@ def main(
datadir: Path,
host: str,
epoch: int,
sbid: Optional[int],
holofile: Optional[Path] = None,
username: Optional[str] = None,
password: Optional[str] = None,
Expand Down Expand Up @@ -316,36 +322,25 @@ def main(
logger.debug(f"{beams_col = }")
# Query the DB
query = {"$and": [{f"beams.{field}": {"$exists": True}}]}
if sbid is not None:
query["$and"].append({f"beams.{field}.SBIDs": sbid})

logger.info(f"The query is {query=}")

island_ids: List[str] = sorted(beams_col.distinct("Source_ID", query))
big_beams: List[dict] = list(
big_beams = pd.DataFrame(
beams_col.find({"Source_ID": {"$in": island_ids}}).sort("Source_ID")
)
big_comps: List[dict] = list(
comp_col.find({"Source_ID": {"$in": island_ids}}).sort("Source_ID")
)
comps: List[List[dict]] = []
for island_id in island_ids:
_comps = []
for c in big_comps:
if c["Source_ID"] == island_id:
_comps.append(c)
comps.append(_comps)

assert len(big_beams) == len(comps)

if limit is not None:
logger.critical(f"Limiting to {limit} islands")
big_beams = big_beams[:limit]
comps = comps[:limit]

all_parfiles = []
for stoke in stokeslist:
image_paths = find_images.map(
field=unmapped(field),
beams=big_beams,
beams_row=big_beams.iterrows(),
stoke=unmapped(stoke.capitalize()),
datadir=unmapped(cutdir),
)
Expand Down Expand Up @@ -420,10 +415,10 @@ def cli():
description=lin_parser.description,
)
args = parser.parse_args()

verbose = args.verbose
test_db(
host=args.host, username=args.username, password=args.password, verbose=verbose
host=args.host,
username=args.username,
password=args.password,
)

main(
Expand All @@ -433,6 +428,7 @@ def cli():
),
host=args.host,
epoch=args.epoch,
sbid=args.sbid,
holofile=Path(args.holofile),
username=args.username,
password=args.password,
Expand Down

0 comments on commit e4806b4

Please sign in to comment.