feat(pipeline): add documentation and adjust key argument handling
ljgray committed May 16, 2024
1 parent f2199fd commit f1afa3d
110 changes: 73 additions & 37 deletions caput/pipeline.py
@@ -182,6 +182,7 @@
There are two options when choosing how to execute a pipeline: standard and legacy.
When the above pipeline is executed in standard mode, it produces the following output.
>>> local_tasks.update(globals()) # Required for interactive sessions.
>>> m = Manager.from_yaml_str(spam_config)
>>> m.run()
@@ -388,6 +389,7 @@
import logging
import os
import queue
import traceback
import warnings
from copy import deepcopy

@@ -500,11 +502,10 @@ class Manager(config.Reader):
task_specs : list
Configuration of pipeline tasks.
execution_order : str
Set the task execution order for this pipeline instance. `loop` round-robins
Set the task execution order for this pipeline instance. `legacy` round-robins
through all tasks based on the config order, and tries to clear out finished
tasks as soon as possible. `tree` walks through the tree of associated input-output
keys and tries to run each branch to completion to minimize the time for which
any given data product will exist. Default is `loop`.
tasks as soon as possible. `standard` uses a priority and availability system
to select the next task to run, and falls back to `legacy` if nothing is available.
save_versions : list
Module names (str). This list together with the version strings from these
modules are attached to output metadata. Default is [].
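For illustration, a minimal sketch of a config selecting between the two execution orders. The task paths and key names here are placeholders, and it is assumed that `execution_order` sits in the `pipeline` block alongside `tasks`:

from caput.pipeline import Manager

# Hypothetical two-task pipeline. Switching `execution_order` between
# `standard` (priority/availability scheduling) and `legacy` (round-robin)
# changes only the order in which tasks are run, not the task configuration.
example_config = """
pipeline:
  execution_order: standard
  tasks:
    - type: mymodule.LoadData
      out: data
    - type: mymodule.ProcessData
      in: data
"""

m = Manager.from_yaml_str(example_config)
m.run()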
Expand Down Expand Up @@ -588,7 +589,7 @@ def from_yaml_str(cls, yaml_doc, lint=False, psutil_profiling=False):
if not isinstance(yaml_params["pipeline"], dict):
raise config.CaputConfigError(
"Value 'pipeline' in YAML configuration is of type "
f"`{type(yaml_params['pipeline']).__name__}` (expected a YAML block here).",
f"`{type(yaml_params['pipeline']).__name__}` (expected a dict here).",
location=yaml_params,
)
except TypeError as e:
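A small sketch of the case this check rejects, using the same hypothetical task paths as above: if the `pipeline` section is a YAML list instead of a mapping, `from_yaml_str` raises a `CaputConfigError`.

from caput import config
from caput.pipeline import Manager

# `pipeline` is a list here, not a mapping, so the isinstance check fails.
bad_config = """
pipeline:
  - type: mymodule.LoadData
"""

try:
    Manager.from_yaml_str(bad_config)
except config.CaputConfigError as err:
    print(f"rejected: {err}")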
@@ -647,9 +648,9 @@ def run(self):

# Log MPI information
if mpiutil._comm is not None:
logger.debug(f"Running with {mpiutil.size} MPI processe(s)")
logger.debug(f"Running with {mpiutil.size} MPI process(es)")
else:
logger.debug("Running serial.")
logger.debug("Running in single process without MPI.")

# Index of first task in the list which has
# not finished running
@@ -690,8 +691,9 @@ def run(self):
"is at beginning of task list. Advancing state."
)
task._pipeline_advance_state()
# Restart from the beginning of the task list
self._task_idx = self._task_head
else:
# Restart from the beginning of the task list
self._task_idx = self._task_head
continue
# Raised if the task has finished
except _PipelineFinished:
@@ -705,9 +707,7 @@
continue

if self.execution_order == "legacy":
# Advance the task pointer. If this is the last task
# in the list, it will get wrapped when selecting
# the next task
# Advance the task pointer
self._task_idx += 1

# Ensure the output(s) are correctly structured
@@ -788,7 +788,12 @@ def _next_task(self):
return self.tasks[self._task_idx]

def _iter_tasks(self):
"""Iterate through tasks in order and return the next in order."""
"""Iterate through tasks in order and return the next in order.
This method implements the `legacy` execution order, and is used
as a fallback for the `standard` processing order when no task is
available to run.
"""
for ii in range(len(self.tasks)):
# Iterate starting at the next task
jj = (ii + self._task_idx) % len(self.tasks)
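The wrap-around indexing above can be pictured with a stand-in list in place of `self.tasks` (plain Python, values are arbitrary):

# Stand-in for self.tasks and self._task_idx.
tasks = ["A", "B", "C", "D"]
task_idx = 2

# Same modular arithmetic as `jj = (ii + self._task_idx) % len(self.tasks)`.
order = [tasks[(ii + task_idx) % len(tasks)] for ii in range(len(tasks))]
print(order)  # ['C', 'D', 'A', 'B'] - starts at the current task and wraps around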
@@ -809,7 +814,7 @@ def _check_task_output(out, task):
Returns
-------
out : Same as `TaskBase.next` or None
Pipeline product. None if there's no output of that task stage that
Pipeline product, or None if there is no output of the task stage that
has to be handled further.
Raises
@@ -855,12 +860,12 @@ def _setup_tasks(self):
self.add_task(task, task_spec)
except config.CaputConfigError as e:
raise config.CaputConfigError(
f"Setting up task {ii} caused an error!!",
f"Setting up task {ii} caused an error:\n\t{traceback.format_exc()}",
location=task_spec if e.line is None else e.line,
) from e

def _validate_task_inputs(self):
# Make sure this tasks in/requires values have corresponding
# Make sure all tasks' in/requires values have corresponding
# out keys from another task
all_out_values = []
for t in self.task_specs:
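As a sketch of the mismatch this validation is looking for (hypothetical task paths): a task that asks for an `in` key which no other task declares as `out`.

# `ProcessData` requests `raw_data`, but no task in the list produces it,
# so its `in` value has no corresponding `out` key from another task.
mismatched_config = """
pipeline:
  tasks:
    - type: mymodule.LoadData
      out: data
    - type: mymodule.ProcessData
      in: raw_data
"""

Renaming either key so that one task's `out` matches the other task's `in` resolves the mismatch.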
@@ -920,7 +925,7 @@ def _get_task_from_spec(self, task_spec: dict):
task_cls = misc.import_class(task_path)
except (config.CaputConfigError, AttributeError, ModuleNotFoundError) as e:
raise config.CaputConfigError(
f"Loading task `{task_path}` caused an error {e.__class__.__name__}!!"
f"Loading task `{task_path}` caused an error:\n\t{traceback.format_exc()}"
) from e

# Get the parameters and initialize the class.
@@ -956,7 +961,7 @@ def _get_task_from_spec(self, task_spec: dict):
task = task_cls._from_config(task_params)
except config.CaputConfigError as e:
raise config.CaputConfigError(
f"Failed instantiating {task_cls} from config.",
f"Failed instantiating {task_cls} from config./n/t{traceback.format_exc()}",
location=task_spec.get("params", task_spec),
) from e

@@ -974,23 +979,42 @@ def add_task(self, task, task_spec: dict = {}, **kwargs):
The names of the task inputs and outputs.
**kwargs : dict
Included for legacy purposes. Alternative method to provide
`requires`, `in\_`, and `out` arguments
`requires`, `in\_`, and `out` arguments. These should *only*
be provided if `task_spec` is not provided - a ValueError
will be raised otherwise.
Raises
------
caput.config.CaputConfigError
If there was an error in the task configuration.
"""
requires = kwargs.get("requires", task_spec.get("requires"))
in_ = kwargs.get("in_", task_spec.get("in"))
out = kwargs.get("out", task_spec.get("out"))

def _check_duplicate(key0: str, key1: str, d0: dict, d1: dict):
"""Check if an argument has been provided twice."""
val0 = d0.get(key0, d0.get(key1))
val1 = d1.get(key0, d1.get(key1))

# Check that the key has not been provided twice. It's
# ok to return None, we only care if *both* values are
# not None
if val0 is None:
return val1

if val1 is None:
return val0

raise ValueError(f"Argument `{key0}/{key1}` was provided twice")

requires = _check_duplicate("requires", "requires", task_spec, kwargs)
in_ = _check_duplicate("in", "in_", task_spec, kwargs)
out = _check_duplicate("out", "out", task_spec, kwargs)

try:
task._setup_keys(in_, out, requires)
# Want to blindly catch errors
except Exception as e:
raise config.CaputConfigError(
f"Setting task {task!s} caused " f"an error!!"
f"Adding task {task!s} caused an error:\n\t{traceback.format_exc()}"
) from e

self.tasks.append(task)
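A short usage sketch of the two ways the keys can now reach `add_task`, assuming `m` is an existing `Manager` and `SpamTask`/`EggsTask` are hypothetical `TaskBase` subclasses:

# Keys supplied through the task_spec dict, as a YAML config would provide them.
m.add_task(SpamTask(), {"out": "sausage"})

# Keys supplied through keyword arguments (the legacy interface).
m.add_task(EggsTask(), in_="sausage", out="eggs")

# Supplying the same key through both routes is ambiguous, and
# _check_duplicate raises a ValueError:
# m.add_task(EggsTask(), {"in": "sausage"}, in_="sausage")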
Expand Down Expand Up @@ -1024,11 +1048,15 @@ class TaskBase(config.Reader):
Limits the number of `next` outputs from this task before finishing.
Default is None, allowing an unlimited number of `next` products.
base_priority : int
Integer priority relative to other tasks. Larger positive values have
higher priority, and larger negative numbers have lower priority.
Default is 0. After setup, priority will be adjusted based on the
difference between number of inputs and number of outputs - tasks
with more input than outputs will have increased priority.
Base integer priority. Priority only matters relative to other tasks
in a pipeline, with run order given by `sorted(priorities, reverse=True)`.
Task priority is also adjusted dynamically based on the net difference between
a task's inputs and outputs, which typically shifts the priority by up to +/- 2.
`base_priority` should be set accordingly - multiples of 10 (e.g. -10, 10, 20, ...)
are effective at forcing a task to have the highest or lowest priority relative to other tasks.
`base_priority` should be used sparingly when a user wants to enforce a
specific non-standard pipeline behaviour. See method `priority` for details
about dynamic priority.
"""

broadcast_inputs = config.Property(proptype=bool, default=False)
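A hedged config sketch of `base_priority` in use, assuming it is set through the task's `params` block like the other `TaskBase` properties above (task paths are placeholders):

# `base_priority: 10` pushes WriteData ahead of its peers once its input is
# stashed, so buffered products are written out promptly.
prioritized_config = """
pipeline:
  execution_order: standard
  tasks:
    - type: mymodule.LoadData
      out: data
    - type: mymodule.WriteData
      in: data
      params:
        base_priority: 10
"""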
@@ -1151,23 +1179,31 @@ def _pipeline_is_available(self):
def priority(self):
"""Return the priority associated with this task.
If the task is not yet initialized, return the base priority.
If the task is not yet initialized, dynamic priority is zero.
If the task in in state `setup`, return base priority
plus one if all `requires` items are stashed, otherwise base
priority.
If the task is in state `setup`, dynamic priority is one
if all `requires` items are stashed and zero otherwise.
If the task is in state `next`, priority is the total net consumption
of the task plus its base priority.
If the task is in state `next`, dynamic priority is the total
net consumption of the task.
For example:
- A task which consumes 2 items, produces one, and can currently run
once will have priority (2 - 1) * 1 + base = 1 + base
- A task which does not consume anything but produces one item
will have priority (0 - 1) * 1 + base = -1 + base
In any other state, priority is just net consumption for one iteration
plus the base priority.
In any other state, priority is just net consumption for one
iteration.
The priority returned is the sum of `base_priority` and the
calculated dynamic priority.
Returns
-------
priority : int
`base_priority` plus dynamic priority calculated based on
task state and inputs/outputs
"""
if not hasattr(self, "_pipeline_state"):
# This task hasn't been initialized
@@ -1187,7 +1223,7 @@ def priority(self):
else:
# If a task has passed the above states, it should be
# finished quickly so set a very high priority
p = 100
p = 1e10

return p + self.base_priority
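A worked version of the docstring's examples, combining the dynamic part with `base_priority`. This is illustrative arithmetic only, not the method itself:

def illustrative_priority(consumes, produces, n_runnable, base_priority=0):
    """Net consumption per iteration, scaled by how many times the task can run."""
    return (consumes - produces) * n_runnable + base_priority

print(illustrative_priority(2, 1, 1))      # (2 - 1) * 1 + 0  = 1
print(illustrative_priority(0, 1, 1))      # (0 - 1) * 1 + 0  = -1
print(illustrative_priority(0, 1, 1, 10))  # (0 - 1) * 1 + 10 = 9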
