Skip to content

Commit

Permalink
Add support for inserting additional tasks and subsets.
Browse files Browse the repository at this point in the history
  • Loading branch information
jtmccann committed Oct 28, 2024
1 parent 69e9300 commit 8d2a8b1
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
26 changes: 26 additions & 0 deletions python/lsst/source/injection/bin/make_injection_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,32 @@ def build_argparser():
help="Config override for a task, in the format 'label:key=value'.",
action="append",
)
parser.add_argument(
"-a",
"--additional-pipelines",
type=str,
help="""Location(s) of additional input pipeline definition YAML file(s).
Tasks from these additional pipelines will be added to the output injection pipeline.""",
required=True,
metavar="FILE",
nargs="+",
)
parser.add_argument(
"-s",
"--subset-name",
type=str,
help="""All tasks from any additional pipelines will be added to this subset.
The subset will be created if it does not already exist.""",
metavar="FILE",
)
parser.add_argument(
"-d",
"--subset-description",
type=str,
help="The description given to a new subset which holds tasks from additional pipelines provided."
"Note: this argument is ignored if the subset already exists.",
metavar="FILE",
)
parser.add_argument(
"-h",
"--help",
Expand Down
45 changes: 45 additions & 0 deletions python/lsst/source/injection/utils/_make_injection_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ def make_injection_pipeline(
prefix: str = "injected_",
instrument: str | None = None,
config: str | list[str] | None = None,
additional_pipelines: list[Pipeline] | list[str] | None = None,
subset_name: str | None = None,
subset_description: str = "",
log_level: int = logging.INFO,
) -> Pipeline:
"""Make an expanded source injection pipeline.
Expand Down Expand Up @@ -122,6 +125,17 @@ def make_injection_pipeline(
Add instrument overrides. Must be a fully qualified class name.
config : `str` | `list` [`str`], optional
Config override for a task, in the format 'label:key=value'.
additional_pipelines: `list`[Pipeline] | `list`[`str`]
Location(s) of additional input pipeline definition YAML file(s).
Tasks from these additional pipelines will be added to the output
injection pipeline.
subset_name: `str`, optional
All tasks from any additional pipelines will be added to this subset.
The subset will be created if it does not already exist.
subset_description: `str`, optional
The description given to a new subset which holds tasks from additional
pipelines provided. Note: this argument is ignored if the subset
already exists.
log_level : `int`, optional
The log level to use for logging.
Expand Down Expand Up @@ -329,4 +343,35 @@ def make_injection_pipeline(
len(injected_subset_labels),
grammar2,
)

# Optionally include additional tasks in the injection pipeline.
if additional_pipelines:
additional_tasks: set[str] = set()
# Record all input task labels and merge all input pipelines into the
# injection pipeline.
for additional_pipeline in additional_pipelines:
if isinstance(additional_pipeline, str):
additional_pipeline = Pipeline.fromFile(additional_pipeline)
additional_tasks.update(additional_pipeline.task_labels)
pipeline.mergePipeline(additional_pipeline)

# Add all tasks to subset_name. If the subset does not exist create it.
if isinstance(subset_name, str):
if subset_name in pipeline.subsets.keys():
for additional_task in additional_tasks:
pipeline.addLabelToSubset(subset_name, additional_task)
subset_grammar = f"the existing subset {subset_name}"
else:
pipeline.addLabeledSubset(subset_name, subset_description, additional_tasks)
subset_grammar = f"a new subset {subset_name}"

# Logging info.
task_grammar = "task" if len(additional_tasks) == 1 else "tasks"
logger.info(
"Added %d %s to %s",
len(additional_tasks),
task_grammar,
subset_grammar,
)

return pipeline

0 comments on commit 8d2a8b1

Please sign in to comment.