diff --git a/assets/wechat.jpg b/assets/wechat.jpg index b5dfaaec5..414aa006f 100644 Binary files a/assets/wechat.jpg and b/assets/wechat.jpg differ diff --git a/dbgpt/agent/core/memory/gpts/gpts_memory.py b/dbgpt/agent/core/memory/gpts/gpts_memory.py index 6f1da404b..1468cf5b6 100644 --- a/dbgpt/agent/core/memory/gpts/gpts_memory.py +++ b/dbgpt/agent/core/memory/gpts/gpts_memory.py @@ -35,7 +35,7 @@ def __init__( message_memory if message_memory is not None else DefaultGptsMessageMemory() ) - self.messages_cache: defaultdict = defaultdict(List[GptsMessage]) + self.messages_cache: defaultdict = defaultdict(list) self.channels: defaultdict = defaultdict(Queue) self.enable_vis_map: defaultdict = defaultdict(bool) self.start_round_map: defaultdict = defaultdict(int) @@ -374,9 +374,9 @@ async def _messages_to_agents_vis( "receiver": message.receiver, "model": message.model_name, "markdown": view_info, - "resource": message.resource_info - if message.resource_info - else None, + "resource": ( + message.resource_info if message.resource_info else None + ), } ) return await vis_client.get(VisAgentMessages.vis_tag()).display( @@ -427,3 +427,20 @@ async def _messages_to_app_link_vis( else: param["status"] = Status.COMPLETE.value return await vis_client.get(VisAppLink.vis_tag()).display(content=param) + + async def chat_messages( + self, + conv_id: str, + ): + """Get chat messages.""" + while True: + queue = self.queue(conv_id) + if not queue: + break + item = await queue.get() + if item == "[DONE]": + queue.task_done() + break + else: + yield item + await asyncio.sleep(0.005) diff --git a/dbgpt/agent/core/plan/awel/agent_operator.py b/dbgpt/agent/core/plan/awel/agent_operator.py index 650c92cf0..8a68d307a 100644 --- a/dbgpt/agent/core/plan/awel/agent_operator.py +++ b/dbgpt/agent/core/plan/awel/agent_operator.py @@ -22,7 +22,6 @@ # TODO: Don't dependent on MixinLLMOperator from dbgpt.model.operators.llm_operator import MixinLLMOperator -from dbgpt.serve.prompt.api.endpoints import get_service from dbgpt.util.i18n_utils import _ from .... import ActionOutput @@ -291,6 +290,7 @@ async def get_agent( prompt_template = None if self.awel_agent.agent_prompt: + from dbgpt.serve.prompt.api.endpoints import get_service prompt_service = get_service() prompt_template = prompt_service.get_template( self.awel_agent.agent_prompt.code diff --git a/dbgpt/agent/core/plan/awel/agent_operator_resource.py b/dbgpt/agent/core/plan/awel/agent_operator_resource.py index 226274f8c..bbdbbfa68 100644 --- a/dbgpt/agent/core/plan/awel/agent_operator_resource.py +++ b/dbgpt/agent/core/plan/awel/agent_operator_resource.py @@ -12,8 +12,6 @@ ResourceCategory, register_resource, ) -from dbgpt.serve.prompt.api.endpoints import get_service - from ....resource.base import AgentResource, ResourceType from ....resource.manage import get_resource_manager from ....util.llm.llm import LLMConfig, LLMStrategyType @@ -21,6 +19,7 @@ def _agent_resource_prompt_values() -> List[OptionValue]: + from dbgpt.serve.prompt.api.endpoints import get_service prompt_service = get_service() prompts = prompt_service.get_target_prompt() return [ diff --git a/dbgpt/app/component_configs.py b/dbgpt/app/component_configs.py index e50ec5679..df77ee014 100644 --- a/dbgpt/app/component_configs.py +++ b/dbgpt/app/component_configs.py @@ -61,6 +61,7 @@ def initialize_components( # Register serve apps register_serve_apps(system_app, CFG, param.port) _initialize_operators() + _initialize_code_server(system_app) def _initialize_model_cache(system_app: SystemApp, port: int): @@ -132,6 +133,7 @@ def _initialize_openapi(system_app: SystemApp): def _initialize_operators(): + from dbgpt.app.operators.code import CodeMapOperator from dbgpt.app.operators.converter import StringToInteger from dbgpt.app.operators.datasource import ( HODatasourceExecutorOperator, @@ -140,3 +142,9 @@ def _initialize_operators(): from dbgpt.app.operators.llm import HOLLMOperator, HOStreamingLLMOperator from dbgpt.app.operators.rag import HOKnowledgeOperator from dbgpt.serve.agent.resource.datasource import DatasourceResource + + +def _initialize_code_server(system_app: SystemApp): + from dbgpt.util.code.server import initialize_code_server + + initialize_code_server(system_app) diff --git a/dbgpt/app/knowledge/api.py b/dbgpt/app/knowledge/api.py index cb5c8b370..9e00db3ec 100644 --- a/dbgpt/app/knowledge/api.py +++ b/dbgpt/app/knowledge/api.py @@ -3,8 +3,9 @@ import shutil import tempfile from typing import List +from pathlib import Path -from fastapi import APIRouter, Depends, File, Form, UploadFile +from fastapi import APIRouter, Depends, File, Form, UploadFile, HTTPException from dbgpt._private.config import Config from dbgpt.app.knowledge.request.request import ( @@ -332,54 +333,70 @@ def document_delete(space_name: str, query_request: DocumentQueryRequest): @router.post("/knowledge/{space_name}/document/upload") async def document_upload( - space_name: str, - doc_name: str = Form(...), - doc_type: str = Form(...), - doc_file: UploadFile = File(...), + space_name: str, + doc_name: str = Form(...), + doc_type: str = Form(...), + doc_file: UploadFile = File(...), ): - print(f"/document/upload params: {space_name}") - try: - if doc_file: - if not os.path.exists(os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space_name)): - os.makedirs(os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space_name)) - # We can not move temp file in windows system when we open file in context of `with` - tmp_fd, tmp_path = tempfile.mkstemp( - dir=os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space_name) - ) - with os.fdopen(tmp_fd, "wb") as tmp: - tmp.write(await doc_file.read()) - shutil.move( - tmp_path, - os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, space_name, doc_file.filename), - ) - request = KnowledgeDocumentRequest() - request.doc_name = doc_name - request.doc_type = doc_type - request.content = os.path.join( - KNOWLEDGE_UPLOAD_ROOT_PATH, space_name, doc_file.filename - ) - space_res = knowledge_space_service.get_knowledge_space( - KnowledgeSpaceRequest(name=space_name) - ) - if len(space_res) == 0: - # create default space - if "default" != space_name: - raise Exception(f"you have not create your knowledge space.") - knowledge_space_service.create_knowledge_space( - KnowledgeSpaceRequest( - name=space_name, - desc="first db-gpt rag application", - owner="dbgpt", - ) - ) - return Result.succ( - knowledge_space_service.create_knowledge_document( - space=space_name, request=request - ) - ) - return Result.failed(code="E000X", msg=f"doc_file is None") - except Exception as e: - return Result.failed(code="E000X", msg=f"document add error {e}") + print(f"/document/upload params: {space_name}") + try: + if doc_file: + # Sanitize inputs to prevent path traversal + safe_space_name = os.path.basename(space_name) + safe_filename = os.path.basename(doc_file.filename) + + # Create absolute paths and verify they are within allowed directory + upload_dir = os.path.abspath(os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, safe_space_name)) + target_path = os.path.abspath(os.path.join(upload_dir, safe_filename)) + + if not os.path.abspath(KNOWLEDGE_UPLOAD_ROOT_PATH) in target_path: + raise HTTPException(status_code=400, detail="Invalid path detected") + + if not os.path.exists(upload_dir): + os.makedirs(upload_dir) + + # Create temp file + tmp_fd, tmp_path = tempfile.mkstemp(dir=upload_dir) + + try: + with os.fdopen(tmp_fd, "wb") as tmp: + tmp.write(await doc_file.read()) + + shutil.move(tmp_path, target_path) + + request = KnowledgeDocumentRequest() + request.doc_name = doc_name + request.doc_type = doc_type + request.content = target_path + + space_res = knowledge_space_service.get_knowledge_space( + KnowledgeSpaceRequest(name=safe_space_name) + ) + if len(space_res) == 0: + # create default space + if "default" != safe_space_name: + raise Exception(f"you have not create your knowledge space.") + knowledge_space_service.create_knowledge_space( + KnowledgeSpaceRequest( + name=safe_space_name, + desc="first db-gpt rag application", + owner="dbgpt", + ) + ) + return Result.succ( + knowledge_space_service.create_knowledge_document( + space=safe_space_name, request=request + ) + ) + except Exception as e: + # Clean up temp file if anything goes wrong + if os.path.exists(tmp_path): + os.unlink(tmp_path) + raise e + + return Result.failed(code="E000X", msg=f"doc_file is None") + except Exception as e: + return Result.failed(code="E000X", msg=f"document add error {e}") @router.post("/knowledge/{space_name}/document/sync") diff --git a/dbgpt/app/operators/code.py b/dbgpt/app/operators/code.py new file mode 100644 index 000000000..853483a96 --- /dev/null +++ b/dbgpt/app/operators/code.py @@ -0,0 +1,322 @@ +"""Code operators for DB-GPT. + +The code will be executed in a sandbox environment, which is isolated from the host +system. You can limit the memory and file system access of the code execution. +""" + +import json +import logging +import os + +from dbgpt.core import ModelRequest +from dbgpt.core.awel import MapOperator +from dbgpt.core.awel.flow import ( + TAGS_ORDER_HIGH, + IOField, + OperatorCategory, + OptionValue, + Parameter, + ViewMetadata, + ui, +) +from dbgpt.util.code.server import get_code_server +from dbgpt.util.i18n_utils import _ + +logger = logging.getLogger(__name__) + +_FN_PYTHON_MAP = """ +import os +import json +import lyric_task +from lyric_py_task.imports import msgpack + +def fn_map(args: dict[str, any]) -> dict[str, any]: + text = args.get("text") + return { + "text": text, + "key0": "customized key", + "key1": "hello, world", + "key2": [1, 2, 3], + "key3": {"a": 1, "b": 2}, + } +""" + +_FN_JAVASCRIPT_MAP = """ +function fn_map(args) { + var text = args.text; + return { + text: text, + key0: "customized key", + key1: "hello, world", + key2: [1, 2, 3], + key3: {a: 1, b: 2}, + }; +} +""" + + +class CodeMapOperator(MapOperator[dict, dict]): + metadata = ViewMetadata( + label=_("Code Map Operator"), + name="default_code_map_operator", + description=_( + "Handle input dictionary with code and return output dictionary after execution." + ), + category=OperatorCategory.CODE, + parameters=[ + Parameter.build_from( + _("Code Editor"), + "code", + type=str, + optional=True, + default=_FN_PYTHON_MAP, + placeholder=_("Please input your code"), + description=_("The code to be executed."), + ui=ui.UICodeEditor( + language="python", + ), + ), + Parameter.build_from( + _("Language"), + "lang", + type=str, + optional=True, + default="python", + placeholder=_("Please select the language"), + description=_("The language of the code."), + options=[ + OptionValue(label="Python", name="python", value="python"), + OptionValue( + label="JavaScript", name="javascript", value="javascript" + ), + ], + ui=ui.UISelect(), + ), + Parameter.build_from( + _("Call Name"), + "call_name", + type=str, + optional=True, + default="fn_map", + placeholder=_("Please input the call name"), + description=_("The call name of the function."), + ), + ], + inputs=[ + IOField.build_from( + _("Input Data"), + "input", + type=dict, + description=_("The input dictionary."), + ) + ], + outputs=[ + IOField.build_from( + _("Output Data"), + "output", + type=dict, + description=_("The output dictionary."), + ) + ], + tags={"order": TAGS_ORDER_HIGH}, + ) + + def __init__( + self, + code: str = _FN_PYTHON_MAP, + lang: str = "python", + call_name: str = "fn_map", + **kwargs, + ): + super().__init__(**kwargs) + self.code = code + self.lang = lang + self.call_name = call_name + + async def map(self, input_value: dict) -> dict: + exec_input_data_bytes = json.dumps(input_value).encode("utf-8") + code_server = await get_code_server() + result = await code_server.exec1( + self.code, exec_input_data_bytes, call_name=self.call_name, lang=self.lang + ) + logger.info(f"Code execution result: {result}") + return result.output + + +_REQ_BUILD_PY_FUNC = """ +import os + +def fn_map(args: dict[str, any]) -> dict[str, any]: + + llm_model = args.get("model", os.getenv("DBGPT_RUNTIME_LLM_MODEL")) + messages: str | list[str] = args.get("messages", []) + if isinstance(messages, str): + human_message = messages + else: + human_message = messages[0] + + temperature = float(args.get("temperature") or 0.5) + max_new_tokens = int(args.get("max_new_tokens") or 2048) + conv_uid = args.get("conv_uid", "") + print("Conv uid is: ", conv_uid) + + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "human", "content": human_message} + ] + return { + "model": llm_model, + "messages": messages, + "temperature": temperature, + "max_new_tokens": max_new_tokens + } +""" + +_REQ_BUILD_JS_FUNC = """ +function fn_map(args) { + var llm_model = args.model || "chatgpt_proxyllm"; + var messages = args.messages || []; + var human_message = messages[0]; + var temperature = parseFloat(args.temperature) || 0.5; + var max_new_tokens = parseInt(args.max_new_tokens) || 2048; + var conv_uid = args.conv_uid || ""; + console.log("Conv uid is: ", conv_uid); + + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "human", "content": human_message} + ]; + return { + model: llm_model, + messages: messages, + temperature: temperature, + max_new_tokens: max_new_tokens + }; +} +""" + + +class CodeDictToModelRequestOperator(MapOperator[dict, ModelRequest]): + metadata = ViewMetadata( + label=_("Code Dict to Model Request Operator"), + name="default_code_dict_to_model_request_operator", + description=_( + "Handle input dictionary with code and return output ModelRequest after execution." + ), + category=OperatorCategory.CODE, + parameters=[ + Parameter.build_from( + _("Code Editor"), + "code", + type=str, + optional=True, + default=_REQ_BUILD_PY_FUNC, + placeholder=_("Please input your code"), + description=_("The code to be executed."), + ui=ui.UICodeEditor( + language="python", + ), + ), + Parameter.build_from( + _("Language"), + "lang", + type=str, + optional=True, + default="python", + placeholder=_("Please select the language"), + description=_("The language of the code."), + options=[ + OptionValue(label="Python", name="python", value="python"), + OptionValue( + label="JavaScript", name="javascript", value="javascript" + ), + ], + ui=ui.UISelect(), + ), + Parameter.build_from( + _("Call Name"), + "call_name", + type=str, + optional=True, + default="fn_map", + placeholder=_("Please input the call name"), + description=_("The call name of the function."), + ), + ], + inputs=[ + IOField.build_from( + _("Input Data"), + "input", + type=dict, + description=_("The input dictionary."), + ) + ], + outputs=[ + IOField.build_from( + _("Output Data"), + "output", + type=ModelRequest, + description=_("The output ModelRequest."), + ) + ], + tags={"order": TAGS_ORDER_HIGH}, + ) + + def __init__( + self, + code: str = _REQ_BUILD_PY_FUNC, + lang: str = "python", + call_name: str = "fn_map", + **kwargs, + ): + super().__init__(**kwargs) + self.code = code + self.lang = lang + self.call_name = call_name + + async def map(self, input_value: dict) -> ModelRequest: + from lyric import PyTaskFsConfig, PyTaskMemoryConfig, PyTaskResourceConfig + + exec_input_data_bytes = json.dumps(input_value).encode("utf-8") + code_server = await get_code_server() + model_name = os.getenv("LLM_MODEL") + + fs = PyTaskFsConfig( + preopens=[ + # Mount the /tmp directory to the /tmp directory in the sandbox + # Directory permissions are set to 3 (read and write) + # File permissions are set to 3 (read and write) + ("/tmp", "/tmp", 3, 3), + # Mount the current directory to the /home directory in the sandbox + # Directory and file permissions are set to 1 (read) + (".", "/home", 1, 1), + ] + ) + memory = PyTaskMemoryConfig(memory_limit=50 * 1024 * 1024) # 50MB in bytes + resources = PyTaskResourceConfig( + fs=fs, + memory=memory, + env_vars=[ + ("DBGPT_RUNTIME_LLM_MODEL", model_name), + ], + ) + result = await code_server.exec1( + self.code, + exec_input_data_bytes, + call_name=self.call_name, + lang=self.lang, + resources=resources, + ) + logger.info(f"Code execution result: {result}") + if result.exit_code != 0: + raise RuntimeError(f"Code execution failed: {result.logs}") + + if not result.output: + raise RuntimeError(f"Code execution failed: {result.logs}") + + if not isinstance(result.output, dict): + raise RuntimeError( + f"Code execution failed, invalid output: {result.output}" + ) + logger.info(f"Code execution result: {result}") + return ModelRequest(**result.output) diff --git a/dbgpt/app/operators/converter.py b/dbgpt/app/operators/converter.py index 1115e0de4..bcf9824f6 100644 --- a/dbgpt/app/operators/converter.py +++ b/dbgpt/app/operators/converter.py @@ -1,5 +1,6 @@ """Type Converter Operators.""" +from dbgpt.core import ModelOutput from dbgpt.core.awel import MapOperator from dbgpt.core.awel.flow import ( TAGS_ORDER_HIGH, @@ -184,3 +185,22 @@ class BooleanToString(MapOperator[bool, str]): def __init__(self, **kwargs): """Create a new BooleanToString operator.""" super().__init__(map_function=lambda x: str(x), **kwargs) + + +class ModelOutputToDict(MapOperator[ModelOutput, dict]): + """Converts a model output to a dictionary.""" + + metadata = ViewMetadata( + label=_("Model Output to Dict"), + name="default_converter_model_output_to_dict", + description=_("Converts a model output to a dictionary."), + category=OperatorCategory.TYPE_CONVERTER, + parameters=[], + inputs=[IOField.build_from(_("Model Output"), "model_output", ModelOutput)], + outputs=[IOField.build_from(_("Dictionary"), "dict", dict)], + tags={"order": TAGS_ORDER_HIGH}, + ) + + def __init__(self, **kwargs): + """Create a new ModelOutputToDict operator.""" + super().__init__(map_function=lambda x: x.to_dict(), **kwargs) diff --git a/dbgpt/app/scene/base_chat.py b/dbgpt/app/scene/base_chat.py index 677ac2bd6..d1fc27334 100644 --- a/dbgpt/app/scene/base_chat.py +++ b/dbgpt/app/scene/base_chat.py @@ -277,6 +277,8 @@ async def stream_call(self): ) payload.span_id = span.span_id try: + msg = "ERROR! No response from model" + view_msg = msg async for output in self.call_streaming_operator(payload): # Plugin research in result generation msg = self.prompt_template.output_parser.parse_model_stream_resp_ex( diff --git a/dbgpt/core/awel/flow/base.py b/dbgpt/core/awel/flow/base.py index 99aa77c8b..336b3a32b 100644 --- a/dbgpt/core/awel/flow/base.py +++ b/dbgpt/core/awel/flow/base.py @@ -151,6 +151,7 @@ def __init__(self, label: str, description: str): "database": _CategoryDetail("Database", "Interact with the database"), "type_converter": _CategoryDetail("Type Converter", "Convert the type"), "example": _CategoryDetail("Example", "Example operator"), + "code": _CategoryDetail("Code", "Code operator"), } @@ -169,6 +170,7 @@ class OperatorCategory(str, Enum): DATABASE = "database" TYPE_CONVERTER = "type_converter" EXAMPLE = "example" + CODE = "code" def label(self) -> str: """Get the label of the category.""" @@ -1361,6 +1363,8 @@ def _register_operator(view_cls: Optional[Type[T]]): """Register the operator.""" if not view_cls or not view_cls.metadata: return + if "metadata" not in view_cls.__dict__: + return # Skip the base class metadata = view_cls.metadata metadata.type_name = view_cls.__qualname__ metadata.type_cls = _get_type_name(view_cls) diff --git a/dbgpt/storage/vector_store/milvus_store.py b/dbgpt/storage/vector_store/milvus_store.py index 6b2d89f50..3763f14f8 100644 --- a/dbgpt/storage/vector_store/milvus_store.py +++ b/dbgpt/storage/vector_store/milvus_store.py @@ -458,7 +458,7 @@ def similar_search_with_scores( # convert to milvus expr filter. milvus_filter_expr = self.convert_metadata_filters(filters) if filters else None _, docs_and_scores = self._search( - query=text, topk=topk, expr=milvus_filter_expr + query=text, k=topk, expr=milvus_filter_expr ) if any(score < 0.0 or score > 1.0 for _, score, id in docs_and_scores): logger.warning( diff --git a/dbgpt/util/code/__init__.py b/dbgpt/util/code/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dbgpt/util/code/server.py b/dbgpt/util/code/server.py new file mode 100644 index 000000000..9b312b8aa --- /dev/null +++ b/dbgpt/util/code/server.py @@ -0,0 +1,152 @@ +import asyncio +import logging +from enum import Enum, auto +from typing import Dict, Optional, cast + +from lyric import CodeResult, DefaultLyricDriver, PyTaskResourceConfig + +from dbgpt.component import BaseComponent, SystemApp + +logger = logging.getLogger(__name__) + + +class ServerState(Enum): + INIT = auto() + STARTING = auto() + READY = auto() + STOPPING = auto() + STOPPED = auto() + + +class CodeServer(BaseComponent): + def __init__(self, system_app: Optional[SystemApp] = None): + self.sys_app = system_app + super().__init__(system_app) + self._lcd = DefaultLyricDriver() + self._init_lock = asyncio.Lock() + self._state = ServerState.INIT + self._ready_event = asyncio.Event() + + def init_app(self, system_app: SystemApp): + self.sys_app = system_app + + def before_start(self): + if self._state == ServerState.INIT: + self._state = ServerState.STARTING + self._lcd.start() + + def before_stop(self): + self._state = ServerState.STOPPING + self._lcd.stop() + self._state = ServerState.STOPPED + self._ready_event.clear() + + async def async_after_start(self): + await self._ensure_initialized() + + async def _ensure_initialized(self): + """Ensure the server is initialized and workers are loaded.""" + if self._state == ServerState.READY: + return + + async with self._init_lock: + # Double check after acquiring lock + if self._state == ServerState.READY: + return + + if self._state == ServerState.INIT: + logger.info("Starting code server...") + self._state = ServerState.STARTING + self._lcd.start() + if self._state == ServerState.STARTING: + await self._lcd.lyric.load_default_workers() + self._state = ServerState.READY + self._ready_event.set() + logger.info("Code server is ready") + + async def wait_ready(self, timeout: Optional[float] = None) -> bool: + """Wait until server is ready. + + Args: + timeout: Maximum time to wait in seconds. None means wait forever. + + Returns: + bool: True if server is ready, False if timeout occurred + """ + try: + await asyncio.wait_for(self._ready_event.wait(), timeout) + return True + except asyncio.TimeoutError: + return False + + async def exec( + self, code: str, lang: str, resources: Optional[PyTaskResourceConfig] = None + ) -> CodeResult: + await self._ensure_initialized() + return await self._lcd.exec(code, lang, resources=resources) + + async def exec1( + self, + code: str, + input_bytes: bytes, + call_name: str, + lang: str = "python", + resources: Optional[PyTaskResourceConfig] = None, + ) -> CodeResult: + await self._ensure_initialized() + return await self._lcd.exec1( + code, input_bytes, call_name, lang=lang, resources=resources + ) + + async def parse_awel(self, code: str) -> Optional[Dict]: + """Parse the AWEL code. + + Return the flow metadata. + """ + raise NotImplementedError + + async def run_awel_operator(self, code: str): + """Run an AWEL operator in remote mode.""" + raise NotImplementedError + + +_SYSTEM_APP: Optional[SystemApp] = None + + +def initialize_code_server(system_app: SystemApp): + """Initialize the code server.""" + global _SYSTEM_APP + _SYSTEM_APP = system_app + code_server = CodeServer(system_app) + system_app.register_instance(code_server) + + +async def get_code_server( + system_app: Optional[SystemApp] = None, + wait_ready: bool = True, + timeout: Optional[float] = None, +) -> CodeServer: + """Return the code server. + + Args: + system_app (Optional[SystemApp]): The system app. Defaults to None. + wait_ready (bool): Whether to wait for server to be ready. Defaults to True. + timeout (Optional[float]): Maximum time to wait in seconds. None means wait forever. + + Returns: + CodeServer: The code server. + """ + if not _SYSTEM_APP: + if not system_app: + system_app = SystemApp() + initialize_code_server(system_app) + + app = system_app or _SYSTEM_APP + server = CodeServer.get_instance(cast(SystemApp, app)) + + if wait_ready: + await server._ensure_initialized() + if not await server.wait_ready(timeout): + raise TimeoutError("Timeout waiting for code server to be ready") + + return server diff --git a/docs/static/img/application/awel/awel_flow_page.png b/docs/static/img/application/awel/awel_flow_page.png new file mode 100644 index 000000000..8b84fdc17 Binary files /dev/null and b/docs/static/img/application/awel/awel_flow_page.png differ diff --git a/docs/static/img/application/awel/flow_def_rag_ko_1.png b/docs/static/img/application/awel/flow_def_rag_ko_1.png new file mode 100644 index 000000000..14b451183 Binary files /dev/null and b/docs/static/img/application/awel/flow_def_rag_ko_1.png differ diff --git a/docs/static/img/application/awel/flow_dev_empty_page_img.png b/docs/static/img/application/awel/flow_dev_empty_page_img.png new file mode 100644 index 000000000..8c738d3e3 Binary files /dev/null and b/docs/static/img/application/awel/flow_dev_empty_page_img.png differ diff --git a/docs/static/img/application/awel/flow_dev_rag_ko_2.png b/docs/static/img/application/awel/flow_dev_rag_ko_2.png new file mode 100644 index 000000000..ac6517ed8 Binary files /dev/null and b/docs/static/img/application/awel/flow_dev_rag_ko_2.png differ diff --git a/docs/static/img/application/awel/flow_dev_rag_llm_1.png b/docs/static/img/application/awel/flow_dev_rag_llm_1.png new file mode 100644 index 000000000..a6c8d4655 Binary files /dev/null and b/docs/static/img/application/awel/flow_dev_rag_llm_1.png differ diff --git a/docs/static/img/application/awel/flow_dev_rag_llm_2_.png b/docs/static/img/application/awel/flow_dev_rag_llm_2_.png new file mode 100644 index 000000000..1082dffdd Binary files /dev/null and b/docs/static/img/application/awel/flow_dev_rag_llm_2_.png differ diff --git a/docs/static/img/application/awel/flow_dev_rag_llm_3.png b/docs/static/img/application/awel/flow_dev_rag_llm_3.png new file mode 100644 index 000000000..020f75ad0 Binary files /dev/null and b/docs/static/img/application/awel/flow_dev_rag_llm_3.png differ diff --git a/docs/static/img/application/awel/flow_dev_rag_output_1.png b/docs/static/img/application/awel/flow_dev_rag_output_1.png new file mode 100644 index 000000000..b8023a83e Binary files /dev/null and b/docs/static/img/application/awel/flow_dev_rag_output_1.png differ diff --git a/docs/static/img/application/awel/flow_dev_rag_prompt_1.png b/docs/static/img/application/awel/flow_dev_rag_prompt_1.png new file mode 100644 index 000000000..bb95ce87c Binary files /dev/null and b/docs/static/img/application/awel/flow_dev_rag_prompt_1.png differ diff --git a/docs/static/img/application/awel/flow_dev_rag_save_1.png b/docs/static/img/application/awel/flow_dev_rag_save_1.png new file mode 100644 index 000000000..3e1d68350 Binary files /dev/null and b/docs/static/img/application/awel/flow_dev_rag_save_1.png differ diff --git a/docs/static/img/application/awel/flow_dev_rag_show_1.png b/docs/static/img/application/awel/flow_dev_rag_show_1.png new file mode 100644 index 000000000..0ae16c992 Binary files /dev/null and b/docs/static/img/application/awel/flow_dev_rag_show_1.png differ diff --git a/examples/agents/sandbox_code_agent_example.py b/examples/agents/sandbox_code_agent_example.py new file mode 100644 index 000000000..67cfdde1a --- /dev/null +++ b/examples/agents/sandbox_code_agent_example.py @@ -0,0 +1,305 @@ +"""Run your code assistant agent in a sandbox environment. + +This example demonstrates how to create a code assistant agent that can execute code +in a sandbox environment. The agent can execute Python and JavaScript code blocks +and provide the output to the user. The agent can also check the correctness of the +code execution results and provide feedback to the user. + + +You can limit the memory and file system resources available to the code execution +environment. The code execution environment is isolated from the host system, +preventing access to the internet and other external resources. +""" + +import asyncio +import logging +from typing import Optional, Tuple + +from dbgpt.agent import ( + Action, + ActionOutput, + AgentContext, + AgentMemory, + AgentMemoryFragment, + AgentMessage, + AgentResource, + ConversableAgent, + HybridMemory, + LLMConfig, + ProfileConfig, + UserProxyAgent, +) +from dbgpt.agent.expand.code_assistant_agent import CHECK_RESULT_SYSTEM_MESSAGE +from dbgpt.core import ModelMessageRoleType +from dbgpt.util.code_utils import UNKNOWN, extract_code, infer_lang +from dbgpt.util.string_utils import str_to_bool +from dbgpt.util.utils import colored +from dbgpt.vis.tags.vis_code import Vis, VisCode + +logger = logging.getLogger(__name__) + + +class SandboxCodeAction(Action[None]): + """Code Action Module.""" + + def __init__(self, **kwargs): + """Code action init.""" + super().__init__(**kwargs) + self._render_protocol = VisCode() + self._code_execution_config = {} + + @property + def render_protocol(self) -> Optional[Vis]: + """Return the render protocol.""" + return self._render_protocol + + async def run( + self, + ai_message: str, + resource: Optional[AgentResource] = None, + rely_action_out: Optional[ActionOutput] = None, + need_vis_render: bool = True, + **kwargs, + ) -> ActionOutput: + """Perform the action.""" + try: + code_blocks = extract_code(ai_message) + if len(code_blocks) < 1: + logger.info( + f"No executable code found in answer,{ai_message}", + ) + return ActionOutput( + is_exe_success=False, content="No executable code found in answer." + ) + elif len(code_blocks) > 1 and code_blocks[0][0] == UNKNOWN: + # found code blocks, execute code and push "last_n_messages" back + logger.info( + f"Missing available code block type, unable to execute code," + f"{ai_message}", + ) + return ActionOutput( + is_exe_success=False, + content="Missing available code block type, " + "unable to execute code.", + ) + exitcode, logs = await self.execute_code_blocks(code_blocks) + exit_success = exitcode == 0 + + content = ( + logs + if exit_success + else f"exitcode: {exitcode} (execution failed)\n {logs}" + ) + + param = { + "exit_success": exit_success, + "language": code_blocks[0][0], + "code": code_blocks, + "log": logs, + } + if not self.render_protocol: + raise NotImplementedError("The render_protocol should be implemented.") + view = await self.render_protocol.display(content=param) + return ActionOutput( + is_exe_success=exit_success, + content=content, + view=view, + thoughts=ai_message, + observations=content, + ) + except Exception as e: + logger.exception("Code Action Run Failed!") + return ActionOutput( + is_exe_success=False, content="Code execution exception," + str(e) + ) + + async def execute_code_blocks(self, code_blocks): + """Execute the code blocks and return the result.""" + from lyric import ( + PyTaskFilePerms, + PyTaskFsConfig, + PyTaskMemoryConfig, + PyTaskResourceConfig, + ) + + from dbgpt.util.code.server import get_code_server + + fs = PyTaskFsConfig( + preopens=[ + # Mount the /tmp directory to the /tmp directory in the sandbox + # Directory permissions are set to 3 (read and write) + # File permissions are set to 3 (read and write) + ("/tmp", "/tmp", 3, 3), + # Mount the current directory to the /home directory in the sandbox + # Directory and file permissions are set to 1 (read) + (".", "/home", 1, 1), + ] + ) + memory = PyTaskMemoryConfig(memory_limit=50 * 1024 * 1024) # 50MB in bytes + resources = PyTaskResourceConfig( + fs=fs, + memory=memory, + env_vars=[ + ("TEST_ENV", "hello, im an env var"), + ("TEST_ENV2", "hello, im another env var"), + ], + ) + + code_server = await get_code_server() + logs_all = "" + exitcode = -1 + for i, code_block in enumerate(code_blocks): + lang, code = code_block + if not lang: + lang = infer_lang(code) + print( + colored( + f"\n>>>>>>>> EXECUTING CODE BLOCK {i} " + f"(inferred language is {lang})...", + "red", + ), + flush=True, + ) + if lang in ["python", "Python"]: + result = await code_server.exec(code, "python", resources=resources) + exitcode = result.exit_code + logs = result.logs + elif lang in ["javascript", "JavaScript"]: + result = await code_server.exec(code, "javascript", resources=resources) + exitcode = result.exit_code + logs = result.logs + else: + # In case the language is not supported, we return an error message. + exitcode, logs = ( + 1, + f"unknown language {lang}", + ) + + logs_all += "\n" + logs + if exitcode != 0: + return exitcode, logs_all + return exitcode, logs_all + + +class SandboxCodeAssistantAgent(ConversableAgent): + """Code Assistant Agent.""" + + profile: ProfileConfig = ProfileConfig( + name="Turing", + role="CodeEngineer", + goal=( + "Solve tasks using your coding and language skills.\n" + "In the following cases, suggest python code (in a python coding block) or " + "javascript for the user to execute.\n" + " 1. When you need to collect info, use the code to output the info you " + "need, for example, get the current date/time, check the " + "operating system. After sufficient info is printed and the task is ready " + "to be solved based on your language skill, you can solve the task by " + "yourself.\n" + " 2. When you need to perform some task with code, use the code to " + "perform the task and output the result. Finish the task smartly." + ), + constraints=[ + "The user cannot provide any other feedback or perform any other " + "action beyond executing the code you suggest. The user can't modify " + "your code. So do not suggest incomplete code which requires users to " + "modify. Don't use a code block if it's not intended to be executed " + "by the user.Don't ask users to copy and paste results. Instead, " + "the 'Print' function must be used for output when relevant.", + "When using code, you must indicate the script type in the code block. " + "Please don't include multiple code blocks in one response.", + "If you receive user input that indicates an error in the code " + "execution, fix the error and output the complete code again. It is " + "recommended to use the complete code rather than partial code or " + "code changes. If the error cannot be fixed, or the task is not " + "resolved even after the code executes successfully, analyze the " + "problem, revisit your assumptions, gather additional information you " + "need from historical conversation records, and consider trying a " + "different approach.", + "Unless necessary, give priority to solving problems with python " "code.", + "The output content of the 'print' function will be passed to other " + "LLM agents as dependent data. Please control the length of the " + "output content of the 'print' function. The 'print' function only " + "outputs part of the key data information that is relied on, " + "and is as concise as possible.", + "Your code will by run in a sandbox environment(supporting python and " + "javascript), which means you can't access the internet or use any " + "libraries that are not in standard library.", + "It is prohibited to fabricate non-existent data to achieve goals.", + ], + desc=( + "Can independently write and execute python/shell code to solve various" + " problems" + ), + ) + + def __init__(self, **kwargs): + """Create a new CodeAssistantAgent instance.""" + super().__init__(**kwargs) + self._init_actions([SandboxCodeAction]) + + async def correctness_check( + self, message: AgentMessage + ) -> Tuple[bool, Optional[str]]: + """Verify whether the current execution results meet the target expectations.""" + task_goal = message.current_goal + action_report = message.action_report + if not action_report: + return False, "No execution solution results were checked" + check_result, model = await self.thinking( + messages=[ + AgentMessage( + role=ModelMessageRoleType.HUMAN, + content="Please understand the following task objectives and " + f"results and give your judgment:\n" + f"Task goal: {task_goal}\n" + f"Execution Result: {action_report.content}", + ) + ], + prompt=CHECK_RESULT_SYSTEM_MESSAGE, + ) + success = str_to_bool(check_result) + fail_reason = None + if not success: + fail_reason = ( + f"Your answer was successfully executed by the agent, but " + f"the goal cannot be completed yet. Please regenerate based on the " + f"failure reason:{check_result}" + ) + return success, fail_reason + + +async def main(): + from dbgpt.model.proxy import OpenAILLMClient + + llm_client = OpenAILLMClient(model_alias="gpt-4o-mini") + context: AgentContext = AgentContext(conv_id="test123") + agent_memory = AgentMemory(HybridMemory[AgentMemoryFragment].from_chroma()) + agent_memory.gpts_memory.init("test123") + + coder = ( + await SandboxCodeAssistantAgent() + .bind(context) + .bind(LLMConfig(llm_client=llm_client)) + .bind(agent_memory) + .build() + ) + + user_proxy = await UserProxyAgent().bind(context).bind(agent_memory).build() + + # First case: The user asks the agent to calculate 321 * 123 + await user_proxy.initiate_chat( + recipient=coder, + reviewer=user_proxy, + message="计算下321 * 123等于多少", + ) + + await user_proxy.initiate_chat( + recipient=coder, + reviewer=user_proxy, + message="Calculate 100 * 99, must use javascript code block", + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/awel/awel_flow_ui_components.py b/examples/awel/awel_flow_ui_components.py index ce411a79d..eebf2b16d 100644 --- a/examples/awel/awel_flow_ui_components.py +++ b/examples/awel/awel_flow_ui_components.py @@ -1134,7 +1134,23 @@ class ExampleFlowCodeEditorOperator(MapOperator[str, str]): ui=ui.UICodeEditor( language="python", ), - ) + ), + Parameter.build_from( + "Language", + "lang", + type=str, + optional=True, + default="python", + placeholder="Please select the language", + description="The language of the code.", + options=[ + OptionValue(label="Python", name="python", value="python"), + OptionValue( + label="JavaScript", name="javascript", value="javascript" + ), + ], + ui=ui.UISelect(), + ), ], inputs=[ IOField.build_from( @@ -1154,95 +1170,34 @@ class ExampleFlowCodeEditorOperator(MapOperator[str, str]): ], ) - def __init__(self, code: str, **kwargs): + def __init__(self, code: str, lang: str = "python", **kwargs): super().__init__(**kwargs) self.code = code + self.lang = lang async def map(self, user_name: str) -> str: """Map the user name to the code.""" - from dbgpt.util.code_utils import UNKNOWN, extract_code code = self.code - exitcode = -1 + exit_code = -1 try: - code_blocks = extract_code(self.code) - if len(code_blocks) < 1: - logger.info( - f"No executable code found in: \n{code}", - ) - raise ValueError(f"No executable code found in: \n{code}") - elif len(code_blocks) > 1 and code_blocks[0][0] == UNKNOWN: - # found code blocks, execute code and push "last_n_messages" back - logger.info( - f"Missing available code block type, unable to execute code," - f"\n{code}", - ) - raise ValueError( - "Missing available code block type, unable to execute code, " - f"\n{code}" - ) - exitcode, logs = await self.blocking_func_to_async( - self.execute_code_blocks, code_blocks - ) - # exitcode, logs = self.execute_code_blocks(code_blocks) + exit_code, logs = await self.execute_code_blocks(code, self.lang) except Exception as e: logger.error(f"Failed to execute code: {e}") logs = f"Failed to execute code: {e}" return ( - f"Your name is {user_name}, and your code is \n\n```python\n{self.code}" + f"Your name is {user_name}, and your code is \n\n```python\n{code}" f"\n\n```\n\nThe execution result is \n\n```\n{logs}\n\n```\n\n" - f"Exit code: {exitcode}." + f"Exit code: {exit_code}." ) - def execute_code_blocks(self, code_blocks): + async def execute_code_blocks(self, code_blocks: str, lang: str): """Execute the code blocks and return the result.""" - from dbgpt.util.code_utils import execute_code, infer_lang - from dbgpt.util.utils import colored - - logs_all = "" - exitcode = -1 - _code_execution_config = {"use_docker": False} - for i, code_block in enumerate(code_blocks): - lang, code = code_block - if not lang: - lang = infer_lang(code) - print( - colored( - f"\n>>>>>>>> EXECUTING CODE BLOCK {i} " - f"(inferred language is {lang})...", - "red", - ), - flush=True, - ) - if lang in ["bash", "shell", "sh"]: - exitcode, logs, image = execute_code( - code, lang=lang, **_code_execution_config - ) - elif lang in ["python", "Python"]: - if code.startswith("# filename: "): - filename = code[11 : code.find("\n")].strip() - else: - filename = None - exitcode, logs, image = execute_code( - code, - lang="python", - filename=filename, - **_code_execution_config, - ) - else: - # In case the language is not supported, we return an error message. - exitcode, logs, image = ( - 1, - f"unknown language {lang}", - None, - ) - # raise NotImplementedError - if image is not None: - _code_execution_config["use_docker"] = image - logs_all += "\n" + logs - if exitcode != 0: - return exitcode, logs_all - return exitcode, logs_all + from dbgpt.util.code.server import CodeResult, get_code_server + + code_server = await get_code_server(self.system_app) + result: CodeResult = await code_server.exec(code_blocks, lang) + return result.exit_code, result.logs class ExampleFlowDynamicParametersOperator(MapOperator[str, str]): diff --git a/setup.py b/setup.py index 2b6a9c32f..c9d13750e 100644 --- a/setup.py +++ b/setup.py @@ -517,13 +517,15 @@ def code_execution_requires(): """ pip install "dbgpt[code]" - Code execution dependencies. For building a docker image. + Code execution dependencies. """ setup_spec.extras["code"] = setup_spec.extras["core"] + [ - "pyzmq", "msgpack", # for AWEL operator serialization "cloudpickle", + "lyric-py>=0.1.4", + "lyric-py-worker>=0.1.4", + "lyric-js-worker>=0.1.4", ] @@ -723,6 +725,7 @@ def default_requires(): setup_spec.extras["default"] += setup_spec.extras["datasource"] setup_spec.extras["default"] += setup_spec.extras["torch"] setup_spec.extras["default"] += setup_spec.extras["cache"] + setup_spec.extras["default"] += setup_spec.extras["code"] if INCLUDE_QUANTIZATION: # Add quantization extra to default, default is True setup_spec.extras["default"] += setup_spec.extras["quantization"]