Skip to content

Commit

Permalink
Use upserts
Browse files: browse the repository at this point in the history
  • Loading branch information
AlecThomson committed Apr 10, 2024
1 parent 8cc1949 commit 73ceb86
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 117 deletions.
154 changes: 69 additions & 85 deletions arrakis/rmclean_oncuts.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,89 +61,73 @@ def rmclean1d(
"""
iname = comp["Source_ID"]
cname = comp["Gaussian_ID"]

logger.debug(f"Working on {comp}")
save_name = field if sbid is None else f"{field}_{sbid}"
try:
rm1dfiles = comp["rm1dfiles"]
fdfFile = outdir / f"{rm1dfiles['FDF_dirty']}"
rmsfFile = outdir / f"{rm1dfiles['RMSF']}"
weightFile = outdir / f"{rm1dfiles['weights']}"
rmSynthFile = outdir / f"{rm1dfiles['summary_json']}"

prefix = os.path.join(os.path.abspath(os.path.dirname(fdfFile)), cname)

# Sanity checks
for f in [weightFile, fdfFile, rmsfFile, rmSynthFile]:
logger.debug(f"Checking {f.absolute()}")
if not f.exists():
logger.fatal(f"File does not exist: '{f}'.")
raise FileNotFoundError(f"File does not exist: '{f}'")

nBits = 32
mDict, aDict = do_RMclean_1D.readFiles(
fdfFile, rmsfFile, weightFile, rmSynthFile, nBits
)

# Run RM-CLEAN on the spectrum
outdict, arrdict = do_RMclean_1D.run_rmclean(
mDict=mDict,
aDict=aDict,
cutoff=cutoff,
maxIter=maxIter,
gain=gain,
nBits=nBits,
showPlots=showPlots,
verbose=rm_verbose,
prefixOut=prefix,
saveFigures=savePlots,
window=window,
)
# Ensure JSON serializable
for k, v in outdict.items():
if isinstance(v, np.float_):
outdict[k] = float(v)
elif isinstance(v, np.float32):
outdict[k] = float(v)
elif isinstance(v, np.int_):
outdict[k] = int(v)
elif isinstance(v, np.int32):
outdict[k] = int(v)
elif isinstance(v, np.ndarray):
outdict[k] = v.tolist()

# Save output
do_RMclean_1D.saveOutput(outdict, arrdict, prefixOut=prefix, verbose=rm_verbose)
if savePlots:
plt.close("all")
plotdir = outdir / "plots"
plot_files = list(fdfFile.parent.glob("*.pdf"))
for plot_file in plot_files:
copyfile(plot_file, plotdir / plot_file.name)

# Load into Mongo
myquery = {"Gaussian_ID": cname}

newvalues = {
"$set": {
save_name: {
"rmclean1d": True,
"rmclean_summary": outdict,
},
}
}
except KeyError:
logger.critical("Failed to load data! RM-CLEAN not applied to component!")
logger.critical(f"Island is {iname}, component is {cname}")
myquery = {"Gaussian_ID": cname}

newvalues = {
"$set": {
save_name: {
"rmclean1d": False,
},
}
}
rm1dfiles = comp[save_name]["rm1dfiles"]
fdfFile = outdir / f"{rm1dfiles['FDF_dirty']}"
rmsfFile = outdir / f"{rm1dfiles['RMSF']}"
weightFile = outdir / f"{rm1dfiles['weights']}"
rmSynthFile = outdir / f"{rm1dfiles['summary_json']}"

prefix = os.path.join(os.path.abspath(os.path.dirname(fdfFile)), cname)

# Sanity checks
for f in [weightFile, fdfFile, rmsfFile, rmSynthFile]:
logger.debug(f"Checking {f.absolute()}")
if not f.exists():
logger.fatal(f"File does not exist: '{f}'.")
raise FileNotFoundError(f"File does not exist: '{f}'")

nBits = 32
mDict, aDict = do_RMclean_1D.readFiles(
fdfFile, rmsfFile, weightFile, rmSynthFile, nBits
)

# Run RM-CLEAN on the spectrum
outdict, arrdict = do_RMclean_1D.run_rmclean(
mDict=mDict,
aDict=aDict,
cutoff=cutoff,
maxIter=maxIter,
gain=gain,
nBits=nBits,
showPlots=showPlots,
verbose=rm_verbose,
prefixOut=prefix,
saveFigures=savePlots,
window=window,
)
# Ensure JSON serializable
for k, v in outdict.items():
if isinstance(v, np.float_):
outdict[k] = float(v)
elif isinstance(v, np.float32):
outdict[k] = float(v)
elif isinstance(v, np.int_):
outdict[k] = int(v)
elif isinstance(v, np.int32):
outdict[k] = int(v)
elif isinstance(v, np.ndarray):
outdict[k] = v.tolist()

# Save output
do_RMclean_1D.saveOutput(outdict, arrdict, prefixOut=prefix, verbose=rm_verbose)
if savePlots:
plt.close("all")
plotdir = outdir / "plots"
plot_files = list(fdfFile.parent.glob("*.pdf"))
for plot_file in plot_files:
copyfile(plot_file, plotdir / plot_file.name)

# Load into Mongo
myquery = {"Gaussian_ID": cname}

to_update = comp[save_name]
to_update["rmclean1d"] = True
to_update["rmclean_summary"] = outdict

newvalues = {"$set": {save_name: to_update}}
return pymongo.UpdateOne(myquery, newvalues)


Expand Down Expand Up @@ -201,8 +185,10 @@ def rmclean3d(
)
# Load into Mongo
save_name = field if sbid is None else f"{field}_{sbid}"
to_update = island[save_name]
to_update["rmclean3d"] = True
myquery = {"Source_ID": iname}
newvalues = {"$set": {save_name: {"rmclean3d": True}}}
newvalues = {"$set": {save_name: to_update}}
return pymongo.UpdateOne(myquery, newvalues)


Expand Down Expand Up @@ -295,7 +281,7 @@ def main(
# Only get required values
{
"Source_ID": 1,
"rm3dfiles": 1,
f"{field}" if sbid is None else f"{field}_{sbid}": 1,
},
).sort("Source_ID")
)
Expand All @@ -311,7 +297,6 @@ def main(
): False
}
},
upsert=True,
)
logger.info(pformat(result.raw_result))

Expand All @@ -336,7 +321,7 @@ def main(
{
"Source_ID": 1,
"Gaussian_ID": 1,
"rm1dfiles": 1,
f"{field}" if sbid is None else f"{field}_{sbid}": 1,
},
).sort("Source_ID")
)
Expand All @@ -352,7 +337,6 @@ def main(
): True
}
},
upsert=True,
)
logger.info(pformat(result.raw_result))

Expand Down
69 changes: 38 additions & 31 deletions arrakis/rmsynth_oncuts.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def rmsynthoncut3d(
}
}
}
return pymongo.UpdateOne(myquery, badvalues)
return pymongo.UpdateOne(myquery, badvalues, upsert=True)

bkgq, rmsq = cubelet_bane(dataQ, header)
rmsq[rmsq == 0] = np.nan
Expand Down Expand Up @@ -227,7 +227,7 @@ def rmsynthoncut3d(
}
}
}
return pymongo.UpdateOne(myquery, newvalues)
return pymongo.UpdateOne(myquery, newvalues, upsert=True)


def cubelet_bane(cubelet: np.ndarray, header: fits.Header) -> Tuple[np.ndarray]:
Expand Down Expand Up @@ -476,6 +476,8 @@ def update_rmtools_dict(
for key, val in fit_dict["fit_flag"].items():
mDict[f"fit_flag_{key}"] = val

return mDict


@task(name="1D RM-synthesis")
def rmsynthoncut1d(
Expand Down Expand Up @@ -525,6 +527,7 @@ def rmsynthoncut1d(
rm_verbose (bool, optional): Verbose RMsynth. Defaults to False.
"""
logger.setLevel(logging.INFO)
save_name = field if sbid is None else f"{field}_{sbid}"
comp = comp_tuple[1]
beam = dict(beam_tuple[1])

Expand All @@ -547,8 +550,8 @@ def rmsynthoncut1d(
if np.isnan(spectrum.data).all():
logger.critical(f"Entire data is NaN for {iname} in {spectrum.filename}")
myquery = {"Gaussian_ID": cname}
badvalues = {"$set": {"rmsynth1d": False}}
return pymongo.UpdateOne(myquery, badvalues)
badvalues = {"$set": {save_name: {"rmsynth1d": False}}}
return pymongo.UpdateOne(myquery, badvalues, upsert=True)

prefix = f"{os.path.dirname(stokes_spectra.i.filename)}/{cname}"

Expand All @@ -573,14 +576,14 @@ def rmsynthoncut1d(
):
logger.critical(f"{cname} QU data is all NaNs.")
myquery = {"Gaussian_ID": cname}
badvalues = {"$set": {"rmsynth1d": False}}
return pymongo.UpdateOne(myquery, badvalues)
badvalues = {"$set": {save_name: {"rmsynth1d": False}}}
return pymongo.UpdateOne(myquery, badvalues, upsert=True)
# And I
if np.isnan(filtered_stokes_spectra.i.data).all():
logger.critical(f"{cname} I data is all NaNs.")
myquery = {"Gaussian_ID": cname}
badvalues = {"$set": {"rmsynth1d": False}}
return pymongo.UpdateOne(myquery, badvalues)
badvalues = {"$set": {save_name: {"rmsynth1d": False}}}
return pymongo.UpdateOne(myquery, badvalues, upsert=True)

data = [np.array(freq)]
bkg_data = [np.array(freq)]
Expand Down Expand Up @@ -681,8 +684,6 @@ def rmsynthoncut1d(
logger.debug(f"Heading for {cname} is {pformat(head_dict)}")

outer_dir = os.path.basename(os.path.dirname(filtered_stokes_spectra.i.filename))

save_name = field if sbid is None else f"{field}_{sbid}"
newvalues = {
"$set": {
save_name: {
Expand Down Expand Up @@ -734,7 +735,7 @@ def rmsynthoncut1d(
}
}
}
return pymongo.UpdateOne(myquery, newvalues)
return pymongo.UpdateOne(myquery, newvalues, upsert=True)


def rmsynthoncut_i(
Expand Down Expand Up @@ -999,37 +1000,42 @@ def main(

# Unset rmsynth in db
if dimension == "1d":
logger.info(f"Unsetting rmsynth1d for {n_comp} components")
query_1d = {"Source_ID": {"$in": island_ids}}
update_1d = {
"$set": {
(
f"{field}.rmsynth1d"
if sbid is None
else f"{field}_{sbid}.rmsynth1d"
): False
}
}
logger.info(pformat(update_1d))

result = comp_col.update_many(
query_1d,
{
"$set": {
(
f"{field}.rmsynth1d"
if sbid is None
else f"{field}_{sbid}.rmsynth1d"
): False
}
},
update_1d,
upsert=True,
)
logger.info(pformat(result.raw_result))

elif dimension == "3d":
logger.info(f"Unsetting rmsynth3d for {n_island} islands")
query_3d = {"Source_ID": {"$in": island_ids}}

update_3d = {
"$set": {
(
f"{field}.rmsynth3d"
if sbid is None
else f"{field}_{sbid}.rmsynth3d"
): False
}
}
logger.info(pformat(update_3d))
result = island_col.update(
query_3d,
{
"$set": {
(
f"{field}.rmsynth3d"
if sbid is None
else f"{field}_{sbid}.rmsynth3d"
): False
}
},
update_3d,
upsert=True,
)

Expand Down Expand Up @@ -1275,8 +1281,9 @@ def cli():
gen_parser = generic_parser(parent_parser=True)
work_parser = workdir_arg_parser(parent_parser=True)
synth_parser = rmsynth_parser(parent_parser=True)
common_parser = rm_common_parser(parent_parser=True)
parser = argparse.ArgumentParser(
parents=[gen_parser, work_parser, synth_parser],
parents=[gen_parser, work_parser, common_parser, synth_parser],
formatter_class=UltimateHelpFormatter,
description=synth_parser.description,
)
Expand Down
2 changes: 1 addition & 1 deletion arrakis/utils/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def generic_parser(parent_parser: bool = False) -> argparse.ArgumentParser:
)
parser.add_argument(
"--limit",
type=Optional[int],
type=int,
default=None,
help="Limit the number of islands to process.",
)
Expand Down

0 comments on commit 73ceb86

Please sign in to comment.