From 89fb8d506ff33b441f90972d127746d15da6192e Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 21 Sep 2024 17:21:18 -0700 Subject: [PATCH 01/14] Add more comments and docstrings --- libs/langgraph/langgraph/constants.py | 167 ++++++++---------- libs/langgraph/langgraph/errors.py | 25 +-- libs/langgraph/langgraph/pregel/algo.py | 25 ++- libs/langgraph/langgraph/pregel/debug.py | 4 + libs/langgraph/langgraph/pregel/executor.py | 17 +- libs/langgraph/langgraph/pregel/io.py | 5 +- libs/langgraph/langgraph/pregel/loop.py | 6 +- libs/langgraph/langgraph/pregel/messages.py | 3 + libs/langgraph/langgraph/pregel/metadata.py | 0 libs/langgraph/langgraph/pregel/read.py | 24 ++- libs/langgraph/langgraph/pregel/runner.py | 8 + libs/langgraph/langgraph/pregel/types.py | 112 ++++++++++-- libs/langgraph/langgraph/pregel/utils.py | 2 +- libs/langgraph/langgraph/pregel/validate.py | 2 +- libs/langgraph/langgraph/pregel/write.py | 29 ++- libs/langgraph/langgraph/version.py | 2 +- libs/langgraph/tests/test_tracing_interops.py | 7 +- 17 files changed, 274 insertions(+), 164 deletions(-) delete mode 100644 libs/langgraph/langgraph/pregel/metadata.py diff --git a/libs/langgraph/langgraph/constants.py b/libs/langgraph/langgraph/constants.py index e8719e664..eb0f703be 100644 --- a/libs/langgraph/langgraph/constants.py +++ b/libs/langgraph/langgraph/constants.py @@ -1,133 +1,106 @@ -from dataclasses import dataclass from types import MappingProxyType -from typing import Any, Literal, Mapping +from typing import Any, Mapping +from langgraph.pregel.types import Interrupt, Send # noqa: F401 + +# Interrupt, Send re-exported for backwards compatibility + +# --- Empty read-only containers --- +EMPTY_MAP: Mapping[str, Any] = MappingProxyType({}) +EMPTY_SEQ: tuple[str, ...] = tuple() + +# --- Reserved write keys --- INPUT = "__input__" +# for values passed as input to the graph +INTERRUPT = "__interrupt__" +# for dynamic interrupts raised by nodes +ERROR = "__error__" +# for errors raised by nodes +NO_WRITES = "__no_writes__" +# marker to signal node didn't write anything +SCHEDULED = "__scheduled__" +# marker to signal node was scheduled (in distributed mode) +TASKS = "__pregel_tasks" +# for Send objects returned by nodes/edges, corresponds to PUSH below +START = "__start__" +# marker for the first (maybe virtual) node in graph-style Pregel +END = "__end__" +# marker for the last (maybe virtual) node in graph-style Pregel + +# --- Reserved config.configurable keys --- CONFIG_KEY_SEND = "__pregel_send" +# holds the `write` function that accepts writes to state/edges/reserved keys CONFIG_KEY_READ = "__pregel_read" +# holds the `read` function that returns a copy of the current state CONFIG_KEY_CHECKPOINTER = "__pregel_checkpointer" +# holds a `BaseCheckpointSaver` passed from parent graph to child graphs CONFIG_KEY_STREAM = "__pregel_stream" +# holds a `StreamProtocol` passed from parent graph to child graphs CONFIG_KEY_STREAM_WRITER = "__pregel_stream_writer" +# holds a `StreamWriter` for stream_mode=custom CONFIG_KEY_STORE = "__pregel_store" +# holds a `BaseStore` made available to managed values CONFIG_KEY_RESUMING = "__pregel_resuming" +# holds a boolean indicating if subgraphs should resume from a previous checkpoint CONFIG_KEY_TASK_ID = "__pregel_task_id" +# holds the task ID for the current task CONFIG_KEY_DEDUPE_TASKS = "__pregel_dedupe_tasks" +# holds a boolean indicating if tasks should be deduplicated (for distributed mode) CONFIG_KEY_ENSURE_LATEST = "__pregel_ensure_latest" +# holds a boolean indicating whether to assert the requested checkpoint is the latest +# (for distributed mode) CONFIG_KEY_DELEGATE = "__pregel_delegate" -# this one part of public API so more readable +# holds a boolean indicating whether to delegate subgraphs (for distributed mode) CONFIG_KEY_CHECKPOINT_MAP = "checkpoint_map" -INTERRUPT = "__interrupt__" -ERROR = "__error__" -NO_WRITES = "__no_writes__" -SCHEDULED = "__scheduled__" -TASKS = "__pregel_tasks" # for backwards compat, this is the original name of PUSH +# holds a mapping of checkpoint_ns -> checkpoint_id for parent graphs +CONFIG_KEY_CHECKPOINT_ID = "checkpoint_id" +# holds the current checkpoint_id, if any +CONFIG_KEY_CHECKPOINT_NS = "checkpoint_ns" +# holds the current checkpoint_ns, "" for root graph + +# --- Other constants --- PUSH = "__pregel_push" +# denotes push-style tasks, ie. those created by Send objects PULL = "__pregel_pull" +# denotes pull-style tasks, ie. those triggered by edges RUNTIME_PLACEHOLDER = "__pregel_runtime_placeholder__" +# placeholder for managed values replaced at runtime +TAG_HIDDEN = "langsmith:hidden" +# tag to hide a node/edge from certain tracing/streaming environments +NS_SEP = "|" +# for checkpoint_ns, separates each level (ie. graph|subgraph|subsubgraph) +NS_END = ":" +# for checkpoint_ns, for each level, separates the namespace from the task_id + RESERVED = { - SCHEDULED, + # reserved write keys + INPUT, INTERRUPT, ERROR, NO_WRITES, + SCHEDULED, TASKS, - PUSH, - PULL, + # reserved config.configurable keys CONFIG_KEY_SEND, CONFIG_KEY_READ, CONFIG_KEY_CHECKPOINTER, - CONFIG_KEY_CHECKPOINT_MAP, CONFIG_KEY_STREAM, CONFIG_KEY_STREAM_WRITER, CONFIG_KEY_STORE, + CONFIG_KEY_CHECKPOINT_MAP, CONFIG_KEY_RESUMING, CONFIG_KEY_TASK_ID, CONFIG_KEY_DEDUPE_TASKS, CONFIG_KEY_ENSURE_LATEST, CONFIG_KEY_DELEGATE, - INPUT, + CONFIG_KEY_CHECKPOINT_MAP, + CONFIG_KEY_CHECKPOINT_ID, + CONFIG_KEY_CHECKPOINT_NS, + # other constants + PUSH, + PULL, RUNTIME_PLACEHOLDER, + TAG_HIDDEN, + NS_SEP, + NS_END, } -TAG_HIDDEN = "langsmith:hidden" - -START = "__start__" -END = "__end__" - -NS_SEP = "|" -NS_END = ":" - -EMPTY_MAP: Mapping[str, Any] = MappingProxyType({}) - - -class Send: - """A message or packet to send to a specific node in the graph. - - The `Send` class is used within a `StateGraph`'s conditional edges to - dynamically invoke a node with a custom state at the next step. - - Importantly, the sent state can differ from the core graph's state, - allowing for flexible and dynamic workflow management. - - One such example is a "map-reduce" workflow where your graph invokes - the same node multiple times in parallel with different states, - before aggregating the results back into the main graph's state. - - Attributes: - node (str): The name of the target node to send the message to. - arg (Any): The state or message to send to the target node. - - Examples: - >>> from typing import Annotated - >>> import operator - >>> class OverallState(TypedDict): - ... subjects: list[str] - ... jokes: Annotated[list[str], operator.add] - ... - >>> from langgraph.constants import Send - >>> from langgraph.graph import END, START - >>> def continue_to_jokes(state: OverallState): - ... return [Send("generate_joke", {"subject": s}) for s in state['subjects']] - ... - >>> from langgraph.graph import StateGraph - >>> builder = StateGraph(OverallState) - >>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]}) - >>> builder.add_conditional_edges(START, continue_to_jokes) - >>> builder.add_edge("generate_joke", END) - >>> graph = builder.compile() - >>> - >>> # Invoking with two subjects results in a generated joke for each - >>> graph.invoke({"subjects": ["cats", "dogs"]}) - {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']} - """ - - node: str - arg: Any - - def __init__(self, /, node: str, arg: Any) -> None: - """ - Initialize a new instance of the Send class. - - Args: - node (str): The name of the target node to send the message to. - arg (Any): The state or message to send to the target node. - """ - self.node = node - self.arg = arg - - def __hash__(self) -> int: - return hash((self.node, self.arg)) - - def __repr__(self) -> str: - return f"Send(node={self.node!r}, arg={self.arg!r})" - - def __eq__(self, value: object) -> bool: - return ( - isinstance(value, Send) - and self.node == value.node - and self.arg == value.arg - ) - - -@dataclass -class Interrupt: - value: Any - when: Literal["during"] = "during" diff --git a/libs/langgraph/langgraph/errors.py b/libs/langgraph/langgraph/errors.py index ec84e0b28..ad3150942 100644 --- a/libs/langgraph/langgraph/errors.py +++ b/libs/langgraph/langgraph/errors.py @@ -1,8 +1,10 @@ from typing import Any, Sequence -from langgraph.checkpoint.base import EmptyChannelError +from langgraph.checkpoint.base import EmptyChannelError # noqa: F401 from langgraph.constants import Interrupt +# EmptyChannelError re-exported for backwards compatibility + class GraphRecursionError(RecursionError): """Raised when the graph has exhausted the maximum number of steps. @@ -24,13 +26,14 @@ class GraphRecursionError(RecursionError): class InvalidUpdateError(Exception): - """Raised when attempting to update a channel with an invalid sequence of updates.""" + """Raised when attempting to update a channel with an invalid set of updates.""" pass class GraphInterrupt(Exception): - """Raised when a subgraph is interrupted.""" + """Raised when a subgraph is interrupted, supressed by the root graph. + Never raised directly, or surfaced to the user.""" def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None: super().__init__(interrupts) @@ -44,7 +47,7 @@ def __init__(self, value: Any) -> None: class GraphDelegate(Exception): - """Raised when a graph is delegated.""" + """Raised when a graph is delegated (for distributed mode).""" def __init__(self, *args: dict[str, Any]) -> None: super().__init__(*args) @@ -57,22 +60,12 @@ class EmptyInputError(Exception): class TaskNotFound(Exception): - """Raised when the executor is unable to find a task.""" + """Raised when the executor is unable to find a task (for distributed mode).""" pass class CheckpointNotLatest(Exception): - """Raised when the checkpoint is not the latest version.""" + """Raised when the checkpoint is not the latest version (for distributed mode).""" pass - - -__all__ = [ - "GraphRecursionError", - "InvalidUpdateError", - "GraphInterrupt", - "NodeInterrupt", - "EmptyInputError", - "EmptyChannelError", -] diff --git a/libs/langgraph/langgraph/pregel/algo.py b/libs/langgraph/langgraph/pregel/algo.py index 8a63d4cc6..44da0bbd5 100644 --- a/libs/langgraph/langgraph/pregel/algo.py +++ b/libs/langgraph/langgraph/pregel/algo.py @@ -33,6 +33,7 @@ CONFIG_KEY_READ, CONFIG_KEY_SEND, CONFIG_KEY_TASK_ID, + EMPTY_SEQ, INTERRUPT, NO_WRITES, NS_END, @@ -55,10 +56,11 @@ GetNextVersion = Callable[[Optional[V], BaseChannel], V] -EMPTY_SEQ: tuple[str, ...] = tuple() - class WritesProtocol(Protocol): + """Protocol for objects containing writes to be applied to checkpoint. + Implemented by PregelTaskWrites and PregelExecutableTask.""" + @property def name(self) -> str: ... @@ -70,6 +72,9 @@ def triggers(self) -> Sequence[str]: ... class PregelTaskWrites(NamedTuple): + """Simplest implementation of WritesProtocol, for usage with writes that + don't originate from a runnable task, eg. graph input, update_state, etc.""" + name: str writes: Sequence[tuple[str, Any]] triggers: Sequence[str] @@ -80,6 +85,7 @@ def should_interrupt( interrupt_nodes: Union[All, Sequence[str]], tasks: Iterable[PregelExecutableTask], ) -> list[PregelExecutableTask]: + """Check if the graph should be interrupted based on current state.""" version_type = type(next(iter(checkpoint["channel_versions"].values()), None)) null_version = version_type() # type: ignore[misc] seen = checkpoint["versions_seen"].get(INTERRUPT, {}) @@ -117,6 +123,9 @@ def local_read( select: Union[list[str], str], fresh: bool = False, ) -> Union[dict[str, Any], Any]: + """Function injected under CONFIG_KEY_READ in task config, to read current state. + Used by conditional edges to read a copy of the state with reflecting the writes + from that node only.""" if isinstance(select, str): managed_keys = [] for c, _ in task.writes: @@ -153,6 +162,8 @@ def local_write( managed: ManagedValueMapping, writes: Sequence[tuple[str, Any]], ) -> None: + """Function injected under CONFIG_KEY_SEND in task config, to write to channels. + Validates writes and forwards them to `commit` function.""" for chan, value in writes: if chan == TASKS: if not isinstance(value, Send): @@ -169,6 +180,7 @@ def local_write( def increment(current: Optional[int], channel: BaseChannel) -> int: + """Default channel versioning function, increments the current int version.""" return current + 1 if current is not None else 1 @@ -178,6 +190,9 @@ def apply_writes( tasks: Iterable[WritesProtocol], get_next_version: Optional[GetNextVersion], ) -> dict[str, list[Any]]: + """Apply writes from a set of tasks (usually the tasks from a Pregel step) + to the checkpoint and channels, and return managed values writes to be applied + externally.""" # update seen versions for task in tasks: checkpoint["versions_seen"].setdefault(task.name, {}).update( @@ -297,6 +312,9 @@ def prepare_next_tasks( checkpointer: Optional[BaseCheckpointSaver] = None, manager: Union[None, ParentRunManager, AsyncParentRunManager] = None, ) -> Union[dict[str, PregelTask], dict[str, PregelExecutableTask]]: + """Prepare the set of tasks that will make up the next Pregel step. + This is the union of all PUSH tasks (Sends) and PULL tasks (nodes triggered + by edges).""" tasks: dict[str, Union[PregelTask, PregelExecutableTask]] = {} # Consume pending packets for idx, _ in enumerate(checkpoint["pending_sends"]): @@ -348,6 +366,8 @@ def prepare_single_task( checkpointer: Optional[BaseCheckpointSaver] = None, manager: Union[None, ParentRunManager, AsyncParentRunManager] = None, ) -> Union[None, PregelTask, PregelExecutableTask]: + """Prepares a single task for the next Pregel step, given a task path, which + uniquely identifies a PUSH or PULL task within the graph.""" checkpoint_id = UUID(checkpoint["id"]).bytes configurable = config.get("configurable", {}) parent_ns = configurable.get("checkpoint_ns", "") @@ -568,6 +588,7 @@ def _proc_input( *, for_execution: bool, ) -> Iterator[Any]: + """Prepare input for a PULL task, based on the process's channels and triggers.""" # If all trigger channels subscribed by this process are not empty # then invoke the process with the values of all non-empty channels if isinstance(proc.channels, dict): diff --git a/libs/langgraph/langgraph/pregel/debug.py b/libs/langgraph/langgraph/pregel/debug.py index 56f1eb9c6..9c4661c0c 100644 --- a/libs/langgraph/langgraph/pregel/debug.py +++ b/libs/langgraph/langgraph/pregel/debug.py @@ -84,6 +84,7 @@ class DebugOutputCheckpoint(DebugOutputBase): def map_debug_tasks( step: int, tasks: Iterable[PregelExecutableTask] ) -> Iterator[DebugOutputTask]: + """Produce "task" events for stream_mode=debug.""" ts = datetime.now(timezone.utc).isoformat() for task in tasks: if task.config is not None and TAG_HIDDEN in task.config.get("tags", []): @@ -107,6 +108,7 @@ def map_debug_task_results( task_tup: tuple[PregelExecutableTask, Sequence[tuple[str, Any]]], stream_keys: Union[str, Sequence[str]], ) -> Iterator[DebugOutputTaskResult]: + """Produce "task_result" events for stream_mode=debug.""" stream_channels_list = ( [stream_keys] if isinstance(stream_keys, str) else stream_keys ) @@ -135,6 +137,7 @@ def map_debug_checkpoint( tasks: Iterable[PregelExecutableTask], pending_writes: list[PendingWrite], ) -> Iterator[DebugOutputCheckpoint]: + """Produce "checkpoint" events for stream_mode=debug.""" yield { "type": "checkpoint", "timestamp": checkpoint["ts"], @@ -213,6 +216,7 @@ def tasks_w_writes( pending_writes: Optional[list[PendingWrite]], states: Optional[dict[str, Union[RunnableConfig, StateSnapshot]]], ) -> tuple[PregelTask, ...]: + """Apply writes / subgraph states to tasks to be returned in a StateSnapshot.""" pending_writes = pending_writes or [] return tuple( PregelTask( diff --git a/libs/langgraph/langgraph/pregel/executor.py b/libs/langgraph/langgraph/pregel/executor.py index 46f1c3f64..691098b7a 100644 --- a/libs/langgraph/langgraph/pregel/executor.py +++ b/libs/langgraph/langgraph/pregel/executor.py @@ -39,6 +39,13 @@ def __call__( class BackgroundExecutor(ContextManager): + """A context manager that runs sync tasks in the background. + Uses a thread pool executor to delegate tasks to separate threads. + On exit, + - cancels any (not yet started) tasks with `__cancel_on_exit__=True` + - waits for all tasks to finish + - re-raises the first exception from tasks with `__reraise_on_exit__=True`""" + def __init__(self, config: RunnableConfig) -> None: self.stack = ExitStack() self.executor = self.stack.enter_context(get_executor_for_config(config)) @@ -49,7 +56,7 @@ def submit( # type: ignore[valid-type] fn: Callable[P, T], *args: P.args, __name__: Optional[str] = None, # currently not used in sync version - __cancel_on_exit__: bool = False, + __cancel_on_exit__: bool = False, # for sync, can cancel only if not started __reraise_on_exit__: bool = True, **kwargs: P.kwargs, ) -> concurrent.futures.Future[T]: @@ -101,6 +108,14 @@ def __exit__( class AsyncBackgroundExecutor(AsyncContextManager): + """A context manager that runs async tasks in the background. + Uses the current event loop to delegate tasks to asyncio tasks. + On exit, + - cancels any tasks with `__cancel_on_exit__=True` + - waits for all tasks to finish + - re-raises the first exception from tasks with `__reraise_on_exit__=True` + ignoring CancelledError""" + def __init__(self) -> None: self.context_not_supported = sys.version_info < (3, 11) self.tasks: dict[asyncio.Task, tuple[bool, bool]] = {} diff --git a/libs/langgraph/langgraph/pregel/io.py b/libs/langgraph/langgraph/pregel/io.py index ad2252c9d..6542b1d91 100644 --- a/libs/langgraph/langgraph/pregel/io.py +++ b/libs/langgraph/langgraph/pregel/io.py @@ -3,7 +3,7 @@ from langchain_core.runnables.utils import AddableDict from langgraph.channels.base import BaseChannel, EmptyChannelError -from langgraph.constants import ERROR, INTERRUPT, TAG_HIDDEN +from langgraph.constants import EMPTY_SEQ, ERROR, INTERRUPT, TAG_HIDDEN from langgraph.pregel.log import logger from langgraph.pregel.types import PregelExecutableTask @@ -97,9 +97,6 @@ def __radd__(self, other: dict[str, Any]) -> "AddableUpdatesDict": raise TypeError("AddableUpdatesDict does not support right-side addition") -EMPTY_SEQ: tuple[str, ...] = tuple() - - def map_output_updates( output_channels: Union[str, Sequence[str]], tasks: list[tuple[PregelExecutableTask, Sequence[tuple[str, Any]]]], diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index 49baa1846..c97cc8a69 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -44,6 +44,7 @@ CONFIG_KEY_RESUMING, CONFIG_KEY_STREAM, CONFIG_KEY_TASK_ID, + EMPTY_SEQ, ERROR, INPUT, INTERRUPT, @@ -101,13 +102,12 @@ V = TypeVar("V") P = ParamSpec("P") +StreamChunk = tuple[tuple[str, ...], str, Any] + INPUT_DONE = object() INPUT_RESUMING = object() -EMPTY_SEQ = () SPECIAL_CHANNELS = (ERROR, INTERRUPT, SCHEDULED) -StreamChunk = tuple[tuple[str, ...], str, Any] - class StreamProtocol: __slots__ = ("modes", "__call__") diff --git a/libs/langgraph/langgraph/pregel/messages.py b/libs/langgraph/langgraph/pregel/messages.py index 7c3f90b10..d0ae539e2 100644 --- a/libs/langgraph/langgraph/pregel/messages.py +++ b/libs/langgraph/langgraph/pregel/messages.py @@ -24,6 +24,9 @@ class StreamMessagesHandler(BaseCallbackHandler, _StreamingCallbackHandler): + """A callback handler that implements stream_mode=messages. + Collects messages from (1) chat model stream events and (2) node outputs.""" + def __init__(self, stream: Callable[[StreamChunk], None]): self.stream = stream self.metadata: dict[UUID, Meta] = {} diff --git a/libs/langgraph/langgraph/pregel/metadata.py b/libs/langgraph/langgraph/pregel/metadata.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/libs/langgraph/langgraph/pregel/read.py b/libs/langgraph/langgraph/pregel/read.py index 3ad988b89..097e76fa2 100644 --- a/libs/langgraph/langgraph/pregel/read.py +++ b/libs/langgraph/langgraph/pregel/read.py @@ -31,6 +31,9 @@ class ChannelRead(RunnableCallable): + """Implements the logic for reading state from CONFIG_KEY_READ. + Usable both as a runnable as well as a static method to call imperatively.""" + channel: Union[str, list[str]] fresh: bool = False @@ -108,21 +111,39 @@ def do_read( class PregelNode(Runnable): + """A node in a Pregel graph. This won't be invoked as a runnable by the graph + itself, but instead acts as a container for the components necessary to make + a PregelExecutableTask for a node.""" + channels: Union[list[str], Mapping[str, str]] + """The channels that will be passed as input to `bound`. + If a list, the node will be invoked with the first of that isn't empty. + If a dict, the keys are the names of the channels, and the values are the keys + to use in the input to `bound`.""" triggers: list[str] + """If any of these channels is written to, this node will be triggered in + the next step.""" mapper: Optional[Callable[[Any], Any]] + """A function to transform the input before passing it to `bound`.""" writers: list[Runnable] + """A list of writers that will be executed after `bound`, responsible for + taking the output of `bound` and writing it to the appropriate channels.""" bound: Runnable[Any, Any] + """The main logic of the node. This will be invoked with the input from + `channels`.""" retry_policy: Optional[RetryPolicy] + """The retry policy to use when invoking the node.""" tags: Optional[Sequence[str]] + """Tags to attach to the node for tracing.""" metadata: Optional[Mapping[str, Any]] + """Metadata to attach to the node for tracing.""" def __init__( self, @@ -151,7 +172,7 @@ def copy(self, update: dict[str, Any]) -> PregelNode: @cached_property def flat_writers(self) -> list[Runnable]: - """Get writers with optimizations applied.""" + """Get writers with optimizations applied. Dedupes consecutive ChannelWrites.""" writers = self.writers.copy() while ( len(writers) > 1 @@ -170,6 +191,7 @@ def flat_writers(self) -> list[Runnable]: @cached_property def node(self) -> Optional[Runnable[Any, Any]]: + """Get a runnable that combines `bound` and `writers`.""" writers = self.flat_writers if self.bound is DEFAULT_BOUND and not writers: return None diff --git a/libs/langgraph/langgraph/pregel/runner.py b/libs/langgraph/langgraph/pregel/runner.py index 14e84352f..32bbf74bd 100644 --- a/libs/langgraph/langgraph/pregel/runner.py +++ b/libs/langgraph/langgraph/pregel/runner.py @@ -22,6 +22,10 @@ class PregelRunner: + """Responsible for executing a set of Pregel tasks concurrently, commiting + their writes, yielding control to caller when there is output to emit, and + interrupting other tasks if appropriate.""" + def __init__( self, *, @@ -215,6 +219,8 @@ def commit( def _should_stop_others( done: Union[set[concurrent.futures.Future[Any]], set[asyncio.Future[Any]]], ) -> bool: + """Check if any task failed, if so, cancel all other tasks. + GraphInterrupts are not considered failures.""" for fut in done: if fut.cancelled(): return True @@ -227,6 +233,7 @@ def _should_stop_others( def _exception( fut: Union[concurrent.futures.Future[Any], asyncio.Future[Any]], ) -> Optional[BaseException]: + """Return the exception from a future, without raising CancelledError.""" if fut.cancelled(): if isinstance(fut, asyncio.Future): return asyncio.CancelledError() @@ -245,6 +252,7 @@ def _panic_or_proceed( timeout_exc_cls: Type[Exception] = TimeoutError, panic: bool = True, ) -> None: + """Cancel remaining tasks if any failed, re-raise exception if panic is True.""" done: set[Union[concurrent.futures.Future[Any], asyncio.Future[Any]]] = set() inflight: set[Union[concurrent.futures.Future[Any], asyncio.Future[Any]]] = set() for fut, val in futs.items(): diff --git a/libs/langgraph/langgraph/pregel/types.py b/libs/langgraph/langgraph/pregel/types.py index d34845483..4cca9946c 100644 --- a/libs/langgraph/langgraph/pregel/types.py +++ b/libs/langgraph/langgraph/pregel/types.py @@ -1,10 +1,28 @@ from collections import deque +from dataclasses import dataclass from typing import Any, Callable, Literal, NamedTuple, Optional, Sequence, Type, Union from langchain_core.runnables import Runnable, RunnableConfig from langgraph.checkpoint.base import CheckpointMetadata -from langgraph.constants import Interrupt + +All = Literal["*"] + +StreamMode = Literal["values", "updates", "debug", "messages", "custom"] +"""How the stream method should emit outputs. + +- 'values': Emit all values of the state for each step. +- 'updates': Emit only the node name(s) and updates + that were returned by the node(s) **after** each step. +- 'debug': Emit debug events for each step. +- 'messages': Emit LLM messages token-by-token. +- 'custom': Emit custom output `write: StreamWriter` kwarg of each node. +""" + +StreamWriter = Callable[[Any], None] +"""Callable that accepts a single argument and writes it to the output stream. +Always injected into nodes if requested as a keyword argument, but it's a no-op +when not using stream_mode="custom".""" def default_retry_on(exc: Exception) -> bool: @@ -63,6 +81,12 @@ class CachePolicy(NamedTuple): pass +@dataclass +class Interrupt: + value: Any + when: Literal["during"] = "during" + + class PregelTask(NamedTuple): id: str name: str @@ -105,20 +129,72 @@ class StateSnapshot(NamedTuple): """Tasks to execute in this step. If already attempted, may contain an error.""" -All = Literal["*"] - -StreamMode = Literal["values", "updates", "debug", "messages", "custom"] -"""How the stream method should emit outputs. - -- 'values': Emit all values of the state for each step. -- 'updates': Emit only the node name(s) and updates - that were returned by the node(s) **after** each step. -- 'debug': Emit debug events for each step. -- 'messages': Emit LLM messages token-by-token. -- 'custom': Emit custom output `write: StreamWriter` kwarg of each node. -""" - -StreamWriter = Callable[[Any], None] -"""Callable that accepts a single argument and writes it to the output stream. -Always injected into nodes if requested, -but it's a no-op when not using stream_mode="custom".""" +class Send: + """A message or packet to send to a specific node in the graph. + + The `Send` class is used within a `StateGraph`'s conditional edges to + dynamically invoke a node with a custom state at the next step. + + Importantly, the sent state can differ from the core graph's state, + allowing for flexible and dynamic workflow management. + + One such example is a "map-reduce" workflow where your graph invokes + the same node multiple times in parallel with different states, + before aggregating the results back into the main graph's state. + + Attributes: + node (str): The name of the target node to send the message to. + arg (Any): The state or message to send to the target node. + + Examples: + >>> from typing import Annotated + >>> import operator + >>> class OverallState(TypedDict): + ... subjects: list[str] + ... jokes: Annotated[list[str], operator.add] + ... + >>> from langgraph.constants import Send + >>> from langgraph.graph import END, START + >>> def continue_to_jokes(state: OverallState): + ... return [Send("generate_joke", {"subject": s}) for s in state['subjects']] + ... + >>> from langgraph.graph import StateGraph + >>> builder = StateGraph(OverallState) + >>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]}) + >>> builder.add_conditional_edges(START, continue_to_jokes) + >>> builder.add_edge("generate_joke", END) + >>> graph = builder.compile() + >>> + >>> # Invoking with two subjects results in a generated joke for each + >>> graph.invoke({"subjects": ["cats", "dogs"]}) + {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']} + """ + + __slots__ = ("node", "arg") + + node: str + arg: Any + + def __init__(self, /, node: str, arg: Any) -> None: + """ + Initialize a new instance of the Send class. + + Args: + node (str): The name of the target node to send the message to. + arg (Any): The state or message to send to the target node. + """ + self.node = node + self.arg = arg + + def __hash__(self) -> int: + return hash((self.node, self.arg)) + + def __repr__(self) -> str: + return f"Send(node={self.node!r}, arg={self.arg!r})" + + def __eq__(self, value: object) -> bool: + return ( + isinstance(value, Send) + and self.node == value.node + and self.arg == value.arg + ) diff --git a/libs/langgraph/langgraph/pregel/utils.py b/libs/langgraph/langgraph/pregel/utils.py index c6dc064d3..3a29e5ed1 100644 --- a/libs/langgraph/langgraph/pregel/utils.py +++ b/libs/langgraph/langgraph/pregel/utils.py @@ -4,7 +4,7 @@ def get_new_channel_versions( previous_versions: ChannelVersions, current_versions: ChannelVersions ) -> ChannelVersions: - """Get new channel versions.""" + """Get subset of current_versions that are newer than previous_versions.""" if previous_versions: version_type = type(next(iter(current_versions.values()), None)) null_version = version_type() # type: ignore[misc] diff --git a/libs/langgraph/langgraph/pregel/validate.py b/libs/langgraph/langgraph/pregel/validate.py index 232014240..965fab54e 100644 --- a/libs/langgraph/langgraph/pregel/validate.py +++ b/libs/langgraph/langgraph/pregel/validate.py @@ -17,7 +17,7 @@ def validate_graph( ) -> None: for chan in channels: if chan in RESERVED: - raise ValueError(f"Channel names {RESERVED} are reserved") + raise ValueError(f"Channel names {chan} are reserved") subscribed_channels = set[str]() for name, node in nodes.items(): diff --git a/libs/langgraph/langgraph/pregel/write.py b/libs/langgraph/langgraph/pregel/write.py index 2adcab757..c2795c67c 100644 --- a/libs/langgraph/langgraph/pregel/write.py +++ b/libs/langgraph/langgraph/pregel/write.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio from typing import ( Any, Callable, @@ -22,30 +21,29 @@ TYPE_SEND = Callable[[Sequence[tuple[str, Any]]], None] R = TypeVar("R", bound=Runnable) - SKIP_WRITE = object() PASSTHROUGH = object() class ChannelWriteEntry(NamedTuple): channel: str + """Channel name to write to.""" value: Any = PASSTHROUGH + """Value to write, or PASSTHROUGH to use the input.""" skip_none: bool = False + """Whether to skip writing if the value is None.""" mapper: Optional[Callable] = None + """Function to transform the value before writing.""" class ChannelWrite(RunnableCallable): + """Implements th logic for sending writes to CONFIG_KEY_SEND. + Can be used as a runnable or as a static method to call imperatively.""" + writes: list[Union[ChannelWriteEntry, Send]] - """ - Sequence of write entries, each of which is a tuple of: - - channel name - - runnable to map input, or None to use the input, or any other value to use instead - - whether to skip writing if the mapped value is None - """ + """Sequence of write entries or Send objects to write.""" require_at_least_one_of: Optional[Sequence[str]] - """ - If defined, at least one of these channels must be written to. - """ + """If defined, at least one of these channels must be written to.""" def __init__( self, @@ -145,6 +143,7 @@ def do_write( @staticmethod def is_writer(runnable: Runnable) -> bool: + """Used by PregelNode to distinguish between writers and other runnables.""" return ( isinstance(runnable, ChannelWrite) or getattr(runnable, "_is_channel_writer", False) is True @@ -152,13 +151,9 @@ def is_writer(runnable: Runnable) -> bool: @staticmethod def register_writer(runnable: R) -> R: + """Used to mark a runnable as a writer, so that it can be detected by is_writer. + Instances of ChannelWrite are automatically marked as writers.""" # using object.__setattr__ to work around objects that override __setattr__ # eg. pydantic models and dataclasses object.__setattr__(runnable, "_is_channel_writer", True) return runnable - - -def _mk_future(val: Any) -> asyncio.Future: - fut: asyncio.Future[Any] = asyncio.Future() - fut.set_result(val) - return fut diff --git a/libs/langgraph/langgraph/version.py b/libs/langgraph/langgraph/version.py index 3368893c0..f5cb757f5 100644 --- a/libs/langgraph/langgraph/version.py +++ b/libs/langgraph/langgraph/version.py @@ -1,4 +1,4 @@ -"""Main entrypoint into package.""" +"""Exports package version.""" from importlib import metadata diff --git a/libs/langgraph/tests/test_tracing_interops.py b/libs/langgraph/tests/test_tracing_interops.py index 9bc8750b5..f1f0ad7fb 100644 --- a/libs/langgraph/tests/test_tracing_interops.py +++ b/libs/langgraph/tests/test_tracing_interops.py @@ -5,11 +5,14 @@ from unittest.mock import MagicMock import langsmith as ls +import pytest from langchain_core.runnables import RunnableConfig from langchain_core.tracers import LangChainTracer from langgraph.graph import StateGraph +pytestmark = pytest.mark.anyio + def _get_mock_client(**kwargs: Any) -> ls.Client: mock_session = MagicMock() @@ -76,7 +79,7 @@ async def child_node(state: State) -> State: child_builder = StateGraph(State) child_builder.add_node(child_node) child_builder.add_edge("__start__", "child_node") - child_graph = child_builder.compile() + child_graph = child_builder.compile().with_config(run_name="child_graph") parent_builder = StateGraph(State) parent_builder.add_node(parent_node) @@ -101,7 +104,7 @@ def get_posts(): # If the callbacks weren't propagated correctly, we'd # end up with broken dotted_orders parent_run = next(data for data in posts if data["name"] == "parent_node") - child_run = next(data for data in posts if data["name"] == "child_node") + child_run = next(data for data in posts if data["name"] == "child_graph") traceable_run = next(data for data in posts if data["name"] == "some_traceable") assert child_run["dotted_order"].startswith(traceable_run["dotted_order"]) From 9f8d71c65bad907a9b23ad8a11c0580830305644 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 21 Sep 2024 17:23:12 -0700 Subject: [PATCH 02/14] Lint --- libs/langgraph/langgraph/errors.py | 2 +- libs/langgraph/langgraph/pregel/runner.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/langgraph/langgraph/errors.py b/libs/langgraph/langgraph/errors.py index ad3150942..08bed5927 100644 --- a/libs/langgraph/langgraph/errors.py +++ b/libs/langgraph/langgraph/errors.py @@ -32,7 +32,7 @@ class InvalidUpdateError(Exception): class GraphInterrupt(Exception): - """Raised when a subgraph is interrupted, supressed by the root graph. + """Raised when a subgraph is interrupted, suppressed by the root graph. Never raised directly, or surfaced to the user.""" def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None: diff --git a/libs/langgraph/langgraph/pregel/runner.py b/libs/langgraph/langgraph/pregel/runner.py index 32bbf74bd..6ba72e6ad 100644 --- a/libs/langgraph/langgraph/pregel/runner.py +++ b/libs/langgraph/langgraph/pregel/runner.py @@ -22,7 +22,7 @@ class PregelRunner: - """Responsible for executing a set of Pregel tasks concurrently, commiting + """Responsible for executing a set of Pregel tasks concurrently, committing their writes, yielding control to caller when there is output to emit, and interrupting other tasks if appropriate.""" From 313e3eed3e499af811ec3ca9e0576dff4332ccd7 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 21 Sep 2024 17:26:29 -0700 Subject: [PATCH 03/14] Fix circular import --- libs/langgraph/langgraph/constants.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/libs/langgraph/langgraph/constants.py b/libs/langgraph/langgraph/constants.py index eb0f703be..e17ce7de8 100644 --- a/libs/langgraph/langgraph/constants.py +++ b/libs/langgraph/langgraph/constants.py @@ -1,9 +1,21 @@ from types import MappingProxyType from typing import Any, Mapping -from langgraph.pregel.types import Interrupt, Send # noqa: F401 # Interrupt, Send re-exported for backwards compatibility +def __getattr__(name: str) -> Any: + if name in globals(): + return globals()[name] + elif name == "Interrupt": + from langgraph.pregel.types import Interrupt + + return Interrupt + elif name == "Send": + from langgraph.pregel.types import Send + + return Send + raise AttributeError(f"module {__name__} has no attribute {name}") + # --- Empty read-only containers --- EMPTY_MAP: Mapping[str, Any] = MappingProxyType({}) From 3cbd70ffc9b64200a2f877fdcf186076a46230fd Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 21 Sep 2024 17:30:04 -0700 Subject: [PATCH 04/14] Fix docs --- docs/docs/reference/graphs.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/docs/reference/graphs.md b/docs/docs/reference/graphs.md index b2f4acdf2..3c25180ca 100644 --- a/docs/docs/reference/graphs.md +++ b/docs/docs/reference/graphs.md @@ -29,7 +29,7 @@ handler: python ## StreamMode -::: langgraph.pregel.StreamMode +::: langgraph.pregel.types.StreamMode ## Constants @@ -69,7 +69,7 @@ builder.add_conditional_edges("my_node", my_condition) ## Send -::: langgraph.constants.Send +::: langgraph.pregel.types.Send ## RetryPolicy From d9854b36ebb6690a5106ad73a3099ef3ee386259 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 21 Sep 2024 17:37:17 -0700 Subject: [PATCH 05/14] Move to langgraph.types --- docs/docs/reference/graphs.md | 8 +- libs/langgraph/langgraph/constants.py | 15 +- libs/langgraph/langgraph/errors.py | 2 +- libs/langgraph/langgraph/graph/graph.py | 2 +- libs/langgraph/langgraph/graph/state.py | 2 +- libs/langgraph/langgraph/pregel/__init__.py | 2 +- libs/langgraph/langgraph/pregel/algo.py | 2 +- libs/langgraph/langgraph/pregel/debug.py | 2 +- libs/langgraph/langgraph/pregel/io.py | 2 +- libs/langgraph/langgraph/pregel/loop.py | 2 +- libs/langgraph/langgraph/pregel/retry.py | 12 +- libs/langgraph/langgraph/pregel/runner.py | 2 +- libs/langgraph/langgraph/pregel/types.py | 225 ++---------------- libs/langgraph/langgraph/pregel/validate.py | 2 +- libs/langgraph/langgraph/types.py | 200 ++++++++++++++++ libs/langgraph/langgraph/utils/runnable.py | 2 +- libs/langgraph/tests/test_pregel.py | 2 +- libs/langgraph/tests/test_pregel_async.py | 2 +- .../langgraph/scheduler/kafka/executor.py | 2 +- .../langgraph/scheduler/kafka/orchestrator.py | 2 +- .../langgraph/scheduler/kafka/retry.py | 2 +- 21 files changed, 258 insertions(+), 234 deletions(-) create mode 100644 libs/langgraph/langgraph/types.py diff --git a/docs/docs/reference/graphs.md b/docs/docs/reference/graphs.md index 3c25180ca..779f95095 100644 --- a/docs/docs/reference/graphs.md +++ b/docs/docs/reference/graphs.md @@ -69,8 +69,12 @@ builder.add_conditional_edges("my_node", my_condition) ## Send -::: langgraph.pregel.types.Send +::: langgraph.types.Send + +## Interrupt + +::: langgraph.types.Interrupt ## RetryPolicy -::: langgraph.pregel.types.RetryPolicy +::: langgraph.types.RetryPolicy diff --git a/libs/langgraph/langgraph/constants.py b/libs/langgraph/langgraph/constants.py index e17ce7de8..ef4a8a486 100644 --- a/libs/langgraph/langgraph/constants.py +++ b/libs/langgraph/langgraph/constants.py @@ -1,20 +1,9 @@ from types import MappingProxyType from typing import Any, Mapping +from langgraph.types import Interrupt, Send # noqa: F401 # Interrupt, Send re-exported for backwards compatibility -def __getattr__(name: str) -> Any: - if name in globals(): - return globals()[name] - elif name == "Interrupt": - from langgraph.pregel.types import Interrupt - - return Interrupt - elif name == "Send": - from langgraph.pregel.types import Send - - return Send - raise AttributeError(f"module {__name__} has no attribute {name}") # --- Empty read-only containers --- @@ -54,6 +43,8 @@ def __getattr__(name: str) -> Any: # holds a `BaseStore` made available to managed values CONFIG_KEY_RESUMING = "__pregel_resuming" # holds a boolean indicating if subgraphs should resume from a previous checkpoint +CONFIG_KEY_GRAPH_COUNT = "__pregel_graph_count" +# holds the number of subgraphs executed in a given task, used to raise errors CONFIG_KEY_TASK_ID = "__pregel_task_id" # holds the task ID for the current task CONFIG_KEY_DEDUPE_TASKS = "__pregel_dedupe_tasks" diff --git a/libs/langgraph/langgraph/errors.py b/libs/langgraph/langgraph/errors.py index 08bed5927..c7c5a518a 100644 --- a/libs/langgraph/langgraph/errors.py +++ b/libs/langgraph/langgraph/errors.py @@ -1,7 +1,7 @@ from typing import Any, Sequence from langgraph.checkpoint.base import EmptyChannelError # noqa: F401 -from langgraph.constants import Interrupt +from langgraph.types import Interrupt # EmptyChannelError re-exported for backwards compatibility diff --git a/libs/langgraph/langgraph/graph/graph.py b/libs/langgraph/langgraph/graph/graph.py index c5a043ee7..a5a4db3ac 100644 --- a/libs/langgraph/langgraph/graph/graph.py +++ b/libs/langgraph/langgraph/graph/graph.py @@ -38,8 +38,8 @@ from langgraph.errors import InvalidUpdateError from langgraph.pregel import Channel, Pregel from langgraph.pregel.read import PregelNode -from langgraph.pregel.types import All from langgraph.pregel.write import ChannelWrite, ChannelWriteEntry +from langgraph.types import All from langgraph.utils.runnable import RunnableCallable, coerce_to_runnable logger = logging.getLogger(__name__) diff --git a/libs/langgraph/langgraph/graph/state.py b/libs/langgraph/langgraph/graph/state.py index cee0fb849..4fdff2a49 100644 --- a/libs/langgraph/langgraph/graph/state.py +++ b/libs/langgraph/langgraph/graph/state.py @@ -45,9 +45,9 @@ is_writable_managed_value, ) from langgraph.pregel.read import ChannelRead, PregelNode -from langgraph.pregel.types import All, RetryPolicy from langgraph.pregel.write import SKIP_WRITE, ChannelWrite, ChannelWriteEntry from langgraph.store.base import BaseStore +from langgraph.types import All, RetryPolicy from langgraph.utils.fields import get_field_default from langgraph.utils.pydantic import create_model from langgraph.utils.runnable import coerce_to_runnable diff --git a/libs/langgraph/langgraph/pregel/__init__.py b/libs/langgraph/langgraph/pregel/__init__.py index f1a9aa578..c9fec2e57 100644 --- a/libs/langgraph/langgraph/pregel/__init__.py +++ b/libs/langgraph/langgraph/pregel/__init__.py @@ -83,11 +83,11 @@ from langgraph.pregel.read import PregelNode from langgraph.pregel.retry import RetryPolicy from langgraph.pregel.runner import PregelRunner -from langgraph.pregel.types import All, StateSnapshot, StreamMode from langgraph.pregel.utils import get_new_channel_versions from langgraph.pregel.validate import validate_graph, validate_keys from langgraph.pregel.write import ChannelWrite, ChannelWriteEntry from langgraph.store.base import BaseStore +from langgraph.types import All, StateSnapshot, StreamMode from langgraph.utils.config import ( ensure_config, merge_configs, diff --git a/libs/langgraph/langgraph/pregel/algo.py b/libs/langgraph/langgraph/pregel/algo.py index 44da0bbd5..f9b0096c8 100644 --- a/libs/langgraph/langgraph/pregel/algo.py +++ b/libs/langgraph/langgraph/pregel/algo.py @@ -51,7 +51,7 @@ from langgraph.pregel.log import logger from langgraph.pregel.manager import ChannelsManager from langgraph.pregel.read import PregelNode -from langgraph.pregel.types import All, PregelExecutableTask, PregelTask +from langgraph.types import All, PregelExecutableTask, PregelTask from langgraph.utils.config import merge_configs, patch_config GetNextVersion = Callable[[Optional[V], BaseChannel], V] diff --git a/libs/langgraph/langgraph/pregel/debug.py b/libs/langgraph/langgraph/pregel/debug.py index 9c4661c0c..982182842 100644 --- a/libs/langgraph/langgraph/pregel/debug.py +++ b/libs/langgraph/langgraph/pregel/debug.py @@ -22,7 +22,7 @@ from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, PendingWrite from langgraph.constants import ERROR, INTERRUPT, TAG_HIDDEN from langgraph.pregel.io import read_channels -from langgraph.pregel.types import PregelExecutableTask, PregelTask, StateSnapshot +from langgraph.types import PregelExecutableTask, PregelTask, StateSnapshot class TaskPayload(TypedDict): diff --git a/libs/langgraph/langgraph/pregel/io.py b/libs/langgraph/langgraph/pregel/io.py index 6542b1d91..ef9822641 100644 --- a/libs/langgraph/langgraph/pregel/io.py +++ b/libs/langgraph/langgraph/pregel/io.py @@ -5,7 +5,7 @@ from langgraph.channels.base import BaseChannel, EmptyChannelError from langgraph.constants import EMPTY_SEQ, ERROR, INTERRUPT, TAG_HIDDEN from langgraph.pregel.log import logger -from langgraph.pregel.types import PregelExecutableTask +from langgraph.types import PregelExecutableTask def read_channel( diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index c97cc8a69..7eec59575 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -94,10 +94,10 @@ ) from langgraph.pregel.manager import AsyncChannelsManager, ChannelsManager from langgraph.pregel.read import PregelNode -from langgraph.pregel.types import All, PregelExecutableTask, StreamMode from langgraph.pregel.utils import get_new_channel_versions from langgraph.store.base import BaseStore from langgraph.store.batch import AsyncBatchedStore +from langgraph.types import All, PregelExecutableTask, StreamMode from langgraph.utils.config import patch_configurable V = TypeVar("V") diff --git a/libs/langgraph/langgraph/pregel/retry.py b/libs/langgraph/langgraph/pregel/retry.py index 90ccaa7d0..476b8ef32 100644 --- a/libs/langgraph/langgraph/pregel/retry.py +++ b/libs/langgraph/langgraph/pregel/retry.py @@ -4,9 +4,9 @@ import time from typing import Optional, Sequence -from langgraph.constants import CONFIG_KEY_RESUMING +from langgraph.constants import CONFIG_KEY_GRAPH_COUNT, CONFIG_KEY_RESUMING from langgraph.errors import GraphInterrupt -from langgraph.pregel.types import PregelExecutableTask, RetryPolicy +from langgraph.types import PregelExecutableTask, RetryPolicy from langgraph.utils.config import patch_configurable logger = logging.getLogger(__name__) @@ -70,7 +70,9 @@ def run_with_retry( exc_info=exc, ) # signal subgraphs to resume (if available) - config = patch_configurable(config, {CONFIG_KEY_RESUMING: True}) + config = patch_configurable( + config, {CONFIG_KEY_RESUMING: True, CONFIG_KEY_GRAPH_COUNT: 0} + ) async def arun_with_retry( @@ -136,4 +138,6 @@ async def arun_with_retry( exc_info=exc, ) # signal subgraphs to resume (if available) - config = patch_configurable(config, {CONFIG_KEY_RESUMING: True}) + config = patch_configurable( + config, {CONFIG_KEY_RESUMING: True, CONFIG_KEY_GRAPH_COUNT: 0} + ) diff --git a/libs/langgraph/langgraph/pregel/runner.py b/libs/langgraph/langgraph/pregel/runner.py index 6ba72e6ad..b8392b613 100644 --- a/libs/langgraph/langgraph/pregel/runner.py +++ b/libs/langgraph/langgraph/pregel/runner.py @@ -18,7 +18,7 @@ from langgraph.errors import GraphDelegate, GraphInterrupt from langgraph.pregel.executor import Submit from langgraph.pregel.retry import arun_with_retry, run_with_retry -from langgraph.pregel.types import PregelExecutableTask, RetryPolicy +from langgraph.types import PregelExecutableTask, RetryPolicy class PregelRunner: diff --git a/libs/langgraph/langgraph/pregel/types.py b/libs/langgraph/langgraph/pregel/types.py index 4cca9946c..7a72b88c9 100644 --- a/libs/langgraph/langgraph/pregel/types.py +++ b/libs/langgraph/langgraph/pregel/types.py @@ -1,200 +1,25 @@ -from collections import deque -from dataclasses import dataclass -from typing import Any, Callable, Literal, NamedTuple, Optional, Sequence, Type, Union - -from langchain_core.runnables import Runnable, RunnableConfig - -from langgraph.checkpoint.base import CheckpointMetadata - -All = Literal["*"] - -StreamMode = Literal["values", "updates", "debug", "messages", "custom"] -"""How the stream method should emit outputs. - -- 'values': Emit all values of the state for each step. -- 'updates': Emit only the node name(s) and updates - that were returned by the node(s) **after** each step. -- 'debug': Emit debug events for each step. -- 'messages': Emit LLM messages token-by-token. -- 'custom': Emit custom output `write: StreamWriter` kwarg of each node. -""" - -StreamWriter = Callable[[Any], None] -"""Callable that accepts a single argument and writes it to the output stream. -Always injected into nodes if requested as a keyword argument, but it's a no-op -when not using stream_mode="custom".""" - - -def default_retry_on(exc: Exception) -> bool: - import httpx - import requests - - if isinstance(exc, ConnectionError): - return True - if isinstance( - exc, - ( - ValueError, - TypeError, - ArithmeticError, - ImportError, - LookupError, - NameError, - SyntaxError, - RuntimeError, - ReferenceError, - StopIteration, - StopAsyncIteration, - OSError, - ), - ): - return False - if isinstance(exc, httpx.HTTPStatusError): - return 500 <= exc.response.status_code < 600 - if isinstance(exc, requests.HTTPError): - return 500 <= exc.response.status_code < 600 if exc.response else True - return True - - -class RetryPolicy(NamedTuple): - """Configuration for retrying nodes.""" - - initial_interval: float = 0.5 - """Amount of time that must elapse before the first retry occurs. In seconds.""" - backoff_factor: float = 2.0 - """Multiplier by which the interval increases after each retry.""" - max_interval: float = 128.0 - """Maximum amount of time that may elapse between retries. In seconds.""" - max_attempts: int = 3 - """Maximum number of attempts to make before giving up, including the first.""" - jitter: bool = True - """Whether to add random jitter to the interval between retries.""" - retry_on: Union[ - Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool] - ] = default_retry_on - """List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry.""" - - -class CachePolicy(NamedTuple): - """Configuration for caching nodes.""" - - pass - - -@dataclass -class Interrupt: - value: Any - when: Literal["during"] = "during" - - -class PregelTask(NamedTuple): - id: str - name: str - path: tuple[Union[str, int], ...] - error: Optional[Exception] = None - interrupts: tuple[Interrupt, ...] = () - state: Union[None, RunnableConfig, "StateSnapshot"] = None - - -class PregelExecutableTask(NamedTuple): - name: str - input: Any - proc: Runnable - writes: deque[tuple[str, Any]] - config: RunnableConfig - triggers: list[str] - retry_policy: Optional[RetryPolicy] - cache_policy: Optional[CachePolicy] - id: str - path: tuple[Union[str, int], ...] - scheduled: bool = False - - -class StateSnapshot(NamedTuple): - """Snapshot of the state of the graph at the beginning of a step.""" - - values: Union[dict[str, Any], Any] - """Current values of channels""" - next: tuple[str, ...] - """The name of the node to execute in each task for this step.""" - config: RunnableConfig - """Config used to fetch this snapshot""" - metadata: Optional[CheckpointMetadata] - """Metadata associated with this snapshot""" - created_at: Optional[str] - """Timestamp of snapshot creation""" - parent_config: Optional[RunnableConfig] - """Config used to fetch the parent snapshot, if any""" - tasks: tuple[PregelTask, ...] - """Tasks to execute in this step. If already attempted, may contain an error.""" - - -class Send: - """A message or packet to send to a specific node in the graph. - - The `Send` class is used within a `StateGraph`'s conditional edges to - dynamically invoke a node with a custom state at the next step. - - Importantly, the sent state can differ from the core graph's state, - allowing for flexible and dynamic workflow management. - - One such example is a "map-reduce" workflow where your graph invokes - the same node multiple times in parallel with different states, - before aggregating the results back into the main graph's state. - - Attributes: - node (str): The name of the target node to send the message to. - arg (Any): The state or message to send to the target node. - - Examples: - >>> from typing import Annotated - >>> import operator - >>> class OverallState(TypedDict): - ... subjects: list[str] - ... jokes: Annotated[list[str], operator.add] - ... - >>> from langgraph.constants import Send - >>> from langgraph.graph import END, START - >>> def continue_to_jokes(state: OverallState): - ... return [Send("generate_joke", {"subject": s}) for s in state['subjects']] - ... - >>> from langgraph.graph import StateGraph - >>> builder = StateGraph(OverallState) - >>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]}) - >>> builder.add_conditional_edges(START, continue_to_jokes) - >>> builder.add_edge("generate_joke", END) - >>> graph = builder.compile() - >>> - >>> # Invoking with two subjects results in a generated joke for each - >>> graph.invoke({"subjects": ["cats", "dogs"]}) - {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']} - """ - - __slots__ = ("node", "arg") - - node: str - arg: Any - - def __init__(self, /, node: str, arg: Any) -> None: - """ - Initialize a new instance of the Send class. - - Args: - node (str): The name of the target node to send the message to. - arg (Any): The state or message to send to the target node. - """ - self.node = node - self.arg = arg - - def __hash__(self) -> int: - return hash((self.node, self.arg)) - - def __repr__(self) -> str: - return f"Send(node={self.node!r}, arg={self.arg!r})" - - def __eq__(self, value: object) -> bool: - return ( - isinstance(value, Send) - and self.node == value.node - and self.arg == value.arg - ) +"""Re-export types moved to langgraph.types""" + +from langgraph.types import ( + All, + CachePolicy, + PregelExecutableTask, + PregelTask, + RetryPolicy, + StateSnapshot, + StreamMode, + StreamWriter, + default_retry_on, +) + +__all__ = [ + "All", + "CachePolicy", + "PregelExecutableTask", + "PregelTask", + "RetryPolicy", + "StateSnapshot", + "StreamMode", + "StreamWriter", + "default_retry_on", +] diff --git a/libs/langgraph/langgraph/pregel/validate.py b/libs/langgraph/langgraph/pregel/validate.py index 965fab54e..cf957dc07 100644 --- a/libs/langgraph/langgraph/pregel/validate.py +++ b/libs/langgraph/langgraph/pregel/validate.py @@ -3,7 +3,7 @@ from langgraph.channels.base import BaseChannel from langgraph.constants import RESERVED from langgraph.pregel.read import PregelNode -from langgraph.pregel.types import All +from langgraph.types import All def validate_graph( diff --git a/libs/langgraph/langgraph/types.py b/libs/langgraph/langgraph/types.py new file mode 100644 index 000000000..4cca9946c --- /dev/null +++ b/libs/langgraph/langgraph/types.py @@ -0,0 +1,200 @@ +from collections import deque +from dataclasses import dataclass +from typing import Any, Callable, Literal, NamedTuple, Optional, Sequence, Type, Union + +from langchain_core.runnables import Runnable, RunnableConfig + +from langgraph.checkpoint.base import CheckpointMetadata + +All = Literal["*"] + +StreamMode = Literal["values", "updates", "debug", "messages", "custom"] +"""How the stream method should emit outputs. + +- 'values': Emit all values of the state for each step. +- 'updates': Emit only the node name(s) and updates + that were returned by the node(s) **after** each step. +- 'debug': Emit debug events for each step. +- 'messages': Emit LLM messages token-by-token. +- 'custom': Emit custom output `write: StreamWriter` kwarg of each node. +""" + +StreamWriter = Callable[[Any], None] +"""Callable that accepts a single argument and writes it to the output stream. +Always injected into nodes if requested as a keyword argument, but it's a no-op +when not using stream_mode="custom".""" + + +def default_retry_on(exc: Exception) -> bool: + import httpx + import requests + + if isinstance(exc, ConnectionError): + return True + if isinstance( + exc, + ( + ValueError, + TypeError, + ArithmeticError, + ImportError, + LookupError, + NameError, + SyntaxError, + RuntimeError, + ReferenceError, + StopIteration, + StopAsyncIteration, + OSError, + ), + ): + return False + if isinstance(exc, httpx.HTTPStatusError): + return 500 <= exc.response.status_code < 600 + if isinstance(exc, requests.HTTPError): + return 500 <= exc.response.status_code < 600 if exc.response else True + return True + + +class RetryPolicy(NamedTuple): + """Configuration for retrying nodes.""" + + initial_interval: float = 0.5 + """Amount of time that must elapse before the first retry occurs. In seconds.""" + backoff_factor: float = 2.0 + """Multiplier by which the interval increases after each retry.""" + max_interval: float = 128.0 + """Maximum amount of time that may elapse between retries. In seconds.""" + max_attempts: int = 3 + """Maximum number of attempts to make before giving up, including the first.""" + jitter: bool = True + """Whether to add random jitter to the interval between retries.""" + retry_on: Union[ + Type[Exception], Sequence[Type[Exception]], Callable[[Exception], bool] + ] = default_retry_on + """List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry.""" + + +class CachePolicy(NamedTuple): + """Configuration for caching nodes.""" + + pass + + +@dataclass +class Interrupt: + value: Any + when: Literal["during"] = "during" + + +class PregelTask(NamedTuple): + id: str + name: str + path: tuple[Union[str, int], ...] + error: Optional[Exception] = None + interrupts: tuple[Interrupt, ...] = () + state: Union[None, RunnableConfig, "StateSnapshot"] = None + + +class PregelExecutableTask(NamedTuple): + name: str + input: Any + proc: Runnable + writes: deque[tuple[str, Any]] + config: RunnableConfig + triggers: list[str] + retry_policy: Optional[RetryPolicy] + cache_policy: Optional[CachePolicy] + id: str + path: tuple[Union[str, int], ...] + scheduled: bool = False + + +class StateSnapshot(NamedTuple): + """Snapshot of the state of the graph at the beginning of a step.""" + + values: Union[dict[str, Any], Any] + """Current values of channels""" + next: tuple[str, ...] + """The name of the node to execute in each task for this step.""" + config: RunnableConfig + """Config used to fetch this snapshot""" + metadata: Optional[CheckpointMetadata] + """Metadata associated with this snapshot""" + created_at: Optional[str] + """Timestamp of snapshot creation""" + parent_config: Optional[RunnableConfig] + """Config used to fetch the parent snapshot, if any""" + tasks: tuple[PregelTask, ...] + """Tasks to execute in this step. If already attempted, may contain an error.""" + + +class Send: + """A message or packet to send to a specific node in the graph. + + The `Send` class is used within a `StateGraph`'s conditional edges to + dynamically invoke a node with a custom state at the next step. + + Importantly, the sent state can differ from the core graph's state, + allowing for flexible and dynamic workflow management. + + One such example is a "map-reduce" workflow where your graph invokes + the same node multiple times in parallel with different states, + before aggregating the results back into the main graph's state. + + Attributes: + node (str): The name of the target node to send the message to. + arg (Any): The state or message to send to the target node. + + Examples: + >>> from typing import Annotated + >>> import operator + >>> class OverallState(TypedDict): + ... subjects: list[str] + ... jokes: Annotated[list[str], operator.add] + ... + >>> from langgraph.constants import Send + >>> from langgraph.graph import END, START + >>> def continue_to_jokes(state: OverallState): + ... return [Send("generate_joke", {"subject": s}) for s in state['subjects']] + ... + >>> from langgraph.graph import StateGraph + >>> builder = StateGraph(OverallState) + >>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]}) + >>> builder.add_conditional_edges(START, continue_to_jokes) + >>> builder.add_edge("generate_joke", END) + >>> graph = builder.compile() + >>> + >>> # Invoking with two subjects results in a generated joke for each + >>> graph.invoke({"subjects": ["cats", "dogs"]}) + {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']} + """ + + __slots__ = ("node", "arg") + + node: str + arg: Any + + def __init__(self, /, node: str, arg: Any) -> None: + """ + Initialize a new instance of the Send class. + + Args: + node (str): The name of the target node to send the message to. + arg (Any): The state or message to send to the target node. + """ + self.node = node + self.arg = arg + + def __hash__(self) -> int: + return hash((self.node, self.arg)) + + def __repr__(self) -> str: + return f"Send(node={self.node!r}, arg={self.arg!r})" + + def __eq__(self, value: object) -> bool: + return ( + isinstance(value, Send) + and self.node == value.node + and self.arg == value.arg + ) diff --git a/libs/langgraph/langgraph/utils/runnable.py b/libs/langgraph/langgraph/utils/runnable.py index bfc2a627b..d81f86e34 100644 --- a/libs/langgraph/langgraph/utils/runnable.py +++ b/libs/langgraph/langgraph/utils/runnable.py @@ -35,7 +35,7 @@ from typing_extensions import TypeGuard from langgraph.constants import CONFIG_KEY_STREAM_WRITER -from langgraph.pregel.types import StreamWriter +from langgraph.types import StreamWriter from langgraph.utils.config import ( ensure_config, get_async_callback_manager_for_config, diff --git a/libs/langgraph/tests/test_pregel.py b/libs/langgraph/tests/test_pregel.py index 3e43ef28f..eb88ab189 100644 --- a/libs/langgraph/tests/test_pregel.py +++ b/libs/langgraph/tests/test_pregel.py @@ -70,8 +70,8 @@ StateSnapshot, ) from langgraph.pregel.retry import RetryPolicy -from langgraph.pregel.types import PregelTask, StreamWriter from langgraph.store.memory import MemoryStore +from langgraph.types import PregelTask, StreamWriter from tests.any_str import AnyDict, AnyStr, AnyVersion, UnsortedSequence from tests.conftest import ALL_CHECKPOINTERS_SYNC, SHOULD_CHECK_SNAPSHOTS from tests.fake_chat import FakeChatModel diff --git a/libs/langgraph/tests/test_pregel_async.py b/libs/langgraph/tests/test_pregel_async.py index 640658399..cee63f469 100644 --- a/libs/langgraph/tests/test_pregel_async.py +++ b/libs/langgraph/tests/test_pregel_async.py @@ -68,8 +68,8 @@ StateSnapshot, ) from langgraph.pregel.retry import RetryPolicy -from langgraph.pregel.types import PregelTask, StreamWriter from langgraph.store.memory import MemoryStore +from langgraph.types import PregelTask, StreamWriter from tests.any_str import AnyDict, AnyStr, AnyVersion, UnsortedSequence from tests.conftest import ( ALL_CHECKPOINTERS_ASYNC, diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py index 9cbb6bf83..c803239e8 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py @@ -25,7 +25,6 @@ ) from langgraph.pregel.manager import AsyncChannelsManager, ChannelsManager from langgraph.pregel.runner import PregelRunner -from langgraph.pregel.types import RetryPolicy from langgraph.scheduler.kafka.retry import aretry, retry from langgraph.scheduler.kafka.types import ( AsyncConsumer, @@ -38,6 +37,7 @@ Sendable, Topics, ) +from langgraph.types import RetryPolicy from langgraph.utils.config import patch_configurable diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py index 1ad9c5c5b..097429bb6 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py @@ -24,7 +24,6 @@ from langgraph.pregel import Pregel from langgraph.pregel.executor import BackgroundExecutor, Submit from langgraph.pregel.loop import AsyncPregelLoop, SyncPregelLoop -from langgraph.pregel.types import RetryPolicy from langgraph.scheduler.kafka.retry import aretry, retry from langgraph.scheduler.kafka.types import ( AsyncConsumer, @@ -37,6 +36,7 @@ Producer, Topics, ) +from langgraph.types import RetryPolicy from langgraph.utils.config import patch_configurable diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/retry.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/retry.py index bb80047f8..74dbe3e27 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/retry.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/retry.py @@ -6,7 +6,7 @@ from typing_extensions import ParamSpec -from langgraph.pregel.types import RetryPolicy +from langgraph.types import RetryPolicy logger = logging.getLogger(__name__) P = ParamSpec("P") From 30b631aaff2507a9a48fd663ac3f43c87be6416f Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 21 Sep 2024 17:41:05 -0700 Subject: [PATCH 06/14] Rewrite imports --- docs/docs/how-tos/map-reduce.ipynb | 558 +++++++++++----------- docs/docs/reference/graphs.md | 2 +- libs/langgraph/langgraph/types.py | 2 +- libs/langgraph/tests/test_pregel.py | 4 +- libs/langgraph/tests/test_pregel_async.py | 4 +- libs/scheduler-kafka/README.md | 2 +- 6 files changed, 286 insertions(+), 286 deletions(-) diff --git a/docs/docs/how-tos/map-reduce.ipynb b/docs/docs/how-tos/map-reduce.ipynb index 51a2eb14d..fb00f0bfb 100644 --- a/docs/docs/how-tos/map-reduce.ipynb +++ b/docs/docs/how-tos/map-reduce.ipynb @@ -1,286 +1,286 @@ { - "cells": [ - { - "attachments": { - "a108ffc8-6136-4cd7-a6f9-579e41a5a786.png": { - "image/png": "" - } - }, - "cell_type": "markdown", - "id": "95a87145-34d0-4f97-b45f-5c9fd8532c8a", - "metadata": {}, - "source": [ - "# How to create map-reduce branches for parallel execution\n", - "\n", - "[Map-reduce](https://en.wikipedia.org/wiki/MapReduce) operations are essential for efficient task decomposition and parallel processing. This approach involves breaking a task into smaller sub-tasks, processing each sub-task in parallel, and aggregating the results across all of the completed sub-tasks. \n", - "\n", - "Consider this example: given a general topic from the user, generate a list of related subjects, generate a joke for each subject, and select the best joke from the resulting list. In this design pattern, a first node may generate a list of objects (e.g., related subjects) and we want to apply some other node (e.g., generate a joke) to all those objects (e.g., subjects). However, two main challenges arise.\n", - " \n", - "(1) the number of objects (e.g., subjects) may be unknown ahead of time (meaning the number of edges may not be known) when we lay out the graph and (2) the input State to the downstream Node should be different (one for each generated object).\n", - " \n", - "LangGraph addresses these challenges [through its `Send` API](https://langchain-ai.github.io/langgraph/concepts/low_level/#send). By utilizing conditional edges, `Send` can distribute different states (e.g., subjects) to multiple instances of a node (e.g., joke generation). Importantly, the sent state can differ from the core graph's state, allowing for flexible and dynamic workflow management. \n", - "\n", - "![Screenshot 2024-07-12 at 9.45.40 AM.png](attachment:a108ffc8-6136-4cd7-a6f9-579e41a5a786.png)" - ] - }, - { - "cell_type": "markdown", - "id": "66c58b5f", - "metadata": {}, - "source": [ - "## Setup\n", - "\n", - "First, let's install the required packages and set our API keys" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "3eb04cd1", - "metadata": {}, - "outputs": [], - "source": [ - "%%capture --no-stderr\n", - "%pip install -U langchain-anthropic langgraph" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "dc292321", - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "import getpass\n", - "\n", - "\n", - "def _set_env(name: str):\n", - " if not os.getenv(name):\n", - " os.environ[name] = getpass.getpass(f\"{name}: \")\n", - "\n", - "\n", - "_set_env(\"ANTHROPIC_API_KEY\")" - ] - }, - { - "cell_type": "markdown", - "id": "b87911bb", - "metadata": {}, - "source": [ - "
\n", - "

Set up LangSmith for LangGraph development

\n", - "

\n", - " Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here. \n", - "

\n", - "
" - ] - }, - { - "cell_type": "markdown", - "id": "b4e782a0", - "metadata": {}, - "source": [ - "## Define the graph" - ] - }, - { - "cell_type": "markdown", - "id": "66803b55", - "metadata": {}, - "source": [ - "
\n", - "

Using Pydantic with LangChain

\n", - "

\n", - " This notebook uses Pydantic v2 BaseModel, which requires langchain-core >= 0.3. Using langchain-core < 0.3 will result in errors due to mixing of Pydantic v1 and v2 BaseModels.\n", - "

\n", - "
" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "0f0f78e4-423d-4e2d-aa1a-01efaec4715f", - "metadata": {}, - "outputs": [], - "source": [ - "import operator\n", - "from typing import Annotated, TypedDict\n", - "\n", - "from langchain_anthropic import ChatAnthropic\n", - "\n", - "from langgraph.constants import Send\n", - "from langgraph.graph import END, StateGraph, START\n", - "\n", - "from pydantic import BaseModel, Field\n", - "\n", - "# Model and prompts\n", - "# Define model and prompts we will use\n", - "subjects_prompt = \"\"\"Generate a comma separated list of between 2 and 5 examples related to: {topic}.\"\"\"\n", - "joke_prompt = \"\"\"Generate a joke about {subject}\"\"\"\n", - "best_joke_prompt = \"\"\"Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.\n", - "\n", - "{jokes}\"\"\"\n", - "\n", - "\n", - "class Subjects(BaseModel):\n", - " subjects: list[str]\n", - "\n", - "\n", - "class Joke(BaseModel):\n", - " joke: str\n", - "\n", - "\n", - "class BestJoke(BaseModel):\n", - " id: int = Field(description=\"Index of the best joke, starting with 0\", ge=0)\n", - "\n", - "\n", - "model = ChatAnthropic(model=\"claude-3-5-sonnet-20240620\")\n", - "\n", - "# Graph components: define the components that will make up the graph\n", - "\n", - "\n", - "# This will be the overall state of the main graph.\n", - "# It will contain a topic (which we expect the user to provide)\n", - "# and then will generate a list of subjects, and then a joke for\n", - "# each subject\n", - "class OverallState(TypedDict):\n", - " topic: str\n", - " subjects: list\n", - " # Notice here we use the operator.add\n", - " # This is because we want combine all the jokes we generate\n", - " # from individual nodes back into one list - this is essentially\n", - " # the \"reduce\" part\n", - " jokes: Annotated[list, operator.add]\n", - " best_selected_joke: str\n", - "\n", - "\n", - "# This will be the state of the node that we will \"map\" all\n", - "# subjects to in order to generate a joke\n", - "class JokeState(TypedDict):\n", - " subject: str\n", - "\n", - "\n", - "# This is the function we will use to generate the subjects of the jokes\n", - "def generate_topics(state: OverallState):\n", - " prompt = subjects_prompt.format(topic=state[\"topic\"])\n", - " response = model.with_structured_output(Subjects).invoke(prompt)\n", - " return {\"subjects\": response.subjects}\n", - "\n", - "\n", - "# Here we generate a joke, given a subject\n", - "def generate_joke(state: JokeState):\n", - " prompt = joke_prompt.format(subject=state[\"subject\"])\n", - " response = model.with_structured_output(Joke).invoke(prompt)\n", - " return {\"jokes\": [response.joke]}\n", - "\n", - "\n", - "# Here we define the logic to map out over the generated subjects\n", - "# We will use this an edge in the graph\n", - "def continue_to_jokes(state: OverallState):\n", - " # We will return a list of `Send` objects\n", - " # Each `Send` object consists of the name of a node in the graph\n", - " # as well as the state to send to that node\n", - " return [Send(\"generate_joke\", {\"subject\": s}) for s in state[\"subjects\"]]\n", - "\n", - "\n", - "# Here we will judge the best joke\n", - "def best_joke(state: OverallState):\n", - " jokes = \"\\n\\n\".join(state[\"jokes\"])\n", - " prompt = best_joke_prompt.format(topic=state[\"topic\"], jokes=jokes)\n", - " response = model.with_structured_output(BestJoke).invoke(prompt)\n", - " return {\"best_selected_joke\": state[\"jokes\"][response.id]}\n", - "\n", - "\n", - "# Construct the graph: here we put everything together to construct our graph\n", - "graph = StateGraph(OverallState)\n", - "graph.add_node(\"generate_topics\", generate_topics)\n", - "graph.add_node(\"generate_joke\", generate_joke)\n", - "graph.add_node(\"best_joke\", best_joke)\n", - "graph.add_edge(START, \"generate_topics\")\n", - "graph.add_conditional_edges(\"generate_topics\", continue_to_jokes, [\"generate_joke\"])\n", - "graph.add_edge(\"generate_joke\", \"best_joke\")\n", - "graph.add_edge(\"best_joke\", END)\n", - "app = graph.compile()" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "37ed1f71-63db-416f-b715-4617b33d4b7f", - "metadata": {}, - "outputs": [ + "cells": [ { - "data": { - "image/jpeg": "", - "text/plain": [ - "" + "attachments": { + "a108ffc8-6136-4cd7-a6f9-579e41a5a786.png": { + "image/png": "" + } + }, + "cell_type": "markdown", + "id": "95a87145-34d0-4f97-b45f-5c9fd8532c8a", + "metadata": {}, + "source": [ + "# How to create map-reduce branches for parallel execution\n", + "\n", + "[Map-reduce](https://en.wikipedia.org/wiki/MapReduce) operations are essential for efficient task decomposition and parallel processing. This approach involves breaking a task into smaller sub-tasks, processing each sub-task in parallel, and aggregating the results across all of the completed sub-tasks. \n", + "\n", + "Consider this example: given a general topic from the user, generate a list of related subjects, generate a joke for each subject, and select the best joke from the resulting list. In this design pattern, a first node may generate a list of objects (e.g., related subjects) and we want to apply some other node (e.g., generate a joke) to all those objects (e.g., subjects). However, two main challenges arise.\n", + " \n", + "(1) the number of objects (e.g., subjects) may be unknown ahead of time (meaning the number of edges may not be known) when we lay out the graph and (2) the input State to the downstream Node should be different (one for each generated object).\n", + " \n", + "LangGraph addresses these challenges [through its `Send` API](https://langchain-ai.github.io/langgraph/concepts/low_level/#send). By utilizing conditional edges, `Send` can distribute different states (e.g., subjects) to multiple instances of a node (e.g., joke generation). Importantly, the sent state can differ from the core graph's state, allowing for flexible and dynamic workflow management. \n", + "\n", + "![Screenshot 2024-07-12 at 9.45.40 AM.png](attachment:a108ffc8-6136-4cd7-a6f9-579e41a5a786.png)" ] - }, - "execution_count": 2, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "from IPython.display import Image\n", - "\n", - "Image(app.get_graph().draw_mermaid_png())" - ] - }, - { - "cell_type": "markdown", - "id": "4a0026d8", - "metadata": {}, - "source": [ - "## Use the graph" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "fd90cace", - "metadata": {}, - "outputs": [ + }, + { + "cell_type": "markdown", + "id": "66c58b5f", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "First, let's install the required packages and set our API keys" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "3eb04cd1", + "metadata": {}, + "outputs": [], + "source": [ + "%%capture --no-stderr\n", + "%pip install -U langchain-anthropic langgraph" + ] + }, { - "name": "stdout", - "output_type": "stream", - "text": [ - "{'generate_topics': {'subjects': ['Lions', 'Elephants', 'Penguins', 'Dolphins']}}\n", - "{'generate_joke': {'jokes': [\"Why don't elephants use computers? They're afraid of the mouse!\"]}}\n", - "{'generate_joke': {'jokes': [\"Why don't dolphins use smartphones? Because they're afraid of phishing!\"]}}\n", - "{'generate_joke': {'jokes': [\"Why don't you see penguins in Britain? Because they're afraid of Wales!\"]}}\n", - "{'generate_joke': {'jokes': [\"Why don't lions like fast food? Because they can't catch it!\"]}}\n", - "{'best_joke': {'best_selected_joke': \"Why don't dolphins use smartphones? Because they're afraid of phishing!\"}}\n" - ] + "cell_type": "code", + "execution_count": null, + "id": "dc292321", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import getpass\n", + "\n", + "\n", + "def _set_env(name: str):\n", + " if not os.getenv(name):\n", + " os.environ[name] = getpass.getpass(f\"{name}: \")\n", + "\n", + "\n", + "_set_env(\"ANTHROPIC_API_KEY\")" + ] + }, + { + "cell_type": "markdown", + "id": "b87911bb", + "metadata": {}, + "source": [ + "
\n", + "

Set up LangSmith for LangGraph development

\n", + "

\n", + " Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here. \n", + "

\n", + "
" + ] + }, + { + "cell_type": "markdown", + "id": "b4e782a0", + "metadata": {}, + "source": [ + "## Define the graph" + ] + }, + { + "cell_type": "markdown", + "id": "66803b55", + "metadata": {}, + "source": [ + "
\n", + "

Using Pydantic with LangChain

\n", + "

\n", + " This notebook uses Pydantic v2 BaseModel, which requires langchain-core >= 0.3. Using langchain-core < 0.3 will result in errors due to mixing of Pydantic v1 and v2 BaseModels.\n", + "

\n", + "
" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "0f0f78e4-423d-4e2d-aa1a-01efaec4715f", + "metadata": {}, + "outputs": [], + "source": [ + "import operator\n", + "from typing import Annotated, TypedDict\n", + "\n", + "from langchain_anthropic import ChatAnthropic\n", + "\n", + "from langgraph.types import Send\n", + "from langgraph.graph import END, StateGraph, START\n", + "\n", + "from pydantic import BaseModel, Field\n", + "\n", + "# Model and prompts\n", + "# Define model and prompts we will use\n", + "subjects_prompt = \"\"\"Generate a comma separated list of between 2 and 5 examples related to: {topic}.\"\"\"\n", + "joke_prompt = \"\"\"Generate a joke about {subject}\"\"\"\n", + "best_joke_prompt = \"\"\"Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.\n", + "\n", + "{jokes}\"\"\"\n", + "\n", + "\n", + "class Subjects(BaseModel):\n", + " subjects: list[str]\n", + "\n", + "\n", + "class Joke(BaseModel):\n", + " joke: str\n", + "\n", + "\n", + "class BestJoke(BaseModel):\n", + " id: int = Field(description=\"Index of the best joke, starting with 0\", ge=0)\n", + "\n", + "\n", + "model = ChatAnthropic(model=\"claude-3-5-sonnet-20240620\")\n", + "\n", + "# Graph components: define the components that will make up the graph\n", + "\n", + "\n", + "# This will be the overall state of the main graph.\n", + "# It will contain a topic (which we expect the user to provide)\n", + "# and then will generate a list of subjects, and then a joke for\n", + "# each subject\n", + "class OverallState(TypedDict):\n", + " topic: str\n", + " subjects: list\n", + " # Notice here we use the operator.add\n", + " # This is because we want combine all the jokes we generate\n", + " # from individual nodes back into one list - this is essentially\n", + " # the \"reduce\" part\n", + " jokes: Annotated[list, operator.add]\n", + " best_selected_joke: str\n", + "\n", + "\n", + "# This will be the state of the node that we will \"map\" all\n", + "# subjects to in order to generate a joke\n", + "class JokeState(TypedDict):\n", + " subject: str\n", + "\n", + "\n", + "# This is the function we will use to generate the subjects of the jokes\n", + "def generate_topics(state: OverallState):\n", + " prompt = subjects_prompt.format(topic=state[\"topic\"])\n", + " response = model.with_structured_output(Subjects).invoke(prompt)\n", + " return {\"subjects\": response.subjects}\n", + "\n", + "\n", + "# Here we generate a joke, given a subject\n", + "def generate_joke(state: JokeState):\n", + " prompt = joke_prompt.format(subject=state[\"subject\"])\n", + " response = model.with_structured_output(Joke).invoke(prompt)\n", + " return {\"jokes\": [response.joke]}\n", + "\n", + "\n", + "# Here we define the logic to map out over the generated subjects\n", + "# We will use this an edge in the graph\n", + "def continue_to_jokes(state: OverallState):\n", + " # We will return a list of `Send` objects\n", + " # Each `Send` object consists of the name of a node in the graph\n", + " # as well as the state to send to that node\n", + " return [Send(\"generate_joke\", {\"subject\": s}) for s in state[\"subjects\"]]\n", + "\n", + "\n", + "# Here we will judge the best joke\n", + "def best_joke(state: OverallState):\n", + " jokes = \"\\n\\n\".join(state[\"jokes\"])\n", + " prompt = best_joke_prompt.format(topic=state[\"topic\"], jokes=jokes)\n", + " response = model.with_structured_output(BestJoke).invoke(prompt)\n", + " return {\"best_selected_joke\": state[\"jokes\"][response.id]}\n", + "\n", + "\n", + "# Construct the graph: here we put everything together to construct our graph\n", + "graph = StateGraph(OverallState)\n", + "graph.add_node(\"generate_topics\", generate_topics)\n", + "graph.add_node(\"generate_joke\", generate_joke)\n", + "graph.add_node(\"best_joke\", best_joke)\n", + "graph.add_edge(START, \"generate_topics\")\n", + "graph.add_conditional_edges(\"generate_topics\", continue_to_jokes, [\"generate_joke\"])\n", + "graph.add_edge(\"generate_joke\", \"best_joke\")\n", + "graph.add_edge(\"best_joke\", END)\n", + "app = graph.compile()" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "37ed1f71-63db-416f-b715-4617b33d4b7f", + "metadata": {}, + "outputs": [ + { + "data": { + "image/jpeg": "", + "text/plain": [ + "" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from IPython.display import Image\n", + "\n", + "Image(app.get_graph().draw_mermaid_png())" + ] + }, + { + "cell_type": "markdown", + "id": "4a0026d8", + "metadata": {}, + "source": [ + "## Use the graph" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "fd90cace", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'generate_topics': {'subjects': ['Lions', 'Elephants', 'Penguins', 'Dolphins']}}\n", + "{'generate_joke': {'jokes': [\"Why don't elephants use computers? They're afraid of the mouse!\"]}}\n", + "{'generate_joke': {'jokes': [\"Why don't dolphins use smartphones? Because they're afraid of phishing!\"]}}\n", + "{'generate_joke': {'jokes': [\"Why don't you see penguins in Britain? Because they're afraid of Wales!\"]}}\n", + "{'generate_joke': {'jokes': [\"Why don't lions like fast food? Because they can't catch it!\"]}}\n", + "{'best_joke': {'best_selected_joke': \"Why don't dolphins use smartphones? Because they're afraid of phishing!\"}}\n" + ] + } + ], + "source": [ + "# Call the graph: here we call it to generate a list of jokes\n", + "for s in app.stream({\"topic\": \"animals\"}):\n", + " print(s)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" } - ], - "source": [ - "# Call the graph: here we call it to generate a list of jokes\n", - "for s in app.stream({\"topic\": \"animals\"}):\n", - " print(s)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.9" - } - }, - "nbformat": 4, - "nbformat_minor": 5 + "nbformat": 4, + "nbformat_minor": 5 } diff --git a/docs/docs/reference/graphs.md b/docs/docs/reference/graphs.md index 779f95095..14392ba2c 100644 --- a/docs/docs/reference/graphs.md +++ b/docs/docs/reference/graphs.md @@ -29,7 +29,7 @@ handler: python ## StreamMode -::: langgraph.pregel.types.StreamMode +::: langgraph.types.StreamMode ## Constants diff --git a/libs/langgraph/langgraph/types.py b/libs/langgraph/langgraph/types.py index 4cca9946c..afbc98505 100644 --- a/libs/langgraph/langgraph/types.py +++ b/libs/langgraph/langgraph/types.py @@ -153,7 +153,7 @@ class Send: ... subjects: list[str] ... jokes: Annotated[list[str], operator.add] ... - >>> from langgraph.constants import Send + >>> from langgraph.types import Send >>> from langgraph.graph import END, START >>> def continue_to_jokes(state: OverallState): ... return [Send("generate_joke", {"subject": s}) for s in state['subjects']] diff --git a/libs/langgraph/tests/test_pregel.py b/libs/langgraph/tests/test_pregel.py index eb88ab189..1fa11ead2 100644 --- a/libs/langgraph/tests/test_pregel.py +++ b/libs/langgraph/tests/test_pregel.py @@ -52,7 +52,7 @@ CheckpointTuple, ) from langgraph.checkpoint.memory import MemorySaver -from langgraph.constants import ERROR, PULL, PUSH, Interrupt, Send +from langgraph.constants import ERROR, PULL, PUSH from langgraph.errors import InvalidUpdateError, NodeInterrupt from langgraph.graph import END, Graph from langgraph.graph.graph import START @@ -71,7 +71,7 @@ ) from langgraph.pregel.retry import RetryPolicy from langgraph.store.memory import MemoryStore -from langgraph.types import PregelTask, StreamWriter +from langgraph.types import Interrupt, PregelTask, Send, StreamWriter from tests.any_str import AnyDict, AnyStr, AnyVersion, UnsortedSequence from tests.conftest import ALL_CHECKPOINTERS_SYNC, SHOULD_CHECK_SNAPSHOTS from tests.fake_chat import FakeChatModel diff --git a/libs/langgraph/tests/test_pregel_async.py b/libs/langgraph/tests/test_pregel_async.py index cee63f469..416c9ffe2 100644 --- a/libs/langgraph/tests/test_pregel_async.py +++ b/libs/langgraph/tests/test_pregel_async.py @@ -51,7 +51,7 @@ CheckpointTuple, ) from langgraph.checkpoint.memory import MemorySaver -from langgraph.constants import ERROR, PULL, PUSH, Interrupt, Send +from langgraph.constants import ERROR, PULL, PUSH from langgraph.errors import InvalidUpdateError, NodeInterrupt from langgraph.graph import END, Graph, StateGraph from langgraph.graph.graph import START @@ -69,7 +69,7 @@ ) from langgraph.pregel.retry import RetryPolicy from langgraph.store.memory import MemoryStore -from langgraph.types import PregelTask, StreamWriter +from langgraph.types import Interrupt, PregelTask, Send, StreamWriter from tests.any_str import AnyDict, AnyStr, AnyVersion, UnsortedSequence from tests.conftest import ( ALL_CHECKPOINTERS_ASYNC, diff --git a/libs/scheduler-kafka/README.md b/libs/scheduler-kafka/README.md index 637a337dd..fd65d7f3c 100644 --- a/libs/scheduler-kafka/README.md +++ b/libs/scheduler-kafka/README.md @@ -95,7 +95,7 @@ You can pass any of the following values as `kwargs` to either `KafkaOrchestrato - batch_max_n (int): Maximum number of messages to include in a single batch. Default: 10. - batch_max_ms (int): Maximum time in milliseconds to wait for messages to include in a batch. Default: 1000. -- retry_policy (langgraph.pregel.types.RetryPolicy): Controls which graph-level errors will be retried when processing messages. A good use for this is to retry database errors thrown by the checkpointer. Defaults to None. +- retry_policy (langgraph.types.RetryPolicy): Controls which graph-level errors will be retried when processing messages. A good use for this is to retry database errors thrown by the checkpointer. Defaults to None. ### Connection settings From beeb2aa8ba80a8a896ab75cd5263644d0b64e5b2 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 21 Sep 2024 17:44:11 -0700 Subject: [PATCH 07/14] Detect multiple subgraphs in single node --- libs/langgraph/langgraph/pregel/loop.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index 7eec59575..f5dba4f9b 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -41,6 +41,7 @@ CONFIG_KEY_DEDUPE_TASKS, CONFIG_KEY_DELEGATE, CONFIG_KEY_ENSURE_LATEST, + CONFIG_KEY_GRAPH_COUNT, CONFIG_KEY_RESUMING, CONFIG_KEY_STREAM, CONFIG_KEY_TASK_ID, @@ -220,6 +221,11 @@ def __init__( self.config = patch_configurable( self.config, {"checkpoint_ns": "", "checkpoint_id": None} ) + if config["configurable"].get(CONFIG_KEY_GRAPH_COUNT, 0) > 0: + raise ValueError("Detected multiple subgraphs called in a single node.") + else: + # mutate config so that sibling subgraphs can be detected + self.config["configurable"][CONFIG_KEY_GRAPH_COUNT] = 1 if ( CONFIG_KEY_CHECKPOINT_MAP in self.config["configurable"] and self.config["configurable"].get("checkpoint_ns") From 89bb7fe178aef7ca98f452e9b2d1a94034667107 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 21 Sep 2024 17:47:36 -0700 Subject: [PATCH 08/14] Skip test for now --- libs/langgraph/tests/test_tracing_interops.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libs/langgraph/tests/test_tracing_interops.py b/libs/langgraph/tests/test_tracing_interops.py index f1f0ad7fb..a3dae6420 100644 --- a/libs/langgraph/tests/test_tracing_interops.py +++ b/libs/langgraph/tests/test_tracing_interops.py @@ -55,6 +55,7 @@ def wait_for( raise ValueError(f"Callable did not return within {total_time}") +@pytest.skip("This test times out in CI") async def test_nested_tracing(): lt_py_311 = sys.version_info < (3, 11) mock_client = _get_mock_client() From 2fccd0958534f6266c53a331b6717c73b62e83ab Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 21 Sep 2024 17:52:17 -0700 Subject: [PATCH 09/14] Fix --- libs/langgraph/langgraph/pregel/algo.py | 3 +++ libs/langgraph/langgraph/pregel/loop.py | 11 ++++++----- libs/langgraph/tests/test_tracing_interops.py | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/libs/langgraph/langgraph/pregel/algo.py b/libs/langgraph/langgraph/pregel/algo.py index f9b0096c8..3c98a248b 100644 --- a/libs/langgraph/langgraph/pregel/algo.py +++ b/libs/langgraph/langgraph/pregel/algo.py @@ -30,6 +30,7 @@ from langgraph.constants import ( CONFIG_KEY_CHECKPOINT_MAP, CONFIG_KEY_CHECKPOINTER, + CONFIG_KEY_GRAPH_COUNT, CONFIG_KEY_READ, CONFIG_KEY_SEND, CONFIG_KEY_TASK_ID, @@ -429,6 +430,7 @@ def prepare_single_task( manager.get_child(f"graph:step:{step}") if manager else None ), configurable={ + CONFIG_KEY_GRAPH_COUNT: 0, CONFIG_KEY_TASK_ID: task_id, # deque.extend is thread-safe CONFIG_KEY_SEND: partial( @@ -539,6 +541,7 @@ def prepare_single_task( else None ), configurable={ + CONFIG_KEY_GRAPH_COUNT: 0, CONFIG_KEY_TASK_ID: task_id, # deque.extend is thread-safe CONFIG_KEY_SEND: partial( diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index f5dba4f9b..a40b5f8f6 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -221,11 +221,12 @@ def __init__( self.config = patch_configurable( self.config, {"checkpoint_ns": "", "checkpoint_id": None} ) - if config["configurable"].get(CONFIG_KEY_GRAPH_COUNT, 0) > 0: - raise ValueError("Detected multiple subgraphs called in a single node.") - else: - # mutate config so that sibling subgraphs can be detected - self.config["configurable"][CONFIG_KEY_GRAPH_COUNT] = 1 + if self.is_nested: + if config["configurable"].get(CONFIG_KEY_GRAPH_COUNT, 0) > 0: + raise ValueError("Detected multiple subgraphs called in a single node.") + else: + # mutate config so that sibling subgraphs can be detected + self.config["configurable"][CONFIG_KEY_GRAPH_COUNT] = 1 if ( CONFIG_KEY_CHECKPOINT_MAP in self.config["configurable"] and self.config["configurable"].get("checkpoint_ns") diff --git a/libs/langgraph/tests/test_tracing_interops.py b/libs/langgraph/tests/test_tracing_interops.py index a3dae6420..5b458394b 100644 --- a/libs/langgraph/tests/test_tracing_interops.py +++ b/libs/langgraph/tests/test_tracing_interops.py @@ -55,7 +55,7 @@ def wait_for( raise ValueError(f"Callable did not return within {total_time}") -@pytest.skip("This test times out in CI") +@pytest.mark.skip("This test times out in CI") async def test_nested_tracing(): lt_py_311 = sys.version_info < (3, 11) mock_client = _get_mock_client() From 81a9a2f9038716b3be8a705f07a46c5b541e7918 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sat, 21 Sep 2024 18:48:23 -0700 Subject: [PATCH 10/14] Use a different strategy, add test --- libs/langgraph/langgraph/constants.py | 18 +++++------ libs/langgraph/langgraph/errors.py | 10 ++++++ libs/langgraph/langgraph/graph/graph.py | 5 ++- libs/langgraph/langgraph/graph/state.py | 7 ++-- .../langgraph/prebuilt/chat_agent_executor.py | 4 +-- libs/langgraph/langgraph/pregel/__init__.py | 32 ++++++++----------- libs/langgraph/langgraph/pregel/algo.py | 3 -- libs/langgraph/langgraph/pregel/loop.py | 12 +++---- libs/langgraph/langgraph/pregel/retry.py | 26 ++++++++++----- libs/langgraph/langgraph/types.py | 18 +++++++++-- libs/langgraph/tests/test_pregel.py | 20 ++++++++++-- libs/langgraph/tests/test_pregel_async.py | 19 +++++++++-- 12 files changed, 114 insertions(+), 60 deletions(-) diff --git a/libs/langgraph/langgraph/constants.py b/libs/langgraph/langgraph/constants.py index ef4a8a486..bde74c438 100644 --- a/libs/langgraph/langgraph/constants.py +++ b/libs/langgraph/langgraph/constants.py @@ -10,6 +10,14 @@ EMPTY_MAP: Mapping[str, Any] = MappingProxyType({}) EMPTY_SEQ: tuple[str, ...] = tuple() +# --- Public constants --- +TAG_HIDDEN = "langsmith:hidden" +# tag to hide a node/edge from certain tracing/streaming environments +START = "__start__" +# the first (maybe virtual) node in graph-style Pregel +END = "__end__" +# the last (maybe virtual) node in graph-style Pregel + # --- Reserved write keys --- INPUT = "__input__" # for values passed as input to the graph @@ -23,10 +31,6 @@ # marker to signal node was scheduled (in distributed mode) TASKS = "__pregel_tasks" # for Send objects returned by nodes/edges, corresponds to PUSH below -START = "__start__" -# marker for the first (maybe virtual) node in graph-style Pregel -END = "__end__" -# marker for the last (maybe virtual) node in graph-style Pregel # --- Reserved config.configurable keys --- CONFIG_KEY_SEND = "__pregel_send" @@ -43,8 +47,6 @@ # holds a `BaseStore` made available to managed values CONFIG_KEY_RESUMING = "__pregel_resuming" # holds a boolean indicating if subgraphs should resume from a previous checkpoint -CONFIG_KEY_GRAPH_COUNT = "__pregel_graph_count" -# holds the number of subgraphs executed in a given task, used to raise errors CONFIG_KEY_TASK_ID = "__pregel_task_id" # holds the task ID for the current task CONFIG_KEY_DEDUPE_TASKS = "__pregel_dedupe_tasks" @@ -68,14 +70,13 @@ # denotes pull-style tasks, ie. those triggered by edges RUNTIME_PLACEHOLDER = "__pregel_runtime_placeholder__" # placeholder for managed values replaced at runtime -TAG_HIDDEN = "langsmith:hidden" -# tag to hide a node/edge from certain tracing/streaming environments NS_SEP = "|" # for checkpoint_ns, separates each level (ie. graph|subgraph|subsubgraph) NS_END = ":" # for checkpoint_ns, for each level, separates the namespace from the task_id RESERVED = { + TAG_HIDDEN, # reserved write keys INPUT, INTERRUPT, @@ -103,7 +104,6 @@ PUSH, PULL, RUNTIME_PLACEHOLDER, - TAG_HIDDEN, NS_SEP, NS_END, } diff --git a/libs/langgraph/langgraph/errors.py b/libs/langgraph/langgraph/errors.py index c7c5a518a..63bc8aff6 100644 --- a/libs/langgraph/langgraph/errors.py +++ b/libs/langgraph/langgraph/errors.py @@ -69,3 +69,13 @@ class CheckpointNotLatest(Exception): """Raised when the checkpoint is not the latest version (for distributed mode).""" pass + + +class MultipleSubgraphsError(Exception): + """Raised when multiple subgraphs are called inside the same node.""" + + pass + + +_SEEN_CHECKPOINT_NS: set[str] = set() +"""Used for subgraph detection.""" diff --git a/libs/langgraph/langgraph/graph/graph.py b/libs/langgraph/langgraph/graph/graph.py index a5a4db3ac..e957a15b9 100644 --- a/libs/langgraph/langgraph/graph/graph.py +++ b/libs/langgraph/langgraph/graph/graph.py @@ -26,7 +26,6 @@ from typing_extensions import Self from langgraph.channels.ephemeral_value import EphemeralValue -from langgraph.checkpoint.base import BaseCheckpointSaver from langgraph.constants import ( END, NS_END, @@ -39,7 +38,7 @@ from langgraph.pregel import Channel, Pregel from langgraph.pregel.read import PregelNode from langgraph.pregel.write import ChannelWrite, ChannelWriteEntry -from langgraph.types import All +from langgraph.types import All, Checkpointer from langgraph.utils.runnable import RunnableCallable, coerce_to_runnable logger = logging.getLogger(__name__) @@ -406,7 +405,7 @@ def validate(self, interrupt: Optional[Sequence[str]] = None) -> Self: def compile( self, - checkpointer: Optional[BaseCheckpointSaver] = None, + checkpointer: Checkpointer = None, interrupt_before: Optional[Union[All, list[str]]] = None, interrupt_after: Optional[Union[All, list[str]]] = None, debug: bool = False, diff --git a/libs/langgraph/langgraph/graph/state.py b/libs/langgraph/langgraph/graph/state.py index 4fdff2a49..bc0762c80 100644 --- a/libs/langgraph/langgraph/graph/state.py +++ b/libs/langgraph/langgraph/graph/state.py @@ -32,7 +32,6 @@ from langgraph.channels.ephemeral_value import EphemeralValue from langgraph.channels.last_value import LastValue from langgraph.channels.named_barrier_value import NamedBarrierValue -from langgraph.checkpoint.base import BaseCheckpointSaver from langgraph.constants import NS_END, NS_SEP, TAG_HIDDEN from langgraph.errors import InvalidUpdateError from langgraph.graph.graph import END, START, Branch, CompiledGraph, Graph, Send @@ -47,7 +46,7 @@ from langgraph.pregel.read import ChannelRead, PregelNode from langgraph.pregel.write import SKIP_WRITE, ChannelWrite, ChannelWriteEntry from langgraph.store.base import BaseStore -from langgraph.types import All, RetryPolicy +from langgraph.types import All, Checkpointer, RetryPolicy from langgraph.utils.fields import get_field_default from langgraph.utils.pydantic import create_model from langgraph.utils.runnable import coerce_to_runnable @@ -400,7 +399,7 @@ def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> Self: def compile( self, - checkpointer: Optional[BaseCheckpointSaver] = None, + checkpointer: Checkpointer = None, *, store: Optional[BaseStore] = None, interrupt_before: Optional[Union[All, list[str]]] = None, @@ -413,7 +412,7 @@ def compile( streamed, batched, and run asynchronously. Args: - checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object. + checkpointer (Checkpointer): An optional checkpoint saver object. This serves as a fully versioned "memory" for the graph, allowing the graph to be paused and resumed, and replayed from any point. interrupt_before (Optional[Sequence[str]]): An optional list of node names to interrupt before. diff --git a/libs/langgraph/langgraph/prebuilt/chat_agent_executor.py b/libs/langgraph/langgraph/prebuilt/chat_agent_executor.py index 1e2209bd7..c5c64cd24 100644 --- a/libs/langgraph/langgraph/prebuilt/chat_agent_executor.py +++ b/libs/langgraph/langgraph/prebuilt/chat_agent_executor.py @@ -16,13 +16,13 @@ from langchain_core.tools import BaseTool from langgraph._api.deprecation import deprecated_parameter -from langgraph.checkpoint.base import BaseCheckpointSaver from langgraph.graph import StateGraph from langgraph.graph.graph import CompiledGraph from langgraph.graph.message import add_messages from langgraph.managed import IsLastStep from langgraph.prebuilt.tool_executor import ToolExecutor from langgraph.prebuilt.tool_node import ToolNode +from langgraph.types import Checkpointer # We create the AgentState that we will pass around @@ -132,7 +132,7 @@ def create_react_agent( state_schema: Optional[StateSchemaType] = None, messages_modifier: Optional[MessagesModifier] = None, state_modifier: Optional[StateModifier] = None, - checkpointer: Optional[BaseCheckpointSaver] = None, + checkpointer: Checkpointer = None, interrupt_before: Optional[list[str]] = None, interrupt_after: Optional[list[str]] = None, debug: bool = False, diff --git a/libs/langgraph/langgraph/pregel/__init__.py b/libs/langgraph/langgraph/pregel/__init__.py index c9fec2e57..f78d072d6 100644 --- a/libs/langgraph/langgraph/pregel/__init__.py +++ b/libs/langgraph/langgraph/pregel/__init__.py @@ -87,7 +87,7 @@ from langgraph.pregel.validate import validate_graph, validate_keys from langgraph.pregel.write import ChannelWrite, ChannelWriteEntry from langgraph.store.base import BaseStore -from langgraph.types import All, StateSnapshot, StreamMode +from langgraph.types import All, Checkpointer, StateSnapshot, StreamMode from langgraph.utils.config import ( ensure_config, merge_configs, @@ -197,7 +197,7 @@ class Pregel(Runnable[Union[dict[str, Any], Any], Union[dict[str, Any], Any]]): debug: bool """Whether to print debug information during execution. Defaults to False.""" - checkpointer: Optional[BaseCheckpointSaver] = None + checkpointer: Checkpointer = None """Checkpointer used to save and load graph state. Defaults to None.""" store: Optional[BaseStore] = None @@ -281,7 +281,7 @@ def config_specs(self) -> list[ConfigurableFieldSpec]: [spec for node in self.nodes.values() for spec in node.config_specs] + ( self.checkpointer.config_specs - if self.checkpointer is not None + if isinstance(self.checkpointer, BaseCheckpointSaver) else [] ) + ( @@ -1059,6 +1059,8 @@ def _defaults( Union[All, Sequence[str]], Optional[BaseCheckpointSaver], ]: + if config["recursion_limit"] < 1: + raise ValueError("recursion_limit must be at least 1") debug = debug if debug is not None else self.debug if output_keys is None: output_keys = self.stream_channels_asis @@ -1072,12 +1074,16 @@ def _defaults( if CONFIG_KEY_TASK_ID in config.get("configurable", {}): # if being called as a node in another graph, always use values mode stream_mode = ["values"] - if CONFIG_KEY_CHECKPOINTER in config.get("configurable", {}): - checkpointer: Optional[BaseCheckpointSaver] = config["configurable"][ - CONFIG_KEY_CHECKPOINTER - ] + if self.checkpointer is False: + checkpointer: Optional[BaseCheckpointSaver] = None + elif CONFIG_KEY_CHECKPOINTER in config.get("configurable", {}): + checkpointer = config["configurable"][CONFIG_KEY_CHECKPOINTER] else: checkpointer = self.checkpointer + if checkpointer and not config.get("configurable"): + raise ValueError( + f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in checkpointer.config_specs]}" + ) return ( debug, set(stream_mode), @@ -1193,12 +1199,6 @@ def output() -> Iterator: run_id=config.get("run_id"), ) try: - if config["recursion_limit"] < 1: - raise ValueError("recursion_limit must be at least 1") - if self.checkpointer and not config.get("configurable"): - raise ValueError( - f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in self.checkpointer.config_specs]}" - ) # assign defaults ( debug, @@ -1414,12 +1414,6 @@ def output() -> Iterator: None, ) try: - if config["recursion_limit"] < 1: - raise ValueError("recursion_limit must be at least 1") - if self.checkpointer and not config.get("configurable"): - raise ValueError( - f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in self.checkpointer.config_specs]}" - ) # assign defaults ( debug, diff --git a/libs/langgraph/langgraph/pregel/algo.py b/libs/langgraph/langgraph/pregel/algo.py index 3c98a248b..f9b0096c8 100644 --- a/libs/langgraph/langgraph/pregel/algo.py +++ b/libs/langgraph/langgraph/pregel/algo.py @@ -30,7 +30,6 @@ from langgraph.constants import ( CONFIG_KEY_CHECKPOINT_MAP, CONFIG_KEY_CHECKPOINTER, - CONFIG_KEY_GRAPH_COUNT, CONFIG_KEY_READ, CONFIG_KEY_SEND, CONFIG_KEY_TASK_ID, @@ -430,7 +429,6 @@ def prepare_single_task( manager.get_child(f"graph:step:{step}") if manager else None ), configurable={ - CONFIG_KEY_GRAPH_COUNT: 0, CONFIG_KEY_TASK_ID: task_id, # deque.extend is thread-safe CONFIG_KEY_SEND: partial( @@ -541,7 +539,6 @@ def prepare_single_task( else None ), configurable={ - CONFIG_KEY_GRAPH_COUNT: 0, CONFIG_KEY_TASK_ID: task_id, # deque.extend is thread-safe CONFIG_KEY_SEND: partial( diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index a40b5f8f6..452103c9e 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -41,7 +41,6 @@ CONFIG_KEY_DEDUPE_TASKS, CONFIG_KEY_DELEGATE, CONFIG_KEY_ENSURE_LATEST, - CONFIG_KEY_GRAPH_COUNT, CONFIG_KEY_RESUMING, CONFIG_KEY_STREAM, CONFIG_KEY_TASK_ID, @@ -55,10 +54,12 @@ TASKS, ) from langgraph.errors import ( + _SEEN_CHECKPOINT_NS, CheckpointNotLatest, EmptyInputError, GraphDelegate, GraphInterrupt, + MultipleSubgraphsError, ) from langgraph.managed.base import ( ManagedValueMapping, @@ -221,12 +222,11 @@ def __init__( self.config = patch_configurable( self.config, {"checkpoint_ns": "", "checkpoint_id": None} ) - if self.is_nested: - if config["configurable"].get(CONFIG_KEY_GRAPH_COUNT, 0) > 0: - raise ValueError("Detected multiple subgraphs called in a single node.") + if self.is_nested and self.checkpointer is not None: + if self.config["configurable"]["checkpoint_ns"] in _SEEN_CHECKPOINT_NS: + raise MultipleSubgraphsError else: - # mutate config so that sibling subgraphs can be detected - self.config["configurable"][CONFIG_KEY_GRAPH_COUNT] = 1 + _SEEN_CHECKPOINT_NS.add(self.config["configurable"]["checkpoint_ns"]) if ( CONFIG_KEY_CHECKPOINT_MAP in self.config["configurable"] and self.config["configurable"].get("checkpoint_ns") diff --git a/libs/langgraph/langgraph/pregel/retry.py b/libs/langgraph/langgraph/pregel/retry.py index 476b8ef32..33c60d875 100644 --- a/libs/langgraph/langgraph/pregel/retry.py +++ b/libs/langgraph/langgraph/pregel/retry.py @@ -4,8 +4,8 @@ import time from typing import Optional, Sequence -from langgraph.constants import CONFIG_KEY_GRAPH_COUNT, CONFIG_KEY_RESUMING -from langgraph.errors import GraphInterrupt +from langgraph.constants import CONFIG_KEY_RESUMING +from langgraph.errors import _SEEN_CHECKPOINT_NS, GraphInterrupt from langgraph.types import PregelExecutableTask, RetryPolicy from langgraph.utils.config import patch_configurable @@ -70,9 +70,14 @@ def run_with_retry( exc_info=exc, ) # signal subgraphs to resume (if available) - config = patch_configurable( - config, {CONFIG_KEY_RESUMING: True, CONFIG_KEY_GRAPH_COUNT: 0} - ) + config = patch_configurable(config, {CONFIG_KEY_RESUMING: True}) + # clear checkpoint_ns seen (for subgraph detection) + if checkpoint_ns := config["configurable"].get("checkpoint_ns"): + _SEEN_CHECKPOINT_NS.discard(checkpoint_ns) + finally: + # clear checkpoint_ns seen (for subgraph detection) + if checkpoint_ns := config["configurable"].get("checkpoint_ns"): + _SEEN_CHECKPOINT_NS.discard(checkpoint_ns) async def arun_with_retry( @@ -138,6 +143,11 @@ async def arun_with_retry( exc_info=exc, ) # signal subgraphs to resume (if available) - config = patch_configurable( - config, {CONFIG_KEY_RESUMING: True, CONFIG_KEY_GRAPH_COUNT: 0} - ) + config = patch_configurable(config, {CONFIG_KEY_RESUMING: True}) + # clear checkpoint_ns seen (for subgraph detection) + if checkpoint_ns := config["configurable"].get("checkpoint_ns"): + _SEEN_CHECKPOINT_NS.discard(checkpoint_ns) + finally: + # clear checkpoint_ns seen (for subgraph detection) + if checkpoint_ns := config["configurable"].get("checkpoint_ns"): + _SEEN_CHECKPOINT_NS.discard(checkpoint_ns) diff --git a/libs/langgraph/langgraph/types.py b/libs/langgraph/langgraph/types.py index afbc98505..f8a8a74c6 100644 --- a/libs/langgraph/langgraph/types.py +++ b/libs/langgraph/langgraph/types.py @@ -1,12 +1,26 @@ from collections import deque from dataclasses import dataclass -from typing import Any, Callable, Literal, NamedTuple, Optional, Sequence, Type, Union +from typing import ( + Any, + Callable, + Literal, + NamedTuple, + Optional, + Sequence, + Type, + Union, +) from langchain_core.runnables import Runnable, RunnableConfig -from langgraph.checkpoint.base import CheckpointMetadata +from langgraph.checkpoint.base import BaseCheckpointSaver, CheckpointMetadata All = Literal["*"] +"""Special value to indicate that graph should interrupt on all nodes.""" + +Checkpointer = Union[None, Literal[False], BaseCheckpointSaver] +"""Type of the checkpointer to use for a subgraph. False disables checkpointing, +even if the parent graph has a checkpointer. None inherits checkpointer.""" StreamMode = Literal["values", "updates", "debug", "messages", "custom"] """How the stream method should emit outputs. diff --git a/libs/langgraph/tests/test_pregel.py b/libs/langgraph/tests/test_pregel.py index 1fa11ead2..18bccbe75 100644 --- a/libs/langgraph/tests/test_pregel.py +++ b/libs/langgraph/tests/test_pregel.py @@ -53,7 +53,7 @@ ) from langgraph.checkpoint.memory import MemorySaver from langgraph.constants import ERROR, PULL, PUSH -from langgraph.errors import InvalidUpdateError, NodeInterrupt +from langgraph.errors import InvalidUpdateError, MultipleSubgraphsError, NodeInterrupt from langgraph.graph import END, Graph from langgraph.graph.graph import START from langgraph.graph.message import MessageGraph, add_messages @@ -1861,7 +1861,12 @@ def test_invoke_two_processes_two_in_join_two_out(mocker: MockerFixture) -> None assert [*executor.map(app.invoke, [2] * 100)] == [[13, 13]] * 100 -def test_invoke_join_then_call_other_pregel(mocker: MockerFixture) -> None: +@pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_SYNC) +def test_invoke_join_then_call_other_pregel( + mocker: MockerFixture, request: pytest.FixtureRequest, checkpointer_name: str +) -> None: + checkpointer = request.getfixturevalue(f"checkpointer_{checkpointer_name}") + add_one = mocker.Mock(side_effect=lambda x: x + 1) add_10_each = mocker.Mock(side_effect=lambda x: [y + 10 for y in x]) @@ -1912,6 +1917,17 @@ def test_invoke_join_then_call_other_pregel(mocker: MockerFixture) -> None: with ThreadPoolExecutor() as executor: assert [*executor.map(app.invoke, [[2, 3]] * 10)] == [27] * 10 + # add checkpointer + app.checkpointer = checkpointer + # subgraph is called twice in the same node, through .map(), so raises + with pytest.raises(MultipleSubgraphsError): + app.invoke([2, 3], {"configurable": {"thread_id": "1"}}) + + # set inner graph checkpointer NeverCheckpoint + inner_app.checkpointer = False + # subgraph still called twice, but checkpointing for inner graph is disabled + assert app.invoke([2, 3], {"configurable": {"thread_id": "1"}}) == 27 + def test_invoke_two_processes_one_in_two_out(mocker: MockerFixture) -> None: add_one = mocker.Mock(side_effect=lambda x: x + 1) diff --git a/libs/langgraph/tests/test_pregel_async.py b/libs/langgraph/tests/test_pregel_async.py index 416c9ffe2..0bd9ed1f9 100644 --- a/libs/langgraph/tests/test_pregel_async.py +++ b/libs/langgraph/tests/test_pregel_async.py @@ -52,7 +52,7 @@ ) from langgraph.checkpoint.memory import MemorySaver from langgraph.constants import ERROR, PULL, PUSH -from langgraph.errors import InvalidUpdateError, NodeInterrupt +from langgraph.errors import InvalidUpdateError, MultipleSubgraphsError, NodeInterrupt from langgraph.graph import END, Graph, StateGraph from langgraph.graph.graph import START from langgraph.graph.message import MessageGraph, add_messages @@ -2080,7 +2080,10 @@ async def test_invoke_two_processes_two_in_join_two_out(mocker: MockerFixture) - ] -async def test_invoke_join_then_call_other_pregel(mocker: MockerFixture) -> None: +@pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_ASYNC) +async def test_invoke_join_then_call_other_pregel( + mocker: MockerFixture, checkpointer_name: str +) -> None: add_one = mocker.Mock(side_effect=lambda x: x + 1) add_10_each = mocker.Mock(side_effect=lambda x: [y + 10 for y in x]) @@ -2133,6 +2136,18 @@ async def test_invoke_join_then_call_other_pregel(mocker: MockerFixture) -> None 27 for _ in range(10) ] + async with awith_checkpointer(checkpointer_name) as checkpointer: + # add checkpointer + app.checkpointer = checkpointer + # subgraph is called twice in the same node, through .map(), so raises + with pytest.raises(MultipleSubgraphsError): + await app.ainvoke([2, 3], {"configurable": {"thread_id": "1"}}) + + # set inner graph checkpointer NeverCheckpoint + inner_app.checkpointer = False + # subgraph still called twice, but checkpointing for inner graph is disabled + assert await app.ainvoke([2, 3], {"configurable": {"thread_id": "1"}}) == 27 + async def test_invoke_two_processes_one_in_two_out(mocker: MockerFixture) -> None: add_one = mocker.Mock(side_effect=lambda x: x + 1) From 644f14991e71ea0e5e8cf24155dd0dc6af41eb87 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sun, 22 Sep 2024 12:04:17 -0700 Subject: [PATCH 11/14] Fix kfka --- libs/langgraph/langgraph/pregel/loop.py | 7 ++++++- .../langgraph/scheduler/kafka/orchestrator.py | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index 452103c9e..45b8798af 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -197,6 +197,7 @@ def __init__( specs: Mapping[str, Union[BaseChannel, ManagedValueSpec]], output_keys: Union[str, Sequence[str]], stream_keys: Union[str, Sequence[str]], + check_subgraphs: bool = True, debug: bool = False, ) -> None: self.stream = stream @@ -222,7 +223,7 @@ def __init__( self.config = patch_configurable( self.config, {"checkpoint_ns": "", "checkpoint_id": None} ) - if self.is_nested and self.checkpointer is not None: + if check_subgraphs and self.is_nested and self.checkpointer is not None: if self.config["configurable"]["checkpoint_ns"] in _SEEN_CHECKPOINT_NS: raise MultipleSubgraphsError else: @@ -641,6 +642,7 @@ def __init__( specs: Mapping[str, Union[BaseChannel, ManagedValueSpec]], output_keys: Union[str, Sequence[str]] = EMPTY_SEQ, stream_keys: Union[str, Sequence[str]] = EMPTY_SEQ, + check_subgraphs: bool = True, debug: bool = False, ) -> None: super().__init__( @@ -653,6 +655,7 @@ def __init__( specs=specs, output_keys=output_keys, stream_keys=stream_keys, + check_subgraphs=check_subgraphs, debug=debug, ) self.stack = ExitStack() @@ -762,6 +765,7 @@ def __init__( specs: Mapping[str, Union[BaseChannel, ManagedValueSpec]], output_keys: Union[str, Sequence[str]] = EMPTY_SEQ, stream_keys: Union[str, Sequence[str]] = EMPTY_SEQ, + check_subgraphs: bool = True, debug: bool = False, ) -> None: super().__init__( @@ -774,6 +778,7 @@ def __init__( specs=specs, output_keys=output_keys, stream_keys=stream_keys, + check_subgraphs=check_subgraphs, debug=debug, ) self.store = AsyncBatchedStore(self.store) if self.store else None diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py index 097429bb6..a94a3bd0a 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py @@ -158,6 +158,7 @@ async def attempt(self, msg: MessageToOrchestrator) -> None: specs=graph.channels, output_keys=graph.output_channels, stream_keys=graph.stream_channels, + check_subgraphs=False, ) as loop: if loop.tick( input_keys=graph.input_channels, From 74f34db9e453df4097280262e54c9eb992f2a1e0 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sun, 22 Sep 2024 12:04:43 -0700 Subject: [PATCH 12/14] Fix sync kafka --- libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py index a94a3bd0a..39e7b755b 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py @@ -348,6 +348,7 @@ def attempt(self, msg: MessageToOrchestrator) -> None: specs=graph.channels, output_keys=graph.output_channels, stream_keys=graph.stream_channels, + check_subgraphs=False, ) as loop: if loop.tick( input_keys=graph.input_channels, From e76f49c7f0eb3b9cff9d5641384a441b5a121c67 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sun, 22 Sep 2024 12:37:24 -0700 Subject: [PATCH 13/14] Fix flaky test --- libs/langgraph/tests/any_str.py | 17 +++++++++++++++++ libs/langgraph/tests/test_pregel.py | 12 ++++++------ libs/langgraph/tests/test_pregel_async.py | 12 ++++++------ 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/libs/langgraph/tests/any_str.py b/libs/langgraph/tests/any_str.py index 9a1977a8c..5995d0e52 100644 --- a/libs/langgraph/tests/any_str.py +++ b/libs/langgraph/tests/any_str.py @@ -2,6 +2,23 @@ from typing import Any, Sequence, Union +class FloatBetween(float): + def __init__(self, min_value: float, max_value: float) -> None: + super().__init__() + self.min_value = min_value + self.max_value = max_value + + def __eq__(self, other: object) -> bool: + return ( + isinstance(other, float) + and other >= self.min_value + and other <= self.max_value + ) + + def __hash__(self) -> int: + return hash((float(self), self.min_value, self.max_value)) + + class AnyStr(str): def __init__(self, prefix: Union[str, re.Pattern] = "") -> None: super().__init__() diff --git a/libs/langgraph/tests/test_pregel.py b/libs/langgraph/tests/test_pregel.py index 18bccbe75..605f00747 100644 --- a/libs/langgraph/tests/test_pregel.py +++ b/libs/langgraph/tests/test_pregel.py @@ -72,7 +72,7 @@ from langgraph.pregel.retry import RetryPolicy from langgraph.store.memory import MemoryStore from langgraph.types import Interrupt, PregelTask, Send, StreamWriter -from tests.any_str import AnyDict, AnyStr, AnyVersion, UnsortedSequence +from tests.any_str import AnyDict, AnyStr, AnyVersion, FloatBetween, UnsortedSequence from tests.conftest import ALL_CHECKPOINTERS_SYNC, SHOULD_CHECK_SNAPSHOTS from tests.fake_chat import FakeChatModel from tests.fake_tracer import FakeTracer @@ -8596,22 +8596,22 @@ def outer_2(state: State): assert chunks == [ # arrives before "inner" finishes ( - 0.0, + FloatBetween(0.0, 0.1), ( (AnyStr("inner:"),), {"inner_1": {"my_key": "got here", "my_other_key": ""}}, ), ), - (0.2, ((), {"outer_1": {"my_key": " and parallel"}})), + (FloatBetween(0.2, 0.3), ((), {"outer_1": {"my_key": " and parallel"}})), ( - 0.5, + FloatBetween(0.5, 0.6), ( (AnyStr("inner:"),), {"inner_2": {"my_key": " and there", "my_other_key": "got here"}}, ), ), - (0.5, ((), {"inner": {"my_key": "got here and there"}})), - (0.5, ((), {"outer_2": {"my_key": " and back again"}})), + (FloatBetween(0.5, 0.6), ((), {"inner": {"my_key": "got here and there"}})), + (FloatBetween(0.5, 0.6), ((), {"outer_2": {"my_key": " and back again"}})), ] diff --git a/libs/langgraph/tests/test_pregel_async.py b/libs/langgraph/tests/test_pregel_async.py index 0bd9ed1f9..d17925aca 100644 --- a/libs/langgraph/tests/test_pregel_async.py +++ b/libs/langgraph/tests/test_pregel_async.py @@ -70,7 +70,7 @@ from langgraph.pregel.retry import RetryPolicy from langgraph.store.memory import MemoryStore from langgraph.types import Interrupt, PregelTask, Send, StreamWriter -from tests.any_str import AnyDict, AnyStr, AnyVersion, UnsortedSequence +from tests.any_str import AnyDict, AnyStr, AnyVersion, FloatBetween, UnsortedSequence from tests.conftest import ( ALL_CHECKPOINTERS_ASYNC, ALL_CHECKPOINTERS_ASYNC_PLUS_NONE, @@ -7202,22 +7202,22 @@ async def outer_2(state: State): assert chunks == [ # arrives before "inner" finishes ( - 0.0, + FloatBetween(0.0, 0.1), ( (AnyStr("inner:"),), {"inner_1": {"my_key": "got here", "my_other_key": ""}}, ), ), - (0.2, ((), {"outer_1": {"my_key": " and parallel"}})), + (FloatBetween(0.2, 0.3), ((), {"outer_1": {"my_key": " and parallel"}})), ( - 0.5, + FloatBetween(0.5, 0.6), ( (AnyStr("inner:"),), {"inner_2": {"my_key": " and there", "my_other_key": "got here"}}, ), ), - (0.5, ((), {"inner": {"my_key": "got here and there"}})), - (0.5, ((), {"outer_2": {"my_key": " and back again"}})), + (FloatBetween(0.5, 0.6), ((), {"inner": {"my_key": "got here and there"}})), + (FloatBetween(0.5, 0.6), ((), {"outer_2": {"my_key": " and back again"}})), ] From 84a45d186c22fa32fa4352a7317950406e1cb883 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Sun, 22 Sep 2024 12:42:34 -0700 Subject: [PATCH 14/14] Fix --- libs/langgraph/tests/any_str.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libs/langgraph/tests/any_str.py b/libs/langgraph/tests/any_str.py index 5995d0e52..5643a00fb 100644 --- a/libs/langgraph/tests/any_str.py +++ b/libs/langgraph/tests/any_str.py @@ -1,8 +1,13 @@ import re from typing import Any, Sequence, Union +from typing_extensions import Self + class FloatBetween(float): + def __new__(cls, min_value: float, max_value: float) -> Self: + return super().__new__(cls, min_value) + def __init__(self, min_value: float, max_value: float) -> None: super().__init__() self.min_value = min_value