Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove some deepcopy to speed up workflow conductor #256

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
6a6f0b8
remove some deepcopy to speed up workflow conductor
guzzijones Jul 7, 2023
0f5d374
Update tox.yml
guzzijones Jul 8, 2023
73522e4
Update tox.yml
guzzijones Jul 8, 2023
46c7546
Update tox.yml
guzzijones Jul 8, 2023
3bff3e0
black formatting
guzzijones Jul 10, 2023
3805276
add back python 3.8
guzzijones Jul 10, 2023
8ce4a42
try 18.04
guzzijones Jul 10, 2023
61bf846
switch back to 20.04
guzzijones Jul 10, 2023
bbd2ea4
remove unittest2
guzzijones Jul 10, 2023
cb52348
install setup.py to load entrypoints
guzzijones Jul 10, 2023
7bdb543
set language
guzzijones Jul 10, 2023
8fd8bb1
remove another deepcopy
guzzijones Jul 10, 2023
aac13bf
remove staged copy
guzzijones Jul 10, 2023
4712345
remove reruns deepcopy
guzzijones Jul 10, 2023
f5caf3a
remove serialize parent_ctx copy
guzzijones Jul 10, 2023
6478f9b
deepcopy machine.py, deepcopy graphing.py, conducting.py
guzzijones Jul 11, 2023
0f58986
remove json_util
guzzijones Jul 11, 2023
d3541fb
need to copy staged so it isn't mutated for with items
guzzijones Jul 12, 2023
6e3866a
ensure staged is deep copied
guzzijones Jul 24, 2023
62a23a5
flake8 lint fix
guzzijones Jul 24, 2023
1364dca
add back deepcopy for errors as they are also mutated in st2
guzzijones Jul 27, 2023
d9816fa
do not copy with items context; added benchmarks
guzzijones Jan 23, 2024
016f825
remove comment about task render
guzzijones Jan 23, 2024
8cf2ff8
add test requirements
guzzijones Jan 23, 2024
3e7cfb4
fix conflicts
guzzijones Jan 23, 2024
eb09856
typo in benchmark vs benchmarks
guzzijones Jan 23, 2024
c5dbc16
add __init__.py so imports work
guzzijones Jan 23, 2024
84a6aa2
remove unused import ctx_util
guzzijones Jan 23, 2024
a1f4685
flake8 fixes
guzzijones Jan 23, 2024
3c2db2c
add license file
guzzijones Jan 23, 2024
429f0e0
remove extras require
guzzijones Jan 23, 2024
25cf175
more deep copy removed
guzzijones Jan 30, 2024
d4c848b
flake fix
guzzijones Jan 30, 2024
ce8fcdf
Merge branch 'master' into nocopy
guzzijones Sep 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added orquesta/benchmarks/__init__.py
Empty file.
Empty file.
Empty file.
Empty file.
142 changes: 142 additions & 0 deletions orquesta/benchmarks/specs/native/v1/test_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Copyright 2021 The StackStorm Authors.
# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

import orquesta.specs.native.v1.models as models

from orquesta.expressions import base as expr_base
from orquesta.utils import context as ctx_util


# Benchmark fixtures: three with-items payloads of increasing value size
# (1 byte, ~10 KB, and ~1 MB per item) used to measure how render() cost
# scales with the size of the workflow context being (deep)copied.
WITH_ITEMS = [
    [{"test": "s"}, {"test1": "s"}, {"test2": "s"}],
    [{"test": "s" * 10000}, {"test1": "s" * 10000}, {"test2": "s" * 10000}],
    [{"test": "s" * 1000000}, {"test1": "s" * 1000000}, {"test2": "s" * 1000000}],
]


@pytest.mark.parametrize("fixture", WITH_ITEMS, ids=["small", "medium", "large"])
@pytest.mark.benchmark(group="no deepcopy")
def test_task_spec_render(benchmark, fixture):
    """Benchmark TaskSpec.render() with the current (deepcopy-free) code path."""

    def render_once():
        # Build a with-items task spec that iterates over the fixture.
        spec = models.TaskSpec(
            {
                "action": "core.echo message=<% item() %>",
                "next": [{"publish": [{"items": "<% result() %>"}]}],
                "with": {"items": "<% ctx(xs) %>"},
            }
        )

        # Minimal workflow context/state with a single staged, ready task.
        staged_entry = {"id": "task1", "ctxs": {"in": [0]}, "route": 0, "prev": {}, "ready": True}
        ctx = {
            "xs": fixture,
            "__current_task": {"id": "task1", "route": 0},
            "__state": {
                "contexts": [{"xs": fixture}],
                "routes": [[]],
                "sequence": [],
                "staged": [staged_entry],
                "status": "running",
                "tasks": {},
            },
        }

        spec.render(ctx)

    benchmark(render_once)


class OldTaskSpec(models.TaskSpec):
    """TaskSpec variant reproducing the pre-optimization render() logic.

    Kept purely as the comparison baseline for the benchmarks in this
    module; its behavior is intentionally identical to the historical
    implementation.
    """

    def render(self, in_ctx):
        """Return ``(self, action_specs)`` evaluated against *in_ctx*."""
        rendered = []

        if not self.has_items():
            # Plain (non with-items) task: evaluate action and input once.
            rendered.append(
                {
                    "action": expr_base.evaluate(self.action, in_ctx),
                    "input": expr_base.evaluate(getattr(self, "input", {}), in_ctx),
                }
            )
            return self, rendered

        items_spec = self.get_items_spec()
        spec_text = items_spec.items

        # A "keys in <expr>" form declares item key names; otherwise the
        # whole string is the items expression and no keys are declared.
        if " in " in spec_text:
            items_expr = spec_text[spec_text.index(" in ") + 4 :].strip()
            item_keys = spec_text[: spec_text.index(" in ")].replace(" ", "").split(",")
        else:
            items_expr = spec_text.strip()
            item_keys = None

        items = expr_base.evaluate(items_expr, in_ctx)

        if not isinstance(items, list):
            raise TypeError('The value of "%s" is not type of list.' % items_expr)

        for idx, item in enumerate(items):
            # Map positional tuples/lists onto the declared keys, or wrap a
            # scalar item when exactly one key was declared.
            if item_keys and isinstance(item, (tuple, list)):
                item = dict(zip(item_keys, list(item)))
            elif item_keys and len(item_keys) == 1:
                item = {item_keys[0]: item}

            item_ctx = ctx_util.set_current_item(in_ctx, item)

            rendered.append(
                {
                    "action": expr_base.evaluate(self.action, item_ctx),
                    "input": expr_base.evaluate(getattr(self, "input", {}), item_ctx),
                    "item_id": idx,
                }
            )

        return self, rendered


@pytest.mark.parametrize("fixture", WITH_ITEMS, ids=["small", "medium", "large"])
@pytest.mark.benchmark(group="deepcopy")
def test_task_spec_render_old(benchmark, fixture):
    """Benchmark the historical (deepcopy-based) render() via OldTaskSpec."""

    def render_once():
        # Same spec shape as the "no deepcopy" benchmark, old implementation.
        spec = OldTaskSpec(
            {
                "action": "core.echo message=<% item() %>",
                "next": [{"publish": [{"items": "<% result() %>"}]}],
                "with": {"items": "<% ctx(xs) %>"},
            }
        )

        # Minimal workflow context/state with a single staged, ready task.
        staged_entry = {"id": "task1", "ctxs": {"in": [0]}, "route": 0, "prev": {}, "ready": True}
        ctx = {
            "xs": fixture,
            "__current_task": {"id": "task1", "route": 0},
            "__state": {
                "contexts": [{"xs": fixture}],
                "routes": [[]],
                "sequence": [],
                "staged": [staged_entry],
                "status": "running",
                "tasks": {},
            },
        }

        spec.render(ctx)

    benchmark(render_once)
55 changes: 27 additions & 28 deletions orquesta/conducting.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,29 @@ def __init__(self, conductor=None):

def serialize(self):
data = {
"contexts": json_util.deepcopy(self.contexts),
"routes": json_util.deepcopy(self.routes),
"sequence": json_util.deepcopy(self.sequence),
"staged": json_util.deepcopy(self.staged),
"contexts": self.contexts,
"routes": self.routes,
"sequence": self.sequence,
"staged": self.staged,
"status": self.status,
"tasks": json_util.deepcopy(self.tasks),
"tasks": self.tasks,
}

if self.reruns:
data["reruns"] = json_util.deepcopy(self.reruns)
data["reruns"] = self.reruns

return data

@classmethod
def deserialize(cls, data):
instance = cls()
instance.contexts = json_util.deepcopy(data.get("contexts", list()))
instance.routes = json_util.deepcopy(data.get("routes", list()))
instance.sequence = json_util.deepcopy(data.get("sequence", list()))
instance.contexts = data.get("contexts", list())
instance.routes = data.get("routes", list())
instance.sequence = data.get("sequence", list())
instance.staged = json_util.deepcopy(data.get("staged", list()))
instance.status = data.get("status", statuses.UNSET)
instance.tasks = json_util.deepcopy(data.get("tasks", dict()))
instance.reruns = json_util.deepcopy(data.get("reruns", list()))
instance.tasks = data.get("tasks", dict())
instance.reruns = data.get("reruns", list())

return instance

Expand Down Expand Up @@ -279,10 +279,10 @@ def serialize(self):
"spec": self.spec.serialize(),
"graph": self.graph.serialize(),
"input": self.get_workflow_input(),
"context": self.get_workflow_parent_context(),
"context": self._parent_ctx,
"state": self.workflow_state.serialize(),
"log": json_util.deepcopy(self.log),
"errors": json_util.deepcopy(self.errors),
"log": self.log,
"errors": self.errors,
"output": self.get_workflow_output(),
}

Expand All @@ -292,12 +292,12 @@ def deserialize(cls, data):
spec = spec_module.WorkflowSpec.deserialize(data["spec"])

graph = graphing.WorkflowGraph.deserialize(data["graph"])
inputs = json_util.deepcopy(data["input"])
context = json_util.deepcopy(data["context"])
inputs = data["input"]
context = data["context"]
state = WorkflowState.deserialize(data["state"])
log = json_util.deepcopy(data.get("log", []))
log = data.get("log", [])
errors = json_util.deepcopy(data["errors"])
guzzijones marked this conversation as resolved.
Show resolved Hide resolved
outputs = json_util.deepcopy(data["output"])
outputs = data["output"]

instance = cls(spec)
instance.restore(graph, log, errors, state, inputs, outputs, context)
Expand All @@ -317,7 +317,7 @@ def workflow_state(self):
self._workflow_state = WorkflowState(conductor=self)

# Set any given context as the initial context.
init_ctx = self.get_workflow_parent_context()
init_ctx = self.get_workflow_parent_context_copy()

# Render workflow inputs and merge into the initial context.
workflow_input = self.get_workflow_input()
Expand Down Expand Up @@ -407,11 +407,11 @@ def log_errors(self, errors, task_id=None, route=None, task_transition_id=None):
error, task_id=task_id, route=route, task_transition_id=task_transition_id
)

def get_workflow_parent_context(self):
def get_workflow_parent_context_copy(self):
return json_util.deepcopy(self._parent_ctx)

def get_workflow_input(self):
return json_util.deepcopy(self._inputs)
return self._inputs

def get_workflow_status(self):
return self.workflow_state.status
Expand Down Expand Up @@ -460,7 +460,7 @@ def request_workflow_status(self, status):
raise exc.InvalidWorkflowStatusTransition(current_status, wf_ex_event.name)

def get_workflow_initial_context(self):
return json_util.deepcopy(self.workflow_state.contexts[0])
return self.workflow_state.contexts[0]

def get_workflow_terminal_context(self):
if self.get_workflow_status() not in statuses.COMPLETED_STATUSES:
Expand All @@ -481,8 +481,7 @@ def get_workflow_terminal_context(self):
for idx, task in other_term_tasks:
# Remove the initial context since the first task processed above already
# included that and we only want to apply the differences.
in_ctx_idxs = json_util.deepcopy(task["ctxs"]["in"])
in_ctx_idxs.remove(0)
in_ctx_idxs = [i for index, i in enumerate(task["ctxs"]["in"]) if index != 0]

wf_term_ctx = dict_util.merge_dicts(
wf_term_ctx, self.get_task_context(in_ctx_idxs), overwrite=True
Expand Down Expand Up @@ -512,7 +511,7 @@ def render_workflow_output(self):
self.request_workflow_status(statuses.FAILED)

def get_workflow_output(self):
return json_util.deepcopy(self._outputs) if self._outputs else None
return self._outputs if self._outputs else None

def reset_workflow_output(self):
self._outputs = None
Expand Down Expand Up @@ -782,7 +781,7 @@ def setup_retry_in_task_state(self, task_state_entry, in_ctx_idxs):
# Setup the retry in the task state.
task_id = task_state_entry["id"]
task_retry_spec = self.graph.get_task_retry_spec(task_id)
task_state_entry["retry"] = json_util.deepcopy(task_retry_spec)
task_state_entry["retry"] = task_retry_spec
task_state_entry["retry"]["tally"] = 0

# Get task context for evaluating the expression in delay and count.
Expand Down Expand Up @@ -1188,8 +1187,8 @@ def get_task_transition_contexts(self, task_id, route):

def _request_task_rerun(self, task_id, route, reset_items=False):
task = self.workflow_state.get_task(task_id, route)
task_ctx = json_util.deepcopy(task["ctxs"]["in"])
task_prev = json_util.deepcopy(task["prev"])
task_ctx = task["ctxs"]["in"]
task_prev = task["prev"]
task_spec = self.spec.tasks.get_task(task_id)

# Reset terminal status for the rerunnable candidate.
Expand Down
2 changes: 1 addition & 1 deletion orquesta/graphing.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def serialize(self):

@classmethod
def deserialize(cls, data):
g = json_graph.adjacency_graph(json_util.deepcopy(data), directed=True, multigraph=True)
g = json_graph.adjacency_graph(data, directed=True, multigraph=True)
return cls(graph=g)

@staticmethod
Expand Down
11 changes: 7 additions & 4 deletions orquesta/machines.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from orquesta import events
from orquesta import exceptions as exc
from orquesta import statuses
from orquesta.utils import jsonify as json_util


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -527,11 +526,15 @@ def add_context_to_task_item_event(cls, workflow_state, task_id, task_route, ac_
if ac_ex_event.status in requirements:
# Make a copy of the items and remove current item under evaluation.
staged_task = workflow_state.get_staged_task(task_id, task_route)
items = json_util.deepcopy(staged_task["items"])
del items[ac_ex_event.item_id]
items_status = [item.get("status", statuses.UNSET) for item in items]
items = staged_task["items"]
items_status = [
item.get("status", statuses.UNSET)
for index, item in enumerate(items)
if index != ac_ex_event.item_id
]

# Assess various situations.
# todo(aj) loop over list one time and add to each list
active = list(filter(lambda x: x in statuses.ACTIVE_STATUSES, items_status))
incomplete = list(filter(lambda x: x not in statuses.COMPLETED_STATUSES, items_status))
paused = list(filter(lambda x: x in [statuses.PENDING, statuses.PAUSED], items_status))
Expand Down
Loading
Loading