Skip to content

Commit

Permalink
Merge pull request #69 from AlecThomson/queries
Browse files Browse the repository at this point in the history
Queries
  • Loading branch information
AlecThomson authored May 9, 2024
2 parents 169cf3b + f245b03 commit ed5f180
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 403 deletions.
6 changes: 3 additions & 3 deletions arrakis/configs/rm_petrichor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cluster_kwargs:
name: 'spice-worker'
memory: "128GiB"
account: 'OD-217087'
walltime: '0-00:15:00'
walltime: '0-00:45:00'
job_extra_directives: ['--qos express']
# interface for the workers
interface: "ib0"
Expand All @@ -18,10 +18,10 @@ cluster_kwargs:
]
local_directory: $LOCALDIR
silence_logs: 'info'
worker_extra_args: ["--lifetime", "14m", "--lifetime-stagger", "2m", "--memory-limit", "128GiB"]
worker_extra_args: ["--memory-limit", "128GiB"]
adapt_kwargs:
minimum: 108
maximum: 512
maximum: 108
wait_count: 20
target_duration: "5s"
interval: "10s"
70 changes: 30 additions & 40 deletions arrakis/makecat.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,23 +877,11 @@ def main(

logger.info("Starting component collection query")
tick = time.time()
save_name = field if sbid is None else f"{field}_{sbid}"
query = {
"$and": [
{"Source_ID": {"$in": all_island_ids}},
{
(
f"{field}.rmsynth1d"
if sbid is None
else f"{field}_{sbid}.rmsynth1d"
): True
},
{
(
f"{field}.rmclean1d"
if sbid is None
else f"{field}_{sbid}.rmclean1d"
): True
},
{"rm_outputs_1d.field": save_name, "rm_outputs_1d.rmsynth1d": True},
]
}

Expand All @@ -902,27 +890,39 @@ def main(
fields.update({n: 1})
for n in columns_possum.sourcefinder_columns:
fields.update({n: 1})

fields.update(
{
(
f"{field}.rmsynth_summary"
if sbid is None
else f"{field}_{sbid}.rmsynth_summary"
): 1
"rmsynth_summary": {
"$arrayElemAt": [
"$rm_outputs_1d.rmsynth_summary",
{"$indexOfArray": ["$rm_outputs_1d.field", save_name]},
]
}
}
)
fields.update(
{
(
f"{field}.rmclean_summary"
if sbid is None
else f"{field}_{sbid}.rmclean_summary"
): 1
"rmclean_summary": {
"$arrayElemAt": [
"$rm_outputs_1d.rmclean_summary",
{"$indexOfArray": ["$rm_outputs_1d.field", save_name]},
]
}
}
)
fields.update({f"{field}.header" if sbid is None else f"{field}_{sbid}.header": 1})

comps_df = pd.DataFrame(comp_col.find(query, fields))
fields.update(
{
"header": {
"$arrayElemAt": [
"$rm_outputs_1d.header",
{"$indexOfArray": ["$rm_outputs_1d.field", save_name]},
]
}
}
)
pipeline = [{"$match": query}, {"$project": fields}]
comps_df = pd.DataFrame(comp_col.aggregate(pipeline))
comps_df.set_index("Source_ID", inplace=True)
tock = time.time()
logger.info(f"Finished component collection query - {tock-tick:.2f}s")
Expand Down Expand Up @@ -982,25 +982,15 @@ def main(
if src == "synth":
for src_id, comp in comps_df.iterrows():
try:
data += [
comp[field if sbid is None else f"{field}_{sbid}"][
"rmclean_summary"
][col]
]
data += [comp["rmclean_summary"][col]]
except KeyError:
data += [
comp[field if sbid is None else f"{field}_{sbid}"][
"rmsynth_summary"
][col]
]
data += [comp["rmsynth_summary"][col]]
new_col = Column(data=data, name=name, dtype=typ, unit=unit)
rmtab.add_column(new_col)

if src == "header":
for src_id, comp in comps_df.iterrows():
data += [
comp[field if sbid is None else f"{field}_{sbid}"]["header"][col]
]
data += [comp["header"][col]]
new_col = Column(data=data, name=name, dtype=typ, unit=unit)
rmtab.add_column(new_col)

Expand Down
1 change: 0 additions & 1 deletion arrakis/process_region.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def process_merge(args, host: str, inter_dir: str, task_runner) -> None:
dimension=args.dimension,
verbose=args.verbose,
database=args.database,
do_validate=args.validate,
limit=args.limit,
savePlots=args.save_plots,
weightType=args.weight_type,
Expand Down
2 changes: 0 additions & 2 deletions arrakis/process_spice.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def process_spice(args, host: str, task_runner: BaseTaskRunner) -> None:
dimension=args.dimension,
verbose=args.verbose,
database=args.database,
do_validate=args.validate,
limit=args.limit,
savePlots=args.save_plots,
weightType=args.weight_type,
Expand Down Expand Up @@ -156,7 +155,6 @@ def process_spice(args, host: str, task_runner: BaseTaskRunner) -> None:
maxIter=args.max_iter,
gain=args.gain,
window=args.window,
showPlots=args.show_plots,
rm_verbose=args.rm_verbose,
)
if not args.skip_rmclean
Expand Down
Loading

0 comments on commit ed5f180

Please sign in to comment.