From 8f87cd1c914360d8dc58729dcc06ba701a708236 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Thu, 12 Sep 2024 11:01:37 -0700 Subject: [PATCH] Performance improvements in core library (#1683) * Performance improvements in core library - Avoid creating new callback manager when received one as arg - Avoid looking for config when already received one as arg - Avoid copies of values in ensure_config/merge_configs - Implement version of ensure_config that accepts multiple configs (avoids calling merge_configs first) - Avoid calling merge_configs when we only need to attach extra tags/metadata * Fix * Fix * Try again * Debug ci job * Fix * Try again * Try again * Try again * Some more variations * Attach annotation to first changed file * Fix * Re-enable benchmarks --- .github/workflows/_integration_test.yml | 2 +- .github/workflows/_lint.yml | 2 +- .github/workflows/bench.yml | 44 ++++- libs/langgraph/langgraph/pregel/__init__.py | 9 +- libs/langgraph/langgraph/pregel/algo.py | 31 ++-- libs/langgraph/langgraph/pregel/io.py | 5 +- libs/langgraph/langgraph/pregel/loop.py | 7 +- libs/langgraph/langgraph/pregel/read.py | 30 ++-- libs/langgraph/langgraph/utils/config.py | 181 +++++++++++++++++--- libs/langgraph/langgraph/utils/runnable.py | 31 ++-- 10 files changed, 257 insertions(+), 85 deletions(-) diff --git a/.github/workflows/_integration_test.yml b/.github/workflows/_integration_test.yml index 728f1b86d..4f4e9bac7 100644 --- a/.github/workflows/_integration_test.yml +++ b/.github/workflows/_integration_test.yml @@ -22,7 +22,7 @@ jobs: - uses: actions/checkout@v4 - name: Get changed files id: changed-files - uses: Ana06/get-changed-files@v2.2.0 + uses: Ana06/get-changed-files@v2.3.0 with: filter: "libs/cli/**" - name: Set up Python ${{ matrix.python-version }} + Poetry ${{ env.POETRY_VERSION }} diff --git a/.github/workflows/_lint.yml b/.github/workflows/_lint.yml index 9ba0ff5a9..dbb7133ea 100644 --- a/.github/workflows/_lint.yml +++ b/.github/workflows/_lint.yml @@ -34,7 +34,7 @@ jobs: - uses: actions/checkout@v4 - name: Get changed files id: changed-files - uses: Ana06/get-changed-files@v2.2.0 + uses: Ana06/get-changed-files@v2.3.0 with: filter: "${{ inputs.working-directory }}/**" - name: Set up Python ${{ matrix.python-version }} + Poetry ${{ env.POETRY_VERSION }} diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 2e7041fd6..1e1117e5c 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -27,13 +27,13 @@ jobs: - name: Install dependencies run: poetry install --with dev - name: Run benchmarks - run: make benchmark + run: OUTPUT=out/benchmark-baseline.json make -s benchmark - name: Upload benchmark baseline uses: actions/upload-artifact@v4 with: - name: benchmark-baseline.json - path: libs/langgraph/out/benchmark.json - compare: + name: benchmark-baseline + path: libs/langgraph/out/benchmark-baseline.json + benchmark: runs-on: ubuntu-latest defaults: run: @@ -41,6 +41,11 @@ jobs: needs: [baseline] steps: - uses: actions/checkout@v4 + - id: files + name: Get changed files + uses: Ana06/get-changed-files@v2.3.0 + with: + format: json - name: Set up Python 3.11 + Poetry ${{ env.POETRY_VERSION }} uses: "./.github/actions/poetry_setup" with: @@ -50,13 +55,36 @@ jobs: - name: Install dependencies run: poetry install --with dev - name: Run benchmarks - run: make benchmark + id: benchmark + run: | + { + echo 'OUTPUT<> "$GITHUB_OUTPUT" - name: Download benchmark baseline uses: actions/download-artifact@v4 with: - name: benchmark-baseline.json path: libs/langgraph/out + merge-multiple: true - name: Compare benchmarks - run: poetry run pyperf compare_to out/benchmark-baseline.json out/benchmark.json --table --group-by-speed >> $GITHUB_OUTPUT + id: compare + run: | + { + echo 'OUTPUT<> "$GITHUB_OUTPUT" - name: Annotation - run: echo "::notice file=libs/langgraph/bench/__main__.py::$GITHUB_OUTPUT" + uses: actions/github-script@v7 + with: + script: | + const file = JSON.parse(`${{ steps.files.outputs.added_modified_renamed }}`)[0] + core.notice(`${{ steps.benchmark.outputs.OUTPUT }}`, { + title: 'Benchmark results', + file, + }) + core.notice(`${{ steps.compare.outputs.OUTPUT }}`, { + title: 'Comparison against main', + file, + }) diff --git a/libs/langgraph/langgraph/pregel/__init__.py b/libs/langgraph/langgraph/pregel/__init__.py index c2324f920..b5cdb6b19 100644 --- a/libs/langgraph/langgraph/pregel/__init__.py +++ b/libs/langgraph/langgraph/pregel/__init__.py @@ -29,7 +29,6 @@ from langchain_core.runnables.base import Input, Output from langchain_core.runnables.config import ( RunnableConfig, - ensure_config, get_async_callback_manager_for_config, get_callback_manager_for_config, ) @@ -86,6 +85,7 @@ from langgraph.pregel.write import ChannelWrite, ChannelWriteEntry from langgraph.store.base import BaseStore from langgraph.utils.config import ( + ensure_config, merge_configs, patch_checkpoint_map, patch_config, @@ -1156,7 +1156,7 @@ def output() -> Iterator: else: yield payload - config = ensure_config(merge_configs(self.config, config)) + config = ensure_config(self.config, config) callback_manager = get_callback_manager_for_config(config) run_manager = callback_manager.on_chain_start( None, @@ -1337,7 +1337,7 @@ def output() -> Iterator: else: yield payload - config = ensure_config(merge_configs(self.config, config)) + config = ensure_config(self.config, config) callback_manager = get_async_callback_manager_for_config(config) run_manager = await callback_manager.on_chain_start( None, @@ -1402,8 +1402,7 @@ def output() -> Iterator: # channel updates from step N are only visible in step N+1 # channels are guaranteed to be immutable for the duration of the step, # with channel updates applied only at the transition between steps - while await asyncio.to_thread( - loop.tick, + while loop.tick( input_keys=self.input_channels, interrupt_before=interrupt_before, interrupt_after=interrupt_after, diff --git a/libs/langgraph/langgraph/pregel/algo.py b/libs/langgraph/langgraph/pregel/algo.py index 9f9837980..f23424ca0 100644 --- a/libs/langgraph/langgraph/pregel/algo.py +++ b/libs/langgraph/langgraph/pregel/algo.py @@ -46,6 +46,8 @@ from langgraph.pregel.types import All, PregelExecutableTask, PregelTask from langgraph.utils.config import merge_configs, patch_config +EMPTY_SEQ = tuple() + class WritesProtocol(Protocol): name: str @@ -78,7 +80,10 @@ def should_interrupt( task for task in tasks if ( - (not task.config or TAG_HIDDEN not in task.config.get("tags")) + ( + not task.config + or TAG_HIDDEN not in task.config.get("tags", EMPTY_SEQ) + ) if interrupt_nodes == "*" else task.name in interrupt_nodes ) @@ -182,11 +187,10 @@ def apply_writes( for chan in task.triggers if chan not in RESERVED and chan in channels }: - if channels[chan].consume(): - if get_next_version is not None: - checkpoint["channel_versions"][chan] = get_next_version( - max_version, channels[chan] - ) + if channels[chan].consume() and get_next_version is not None: + checkpoint["channel_versions"][chan] = get_next_version( + max_version, channels[chan] + ) # clear pending sends if checkpoint["pending_sends"]: @@ -216,8 +220,7 @@ def apply_writes( updated_channels: set[str] = set() for chan, vals in pending_writes_by_channel.items(): if chan in channels: - updated = channels[chan].update(vals) - if updated and get_next_version is not None: + if channels[chan].update(vals) and get_next_version is not None: checkpoint["channel_versions"][chan] = get_next_version( max_version, channels[chan] ) @@ -370,6 +373,8 @@ def prepare_single_task( proc = processes[packet.node] if node := proc.node: managed.replace_runtime_placeholders(step, packet.arg) + if proc.metadata: + metadata.update(proc.metadata) writes = deque() task_checkpoint_ns = f"{checkpoint_ns}:{task_id}" return PregelExecutableTask( @@ -379,9 +384,7 @@ def prepare_single_task( writes, patch_config( merge_configs( - config, - processes[packet.node].config, - {"metadata": metadata}, + config, {"metadata": metadata, "tags": proc.tags} ), run_name=packet.node, callbacks=( @@ -478,6 +481,8 @@ def prepare_single_task( if for_execution: if node := proc.node: + if proc.metadata: + metadata.update(proc.metadata) writes = deque() task_checkpoint_ns = f"{checkpoint_ns}:{task_id}" return PregelExecutableTask( @@ -487,9 +492,7 @@ def prepare_single_task( writes, patch_config( merge_configs( - config, - proc.config, - {"metadata": metadata}, + config, {"metadata": metadata, "tags": proc.tags} ), run_name=name, callbacks=( diff --git a/libs/langgraph/langgraph/pregel/io.py b/libs/langgraph/langgraph/pregel/io.py index 286282f1d..a02afbdec 100644 --- a/libs/langgraph/langgraph/pregel/io.py +++ b/libs/langgraph/langgraph/pregel/io.py @@ -97,6 +97,9 @@ def __radd__(self, other: dict[str, Any]) -> "AddableUpdatesDict": raise TypeError("AddableUpdatesDict does not support right-side addition") +EMPTY_SEQ = tuple() + + def map_output_updates( output_channels: Union[str, Sequence[str]], tasks: list[tuple[PregelExecutableTask, Sequence[tuple[str, Any]]]], @@ -106,7 +109,7 @@ def map_output_updates( output_tasks = [ (t, ww) for t, ww in tasks - if (not t.config or TAG_HIDDEN not in t.config.get("tags")) + if (not t.config or TAG_HIDDEN not in t.config.get("tags", EMPTY_SEQ)) and ww[0][0] != ERROR and ww[0][0] != INTERRUPT ] diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index 9d96fbc3e..2cf6fab5e 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -554,7 +554,9 @@ def _output_writes( self, task_id: str, writes: Sequence[tuple[str, Any]], *, cached: bool = False ) -> None: if task := self.tasks.get(task_id): - if task.config is not None and TAG_HIDDEN in task.config.get("tags"): + if task.config is not None and TAG_HIDDEN in task.config.get( + "tags", EMPTY_SEQ + ): return if writes[0][0] != ERROR and writes[0][0] != INTERRUPT: self._emit( @@ -806,3 +808,6 @@ async def __aexit__( return await asyncio.shield( self.stack.__aexit__(exc_type, exc_value, traceback) ) + + +EMPTY_SEQ = tuple() diff --git a/libs/langgraph/langgraph/pregel/read.py b/libs/langgraph/langgraph/pregel/read.py index 4d9944661..e0e483ffd 100644 --- a/libs/langgraph/langgraph/pregel/read.py +++ b/libs/langgraph/langgraph/pregel/read.py @@ -120,7 +120,9 @@ class PregelNode(Runnable): retry_policy: Optional[RetryPolicy] - config: RunnableConfig + tags: Optional[Sequence[str]] + + metadata: Optional[Mapping[str, Any]] def __init__( self, @@ -133,7 +135,6 @@ def __init__( metadata: Optional[Mapping[str, Any]] = None, bound: Optional[Runnable[Any, Any]] = None, retry_policy: Optional[RetryPolicy] = None, - config: Optional[RunnableConfig] = None, ) -> None: self.channels = channels self.triggers = list(triggers) @@ -141,9 +142,8 @@ def __init__( self.writers = writers or [] self.bound = bound if bound is not None else DEFAULT_BOUND self.retry_policy = retry_policy - self.config = merge_configs( - config, {"tags": tags or [], "metadata": metadata or {}} - ) + self.tags = tags + self.metadata = metadata def copy(self, update: dict[str, Any]) -> PregelNode: attrs = {**self.__dict__, **update} @@ -162,7 +162,7 @@ def flat_writers(self) -> list[Runnable]: # careful to not modify the original writers list or ChannelWrite writers[-2] = ChannelWrite( writes=writers[-2].writes + writers[-1].writes, - tags=writers[-2].config["tags"] if writers[-2].config else None, + tags=writers[-2].tags, require_at_least_one_of=writers[-2].require_at_least_one_of, ) writers.pop() @@ -238,7 +238,11 @@ def invoke( config: Optional[RunnableConfig] = None, **kwargs: Optional[Any], ) -> Output: - return self.bound.invoke(input, merge_configs(self.config, config), **kwargs) + return self.bound.invoke( + input, + merge_configs({"metadata": self.metadata, "tags": self.tags}, config), + **kwargs, + ) async def ainvoke( self, @@ -247,7 +251,9 @@ async def ainvoke( **kwargs: Optional[Any], ) -> Output: return await self.bound.ainvoke( - input, merge_configs(self.config, config), **kwargs + input, + merge_configs({"metadata": self.metadata, "tags": self.tags}, config), + **kwargs, ) def stream( @@ -257,7 +263,9 @@ def stream( **kwargs: Optional[Any], ) -> Iterator[Output]: yield from self.bound.stream( - input, merge_configs(self.config, config), **kwargs + input, + merge_configs({"metadata": self.metadata, "tags": self.tags}, config), + **kwargs, ) async def astream( @@ -267,6 +275,8 @@ async def astream( **kwargs: Optional[Any], ) -> AsyncIterator[Output]: async for item in self.bound.astream( - input, merge_configs(self.config, config), **kwargs + input, + merge_configs({"metadata": self.metadata, "tags": self.tags}, config), + **kwargs, ): yield item diff --git a/libs/langgraph/langgraph/utils/config.py b/libs/langgraph/langgraph/utils/config.py index 461c89024..4a69050a1 100644 --- a/libs/langgraph/langgraph/utils/config.py +++ b/libs/langgraph/langgraph/utils/config.py @@ -1,8 +1,14 @@ -from typing import Any, Optional +from collections import ChainMap +from typing import Any, Optional, Sequence -from langchain_core.callbacks import Callbacks +from langchain_core.callbacks import AsyncCallbackManager, CallbackManager, Callbacks from langchain_core.runnables import RunnableConfig -from langchain_core.runnables.config import COPIABLE_KEYS, DEFAULT_RECURSION_LIMIT +from langchain_core.runnables.config import ( + CONFIG_KEYS, + COPIABLE_KEYS, + DEFAULT_RECURSION_LIMIT, + var_child_runnable_config, +) from langgraph.checkpoint.base import CheckpointMetadata from langgraph.constants import CONFIG_KEY_CHECKPOINT_MAP @@ -53,56 +59,56 @@ def merge_configs(*configs: Optional[RunnableConfig]) -> RunnableConfig: for config in configs: if config is None: continue - for key in config: + for key, value in config.items(): + if not value: + continue if key == "metadata": - base[key] = { # type: ignore - **base.get(key, {}), # type: ignore - **(config.get(key) or {}), # type: ignore - } + if base_value := base.get(key): # type: ignore + base[key] = {**base_value, **value} # type: ignore + else: + base[key] = value elif key == "tags": - base[key] = sorted( # type: ignore - set(base.get(key, []) + (config.get(key) or [])), # type: ignore - ) + if base_value := base.get(key): # type: ignore + base[key] = [*base_value, *value] # type: ignore + else: + base[key] = value elif key == "configurable": - base[key] = { # type: ignore - **base.get(key, {}), # type: ignore - **(config.get(key) or {}), # type: ignore - } + if base_value := base.get(key): # type: ignore + base[key] = {**base_value, **value} # type: ignore + else: + base[key] = value elif key == "callbacks": base_callbacks = base.get("callbacks") - these_callbacks = config["callbacks"] # callbacks can be either None, list[handler] or manager # so merging two callbacks values has 6 cases - if isinstance(these_callbacks, list): + if isinstance(value, list): if base_callbacks is None: - base["callbacks"] = these_callbacks.copy() + base["callbacks"] = value.copy() elif isinstance(base_callbacks, list): - base["callbacks"] = base_callbacks + these_callbacks + base["callbacks"] = base_callbacks + value else: # base_callbacks is a manager mngr = base_callbacks.copy() - for callback in these_callbacks: + for callback in value: mngr.add_handler(callback, inherit=True) base["callbacks"] = mngr - elif these_callbacks is not None: - # these_callbacks is a manager + else: + # value is a manager if base_callbacks is None: - base["callbacks"] = these_callbacks.copy() + base["callbacks"] = value.copy() elif isinstance(base_callbacks, list): - mngr = these_callbacks.copy() + mngr = value.copy() for callback in base_callbacks: mngr.add_handler(callback, inherit=True) base["callbacks"] = mngr else: # base_callbacks is also a manager - base["callbacks"] = base_callbacks.merge(these_callbacks) + base["callbacks"] = base_callbacks.merge(value) elif key == "recursion_limit": if config["recursion_limit"] != DEFAULT_RECURSION_LIMIT: base["recursion_limit"] = config["recursion_limit"] - elif key in COPIABLE_KEYS and config[key] is not None: # type: ignore[literal-required] - base[key] = config[key].copy() # type: ignore[literal-required] else: - base[key] = config[key] or base.get(key) # type: ignore + base[key] = config[key] return base @@ -150,3 +156,122 @@ def patch_config( if configurable is not None: config["configurable"] = {**config.get("configurable", {}), **configurable} return config + + +def get_callback_manager_for_config( + config: RunnableConfig, tags: Optional[Sequence[str]] = None +) -> CallbackManager: + """Get a callback manager for a config. + + Args: + config (RunnableConfig): The config. + + Returns: + CallbackManager: The callback manager. + """ + from langchain_core.callbacks.manager import CallbackManager + + # merge tags + all_tags = config.get("tags") + if all_tags is not None and tags is not None: + all_tags = [*all_tags, *tags] + elif tags is not None: + all_tags = tags + # use existing callbacks if they exist + if (callbacks := config.get("callbacks")) and isinstance( + callbacks, CallbackManager + ): + if all_tags: + callbacks.add_tags(all_tags) + if metadata := config.get("metadata"): + callbacks.add_metadata(metadata) + return callbacks + else: + # otherwise create a new manager + return CallbackManager.configure( + inheritable_callbacks=config.get("callbacks"), + inheritable_tags=all_tags, + inheritable_metadata=config.get("metadata"), + ) + + +def get_async_callback_manager_for_config( + config: RunnableConfig, + tags: Optional[Sequence[str]] = None, +) -> AsyncCallbackManager: + """Get an async callback manager for a config. + + Args: + config (RunnableConfig): The config. + + Returns: + AsyncCallbackManager: The async callback manager. + """ + from langchain_core.callbacks.manager import AsyncCallbackManager + + # merge tags + all_tags = config.get("tags") + if all_tags is not None and tags is not None: + all_tags = [*all_tags, *tags] + elif tags is not None: + all_tags = tags + # use existing callbacks if they exist + if (callbacks := config.get("callbacks")) and isinstance( + callbacks, AsyncCallbackManager + ): + if all_tags: + callbacks.add_tags(all_tags) + if metadata := config.get("metadata"): + callbacks.add_metadata(metadata) + return callbacks + else: + # otherwise create a new manager + return AsyncCallbackManager.configure( + inheritable_callbacks=config.get("callbacks"), + inheritable_tags=config.get("tags"), + inheritable_metadata=config.get("metadata"), + ) + + +def ensure_config(*configs: Optional[RunnableConfig]) -> RunnableConfig: + """Ensure that a config is a dict with all keys present. + + Args: + config (Optional[RunnableConfig], optional): The config to ensure. + Defaults to None. + + Returns: + RunnableConfig: The ensured config. + """ + empty = RunnableConfig( + tags=[], + metadata=ChainMap(), + callbacks=None, + recursion_limit=DEFAULT_RECURSION_LIMIT, + configurable={}, + ) + if var_config := var_child_runnable_config.get(): + empty.update( + { + k: v.copy() if k in COPIABLE_KEYS else v # type: ignore[attr-defined] + for k, v in var_config.items() + if v is not None + }, + ) + for config in configs: + if config is None: + continue + for k, v in config.items(): + if v is not None and k in CONFIG_KEYS: + empty[k] = v + for k, v in config.items(): + if v is not None and k not in CONFIG_KEYS: + empty["configurable"][k] = v + for key, value in empty["configurable"].items(): + if ( + not key.startswith("__") + and isinstance(value, (str, int, float, bool)) + and key not in empty["metadata"] + ): + empty["metadata"][key] = value + return empty diff --git a/libs/langgraph/langgraph/utils/runnable.py b/libs/langgraph/langgraph/utils/runnable.py index c59bd0b18..7230d67bb 100644 --- a/libs/langgraph/langgraph/utils/runnable.py +++ b/libs/langgraph/langgraph/utils/runnable.py @@ -5,7 +5,7 @@ from contextlib import AsyncExitStack from contextvars import copy_context from functools import partial, wraps -from typing import Any, AsyncIterator, Awaitable, Callable, Iterator, Optional +from typing import Any, AsyncIterator, Awaitable, Callable, Iterator, Optional, Sequence from langchain_core.runnables.base import ( Runnable, @@ -16,9 +16,6 @@ RunnableSequence, ) from langchain_core.runnables.config import ( - ensure_config, - get_async_callback_manager_for_config, - get_callback_manager_for_config, run_in_executor, var_child_runnable_config, ) @@ -26,7 +23,12 @@ from langchain_core.tracers._streaming import _StreamingCallbackHandler from typing_extensions import TypeGuard -from langgraph.utils.config import merge_configs, patch_config +from langgraph.utils.config import ( + ensure_config, + get_async_callback_manager_for_config, + get_callback_manager_for_config, + patch_config, +) try: from langchain_core.runnables.config import _set_config_context @@ -54,7 +56,7 @@ def __init__( afunc: Optional[Callable[..., Awaitable[Optional[Runnable]]]] = None, *, name: Optional[str] = None, - tags: Optional[list[str]] = None, + tags: Optional[Sequence[str]] = None, trace: bool = True, recurse: bool = True, **kwargs: Any, @@ -78,7 +80,7 @@ def __init__( self.afunc = afunc if afunc is not None: self.afunc_accepts_config = accepts_config(afunc) - self.config: Optional[RunnableConfig] = {"tags": tags} if tags else None + self.tags = tags self.kwargs = kwargs self.trace = trace self.recurse = recurse @@ -103,11 +105,11 @@ def invoke( kwargs = {**self.kwargs, **kwargs} if self.func_accepts_config: kwargs["config"] = config - config = ensure_config(merge_configs(self.config, config)) + if config is None: + config = ensure_config() context = copy_context() if self.trace: - config = ensure_config(config) - callback_manager = get_callback_manager_for_config(config) + callback_manager = get_callback_manager_for_config(config, self.tags) run_manager = callback_manager.on_chain_start( None, input, @@ -139,10 +141,11 @@ async def ainvoke( kwargs = {**self.kwargs, **kwargs} if self.afunc_accepts_config: kwargs["config"] = config - config = ensure_config(merge_configs(self.config, config)) + if config is None: + config = ensure_config() context = copy_context() if self.trace: - callback_manager = get_async_callback_manager_for_config(config) + callback_manager = get_async_callback_manager_for_config(config, self.tags) run_manager = await callback_manager.on_chain_start( None, input, @@ -318,7 +321,6 @@ def invoke( self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any ) -> Output: # setup callbacks and context - config = ensure_config(config) callback_manager = get_callback_manager_for_config(config) # start the root run run_manager = callback_manager.on_chain_start( @@ -356,7 +358,6 @@ async def ainvoke( **kwargs: Optional[Any], ) -> Output: # setup callbacks - config = ensure_config(config) callback_manager = get_async_callback_manager_for_config(config) # start the root run run_manager = await callback_manager.on_chain_start( @@ -398,7 +399,6 @@ def stream( **kwargs: Optional[Any], ) -> Iterator[Output]: # setup callbacks - config = ensure_config(config) callback_manager = get_callback_manager_for_config(config) # start the root run run_manager = callback_manager.on_chain_start( @@ -460,7 +460,6 @@ async def astream( **kwargs: Optional[Any], ) -> AsyncIterator[Output]: # setup callbacks - config = ensure_config(config) callback_manager = get_async_callback_manager_for_config(config) # start the root run run_manager = await callback_manager.on_chain_start(