Skip to content

Commit

Permalink
Construct source injection unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
leeskelvin committed Sep 7, 2023
1 parent 73631a3 commit 2e3203b
Show file tree
Hide file tree
Showing 6 changed files with 445 additions and 6 deletions.
18 changes: 12 additions & 6 deletions python/lsst/source/injection/utils/make_injection_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def _get_dataset_type_names(conns, fields):

def make_injection_pipeline(
dataset_type_name: str,
reference_pipeline: str,
injection_pipeline: str | None = None,
reference_pipeline: Pipeline | str,
injection_pipeline: Pipeline | str | None = None,
exclude_subsets: bool = False,
excluded_tasks: set[str]
| str = {
Expand Down Expand Up @@ -69,9 +69,9 @@ def make_injection_pipeline(
----------
dataset_type_name : `str`
Name of the dataset type being injected into.
reference_pipeline : `str`
reference_pipeline : Pipeline | `str`
Location of a reference pipeline definition YAML file.
injection_pipeline : `str`, optional
injection_pipeline : Pipeline | `str`, optional
Location of an injection pipeline definition YAML file.
exclude_subsets : `bool`, optional
If True, do not update pipeline subsets to include the injection task.
Expand All @@ -94,7 +94,10 @@ def make_injection_pipeline(
logger = logging.getLogger(__name__)
logger.setLevel(log_level)

pipeline = Pipeline.fromFile(reference_pipeline)
if isinstance(reference_pipeline, str):
pipeline = Pipeline.fromFile(reference_pipeline)
else:
pipeline = reference_pipeline

# Add an instrument override, if provided.
if instrument:
Expand Down Expand Up @@ -162,7 +165,10 @@ def make_injection_pipeline(

# Merge the injection pipeline to the modified pipeline, if provided.
if injection_pipeline:
pipeline2 = Pipeline.fromFile(injection_pipeline)
if isinstance(injection_pipeline, str):
pipeline2 = Pipeline.fromFile(injection_pipeline)
else:
pipeline2 = injection_pipeline
if len(pipeline2) != 1:
raise RuntimeError(
f"The injection pipeline contains {len(pipeline2)} tasks; only one task is allowed."
Expand Down
128 changes: 128 additions & 0 deletions python/lsst/source/injection/utils/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# This file is part of source_injection.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


import numpy as np
from lsst.afw.geom import makeCdMatrix, makeSkyWcs
from lsst.afw.image import makePhotoCalibFromCalibZeroPoint
from lsst.geom import Box2I, Extent2I, Point2D, Point2I, SpherePoint, degrees
from lsst.ip.isr.isrTask import IsrTask
from lsst.meas.algorithms.testUtils import plantSources
from lsst.pipe.base import Pipeline
from lsst.pipe.base.pipelineIR import LabeledSubset
from lsst.pipe.tasks.characterizeImage import CharacterizeImageTask
from lsst.source.injection import generate_injection_catalog


def make_test_exposure():
"""Make a test exposure with a PSF attached and stars placed randomly.
This function generates a noisy synthetic image with Gaussian PSFs injected
into the frame.
The exposure is returned with a WCS, PhotoCalib and PSF attached.
Returns
-------
exposure : `lsst.afw.image.Exposure`
Exposure with calibs attached and stars placed randomly.
"""
# Inspired by meas_algorithms test_dynamicDetection.py.
xy0 = Point2I(12345, 67890) # xy0 for image
dims = Extent2I(2345, 2345) # Dimensions of image
bbox = Box2I(xy0, dims) # Bounding box of image
sigma = 3.21 # PSF sigma
buffer = 4.0 # Buffer for star centers around edge
n_sigma = 5.0 # Number of PSF sigmas for kernel
sky = 12345.6 # Initial sky level
num_stars = 100 # Number of stars
noise = np.sqrt(sky) * np.pi * sigma**2 # Poisson noise per PSF
faint = 1.0 * noise # Faintest level for star fluxes
bright = 100.0 * noise # Brightest level for star fluxes
star_bbox = Box2I(bbox) # Area on image in which we can put star centers
star_bbox.grow(-int(buffer * sigma)) # Shrink star_bbox
pixel_scale = 1.0e-5 * degrees # Pixel scale (1E-5 deg = 0.036 arcsec)

# Make an exposure with a PSF attached; place stars randomly.
rng = np.random.default_rng(12345)
stars = [
(xx, yy, ff, sigma)
for xx, yy, ff in zip(
rng.uniform(star_bbox.getMinX(), star_bbox.getMaxX(), num_stars),
rng.uniform(star_bbox.getMinY(), star_bbox.getMaxY(), num_stars),
np.linspace(faint, bright, num_stars),
)
]
exposure = plantSources(bbox, 2 * int(n_sigma * sigma) + 1, sky, stars, True)

# Set WCS and PhotoCalib.
exposure.setWcs(
makeSkyWcs(
crpix=Point2D(0, 0),
crval=SpherePoint(0, 0, degrees),
cdMatrix=makeCdMatrix(scale=pixel_scale),
)
)
exposure.setPhotoCalib(makePhotoCalibFromCalibZeroPoint(1e10, 1e8))
return exposure


def make_test_injection_catalog(wcs, bbox):
"""Make a test source injection catalog.
This function generates a test source injection catalog consisting of 30
star-like sources of varying magnitude.
Parameters
----------
wcs : `lsst.afw.geom.SkyWcs`
WCS associated with the exposure.
bbox : `lsst.geom.Box2I`
Bounding box of the exposure.
Returns
-------
injection_catalog : `astropy.table.Table`
Source injection catalog.
"""
radec0 = wcs.pixelToSky(bbox.getBeginX(), bbox.getBeginY())
radec1 = wcs.pixelToSky(bbox.getEndX(), bbox.getEndY())
ra_lim = sorted([radec0.getRa().asDegrees(), radec1.getRa().asDegrees()])
dec_lim = sorted([radec0.getDec().asDegrees(), radec1.getDec().asDegrees()])
injection_catalog = generate_injection_catalog(
ra_lim=ra_lim,
dec_lim=dec_lim,
wcs=wcs,
number=10,
source_type="DeltaFunction",
mag=[10.0, 15.0, 20.0],
)
return injection_catalog


def make_test_reference_pipeline():
"""Make a test reference pipeline containing the ISR task."""
reference_pipeline = Pipeline("reference_pipeline")
reference_pipeline.addTask(IsrTask, "isr")
reference_pipeline.addTask(CharacterizeImageTask, "characterizeImage")
reference_pipeline._pipelineIR.labeled_subsets["test_subset"] = LabeledSubset("test_subset", set(), None)
reference_pipeline.addLabelToSubset("test_subset", "isr")
reference_pipeline.addLabelToSubset("test_subset", "characterizeImage")
return reference_pipeline
38 changes: 38 additions & 0 deletions tests/test_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# This file is part of source_injection.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import unittest

from lsst.utils.tests import ImportTestCase


class SourceInjectionImportTestCase(ImportTestCase):
"""Test that every file in source_injection can be imported."""

PACKAGES = {
"lsst.source.injection",
"lsst.source.injection.bin",
"lsst.source.injection.utils",
}


if __name__ == "__main__":
unittest.main()
127 changes: 127 additions & 0 deletions tests/test_inject_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# This file is part of source_injection.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import unittest
from types import GeneratorType

import galsim
import lsst.utils.tests
import numpy as np
from galsim import BoundsI, GSObject
from lsst.geom import Point2D, SpherePoint, degrees
from lsst.source.injection.inject_engine import (
generate_galsim_objects,
inject_galsim_objects_into_exposure,
make_galsim_object,
)
from lsst.source.injection.utils.test_utils import make_test_exposure, make_test_injection_catalog
from lsst.utils.tests import MemoryTestCase, TestCase


class InjectEngineTestCase(TestCase):
"""Test the inject_engine.py module."""

def setUp(self):
"""Set up synthetic source injection inputs.
This method sets up a noisy synthetic image with Gaussian PSFs injected
into the frame, an example source injection catalog, and a generator of
GalSim objects intended for injection.
"""
self.exposure = make_test_exposure()
self.injection_catalog = make_test_injection_catalog(
self.exposure.getWcs(),
self.exposure.getBBox(),
)
self.galsim_objects = generate_galsim_objects(
injection_catalog=self.injection_catalog,
photo_calib=self.exposure.photoCalib,
wcs=self.exposure.wcs,
fits_alignment="wcs",
stamp_prefix="",
)

def tearDown(self):
del self.exposure
del self.injection_catalog
del self.galsim_objects

def test_make_galsim_object(self):
source_data = self.injection_catalog[0]
sky_coords = SpherePoint(float(source_data["ra"]), float(source_data["dec"]), degrees)
pixel_coords = self.exposure.wcs.skyToPixel(sky_coords)
inst_flux = self.exposure.photoCalib.magnitudeToInstFlux(source_data["mag"], pixel_coords)
object = make_galsim_object(
source_data=source_data,
source_type=source_data["source_type"],
inst_flux=inst_flux,
)
self.assertIsInstance(object, GSObject)
self.assertIsInstance(object, getattr(galsim, source_data["source_type"]))

def test_generate_galsim_objects(self):
self.assertTrue(isinstance(self.galsim_objects, GeneratorType))
for galsim_object in self.galsim_objects:
self.assertIsInstance(galsim_object, tuple)
self.assertEqual(len(galsim_object), 4)
self.assertIsInstance(galsim_object[0], SpherePoint) # RA/Dec
self.assertIsInstance(galsim_object[1], Point2D) # x/y
self.assertIsInstance(galsim_object[2], int) # draw size
self.assertIsInstance(galsim_object[3], GSObject) # GSObject

def test_inject_galsim_objects_into_exposure(self):
flux0 = np.sum(self.exposure.image.array)
injected_outputs = inject_galsim_objects_into_exposure(
exposure=self.exposure,
objects=self.galsim_objects,
mask_plane_name="INJECTED",
calib_flux_radius=12.0,
draw_size_max=1000,
)
pc = self.exposure.getPhotoCalib()
inst_fluxes = [float(pc.magnitudeToInstFlux(mag)) for mag in self.injection_catalog["mag"]]
self.assertAlmostEqual(
np.sum(self.exposure.image.array) - flux0,
np.sum(inst_fluxes),
delta=0.0001 * np.sum(inst_fluxes),
)
self.assertEqual(len(injected_outputs[0]), len(self.injection_catalog["ra"]))
self.assertTrue(all(isinstance(injected_output, list) for injected_output in injected_outputs))
self.assertTrue(all(isinstance(item, int) for item in injected_outputs[0])) # draw sizes
self.assertTrue(all(isinstance(item, BoundsI) for item in injected_outputs[1])) # common bounds
self.assertTrue(all(isinstance(item, bool) for item in injected_outputs[2])) # FFT size errors
self.assertTrue(all(isinstance(item, bool) for item in injected_outputs[3])) # PSF compute errors


class MemoryTestCase(MemoryTestCase):
"""Test memory usage of functions in this script."""

pass


def setup_module(module):
"""Configure pytest."""
lsst.utils.tests.init()


if __name__ == "__main__":
lsst.utils.tests.init()
unittest.main()
Loading

0 comments on commit 2e3203b

Please sign in to comment.