From c2d171552153d30a61a13dd27a20cae5e081e9f1 Mon Sep 17 00:00:00 2001 From: Clare Shanahan Date: Thu, 28 Mar 2024 13:06:23 -0400 Subject: [PATCH] added function to measure cross dispersion profile (#214) --- CHANGES.rst | 4 + docs/extraction_quickstart.rst | 2 +- specreduce/extract.py | 56 ++++----- specreduce/tests/test_utils.py | 170 ++++++++++++++++++++++++++ specreduce/utils/__init__.py | 2 + specreduce/utils/utils.py | 212 +++++++++++++++++++++++++++++++++ 6 files changed, 417 insertions(+), 29 deletions(-) create mode 100644 specreduce/tests/test_utils.py create mode 100644 specreduce/utils/utils.py diff --git a/CHANGES.rst b/CHANGES.rst index dd944aa3..1cdddde8 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -11,6 +11,10 @@ New Features the ``interp_degree_interpolated_profile`` parameter) to generate a continuously varying spatial profile that can be evaluated at any wavelength. [#173] +- Added a function to measure a cross-dispersion profile. A profile can be + obtained at a single pixel/wavelength, or an average profile can be obtained + from a range/set of wavelengths. [#214] + API Changes ^^^^^^^^^^^ diff --git a/docs/extraction_quickstart.rst b/docs/extraction_quickstart.rst index 11edb88a..353cb4a8 100644 --- a/docs/extraction_quickstart.rst +++ b/docs/extraction_quickstart.rst @@ -158,7 +158,7 @@ included here. Putting all these steps together, a simple extraction process might look something like:: - from specreduce.trace import FlatTrace + from specreduce.tracing import FlatTrace from specreduce.background import Background from specreduce.extract import BoxcarExtract diff --git a/specreduce/extract.py b/specreduce/extract.py index cabe856f..4b41ca29 100644 --- a/specreduce/extract.py +++ b/specreduce/extract.py @@ -118,34 +118,6 @@ def _ap_weight_image(trace, width, disp_axis, crossdisp_axis, image_shape): return wimage -def _align_along_trace(img, trace_array, disp_axis=1, crossdisp_axis=0): - """ - Given an arbitrary trace ``trace_array`` (an np.ndarray), roll - all columns of ``nddata`` to shift the NDData's pixels nearest - to the trace to the center of the spatial dimension of the - NDData. - """ - # TODO: this workflow does not support extraction for >2D spectra - if not (disp_axis == 1 and crossdisp_axis == 0): - # take the transpose to ensure the rows are the cross-disp axis: - img = img.T - - n_rows, n_cols = img.shape - - # indices of all columns, in their original order - rows = np.broadcast_to(np.arange(n_rows)[:, None], img.shape) - cols = np.broadcast_to(np.arange(n_cols), img.shape) - - # we want to "roll" each column so that the trace sits in - # the central row of the final image - shifts = trace_array.astype(int) - n_rows // 2 - - # we wrap the indices so we don't index out of bounds - shifted_rows = np.mod(rows + shifts[None, :], n_rows) - - return img[shifted_rows, cols] - - @dataclass class BoxcarExtract(SpecreduceOperation): """ @@ -816,6 +788,34 @@ def __call__(self, image=None, trace_object=None, spectral_axis=self.image.spectral_axis) +def _align_along_trace(img, trace_array, disp_axis=1, crossdisp_axis=0): + """ + Given an arbitrary trace ``trace_array`` (an np.ndarray), roll + all columns of ``nddata`` to shift the NDData's pixels nearest + to the trace to the center of the spatial dimension of the + NDData. + """ + # TODO: this workflow does not support extraction for >2D spectra + if not (disp_axis == 1 and crossdisp_axis == 0): + # take the transpose to ensure the rows are the cross-disp axis: + img = img.T + + n_rows, n_cols = img.shape + + # indices of all columns, in their original order + rows = np.broadcast_to(np.arange(n_rows)[:, None], img.shape) + cols = np.broadcast_to(np.arange(n_cols), img.shape) + + # we want to "roll" each column so that the trace sits in + # the central row of the final image + shifts = trace_array.astype(int) - n_rows // 2 + + # we wrap the indices so we don't index out of bounds + shifted_rows = np.mod(rows + shifts[None, :], n_rows) + + return img[shifted_rows, cols] + + @dataclass class OptimalExtract(HorneExtract): """ diff --git a/specreduce/tests/test_utils.py b/specreduce/tests/test_utils.py new file mode 100644 index 00000000..d205fd7c --- /dev/null +++ b/specreduce/tests/test_utils.py @@ -0,0 +1,170 @@ +import numpy as np +import pytest +from astropy.modeling import fitting, models +from specreduce.tracing import FitTrace +from specreduce.utils.utils import measure_cross_dispersion_profile +from specutils import Spectrum1D +from astropy.nddata import NDData +import astropy.units as u + + +def mk_gaussian_img(nrows=20, ncols=16, mean=10, stddev=4): + """ Makes a simple horizontal gaussian image.""" + + # note: this should become a fixture eventually, since other tests use + # similar functions to generate test data. + + np.random.seed(7) + col_model = models.Gaussian1D(amplitude=1, mean=mean, stddev=stddev) + index_arr = np.tile(np.arange(nrows), (ncols, 1)) + + return col_model(index_arr.T) + + +def mk_img_non_flat_trace(nrows=40, ncols=100, amp=10, stddev=2): + """ + Makes an image with a gaussian source that has a non-flat trace dispersed + along the x axis. + """ + spec2d = np.zeros((nrows, ncols)) + + for ii in range(spec2d.shape[1]): + mgaus = models.Gaussian1D(amplitude=amp, + mean=(9.+(20/spec2d.shape[1])*ii), + stddev=stddev) + rg = np.arange(0, spec2d.shape[0], 1) + gaus = mgaus(rg) + spec2d[:, ii] = gaus + + return spec2d + + +class TestMeasureCrossDispersionProfile(): + + @pytest.mark.parametrize('pixel', [None, 1, [1, 2, 3]]) + @pytest.mark.parametrize('width', [10, 9]) + def test_measure_cross_dispersion_profile(self, pixel, width): + """ + Basic test for `measure_cross_dispersion_profile`. Parametrized over + different options for `pixel` to test using all wavelengths, a single + wavelength, and a set of wavelengths, as well as different input types + (plain array, quantity, Spectrum1D, and NDData), as well as `width` to + use a window of all rows and a smaller window. + """ + + # test a few input formats + images = [] + mean = 5.0 + stddev = 4.0 + dat = mk_gaussian_img(nrows=10, ncols=10, mean=mean, stddev=stddev) + images.append(dat) # test unitless + images.append(dat * u.DN) + images.append(NDData(dat * u.DN)) + images.append(Spectrum1D(flux=dat * u.DN)) + + for img in images: + + # use a flat trace at trace_pos=10, a window of width 10 around the trace + # and use all 20 columns in image to create an average (median) + # cross dispersion profile + cdp = measure_cross_dispersion_profile(img, width=width, pixel=pixel) + + # make sure that if we fit a gaussian to the measured average profile, + # that we get out the same profile that was used to create the image. + # this should be exact since theres no noise in the image + fitter = fitting.LevMarLSQFitter() + mod = models.Gaussian1D() + fit_model = fitter(mod, np.arange(width), cdp) + + assert fit_model.mean.value == np.where(cdp == max(cdp))[0][0] + assert fit_model.stddev.value == stddev + + # test passing in a FlatTrace, and check the profile + cdp = measure_cross_dispersion_profile(img, width=width, pixel=pixel) + fit_model = fitter(mod, np.arange(width), cdp) + assert fit_model.mean.value == np.where(cdp == max(cdp))[0][0] + np.testing.assert_allclose(fit_model.stddev.value, stddev) + + @pytest.mark.filterwarnings("ignore:Model is linear in parameters") + def test_cross_dispersion_profile_non_flat_trace(self): + """ + Test measure_cross_dispersion_profile with a non-flat trace. + Tests with 'align_along_trace' set to both True and False, + to account for the changing center of the trace and measure + the true profile shape, or to 'blur' the profile, respectivley. + """ + + image = mk_img_non_flat_trace() + + # fit the trace + trace_fit = FitTrace(image) + + # when not aligning along trace and using the entire image + # rows for the window, the center of the profile should follow + # the shape of the trace + peak_locs = [9, 10, 12, 13, 15, 16, 17, 19, 20, 22, 23, 24, 26, 27, 29] + for i, pixel in enumerate(range(0, image.shape[1], 7)): + profile = measure_cross_dispersion_profile(image, + trace=trace_fit, + width=None, + pixel=pixel, + align_along_trace=False, + statistic='mean') + peak_loc = (np.where(profile == max(profile))[0][0]) + assert peak_loc == peak_locs[i] + + # when align_along_trace = True, the shape of the profile should + # not change since (there is some wiggling around though due to the + # fact that the trace is rolled to the nearest integer value. this can + # be smoothed with an interpolation option later on, but it is 'rough' + # for now). In this test case, the peak positions will all either + # be at pixel 20 or 21. + for i, pixel in enumerate(range(0, image.shape[1], 7)): + profile = measure_cross_dispersion_profile(image, + trace=trace_fit, + width=None, + pixel=pixel, + align_along_trace=True, + statistic='mean') + peak_loc = (np.where(profile == max(profile))[0][0]) + assert peak_loc in [20, 21] + + def test_errors_warnings(self): + img = mk_gaussian_img(nrows=10, ncols=10) + with pytest.raises(ValueError, + match='`crossdisp_axis` must be 0 or 1'): + measure_cross_dispersion_profile(img, crossdisp_axis=2) + + with pytest.raises(ValueError, match='`trace` must be Trace object, ' + 'number to specify the location ' + 'of a FlatTrace, or None to use ' + 'center of image.'): + measure_cross_dispersion_profile(img, trace='not a trace or a number') + + with pytest.raises(ValueError, match="`statistic` must be 'median' " + "or 'mean'."): + measure_cross_dispersion_profile(img, statistic='n/a') + + with pytest.raises(ValueError, match='Both `pixel` and `pixel_range` ' + 'can not be set simultaneously.'): + measure_cross_dispersion_profile(img, pixel=2, pixel_range=(2, 3)) + + with pytest.raises(ValueError, match='`pixels` must be an integer, ' + 'or list of integers to specify ' + 'where the crossdisperion profile ' + 'should be measured.'): + measure_cross_dispersion_profile(img, pixel='str') + + with pytest.raises(ValueError, match='`pixel_range` must be a tuple ' + 'of integers.'): + measure_cross_dispersion_profile(img, pixel_range=(2, 3, 5)) + + with pytest.raises(ValueError, match='Pixels chosen to measure cross ' + 'dispersion profile are out of ' + 'image bounds.'): + measure_cross_dispersion_profile(img, pixel_range=(2, 12)) + + with pytest.raises(ValueError, match='`width` must be an integer, ' + 'or None to use all ' + 'cross-dispersion pixels.'): + measure_cross_dispersion_profile(img, width='.') diff --git a/specreduce/utils/__init__.py b/specreduce/utils/__init__.py index eafc3ed5..b6069c08 100644 --- a/specreduce/utils/__init__.py +++ b/specreduce/utils/__init__.py @@ -1,3 +1,5 @@ """ General purpose utilities for specreduce """ + +from .utils import * # noqa diff --git a/specreduce/utils/utils.py b/specreduce/utils/utils.py new file mode 100644 index 00000000..8cdee452 --- /dev/null +++ b/specreduce/utils/utils.py @@ -0,0 +1,212 @@ +import numpy as np + +from specreduce.core import _ImageParser +from specreduce.tracing import Trace, FlatTrace +from specreduce.extract import _ap_weight_image, _align_along_trace + +__all__ = ['measure_cross_dispersion_profile', '_align_along_trace'] + + +def measure_cross_dispersion_profile(image, trace=None, crossdisp_axis=0, + width=None, pixel=None, pixel_range=None, + statistic='median', align_along_trace=True): + """ + Return a 1D (quantity) array of the cross-dispersion profile measured at a + specified pixel value ('wavelength', but potentially before calibration), + or the average profile across several pixel values (or range of pixel values) + along the dispersion axis. + + If a single number is specified for `pixel`, then the profile at that pixel + (i.e wavelength) will be returned. If several pixels are specified in a list or + array, then they will be averaged (median or mean, set by `statistic` which + defaults to median). Alternatively, `pixel_range` can be specified as a tuple + of integers specifying the minimum and maximum pixel range to average the + profile. `pixel` and `pixel_range` cannot be set simultaneously, and the default + is `pixel_range`=(min_image, max_image) to return an average profile across the + entire wavelength range of the image. + + The window in the cross dispersion axis for measuring the profile at + the pixel(s) specified is determined by `width` and `trace`. If a trace is + provided (either as a Trace object, or as a number specifying the location + on the cross-dispersion axis of a FlatTrace object) that will determine the + center of the profile on the cross-dispersion axis. Otherwise, if `trace` + is None, the center of the image will be the center of the returned profile + (i.e., the center row assuming a horizontal trace). + + If `width` is none, a window size of half the image in the cross-dispersion + axis will be used to measure the cross dispersion profile. Otherwise, an + integer value can be set for `width` which will determine the size of the + window around the trace used to measure the profile (this window moves with + the trace if trace is not flat). + + By default, if a non-flat trace is used the image will be aligned along the + trace. This can be controlled with the 'align_along_trace' parameter. + + Parameters + ---------- + image : `~astropy.nddata.NDData`-like or array-like, required + 2D image to measure cross-dispersion profile. + trace : Trace object, int, float, or None + A Trace object, a number to specify a FlatTrace, or None to use the + middle of the image. This is the position that defines the center of the + cross-dispersion profile. [default: None] + crossdisp_axis : int, optional + The index of the image's cross-dispersion axis. [default: 0] + width : tuple of int or None + Width around 'trace' to calculate profile. If None, then all rows in the + cross-dispersion axis will be used. [default: None] + pixel: int, list of int, or None + Pixel value(s) along the dispersion axis to return cross-dispersion profile. + If several specified in list, then the average (method set by `statistic`) + profile will be calculated. If None, and `pixel_range` is set, then + `pixel_range` will be used. [default: None] + pixel_range: tuple of int or None + Tuple of (min, max) defining the pixel range (along dispersion axis) to + calculate average cross-dispersion profile, up to and not inclusive of max. + If None, and `pixel` is not None, `pixel` will be used. If None and `pixel` + is also None, this will be interpreted as using the entire dispersion axis + to generate an average profile for the whole image. [default: None] + statistic: 'median' or 'mean' + If `pixel` specifies multiple pixels, or `pixel_range` is specified, an + average profile will be returned. This can be either `median` (default) + or `mean`. This is ignored if only one pixel is specified. [default: median] + align_along_trace: bool + Relevant only for non-flat traces. If True, "roll" each column so that + the trace sits in the central row before calculating average profile. This + will prevent any 'blurring' from averaging a non-flat trace at different + pixel/wavelengths. [default: True] + + """ + + if crossdisp_axis != 0 and crossdisp_axis != 1: + raise ValueError('`crossdisp_axis` must be 0 or 1.') + crossdisp_axis = int(crossdisp_axis) + disp_axis = 1 if crossdisp_axis == 0 else 0 + + unit = getattr(image, 'unit', None) + + # parse image, which will return a spectrum1D (note: this is not ideal, + # but will be addressed at some point) + parser = _ImageParser() + image = parser._parse_image(image, disp_axis=disp_axis) + + # which we then need to make back into a masked array + # again this way of parsing the image is not ideal but + # thats just how it is for now. + image = np.ma.MaskedArray(image.data, mask=image.mask) + + # transpose if disp_axis = 0 just for simplicity of calculations + # image is already copied so this won't modify input + if disp_axis == 0: + image = image.T + + nrows = image.shape[crossdisp_axis] + ncols = image.shape[disp_axis] + + if not isinstance(trace, Trace): # `trace` can be a trace obj + if trace is None: # if None, make a FlatTrace in the center of image + trace_pos = nrows / 2 + trace = FlatTrace(image, trace_pos) + elif isinstance(trace, (float, int)): # if float/int make a FlatTrace + trace = FlatTrace(image, trace) + else: + raise ValueError('`trace` must be Trace object, number to specify ' + 'the location of a FlatTrace, or None to use center' + ' of image.') + + if statistic not in ['median', 'mean']: + raise ValueError("`statistic` must be 'median' or 'mean'.") + + # determine if there is one pixel/wavelength selected or many as either a + # list or a tuple to specify a range + if pixel is not None: + if pixel_range is not None: + raise ValueError('Both `pixel` and `pixel_range` can not be set' + ' simultaneously.') + if isinstance(pixel, (int, float)): + pixels = np.array([int(pixel)]) + elif np.all([isinstance(x, (int, float)) for x in pixel]): + pixels = np.array([int(x) for x in pixel]) + else: + raise ValueError('`pixels` must be an integer, or list of integers ' + 'to specify where the crossdisperion profile should ' + 'be measured.') + else: # range is specified + if pixel_range is None: + pixels = np.arange(0, ncols) + else: # if not None, it should be a lower and upper bound + if len(pixel_range) != 2: + raise ValueError('`pixel_range` must be a tuple of integers.') + pixels = np.arange(min(pixel_range), max(pixel_range)) + + # now that we have all pixels that should be included in the profile, make + # sure that they are within image bounds. + # note: Should just warn instead and clip out out-of-bounds pixels, and only + # warn if there are none left? + if np.any(pixels < 0) or np.any(pixels > ncols - 1): + raise ValueError('Pixels chosen to measure cross dispersion profile are' + ' out of image bounds.') + + # now that we know which pixel(s) on the disp. axis we want to include + # figure out the range/window of pixels along the crossdisp axis to measure + # the profile + if width is None: # if None, use all rows + width = nrows + elif isinstance(width, (float, int)): + width = int(width) + else: + raise ValueError('`width` must be an integer, or None to use all ' + 'cross-dispersion pixels.') + width = int(width) + + # rectify trace, if _align_along_trace is True and trace is not flat + aligned_trace = None + if align_along_trace: + if not isinstance(trace, FlatTrace): + # note: img was transposed according to `crossdisp_axis`: disp_axis will always be 1 + aligned_trace = _align_along_trace(image, trace.trace, + disp_axis=1, + crossdisp_axis=0) + + # new trace will be a flat trace at the center of the image + trace_pos = nrows / 2 + trace = FlatTrace(aligned_trace, trace_pos) + + # create a weight image based on the trace and 'width' to mask around trace + + if width == nrows: + wimg = np.zeros(image.shape) + else: + wimg = _ap_weight_image(trace, width, disp_axis, crossdisp_axis, image.shape) + # invert mask to include, not exclude, pixels around trace + wimg = (1 - wimg).astype(int) + + # now that we have figured out the mask for the window in cross-disp. axis, + # select only the pixel(s) we want to include in measuring the avg. profile + pixel_mask = np.ones((image.shape)) + pixel_mask[:, pixels] = 0 + + # combine these masks to isolate the rows and cols used to measure profile + combined_mask = np.logical_or(pixel_mask, wimg) + + if aligned_trace is not None: + masked_arr = np.ma.MaskedArray(aligned_trace, combined_mask) + else: + masked_arr = np.ma.MaskedArray(image.data, combined_mask) + + # and measure the cross dispersion profile. if multiple pixels/wavelengths, + # this will be an average. we already transposed data based on disp_axis so + # axis is always 1 for this calculation + if statistic == 'mean': + avg_prof = np.ma.mean(masked_arr, axis=1) + else: # must be median, we already checked. + avg_prof = np.ma.median(masked_arr, axis=1) + + # and get profile + avg_prof = avg_prof.data[~avg_prof.mask] + + # and re-apply original unit, if there was one + if unit is not None: + avg_prof *= unit + + return avg_prof