Skip to content

Commit

Permalink
Merge pull request #162 from tjgalvin/meanv
Browse files Browse the repository at this point in the history
Calculate mean leakage
  • Loading branch information
tjgalvin authored Aug 15, 2024
2 parents 1e62863 + 71a2ec8 commit cdab453
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions flint/leakage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from argparse import ArgumentParser
from pathlib import Path
from typing import Dict, NamedTuple, Union, Optional, Tuple
from typing import Dict, NamedTuple, Union, Optional

import astropy.units as u
import numpy as np
Expand Down Expand Up @@ -33,6 +33,8 @@ class LeakageFilters(NamedTuple):
"""The size of a box to search for peak polarised signal in"""
noise_box_size: int = 30
"""the size of a box to compute a local RMS noise measure from"""
mean_box_size: int = 10
"""The size of abox to compute a local mean measure over"""
source_snr: float = 40
"""Minimum stokes-I signal-to-noise ratio"""

Expand Down Expand Up @@ -225,12 +227,24 @@ def load_and_filter_components(
return comp_table


class PolStatistics(NamedTuple):
"""Simple container for statistics around the extraction of leakage polarisation statistics"""

peak: np.ndarray
"""The peak pixel value"""
noise: np.ndarray
"""The standard deviation of pixels within a box"""
mean: np.ndarray
"""The mean of pixels within a box"""


def extract_pol_stats_in_box(
pol_image: np.ndarray,
pixel_coords: PixelCoords,
search_box_size: int,
noise_box_size: int,
) -> Tuple[np.ndarray, np.ndarray]:
mean_box_size: int,
) -> PolStatistics:
"""Construct two boxes around nominated pixel coordinates to:
* extract the peak signal within
Expand All @@ -241,17 +255,20 @@ def extract_pol_stats_in_box(
pixel_coords (PixelCoords): Collection of pixel positioncs to evaluate the peak polarisation and noise at
search_box_size (int): Size of box to extract the maximum polarised signal from
noise_box_size (int): Size of box to calculate the RMS over
mean_box_size (int): Size of box to calculate an mean over
Returns:
Tuple[np.ndarray, np.ndarray]: Extracted peak polarised signal and noise
PolStatistics: Extracted statistics, including peak polarised signal, noise and mean
"""
y_max, x_max = pol_image.shape[-2:]

logger.info(f"{pol_image.shape=}, extracted {y_max=} and {x_max=}")

pol_peak = None
pol_noise = None
for idx, box_size in enumerate((search_box_size, noise_box_size)):
pol_mean = None
# TODO: This loop should be reformed to allow iterating over something that defines the mode and box
for idx, box_size in enumerate((search_box_size, noise_box_size, mean_box_size)):
box_delta = int(np.ceil(box_size / 2))

y_edge_min = np.maximum(pixel_coords.y - box_delta, 0).astype(int)
Expand Down Expand Up @@ -291,11 +308,21 @@ def extract_pol_stats_in_box(
for data in search_box
]
)
elif idx == 2:
pol_mean = np.array(
[
np.nanmean(data) if np.any(np.isfinite(data)) else np.nan
for data in search_box
]
)

assert pol_peak is not None, f"{pol_peak=}, which should not happen"
assert pol_noise is not None, f"{pol_noise=}, which should not happen"
assert pol_mean is not None, f"{pol_mean=}, which should not happen"

pol_stats = PolStatistics(peak=pol_peak, noise=pol_noise, mean=pol_mean)

return pol_peak, pol_noise
return pol_stats


def _get_output_catalogue_path(
Expand Down Expand Up @@ -354,18 +381,20 @@ def create_leakge_component_table(
i_values = components[peak_flux_col]

pol_pixel_coords = get_xy_pixel_coords(table=components, wcs=pol_fits.wcs)
pol_peak, pol_noise = extract_pol_stats_in_box(
pol_stats = extract_pol_stats_in_box(
pol_image=pol_fits.data,
pixel_coords=pol_pixel_coords,
search_box_size=leakage_filters.search_box_size,
noise_box_size=leakage_filters.noise_box_size,
mean_box_size=leakage_filters.mean_box_size,
)
frac_values = pol_peak / i_values
frac_values = pol_stats.peak / i_values

logger.info(f"{frac_values.shape=}")
components[f"{pol}_fraction"] = frac_values
components[f"{pol}_peak"] = pol_peak
components[f"{pol}_noise"] = pol_noise
components[f"{pol}_peak"] = pol_stats.peak
components[f"{pol}_noise"] = pol_stats.noise
components[f"{pol}_mean"] = pol_stats.mean

output_path = _get_output_catalogue_path(
input_path=catalogue if isinstance(catalogue, Path) else pol_image,
Expand Down

0 comments on commit cdab453

Please sign in to comment.