Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: cyclic pipelines should run stable #171

Merged
merged 22 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
ea1d394
fix: cyclic pipelines should run stable
mathislucka Jan 20, 2025
3ebc571
fix: use pipeline from experimental
mathislucka Jan 20, 2025
7ddf66f
fix: imports ruff
mathislucka Jan 20, 2025
f21a3fc
remove unit tests (tested in haystack main)
mathislucka Jan 20, 2025
9cf99f0
add missing dependency
mathislucka Jan 20, 2025
efd5ca1
fix potential use before assignment
mathislucka Jan 20, 2025
36ec9d8
fix line too long
mathislucka Jan 20, 2025
3ef29e1
improve comments and validate pipeline
Amnah199 Jan 23, 2025
411fc2a
revert test changes and remove ipthon dependency
julian-risch Jan 24, 2025
e24f222
remove unused _warn_if_ambiguous_intent
julian-risch Jan 24, 2025
5a66d98
remove redundant if condition
julian-risch Jan 24, 2025
0ecaf7b
update networkx import
julian-risch Jan 24, 2025
cd9db11
Merge branch 'fix/pipeline_run' of github.com:deepset-ai/haystack-exp…
julian-risch Jan 24, 2025
fef62c2
Merge branch 'main' into fix/pipeline_run
julian-risch Jan 24, 2025
b0d33cf
add ipython dependency, sort imports
julian-risch Jan 24, 2025
0081d82
Merge branch 'main' into fix/pipeline_run
julian-risch Jan 24, 2025
e506d40
fix type checker errors
julian-risch Jan 24, 2025
4697e84
Merge branch 'fix/pipeline_run' of github.com:deepset-ai/haystack-exp…
julian-risch Jan 24, 2025
f35be5c
adding Pipeline imports to __init__ files
davidsbatista Jan 24, 2025
b1948ba
Merge branch 'fix/pipeline_run' of github.com:deepset-ai/haystack-exp…
julian-risch Jan 24, 2025
1a0bbfc
add example to _convert_from_legacy_format docstring
julian-risch Jan 24, 2025
6c83d50
remove is_queue_blocked
Amnah199 Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions haystack_experimental/core/pipeline/component_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ def are_all_sockets_ready(component: Dict, inputs: Dict, only_check_mandatory: b
for socket_name, socket in sockets_to_check.items():
socket_inputs = inputs.get(socket_name, [])
expected_sockets.add(socket_name)
if (
is_socket_lazy_variadic(socket)
and any_socket_input_received(socket_inputs)
or has_socket_received_all_inputs(socket, socket_inputs)

# Check if socket has all required inputs or is a lazy variadic socket with any input
if has_socket_received_all_inputs(socket, socket_inputs) or (
is_socket_lazy_variadic(socket) and any_socket_input_received(socket_inputs)
):
filled_sockets.add(socket_name)

Expand Down Expand Up @@ -221,6 +221,12 @@ def are_all_lazy_variadic_sockets_resolved(component: Dict, inputs: Dict) -> boo
for socket_name, socket in component["input_sockets"].items():
if is_socket_lazy_variadic(socket):
socket_inputs = inputs.get(socket_name, [])

# Checks if a lazy variadic socket is ready to run.
# A socket is ready if either:
# - it has received all expected inputs, or
# - all its predecessors have executed
# If none of the conditions are met, the socket is not ready to run and we defer the component.
if not (
has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs)
or all_socket_predecessors_executed(socket, socket_inputs)
Expand Down
56 changes: 46 additions & 10 deletions haystack_experimental/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@

from haystack import logging, tracing
from haystack.core.component import Component, InputSocket
from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
from haystack_experimental.core.pipeline.base import PipelineBase
from haystack.telemetry import pipeline_running

from haystack_experimental.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
from haystack_experimental.core.pipeline.base import PipelineBase
from haystack_experimental.core.pipeline.component_checks import (
_NO_OUTPUT_PRODUCED,
all_predecessors_executed,
Expand Down Expand Up @@ -388,6 +388,37 @@ def _is_queue_stale(priority_queue: FIFOPriorityQueue) -> bool:
"""
return len(priority_queue) == 0 or priority_queue.peek()[0] > ComponentPriority.READY

@staticmethod
def _is_queue_blocked(pq: FIFOPriorityQueue) -> bool:
Amnah199 marked this conversation as resolved.
Show resolved Hide resolved
"""
Checks if all the components in priority queue are blocked before pipeline run.

:param pq: Priority queue of component names.
"""
queue_copy = deepcopy(pq)

while queue_copy:
component = queue_copy.peek()
if component[0] != ComponentPriority.BLOCKED:
return False
queue_copy.pop()
return True

def validate_pipeline(self, priority_queue: FIFOPriorityQueue):
"""
Validate the pipeline to check if it is blocked or has valid no entry point.

:param priority_queue: Priority queue of component names.
"""
if self._is_queue_blocked(priority_queue):
raise PipelineRuntimeError(
"Cannot run pipeline - all components are blocked. "
"This typically happens when:\n"
"1. There is no valid entry point for the pipeline\n"
"2. There is a circular dependency preventing the pipeline from running\n"
"Check the connections between these components and ensure all required inputs are provided."
)

def run( # noqa: PLR0915, PLR0912
self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None
) -> Dict[str, Any]:
Expand Down Expand Up @@ -471,6 +502,8 @@ def run( # noqa: PLR0915, PLR0912
will only contain the outputs of leaf components, i.e., components
without outgoing connections.

:raises ValueError:
If invalid inputs are provided to the pipeline.
:raises PipelineRuntimeError:
If the Pipeline contains cycles with unsupported connections that would cause
it to get stuck and fail running.
Expand All @@ -490,7 +523,7 @@ def run( # noqa: PLR0915, PLR0912
# normalize `data`
data = self._prepare_component_input_data(data)

# Raise if input is malformed in some way
# Raise ValueError if input is malformed in some way
self._validate_input(data)

if include_outputs_from is None:
Expand All @@ -515,11 +548,14 @@ def run( # noqa: PLR0915, PLR0912
},
) as span:
inputs = self._convert_from_legacy_format(pipeline_inputs=data)
#self._warn_if_ambiguous_intent(
# self._warn_if_ambiguous_intent(
# inputs=inputs, component_names=ordered_component_names, receivers=cached_receivers
#)
# )
priority_queue = self._fill_queue(ordered_component_names, inputs)

# check if pipeline is blocked before execution
self.validate_pipeline(priority_queue)

while True:
candidate = self._get_next_runnable_component(priority_queue)
if candidate is None:
Expand All @@ -529,14 +565,14 @@ def run( # noqa: PLR0915, PLR0912
if len(priority_queue) > 0:
next_priority, next_name = priority_queue.peek()

# alternative to _warn_if_ambiguous_intent
if (
priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]
and next_priority == priority
priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]
and next_priority == priority
):
msg = (
f"Ambiguous running order: Components '{component_name}' and '{next_name}' are waiting for "
f"optional inputs at the same time. Component '{component_name}' executes first."
f"Components '{component_name}' and '{next_name}' are waiting for "
f"optional inputs at the same time. The pipeline will execute '{component_name}' "
f"first based on lexicographical ordering."
)
warnings.warn(msg)

Expand Down