Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more comments and docstrings #1792

Merged
merged 15 commits into from
Sep 23, 2024
558 changes: 279 additions & 279 deletions docs/docs/how-tos/map-reduce.ipynb

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions docs/docs/reference/graphs.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ handler: python

## StreamMode

::: langgraph.pregel.StreamMode
::: langgraph.types.StreamMode

## Constants

Expand Down Expand Up @@ -69,8 +69,12 @@ builder.add_conditional_edges("my_node", my_condition)

## Send

::: langgraph.constants.Send
::: langgraph.types.Send

## Interrupt

::: langgraph.types.Interrupt

## RetryPolicy

::: langgraph.pregel.types.RetryPolicy
::: langgraph.types.RetryPolicy
170 changes: 73 additions & 97 deletions libs/langgraph/langgraph/constants.py
Original file line number Diff line number Diff line change
@@ -1,133 +1,109 @@
from dataclasses import dataclass
from types import MappingProxyType
from typing import Any, Literal, Mapping
from typing import Any, Mapping

from langgraph.types import Interrupt, Send # noqa: F401

# Interrupt, Send re-exported for backwards compatibility


# --- Empty read-only containers ---
EMPTY_MAP: Mapping[str, Any] = MappingProxyType({})
EMPTY_SEQ: tuple[str, ...] = tuple()

# --- Public constants ---
TAG_HIDDEN = "langsmith:hidden"
# tag to hide a node/edge from certain tracing/streaming environments
START = "__start__"
# the first (maybe virtual) node in graph-style Pregel
END = "__end__"
# the last (maybe virtual) node in graph-style Pregel

# --- Reserved write keys ---
INPUT = "__input__"
# for values passed as input to the graph
INTERRUPT = "__interrupt__"
# for dynamic interrupts raised by nodes
ERROR = "__error__"
# for errors raised by nodes
NO_WRITES = "__no_writes__"
# marker to signal node didn't write anything
SCHEDULED = "__scheduled__"
# marker to signal node was scheduled (in distributed mode)
TASKS = "__pregel_tasks"
# for Send objects returned by nodes/edges, corresponds to PUSH below

# --- Reserved config.configurable keys ---
CONFIG_KEY_SEND = "__pregel_send"
# holds the `write` function that accepts writes to state/edges/reserved keys
CONFIG_KEY_READ = "__pregel_read"
# holds the `read` function that returns a copy of the current state
CONFIG_KEY_CHECKPOINTER = "__pregel_checkpointer"
# holds a `BaseCheckpointSaver` passed from parent graph to child graphs
CONFIG_KEY_STREAM = "__pregel_stream"
# holds a `StreamProtocol` passed from parent graph to child graphs
CONFIG_KEY_STREAM_WRITER = "__pregel_stream_writer"
# holds a `StreamWriter` for stream_mode=custom
CONFIG_KEY_STORE = "__pregel_store"
# holds a `BaseStore` made available to managed values
CONFIG_KEY_RESUMING = "__pregel_resuming"
# holds a boolean indicating if subgraphs should resume from a previous checkpoint
CONFIG_KEY_TASK_ID = "__pregel_task_id"
# holds the task ID for the current task
CONFIG_KEY_DEDUPE_TASKS = "__pregel_dedupe_tasks"
# holds a boolean indicating if tasks should be deduplicated (for distributed mode)
CONFIG_KEY_ENSURE_LATEST = "__pregel_ensure_latest"
# holds a boolean indicating whether to assert the requested checkpoint is the latest
# (for distributed mode)
CONFIG_KEY_DELEGATE = "__pregel_delegate"
# this one part of public API so more readable
# holds a boolean indicating whether to delegate subgraphs (for distributed mode)
CONFIG_KEY_CHECKPOINT_MAP = "checkpoint_map"
INTERRUPT = "__interrupt__"
ERROR = "__error__"
NO_WRITES = "__no_writes__"
SCHEDULED = "__scheduled__"
TASKS = "__pregel_tasks" # for backwards compat, this is the original name of PUSH
# holds a mapping of checkpoint_ns -> checkpoint_id for parent graphs
CONFIG_KEY_CHECKPOINT_ID = "checkpoint_id"
# holds the current checkpoint_id, if any
CONFIG_KEY_CHECKPOINT_NS = "checkpoint_ns"
# holds the current checkpoint_ns, "" for root graph

# --- Other constants ---
PUSH = "__pregel_push"
# denotes push-style tasks, ie. those created by Send objects
PULL = "__pregel_pull"
# denotes pull-style tasks, ie. those triggered by edges
RUNTIME_PLACEHOLDER = "__pregel_runtime_placeholder__"
# placeholder for managed values replaced at runtime
NS_SEP = "|"
# for checkpoint_ns, separates each level (ie. graph|subgraph|subsubgraph)
NS_END = ":"
# for checkpoint_ns, for each level, separates the namespace from the task_id

RESERVED = {
SCHEDULED,
TAG_HIDDEN,
# reserved write keys
INPUT,
INTERRUPT,
ERROR,
NO_WRITES,
SCHEDULED,
TASKS,
PUSH,
PULL,
# reserved config.configurable keys
CONFIG_KEY_SEND,
CONFIG_KEY_READ,
CONFIG_KEY_CHECKPOINTER,
CONFIG_KEY_CHECKPOINT_MAP,
CONFIG_KEY_STREAM,
CONFIG_KEY_STREAM_WRITER,
CONFIG_KEY_STORE,
CONFIG_KEY_CHECKPOINT_MAP,
CONFIG_KEY_RESUMING,
CONFIG_KEY_TASK_ID,
CONFIG_KEY_DEDUPE_TASKS,
CONFIG_KEY_ENSURE_LATEST,
CONFIG_KEY_DELEGATE,
INPUT,
CONFIG_KEY_CHECKPOINT_MAP,
CONFIG_KEY_CHECKPOINT_ID,
CONFIG_KEY_CHECKPOINT_NS,
# other constants
PUSH,
PULL,
RUNTIME_PLACEHOLDER,
NS_SEP,
NS_END,
}
TAG_HIDDEN = "langsmith:hidden"

START = "__start__"
END = "__end__"

NS_SEP = "|"
NS_END = ":"

EMPTY_MAP: Mapping[str, Any] = MappingProxyType({})


class Send:
"""A message or packet to send to a specific node in the graph.

The `Send` class is used within a `StateGraph`'s conditional edges to
dynamically invoke a node with a custom state at the next step.

Importantly, the sent state can differ from the core graph's state,
allowing for flexible and dynamic workflow management.

One such example is a "map-reduce" workflow where your graph invokes
the same node multiple times in parallel with different states,
before aggregating the results back into the main graph's state.

Attributes:
node (str): The name of the target node to send the message to.
arg (Any): The state or message to send to the target node.

Examples:
>>> from typing import Annotated
>>> import operator
>>> class OverallState(TypedDict):
... subjects: list[str]
... jokes: Annotated[list[str], operator.add]
...
>>> from langgraph.constants import Send
>>> from langgraph.graph import END, START
>>> def continue_to_jokes(state: OverallState):
... return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
...
>>> from langgraph.graph import StateGraph
>>> builder = StateGraph(OverallState)
>>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
>>> builder.add_conditional_edges(START, continue_to_jokes)
>>> builder.add_edge("generate_joke", END)
>>> graph = builder.compile()
>>>
>>> # Invoking with two subjects results in a generated joke for each
>>> graph.invoke({"subjects": ["cats", "dogs"]})
{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
"""

node: str
arg: Any

def __init__(self, /, node: str, arg: Any) -> None:
"""
Initialize a new instance of the Send class.

Args:
node (str): The name of the target node to send the message to.
arg (Any): The state or message to send to the target node.
"""
self.node = node
self.arg = arg

def __hash__(self) -> int:
return hash((self.node, self.arg))

def __repr__(self) -> str:
return f"Send(node={self.node!r}, arg={self.arg!r})"

def __eq__(self, value: object) -> bool:
return (
isinstance(value, Send)
and self.node == value.node
and self.arg == value.arg
)


@dataclass
class Interrupt:
value: Any
when: Literal["during"] = "during"
33 changes: 18 additions & 15 deletions libs/langgraph/langgraph/errors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import Any, Sequence

from langgraph.checkpoint.base import EmptyChannelError
from langgraph.constants import Interrupt
from langgraph.checkpoint.base import EmptyChannelError # noqa: F401
from langgraph.types import Interrupt

# EmptyChannelError re-exported for backwards compatibility


class GraphRecursionError(RecursionError):
Expand All @@ -24,13 +26,14 @@ class GraphRecursionError(RecursionError):


class InvalidUpdateError(Exception):
"""Raised when attempting to update a channel with an invalid sequence of updates."""
"""Raised when attempting to update a channel with an invalid set of updates."""

pass


class GraphInterrupt(Exception):
"""Raised when a subgraph is interrupted."""
"""Raised when a subgraph is interrupted, suppressed by the root graph.
Never raised directly, or surfaced to the user."""

def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None:
super().__init__(interrupts)
Expand All @@ -44,7 +47,7 @@ def __init__(self, value: Any) -> None:


class GraphDelegate(Exception):
"""Raised when a graph is delegated."""
"""Raised when a graph is delegated (for distributed mode)."""

def __init__(self, *args: dict[str, Any]) -> None:
super().__init__(*args)
Expand All @@ -57,22 +60,22 @@ class EmptyInputError(Exception):


class TaskNotFound(Exception):
"""Raised when the executor is unable to find a task."""
"""Raised when the executor is unable to find a task (for distributed mode)."""

pass


class CheckpointNotLatest(Exception):
"""Raised when the checkpoint is not the latest version."""
"""Raised when the checkpoint is not the latest version (for distributed mode)."""

pass


class MultipleSubgraphsError(Exception):
"""Raised when multiple subgraphs are called inside the same node."""

pass


__all__ = [
"GraphRecursionError",
"InvalidUpdateError",
"GraphInterrupt",
"NodeInterrupt",
"EmptyInputError",
"EmptyChannelError",
]
_SEEN_CHECKPOINT_NS: set[str] = set()
"""Used for subgraph detection."""
5 changes: 2 additions & 3 deletions libs/langgraph/langgraph/graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from typing_extensions import Self

from langgraph.channels.ephemeral_value import EphemeralValue
from langgraph.checkpoint.base import BaseCheckpointSaver
from langgraph.constants import (
END,
NS_END,
Expand All @@ -38,8 +37,8 @@
from langgraph.errors import InvalidUpdateError
from langgraph.pregel import Channel, Pregel
from langgraph.pregel.read import PregelNode
from langgraph.pregel.types import All
from langgraph.pregel.write import ChannelWrite, ChannelWriteEntry
from langgraph.types import All, Checkpointer
from langgraph.utils.runnable import RunnableCallable, coerce_to_runnable

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -406,7 +405,7 @@ def validate(self, interrupt: Optional[Sequence[str]] = None) -> Self:

def compile(
self,
checkpointer: Optional[BaseCheckpointSaver] = None,
checkpointer: Checkpointer = None,
interrupt_before: Optional[Union[All, list[str]]] = None,
interrupt_after: Optional[Union[All, list[str]]] = None,
debug: bool = False,
Expand Down
7 changes: 3 additions & 4 deletions libs/langgraph/langgraph/graph/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from langgraph.channels.ephemeral_value import EphemeralValue
from langgraph.channels.last_value import LastValue
from langgraph.channels.named_barrier_value import NamedBarrierValue
from langgraph.checkpoint.base import BaseCheckpointSaver
from langgraph.constants import NS_END, NS_SEP, TAG_HIDDEN
from langgraph.errors import InvalidUpdateError
from langgraph.graph.graph import END, START, Branch, CompiledGraph, Graph, Send
Expand All @@ -45,9 +44,9 @@
is_writable_managed_value,
)
from langgraph.pregel.read import ChannelRead, PregelNode
from langgraph.pregel.types import All, RetryPolicy
from langgraph.pregel.write import SKIP_WRITE, ChannelWrite, ChannelWriteEntry
from langgraph.store.base import BaseStore
from langgraph.types import All, Checkpointer, RetryPolicy
from langgraph.utils.fields import get_field_default
from langgraph.utils.pydantic import create_model
from langgraph.utils.runnable import coerce_to_runnable
Expand Down Expand Up @@ -400,7 +399,7 @@ def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> Self:

def compile(
self,
checkpointer: Optional[BaseCheckpointSaver] = None,
checkpointer: Checkpointer = None,
*,
store: Optional[BaseStore] = None,
interrupt_before: Optional[Union[All, list[str]]] = None,
Expand All @@ -413,7 +412,7 @@ def compile(
streamed, batched, and run asynchronously.

Args:
checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
checkpointer (Checkpointer): An optional checkpoint saver object.
This serves as a fully versioned "memory" for the graph, allowing
the graph to be paused and resumed, and replayed from any point.
interrupt_before (Optional[Sequence[str]]): An optional list of node names to interrupt before.
Expand Down
4 changes: 2 additions & 2 deletions libs/langgraph/langgraph/prebuilt/chat_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
from langchain_core.tools import BaseTool

from langgraph._api.deprecation import deprecated_parameter
from langgraph.checkpoint.base import BaseCheckpointSaver
from langgraph.graph import StateGraph
from langgraph.graph.graph import CompiledGraph
from langgraph.graph.message import add_messages
from langgraph.managed import IsLastStep
from langgraph.prebuilt.tool_executor import ToolExecutor
from langgraph.prebuilt.tool_node import ToolNode
from langgraph.types import Checkpointer


# We create the AgentState that we will pass around
Expand Down Expand Up @@ -132,7 +132,7 @@ def create_react_agent(
state_schema: Optional[StateSchemaType] = None,
messages_modifier: Optional[MessagesModifier] = None,
state_modifier: Optional[StateModifier] = None,
checkpointer: Optional[BaseCheckpointSaver] = None,
checkpointer: Checkpointer = None,
interrupt_before: Optional[list[str]] = None,
interrupt_after: Optional[list[str]] = None,
debug: bool = False,
Expand Down
Loading
Loading