diff --git a/python/lsst/ip/isr/ampOffset.py b/python/lsst/ip/isr/ampOffset.py
index cc59c6ffb..5383ad780 100644
--- a/python/lsst/ip/isr/ampOffset.py
+++ b/python/lsst/ip/isr/ampOffset.py
@@ -18,88 +18,412 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-# import os
__all__ = ["AmpOffsetConfig", "AmpOffsetTask"]
-import lsst.pex.config as pexConfig
-import lsst.pipe.base as pipeBase
-from lsst.meas.algorithms import (SubtractBackgroundTask, SourceDetectionTask)
+import warnings
+import numpy as np
+from lsst.afw.math import MEANCLIP, StatisticsControl, makeStatistics
+from lsst.afw.table import SourceTable
+from lsst.meas.algorithms import SourceDetectionTask, SubtractBackgroundTask
+from lsst.pex.config import Config, ConfigurableField, Field
+from lsst.pipe.base import Task
-class AmpOffsetConfig(pexConfig.Config):
- """Configuration parameters for AmpOffsetTask.
- """
- ampEdgeInset = pexConfig.Field(
+
+class AmpOffsetConfig(Config):
+ """Configuration parameters for AmpOffsetTask."""
+
+ def setDefaults(self):
+ # The `binSize`` should be as large as possible, preventing background
+ # subtraction from inadvertently removing the amp offset signature.
+ # Here it's set to the shorter dimension of an LSST amp by default,
+ # which seems reasonable for LSST.
+ self.background.binSize = 509 # Amp width for LSSTCam
+ self.background.algorithm = "AKIMA_SPLINE"
+ self.background.useApprox = False
+ self.background.ignoredPixelMask = [
+ "BAD",
+ "SAT",
+ "INTRP",
+ "CR",
+ "EDGE",
+ "DETECTED",
+ "DETECTED_NEGATIVE",
+ "SUSPECT",
+ "NO_DATA",
+ ]
+ self.detection.reEstimateBackground = False
+
+ ampEdgeInset = Field(
doc="Number of pixels the amp edge strip is inset from the amp edge. A thin strip of pixels running "
"parallel to the edge of the amp is used to characterize the average flux level at the amp edge.",
dtype=int,
default=5,
)
- ampEdgeWidth = pexConfig.Field(
+ ampEdgeWidth = Field(
doc="Pixel width of the amp edge strip, starting at ampEdgeInset and extending inwards.",
dtype=int,
default=64,
)
- ampEdgeMinFrac = pexConfig.Field(
+ ampEdgeMinFrac = Field(
doc="Minimum allowed fraction of viable pixel rows along an amp edge. No amp offset estimate will be "
"generated for amp edges that do not have at least this fraction of unmasked pixel rows.",
dtype=float,
default=0.5,
)
- ampEdgeMaxOffset = pexConfig.Field(
+ ampEdgeMaxOffset = Field(
doc="Maximum allowed amp offset ADU value. If a measured amp offset value is larger than this, the "
"result will be discarded and therefore not used to determine amp pedestal corrections.",
dtype=float,
default=5.0,
)
- ampEdgeWindow = pexConfig.Field(
- doc="Pixel size of the sliding window used to generate rolling average amp offset values.",
+ ampEdgeWindow = Field(
+ doc="Pixel size of the sliding window used to generate rolling average amp offset values. It should "
+ "not exceed the shorter dimension of amps.",
dtype=int,
- default=512,
+ default=None, # This should be reconfigured for every instrument (HSC, LSSTCam, etc.)
)
- doBackground = pexConfig.Field(
+ doBackground = Field(
doc="Estimate and subtract background prior to amp offset estimation?",
dtype=bool,
default=True,
)
- background = pexConfig.ConfigurableField(
+ background = ConfigurableField(
doc="An initial background estimation step run prior to amp offset calculation.",
target=SubtractBackgroundTask,
)
- doDetection = pexConfig.Field(
+ doDetection = Field(
doc="Detect sources and update cloned exposure prior to amp offset estimation?",
dtype=bool,
default=True,
)
- detection = pexConfig.ConfigurableField(
+ detection = ConfigurableField(
doc="Source detection to add temporary detection footprints prior to amp offset calculation.",
target=SourceDetectionTask,
)
-class AmpOffsetTask(pipeBase.Task):
- """Calculate and apply amp offset corrections to an exposure.
- """
+class AmpOffsetTask(Task):
+ """Calculate and apply amp offset corrections to an exposure."""
+
ConfigClass = AmpOffsetConfig
_DefaultName = "isrAmpOffset"
def __init__(self, *args, **kwargs):
- super().__init__(**kwargs)
- # always load background subtask, even if doBackground=False;
- # this allows for default plane bit masks to be defined
+ super().__init__(*args, **kwargs)
+ # Always load background subtask, even if doBackground=False;
+ # this allows for default plane bit masks to be defined.
self.makeSubtask("background")
if self.config.doDetection:
self.makeSubtask("detection")
def run(self, exposure):
"""Calculate amp offset values, determine corrective pedestals for each
- amp, and update the input exposure in-place. This task is currently not
- implemented, and should be retargeted by a camera specific version.
+ amp, and update the input exposure in-place.
+
+ Parameters
+ ----------
+ exposure: `lsst.afw.image.Exposure`
+ Exposure to be corrected for amp offsets.
+ """
+
+ # Generate an exposure clone to work on and establish the bit mask.
+ exp = exposure.clone()
+ bitMask = exp.mask.getPlaneBitMask(self.background.config.ignoredPixelMask)
+
+ # Fit and subtract background.
+ if self.config.doBackground:
+ maskedImage = exp.getMaskedImage()
+ bg = self.background.fitBackground(maskedImage)
+ bgImage = bg.getImageF(self.background.config.algorithm, self.background.config.undersampleStyle)
+ maskedImage -= bgImage
+
+ # Detect sources and update cloned exposure mask planes in-place.
+ if self.config.doDetection:
+ schema = SourceTable.makeMinimalSchema()
+ table = SourceTable.make(schema)
+ # Detection sigma, used for smoothing and to grow detections, is
+ # normally measured from the PSF of the exposure. As the PSF hasn't
+ # been measured at this stage of processing, sigma is instead
+ # set to an approximate value here (which should be sufficient).
+ _ = self.detection.run(table=table, exposure=exp, sigma=2)
+
+ # Safety check: do any pixels remain for amp offset estimation?
+ if (exp.mask.array & bitMask).all():
+ self.log.warning("All pixels masked: cannot calculate any amp offset corrections.")
+ else:
+ # Set up amp offset inputs.
+ im = exp.image
+ im.array[(exp.mask.array & bitMask) > 0] = np.nan
+ amps = exp.getDetector().getAmplifiers()
+
+ min_dimension = np.min([np.min(amp.getBBox().getDimensions()) for amp in amps])
+ if self.config.ampEdgeWindow is None:
+ self.config.ampEdgeWindow = min_dimension
+ self.log.info(
+ f"ampEdgeWindow automatically set to shortest amp side ({min_dimension} pixels) as no "
+ "input provided."
+ )
+ elif self.config.ampEdgeWindow > min_dimension:
+ raise RuntimeError(
+ f"The sliding window ({self.config.ampEdgeWindow} pixels) is larger than the smaller "
+ f"dimension in amplifiers ({min_dimension} pixels). This can lead to downstream issues "
+ "after convolution in the `getSideAmpOffset()` method. Please adjust `ampEdgeWindow` "
+ f"in the config to a value equal to or less than {min_dimension} pixels and rerun."
+ )
+
+ # Determine amplifier side geometry.
+ ampAreas = {amp.getBBox().getArea() for amp in amps}
+ if len(ampAreas) > 1:
+ raise NotImplementedError(
+ "Amp offset correction is not yet implemented for detectors with differing amp sizes."
+ )
+
+ # Obtain association and offset matrices.
+ A, sides = self.getAmpAssociations(amps)
+ B = self.getAmpOffsets(im, amps, A, sides)
+
+ # If least-squares minimization fails, convert NaNs to zeroes,
+ # ensuring that no values are erroneously added/subtracted
+ pedestals = np.nan_to_num(np.linalg.lstsq(A, B, rcond=None)[0])
+ metadata = exposure.getMetadata()
+ for ii, (amp, pedestal) in enumerate(zip(amps, pedestals)):
+ ampIm = exposure.image[amp.getBBox()].array
+ ampIm -= pedestal
+ metadata.set(
+ f"PEDESTAL{ii + 1}", float(pedestal), f"Pedestal level subtracted from amp {ii + 1}"
+ )
+ self.log.info(f"amp pedestal values: {', '.join([f'{x:.4f}' for x in pedestals])}")
+
+ def getAmpAssociations(self, amps):
+ """Determine amp geometry and amp associations from a list of
+ amplifiers.
+
+ Parse an input list of amplifiers to determine the layout of amps
+ within a detector, and identify all amp sides (i.e., the
+ horizontal and vertical junctions between amps).
+
+ Returns a matrix with a shape corresponding to the geometry of the amps
+ in the detector.
+
+ Parameters
+ ----------
+ amps : `list` [`lsst.afw.cameraGeom.Amplifier`]
+ List of amplifier objects used to deduce associations.
+
+ Returns
+ -------
+ ampAssociations : `numpy.ndarray`
+ An N x N matrix (N = the number of amplifiers) providing the
+ association information for amplifiers, indicating their
+ connections within the detector layout. The elements are defined as
+ below: TODO
+ -1: ...
+ ...: ...
+ ampSides : `numpy.ndarray`
+ An N x N matrix (N = the number of amplifiers) representing the amp
+ side information corresponding to the `ampAssociations`
+ matrix. The elements are integers defined as below:
+ -1: no side
+ 0: side on the bottom
+ 1: side on the right
+ 2: side on the top
+ 3: side on the left
+ """
+ xCenters = [amp.getBBox().getCenterX() for amp in amps]
+ yCenters = [amp.getBBox().getCenterY() for amp in amps]
+ xIndices = np.ceil(xCenters / np.min(xCenters) / 2).astype(int) - 1
+ yIndices = np.ceil(yCenters / np.min(yCenters) / 2).astype(int) - 1
+
+ nAmps = len(amps)
+ ampIds = np.zeros((len(set(yIndices)), len(set(xIndices))), dtype=int)
+
+ for ampId, xIndex, yIndex in zip(np.arange(nAmps), xIndices, yIndices):
+ ampIds[yIndex, xIndex] = ampId
+
+ ampAssociations = np.zeros((nAmps, nAmps), dtype=int)
+ ampSides = np.full_like(ampAssociations, -1)
+
+ for ampId in ampIds.ravel():
+ neighbors, sides = self.getNeighbors(ampIds, ampId)
+ ampAssociations[ampId, neighbors] = -1
+ ampSides[ampId, neighbors] = sides
+ ampAssociations[ampId, ampId] = -ampAssociations[ampId].sum()
+
+ if ampAssociations.sum() != 0:
+ raise RuntimeError("The `ampAssociations` array does not sum to zero.")
+
+ self.log.debug("amp associations:\n%s", ampAssociations)
+ self.log.debug("amp sides:\n%s", ampSides)
+
+ return ampAssociations, ampSides
+
+ def getNeighbors(self, ampIds, ampId):
+ """Get the neighbor amplifiers and their sides for a given
+ amplifier.
+
+ Parameters
+ ----------
+ ampIds : `numpy.ndarray`
+ Matrix with amp side association information.
+ ampId : `int`
+ The amplifier ID for which neighbor amplifiers and side IDs
+ are to be found.
+
+ Returns
+ -------
+ neighbors : `list` [`int`]
+ List of neighbor amplifier IDs.
+ sides : `list` [`int`]
+ List of side IDs, with each ID corresponding to its respective
+ neighbor amplifier.
+ """
+ m, n = ampIds.shape
+ r, c = np.ravel(np.where(ampIds == ampId))
+ neighbors, sides = [], []
+ sideLookup = {
+ 0: (r + 1, c),
+ 1: (r, c + 1),
+ 2: (r - 1, c),
+ 3: (r, c - 1),
+ }
+ for side, (row, column) in sideLookup.items():
+ if 0 <= row < m and 0 <= column < n:
+ neighbors.append(ampIds[row][column])
+ sides.append(side)
+ return neighbors, sides
+
+ def getInterfaceOffset(self, ampIdA, ampIdB, edgeA, edgeB):
+ """Calculate the amp offset for a given interface between two
+ amplifiers.
Parameters
----------
- exposure : `lsst.afw.image.Exposure`
- Exposure to be corrected for any amp offsets.
+ ampIdA : int
+ ID of the first amplifier.
+ ampIdB : int
+ ID of the second amplifier.
+ edgeA : numpy.ndarray
+ Amp edge for the first amplifier.
+ edgeB : numpy.ndarray
+ Amp edge for the second amplifier.
+
+ Returns
+ -------
+ interfaceOffset : float
+ The calculated amp offset value for the given interface between
+ amps A and B.
+ """
+ sctrl = StatisticsControl()
+ # NOTE: Taking the difference with the order below reverses the sign
+ # flip in the B matrix
+ edgeDiff = edgeA - edgeB
+ # Compute rolling averages
+ edgeDiffSum = np.convolve(np.nan_to_num(edgeDiff), np.ones(self.config.ampEdgeWindow), "same")
+ edgeDiffNum = np.convolve(~np.isnan(edgeDiff), np.ones(self.config.ampEdgeWindow), "same")
+ edgeDiffAvg = edgeDiffSum / np.clip(edgeDiffNum, 1, None)
+ edgeDiffAvg[np.isnan(edgeDiff)] = np.nan
+ # Take clipped mean of rolling average data as amp offset value
+ interfaceOffset = makeStatistics(edgeDiffAvg, MEANCLIP, sctrl).getValue()
+ # Perform a couple of do-no-harm safety checks:
+ # a) The fraction of unmasked pixel rows is > ampEdgeMinFrac,
+ # b) The absolute offset ADU value is < ampEdgeMaxOffset
+ ampEdgeGoodFrac = 1 - (np.sum(np.isnan(edgeDiffAvg)) / len(edgeDiffAvg))
+ minFracFail = ampEdgeGoodFrac < self.config.ampEdgeMinFrac
+ maxOffsetFail = np.abs(interfaceOffset) > self.config.ampEdgeMaxOffset
+ if minFracFail or maxOffsetFail:
+ interfaceOffset = 0
+ interfaceId = f"{ampIdA}{ampIdB}"
+ self.log.debug(
+ f"amp interface {interfaceId} : "
+ f"viable edge difference frac = {ampEdgeGoodFrac}, "
+ f"interface offset = {interfaceOffset:.3f}"
+ )
+ return interfaceOffset
+
+ def getAmpOffsets(self, im, amps, associations, sides):
+ """Calculate the amp offsets for all amplifiers.
+
+ Parameters
+ ----------
+ im : `lsst.afw.image._image.ImageF`
+ Amplifier image to extract data from.
+ amps : `list` [`lsst.afw.cameraGeom.Amplifier`]
+ List of amplifier objects.
+ associations : numpy.ndarray
+ An N x N matrix containing amp association information, where N is
+ the number of amplifiers.
+ sides : numpy.ndarray
+ An N x N matrix containing amp side information, where N is the
+ number of amplifiers.
+
+ Returns
+ -------
+ ampsOffsets : `numpy.ndarray`
+ 1D float array containing the calculated amp offsets for all
+ amplifiers.
+ """
+ ampsOffsets = np.zeros(len(amps))
+ ampsEdges = self.getAmpEdges(im, amps, sides)
+ interfaceOffsetLookup = {}
+ for ampId, ampAssociations in enumerate(associations):
+ ampNeighbors = np.ravel(np.where(ampAssociations < 0))
+ for ampNeighbor in ampNeighbors:
+ ampSide = sides[ampId][ampNeighbor]
+ edgeA = ampsEdges[ampId][ampSide]
+ edgeB = ampsEdges[ampNeighbor][(ampSide + 2) % 4]
+ if ampId < ampNeighbor:
+ interfaceOffset = self.getInterfaceOffset(ampId, ampNeighbor, edgeA, edgeB)
+ interfaceOffsetLookup[f"{ampId}{ampNeighbor}"] = interfaceOffset
+ else:
+ interfaceOffset = -interfaceOffsetLookup[f"{ampNeighbor}{ampId}"]
+ ampsOffsets[ampId] += interfaceOffset
+ return ampsOffsets
+
+ def getAmpEdges(self, im, amps, ampSides):
+ """Calculate the amp edges for all amplifiers.
+
+ Parameters
+ ----------
+ im : `lsst.afw.image._image.ImageF`
+ Amplifier image to extract data from.
+ amps : `list` [`lsst.afw.cameraGeom.Amplifier`]
+ List of amplifier objects.
+ ampSides : `numpy.ndarray`
+ An N x N matrix containing amp side information, where N is the
+ number of amplifiers.
+
+ Returns
+ -------
+ ampEdges : `dict` [`int`, `dict` [`int`, `numpy.ndarray`]]
+ A dictionary containing amp edge(s) for each amplifier, corresponding
+ to one or more potential sides, where each edge is associated with
+ an side. The outer dictionary has integer keys representing
+ amplifier IDs, and the inner dictionary has integer keys representing
+ side IDs for each amplifier and values that are 1D arrays of
+ floats representing the 1D medianified strips from the amp image,
+ referred to as "amp edge":
+ {ampID: {sideID: numpy.ndarray}, ...}
"""
- raise NotImplementedError("Amp offset task should be retargeted by a camera specific version.")
+ ampEdgeOuter = self.config.ampEdgeInset + self.config.ampEdgeWidth
+ ampEdges = {}
+ slice_map = {
+ 0: (slice(-ampEdgeOuter, -self.config.ampEdgeInset), slice(None)),
+ 1: (slice(None), slice(-ampEdgeOuter, -self.config.ampEdgeInset)),
+ 2: (slice(self.config.ampEdgeInset, ampEdgeOuter), slice(None)),
+ 3: (slice(None), slice(self.config.ampEdgeInset, ampEdgeOuter)),
+ }
+ for ampId, (amp, ampSides) in enumerate(zip(amps, ampSides)):
+ ampEdges[ampId] = {}
+ ampIm = im[amp.getBBox()].array
+ # Loop over identified sides
+ for ampSide in ampSides:
+ if ampSide < 0:
+ continue
+ strip = ampIm[slice_map[ampSide]]
+ # Catch warnings to prevent all-NaN slice RuntimeWarning
+ with warnings.catch_warnings():
+ warnings.filterwarnings("ignore", r"All-NaN (slice|axis) encountered")
+ ampEdges[ampId][ampSide] = np.nanmedian(strip, axis=ampSide % 2) # 1D medianified strip
+ return ampEdges
diff --git a/tests/test_ampOffset.py b/tests/test_ampOffset.py
new file mode 100644
index 000000000..cd396c677
--- /dev/null
+++ b/tests/test_ampOffset.py
@@ -0,0 +1,113 @@
+# This file is part of ip_isr.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+import unittest
+import numpy as np
+
+import lsst.utils.tests
+import lsst.afw.image as afwImage
+from lsst.obs.lsst import LsstCam
+from lsst.obs.subaru import HyperSuprimeCam
+from lsst.ip.isr.ampOffset import AmpOffsetConfig, AmpOffsetTask
+
+
+class AmpOffsetTest(lsst.utils.tests.TestCase):
+ def setUp(self):
+ self.camera = None # `lsst.afw.cameraGeom.Camera` object
+ self.bgbinSize = None
+ self.ampEdgeWindow = None
+
+ def tearDown(self):
+ del self.camera
+ del self.bgbinSize
+ del self.ampEdgeWindow
+
+ def buildExposure(self, addBackground=False):
+ exp = afwImage.ExposureF(self.camera[0].getBBox())
+ exp.setDetector(self.camera[0])
+ amps = exp.getDetector().getAmplifiers()
+ nAmps = len(amps)
+ self.values = np.linspace(-2.5, 2.5, nAmps)
+ for i in range(nAmps):
+ exp.image[amps[i].getBBox()] = self.values[i]
+ if addBackground:
+ exp.image.array += 100
+ return exp
+
+ def testAmpOffset(self):
+ if self.camera is None:
+ raise unittest.SkipTest("Skipping due to no camera set.")
+ exp = self.buildExposure(addBackground=False)
+ config = AmpOffsetConfig()
+ config.background.binSize = self.bgbinSize
+ config.ampEdgeWindow = self.ampEdgeWindow
+ config.doBackground = False
+ config.doDetection = False
+ task = AmpOffsetTask(config=config)
+ task.run(exp)
+ meta = exp.getMetadata().toDict()
+ self.assertEqual(np.sum(exp.image.array), 0)
+ for i, value in enumerate(self.values):
+ self.assertAlmostEqual(meta[f"PEDESTAL{i+1}"], value, 6)
+
+ def testAmpOffsetWithBackground(self):
+ if self.camera is None:
+ raise unittest.SkipTest("Skipping due to no camera set.")
+ exp = self.buildExposure(addBackground=True)
+ config = AmpOffsetConfig()
+ config.background.binSize = self.bgbinSize
+ config.ampEdgeWindow = self.ampEdgeWindow
+ config.doBackground = True
+ config.doDetection = True
+ task = AmpOffsetTask(config=config)
+ task.run(exp)
+ meta = exp.getMetadata().toDict()
+ nAmps = len(meta)
+ for i in range(nAmps // 2):
+ self.assertAlmostEqual(meta[f"PEDESTAL{i+1}"], -meta[f"PEDESTAL{nAmps-i}"], 7)
+
+
+class AmpOffsetTestSubaru(AmpOffsetTest):
+ def setUp(self):
+ instrument = HyperSuprimeCam()
+ self.camera = instrument.getCamera()
+ self.bgbinSize = 512 # Width of an HSC amp
+ self.ampEdgeWindow = 512
+
+
+class AmpOffsetTestRubin(AmpOffsetTest):
+ def setUp(self):
+ instrument = LsstCam()
+ self.camera = instrument.getCamera()
+ self.bgbinSize = 509 # Width of an LSST amp
+ self.ampEdgeWindow = 509
+
+
+class TestMemory(lsst.utils.tests.MemoryTestCase):
+ pass
+
+
+def setup_module(module):
+ lsst.utils.tests.init()
+
+
+if __name__ == "__main__":
+ lsst.utils.tests.init()
+ unittest.main()