Skip to content

Commit

Permalink
feat(pipeline): better handling for key rejection
Browse files Browse the repository at this point in the history
  • Loading branch information
ljgray committed May 31, 2024
1 parent fe5b8bb commit 94d17cf
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions caput/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@
import logging
import os
import queue
import re
import traceback
import warnings
from copy import deepcopy
Expand Down Expand Up @@ -488,7 +489,7 @@ def _get_versions(modules):


class Manager(config.Reader):
"""Pipeline manager for setting up and running pipeline tasks.
r"""Pipeline manager for setting up and running pipeline tasks.
The manager is in charge of initializing all pipeline tasks, setting them
up by providing the appropriate parameters, then executing the methods of
Expand All @@ -510,6 +511,11 @@ class Manager(config.Reader):
through all tasks based on the config order, and tries to clear out finished
tasks as soon as possible. `standard` uses a priority and availability system
to select the next task to run, and falls back to `legacy` if nothing is available.
key_pattern : str, optional
Regex pattern that a key must match in order to be passed to subsequent tasks.
By default, any key that contains no alphanumeric characters (underscores do not
count as alphanumeric) is ignored. This is useful for controlling which keys are
passed on by tasks which produce multiple outputs. Default is `[^\W_]`.
save_versions : list
Module names (str). This list together with the version strings from these
modules are attached to output metadata. Default is [].
Expand All @@ -525,6 +531,7 @@ class Manager(config.Reader):
cluster = config.Property(default={}, proptype=dict)
task_specs = config.Property(default=[], proptype=list, key="tasks")
execution_order = config.enum(["standard", "legacy"], default="standard")
key_pattern = config.Property(proptype=str, default=r"[^\W_]")

# Options to be stored in self.all_tasks_params
versions = config.Property(default=[], proptype=_get_versions, key="save_versions")
Expand All @@ -538,6 +545,9 @@ def __init__(self, psutil_profiling=False):

self._psutil_profiling = psutil_profiling

# Precompile the pattern a key must match to be passed on (non-matching keys are skipped)
self.key_pattern = re.compile(self.key_pattern)

logger.debug(
f"CPU and memory profiling using psutil {'enabled' if self._psutil_profiling else 'disabled'}."
)
Expand Down Expand Up @@ -722,9 +732,9 @@ def run(self):

# Queue outputs for any associated tasks
for key, product in zip(task._out_keys, out):
# Purposefully skip this output. Used if only one output
# needs to be passed
if key == "_":
# Purposefully skip an output that does not match
# the allowed key pattern
if not self.key_pattern.search(key):
continue
# Try to pass this product to each task
received = [
Expand Down

0 comments on commit 94d17cf

Please sign in to comment.