Merge branch 'tickets/DM-40911'
leeskelvin committed Sep 29, 2023
2 parents 98d669c + 035eeb4 commit 44799a0
Showing 3 changed files with 54 additions and 8 deletions.
6 changes: 4 additions & 2 deletions python/lsst/source/injection/bin/make_injection_pipeline.py
@@ -67,7 +67,9 @@ def build_argparser():
         "-i",
         "--injection-pipeline",
         type=str,
-        help="Location of an injection pipeline definition YAML file.",
+        help="Location of an injection pipeline definition YAML file stub. If "
+        "this is not explicitly provided, an attempt to infer the injection "
+        "pipeline stub will be made using the injected dataset type name.",
         metavar="FILE",
     )
     parser.add_argument(
@@ -146,4 +148,4 @@ def main():
     pipeline = make_injection_pipeline(
         **{k: v for k, v in vars(args).items() if k not in ["filename", "overwrite"]}
     )
-    print(pipeline)
+    print("\n", pipeline, sep="")
@@ -35,7 +35,7 @@ def ingest_injection_catalog(
     table: Table | list[Table],
     band: str,
     output_collection: str,
-    dataset_type_name: str,
+    dataset_type_name: str = "injection_catalog",
     log_level: int = logging.INFO,
 ) -> list[DatasetRef]:
     """Ingest a source table into the butler.
@@ -58,7 +58,7 @@
         Band associated with the input source table(s).
     output_collection : `str`
         Name of the output collection to ingest the consolidated table into.
-    dataset_type_name : `str`
+    dataset_type_name : `str`, optional
         Dataset type name for the ingested consolidated table.
     log_level : `int`, optional
         The log level to use for logging.
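
With dataset_type_name now optional, callers can rely on the "injection_catalog" default; a hedged sketch (the butler argument name, repository path, and collection name are assumptions):

    import astropy.table
    from lsst.daf.butler import Butler
    from lsst.source.injection import ingest_injection_catalog

    # A toy single-row injection catalog; real catalogs carry full source
    # parameterizations.
    table = astropy.table.Table({"ra": [150.0], "dec": [2.2], "mag": [20.0]})
    butler = Butler("my_repo", writeable=True)  # assumed repository path

    # dataset_type_name is omitted and defaults to "injection_catalog".
    refs = ingest_injection_catalog(
        writeable_butler=butler,  # assumed parameter name
        table=table,
        band="i",
        output_collection="u/user/injection_inputs",
    )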
52 changes: 48 additions & 4 deletions python/lsst/source/injection/utils/make_injection_pipeline.py
@@ -72,7 +72,9 @@ def make_injection_pipeline(
     reference_pipeline : Pipeline | `str`
         Location of a reference pipeline definition YAML file.
     injection_pipeline : Pipeline | `str`, optional
-        Location of an injection pipeline definition YAML file.
+        Location of an injection pipeline definition YAML file stub. If not
+        provided, an attempt to infer the injection pipeline will be made based
+        on the injected dataset type name.
     exclude_subsets : `bool`, optional
         If True, do not update pipeline subsets to include the injection task.
     excluded_tasks : `set` [`str`] | `str`
@@ -106,6 +108,7 @@
     # Remove all tasks which are not to be included in the injection pipeline.
     if isinstance(excluded_tasks, str):
         excluded_tasks = set(excluded_tasks.split(","))
+    not_excluded_tasks = set()
     for task_label in excluded_tasks:
         # First remove tasks from their host subsets, if present.
         try:
@@ -119,9 +122,17 @@
         try:
             pipeline.removeTask(task_label)
         except KeyError:
-            logger.warning("Task '%s' not found in pipeline; nothing to exclude.", task_label)
-
-    # Determine the set of dataset type names affected by source injection
+            not_excluded_tasks.add(task_label)
+    if len(not_excluded_tasks) > 0:
+        grammar = "Task" if len(not_excluded_tasks) == 1 else "Tasks"
+        logger.warning(
+            "%s marked for exclusion not found in the reference pipeline: %s.",
+            grammar,
+            ", ".join(sorted(not_excluded_tasks)),
+        )
+
+    # Determine the set of dataset type names affected by source injection.
+    all_connection_type_names = set()
     injected_types = {dataset_type_name}
     precursor_injection_task_labels = set()
     # Loop over all tasks in the pipeline.
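
The hunk above swaps a per-label warning for a single aggregated one. The same pattern in isolation (names here are illustrative, not from the diff):

    import logging

    logger = logging.getLogger(__name__)

    def exclude_labels(present: set[str], excluded: set[str]) -> set[str]:
        """Drop excluded labels from `present`, warning once about any missing."""
        missing = set()
        for label in excluded:
            if label in present:
                present.remove(label)
            else:
                missing.add(label)
        if missing:
            grammar = "Task" if len(missing) == 1 else "Tasks"
            logger.warning(
                "%s marked for exclusion not found in the reference pipeline: %s.",
                grammar,
                ", ".join(sorted(missing)),
            )
        return present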
@@ -142,6 +153,7 @@
         conns = taskDef.connections
         input_types = _get_dataset_type_names(conns, conns.inputs)
         output_types = _get_dataset_type_names(conns, conns.outputs)
+        all_connection_type_names |= input_types | output_types
         if dataset_type_name in output_types:
             precursor_injection_task_labels.add(taskDef.label)
         # If the task has any injected dataset type names as inputs, add all of
@@ -162,6 +174,36 @@
                     field,
                     taskDef.label,
                 )
+    # Raise if the injected dataset type does not exist in the pipeline.
+    if dataset_type_name not in all_connection_type_names:
+        raise RuntimeError(
+            f"Dataset type '{dataset_type_name}' not found in the reference pipeline; "
+            "no connection type edits to be made."
+        )
+
+    # Attempt to infer the injection pipeline from the dataset type name.
+    if not injection_pipeline:
+        match dataset_type_name:
+            case "postISRCCD":
+                injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_exposure.yaml"
+            case "icExp" | "calexp":
+                injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_visit.yaml"
+            case "deepCoadd" | "deepCoadd_calexp" | "goodSeeingCoadd":
+                injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_coadd.yaml"
+            case _:
+                # Print a warning rather than a raise, as the user may wish to
+                # edit connection names without merging an injection pipeline.
+                logger.warning(
+                    "Unable to infer injection pipeline stub from dataset type name '%s' and none was "
+                    "provided. No injection pipeline will be merged into the output pipeline.",
+                    dataset_type_name,
+                )
+        if injection_pipeline:
+            logger.info(
+                "Injected dataset type '%s' used to infer injection pipeline: %s",
+                dataset_type_name,
+                injection_pipeline,
+            )

     # Merge the injection pipeline to the modified pipeline, if provided.
     if injection_pipeline:
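
A hedged sketch of the two guard rails added in this hunk: an injected dataset type absent from every task connection now raises, while one that is present but matches no case arm only logs a warning and skips the merge (import path and pipeline location are assumptions):

    from lsst.source.injection import make_injection_pipeline

    try:
        # No task connection produces or consumes this name, so the new
        # RuntimeError fires before any connection edits are attempted.
        make_injection_pipeline(
            dataset_type_name="not_a_dataset_type",
            reference_pipeline="$DRP_PIPE_DIR/pipelines/LSSTCam-imSim/DRP.yaml",  # assumed path
        )
    except RuntimeError as err:
        print(err)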
@@ -189,4 +231,6 @@
             precursor_subsets = pipeline.findSubsetsWithLabel(label)
             for subset in precursor_subsets:
                 pipeline.addLabelToSubset(subset, injection_taskDef.label)
+
+    logger.info("Made an injection pipeline containing %d tasks.", len(pipeline))
     return pipeline
