Skip to content

Commit

Permalink
DSL sample (#87)
Browse files Browse the repository at this point in the history
Fixes #7
  • Loading branch information
cretz authored Oct 24, 2023
1 parent c587d20 commit 41bb0ca
Show file tree
Hide file tree
Showing 12 changed files with 411 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
# Using fixed Poetry version until
# https://github.com/python-poetry/poetry/pull/7694 is fixed
- run: python -m pip install --upgrade wheel "poetry==1.4.0" poethepoet
- run: poetry install --with pydantic
- run: poetry install --with pydantic --with dsl
- run: poe lint
- run: poe test -s -o log_cli_level=DEBUG
- run: poe test -s -o log_cli_level=DEBUG --workflow-environment time-skipping
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
Expand Down
29 changes: 29 additions & 0 deletions dsl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# DSL Sample

This sample shows how to have a workflow interpret/invoke arbitrary steps defined in a DSL. It is similar to the DSL
samples [in TypeScript](https://github.com/temporalio/samples-typescript/tree/main/dsl-interpreter) and
[in Go](https://github.com/temporalio/samples-go/tree/main/dsl).

For this sample, the optional `dsl` dependency group must be included. To include, run:

poetry install --with dsl

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:

poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute a workflow of steps defined in
[workflow1.yaml](workflow1.yaml):

poetry run python starter.py workflow1.yaml

This will run the workflow and show the final variables that the workflow returns. Looking in the worker terminal, each
step executed will be visible.

Similarly we can do the same for the more advanced [workflow2.yaml](workflow2.yaml) file:

poetry run python starter.py workflow2.yaml

This sample gives a guide of how one can write a workflow to interpret arbitrary steps from a user-provided DSL. Many
DSL models are more advanced and are more specific to conform to business logic needs.
Empty file added dsl/__init__.py
Empty file.
28 changes: 28 additions & 0 deletions dsl/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from temporalio import activity


class DSLActivities:
@activity.defn
async def activity1(self, arg: str) -> str:
activity.logger.info(f"Executing activity1 with arg: {arg}")
return f"[result from activity1: {arg}]"

@activity.defn
async def activity2(self, arg: str) -> str:
activity.logger.info(f"Executing activity2 with arg: {arg}")
return f"[result from activity2: {arg}]"

@activity.defn
async def activity3(self, arg1: str, arg2: str) -> str:
activity.logger.info(f"Executing activity3 with args: {arg1} and {arg2}")
return f"[result from activity3: {arg1} {arg2}]"

@activity.defn
async def activity4(self, arg: str) -> str:
activity.logger.info(f"Executing activity4 with arg: {arg}")
return f"[result from activity4: {arg}]"

@activity.defn
async def activity5(self, arg1: str, arg2: str) -> str:
activity.logger.info(f"Executing activity5 with args: {arg1} and {arg2}")
return f"[result from activity5: {arg1} {arg2}]"
46 changes: 46 additions & 0 deletions dsl/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import asyncio
import logging
import sys
import uuid

import dacite
import yaml
from temporalio.client import Client

from dsl.workflow import DSLInput, DSLWorkflow


async def main(dsl_yaml: str) -> None:
# Convert the YAML to our dataclass structure. We use PyYAML + dacite to do
# this but it can be done any number of ways.
dsl_input = dacite.from_dict(DSLInput, yaml.safe_load(dsl_yaml))

# Connect client
client = await Client.connect("localhost:7233")

# Run workflow
result = await client.execute_workflow(
DSLWorkflow.run,
dsl_input,
id=f"dsl-workflow-id-{uuid.uuid4()}",
task_queue="dsl-task-queue",
)
logging.info(
f"Final variables:\n "
+ "\n ".join((f"{k}: {v}" for k, v in result.items()))
)


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)

# Require the YAML file as an argument. We read this _outside_ of the async
# def function because thread-blocking IO should never happen in async def
# functions.
if len(sys.argv) != 2:
raise RuntimeError("Expected single argument for YAML file")
with open(sys.argv[1], "r") as yaml_file:
dsl_yaml = yaml_file.read()

# Run
asyncio.run(main(dsl_yaml))
44 changes: 44 additions & 0 deletions dsl/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from dsl.activities import DSLActivities
from dsl.workflow import DSLWorkflow

interrupt_event = asyncio.Event()


async def main():
# Connect client
client = await Client.connect("localhost:7233")

# Run a worker for the activities and workflow
activities = DSLActivities()
async with Worker(
client,
task_queue="dsl-task-queue",
activities=[
activities.activity1,
activities.activity2,
activities.activity3,
activities.activity4,
activities.activity5,
],
workflows=[DSLWorkflow],
):
# Wait until interrupted
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
85 changes: 85 additions & 0 deletions dsl/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from __future__ import annotations

import asyncio
import dataclasses
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union

from temporalio import workflow


@dataclass
class DSLInput:
root: Statement
variables: Dict[str, Any] = dataclasses.field(default_factory=dict)


@dataclass
class ActivityStatement:
activity: ActivityInvocation


@dataclass
class ActivityInvocation:
name: str
arguments: List[str] = dataclasses.field(default_factory=list)
result: Optional[str] = None


@dataclass
class SequenceStatement:
sequence: Sequence


@dataclass
class Sequence:
elements: List[Statement]


@dataclass
class ParallelStatement:
parallel: Parallel


@dataclass
class Parallel:
branches: List[Statement]


Statement = Union[ActivityStatement, SequenceStatement, ParallelStatement]


@workflow.defn
class DSLWorkflow:
@workflow.run
async def run(self, input: DSLInput) -> Dict[str, Any]:
self.variables = dict(input.variables)
workflow.logger.info("Running DSL workflow")
await self.execute_statement(input.root)
workflow.logger.info("DSL workflow completed")
return self.variables

async def execute_statement(self, stmt: Statement) -> None:
if isinstance(stmt, ActivityStatement):
# Invoke activity loading arguments from variables and optionally
# storing result as a variable
result = await workflow.execute_activity(
stmt.activity.name,
args=[self.variables.get(arg, "") for arg in stmt.activity.arguments],
start_to_close_timeout=timedelta(minutes=1),
)
if stmt.activity.result:
self.variables[stmt.activity.result] = result
elif isinstance(stmt, SequenceStatement):
# Execute each statement in order
for elem in stmt.sequence.elements:
await self.execute_statement(elem)
elif isinstance(stmt, ParallelStatement):
# Execute all in parallel. Note, this will raise an exception when
# the first activity fails and will not cancel the others. We could
# store tasks and cancel if we wanted. In newer Python versions this
# would use a TaskGroup instead.
await asyncio.gather(
*[self.execute_statement(branch) for branch in stmt.parallel.branches]
)
28 changes: 28 additions & 0 deletions dsl/workflow1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This sample workflows execute 3 steps in sequence.
# 1) Activity1, takes arg1 as input, and put result as result1.
# 2) Activity2, takes result1 as input, and put result as result2.
# 3) Activity3, takes args2 and result2 as input, and put result as result3.

variables:
arg1: value1
arg2: value2

root:
sequence:
elements:
- activity:
name: activity1
arguments:
- arg1
result: result1
- activity:
name: activity2
arguments:
- result1
result: result2
- activity:
name: activity3
arguments:
- arg2
- result2
result: result3
58 changes: 58 additions & 0 deletions dsl/workflow2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# This sample workflow executes 3 steps in sequence.
# 1) activity1, takes arg1 as input, and put result as result1.
# 2) it runs a parallel block which runs below sequence branches in parallel
# 2.1) sequence 1
# 2.1.1) activity2, takes result1 as input, and put result as result2
# 2.1.2) activity3, takes arg2 and result2 as input, and put result as result3
# 2.2) sequence 2
# 2.2.1) activity4, takes result1 as input, and put result as result4
# 2.2.2) activity5, takes arg3 and result4 as input, and put result as result5
# 3) activity3, takes result3 and result5 as input, and put result as result6.

variables:
arg1: value1
arg2: value2
arg3: value3

root:
sequence:
elements:
- activity:
name: activity1
arguments:
- arg1
result: result1
- parallel:
branches:
- sequence:
elements:
- activity:
name: activity2
arguments:
- result1
result: result2
- activity:
name: activity3
arguments:
- arg2
- result2
result: result3
- sequence:
elements:
- activity:
name: activity4
arguments:
- result1
result: result4
- activity:
name: activity5
arguments:
- arg3
- result4
result: result5
- activity:
name: activity3
arguments:
- result3
- result5
result: result6
Loading

0 comments on commit 41bb0ca

Please sign in to comment.