From e4806b401a39d4b23b7ae390d10f9ae6f482243e Mon Sep 17 00:00:00 2001 From: "Thomson, Alec (CASS, Kensington)" Date: Wed, 10 Apr 2024 12:21:49 +1000 Subject: [PATCH] Use dataframe --- arrakis/linmos.py | 46 +++++++++++++++++++++------------------------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/arrakis/linmos.py b/arrakis/linmos.py index 37f4a644..f9890ccc 100644 --- a/arrakis/linmos.py +++ b/arrakis/linmos.py @@ -10,8 +10,9 @@ from pprint import pformat from typing import Dict, List from typing import NamedTuple as Struct -from typing import Optional +from typing import Optional, Tuple +import pandas as pd import pymongo from astropy.utils.exceptions import AstropyWarning from prefect import flow, task, unmapped @@ -43,7 +44,7 @@ class ImagePaths(Struct): @task(name="Find images") def find_images( field: str, - beams: dict, + beams_row: Tuple[int, pd.Series], stoke: str, datadir: Path, ) -> ImagePaths: @@ -62,15 +63,17 @@ def find_images( ImagePaths: List of images and weights. """ logger.setLevel(logging.INFO) - - src_name = beams["Source_ID"] - field_beams = beams["beams"][field] + beams = beams_row[1] + src_name = beams.Source_ID + field_beams = beams.beams[field] # First check that the images exist image_list: List[Path] = [] for bm in list(set(field_beams["beam_list"])): # Ensure list of beams is unique! imfile = Path(field_beams[f"{stoke.lower()}_beam{bm}_image_file"]) - assert imfile.parent.name == src_name, "Looking in wrong directory!" + assert ( + imfile.parent.name == src_name + ), f"Looking in wrong directory! '{imfile.parent.name}'" new_imfile = datadir.resolve() / imfile image_list.append(new_imfile) image_list = sorted(image_list) @@ -81,7 +84,9 @@ def find_images( weight_list: List[Path] = [] for bm in list(set(field_beams["beam_list"])): # Ensure list of beams is unique! wgtsfile = Path(field_beams[f"{stoke.lower()}_beam{bm}_weight_file"]) - assert wgtsfile.parent.name == src_name, "Looking in wrong directory!" + assert ( + wgtsfile.parent.name == src_name + ), f"Looking in wrong directory! '{wgtsfile.parent.name}'" new_wgtsfile = datadir.resolve() / wgtsfile weight_list.append(new_wgtsfile) weight_list = sorted(weight_list) @@ -278,6 +283,7 @@ def main( datadir: Path, host: str, epoch: int, + sbid: Optional[int], holofile: Optional[Path] = None, username: Optional[str] = None, password: Optional[str] = None, @@ -316,36 +322,25 @@ def main( logger.debug(f"{beams_col = }") # Query the DB query = {"$and": [{f"beams.{field}": {"$exists": True}}]} + if sbid is not None: + query["$and"].append({f"beams.{field}.SBIDs": sbid}) logger.info(f"The query is {query=}") island_ids: List[str] = sorted(beams_col.distinct("Source_ID", query)) - big_beams: List[dict] = list( + big_beams = pd.DataFrame( beams_col.find({"Source_ID": {"$in": island_ids}}).sort("Source_ID") ) - big_comps: List[dict] = list( - comp_col.find({"Source_ID": {"$in": island_ids}}).sort("Source_ID") - ) - comps: List[List[dict]] = [] - for island_id in island_ids: - _comps = [] - for c in big_comps: - if c["Source_ID"] == island_id: - _comps.append(c) - comps.append(_comps) - - assert len(big_beams) == len(comps) if limit is not None: logger.critical(f"Limiting to {limit} islands") big_beams = big_beams[:limit] - comps = comps[:limit] all_parfiles = [] for stoke in stokeslist: image_paths = find_images.map( field=unmapped(field), - beams=big_beams, + beams_row=big_beams.iterrows(), stoke=unmapped(stoke.capitalize()), datadir=unmapped(cutdir), ) @@ -420,10 +415,10 @@ def cli(): description=lin_parser.description, ) args = parser.parse_args() - - verbose = args.verbose test_db( - host=args.host, username=args.username, password=args.password, verbose=verbose + host=args.host, + username=args.username, + password=args.password, ) main( @@ -433,6 +428,7 @@ def cli(): ), host=args.host, epoch=args.epoch, + sbid=args.sbid, holofile=Path(args.holofile), username=args.username, password=args.password,