langgraph/how-tos/map-reduce/ #609
Replies: 18 comments 13 replies
-
I used this example, but I got an error on the line `return [Send("generate_joke", {"subject": s}) for s in state['subjects']]` that says "Callable() takes no arguments". How should I solve this?
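For reference, here is a minimal, self-contained sketch of the `Send` fan-out that line comes from, assuming a reasonably recent langgraph version. One thing worth checking is that the string passed to `Send` exactly matches the name the target node was registered under:

```python
# A minimal, hedged sketch of the Send fan-out (not the how-to's full graph).
# The import path matches the how-to; newer langgraph versions also expose
# Send from langgraph.types.
import operator
from typing import Annotated, TypedDict

from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph


class OverallState(TypedDict):
    subjects: list
    jokes: Annotated[list, operator.add]  # reducer: parallel results merge here


class JokeState(TypedDict):
    subject: str


def generate_joke(state: JokeState):
    return {"jokes": [f"a joke about {state['subject']}"]}


def continue_to_jokes(state: OverallState):
    # Send("generate_joke", ...) must name a registered node exactly
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]


graph = StateGraph(OverallState)
graph.add_node("generate_joke", generate_joke)
graph.add_conditional_edges(START, continue_to_jokes)
graph.add_edge("generate_joke", END)
app = graph.compile()
print(app.invoke({"subjects": ["cats", "dogs"]}))
```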
-
I have a question regarding parallel execution: when two or more nodes execute in parallel against the same state and make changes to it at the same time, how do you aggregate the state into one unified result at the end?
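For what it's worth, that aggregation is exactly what the reducer annotation on the shared key does; a minimal sketch:

```python
# Hedged sketch: concurrent writes to the same key are merged by that key's
# reducer. Annotating "jokes" with operator.add means each parallel node's
# {"jokes": [<one joke>]} update is concatenated into a single list -- the
# "reduce" half of map-reduce. Without a reducer, concurrent writes to the
# same key raise an error instead of being merged.
import operator
from typing import Annotated, TypedDict


class OverallState(TypedDict):
    jokes: Annotated[list, operator.add]
```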
-
What if I wanted to pause on a parallel state and resume execution later due to external factors? How would I reference the parallel state to continue execution? Is that possible now, or is it a future feature?
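A hedged sketch of the checkpointer-based pause/resume that exists today, assuming the compiled `graph` from this how-to. Whether you can target a single parallel branch, rather than the fan-out step as a whole, may depend on your langgraph version:

```python
# Hedged sketch: interrupt before the fanned-out node, then resume later.
from langgraph.checkpoint.memory import MemorySaver

app = graph.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["generate_joke"],  # pause before the parallel step runs
)

config = {"configurable": {"thread_id": "1"}}
app.invoke({"topic": "animals"}, config)  # runs up to the interrupt

# ... wait for the external factors to resolve ...

app.invoke(None, config)  # passing None resumes from the saved checkpoint
```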
-
Hello, thanks for the great work! While returning the output of the instance-processed function (here, generate_joke), I got the following error and I don't understand why it happens. Here is what I'm returning, where "jokes" is an attribute of the overall class. Could it be a compatibility issue, or do you have any idea? Thanks
-
I'm having trouble understanding that use of conditional edges with `Send`. When would you need to use `Send` in a conditional edge vs. replacing that conditional edge with a node that does the same thing?
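One way to frame the difference, as a hedged sketch: a node can only return a state update, while a conditional-edge function returns where to go next, and `Send` is a routing decision that also carries its own payload, so the number of downstream runs can be decided at runtime:

```python
from langgraph.constants import Send


# A node returns a state *update*; it cannot decide routing or fan out:
def a_node(state):
    return {"jokes": ["..."]}


# A conditional-edge function returns *routing*; with Send it can launch
# N runs of a node, each with its own private input state:
def fan_out(state):
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
```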
-
Hi, I have used the `Send` class, but I think it might have a problem with tools: when a different node sends objects to a node with a tool, it raises an error.
-
I found a case where the model returning the joke index was not using 0-based indexing.
-
Can there be two map-reduce branches in one LangGraph graph?
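I believe so; a hedged, self-contained sketch with two independent fan-outs, each reducing into its own key:

```python
# Hedged sketch: nothing seems to prevent two independent Send fan-outs in
# one graph, as long as each reduced key has its own reducer.
import operator
from typing import Annotated, TypedDict

from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph


class State(TypedDict):
    items_a: list
    items_b: list
    results_a: Annotated[list, operator.add]  # reduce target for branch A
    results_b: Annotated[list, operator.add]  # reduce target for branch B


def work_a(state):  # mapped node for branch A
    return {"results_a": [state["item"].upper()]}


def work_b(state):  # mapped node for branch B
    return {"results_b": [len(state["item"])]}


graph = StateGraph(State)
graph.add_node("work_a", work_a)
graph.add_node("work_b", work_b)
graph.add_conditional_edges(
    START,
    lambda s: [Send("work_a", {"item": i}) for i in s["items_a"]]
            + [Send("work_b", {"item": i}) for i in s["items_b"]],
)
graph.add_edge("work_a", END)
graph.add_edge("work_b", END)
app = graph.compile()
print(app.invoke({"items_a": ["x"], "items_b": ["yz"]}))
```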
-
Hi, I want to know whether there is any difference between using the `Send` API and a `runnable.abatch` operation inside a node.
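A hedged sketch of the `abatch` alternative, assuming the `model`, `joke_prompt`, `Joke`, and `OverallState` from this how-to are in scope. Both run the LLM calls concurrently; the difference is that `abatch` happens inside a single node (one unit for checkpointing, interrupts, and retries), whereas `Send` makes each subject its own node execution that langgraph can track, stream, and resume individually:

```python
# Hedged sketch: the same "map" step done with abatch inside one node.
async def generate_jokes_batched(state: OverallState):
    prompts = [joke_prompt.format(subject=s) for s in state["subjects"]]
    responses = await model.with_structured_output(Joke).abatch(prompts)
    return {"jokes": [r.joke for r in responses]}
```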
-
Hi, can you show a simpler example, like the one you made for human-in-the-loop? I seem to be getting lost in how `Send` works and how `graph.add_conditional_edges` works in this scenario. Also, can multiple `Send`s be used?
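On the last part: yes, the list returned from the edge function can hold any number of `Send` objects, and they need not all target the same node. A hedged fragment, where `log_subjects` is a hypothetical extra node:

```python
from langgraph.constants import Send


def route(state):
    # one Send per subject, all to the same node...
    sends = [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
    # ...plus an extra Send to a different (hypothetical) node
    sends.append(Send("log_subjects", {"subjects": state["subjects"]}))
    return sends
```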
-
If there are dependencies between the decomposed tasks, how should that be implemented?
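A hedged sketch of one way to express a dependency: fan out the independent subtasks with `Send`, and put the dependent step in a downstream node; the ordinary edge fires only after every fanned-out branch has finished (all names here are illustrative):

```python
# Hedged sketch: independent subtasks fan out via Send; a step that depends
# on all their results goes in a downstream node. The edge "work" -> "combine"
# fires only after every fanned-out "work" run has completed.
import operator
from typing import Annotated, TypedDict

from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph


class State(TypedDict):
    tasks: list
    partials: Annotated[list, operator.add]
    final: str


def work(state):  # independent subtask, runs in parallel
    return {"partials": [state["task"] * 2]}


def combine(state):  # depends on all subtask results
    return {"final": " | ".join(state["partials"])}


graph = StateGraph(State)
graph.add_node("work", work)
graph.add_node("combine", combine)
graph.add_conditional_edges(START, lambda s: [Send("work", {"task": t}) for t in s["tasks"]])
graph.add_edge("work", "combine")
graph.add_edge("combine", END)
app = graph.compile()
print(app.invoke({"tasks": ["a", "b"]}))
```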
-
Hey all, two questions:

Question 1

In Python I render the graph with:

```python
Image(app.get_graph().draw_mermaid_png())
```

and in TypeScript with:

```typescript
import * as tslab from "tslab";

const representation = app.getGraph();
const image = await representation.drawMermaidPng();
const arrayBuffer = await image.arrayBuffer();
tslab.display.png(new Uint8Array(arrayBuffer));
```

Question 2

```python
import operator
from typing import Annotated, TypedDict

from langchain_openai import ChatOpenAI
# from langchain_anthropic import ChatAnthropic
from langgraph.constants import Send
from langgraph.graph import END, StateGraph, START

# NOTE:
# - if you're using langchain-core >= 0.3, you need to use pydantic v2
# - if you're using langchain-core >= 0.2,<0.3, you need to use pydantic v1
from langchain_core import __version__ as core_version
from packaging import version

core_version = version.parse(core_version)
if (core_version.major, core_version.minor) < (0, 3):
    from pydantic.v1 import BaseModel, Field
else:
    from pydantic import BaseModel, Field

# Model and prompts
# Define model and prompts we will use
model = ChatOpenAI(model="gpt-4o", temperature=0, api_key=OPEN_AI_KEY)

subjects_prompt = """Generate a comma separated list of between 2 and 5 examples related to: {topic}."""
joke_prompt = """Generate a joke about {subject}"""
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.
{jokes}"""


class Subjects(BaseModel):
    subjects: list[str]


class Joke(BaseModel):
    joke: str


class BestJoke(BaseModel):
    id: int = Field(description="Index of the best joke, starting with 0", ge=0)


# Graph components: define the components that will make up the graph

# This will be the overall state of the main graph.
# It will contain a topic (which we expect the user to provide)
# and then will generate a list of subjects, and then a joke for
# each subject
class OverallState(TypedDict):
    topic: str
    subjects: list
    # Notice here we use the operator.add
    # This is because we want to combine all the jokes we generate
    # from individual nodes back into one list - this is essentially
    # the "reduce" part
    jokes: Annotated[list, operator.add]
    best_selected_joke: str


# This will be the state of the node that we will "map" all
# subjects to in order to generate a joke
class JokeState(TypedDict):
    subject: str


# This is the function we will use to generate the subjects of the jokes
def generate_topics(state: OverallState):
    prompt = subjects_prompt.format(topic=state["topic"])
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}


# Here we generate a joke, given a subject
def generate_joke(state: JokeState):
    prompt = joke_prompt.format(subject=state["subject"])
    response = model.with_structured_output(Joke).invoke(prompt)
    return {"jokes": [response.joke]}


# Here we define the logic to map out over the generated subjects
# We will use this as an edge in the graph
def continue_to_jokes(state: OverallState):
    # We will return a list of `Send` objects
    # Each `Send` object consists of the name of a node in the graph
    # as well as the state to send to that node
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]


# Here we will judge the best joke
def best_joke(state: OverallState):
    jokes = "\n\n".join(state["jokes"])
    prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
    response = model.with_structured_output(BestJoke).invoke(prompt)
    return {"best_selected_joke": state["jokes"][response.id]}


# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)
graph.add_edge(START, "generate_topics")
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)
app = graph.compile()

# Call the graph: here we call it to generate a list of jokes
for s in app.stream({"topic": "animals"}):
    print(s)
```

and the output:
-
How can the `Send` approach be applied in this case? I have annual reports for companies A and B, each with its dedicated retrieval tool: retrieval-tool-A for Company A and retrieval-tool-B for Company B. When a user asks, "What were the revenues of Company A and B in the past year?" the typical tools-assisted agent splits this into two queries: "What were the revenues for Company A in the past year?" (handled by retrieval-tool-A) and "What were the revenues for Company B in the past year?" (handled by retrieval-tool-B). These answers are then combined to provide the final response. Can this process be parallelized using the `Send` approach? Since the two queries are independent and use separate tools, running them in parallel could significantly reduce response time and enhance the robustness and scalability of tool-assisted agentic methods.
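A hedged, self-contained sketch of that parallelization; the stand-in tools, the queries, and the `synthesize` reduce step are all hypothetical placeholders for the retrieval-tool-A / retrieval-tool-B setup:

```python
# Hedged sketch: fan out one retrieval node per (company, tool) pair, then
# reduce the partial answers in a downstream node.
import operator
from typing import Annotated, TypedDict

from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph

TOOLS = {
    "A": lambda q: f"revenue answer for A: {q}",  # stand-in for retrieval-tool-A
    "B": lambda q: f"revenue answer for B: {q}",  # stand-in for retrieval-tool-B
}


class State(TypedDict):
    question: str
    answers: Annotated[list, operator.add]  # reducer collects parallel answers
    final: str


def retrieve(state):  # runs once per company, in parallel
    return {"answers": [TOOLS[state["company"]](state["query"])]}


def synthesize(state):  # the "reduce": combine the partial answers
    return {"final": "\n".join(state["answers"])}


def fan_out(state):
    return [
        Send("retrieve", {"company": c, "query": f"Revenue of Company {c} last year?"})
        for c in ("A", "B")
    ]


graph = StateGraph(State)
graph.add_node("retrieve", retrieve)
graph.add_node("synthesize", synthesize)
graph.add_conditional_edges(START, fan_out, ["retrieve"])
graph.add_edge("retrieve", "synthesize")
graph.add_edge("synthesize", END)
app = graph.compile()
print(app.invoke({"question": "Revenues of A and B last year?"})["final"])
```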
-
I got an error using Ollama llama3.1 with ChatOpenAI, but not with a paid OpenAI model. How can I modify the code to overcome this error?

```
ValidationError                           Traceback (most recent call last)
File ~/miniconda3/envs/Langgraph/lib/python3.10/site-packages/langgraph/pregel/__init__.py:1298, in Pregel.stream(self, input, config, stream_mode, output_keys, interrupt_before, interrupt_after, debug, subgraphs)
ValidationError: 1 validation error for Subjects
```
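The ValidationError suggests the local model's structured output didn't match the `Subjects` schema. One thing that may help, assuming your langchain-openai version supports it and with the how-to's `model`, `subjects_prompt`, and `Subjects` in scope, is JSON-mode structured output, which relies on the prompt itself demanding the right JSON shape:

```python
# Hedged sketch: try JSON mode instead of tool/function calling, which some
# local models served through an OpenAI-compatible API handle more reliably.
structured = model.with_structured_output(Subjects, method="json_mode")
prompt = (
    subjects_prompt.format(topic="animals")
    + '\nRespond ONLY with JSON like {"subjects": ["...", "..."]}.'
)
response = structured.invoke(prompt)
```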
-
Question: I want to modify this flow by adding a node (retrieve_jokes, which retrieves similar jokes) between the "generate_subject" and "generate_joke" nodes. The new node should retrieve related jokes based on the subject and feed them into the joke generation process. Here's the flow I'm envisioning:

1. Generate subject.
2. New node: retrieve similar jokes from a vector DB by subject.
3. Generate a joke for each retrieved similar joke, using the subject as context.
4. Reduce: choose the best joke from the generated jokes.

I also need to extend this to save state for every subject (an evaluator or self-reflection step might be added to the joke-generation node), and it has to be dynamic because the number of subjects is unbounded. How should I modify the flow to get this kind of graph?
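A hedged sketch of one way to wire this in; the `vector_store` and all new names are hypothetical, and it assumes the how-to's `model`, `Joke`, and `graph` are in scope. Doing retrieval and per-document generation inside the one mapped node keeps the fan-out to a single `Send` level, which is simpler than fanning out again from inside a parallel branch:

```python
from langgraph.constants import Send


def retrieve_and_generate(state):  # receives {"subject": ...} from Send
    # hypothetical vector-DB lookup for similar jokes
    docs = vector_store.similarity_search(state["subject"], k=3)
    jokes = []
    for doc in docs:
        prompt = (
            f"Write a joke about {state['subject']} "
            f"inspired by this example: {doc.page_content}"
        )
        jokes.append(model.with_structured_output(Joke).invoke(prompt).joke)
    return {"jokes": jokes}  # reduced via operator.add, as in the how-to


graph.add_node("retrieve_and_generate", retrieve_and_generate)
graph.add_conditional_edges(
    "generate_topics",
    lambda s: [Send("retrieve_and_generate", {"subject": subj}) for subj in s["subjects"]],
    ["retrieve_and_generate"],
)
graph.add_edge("retrieve_and_generate", "best_joke")
```

For per-subject evaluation or self-reflection, one hedged option is to compile that per-subject pipeline as a subgraph and use the subgraph as the mapped node.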
-
Does somebody know how to disable parallelism? Is there an option to enable or disable this feature? Sometimes I need the branches to execute in sequence. Thanks.
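I'm not aware of a per-graph switch, but two hedged options, assuming the how-to's `model`, `joke_prompt`, `Joke`, `OverallState`, and `app` are in scope: do the loop inside one node (inherently sequential), or cap the run's concurrency at 1 via the standard config key, which recent langgraph versions appear to honor:

```python
# Option 1 (sketch): loop inside a single node instead of fanning out.
def generate_jokes_sequential(state: OverallState):
    jokes = []
    for s in state["subjects"]:  # strictly one at a time, in order
        prompt = joke_prompt.format(subject=s)
        jokes.append(model.with_structured_output(Joke).invoke(prompt).joke)
    return {"jokes": jokes}


# Option 2 (sketch): keep the Send fan-out but serialize execution.
app.invoke({"topic": "animals"}, config={"max_concurrency": 1})
```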
-
Is there a way to manage the maximum concurrency of the parallel branches? I need to limit it in line with the rate limit of my LLM SaaS API provider.
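A hedged pointer: the standard `max_concurrency` key in the run config caps how many tasks (including `Send` branches) execute at once, at least in recent langgraph versions:

```python
# Hedged sketch: allow at most 2 parallel generate_joke runs per step.
for s in app.stream({"topic": "animals"}, config={"max_concurrency": 2}):
    print(s)
```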
-
Thanks for this, it's really useful to see how the Send API works. However, I ran the exact same code, yet I only see one joke? Is anyone else facing the same issue?
-
langgraph/how-tos/map-reduce/
Build language agents as graphs
https://langchain-ai.github.io/langgraph/how-tos/map-reduce/