From f1afa3da56ad2d808bece9d212abfb1c1037ce37 Mon Sep 17 00:00:00 2001
From: Liam Gray
Date: Mon, 13 May 2024 12:20:12 -0700
Subject: [PATCH] feat(pipeline): add documentation and adjust key argument handling

---
 caput/pipeline.py | 110 ++++++++++++++++++++++++++++++----------------
 1 file changed, 73 insertions(+), 37 deletions(-)

diff --git a/caput/pipeline.py b/caput/pipeline.py
index cff92aea..f18ac435 100644
--- a/caput/pipeline.py
+++ b/caput/pipeline.py
@@ -182,6 +182,7 @@ There are two options when choosing how to execute a pipeline: standard and
 legacy. When the above pipeline is executed in standard mode, it produces
 the following output.
 
+>>> local_tasks.update(globals())  # Required for interactive sessions.
 >>> m = Manager.from_yaml_str(spam_config)
 >>> m.run()
@@ -388,6 +389,7 @@
 import logging
 import os
 import queue
+import traceback
 import warnings
 
 from copy import deepcopy
@@ -500,11 +502,10 @@ class Manager(config.Reader):
     task_specs : list
         Configuration of pipeline tasks.
     execution_order : str
-        Set the task execution order for this pipeline instance. `loop` round-robins
+        Set the task execution order for this pipeline instance. `legacy` round-robins
         through all tasks based on the config order, and tries to clear out finished
-        tasks as soon as possible. `tree` walks through the tree of associated input-output
-        keys and tries to run each branch to completion to minimize the time for which
-        any given data product will exist. Default is `loop`.
+        tasks as soon as possible. `standard` uses a priority and availability system
+        to select the next task to run, and falls back to `legacy` if nothing is available.
     save_versions : list
         Module names (str). This list together with the version strings from these
         modules are attached to output metadata. Default is [].
@@ -588,7 +589,7 @@ def from_yaml_str(cls, yaml_doc, lint=False, psutil_profiling=False):
             if not isinstance(yaml_params["pipeline"], dict):
                 raise config.CaputConfigError(
                     "Value 'pipeline' in YAML configuration is of type "
-                    f"`{type(yaml_params['pipeline']).__name__}` (expected a YAML block here).",
+                    f"`{type(yaml_params['pipeline']).__name__}` (expected a dict here).",
                     location=yaml_params,
                 )
         except TypeError as e:
@@ -647,9 +648,9 @@ def run(self):
 
         # Log MPI information
         if mpiutil._comm is not None:
-            logger.debug(f"Running with {mpiutil.size} MPI processe(s)")
+            logger.debug(f"Running with {mpiutil.size} MPI process(es)")
         else:
-            logger.debug("Running serial.")
+            logger.debug("Running in a single process without MPI.")
 
         # Index of first task in the list which has
         # not finished running
@@ -690,8 +691,9 @@ def run(self):
                         "is at beginning of task list. Advancing state."
                     )
                     task._pipeline_advance_state()
-                # Restart from the beginning of the task list
-                self._task_idx = self._task_head
+                else:
+                    # Restart from the beginning of the task list
+                    self._task_idx = self._task_head
                 continue
             # Raised if the task has finished
             except _PipelineFinished:
@@ -705,9 +707,7 @@ def run(self):
                 continue
 
             if self.execution_order == "legacy":
-                # Advance the task pointer. If this is the last task
-                # in the list, it will get wrapped when selecting
-                # the next task
+                # Advance the task pointer
                 self._task_idx += 1
 
             # Ensure the output(s) are correctly structured
@@ -788,7 +788,12 @@ def _next_task(self):
         return self.tasks[self._task_idx]
 
     def _iter_tasks(self):
-        """Iterate through tasks in order and return the next in order."""
+        """Iterate through tasks in order and return the next in order.
+
+        This method implements the `legacy` execution order, and is used
+        as a fallback for the `standard` execution order when no task is
+        available to run.
+        """
         for ii in range(len(self.tasks)):
             # Iterate starting at the next task
             jj = (ii + self._task_idx) % len(self.tasks)
@@ -809,7 +814,7 @@ def _check_task_output(out, task):
         Returns
         -------
         out : Same as `TaskBase.next` or None
-            Pipeline product. None if there's no output of that task stage that
+            Pipeline product, or None if there is no output of the task stage that
             has to be handled further.
 
         Raises
@@ -855,12 +860,12 @@ def _setup_tasks(self):
                 self.add_task(task, task_spec)
             except config.CaputConfigError as e:
                 raise config.CaputConfigError(
-                    f"Setting up task {ii} caused an error!!",
+                    f"Setting up task {ii} caused an error:\n\t{traceback.format_exc()}",
                     location=task_spec if e.line is None else e.line,
                 ) from e
 
     def _validate_task_inputs(self):
-        # Make sure this tasks in/requires values have corresponding
+        # Make sure all tasks' in/requires values have corresponding
         # out keys from another task
         all_out_values = []
         for t in self.task_specs:
@@ -920,7 +925,7 @@ def _get_task_from_spec(self, task_spec: dict):
            task_cls = misc.import_class(task_path)
         except (config.CaputConfigError, AttributeError, ModuleNotFoundError) as e:
             raise config.CaputConfigError(
-                f"Loading task `{task_path}` caused an error {e.__class__.__name__}!!"
+                f"Loading task `{task_path}` caused an error:\n\t{traceback.format_exc()}"
             ) from e
 
         # Get the parameters and initialize the class.
@@ -956,7 +961,7 @@
             task = task_cls._from_config(task_params)
         except config.CaputConfigError as e:
             raise config.CaputConfigError(
-                f"Failed instantiating {task_cls} from config.",
+                f"Failed instantiating {task_cls} from config:\n\t{traceback.format_exc()}",
                 location=task_spec.get("params", task_spec),
             ) from e
 
@@ -974,23 +979,42 @@ def add_task(self, task, task_spec: dict = {}, **kwargs):
            The names of the task inputs and outputs.
         **kwargs : dict
             Included for legacy purposes. Alternative method to provide
-            `requires`, `in\_`, and `out` arguments
+            `requires`, `in\_`, and `out` arguments. Each of these should
+            *only* be given here if it is not also set in `task_spec`;
+            a ValueError is raised if the same argument is provided twice.
 
         Raises
         ------
         caput.config.CaputConfigError
             If there was an error in the task configuration.
         """
-        requires = kwargs.get("requires", task_spec.get("requires"))
-        in_ = kwargs.get("in_", task_spec.get("in"))
-        out = kwargs.get("out", task_spec.get("out"))
+
+        def _check_duplicate(key0: str, key1: str, d0: dict, d1: dict):
+            """Check if an argument has been provided twice."""
+            val0 = d0.get(key0, d0.get(key1))
+            val1 = d1.get(key0, d1.get(key1))
+
+            # It is fine for either value to be None; a ValueError is only
+            # raised if *both* values are not None, i.e. the same argument
+            # was given in `task_spec` and as a keyword argument.
+            if val0 is None:
+                return val1
+
+            if val1 is None:
+                return val0
+
+            raise ValueError(f"Argument `{key0}/{key1}` was provided twice")
+
+        requires = _check_duplicate("requires", "requires", task_spec, kwargs)
+        in_ = _check_duplicate("in", "in_", task_spec, kwargs)
+        out = _check_duplicate("out", "out", task_spec, kwargs)
 
         try:
             task._setup_keys(in_, out, requires)
         # Want to blindly catch errors
         except Exception as e:
             raise config.CaputConfigError(
-                f"Setting task {task!s} caused " f"an error!!"
+                f"Adding task {task!s} caused an error:\n\t{traceback.format_exc()}"
             ) from e
 
         self.tasks.append(task)
@@ -1024,11 +1048,15 @@ class TaskBase(config.Reader):
         Limits the number of `next` outputs from this task before finishing.
         Default is None, allowing an unlimited number of `next` products.
     base_priority : int
-        Integer priority relative to other tasks. Larger positive values have
-        higher priority, and larger negative numbers have lower priority.
-        Default is 0. After setup, priority will be adjusted based on the
-        difference between number of inputs and number of outputs - tasks
-        with more input than outputs will have increased priority.
+        Base integer priority. Priority only matters relative to other tasks
+        in a pipeline, with run order given by `sorted(priorities, reverse=True)`.
+        Task priority is also adjusted based on the net difference between its
+        inputs and outputs, which typically shifts the priority by 0 to 2 in
+        either direction. `base_priority` should be set accordingly - multiples
+        of 10 (i.e. -10, 10, 20, ...) are effective at forcing a task to have the
+        highest/lowest priority relative to other tasks. `base_priority` should be
+        used sparingly, when a user wants to enforce a specific non-standard
+        pipeline behaviour. See method `priority` for details about dynamic priority.
     """
 
     broadcast_inputs = config.Property(proptype=bool, default=False)
@@ -1151,14 +1179,13 @@ def _pipeline_is_available(self):
     def priority(self):
         """Return the priority associated with this task.
 
-        If the task is not yet initialized, return the base priority.
+        If the task is not yet initialized, dynamic priority is zero.
 
-        If the task in in state `setup`, return base priority
-        plus one if all `requires` items are stashed, otherwise base
-        priority.
+        If the task is in state `setup`, dynamic priority is one
+        if all `requires` items are stashed and zero otherwise.
 
-        If the task is in state `next`, priority is the total net consumption
-        of the task plus its base priority.
+        If the task is in state `next`, dynamic priority is the total
+        net consumption of the task.
 
         For example:
         - A task which consumes 2 items, produces one, and can currently run
@@ -1166,8 +1193,17 @@ def priority(self):
         - A task which does not consume anything but produces one item will
           have priority (0 - 1) * 1 + base = -1 + base
 
-        In any other state, priority is just net consumption for one iteration
-        plus the base priority.
+        In any other state, priority is just the net consumption for one
+        iteration.
+
+        The priority returned is the sum of `base_priority` and the
+        calculated dynamic priority.
+
+        Returns
+        -------
+        priority : int
+            `base_priority` plus the dynamic priority calculated from the
+            task state and its inputs/outputs.
         """
         if not hasattr(self, "_pipeline_state"):
             # This task hasn't been initialized
@@ -1187,7 +1223,7 @@
         else:
             # If a task has passed the above states, it should be
            # finished quickly so set a very high priority
-            p = 100
+            p = 1e10
 
         return p + self.base_priority
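
Usage note (not part of the diff): with this change, task I/O keys can be passed
to `Manager.add_task` either through the `task_spec` dict or through the legacy
keyword arguments, but supplying the same key both ways now raises a ValueError
instead of silently preferring one of them. A minimal sketch, assuming the
`PrintEggs` task from the module docstring examples and that a bare `Manager()`
can be constructed directly in an interactive session:

>>> m = Manager()
>>> m.add_task(PrintEggs(), {"in": "eggs"})   # keys given via task_spec
>>> m.add_task(PrintEggs(), in_="eggs")       # keys given via legacy kwargs
>>> m.add_task(PrintEggs(), {"in": "eggs"}, in_="eggs")
Traceback (most recent call last):
    ...
ValueError: Argument `in/in_` was provided twice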