Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] server launcher + ui #23

Merged
merged 20 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions agentfile/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from agentfile.control_plane import FastAPIControlPlane
from agentfile.launchers import LocalLauncher
from agentfile.control_plane import ControlPlaneServer
from agentfile.launchers import LocalLauncher, ServerLauncher
from agentfile.message_queues import SimpleMessageQueue
from agentfile.orchestrators import (
AgentOrchestrator,
Expand All @@ -19,8 +19,9 @@
"SimpleMessageQueue",
# launchers
"LocalLauncher",
"ServerLauncher",
# control planes
"FastAPIControlPlane",
"ControlPlaneServer",
# orchestrators
"AgentOrchestrator",
"PipelineOrchestrator",
Expand Down
Empty file added agentfile/app/__init__.py
Empty file.
207 changes: 207 additions & 0 deletions agentfile/app/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import httpx
import logging
import pprint
from typing import Any, Optional

from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.reactive import reactive
from textual.widgets import Button, Header, Footer, Static, Input

from agentfile.app.components.human_list import HumanTaskList
from agentfile.app.components.service_list import ServicesList
from agentfile.app.components.task_list import TasksList
from agentfile.app.components.types import ButtonType
from agentfile.types import TaskDefinition


class LlamaAgentsMonitor(App):
CSS = """
Screen {
layout: grid;
grid-size: 2;
grid-columns: 1fr 2fr;
padding: 0;
}

#left-panel {
width: 100%;
height: 100%;
}

#right-panel {
width: 100%;
height: 100%;
}

.section {
background: $panel;
padding: 1;
margin-bottom: 0;
}

#tasks {
height: auto;
max-height: 50%;
}

#services {
height: auto;
max-height: 50%;
}

VerticalScroll {
height: auto;
max-height: 100%;
border: solid $primary;
margin-bottom: 1;
}

#right-panel VerticalScroll {
max-height: 100%;
}

Button {
width: 100%;
margin-bottom: 1;
}

#details {
background: $boost;
padding: 1;
text-align: left;
}

#new-task {
dock: bottom;
margin-bottom: 1;
width: 100%;
}
"""

details = reactive("")
selected_service_type = reactive("")
selected_service_url = reactive("")

def __init__(self, control_plane_url: str, **kwargs: Any):
self.control_plane_url = control_plane_url
super().__init__(**kwargs)

def compose(self) -> ComposeResult:
yield Header()
with Static(id="left-panel"):
yield ServicesList(id="services", control_plane_url=self.control_plane_url)
yield TasksList(id="tasks", control_plane_url=self.control_plane_url)
with VerticalScroll(id="right-panel"):
yield Static("Task or service details", id="details")
yield Input(placeholder="Enter: New task", id="new-task")
yield Footer()

async def on_mount(self) -> None:
self.set_interval(5, self.refresh_details)

async def watch_details(self, new_details: str) -> None:
if not new_details:
return

selected_type = ButtonType(new_details.split(":")[0].strip())

if selected_type == ButtonType.SERVICE:
self.query_one("#details").update(new_details)
elif selected_type == ButtonType.TASK:
self.query_one("#details").update(new_details)

async def watch_selected_service_type(self, new_service_type: str) -> None:
if not new_service_type:
return

if new_service_type == "human_service":
await self.query_one("#right-panel").mount(
HumanTaskList(self.selected_service_url), after=0
)
else:
try:
await self.query_one(HumanTaskList).remove()
except Exception:
# not mounted yet
pass

async def refresh_details(
self,
button_type: Optional[ButtonType] = None,
selected_label: Optional[str] = None,
) -> None:
if not self.details and button_type is None and selected_label is None:
return

selected_type = button_type or ButtonType(self.details.split(":")[0].strip())
selected_label = (
selected_label or self.details.split(":")[1].split("\n")[0].strip()
)

if selected_type == ButtonType.SERVICE:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.control_plane_url}/services/{selected_label}"
)
service_def = response.json()

service_dict = service_def
service_url = ""
if service_def.get("host") and service_def.get("port"):
service_url = f"http://{service_def['host']}:{service_def['port']}"
response = await client.get(f"{service_url}/")
service_dict = response.json()

# format the service details nicely
service_string = pprint.pformat(service_dict)

self.details = (
f"{selected_type.value}: {selected_label}\n\n{service_string}"
)

self.selected_service_url = service_url
self.selected_service_type = service_dict.get("type")
elif selected_type == ButtonType.TASK:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.control_plane_url}/tasks/{selected_label}"
)
task_dict = response.json()

# flatten the TaskResult object
if task_dict["state"].get("result"):
task_dict["state"]["result"] = task_dict["state"]["result"]["result"]

# format the task details nicely
task_string = pprint.pformat(task_dict)

self.details = f"{selected_type.value}: {selected_label}\n\n{task_string}"
self.selected_service_type = ""
self.selected_service_url = ""

async def on_button_pressed(self, event: Button.Pressed) -> None:
# Update the details panel with the selected item
await self.refresh_details(
button_type=event.button.type, selected_label=event.button.label
)

async def on_input_submitted(self, event: Input.Submitted) -> None:
new_task = TaskDefinition(input=event.value).model_dump()
async with httpx.AsyncClient() as client:
await client.post(f"{self.control_plane_url}/tasks", json=new_task)

# clear the input
self.query_one("#new-task").value = ""


def run(control_plane_url: str = "http://127.0.0.1:8000") -> None:
# remove info logging for httpx
logging.getLogger("httpx").setLevel(logging.WARNING)

app = LlamaAgentsMonitor(control_plane_url=control_plane_url)
app.run()


if __name__ == "__main__":
run()
Empty file.
96 changes: 96 additions & 0 deletions agentfile/app/components/human_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import httpx
from typing import Any, List

from textual.app import ComposeResult
from textual.containers import VerticalScroll, Container
from textual.reactive import reactive
from textual.widgets import Button, Static, Input

from agentfile.app.components.types import ButtonType
from agentfile.types import HumanResponse, TaskDefinition


class HumanTaskButton(Button):
type: ButtonType = ButtonType.HUMAN
task_id: str = ""


class HumanTaskList(Static):
tasks: List[TaskDefinition] = reactive([])
selected_task: str = reactive("")

def __init__(self, human_service_url: str, **kwargs: Any):
self.human_service_url = human_service_url
super().__init__(**kwargs)

def compose(self) -> ComposeResult:
with VerticalScroll(id="human-tasks-scroll"):
for task in self.tasks:
button = HumanTaskButton(task.input)
button.task_id = task.task_id
yield button

async def on_mount(self) -> None:
self.set_interval(2, self.refresh_tasks)

async def refresh_tasks(self) -> None:
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.human_service_url}/tasks")
tasks = response.json()

new_tasks = []
for task in tasks:
new_tasks.append(TaskDefinition(**task))

self.tasks = [*new_tasks]

async def watch_tasks(self, new_tasks: List[TaskDefinition]) -> None:
try:
tasks_scroll = self.query_one("#human-tasks-scroll")
await tasks_scroll.remove_children()
for task in new_tasks:
button = HumanTaskButton(task.input)
button.task_id = task.task_id
await tasks_scroll.mount(button)
except Exception:
pass

async def watch_selected_task(self, new_task: str) -> None:
if not new_task:
return

try:
await self.query_one("#respond").remove()
except Exception:
# not mounted yet
pass

container = Container(
Static(f"Task: {new_task}"),
Input(
placeholder="Type your response here",
),
id="respond",
)

# mount the container
await self.mount(container)

def on_button_pressed(self, event: Button.Pressed) -> None:
# Update the details panel with the selected item
self.selected_task = event.button.label

async def on_input_submitted(self, event: Input.Submitted) -> None:
response = HumanResponse(result=event.value).model_dump()
async with httpx.AsyncClient() as client:
await client.post(
f"{self.human_service_url}/tasks/{self.selected_task}/handle",
json=response,
)

# remove the input container
await self.query_one("#respond").remove()

# remove the task from the list
new_tasks = [task for task in self.tasks if task.task_id != self.selected_task]
self.tasks = [*new_tasks]
49 changes: 49 additions & 0 deletions agentfile/app/components/service_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import httpx
from typing import Any, List

from textual.app import ComposeResult
from textual.containers import VerticalScroll
from textual.reactive import reactive
from textual.widgets import Button, Static

from agentfile.app.components.types import ButtonType


class ServiceButton(Button):
type: ButtonType = ButtonType.SERVICE


class ServicesList(Static):
services: List[str] = reactive([])

def __init__(self, control_plane_url: str, **kwargs: Any):
self.control_plane_url = control_plane_url
super().__init__(**kwargs)

def compose(self) -> ComposeResult:
with VerticalScroll(id="services-scroll"):
for service in self.services:
yield ServiceButton(service)

async def on_mount(self) -> None:
self.set_interval(2, self.refresh_services)

async def refresh_services(self) -> None:
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.control_plane_url}/services")
services_dict = response.json()

new_services = []
for service_name in services_dict:
new_services.append(service_name)

self.services = [*new_services]

async def watch_services(self, new_services: List[str]) -> None:
try:
services_scroll = self.query_one("#services-scroll")
await services_scroll.remove_children()
for service in new_services:
await services_scroll.mount(ServiceButton(service))
except Exception:
pass
Loading
Loading