diff --git a/burr/core/persistence.py b/burr/core/persistence.py index f2af19f2..f2788a2c 100644 --- a/burr/core/persistence.py +++ b/burr/core/persistence.py @@ -352,6 +352,16 @@ def __del__(self): # closes connection at end when things are being shutdown. self.connection.close() + def __getstate__(self): + return {key: value for key, value in self.__dict__.items() if key != "connection"} + + def __setstate__(self, state): + for key, value in state.items(): + setattr(self, key, value) + self.connection = sqlite3.connect( + self.db_path, **self._connect_kwargs if self._connect_kwargs is not None else {} + ) + class InMemoryPersister(BaseStatePersister): """In-memory persister for testing purposes. This is not recommended for production use.""" diff --git a/burr/integrations/ray.py b/burr/integrations/ray.py new file mode 100644 index 00000000..d5201a71 --- /dev/null +++ b/burr/integrations/ray.py @@ -0,0 +1,38 @@ +import concurrent.futures + +import ray + + +class RayExecutor(concurrent.futures.Executor): + """Ray parallel executor -- implementation of concurrent.futures.Executor. + Currently experimental""" + + def __init__(self, shutdown_on_end: bool = False): + """Creates a Ray executor -- remember to call ray.init() before running anything!""" + self.shutdown_on_end = shutdown_on_end + + def submit(self, fn, *args, **kwargs): + """Submits to ray -- creates a python future by calling ray.remote + + :param fn: Function to submit + :param args: Args for the fn + :param kwargs: Kwargs for the fn + :return: The future for the fn + """ + if not ray.is_initialized(): + raise RuntimeError("Ray is not initialized. 
Call ray.init() before running anything!") + ray_fn = ray.remote(fn) + object_ref = ray_fn.remote(*args, **kwargs) + future = object_ref.future() + + return future + + def shutdown(self, wait=True, **kwargs): + """Shuts down the executor by shutting down ray + + :param wait: Whether to wait -- required for the API but not respected (yet) + :param kwargs: Keyword arguments -- not used yet + """ + if self.shutdown_on_end: + if ray.is_initialized(): + ray.shutdown() diff --git a/burr/tracking/client.py b/burr/tracking/client.py index aab7c9c0..fbe7beda 100644 --- a/burr/tracking/client.py +++ b/burr/tracking/client.py @@ -298,7 +298,7 @@ def load_state( sequence_id: int = -1, storage_dir: str = DEFAULT_STORAGE_DIR, ) -> tuple[dict, str]: - """THis is deprecated and will be removed when we migrate over demos. Do not use! Instead use + """This is deprecated and will be removed when we migrate over demos. Do not use! Instead use the persistence API :py:class:`initialize_from ` to load state. @@ -360,6 +360,18 @@ def _ensure_dir_structure(self, app_id: str): logger.info(f"Creating application directory: {application_path}") os.makedirs(application_path) + def __setstate__(self, state): + self.__dict__.update(state) + + def __getstate__(self): + out = { + key: value for key, value in self.__dict__.items() if key != "f" + } # the file we don't want to serialize + # Note that this will only work if we also call post_application_create + # For now that's OK as that's the only reason we'll add it -- if we want more distribution later we'll have to serialize the file + out["f"] = None + return out + + def post_application_create( self, *, @@ -378,6 +390,7 @@ def post_application_create( encoding="utf-8", errors="replace", ) + graph_path = os.path.join(self.storage_dir, app_id, self.GRAPH_FILENAME) if os.path.exists(graph_path): logger.info(f"Graph already exists at {graph_path}. 
Not overwriting.") diff --git a/docs/reference/integrations/index.rst b/docs/reference/integrations/index.rst index 08ae18af..685da57a 100644 --- a/docs/reference/integrations/index.rst +++ b/docs/reference/integrations/index.rst @@ -14,3 +14,4 @@ Integrations -- we will be adding more langchain pydantic haystack + ray diff --git a/docs/reference/integrations/ray.rst b/docs/reference/integrations/ray.rst new file mode 100644 index 00000000..c781442b --- /dev/null +++ b/docs/reference/integrations/ray.rst @@ -0,0 +1,8 @@ +=== +Ray +=== + +The Burr Ray integration allows you to run :ref:`parallel sub-applications ` on `Ray `_. + +.. autoclass:: burr.integrations.ray.RayExecutor + :members: diff --git a/docs/reference/persister.rst b/docs/reference/persister.rst index bd0e7f3d..06f4dde8 100644 --- a/docs/reference/persister.rst +++ b/docs/reference/persister.rst @@ -37,7 +37,7 @@ Supported Implementations Currently we support the following, although we highly recommend you contribute your own! We will be adding more shortly. -.. autoclass:: burr.core.persistence.SQLLitePersister +.. autoclass:: burr.core.persistence.SQLitePersister :members: .. automethod:: __init__ diff --git a/examples/ray/README.md b/examples/ray/README.md new file mode 100644 index 00000000..f5efd3fd --- /dev/null +++ b/examples/ray/README.md @@ -0,0 +1,7 @@ +# Parallelism on Burr + +This is supporting code for two blog posts: +1. [Parallel Multi Agent Workflows with Burr](https://blog.dagworks.io/p/93838d1f-52b5-4a72-999f-9cab9733d4fe) +2. [Parallel, Fault-Tolerant Agents with Burr/Ray](https://blog.dagworks.io/p/5baf1077-2490-44bc-afff-fcdafe18e819) + +You can find basic code in [application.py](application.py) and run it in [notebook.ipynb](notebook.ipynb). Read the blog posts to get a sense for the motivation/design behind this. 
diff --git a/examples/ray/__init__.py b/examples/ray/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/ray/application.py b/examples/ray/application.py new file mode 100644 index 00000000..e24b75aa --- /dev/null +++ b/examples/ray/application.py @@ -0,0 +1,206 @@ +from typing import Any, Dict, List, Optional, Tuple + +import openai +import ray + +from burr.common.async_utils import SyncOrAsyncGenerator +from burr.core import Application, ApplicationBuilder, Condition, GraphBuilder, State, action +from burr.core.application import ApplicationContext +from burr.core.parallelism import MapStates, RunnableGraph, SubgraphType +from burr.core.persistence import SQLitePersister +from burr.integrations.ray import RayExecutor + + +# full agent +def _query_llm(prompt: str) -> str: + """Simple wrapper around the OpenAI API.""" + client = openai.Client() + return ( + client.chat.completions.create( + model="gpt-4o", + messages=[ + {"role": "system", "content": "You are a helpful assistant"}, + {"role": "user", "content": prompt}, + ], + ) + .choices[0] + .message.content + ) + + +@action( + reads=["feedback", "current_draft", "poem_type", "poem_subject"], + writes=["current_draft", "draft_history", "num_drafts"], +) +def write(state: State) -> Tuple[dict, State]: + """Writes a draft of a poem.""" + poem_subject = state["poem_subject"] + poem_type = state["poem_type"] + current_draft = state.get("current_draft") + feedback = state.get("feedback") + + parts = [ + f'You are an AI poet. Create a {poem_type} poem on the following subject: "{poem_subject}". ' + "It is absolutely imperative that you respond with only the poem and no other text." 
+ ] + + if current_draft: + parts.append(f'Here is the current draft of the poem: "{current_draft}".') + + if feedback: + parts.append(f'Please incorporate the following feedback: "{feedback}".') + + parts.append( + f"Ensure the poem is creative, adheres to the style of a {poem_type}, and improves upon the previous draft." + ) + + prompt = "\n".join(parts) + + draft = _query_llm(prompt) + + return {"draft": draft}, state.update( + current_draft=draft, + draft_history=state.get("draft_history", []) + [draft], + ).increment(num_drafts=1) + + +@action(reads=["current_draft", "poem_type", "poem_subject"], writes=["feedback"]) +def edit(state: State) -> Tuple[dict, State]: + """Edits a draft of a poem, providing feedback""" + poem_subject = state["poem_subject"] + poem_type = state["poem_type"] + current_draft = state["current_draft"] + + prompt = f""" + You are an AI poetry critic. Review the following {poem_type} poem based on the subject: "{poem_subject}". + Here is the current draft of the poem: "{current_draft}". + Provide detailed feedback to improve the poem. If the poem is already excellent and needs no changes, simply respond with an empty string. 
+ """ + feedback = _query_llm(prompt) + + return {"feedback": feedback}, state.update(feedback=feedback) + + +@action(reads=["current_draft"], writes=["final_draft"]) +def final_draft(state: State) -> Tuple[dict, State]: + return {"final_draft": state["current_draft"]}, state.update(final_draft=state["current_draft"]) + + +# full agent +@action( + reads=[], + writes=[ + "max_drafts", + "poem_types", + "poem_subject", + ], +) +def user_input(state: State, max_drafts: int, poem_types: List[str], poem_subject: str) -> State: + """Collects user input for the poem generation process.""" + return state.update(max_drafts=max_drafts, poem_types=poem_types, poem_subject=poem_subject) + + +class GenerateAllPoems(MapStates): + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> SyncOrAsyncGenerator[State]: + for poem_type in state["poem_types"]: + yield state.update(current_draft=None, poem_type=poem_type, feedback=[], num_drafts=0) + + def action(self, state: State, inputs: Dict[str, Any]) -> SubgraphType: + graph = ( + GraphBuilder() + .with_actions( + edit, + write, + final_draft, + ) + .with_transitions( + ("write", "edit", Condition.expr(f"num_drafts < {state['max_drafts']}")), + ("write", "final_draft"), + ("edit", "final_draft", Condition.expr("len(feedback) == 0")), + ("edit", "write"), + ) + ).build() + return RunnableGraph(graph=graph, entrypoint="write", halt_after=["final_draft"]) + + def reduce(self, state: State, results: SyncOrAsyncGenerator[State]) -> State: + proposals = [] + for output_state in results: + proposals.append(output_state["final_draft"]) + return state.append(proposals=proposals) + + @property + def writes(self) -> list[str]: + return ["proposals"] + + @property + def reads(self) -> list[str]: + return ["poem_types", "poem_subject", "max_drafts"] + + +@action(reads=["proposals", "poem_types"], writes=["final_results"]) +def final_results(state: State) -> Tuple[dict, State]: + # joins them into a string + 
proposals = state["proposals"] + final_results = "\n\n".join( + [f"{poem_type}:\n{proposal}" for poem_type, proposal in zip(state["poem_types"], proposals)] + ) + return {"final_results": final_results}, state.update(final_results=final_results) + + +def application_multithreaded() -> Application: + app = ( + ApplicationBuilder() + .with_actions(user_input, final_results, generate_all_poems=GenerateAllPoems()) + .with_transitions( + ("user_input", "generate_all_poems"), + ("generate_all_poems", "final_results"), + ) + .with_tracker(project="demo:parallel_agents") + .with_entrypoint("user_input") + .build() + ) + return app + + +def application(app_id: Optional[str] = None) -> Application: + persister = SQLitePersister(db_path="./db") + persister.initialize() + app = ( + ApplicationBuilder() + .with_actions(user_input, final_results, generate_all_poems=GenerateAllPoems()) + .with_transitions( + ("user_input", "generate_all_poems"), + ("generate_all_poems", "final_results"), + ) + .with_tracker(project="demo:parallel_agents_fault_tolerance") + .with_parallel_executor(RayExecutor) + .with_state_persister(persister) + .initialize_from( + persister, resume_at_next_action=True, default_state={}, default_entrypoint="user_input" + ) + .with_identifiers(app_id=app_id) + .build() + ) + return app + + +if __name__ == "__main__": + ray.init() + app = application() + app_id = app.uid + act, _, state = app.run( + halt_after=["final_results"], + inputs={ + "max_drafts": 2, + "poem_types": [ + "sonnet", + "limerick", + "haiku", + "acrostic", + ], + "poem_subject": "state machines", + }, + ) + print(state) diff --git a/examples/ray/notebook.ipynb b/examples/ray/notebook.ipynb new file mode 100644 index 00000000..e3f94ce5 --- /dev/null +++ b/examples/ray/notebook.ipynb @@ -0,0 +1,482 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a24201eb-2541-4ef1-bed2-14f34fe37ae7", + "metadata": {}, + "source": [ + "# Distributed/Fault Tolerant Agents with Burr" + ] + }, + { + 
"cell_type": "markdown", + "id": "2055510a-1b2a-4deb-8896-0f65e6a773dd", + "metadata": {}, + "source": [ + "This is supporting code for two blog posts:\n", + "1. [Parallel Multi Agent Workflows with Burr](https://blog.dagworks.io/p/93838d1f-52b5-4a72-999f-9cab9733d4fe)\n", + "2. [Parallel, Fault-Tolerant Agents with Burr/Ray](https://blog.dagworks.io/p/5baf1077-2490-44bc-afff-fcdafe18e819)\n", + "\n", + "This runs the application using a local ray instance." + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "id": "37a24484-1c2c-4e63-b45f-bb35a7c7151e", + "metadata": {}, + "outputs": [], + "source": [ + "import ray\n", + "import pprint\n", + "\n", + "import application as parallel_application\n", + "from burr.core import State" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "62ed5b17-9795-4426-85c1-a3384bb7d25f", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Requirement already satisfied: burr in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (0.32.0)\n", + "Requirement already satisfied: openai in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (1.47.1)\n", + "Requirement already satisfied: ray in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (2.40.0)\n", + "Requirement already satisfied: anyio<5,>=3.5.0 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from openai) (4.4.0)\n", + "Requirement already satisfied: distro<2,>=1.7.0 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from openai) (1.9.0)\n", + "Requirement already satisfied: httpx<1,>=0.23.0 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from openai) (0.27.0)\n", + "Requirement already satisfied: jiter<1,>=0.4.0 in 
/Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from openai) (0.5.0)\n", + "Requirement already satisfied: pydantic<3,>=1.9.0 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from openai) (2.8.2)\n", + "Requirement already satisfied: sniffio in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from openai) (1.3.1)\n", + "Requirement already satisfied: tqdm>4 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from openai) (4.66.4)\n", + "Requirement already satisfied: typing-extensions<5,>=4.11 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from openai) (4.12.2)\n", + "Requirement already satisfied: click>=7.0 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from ray) (8.1.7)\n", + "Requirement already satisfied: filelock in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from ray) (3.15.4)\n", + "Requirement already satisfied: jsonschema in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from ray) (4.23.0)\n", + "Requirement already satisfied: msgpack<2.0.0,>=1.0.0 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from ray) (1.0.8)\n", + "Requirement already satisfied: packaging in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from ray) (24.1)\n", + "Requirement already satisfied: protobuf!=3.19.5,>=3.15.3 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from ray) (4.25.4)\n", + "Requirement already satisfied: pyyaml in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from ray) (6.0.1)\n", + "Requirement already satisfied: aiosignal in 
/Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from ray) (1.3.1)\n", + "Requirement already satisfied: frozenlist in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from ray) (1.4.1)\n", + "Requirement already satisfied: requests in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from ray) (2.32.3)\n", + "Requirement already satisfied: idna>=2.8 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from anyio<5,>=3.5.0->openai) (3.7)\n", + "Requirement already satisfied: certifi in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from httpx<1,>=0.23.0->openai) (2024.7.4)\n", + "Requirement already satisfied: httpcore==1.* in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from httpx<1,>=0.23.0->openai) (1.0.5)\n", + "Requirement already satisfied: h11<0.15,>=0.13 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from httpcore==1.*->httpx<1,>=0.23.0->openai) (0.14.0)\n", + "Requirement already satisfied: annotated-types>=0.4.0 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from pydantic<3,>=1.9.0->openai) (0.7.0)\n", + "Requirement already satisfied: pydantic-core==2.20.1 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from pydantic<3,>=1.9.0->openai) (2.20.1)\n", + "Requirement already satisfied: attrs>=22.2.0 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from jsonschema->ray) (23.2.0)\n", + "Requirement already satisfied: jsonschema-specifications>=2023.03.6 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from jsonschema->ray) (2023.12.1)\n", + "Requirement already satisfied: referencing>=0.28.4 in 
/Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from jsonschema->ray) (0.35.1)\n", + "Requirement already satisfied: rpds-py>=0.7.1 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from jsonschema->ray) (0.19.1)\n", + "Requirement already satisfied: charset-normalizer<4,>=2 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from requests->ray) (3.3.2)\n", + "Requirement already satisfied: urllib3<3,>=1.21.1 in /Users/elijahbenizzy/.pyenv/versions/3.12.0/envs/burr-3-12/lib/python3.12/site-packages (from requests->ray) (2.2.2)\n", + "\n", + "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m23.2.1\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m24.3.1\u001b[0m\n", + "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpython3.12 -m pip install --upgrade pip\u001b[0m\n", + "Note: you may need to restart the kernel to use updated packages.\n", + "The burr.integrations.notebook extension is already loaded. To reload it, use:\n", + " %reload_ext burr.integrations.notebook\n" + ] + } + ], + "source": [ + "# execute to load the Burr and Hamilton extensions\n", + "%pip install burr openai ray\n", + "%load_ext burr.integrations.notebook" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "eb838d9c-ff30-4bc2-82b6-79b571c5b11c", + "metadata": {}, + "outputs": [], + "source": [ + "app = parallel_application.application()" + ] + }, + { + "cell_type": "markdown", + "id": "d693522a-fd47-4019-8935-6a38fcf46ebf", + "metadata": {}, + "source": [ + "# Application Graph\n", + "\n", + "This is the flowchart of the application. Note that `generate_all_poems` is actually a recursive set of sub-applications." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "3b6335ea-8f1d-48fb-b74a-4fea033b360f", + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "user_input\n", + "\n", + "user_input\n", + "\n", + "\n", + "\n", + "generate_all_poems\n", + "\n", + "generate_all_poems\n", + "\n", + "\n", + "\n", + "user_input->generate_all_poems\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input__poem_subject\n", + "\n", + "input: poem_subject\n", + "\n", + "\n", + "\n", + "input__poem_subject->user_input\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input__max_drafts\n", + "\n", + "input: max_drafts\n", + "\n", + "\n", + "\n", + "input__max_drafts->user_input\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input__poem_types\n", + "\n", + "input: poem_types\n", + "\n", + "\n", + "\n", + "input__poem_types->user_input\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_results\n", + "\n", + "final_results\n", + "\n", + "\n", + "\n", + "generate_all_poems->final_results\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "ApplicationGraph(actions=[user_input: {} -> max_drafts, poem_types, poem_subject, final_results: proposals, poem_types -> final_results, generate_all_poems: poem_types, poem_subject, max_drafts -> proposals], transitions=[Transition(from_=user_input: {} -> max_drafts, poem_types, poem_subject, to=generate_all_poems: poem_types, poem_subject, max_drafts -> proposals, condition=condition: default), Transition(from_=generate_all_poems: poem_types, poem_subject, max_drafts -> proposals, to=final_results: proposals, poem_types -> final_results, condition=condition: default)], entrypoint=user_input: {} -> max_drafts, poem_types, poem_subject)" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "app.graph" + ] + }, + { + "cell_type": "markdown", + "id": 
"81209a79-aa12-42ba-bfe0-468be5d2cacf", + "metadata": {}, + "source": [ + "# Sub-application Graph\n", + "\n", + "This represents the sub-application inside `generate_all_poems` -- we generate one of these for each poem type specified by the user (E.G. limerick, haiku, sonnet, etc...)." + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "58bd3d4d-70b8-42bc-b520-6d5bb95e8575", + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "edit\n", + "\n", + "edit\n", + "\n", + "\n", + "\n", + "write\n", + "\n", + "write\n", + "\n", + "\n", + "\n", + "edit->write\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "final_draft\n", + "\n", + "final_draft\n", + "\n", + "\n", + "\n", + "edit->final_draft\n", + "\n", + "\n", + "len(feedback) == 0\n", + "\n", + "\n", + "\n", + "write->edit\n", + "\n", + "\n", + "num_drafts < 2\n", + "\n", + "\n", + "\n", + "write->final_draft\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "Graph(actions=[edit: current_draft, poem_type, poem_subject -> feedback, write: feedback, current_draft, poem_type, poem_subject -> current_draft, draft_history, num_drafts, final_draft: current_draft -> final_draft], transitions=[Transition(from_=write: feedback, current_draft, poem_type, poem_subject -> current_draft, draft_history, num_drafts, to=edit: current_draft, poem_type, poem_subject -> feedback, condition=condition: num_drafts < 2), Transition(from_=write: feedback, current_draft, poem_type, poem_subject -> current_draft, draft_history, num_drafts, to=final_draft: current_draft -> final_draft, condition=condition: default), Transition(from_=edit: current_draft, poem_type, poem_subject -> feedback, to=final_draft: current_draft -> final_draft, condition=condition: len(feedback) == 0), Transition(from_=edit: current_draft, poem_type, poem_subject -> feedback, to=write: feedback, current_draft, poem_type, poem_subject -> 
current_draft, draft_history, num_drafts, condition=condition: default)])" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Do not access the sub-application this way -- just for the demo!\n", + "app.graph.get_action(\"generate_all_poems\").action(state=State({\"max_drafts\" : 2}), inputs={}).graph" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "60988516-1d20-40e6-a484-214f3ee86383", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + " \n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# execute cell to launch the UI\n", + "%burr_ui" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "id": "3573dad5-bb92-47c3-9496-230ab6fd9354", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-12-22 20:40:19,330\tINFO worker.py:1654 -- Calling ray.init() again after it has already been called.\n", + "This is trying to return without having computed a single action -- we'll end up just returning some Nones. This means that nothing was executed (E.G. that the state machine had nowhere to go). Either fix the state machine orthe halt conditions, or both... Halt conditions are: halt_before=[], halt_after=['final_results'].Note that this is considered undefined behavior -- if you get here, you should fix!\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'__PRIOR_STEP': 'final_results',\n", + " '__SEQUENCE_ID': 3,\n", + " 'final_results': 'sonnet:\\n'\n", + " \"['In myriad states, a cyclic dance unfolds, \\\\nUpon the \"\n", + " 'stage of logic, firm and clear. \\\\nAn automaton with '\n", + " 'structured bounds it holds, \\\\nDesigned by minds, both '\n", + " 'intricate and near. \\\\n\\\\nWith each transition, patterns '\n", + " 'trace the flow, \\\\nA pulse of truth propels the forward '\n", + " 'sweep. 
\\\\nConditions mark the paths, both high and low, '\n", + " '\\\\nAs inputs carve formations, wide and deep. '\n", + " '\\\\n\\\\nDeterministic threads, like textiles spun, \\\\nWeave '\n", + " 'tapestries of boundless, coded choice. \\\\nA binary ballet '\n", + " 'that shies from none, \\\\nYet whispers softly with a silent '\n", + " 'voice. \\\\n\\\\nIn finite bounds, infinite patterns weave, '\n", + " '\\\\nOrder from chaos, logic born to leave. \\', \"In a realm '\n", + " 'where nodes intertwine, \\\\nDwells the dream of a state '\n", + " \"machine's line. \\\\nWith transitions so neat, \\\\nA puzzle \"\n", + " 'complete, \\\\nIt dances through states, a design.\", \"Beads '\n", + " 'of states align, \\\\nSilent rhythms weave their dance— '\n", + " '\\\\nLogic\\'s steady hand.\", \\'Silent gears in whispered '\n", + " 'dance engage, \\\\nTransitions weave through every stage. '\n", + " '\\\\nAbstract notions structured with flair, \\\\nTapestries '\n", + " 'of circuits, precise and rare. \\\\nEach decision flows with '\n", + " 'calculated grace, \\\\n\\\\nMachines hum softly, hidden in '\n", + " 'embrace. \\\\nAs states shift, each holds its refrain, '\n", + " '\\\\nChoreographed steps in methodical chain. \\\\nHarmony '\n", + " 'thrives in digital cadence set, \\\\nIn vivid lines of '\n", + " 'binary duet. \\\\nNodes in purpose, connections aligned, '\n", + " '\\\\nEngines of logic map the mind. \\\\nSignals transition, '\n", + " \"clear and free.']\",\n", + " 'max_drafts': 2,\n", + " 'poem_subject': 'state machines',\n", + " 'poem_types': ['sonnet', 'limerick', 'haiku', 'acrostic'],\n", + " 'proposals': [['In myriad states, a cyclic dance unfolds, \\n'\n", + " 'Upon the stage of logic, firm and clear. \\n'\n", + " 'An automaton with structured bounds it holds, \\n'\n", + " 'Designed by minds, both intricate and near. \\n'\n", + " '\\n'\n", + " 'With each transition, patterns trace the flow, \\n'\n", + " 'A pulse of truth propels the forward sweep. 
\\n'\n", + " 'Conditions mark the paths, both high and low, \\n'\n", + " 'As inputs carve formations, wide and deep. \\n'\n", + " '\\n'\n", + " 'Deterministic threads, like textiles spun, \\n'\n", + " 'Weave tapestries of boundless, coded choice. \\n'\n", + " 'A binary ballet that shies from none, \\n'\n", + " 'Yet whispers softly with a silent voice. \\n'\n", + " '\\n'\n", + " 'In finite bounds, infinite patterns weave, \\n'\n", + " 'Order from chaos, logic born to leave. ',\n", + " 'In a realm where nodes intertwine, \\n'\n", + " \"Dwells the dream of a state machine's line. \\n\"\n", + " 'With transitions so neat, \\n'\n", + " 'A puzzle complete, \\n'\n", + " 'It dances through states, a design.',\n", + " 'Beads of states align, \\n'\n", + " 'Silent rhythms weave their dance— \\n'\n", + " \"Logic's steady hand.\",\n", + " 'Silent gears in whispered dance engage, \\n'\n", + " 'Transitions weave through every stage. \\n'\n", + " 'Abstract notions structured with flair, \\n'\n", + " 'Tapestries of circuits, precise and rare. \\n'\n", + " 'Each decision flows with calculated grace, \\n'\n", + " '\\n'\n", + " 'Machines hum softly, hidden in embrace. \\n'\n", + " 'As states shift, each holds its refrain, \\n'\n", + " 'Choreographed steps in methodical chain. \\n'\n", + " 'Harmony thrives in digital cadence set, \\n'\n", + " 'In vivid lines of binary duet. \\n'\n", + " 'Nodes in purpose, connections aligned, \\n'\n", + " 'Engines of logic map the mind. 
\\n'\n", + " 'Signals transition, clear and free.']]}\n" + ] + } + ], + "source": [ + "ray.init(ignore_reinit_error=True)\n", + "act, _, state = app.run(\n", + " halt_after=[\"final_results\"],\n", + " inputs={\n", + " \"max_drafts\": 2,\n", + " \"poem_types\": [\n", + " \"sonnet\",\n", + " \"limerick\",\n", + " \"haiku\",\n", + " \"acrostic\",\n", + " ],\n", + " \"poem_subject\": \"state machines\",\n", + " },\n", + ")\n", + "pprint.pprint(state.get_all())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c0bf20f4-63b6-4fc1-a7f0-bdbae62f6635", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/ray/statemachine.png b/examples/ray/statemachine.png new file mode 100644 index 00000000..572bb706 Binary files /dev/null and b/examples/ray/statemachine.png differ diff --git a/examples/ray/substatemachine.png b/examples/ray/substatemachine.png new file mode 100644 index 00000000..ad6d806d Binary files /dev/null and b/examples/ray/substatemachine.png differ diff --git a/pyproject.toml b/pyproject.toml index 37cdd79d..f98906c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "burr" -version = "0.35.1" +version = "0.36.0" dependencies = [] # yes, there are none requires-python = ">=3.9" authors = [ @@ -62,7 +62,8 @@ tests = [ "pyarrow", "redis", "burr[opentelemetry]", - "burr[haystack]" + "burr[haystack]", + "burr[ray]" ] documentation = [ @@ -75,6 +76,7 @@ documentation = [ "sphinx-toolbox", "psycopg2-binary", "redis", + "ray", 
"sphinxcontrib-googleanalytics" ] @@ -168,6 +170,10 @@ opentelemetry = [ "opentelemetry-api", "opentelemetry-sdk", ] + +ray = [ + "ray" +] [tool.setuptools] include-package-data = true diff --git a/telemetry/.DS_Store b/telemetry/.DS_Store deleted file mode 100644 index 27f2d7ad..00000000 Binary files a/telemetry/.DS_Store and /dev/null differ diff --git a/telemetry/ui/src/components/routes/app/StepList.tsx b/telemetry/ui/src/components/routes/app/StepList.tsx index c5276111..ddae49ac 100644 --- a/telemetry/ui/src/components/routes/app/StepList.tsx +++ b/telemetry/ui/src/components/routes/app/StepList.tsx @@ -279,9 +279,10 @@ const ActionTableRow = (props: { + {props.minimized && } {!props.minimized && ( <> - + {props.displayLinksCol && ( - + {childCount > 0 ? (
{ navigate( `/project/${props.projectId}/${subApp.child.partition_key || 'null'}/${subApp.child.app_id}` @@ -526,7 +527,7 @@ const LinkSubTable = (props: { {subApp.child.app_id}
- + - + {!props.minimized && ( )} - - + {props.displayAnnotations && } ) : ( @@ -1478,7 +1478,9 @@ export const StepList = (props: { latestTimeSeen={props.latestTimeSeen} setTraceExpanded={setTraceExpanded} projectId={props.projectId} - pauseLocation={beforePause ? 'bottom' : undefined} + pauseLocation={ + beforePause ? (props.topToBottomChronological ? 'top' : 'bottom') : undefined + } pauseTime={step.pauseAfterLastStepMillis} topToBottomChronological={props.topToBottomChronological} displayAnnotations={props.displayAnnotations} @@ -1827,7 +1829,7 @@ const ParentLink = (props: { {!props.minimized && ( <> - + {/* › */} State: + # This assert ensures we only visit once per app, globally + # Thus if we're restarting this will break + return state.update(original_n=state["n"]) + + @action(reads=["n"], writes=["n", "n_history"]) + def even(state: State) -> State: + return state.update(n=state["n"] // 2).append(n_history=state["n"]) + + @action(reads=["n"], writes=["n", "n_history"]) + def odd(state: State) -> State: + return state.update(n=3 * state["n"] + 1).append(n_history=state["n"]) + + collatz_graph = ( + GraphBuilder() + .with_actions( + initial, + even, + odd, + result=Result("n_history"), + ) + .with_transitions( + (["initial", "even"], "result", expr("n == 1")), + (["initial", "even", "odd"], "even", expr("n % 2 == 0")), + (["initial", "even", "odd"], "odd", expr("n % 2 != 0")), + ) + .build() + ) + + @action(reads=[], writes=["ns"]) + def map_step(state: State, min_number: int = MIN_NUMBER, max_number: int = MAX_NUMBER) -> State: + return state.update(ns=list(range(min_number, max_number))) + + class ParallelCollatz(MapStates): + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[State, None, None]: + for item in state["ns"]: + yield state.update(n=item) + + def action(self, state: State, inputs: Dict[str, Any]) -> SubgraphType: + return RunnableGraph( + collatz_graph, + entrypoint="initial", + 
halt_after=["result"], + ) + + def reduce(self, state: State, results: Generator[State, None, None]) -> State: + new_state = state + count_mapping = {} + for result in results: + count_mapping[result["original_n"]] = len(result["n_history"]) + return new_state.update(counts=count_mapping) + + @property + def writes(self) -> list[str]: + return ["counts"] + + @property + def reads(self) -> list[str]: + return ["ns"] + + app_id = f"collatz_test_{str(uuid.uuid4())}" + containing_application = ( + ApplicationBuilder() + .with_actions( + map_step, + parallel_collatz=ParallelCollatz(), + final=Result("counts"), + ) + .with_transitions( + ("map_step", "parallel_collatz"), + ("parallel_collatz", "final"), + ) + .with_identifiers(app_id=app_id) + .with_parallel_executor(RayExecutor) + .with_entrypoint("map_step") + .build() + ) + *_, final_state = containing_application.run(halt_after=["final"]) + assert len(final_state["counts"]) == MAX_NUMBER - MIN_NUMBER