Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Pydantic I/O types in workflow context #1189

Closed
154 changes: 154 additions & 0 deletions docs/examples/workflows/experimental/pydantic_io_in_dag_context.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Pydantic Io In Dag Context






=== "Hera"

```python linenums="1"
import sys
from typing import List

if sys.version_info >= (3, 9):
from typing import Annotated
else:
from typing_extensions import Annotated


from hera.shared import global_config
from hera.workflows import DAG, Parameter, WorkflowTemplate, script
from hera.workflows.io.v1 import Input, Output

global_config.experimental_features["decorator_syntax"] = True


class CutInput(Input):
cut_after: Annotated[int, Parameter(name="cut-after")]
strings: List[str]


class CutOutput(Output):
first_strings: Annotated[List[str], Parameter(name="first-strings")]
remainder: List[str]


class JoinInput(Input):
strings: List[str]
joiner: str


class JoinOutput(Output):
joined_string: Annotated[str, Parameter(name="joined-string")]


@script(constructor="runner")
def cut(input: CutInput) -> CutOutput:
return CutOutput(
first_strings=input.strings[: input.cut_after],
remainder=input.strings[input.cut_after :],
exit_code=1 if len(input.strings) <= input.cut_after else 0,
)


@script(constructor="runner")
def join(input: JoinInput) -> JoinOutput:
return JoinOutput(joined_string=input.joiner.join(input.strings))


with WorkflowTemplate(generate_name="pydantic-io-in-steps-context-v1-", entrypoint="d") as w:
with DAG(name="d"):
cut_result = cut(CutInput(strings=["hello", "world", "it's", "hera"], cut_after=1))
join(JoinInput(strings=cut_result.first_strings, joiner=" "))
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
generateName: pydantic-io-in-steps-context-v1-
spec:
entrypoint: d
templates:
- dag:
tasks:
- arguments:
parameters:
- name: cut-after
value: '1'
- name: strings
value: '["hello", "world", "it''s", "hera"]'
name: cut
template: cut
- arguments:
parameters:
- name: strings
value: '{{tasks.cut.outputs.parameters.first-strings}}'
- name: joiner
value: ' '
depends: cut
name: join
template: join
name: d
- inputs:
parameters:
- name: cut-after
- name: strings
name: cut
outputs:
parameters:
- name: first-strings
valueFrom:
path: /tmp/hera-outputs/parameters/first-strings
- name: remainder
valueFrom:
path: /tmp/hera-outputs/parameters/remainder
script:
args:
- -m
- hera.workflows.runner
- -e
- examples.workflows.experimental.pydantic_io_in_dag_context:cut
command:
- python
env:
- name: hera__script_annotations
value: ''
- name: hera__outputs_directory
value: /tmp/hera-outputs
- name: hera__script_pydantic_io
value: ''
image: python:3.8
source: '{{inputs.parameters}}'
- inputs:
parameters:
- name: strings
- name: joiner
name: join
outputs:
parameters:
- name: joined-string
valueFrom:
path: /tmp/hera-outputs/parameters/joined-string
script:
args:
- -m
- hera.workflows.runner
- -e
- examples.workflows.experimental.pydantic_io_in_dag_context:join
command:
- python
env:
- name: hera__script_annotations
value: ''
- name: hera__outputs_directory
value: /tmp/hera-outputs
- name: hera__script_pydantic_io
value: ''
image: python:3.8
source: '{{inputs.parameters}}'
```

152 changes: 152 additions & 0 deletions docs/examples/workflows/experimental/pydantic_io_in_steps_context.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Pydantic Io In Steps Context






=== "Hera"

```python linenums="1"
import sys
from typing import List

if sys.version_info >= (3, 9):
from typing import Annotated
else:
from typing_extensions import Annotated


from hera.shared import global_config
from hera.workflows import Parameter, Steps, WorkflowTemplate, script
from hera.workflows.io.v1 import Input, Output

global_config.experimental_features["decorator_syntax"] = True


class CutInput(Input):
cut_after: Annotated[int, Parameter(name="cut-after")]
strings: List[str]


class CutOutput(Output):
first_strings: Annotated[List[str], Parameter(name="first-strings")]
remainder: List[str]


class JoinInput(Input):
strings: List[str]
joiner: str


class JoinOutput(Output):
joined_string: Annotated[str, Parameter(name="joined-string")]


@script(constructor="runner")
def cut(input: CutInput) -> CutOutput:
return CutOutput(
first_strings=input.strings[: input.cut_after],
remainder=input.strings[input.cut_after :],
exit_code=1 if len(input.strings) <= input.cut_after else 0,
)


@script(constructor="runner")
def join(input: JoinInput) -> JoinOutput:
return JoinOutput(joined_string=input.joiner.join(input.strings))


with WorkflowTemplate(generate_name="pydantic-io-in-steps-context-v1-", entrypoint="d") as w:
with Steps(name="d"):
cut_result = cut(CutInput(strings=["hello", "world", "it's", "hera"], cut_after=1))
join(JoinInput(strings=cut_result.first_strings, joiner=" "))
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
generateName: pydantic-io-in-steps-context-v1-
spec:
entrypoint: d
templates:
- name: d
steps:
- - arguments:
parameters:
- name: cut-after
value: '1'
- name: strings
value: '["hello", "world", "it''s", "hera"]'
name: cut
template: cut
- - arguments:
parameters:
- name: strings
value: '{{steps.cut.outputs.parameters.first-strings}}'
- name: joiner
value: ' '
name: join
template: join
- inputs:
parameters:
- name: cut-after
- name: strings
name: cut
outputs:
parameters:
- name: first-strings
valueFrom:
path: /tmp/hera-outputs/parameters/first-strings
- name: remainder
valueFrom:
path: /tmp/hera-outputs/parameters/remainder
script:
args:
- -m
- hera.workflows.runner
- -e
- examples.workflows.experimental.pydantic_io_in_steps_context:cut
command:
- python
env:
- name: hera__script_annotations
value: ''
- name: hera__outputs_directory
value: /tmp/hera-outputs
- name: hera__script_pydantic_io
value: ''
image: python:3.8
source: '{{inputs.parameters}}'
- inputs:
parameters:
- name: strings
- name: joiner
name: join
outputs:
parameters:
- name: joined-string
valueFrom:
path: /tmp/hera-outputs/parameters/joined-string
script:
args:
- -m
- hera.workflows.runner
- -e
- examples.workflows.experimental.pydantic_io_in_steps_context:join
command:
- python
env:
- name: hera__script_annotations
value: ''
- name: hera__outputs_directory
value: /tmp/hera-outputs
- name: hera__script_pydantic_io
value: ''
image: python:3.8
source: '{{inputs.parameters}}'
```

37 changes: 37 additions & 0 deletions docs/user-guides/decorators.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,40 @@ We can simply call the script templates, passing the input objects in.

For more complex examples, including use of a dag, see
[the "experimental" examples](../examples/workflows/experimental/new_dag_decorator_params.md).

## Incremental workflow migration

If you have a larger workflow you want to migrate to decorator syntax, you can enable a hybrid mode where Pydantic types can be passed to functions in a Steps/DAG context block, intermixed with calls that pass dictionaries. This will allow you to make smaller changes, and verify that the generated YAML remains the same. For example:

```py
from hera.shared import global_config
from hera.workflows import Input, Output, Steps, Workflow, script

global_config.experimental_features["context_manager_pydantic_io"] = True

class MyInput(Input):
value: int

class MyOutput(Output):
value: int

# Function migrated to Pydantic I/O
@script()
def double(input: MyInput) -> MyOutput:
return MyOutput(value = input.value * 2)

# Not yet migrated to Pydantic I/O
@script()
def print_value(value: int) -> None:
print("Value was", value)

# Not yet migrated to decorator syntax
with Workflow(name="my-template") as w:
with Steps(name="steps"):
# Can now pass Pydantic types to/from functions
first_step = double(Input(value=5))
# Results can be passed into non-migrated functions
print_value(arguments={"value": first_step.value})
```

This feature is turned on by a different experimental flag, as we recommend only using this as a temporary stop-gap during a migration. Once you have fully migrated, you can disable the flag again to verify you are no longer using hybrid mode.
Loading
Loading