diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py index 8e586c96e..9cbb6bf83 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/executor.py @@ -35,6 +35,7 @@ MessageToExecutor, MessageToOrchestrator, Producer, + Sendable, Topics, ) from langgraph.utils.config import patch_configurable @@ -129,7 +130,9 @@ async def each(self, msg: MessageToExecutor) -> None: input=orjson.Fragment( self.graph.checkpointer.serde.dumps(arg["input"]) ), - finally_executor=[msg], + finally_send=[ + Sendable(topic=self.topics.executor, value=msg) + ], ) ), # use thread_id, checkpoint_ns as partition key @@ -211,7 +214,7 @@ async def attempt(self, msg: MessageToExecutor) -> None: MessageToOrchestrator( input=None, config=msg["config"], - finally_executor=msg.get("finally_executor"), + finally_send=msg.get("finally_send"), ) ), # use thread_id, checkpoint_ns as partition key @@ -322,7 +325,9 @@ def each(self, msg: MessageToExecutor) -> None: input=orjson.Fragment( self.graph.checkpointer.serde.dumps(arg["input"]) ), - finally_executor=[msg], + finally_send=[ + Sendable(topic=self.topics.executor, value=msg) + ], ) ), # use thread_id, checkpoint_ns as partition key @@ -403,7 +408,7 @@ def attempt(self, msg: MessageToExecutor) -> None: MessageToOrchestrator( input=None, config=msg["config"], - finally_executor=msg.get("finally_executor"), + finally_send=msg.get("finally_send"), ) ), # use thread_id, checkpoint_ns as partition key diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py index a04d49537..1ad9c5c5b 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py @@ -187,7 +187,7 @@ async def attempt(self, msg: MessageToOrchestrator) -> None: }, ), task=ExecutorTask(id=task.id, path=task.path), - finally_executor=msg.get("finally_executor"), + finally_send=msg.get("finally_send"), ) ), ) @@ -212,12 +212,16 @@ async def attempt(self, msg: MessageToOrchestrator) -> None: ) ], ) - elif loop.status == "done" and msg.get("finally_executor"): - # schedule any finally_executor tasks + elif loop.status == "done" and msg.get("finally_send"): + # send any finally_send messages futs = await asyncio.gather( *( - self.producer.send(self.topics.executor, value=serde.dumps(m)) - for m in msg["finally_executor"] + self.producer.send( + m["topic"], + value=serde.dumps(m["value"]) if m.get("value") else None, + key=serde.dumps(m["key"]) if m.get("key") else None, + ) + for m in msg["finally_send"] ) ) # wait for messages to be sent @@ -288,7 +292,6 @@ def __next__(self) -> list[MessageToOrchestrator]: recs = self.consumer.getmany( timeout_ms=self.batch_max_ms, max_records=self.batch_max_n ) - print("orch.__next__", recs) # dedupe messages, eg. if multiple nodes finish around same time uniq = set(msg.value for msgs in recs.values() for msg in msgs) msgs: list[MessageToOrchestrator] = [serde.loads(msg) for msg in uniq] @@ -370,7 +373,7 @@ def attempt(self, msg: MessageToOrchestrator) -> None: }, ), task=ExecutorTask(id=task.id, path=task.path), - finally_executor=msg.get("finally_executor"), + finally_send=msg.get("finally_send"), ) ), ) @@ -394,11 +397,15 @@ def attempt(self, msg: MessageToOrchestrator) -> None: ) ], ) - elif loop.status == "done" and msg.get("finally_executor"): - # schedule any finally_executor tasks + elif loop.status == "done" and msg.get("finally_send"): + # schedule any finally_send msgs futs = [ - self.producer.send(self.topics.executor, value=serde.dumps(m)) - for m in msg["finally_executor"] + self.producer.send( + m["topic"], + value=serde.dumps(m["value"]) if m.get("value") else None, + key=serde.dumps(m["key"]) if m.get("key") else None, + ) + for m in msg["finally_send"] ] # wait for messages to be sent concurrent.futures.wait(futs) diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/types.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/types.py index 2471b6eb6..8230960b4 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/types.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/types.py @@ -11,10 +11,16 @@ class Topics(NamedTuple): error: str +class Sendable(TypedDict): + topic: str + value: Optional[Any] + key: Optional[Any] + + class MessageToOrchestrator(TypedDict): input: Optional[dict[str, Any]] config: RunnableConfig - finally_executor: Optional[Sequence["MessageToExecutor"]] + finally_send: Optional[Sequence[Sendable]] class ExecutorTask(TypedDict): @@ -25,7 +31,7 @@ class ExecutorTask(TypedDict): class MessageToExecutor(TypedDict): config: RunnableConfig task: ExecutorTask - finally_executor: Optional[Sequence["MessageToExecutor"]] + finally_send: Optional[Sequence[Sendable]] class ErrorMessage(TypedDict): diff --git a/libs/scheduler-kafka/tests/drain.py b/libs/scheduler-kafka/tests/drain.py index 0f0c22d44..84a3463c2 100644 --- a/libs/scheduler-kafka/tests/drain.py +++ b/libs/scheduler-kafka/tests/drain.py @@ -32,7 +32,9 @@ async def drain_topics_async( def done() -> bool: return ( len(orch_msgs) > 0 + and any(orch_msgs) and len(exec_msgs) > 0 + and any(exec_msgs) and not orch_msgs[-1] and not exec_msgs[-1] ) @@ -97,7 +99,9 @@ def drain_topics( def done() -> bool: return ( len(orch_msgs) > 0 + and any(orch_msgs) and len(exec_msgs) > 0 + and any(exec_msgs) and not orch_msgs[-1] and not exec_msgs[-1] ) @@ -110,7 +114,6 @@ def orchestrator() -> None: if debug: print("\n---\norch", len(msgs), msgs) if done(): - print("am i done? orchestrator") event.set() if event.is_set(): break @@ -126,7 +129,6 @@ def executor() -> None: if debug: print("\n---\nexec", len(msgs), msgs) if done(): - print("am i done? executor") event.set() if event.is_set(): break diff --git a/libs/scheduler-kafka/tests/test_fanout.py b/libs/scheduler-kafka/tests/test_fanout.py index ee8719509..46c993576 100644 --- a/libs/scheduler-kafka/tests/test_fanout.py +++ b/libs/scheduler-kafka/tests/test_fanout.py @@ -136,7 +136,7 @@ async def test_fanout_graph(topics: Topics, acheckpointer: BaseCheckpointSaver) "tags": [], }, "input": None, - "finally_executor": None, + "finally_send": None, } for c in reversed(history) for _ in c.tasks @@ -161,7 +161,7 @@ async def test_fanout_graph(topics: Topics, acheckpointer: BaseCheckpointSaver) "id": t.id, "path": list(t.path), }, - "finally_executor": None, + "finally_send": None, } for c in reversed(history) for t in c.tasks @@ -218,7 +218,7 @@ async def test_fanout_graph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": None, + "finally_send": None, } for c in reversed(history[1:]) # the last one wasn't executed # orchestrator messages appear only after tasks for that checkpoint @@ -245,7 +245,7 @@ async def test_fanout_graph_w_interrupt( "id": t.id, "path": list(t.path), }, - "finally_executor": None, + "finally_send": None, } for c in reversed(history[1:]) # the last one wasn't executed for t in c.tasks diff --git a/libs/scheduler-kafka/tests/test_fanout_sync.py b/libs/scheduler-kafka/tests/test_fanout_sync.py index 9829e5cee..584254a72 100644 --- a/libs/scheduler-kafka/tests/test_fanout_sync.py +++ b/libs/scheduler-kafka/tests/test_fanout_sync.py @@ -133,7 +133,7 @@ def test_fanout_graph(topics: Topics, checkpointer: BaseCheckpointSaver) -> None "tags": [], }, "input": None, - "finally_executor": None, + "finally_send": None, } for c in reversed(history) for _ in c.tasks @@ -158,7 +158,7 @@ def test_fanout_graph(topics: Topics, checkpointer: BaseCheckpointSaver) -> None "id": t.id, "path": list(t.path), }, - "finally_executor": None, + "finally_send": None, } for c in reversed(history) for t in c.tasks @@ -216,7 +216,7 @@ def test_fanout_graph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": None, + "finally_send": None, } for c in reversed(history[1:]) # the last one wasn't executed # orchestrator messages appear only after tasks for that checkpoint @@ -243,7 +243,7 @@ def test_fanout_graph_w_interrupt( "id": t.id, "path": list(t.path), }, - "finally_executor": None, + "finally_send": None, } for c in reversed(history[1:]) # the last one wasn't executed for t in c.tasks diff --git a/libs/scheduler-kafka/tests/test_subgraph.py b/libs/scheduler-kafka/tests/test_subgraph.py index a9708b951..ad6d7553c 100644 --- a/libs/scheduler-kafka/tests/test_subgraph.py +++ b/libs/scheduler-kafka/tests/test_subgraph.py @@ -174,7 +174,7 @@ async def test_subgraph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": None, + "finally_send": None, } for c in reversed(history[1:]) # the last one wasn't executed # orchestrator messages appear only after tasks for that checkpoint @@ -228,28 +228,31 @@ async def test_subgraph_w_interrupt( ], "route": "weather", }, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": False, - "checkpoint_id": history[0].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": False, + "checkpoint_id": history[0].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[0].tasks[0].id, + "path": list(history[0].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[0].tasks[0].id, - "path": list(history[0].tasks[0].path), }, } ], @@ -283,28 +286,31 @@ async def test_subgraph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": False, - "checkpoint_id": history[0].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": False, + "checkpoint_id": history[0].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[0].tasks[0].id, + "path": list(history[0].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[0].tasks[0].id, - "path": list(history[0].tasks[0].path), }, } ], @@ -340,7 +346,7 @@ async def test_subgraph_w_interrupt( "id": t.id, "path": list(t.path), }, - "finally_executor": None, + "finally_send": None, } for c in reversed(history) for t in c.tasks @@ -376,28 +382,31 @@ async def test_subgraph_w_interrupt( "id": t.id, "path": list(t.path), }, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": False, - "checkpoint_id": history[0].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": False, + "checkpoint_id": history[0].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[0].tasks[0].id, + "path": list(history[0].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[0].tasks[0].id, - "path": list(history[0].tasks[0].path), }, } ], @@ -473,28 +482,31 @@ async def test_subgraph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": True, - "checkpoint_id": history[1].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": True, + "checkpoint_id": history[1].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[1].tasks[0].id, + "path": list(history[1].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[1].tasks[0].id, - "path": list(history[1].tasks[0].path), }, } ], @@ -528,28 +540,31 @@ async def test_subgraph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": True, - "checkpoint_id": history[1].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": True, + "checkpoint_id": history[1].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[1].tasks[0].id, + "path": list(history[1].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[1].tasks[0].id, - "path": list(history[1].tasks[0].path), }, } ], @@ -575,7 +590,7 @@ async def test_subgraph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": None, + "finally_send": None, } for c in reversed(history[:2]) for _ in c.tasks @@ -606,7 +621,7 @@ async def test_subgraph_w_interrupt( "id": t.id, "path": list(t.path), }, - "finally_executor": None, + "finally_send": None, } for c in reversed(history[:2]) for t in c.tasks @@ -642,28 +657,31 @@ async def test_subgraph_w_interrupt( "id": t.id, "path": list(t.path), }, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": True, - "checkpoint_id": history[1].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": True, + "checkpoint_id": history[1].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[1].tasks[0].id, + "path": list(history[1].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[1].tasks[0].id, - "path": list(history[1].tasks[0].path), }, } ], @@ -690,7 +708,7 @@ async def test_subgraph_w_interrupt( "recursion_limit": 25, "tags": [], }, - "finally_executor": None, + "finally_send": None, "task": { "id": history[1].tasks[0].id, "path": list(history[1].tasks[0].path), diff --git a/libs/scheduler-kafka/tests/test_subgraph_sync.py b/libs/scheduler-kafka/tests/test_subgraph_sync.py index a616049d9..53f651050 100644 --- a/libs/scheduler-kafka/tests/test_subgraph_sync.py +++ b/libs/scheduler-kafka/tests/test_subgraph_sync.py @@ -173,7 +173,7 @@ def test_subgraph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": None, + "finally_send": None, } for c in reversed(history[1:]) # the last one wasn't executed # orchestrator messages appear only after tasks for that checkpoint @@ -227,28 +227,31 @@ def test_subgraph_w_interrupt( ], "route": "weather", }, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": False, - "checkpoint_id": history[0].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": False, + "checkpoint_id": history[0].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[0].tasks[0].id, + "path": list(history[0].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[0].tasks[0].id, - "path": list(history[0].tasks[0].path), }, } ], @@ -282,28 +285,31 @@ def test_subgraph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": False, - "checkpoint_id": history[0].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": False, + "checkpoint_id": history[0].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[0].tasks[0].id, + "path": list(history[0].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[0].tasks[0].id, - "path": list(history[0].tasks[0].path), }, } ], @@ -339,7 +345,7 @@ def test_subgraph_w_interrupt( "id": t.id, "path": list(t.path), }, - "finally_executor": None, + "finally_send": None, } for c in reversed(history) for t in c.tasks @@ -375,28 +381,31 @@ def test_subgraph_w_interrupt( "id": t.id, "path": list(t.path), }, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": False, - "checkpoint_id": history[0].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": False, + "checkpoint_id": history[0].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[0].tasks[0].id, + "path": list(history[0].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[0].tasks[0].id, - "path": list(history[0].tasks[0].path), }, } ], @@ -471,28 +480,31 @@ def test_subgraph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": True, - "checkpoint_id": history[1].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": True, + "checkpoint_id": history[1].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[1].tasks[0].id, + "path": list(history[1].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[1].tasks[0].id, - "path": list(history[1].tasks[0].path), }, } ], @@ -526,28 +538,31 @@ def test_subgraph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": True, - "checkpoint_id": history[1].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": True, + "checkpoint_id": history[1].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[1].tasks[0].id, + "path": list(history[1].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[1].tasks[0].id, - "path": list(history[1].tasks[0].path), }, } ], @@ -573,7 +588,7 @@ def test_subgraph_w_interrupt( "tags": [], }, "input": None, - "finally_executor": None, + "finally_send": None, } for c in reversed(history[:2]) for _ in c.tasks @@ -604,7 +619,7 @@ def test_subgraph_w_interrupt( "id": t.id, "path": list(t.path), }, - "finally_executor": None, + "finally_send": None, } for c in reversed(history[:2]) for t in c.tasks @@ -640,28 +655,31 @@ def test_subgraph_w_interrupt( "id": t.id, "path": list(t.path), }, - "finally_executor": [ + "finally_send": [ { - "config": { - "callbacks": None, - "configurable": { - "__pregel_dedupe_tasks": True, - "__pregel_ensure_latest": True, - "__pregel_resuming": True, - "checkpoint_id": history[1].config["configurable"][ - "checkpoint_id" - ], - "checkpoint_ns": "", - "thread_id": "1", + "topic": topics.executor, + "value": { + "config": { + "callbacks": None, + "configurable": { + "__pregel_dedupe_tasks": True, + "__pregel_ensure_latest": True, + "__pregel_resuming": True, + "checkpoint_id": history[1].config[ + "configurable" + ]["checkpoint_id"], + "checkpoint_ns": "", + "thread_id": "1", + }, + "metadata": AnyDict(), + "recursion_limit": 25, + "tags": [], + }, + "finally_send": None, + "task": { + "id": history[1].tasks[0].id, + "path": list(history[1].tasks[0].path), }, - "metadata": AnyDict(), - "recursion_limit": 25, - "tags": [], - }, - "finally_executor": None, - "task": { - "id": history[1].tasks[0].id, - "path": list(history[1].tasks[0].path), }, } ], @@ -688,7 +706,7 @@ def test_subgraph_w_interrupt( "recursion_limit": 25, "tags": [], }, - "finally_executor": None, + "finally_send": None, "task": { "id": history[1].tasks[0].id, "path": list(history[1].tasks[0].path),