From 94d17cf7c870e4629f9e98d3575fd7a7b9977da2 Mon Sep 17 00:00:00 2001 From: Liam Gray Date: Fri, 31 May 2024 14:54:31 -0700 Subject: [PATCH] feat(pipeline): better handling for key rejection --- caput/pipeline.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/caput/pipeline.py b/caput/pipeline.py index 9c0012ee..0cc19b0e 100644 --- a/caput/pipeline.py +++ b/caput/pipeline.py @@ -393,6 +393,7 @@ import logging import os import queue +import re import traceback import warnings from copy import deepcopy @@ -488,7 +489,7 @@ def _get_versions(modules): class Manager(config.Reader): - """Pipeline manager for setting up and running pipeline tasks. + r"""Pipeline manager for setting up and running pipeline tasks. The manager is in charge of initializing all pipeline tasks, setting them up by providing the appropriate parameters, then executing the methods of @@ -510,6 +511,11 @@ class Manager(config.Reader): through all tasks based on the config order, and tries to clear out finished tasks as soon as possible. `standard` uses a priority and availability system to select the next task to run, and falls back to `legacy` if nothing is available. + key_pattern : str, optional + Regex pattern a key must match in order to be passed to subsequent tasks. By default, + any key that contains no alphanumeric characters (underscores do not count) will be ignored. + This is useful for controlling which keys are passed in tasks which produce + multiple outputs. Default is `[^\W_]`. save_versions : list Module names (str). This list together with the version strings from these modules are attached to output metadata. Default is []. 
@@ -525,6 +531,7 @@ class Manager(config.Reader): cluster = config.Property(default={}, proptype=dict) task_specs = config.Property(default=[], proptype=list, key="tasks") execution_order = config.enum(["standard", "legacy"], default="standard") + key_pattern = config.Property(proptype=str, default=r"[^\W_]") # Options to be stored in self.all_tasks_params versions = config.Property(default=[], proptype=_get_versions, key="save_versions") @@ -538,6 +545,9 @@ def __init__(self, psutil_profiling=False): self._psutil_profiling = psutil_profiling + # Precompile the key pattern to skip + self.key_pattern = re.compile(self.key_pattern) + logger.debug( f"CPU and memory profiling using psutil {'enabled' if self._psutil_profiling else 'disabled'}." ) @@ -722,9 +732,9 @@ def run(self): # Queue outputs for any associated tasks for key, product in zip(task._out_keys, out): - # Purposefully skip this output. Used if only one output - # needs to be passed - if key == "_": + # Purposefully skip an output that does not match + # the allowed key pattern + if not self.key_pattern.search(key): continue # Try to pass this product to each task received = [