Skip to content

Commit

Permalink
Task execution graphs #26
Browse files Browse the repository at this point in the history
Included:
- Add deps task option for declaring dependencies on other tasks
- Add uses task option for declaring a task dependency where the
  output of the upstream task should be captured and made available
  as an environment variable.
- Add TaskExecutionGraph for planning execution of tasks with
  dependencies
- modify PoeExecutor to support stdout capture
- Add capture_stdout task option to redirect task output to a file.

Still to do:
- Feature tests for new features
- Document new features
  • Loading branch information
nat-n committed Apr 28, 2021
1 parent 7442ece commit 35123fd
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 67 deletions.
68 changes: 54 additions & 14 deletions poethepoet/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .exceptions import ExecutionError, PoeException
from .task import PoeTask
from .task.args import PoeTaskArgs
from .task.graph import TaskExecutionGraph
from .ui import PoeUi


Expand Down Expand Up @@ -47,12 +48,16 @@ def __call__(self, cli_args: Sequence[str]) -> int:
self.print_help()
return 0

if not self.resolve_task():
if not self.resolve_task(" ".join(cli_args)):
return 1

return self.run_task() or 0
assert self.task
if self.task.has_deps():
return self.run_task_graph() or 0
else:
return self.run_task() or 0

def resolve_task(self) -> bool:
def resolve_task(self, invocation: Sequence[str]) -> bool:
task = self.ui["task"]
if not task:
self.print_help(info="No task specified.")
Expand All @@ -71,29 +76,64 @@ def resolve_task(self) -> bool:
)
return False

self.task = PoeTask.from_config(task_name, config=self.config, ui=self.ui)
self.task = PoeTask.from_config(
task_name, config=self.config, ui=self.ui, invocation=tuple(invocation)
)
return True

def run_task(self) -> Optional[int]:
def run_task(self, context: Optional[RunContext] = None) -> Optional[int]:
_, *extra_args = self.ui["task"]
if context is None:
context = RunContext(
config=self.config,
env=os.environ,
dry=self.ui["dry_run"],
poe_active=os.environ.get("POE_ACTIVE"),
)
try:
assert self.task
return self.task.run(
context=RunContext(
config=self.config,
env=os.environ,
dry=self.ui["dry_run"],
poe_active=os.environ.get("POE_ACTIVE"),
),
extra_args=extra_args,
)
return self.task.run(context=context, extra_args=extra_args)
except PoeException as error:
self.print_help(error=error)
return 1
except ExecutionError as error:
self.ui.print_error(error=error)
return 1

def run_task_graph(self, context: Optional[RunContext] = None) -> Optional[int]:
assert self.task
graph = TaskExecutionGraph(self.task, self.config)
plan = graph.get_execution_plan()

context = RunContext(
config=self.config,
env=os.environ,
dry=self.ui["dry_run"],
poe_active=os.environ.get("POE_ACTIVE"),
)

for stage in plan:
for task in stage:
if task == self.task:
# The final sink task gets special treatment
return self.run_task(context)

try:
task_result = task.run(
context=context, extra_args=task.invocation[1:]
)
if task_result:
raise ExecutionError(
f"Task graph aborted after failed task {task.name!r}"
)
except PoeException as error:
self.print_help(error=error)
return 1
except ExecutionError as error:
self.ui.print_error(error=error)
return 1
return 0

def print_help(
self,
info: Optional[str] = None,
Expand Down
14 changes: 12 additions & 2 deletions poethepoet/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Dict,
MutableMapping,
Optional,
Tuple,
TYPE_CHECKING,
)
from .executor import PoeExecutor
Expand All @@ -20,6 +21,7 @@ class RunContext:
project_dir: Path
multistage: bool = False
exec_cache: Dict[str, Any]
captured_stdout: Dict[Tuple[str, ...], str]

def __init__(
self,
Expand All @@ -34,19 +36,27 @@ def __init__(
self.dry = dry
self.poe_active = poe_active
self.exec_cache = {}
self.captured_stdout = {}

@property
def executor_type(self) -> Optional[str]:
return self.config.executor["type"]

def get_env(self, env: MutableMapping[str, str]) -> Dict[str, str]:
return {**self.env, **env}

def get_executor(
self,
invocation: Tuple[str, ...],
env: MutableMapping[str, str],
task_executor: Optional[Dict[str, str]] = None,
task_options: Dict[str, Any],
) -> PoeExecutor:
return PoeExecutor.get(
invocation=invocation,
context=self,
env=self.get_env(env),
working_dir=self.project_dir,
dry=self.dry,
executor_config=task_executor,
executor_config=task_options.get("executor"),
capture_stdout=task_options.get("capture_stdout", False),
)
4 changes: 4 additions & 0 deletions poethepoet/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ def __init__(self, msg, *args):
self.args = (msg, *args)


class CyclicDependencyError(PoeException):
pass


class ExecutionError(RuntimeError):
def __init__(self, msg, *args):
self.msg = msg
Expand Down
42 changes: 33 additions & 9 deletions poethepoet/executor/base.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
import signal
from subprocess import Popen, PIPE
import sys
from typing import Any, Dict, MutableMapping, Optional, Sequence, Type, TYPE_CHECKING
from typing import (
Any,
Dict,
MutableMapping,
Optional,
Sequence,
Tuple,
Type,
TYPE_CHECKING,
Union,
)
from ..exceptions import PoeException
from ..virtualenv import Virtualenv

if TYPE_CHECKING:
from pathlib import Path
from ..context import RunContext

# TODO: maybe invert the control so the executor is given a task to run
# TODO: maybe invert the control so the executor is given a task to run?


class MetaPoeExecutor(type):
Expand Down Expand Up @@ -38,32 +48,38 @@ class PoeExecutor(metaclass=MetaPoeExecutor):

def __init__(
self,
invocation: Tuple[str, ...],
context: "RunContext",
options: MutableMapping[str, str],
env: MutableMapping[str, str],
working_dir: Optional["Path"] = None,
dry: bool = False,
capture_stdout: Union[str, bool] = False,
):
self.invocation = invocation
self.context = context
self.options = options
self.working_dir = working_dir
self.env = env
self.dry = dry
self.capture_stdout = capture_stdout

@classmethod
def get(
cls,
invocation: Tuple[str, ...],
context: "RunContext",
env: MutableMapping[str, str],
working_dir: Optional["Path"] = None,
dry: bool = False,
executor_config: Optional[Dict[str, str]] = None,
capture_stdout: Union[str, bool] = False,
) -> "PoeExecutor":
""""""
# use task specific executor config or fallback to global
options = executor_config or context.config.executor
return cls._resolve_implementation(context, executor_config)(
context, options, env, working_dir, dry
invocation, context, options, env, working_dir, dry, capture_stdout
)

@classmethod
Expand All @@ -75,13 +91,14 @@ def _resolve_implementation(
by making some reasonable assumptions based on visible features of the
environment
"""
config_executor_type = context.executor_type
if executor_config:
if executor_config["type"] not in cls.__executor_types:
raise PoeException(
f"Cannot instantiate unknown executor {executor_config['type']!r}"
)
return cls.__executor_types[executor_config["type"]]
elif context.config.executor["type"] == "auto":
elif config_executor_type == "auto":
if "poetry" in context.config.project["tool"]:
# Looks like this is a poetry project!
return cls.__executor_types["poetry"]
Expand All @@ -92,12 +109,11 @@ def _resolve_implementation(
# Fallback to not using any particular environment
return cls.__executor_types["simple"]
else:
if context.config.executor["type"] not in cls.__executor_types:
if config_executor_type not in cls.__executor_types:
raise PoeException(
f"Cannot instantiate unknown executor"
+ repr(context.config.executor["type"])
f"Cannot instantiate unknown executor" + repr(config_executor_type)
)
return cls.__executor_types[context.config.executor["type"]]
return cls.__executor_types[config_executor_type]

def execute(self, cmd: Sequence[str], input: Optional[bytes] = None,) -> int:
raise NotImplementedError
Expand All @@ -116,6 +132,11 @@ def _exec_via_subproc(
popen_kwargs["env"] = self.env if env is None else env
if input is not None:
popen_kwargs["stdin"] = PIPE
if self.capture_stdout:
if isinstance(self.capture_stdout, str):
popen_kwargs["stdout"] = open(self.capture_stdout, "wb")
else:
popen_kwargs["stdout"] = PIPE
if self.working_dir is not None:
popen_kwargs["cwd"] = self.working_dir

Expand All @@ -131,7 +152,10 @@ def handle_signal(signum, _frame):
old_signal_handler = signal.signal(signal.SIGINT, handle_signal)

# send data to the subprocess and wait for it to finish
proc.communicate(input)
(captured_stdout, _) = proc.communicate(input)

if self.capture_stdout == True:
self.context.captured_stdout[self.invocation] = captured_stdout.decode()

# restore signal handler
signal.signal(signal.SIGINT, old_signal_handler)
Expand Down
5 changes: 5 additions & 0 deletions poethepoet/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import re


def is_valid_env_var(var_name: str) -> bool:
return bool(re.match("[a-zA-Z_][a-zA-Z0-9_]*", var_name))
Loading

0 comments on commit 35123fd

Please sign in to comment.