Skip to content

Commit

Permalink
Add cats
Browse files Browse the repository at this point in the history
  • Loading branch information
AlecThomson committed Dec 12, 2023
1 parent b80c363 commit 4e373ff
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 46 deletions.
54 changes: 12 additions & 42 deletions arrakis/makecat.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from astropy.stats import sigma_clip
from astropy.table import Column, Table
from dask.diagnostics import ProgressBar
from prefect import flow, task, unmapped
from rmtable import RMTable
from scipy.stats import lognorm, norm
from tqdm import tqdm
Expand Down Expand Up @@ -218,6 +219,7 @@ def lognorm_from_percentiles(x1, p1, x2, p2):
return scale, np.exp(mean)


@task(name="Fix sigma_add")
def sigma_add_fix(tab):
sigma_Q_low = np.array(tab["sigma_add_Q"] - tab["sigma_add_Q_err_minus"])
sigma_Q_high = np.array(tab["sigma_add_Q"] + tab["sigma_add_Q_err_plus"])
Expand Down Expand Up @@ -292,7 +294,7 @@ def get_fit_func(
degree: int = 2,
do_plot: bool = False,
high_snr_cut: float = 30.0,
):
) -> Tuple[np.polynomial.Polynomial.fit, plt.Figure]:
"""Fit an envelope to define leakage sources
Args:
Expand All @@ -313,6 +315,10 @@ def get_fit_func(

logger.info(f"{np.sum(hi_snr)} sources with Stokes I SNR above {high_snr_cut=}.")

if len(hi_i_tab) < 100:
logger.critcal("Not enough high SNR sources to fit leakage envelope.")
return np.polynomial.Polynomial.fit([0], [0], deg=1), plt.figure()

# Get fractional pol
frac_P = np.array(hi_i_tab["fracpol"].value)
# Bin sources by separation from tile centre
Expand Down Expand Up @@ -340,7 +346,6 @@ def get_fit_func(

# Plot the fit
latexify(columns=2)
figure = plt.figure(facecolor="w")
fig = plt.figure(facecolor="w")
color = "tab:green"
stoke = {
Expand All @@ -360,17 +365,6 @@ def get_fit_func(
zorder=0,
rasterized=True,
)

# is_finite = np.logical_and(
# np.isfinite(hi_i_tab["beamdist"].to(u.deg).value), np.isfinite(frac_P)
# )
# hist2d(
# np.array(hi_i_tab["beamdist"].to(u.deg).value)[is_finite, np.newaxis],
# np.array(frac_P)[is_finite, np.newaxis],
# bins=(nbins, nbins),
# range=[[0, 5], [0, 0.05]],
# # color=color,
# )
plt.plot(bins_c, meds, alpha=1, c=color, label="Median", linewidth=2)
for s, ls in zip((1, 2), ("--", ":")):
for r in ("ups", "los"):
Expand Down Expand Up @@ -507,6 +501,7 @@ def masker(x):
return cat_out


@task(name="Add cuts and flags")
def cuts_and_flags(cat: RMTable) -> RMTable:
"""Cut out bad sources, and add flag columns
Expand Down Expand Up @@ -564,6 +559,7 @@ def cuts_and_flags(cat: RMTable) -> RMTable:
return cat_out, fit


@task(name="Get spectral indices")
def get_alpha(cat):
coefs_str = cat["stokesI_model_coef"]
coefs_err_str = cat["stokesI_model_coef_err"]
Expand Down Expand Up @@ -592,6 +588,7 @@ def get_alpha(cat):
)


@task(name="Get integration times")
def get_integration_time(cat, field_col):
logger.warn("Will be stripping the trailing field character prefix. ")
field_names = [
Expand Down Expand Up @@ -620,31 +617,6 @@ def get_integration_time(cat, field_col):
return np.array(tints) * u.s


# Stolen from GASKAP pipeline
# Credit to J. Dempsey
# https://github.com/GASKAP/GASKAP-HI-Absorption-Pipeline/
# https://github.com/GASKAP/GASKAP-HI-Absorption-Pipeline/blob/
# def add_col_metadata(vo_table, col_name, description, units=None, ucd=None, datatype=None):
# """Add metadata to a VO table column.

# Args:
# vo_table (vot.): VO Table
# col_name (str): Column name
# description (str): Long description of the column
# units (u.Unit, optional): Unit of column. Defaults to None.
# ucd (str, optional): UCD string. Defaults to None.
# datatype (_type_, optional): _description_. Defaults to None.
# """
# col = vo_table.get_first_table().get_field_by_id(col_name)
# col.description = description
# if units:
# col.unit = units
# if ucd:
# col.ucd = ucd
# if datatype:
# col.datatype = datatype


def add_metadata(vo_table: vot.tree.Table, filename: str):
"""Add metadata to VO Table for CASDA
Expand Down Expand Up @@ -732,6 +704,7 @@ def fix_blank_units(rmtab: TableLike) -> TableLike:
return rmtab


@task(name="Write votable")
def write_votable(rmtab: TableLike, outfile: str) -> None:
# Replace bad column names
fix_columns = {
Expand All @@ -752,6 +725,7 @@ def write_votable(rmtab: TableLike, outfile: str) -> None:
replace_nans(outfile)


@flow(name="Make catalogue")
def main(
field: str,
host: str,
Expand Down Expand Up @@ -862,10 +836,6 @@ def main(
alpha_dict = get_alpha(rmtab)
rmtab.add_column(Column(data=alpha_dict["alphas"], name="spectral_index"))
rmtab.add_column(Column(data=alpha_dict["alphas_err"], name="spectral_index_err"))
# rmtab.add_column(Column(data=alpha_dict["betas"], name="spectral_curvature"))
# rmtab.add_column(
# Column(data=alpha_dict["betas_err"], name="spectral_curvature_err")
# )

# Add integration time
field_col = get_field_db(
Expand Down
5 changes: 1 addition & 4 deletions arrakis/process_spice.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
from arrakis.utils.database import test_db
from arrakis.utils.pipeline import logo_str, performance_report_prefect

# Defining tasks
cat_task = task(makecat.main, name="Catalogue")


@flow(name="Combining+Synthesis on Arrakis")
def process_spice(args, host: str) -> None:
Expand Down Expand Up @@ -171,7 +168,7 @@ def process_spice(args, host: str) -> None:
)

previous_future = (
cat_task.submit(
makecat.main(
field=args.field,
host=host,
epoch=args.epoch,
Expand Down

0 comments on commit 4e373ff

Please sign in to comment.