feat(runtime): Execute codes in a sandbox environment #2119

Merged · 1 commit · Nov 10, 2024
Changes from all commits
25 changes: 21 additions & 4 deletions dbgpt/agent/core/memory/gpts/gpts_memory.py
@@ -35,7 +35,7 @@ def __init__(
message_memory if message_memory is not None else DefaultGptsMessageMemory()
)

- self.messages_cache: defaultdict = defaultdict(List[GptsMessage])
+ self.messages_cache: defaultdict = defaultdict(list)
self.channels: defaultdict = defaultdict(Queue)
self.enable_vis_map: defaultdict = defaultdict(bool)
self.start_round_map: defaultdict = defaultdict(int)
@@ -374,9 +374,9 @@ async def _messages_to_agents_vis(
"receiver": message.receiver,
"model": message.model_name,
"markdown": view_info,
"resource": message.resource_info
if message.resource_info
else None,
"resource": (
message.resource_info if message.resource_info else None
),
}
)
return await vis_client.get(VisAgentMessages.vis_tag()).display(
@@ -427,3 +427,20 @@ async def _messages_to_app_link_vis(
else:
param["status"] = Status.COMPLETE.value
return await vis_client.get(VisAppLink.vis_tag()).display(content=param)

async def chat_messages(
self,
conv_id: str,
):
"""Get chat messages."""
while True:
queue = self.queue(conv_id)
if not queue:
break
item = await queue.get()
if item == "[DONE]":
queue.task_done()
break
else:
yield item
await asyncio.sleep(0.005)
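
The new chat_messages coroutine turns the per-conversation queue into an async generator that ends when the "[DONE]" sentinel arrives (or when the queue for the conversation is gone). A minimal consumer sketch, assuming the enclosing class is GptsMemory; the memory instance and conversation id here are illustrative:

import asyncio

from dbgpt.agent.core.memory.gpts.gpts_memory import GptsMemory


async def print_conversation(memory: GptsMemory, conv_id: str) -> None:
    # Drain the conversation queue; iteration stops when the generator
    # sees the "[DONE]" sentinel or the queue has been cleaned up.
    async for item in memory.chat_messages(conv_id):
        print(item)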
8 changes: 8 additions & 0 deletions dbgpt/app/component_configs.py
@@ -61,6 +61,7 @@ def initialize_components(
# Register serve apps
register_serve_apps(system_app, CFG, param.port)
_initialize_operators()
_initialize_code_server(system_app)


def _initialize_model_cache(system_app: SystemApp, port: int):
@@ -132,6 +133,7 @@ def _initialize_openapi(system_app: SystemApp):


def _initialize_operators():
from dbgpt.app.operators.code import CodeMapOperator
from dbgpt.app.operators.converter import StringToInteger
from dbgpt.app.operators.datasource import (
HODatasourceExecutorOperator,
@@ -140,3 +142,9 @@ def _initialize_operators():
from dbgpt.app.operators.llm import HOLLMOperator, HOStreamingLLMOperator
from dbgpt.app.operators.rag import HOKnowledgeOperator
from dbgpt.serve.agent.resource.datasource import DatasourceResource


def _initialize_code_server(system_app: SystemApp):
from dbgpt.util.code.server import initialize_code_server

initialize_code_server(system_app)
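
With _initialize_code_server wired into startup, any component can fetch the shared server through get_code_server(). A minimal sketch of direct use, mirroring the call pattern introduced in dbgpt/app/operators/code.py below; the payload and function body are illustrative, and this assumes initialize_code_server has already run:

import asyncio
import json

from dbgpt.util.code.server import get_code_server

PY_SRC = """
def fn_map(args: dict) -> dict:
    return {"echo": args.get("text", "")}
"""


async def run_once() -> None:
    code_server = await get_code_server()
    payload = json.dumps({"text": "hello"}).encode("utf-8")
    # exec1 runs the code in the sandbox; the result carries
    # .exit_code, .logs and .output, as used by the operators below.
    result = await code_server.exec1(
        PY_SRC, payload, call_name="fn_map", lang="python"
    )
    print(result.output)


asyncio.run(run_once())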
322 changes: 322 additions & 0 deletions dbgpt/app/operators/code.py
@@ -0,0 +1,322 @@
"""Code operators for DB-GPT.

The code will be executed in a sandbox environment, which is isolated from the host
system. You can limit the memory and file system access of the code execution.
"""

import json
import logging
import os

from dbgpt.core import ModelRequest
from dbgpt.core.awel import MapOperator
from dbgpt.core.awel.flow import (
TAGS_ORDER_HIGH,
IOField,
OperatorCategory,
OptionValue,
Parameter,
ViewMetadata,
ui,
)
from dbgpt.util.code.server import get_code_server
from dbgpt.util.i18n_utils import _

logger = logging.getLogger(__name__)

_FN_PYTHON_MAP = """
import os
import json
import lyric_task
from lyric_py_task.imports import msgpack

def fn_map(args: dict[str, any]) -> dict[str, any]:
text = args.get("text")
return {
"text": text,
"key0": "customized key",
"key1": "hello, world",
"key2": [1, 2, 3],
"key3": {"a": 1, "b": 2},
}
"""

_FN_JAVASCRIPT_MAP = """
function fn_map(args) {
var text = args.text;
return {
text: text,
key0: "customized key",
key1: "hello, world",
key2: [1, 2, 3],
key3: {a: 1, b: 2},
};
}
"""


class CodeMapOperator(MapOperator[dict, dict]):
metadata = ViewMetadata(
label=_("Code Map Operator"),
name="default_code_map_operator",
description=_(
"Handle input dictionary with code and return output dictionary after execution."
),
category=OperatorCategory.CODE,
parameters=[
Parameter.build_from(
_("Code Editor"),
"code",
type=str,
optional=True,
default=_FN_PYTHON_MAP,
placeholder=_("Please input your code"),
description=_("The code to be executed."),
ui=ui.UICodeEditor(
language="python",
),
),
Parameter.build_from(
_("Language"),
"lang",
type=str,
optional=True,
default="python",
placeholder=_("Please select the language"),
description=_("The language of the code."),
options=[
OptionValue(label="Python", name="python", value="python"),
OptionValue(
label="JavaScript", name="javascript", value="javascript"
),
],
ui=ui.UISelect(),
),
Parameter.build_from(
_("Call Name"),
"call_name",
type=str,
optional=True,
default="fn_map",
placeholder=_("Please input the call name"),
description=_("The call name of the function."),
),
],
inputs=[
IOField.build_from(
_("Input Data"),
"input",
type=dict,
description=_("The input dictionary."),
)
],
outputs=[
IOField.build_from(
_("Output Data"),
"output",
type=dict,
description=_("The output dictionary."),
)
],
tags={"order": TAGS_ORDER_HIGH},
)

def __init__(
self,
code: str = _FN_PYTHON_MAP,
lang: str = "python",
call_name: str = "fn_map",
**kwargs,
):
super().__init__(**kwargs)
self.code = code
self.lang = lang
self.call_name = call_name

async def map(self, input_value: dict) -> dict:
exec_input_data_bytes = json.dumps(input_value).encode("utf-8")
code_server = await get_code_server()
result = await code_server.exec1(
self.code, exec_input_data_bytes, call_name=self.call_name, lang=self.lang
)
logger.info(f"Code execution result: {result}")
return result.output


_REQ_BUILD_PY_FUNC = """
import os

def fn_map(args: dict[str, any]) -> dict[str, any]:

llm_model = args.get("model", os.getenv("DBGPT_RUNTIME_LLM_MODEL"))
messages: str | list[str] = args.get("messages", [])
if isinstance(messages, str):
human_message = messages
else:
human_message = messages[0]

temperature = float(args.get("temperature") or 0.5)
max_new_tokens = int(args.get("max_new_tokens") or 2048)
conv_uid = args.get("conv_uid", "")
print("Conv uid is: ", conv_uid)

messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "human", "content": human_message}
]
return {
"model": llm_model,
"messages": messages,
"temperature": temperature,
"max_new_tokens": max_new_tokens
}
"""

_REQ_BUILD_JS_FUNC = """
function fn_map(args) {
var llm_model = args.model || "chatgpt_proxyllm";
var messages = args.messages || [];
var human_message = messages[0];
var temperature = parseFloat(args.temperature) || 0.5;
var max_new_tokens = parseInt(args.max_new_tokens) || 2048;
var conv_uid = args.conv_uid || "";
console.log("Conv uid is: ", conv_uid);

messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "human", "content": human_message}
];
return {
model: llm_model,
messages: messages,
temperature: temperature,
max_new_tokens: max_new_tokens
};
}
"""


class CodeDictToModelRequestOperator(MapOperator[dict, ModelRequest]):
metadata = ViewMetadata(
label=_("Code Dict to Model Request Operator"),
name="default_code_dict_to_model_request_operator",
description=_(
"Handle input dictionary with code and return output ModelRequest after execution."
),
category=OperatorCategory.CODE,
parameters=[
Parameter.build_from(
_("Code Editor"),
"code",
type=str,
optional=True,
default=_REQ_BUILD_PY_FUNC,
placeholder=_("Please input your code"),
description=_("The code to be executed."),
ui=ui.UICodeEditor(
language="python",
),
),
Parameter.build_from(
_("Language"),
"lang",
type=str,
optional=True,
default="python",
placeholder=_("Please select the language"),
description=_("The language of the code."),
options=[
OptionValue(label="Python", name="python", value="python"),
OptionValue(
label="JavaScript", name="javascript", value="javascript"
),
],
ui=ui.UISelect(),
),
Parameter.build_from(
_("Call Name"),
"call_name",
type=str,
optional=True,
default="fn_map",
placeholder=_("Please input the call name"),
description=_("The call name of the function."),
),
],
inputs=[
IOField.build_from(
_("Input Data"),
"input",
type=dict,
description=_("The input dictionary."),
)
],
outputs=[
IOField.build_from(
_("Output Data"),
"output",
type=ModelRequest,
description=_("The output ModelRequest."),
)
],
tags={"order": TAGS_ORDER_HIGH},
)

def __init__(
self,
code: str = _REQ_BUILD_PY_FUNC,
lang: str = "python",
call_name: str = "fn_map",
**kwargs,
):
super().__init__(**kwargs)
self.code = code
self.lang = lang
self.call_name = call_name

async def map(self, input_value: dict) -> ModelRequest:
from lyric import PyTaskFsConfig, PyTaskMemoryConfig, PyTaskResourceConfig

exec_input_data_bytes = json.dumps(input_value).encode("utf-8")
code_server = await get_code_server()
model_name = os.getenv("LLM_MODEL")

fs = PyTaskFsConfig(
preopens=[
# Mount the /tmp directory to the /tmp directory in the sandbox
# Directory permissions are set to 3 (read and write)
# File permissions are set to 3 (read and write)
("/tmp", "/tmp", 3, 3),
# Mount the current directory to the /home directory in the sandbox
# Directory and file permissions are set to 1 (read)
(".", "/home", 1, 1),
]
)
memory = PyTaskMemoryConfig(memory_limit=50 * 1024 * 1024) # 50MB in bytes
resources = PyTaskResourceConfig(
fs=fs,
memory=memory,
env_vars=[
("DBGPT_RUNTIME_LLM_MODEL", model_name),
],
)
result = await code_server.exec1(
self.code,
exec_input_data_bytes,
call_name=self.call_name,
lang=self.lang,
resources=resources,
)
logger.info(f"Code execution result: {result}")
if result.exit_code != 0:
raise RuntimeError(f"Code execution failed: {result.logs}")

if not result.output:
raise RuntimeError(f"Code execution failed: {result.logs}")

if not isinstance(result.output, dict):
raise RuntimeError(
f"Code execution failed, invalid output: {result.output}"
)
logger.info(f"Code execution result: {result}")
return ModelRequest(**result.output)
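
A short usage sketch for the new operator in an AWEL flow; the DAG id and input payload are illustrative, and running a single operator through call() follows the usual AWEL pattern:

import asyncio

from dbgpt.app.operators.code import CodeMapOperator
from dbgpt.core.awel import DAG

with DAG("sandbox_code_example") as dag:
    # Defaults: _FN_PYTHON_MAP as the source, python as the language,
    # fn_map as the entry point.
    task = CodeMapOperator()

# The input dict is JSON-encoded, executed inside the sandbox, and the
# function's return value comes back as the operator output.
result = asyncio.run(task.call({"text": "hello sandbox"}))
print(result)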