From 1d2a3c3dfca86eb0c2b5f50541f053bb2231b341 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Wed, 15 May 2024 13:42:32 +0530 Subject: [PATCH 01/32] wip --- core/config/__init__.py | 0 core/config/config.py | 44 ++++++++++++++++++++++++++++++++++++++ core/config/connections.py | 15 +++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 core/config/__init__.py create mode 100644 core/config/config.py create mode 100644 core/config/connections.py diff --git a/core/config/__init__.py b/core/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/config/config.py b/core/config/config.py new file mode 100644 index 0000000..4c3b5e8 --- /dev/null +++ b/core/config/config.py @@ -0,0 +1,44 @@ +import functools + +from os import getenv + +class Config: + @functools.cached_property + def mongo_url(self) -> str: + url = getenv("MONGO_URL") + if url: + return url + + host = getenv("MONGODB_HOST", "localhost") + user = getenv("MONGODB_USER", getenv("MONGODB_USERNAME", None)) + password = getenv("MONGODB_PASS", getenv("MONGODB_PASSWORD", None)) + port = getenv("MONGODB_PORT", 27017) + database = getenv("MONGODB_DATABASE", "rubra") + + if user and not password: + print("MONGODB_USER set but password not found, ignoring user") + + if not user and password: + print("MONGODB_PASSWORD set but user not found, ignoring password") + + if user and password: + return f"mongodb://{user}:{password}@{host}:${port}/{database}" + + return f"mongodb://{host}:{port}/{database}" + + @functools.cached_property + def redis_url(self) -> str: + url = getenv("REDIS_URL") + if url: + return url + + host = getenv("REDIS_HOST", "localhost") + password = getenv("REDIS_PASS", getenv("REDIS_PASSWORD", None)) + user = getenv("REDIS_USER", getenv("REDIS_USERNAME", None)) + port = getenv("REDIS_PORT", 6379) + database = getenv("REDIS_DATABASE", "rubra") + + if password: + return f"redis://{user or ''}:{password}@{host}:{port}/{database}" + + return f"redis://{host}:{port}/{database}" diff --git a/core/config/connections.py b/core/config/connections.py new file mode 100644 index 0000000..3076f34 --- /dev/null +++ b/core/config/connections.py @@ -0,0 +1,15 @@ +from functools import cached_property +from motor.motor_asyncio import AsyncIOMotorClient + +from config import Config + +class Connections: + def __init__(self): + self._config = Config() + + async def mongo_client(self): + client = AsyncIOMotorClient(self._config.mongo_url) + info = await client.server_info() + print("[MongoDB] server info: ", info) + + return client From 8322f3df37c53c246b20d49f1e3e5141c54c2e0d Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Thu, 16 May 2024 17:30:41 +0530 Subject: [PATCH 02/32] wip add test healthchecks --- core/config/__init__.py | 1 + core/config/config.py | 78 +++++++++++--------- core/config/connections.py | 15 ---- services/backend/api_server/Dockerfile | 1 + services/backend/api_server/app/backend.py | 84 ++++++++++++++-------- 5 files changed, 102 insertions(+), 77 deletions(-) delete mode 100644 core/config/connections.py diff --git a/core/config/__init__.py b/core/config/__init__.py index e69de29..27c9ec6 100644 --- a/core/config/__init__.py +++ b/core/config/__init__.py @@ -0,0 +1 @@ +from .config import * diff --git a/core/config/config.py b/core/config/config.py index 4c3b5e8..1a0f414 100644 --- a/core/config/config.py +++ b/core/config/config.py @@ -1,44 +1,58 @@ -import functools +from functools import cache from os import getenv -class Config: - @functools.cached_property - def mongo_url(self) -> str: - url = getenv("MONGO_URL") - if url: - return url +@cache +def get_mongo_database_name(): + return getenv("MONGO_DATABASE", "rubra_db") - host = getenv("MONGODB_HOST", "localhost") - user = getenv("MONGODB_USER", getenv("MONGODB_USERNAME", None)) - password = getenv("MONGODB_PASS", getenv("MONGODB_PASSWORD", None)) - port = getenv("MONGODB_PORT", 27017) - database = getenv("MONGODB_DATABASE", "rubra") +@cache +def get_mongo_url() -> str: + url = getenv("MONGO_URL") + if url: + return url - if user and not password: - print("MONGODB_USER set but password not found, ignoring user") + host = getenv("MONGODB_HOST", "localhost") + user = getenv("MONGODB_USER", getenv("MONGODB_USERNAME", None)) + password = getenv("MONGODB_PASS", getenv("MONGODB_PASSWORD", None)) + port = getenv("MONGODB_PORT", 27017) + database = getenv("MONGODB_DATABASE", "rubra") - if not user and password: - print("MONGODB_PASSWORD set but user not found, ignoring password") + if user and not password: + print("MONGODB_USER set but password not found, ignoring user") - if user and password: - return f"mongodb://{user}:{password}@{host}:${port}/{database}" + if not user and password: + print("MONGODB_PASSWORD set but user not found, ignoring password") - return f"mongodb://{host}:{port}/{database}" + if user and password: + return f"mongodb://{user}:{password}@{host}:${port}/{database}" - @functools.cached_property - def redis_url(self) -> str: - url = getenv("REDIS_URL") - if url: - return url + return f"mongodb://{host}:{port}/{database}" - host = getenv("REDIS_HOST", "localhost") - password = getenv("REDIS_PASS", getenv("REDIS_PASSWORD", None)) - user = getenv("REDIS_USER", getenv("REDIS_USERNAME", None)) - port = getenv("REDIS_PORT", 6379) - database = getenv("REDIS_DATABASE", "rubra") +@cache +def get_redis_url() -> str: + url = getenv("REDIS_URL") + if url: + return url - if password: - return f"redis://{user or ''}:{password}@{host}:{port}/{database}" + host = getenv("REDIS_HOST", "localhost") + password = getenv("REDIS_PASS", getenv("REDIS_PASSWORD", None)) + user = getenv("REDIS_USER", getenv("REDIS_USERNAME", None)) + port = getenv("REDIS_PORT", 6379) + database = getenv("REDIS_DATABASE", "rubra") - return f"redis://{host}:{port}/{database}" + if password: + return f"redis://{user or ''}:{password}@{host}:{port}/{database}" + + return f"redis://{host}:{port}/{database}" + +@cache +def get_litellm_url() -> str: + url = getenv("LITELLM_URL") + if url: + return url + + host = getenv("LITELLM_HOST") + port = getenv("LITELLM_PORT") + + return f"http://{host}:{port}" diff --git a/core/config/connections.py b/core/config/connections.py deleted file mode 100644 index 3076f34..0000000 --- a/core/config/connections.py +++ /dev/null @@ -1,15 +0,0 @@ -from functools import cached_property -from motor.motor_asyncio import AsyncIOMotorClient - -from config import Config - -class Connections: - def __init__(self): - self._config = Config() - - async def mongo_client(self): - client = AsyncIOMotorClient(self._config.mongo_url) - info = await client.server_info() - print("[MongoDB] server info: ", info) - - return client diff --git a/services/backend/api_server/Dockerfile b/services/backend/api_server/Dockerfile index 4a905e5..a9f5ee7 100644 --- a/services/backend/api_server/Dockerfile +++ b/services/backend/api_server/Dockerfile @@ -8,6 +8,7 @@ COPY . /app COPY --from=core ./ /app/core # Install any needed packages specified in requirements.txt +RUN apt-get update && apt-get install gcc g++ -y RUN pip install --no-cache-dir -r requirements.txt RUN spacy download en_core_web_sm RUN playwright install diff --git a/services/backend/api_server/app/backend.py b/services/backend/api_server/app/backend.py index 9d67d55..217f347 100644 --- a/services/backend/api_server/app/backend.py +++ b/services/backend/api_server/app/backend.py @@ -1,14 +1,17 @@ # Standard Library import asyncio +import os import json import logging -import os import uuid from datetime import datetime -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union + +from pymongo.server_api import ServerApi -# Third Party import aioredis + +# Third Party import requests from beanie import init_beanie from celery import Celery @@ -87,7 +90,7 @@ delete_docs, drop_collection, ) -from fastapi import FastAPI, Form, HTTPException, UploadFile, WebSocket +from fastapi import FastAPI, Form, HTTPException, UploadFile, WebSocket, Response, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from fastapi.websockets import WebSocketState @@ -104,9 +107,7 @@ generate_thread_id, ) -litellm_host = os.getenv("LITELLM_HOST", "localhost") -redis_host = os.getenv("REDIS_HOST", "localhost") -mongodb_host = os.getenv("MONGODB_HOST", "localhost") +import core.config as config app = FastAPI() @@ -125,24 +126,23 @@ ) # MongoDB Configurationget -MONGODB_URL = f"mongodb://{mongodb_host}:27017" -DATABASE_NAME = "rubra_db" -LITELLM_URL = f"http://{litellm_host}:8002" +LITELLM_URL = config.get_litellm_url() +LITELLM_MASTER_KEY = os.getenv("LITELLM_MASTER_KEY", "") HEADERS = {"accept": "application/json", "Content-Type": "application/json"} # Initialize MongoDB client -mongo_client = AsyncIOMotorClient(MONGODB_URL) -database = mongo_client[DATABASE_NAME] +mongo_client = AsyncIOMotorClient(config.get_mongo_url(), server_api=ServerApi("1")) +database = mongo_client[config.get_mongo_database_name()] -celery_app = Celery(broker=f"redis://{redis_host}:6379/0") +celery_app = Celery(config.get_redis_url()) -logging.basicConfig(level=logging.INFO) +redis = aioredis.from_url(config.get_redis_url()) +logging.basicConfig(level=logging.INFO) def get_database(): return database - @app.on_event("startup") async def on_startup(): await init_beanie( @@ -202,9 +202,6 @@ async def on_startup(): @app.get("/get_api_key_status", tags=["API Keys"]) async def get_api_key_status(): try: - redis = await aioredis.from_url( - f"redis://{redis_host}:6379/0", encoding="utf-8", decode_responses=True - ) openai_key = await redis.get("OPENAI_API_KEY") anthropic_key = await redis.get("ANTHROPIC_API_KEY") @@ -225,10 +222,6 @@ async def get_api_key_status(): @app.post("/set_api_keys", tags=["API Keys"]) async def set_api_key_status(api_keys: ApiKeysUpdateModel): try: - redis = await aioredis.from_url( - f"redis://{redis_host}:6379/0", encoding="utf-8", decode_responses=True - ) - logging.info("Setting API keys") logging.info(api_keys) @@ -751,9 +744,6 @@ async def list_messages( async def redis_subscriber(channel, timeout=1): logging.info(f"Connecting to Redis and subscribing to channel: {channel}") - redis = await aioredis.from_url( - f"redis://{redis_host}:6379/0", encoding="utf-8", decode_responses=True - ) pubsub = redis.pubsub() await pubsub.subscribe(channel) @@ -778,12 +768,8 @@ async def listen_for_task_status( task_status_channel, status_update_event, thread_id, run_id ): logging.info(f"Listening for task status on channel: {task_status_channel}") - redis = None pubsub = None try: - redis = await aioredis.from_url( - f"redis://{redis_host}:6379/0", encoding="utf-8", decode_responses=True - ) pubsub = redis.pubsub() await pubsub.subscribe(task_status_channel) @@ -1023,7 +1009,7 @@ def convert_model_info_to_oai_model(obj, predefined_models): def litellm_list_model() -> ListModelsResponse: try: - client = OpenAI(base_url=LITELLM_URL, api_key="abc") + client = OpenAI(base_url=LITELLM_URL, api_key=LITELLM_MASTER_KEY) models_data = client.models.list().data models_data = sorted(models_data, key=lambda x: x.id) predefined_models = [convert_to_model(m) for m in models_data] @@ -1589,6 +1575,44 @@ async def chat_completion(body: CreateChatCompletionRequest): else: return response +async def perform_health_check(*, readiness=False) -> None: + await redis.ping() + await mongo_client.admin.command("ping") + + response: Union[requests.Response, None] + + healthy = False + + if readiness: + response = requests.get(f"{LITELLM_URL}/health/readiness", { + }) + + healthy = response.json().get("status", "") == "healthy" + else: + response = requests.get(f"{LITELLM_URL}/health/liveliness", { + }) + + healthy = response.content == "I'm alive!" + + if not healthy: + raise Exception("litellm not ready: " + str(response.json())) + + +@app.get("/healthz/readiness", status_code=status.HTTP_204_NO_CONTENT) +async def check_readiness(response: Response) -> None: + try: + await perform_health_check(readiness=True) + except Exception as e: + print("error checking for health:", e) + response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE + +@app.get("/healthz/liveness", status_code=status.HTTP_204_NO_CONTENT) +async def check_liveness(response: Response) -> None: + try: + await perform_health_check(readiness=True) + except Exception as e: + print("error checking for health:", e) + response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE def data_generator(response): """ From cff24eddbcf562127fe3f4e4ac0aff915182e963 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Thu, 16 May 2024 19:27:06 +0530 Subject: [PATCH 03/32] wip healthz endpoints --- services/backend/api_server/Dockerfile | 11 ++-- services/backend/api_server/app/backend.py | 62 +++++++++++----------- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/services/backend/api_server/Dockerfile b/services/backend/api_server/Dockerfile index a9f5ee7..eaea120 100644 --- a/services/backend/api_server/Dockerfile +++ b/services/backend/api_server/Dockerfile @@ -3,17 +3,20 @@ FROM python:3.10.7-slim # Set the working directory in the container to /app WORKDIR /app -# Add the current directory contents into the container at /app -COPY . /app -COPY --from=core ./ /app/core +RUN apt-get update && apt-get install gcc g++ -y + +COPY requirements.txt /app # Install any needed packages specified in requirements.txt -RUN apt-get update && apt-get install gcc g++ -y RUN pip install --no-cache-dir -r requirements.txt RUN spacy download en_core_web_sm RUN playwright install RUN playwright install-deps +# Add the current directory contents into the container at /app +COPY . /app +COPY --from=core ./ /app/core + # Make port 80 available to the world outside this container EXPOSE 8000 diff --git a/services/backend/api_server/app/backend.py b/services/backend/api_server/app/backend.py index 217f347..7aacad3 100644 --- a/services/backend/api_server/app/backend.py +++ b/services/backend/api_server/app/backend.py @@ -5,7 +5,7 @@ import logging import uuid from datetime import datetime -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, Optional, Union, Callable from pymongo.server_api import ServerApi @@ -1575,44 +1575,46 @@ async def chat_completion(body: CreateChatCompletionRequest): else: return response -async def perform_health_check(*, readiness=False) -> None: - await redis.ping() - await mongo_client.admin.command("ping") - - response: Union[requests.Response, None] - - healthy = False +def _health_endpoint_wrapper(fn: Callable): + async def _health(response: Response): + try: + await redis.ping() + await mongo_client.admin.command("ping") - if readiness: - response = requests.get(f"{LITELLM_URL}/health/readiness", { - }) + if asyncio.iscoroutinefunction(fn): + await fn() + else: + fn() + except Exception as e: + print("error checking for health:", e) + response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE + return {"status": "not healthy"} - healthy = response.json().get("status", "") == "healthy" - else: - response = requests.get(f"{LITELLM_URL}/health/liveliness", { - }) + return _health - healthy = response.content == "I'm alive!" +@app.get("/healthz/readiness", status_code=status.HTTP_204_NO_CONTENT) +@_health_endpoint_wrapper +def is_litellm_ready() -> None: + response = requests.get(f"{LITELLM_URL}/health/readiness", { }) - if not healthy: + if response.json().get("status", "") != "healthy": raise Exception("litellm not ready: " + str(response.json())) + # it is important to make sure auth is working + response = requests.get(f"{LITELLM_URL}/health", headers={ "Authorization": f"Bearer {LITELLM_MASTER_KEY}" }) + + if not response.ok: + raise Exception("could not grab litellm health: "+response.text) -@app.get("/healthz/readiness", status_code=status.HTTP_204_NO_CONTENT) -async def check_readiness(response: Response) -> None: - try: - await perform_health_check(readiness=True) - except Exception as e: - print("error checking for health:", e) - response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE @app.get("/healthz/liveness", status_code=status.HTTP_204_NO_CONTENT) -async def check_liveness(response: Response) -> None: - try: - await perform_health_check(readiness=True) - except Exception as e: - print("error checking for health:", e) - response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE +@_health_endpoint_wrapper +def is_litellm_healthy() -> None: + response = requests.get(f"{LITELLM_URL}/health/liveliness", { }) + + if response.text != "\"I'm alive!\"": + raise Exception("litellm not healthy: " + response.content.decode()) + def data_generator(response): """ From 6f62b7c290ef388aeef7fa3a78e8d602c07bbb09 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Thu, 16 May 2024 19:37:45 +0530 Subject: [PATCH 04/32] centralize configs for task-executor --- core/tasks/celery_config.py | 7 ------- core/tasks/tasks.py | 29 +++++++++++++---------------- 2 files changed, 13 insertions(+), 23 deletions(-) delete mode 100644 core/tasks/celery_config.py diff --git a/core/tasks/celery_config.py b/core/tasks/celery_config.py deleted file mode 100644 index 2e8e6b1..0000000 --- a/core/tasks/celery_config.py +++ /dev/null @@ -1,7 +0,0 @@ -# Standard Library -import os - -CELERY_REDIS_HOST = os.getenv("REDIS_HOST", "localhost") -BROKER_URL = f"redis://{CELERY_REDIS_HOST}:6379/0" # Redis configuration -CELERY_RESULT_BACKEND = f"redis://{CELERY_REDIS_HOST}:6379/0" -CELERY_IMPORTS = ("core.tasks.tasks",) diff --git a/core/tasks/tasks.py b/core/tasks/tasks.py index 3bd6eba..114a7ed 100644 --- a/core/tasks/tasks.py +++ b/core/tasks/tasks.py @@ -5,6 +5,8 @@ import sys from functools import partial +from typing import cast + # Get the current working directory current_directory = os.getcwd() @@ -31,33 +33,28 @@ from openai import OpenAI from pymongo import MongoClient -litellm_host = os.getenv("LITELLM_HOST", "localhost") -redis_host = os.getenv("REDIS_HOST", "localhost") -mongodb_host = os.getenv("MONGODB_HOST", "localhost") +import core.config as config + +redis_client = cast(redis.Redis, redis.Redis.from_url(config.get_redis_url())) # annoyingly from_url returns None, not Self +app = Celery("tasks", broker=config.get_redis_url()) -redis_client = redis.Redis(host=redis_host, port=6379, db=0) -app = Celery("tasks", broker=f"redis://{redis_host}:6379/0") app.config_from_object("core.tasks.celery_config") app.autodiscover_tasks(["core.tasks"]) # Explicitly discover tasks in 'app' package -# MongoDB Configuration -MONGODB_URL = f"mongodb://{mongodb_host}:27017" -DATABASE_NAME = "rubra_db" - # Global MongoDB client -mongo_client = None +mongo_client: MongoClient @signals.worker_process_init.connect def setup_mongo_connection(*args, **kwargs): global mongo_client - mongo_client = MongoClient(f"mongodb://{mongodb_host}:27017") + mongo_client = MongoClient(config.get_mongo_url()) def create_assistant_message( thread_id, assistant_id, run_id, content_text, role=Role7.assistant.value ): - db = mongo_client[DATABASE_NAME] + db = mongo_client[config.get_mongo_database_name()] # Generate a unique ID for the message message_id = f"msg_{uuid.uuid4().hex[:6]}" @@ -216,10 +213,10 @@ def form_openai_tools(tools, assistant_id: str): def execute_chat_completion(assistant_id, thread_id, redis_channel, run_id): try: oai_client = OpenAI( - base_url=f"http://{litellm_host}:8002/v1/", - api_key="abc", # point to litellm server + base_url=config.get_litellm_url(), + api_key=os.getenv("LITELLM_MASTER_KEY"), # point to litellm server ) - db = mongo_client[DATABASE_NAME] + db = mongo_client[config.get_mongo_database_name()] # Fetch assistant and thread messages synchronously assistant = db.assistants.find_one({"id": assistant_id}) @@ -461,7 +458,7 @@ def execute_asst_file_create(file_id: str, assistant_id: str): from langchain.text_splitter import RecursiveCharacterTextSplitter try: - db = mongo_client[DATABASE_NAME] + db = mongo_client[config.get_mongo_database_name()] collection_name = assistant_id text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0) parsed_text = "" From c89cdd816221ce6e0072330fda7e15e7a0b20c70 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Thu, 16 May 2024 20:34:26 +0530 Subject: [PATCH 05/32] vectordb changes --- core/config/config.py | 12 ++++++++++++ core/tools/knowledge/file_knowledge_tool.py | 8 +++----- core/tools/knowledge/vector_db/milvus/operations.py | 8 ++++---- .../tools/knowledge/vector_db/milvus/query_milvus.py | 3 +++ services/backend/vector_db_api/main.py | 5 ----- 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/core/config/config.py b/core/config/config.py index 1a0f414..207d7df 100644 --- a/core/config/config.py +++ b/core/config/config.py @@ -56,3 +56,15 @@ def get_litellm_url() -> str: port = getenv("LITELLM_PORT") return f"http://{host}:{port}" + +@cache +def get_vector_db_url() -> str: + url = getenv("VECTOR_DB_URL") + if url: + return url + + host = getenv("VECTOR_DB_HOST", "localhost") + port = getenv("VECTOR_DB_PORT", 8010) + name = getenv("VECTOR_DB_NAME", "similarity_search") + + return f"http://{host}:{port}/{name}" diff --git a/core/tools/knowledge/file_knowledge_tool.py b/core/tools/knowledge/file_knowledge_tool.py index fc62637..aaa92b1 100644 --- a/core/tools/knowledge/file_knowledge_tool.py +++ b/core/tools/knowledge/file_knowledge_tool.py @@ -2,13 +2,11 @@ import json import os +import core.config as config + # Third Party import requests -VECTOR_DB_HOST = os.getenv("VECTOR_DB_HOST", "localhost") -VECTOR_DB_MATCH_URL = f"http://{VECTOR_DB_HOST}:8010/similarity_match" - - class FileKnowledgeTool: name = "FileKnowledge" description = "Useful for search knowledge or information from user's file" @@ -42,7 +40,7 @@ def file_knowledge_search_api(query: str, assistant_id: str): } ) - response = requests.post(VECTOR_DB_MATCH_URL, headers=headers, data=data) + response = requests.post(config.get_vector_db_url(), headers=headers, data=data) res = response.json()["response"] txt = "" for r in res: diff --git a/core/tools/knowledge/vector_db/milvus/operations.py b/core/tools/knowledge/vector_db/milvus/operations.py index 430f29f..5f6a425 100644 --- a/core/tools/knowledge/vector_db/milvus/operations.py +++ b/core/tools/knowledge/vector_db/milvus/operations.py @@ -33,10 +33,10 @@ def load_collection(collection_name: str) -> Milvus: embedding_function=CustomEmbeddings(), collection_name=collection_name, connection_args={ - "host": MILVUS_HOST, - "port": "19530", - "user": "username", - "password": "password", + "host": os.getenv("MILVUS_HOST", "localhost"), + "port": os.getenv("MILVUS_PORT", "19530"), + "user": os.getenv("MILVUS_USER", os.getenv("MILVUS_USERNAME", "")), + "password": os.getenv("MILVUS_PASS", os.getenv("MILVUS_PASSWORD", "")), }, index_params={ "metric_type": "IP", diff --git a/core/tools/knowledge/vector_db/milvus/query_milvus.py b/core/tools/knowledge/vector_db/milvus/query_milvus.py index 4e7779c..f35529e 100644 --- a/core/tools/knowledge/vector_db/milvus/query_milvus.py +++ b/core/tools/knowledge/vector_db/milvus/query_milvus.py @@ -131,6 +131,9 @@ def __init__( # Create the connection to the server if connection_args is None: connection_args = DEFAULT_MILVUS_CONNECTION + else: + # fill anything not passed like "default" port + connection_args = {**DEFAULT_MILVUS_CONNECTION, **connection_args} self.alias = self._create_connection_alias(connection_args) self.col: Optional[Collection] = None diff --git a/services/backend/vector_db_api/main.py b/services/backend/vector_db_api/main.py index 152d832..9b683d8 100644 --- a/services/backend/vector_db_api/main.py +++ b/services/backend/vector_db_api/main.py @@ -19,11 +19,6 @@ app = FastAPI() -@app.on_event("startup") -async def app_startup(): - pass - - @app.post("/add_texts") async def add_texts_embeddings( collection_name: str, From ba2fedbd333da596e935f30aeec007201a480716 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Thu, 16 May 2024 21:05:00 +0530 Subject: [PATCH 06/32] fix ordering of unpacking stupid --- core/tools/knowledge/vector_db/milvus/query_milvus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/tools/knowledge/vector_db/milvus/query_milvus.py b/core/tools/knowledge/vector_db/milvus/query_milvus.py index f35529e..8315754 100644 --- a/core/tools/knowledge/vector_db/milvus/query_milvus.py +++ b/core/tools/knowledge/vector_db/milvus/query_milvus.py @@ -133,7 +133,7 @@ def __init__( connection_args = DEFAULT_MILVUS_CONNECTION else: # fill anything not passed like "default" port - connection_args = {**DEFAULT_MILVUS_CONNECTION, **connection_args} + connection_args = {**connection_args, **DEFAULT_MILVUS_CONNECTION} self.alias = self._create_connection_alias(connection_args) self.col: Optional[Collection] = None From 0773853924a06494f31b41d6b157c4df6fc3d9d2 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Sun, 19 May 2024 22:18:21 +0530 Subject: [PATCH 07/32] remove cache --- core/config/config.py | 17 ++++++++++------- core/tasks/tasks.py | 16 ++++++++-------- core/tools/knowledge/file_knowledge_tool.py | 4 ++-- services/backend/api_server/app/backend.py | 12 ++++++------ 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/core/config/config.py b/core/config/config.py index 207d7df..ac994e6 100644 --- a/core/config/config.py +++ b/core/config/config.py @@ -1,12 +1,8 @@ -from functools import cache - from os import getenv -@cache def get_mongo_database_name(): return getenv("MONGO_DATABASE", "rubra_db") -@cache def get_mongo_url() -> str: url = getenv("MONGO_URL") if url: @@ -29,7 +25,6 @@ def get_mongo_url() -> str: return f"mongodb://{host}:{port}/{database}" -@cache def get_redis_url() -> str: url = getenv("REDIS_URL") if url: @@ -46,7 +41,6 @@ def get_redis_url() -> str: return f"redis://{host}:{port}/{database}" -@cache def get_litellm_url() -> str: url = getenv("LITELLM_URL") if url: @@ -57,7 +51,6 @@ def get_litellm_url() -> str: return f"http://{host}:{port}" -@cache def get_vector_db_url() -> str: url = getenv("VECTOR_DB_URL") if url: @@ -68,3 +61,13 @@ def get_vector_db_url() -> str: name = getenv("VECTOR_DB_NAME", "similarity_search") return f"http://{host}:{port}/{name}" + +mongo_database = get_mongo_database_name() + +mongo_url = get_mongo_url() + +litellm_url = get_litellm_url() + +vector_db_url = get_vector_db_url() + +redis_url = get_redis_url() diff --git a/core/tasks/tasks.py b/core/tasks/tasks.py index 114a7ed..0716159 100644 --- a/core/tasks/tasks.py +++ b/core/tasks/tasks.py @@ -33,10 +33,10 @@ from openai import OpenAI from pymongo import MongoClient -import core.config as config +import core.config as configs -redis_client = cast(redis.Redis, redis.Redis.from_url(config.get_redis_url())) # annoyingly from_url returns None, not Self -app = Celery("tasks", broker=config.get_redis_url()) +redis_client = cast(redis.Redis, redis.Redis.from_url(configs.get_redis_url())) # annoyingly from_url returns None, not Self +app = Celery("tasks", broker=configs.get_redis_url()) app.config_from_object("core.tasks.celery_config") app.autodiscover_tasks(["core.tasks"]) # Explicitly discover tasks in 'app' package @@ -48,13 +48,13 @@ @signals.worker_process_init.connect def setup_mongo_connection(*args, **kwargs): global mongo_client - mongo_client = MongoClient(config.get_mongo_url()) + mongo_client = MongoClient(configs.get_mongo_url()) def create_assistant_message( thread_id, assistant_id, run_id, content_text, role=Role7.assistant.value ): - db = mongo_client[config.get_mongo_database_name()] + db = mongo_client[configs.get_mongo_database_name()] # Generate a unique ID for the message message_id = f"msg_{uuid.uuid4().hex[:6]}" @@ -213,10 +213,10 @@ def form_openai_tools(tools, assistant_id: str): def execute_chat_completion(assistant_id, thread_id, redis_channel, run_id): try: oai_client = OpenAI( - base_url=config.get_litellm_url(), + base_url=configs.get_litellm_url(), api_key=os.getenv("LITELLM_MASTER_KEY"), # point to litellm server ) - db = mongo_client[config.get_mongo_database_name()] + db = mongo_client[configs.get_mongo_database_name()] # Fetch assistant and thread messages synchronously assistant = db.assistants.find_one({"id": assistant_id}) @@ -458,7 +458,7 @@ def execute_asst_file_create(file_id: str, assistant_id: str): from langchain.text_splitter import RecursiveCharacterTextSplitter try: - db = mongo_client[config.get_mongo_database_name()] + db = mongo_client[configs.get_mongo_database_name()] collection_name = assistant_id text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0) parsed_text = "" diff --git a/core/tools/knowledge/file_knowledge_tool.py b/core/tools/knowledge/file_knowledge_tool.py index aaa92b1..fbe4e07 100644 --- a/core/tools/knowledge/file_knowledge_tool.py +++ b/core/tools/knowledge/file_knowledge_tool.py @@ -2,7 +2,7 @@ import json import os -import core.config as config +import core.config as configs # Third Party import requests @@ -40,7 +40,7 @@ def file_knowledge_search_api(query: str, assistant_id: str): } ) - response = requests.post(config.get_vector_db_url(), headers=headers, data=data) + response = requests.post(configs.vector_db_url, headers=headers, data=data) res = response.json()["response"] txt = "" for r in res: diff --git a/services/backend/api_server/app/backend.py b/services/backend/api_server/app/backend.py index 7aacad3..0ee0350 100644 --- a/services/backend/api_server/app/backend.py +++ b/services/backend/api_server/app/backend.py @@ -107,7 +107,7 @@ generate_thread_id, ) -import core.config as config +import core.config as configs app = FastAPI() @@ -126,17 +126,17 @@ ) # MongoDB Configurationget -LITELLM_URL = config.get_litellm_url() +LITELLM_URL = configs.litellm_url LITELLM_MASTER_KEY = os.getenv("LITELLM_MASTER_KEY", "") HEADERS = {"accept": "application/json", "Content-Type": "application/json"} # Initialize MongoDB client -mongo_client = AsyncIOMotorClient(config.get_mongo_url(), server_api=ServerApi("1")) -database = mongo_client[config.get_mongo_database_name()] +mongo_client = AsyncIOMotorClient(configs.mongo_url, server_api=ServerApi("1")) +database = mongo_client[configs.mongo_url] -celery_app = Celery(config.get_redis_url()) +celery_app = Celery(configs.redis_url) -redis = aioredis.from_url(config.get_redis_url()) +redis = aioredis.from_url(configs.redis_url) logging.basicConfig(level=logging.INFO) From e3cfd12777753e8415978947efc9e97cbf5a794c Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 20 May 2024 00:03:07 +0530 Subject: [PATCH 08/32] make it so we connect to milvus before any actions previously milvus connection was dependent on some action, function call. for any misconfiguration this means application will start fine without any sign of errors until its late. --- core/tasks/tasks.py | 18 +++++++++------- .../knowledge/vector_db/milvus/operations.py | 14 ++++++------- .../vector_db/milvus/query_milvus.py | 21 +++++++++++++------ services/backend/task_executor/Dockerfile | 2 +- services/backend/vector_db_api/main.py | 4 ++++ 5 files changed, 37 insertions(+), 22 deletions(-) diff --git a/core/tasks/tasks.py b/core/tasks/tasks.py index 0716159..6e04023 100644 --- a/core/tasks/tasks.py +++ b/core/tasks/tasks.py @@ -35,8 +35,8 @@ import core.config as configs -redis_client = cast(redis.Redis, redis.Redis.from_url(configs.get_redis_url())) # annoyingly from_url returns None, not Self -app = Celery("tasks", broker=configs.get_redis_url()) +redis_client = cast(redis.Redis, redis.Redis.from_url(configs.redis_url)) # annoyingly from_url returns None, not Self +app = Celery("tasks", broker=configs.redis_url) app.config_from_object("core.tasks.celery_config") app.autodiscover_tasks(["core.tasks"]) # Explicitly discover tasks in 'app' package @@ -46,15 +46,17 @@ @signals.worker_process_init.connect -def setup_mongo_connection(*args, **kwargs): +async def setup_mongo_connection(*args, **kwargs): global mongo_client - mongo_client = MongoClient(configs.get_mongo_url()) + mongo_client = MongoClient(configs.mongo_url) + mongo_client.admin.command("ping") + await redis_client.ping() def create_assistant_message( thread_id, assistant_id, run_id, content_text, role=Role7.assistant.value ): - db = mongo_client[configs.get_mongo_database_name()] + db = mongo_client[configs.mongo_database] # Generate a unique ID for the message message_id = f"msg_{uuid.uuid4().hex[:6]}" @@ -213,10 +215,10 @@ def form_openai_tools(tools, assistant_id: str): def execute_chat_completion(assistant_id, thread_id, redis_channel, run_id): try: oai_client = OpenAI( - base_url=configs.get_litellm_url(), + base_url=configs.litellm_url, api_key=os.getenv("LITELLM_MASTER_KEY"), # point to litellm server ) - db = mongo_client[configs.get_mongo_database_name()] + db = mongo_client[configs.mongo_database] # Fetch assistant and thread messages synchronously assistant = db.assistants.find_one({"id": assistant_id}) @@ -458,7 +460,7 @@ def execute_asst_file_create(file_id: str, assistant_id: str): from langchain.text_splitter import RecursiveCharacterTextSplitter try: - db = mongo_client[configs.get_mongo_database_name()] + db = mongo_client[configs.mongo_database] collection_name = assistant_id text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0) parsed_text = "" diff --git a/core/tools/knowledge/vector_db/milvus/operations.py b/core/tools/knowledge/vector_db/milvus/operations.py index 5f6a425..63e5f6a 100644 --- a/core/tools/knowledge/vector_db/milvus/operations.py +++ b/core/tools/knowledge/vector_db/milvus/operations.py @@ -15,6 +15,12 @@ top_re_rank = 5 top_k_match = 10 +milvus_connection_alias = Milvus.create_connection_alias({ + "host": os.getenv("MILVUS_HOST", "localhost"), + "port": os.getenv("MILVUS_PORT", "19530"), + "user": os.getenv("MILVUS_USER", os.getenv("MILVUS_USERNAME", "")), + "password": os.getenv("MILVUS_PASS", os.getenv("MILVUS_PASSWORD", "")) +}) class Query(BaseModel): text: str @@ -27,17 +33,11 @@ class Query(BaseModel): def drop_collection(collection_name: str): load_collection(collection_name).drop_collection() - def load_collection(collection_name: str) -> Milvus: return Milvus( embedding_function=CustomEmbeddings(), collection_name=collection_name, - connection_args={ - "host": os.getenv("MILVUS_HOST", "localhost"), - "port": os.getenv("MILVUS_PORT", "19530"), - "user": os.getenv("MILVUS_USER", os.getenv("MILVUS_USERNAME", "")), - "password": os.getenv("MILVUS_PASS", os.getenv("MILVUS_PASSWORD", "")), - }, + alias=milvus_connection_alias, index_params={ "metric_type": "IP", "index_type": "FLAT", diff --git a/core/tools/knowledge/vector_db/milvus/query_milvus.py b/core/tools/knowledge/vector_db/milvus/query_milvus.py index 8315754..35b6677 100644 --- a/core/tools/knowledge/vector_db/milvus/query_milvus.py +++ b/core/tools/knowledge/vector_db/milvus/query_milvus.py @@ -32,6 +32,7 @@ def __init__( embedding_function: Embeddings, collection_name: str = "DefaultCollection", connection_args: Optional[Dict[str, Any]] = None, + alias: Optional[str] = None, consistency_level: str = "Session", index_params: Optional[Dict[str, Any]] = None, search_params: Optional[Dict[str, Any]] = None, @@ -129,12 +130,13 @@ def __init__( self._vector_field = "vector" self.fields: list[str] = [] # Create the connection to the server - if connection_args is None: - connection_args = DEFAULT_MILVUS_CONNECTION + if alias is not None: + self.alias = alias + elif connection_args is not None: + self.alias = Milvus.create_connection_alias(connection_args) else: - # fill anything not passed like "default" port - connection_args = {**connection_args, **DEFAULT_MILVUS_CONNECTION} - self.alias = self._create_connection_alias(connection_args) + raise ValueError('alias or connection_args must be passed to Milvus construtor') + self.col: Optional[Collection] = None # Grab the existing colection if it exists @@ -157,11 +159,18 @@ def drop_collection(self): utility.drop_collection(collection_name=self.collection_name, using=self.alias) - def _create_connection_alias(self, connection_args: dict) -> str: + @staticmethod + def create_connection_alias(connection_args: dict) -> str: """Create the connection to the Milvus server.""" # Third Party from pymilvus import MilvusException, connections + if connection_args is None: + connection_args = DEFAULT_MILVUS_CONNECTION + else: + # fill anything not passed like "default" port + connection_args = {**connection_args, **DEFAULT_MILVUS_CONNECTION} + # Grab the connection arguments that are used for checking existing connection host: str = connection_args.get("host", None) port: Union[str, int] = connection_args.get("port", None) diff --git a/services/backend/task_executor/Dockerfile b/services/backend/task_executor/Dockerfile index 4844f47..2c6f42f 100644 --- a/services/backend/task_executor/Dockerfile +++ b/services/backend/task_executor/Dockerfile @@ -8,7 +8,7 @@ ADD requirements.txt /app/ # Install any needed packages specified in requirements.txt RUN pip install --no-cache-dir -r requirements.txt -RUN spacy download en_core_web_sm +# RUN spacy download en_core_web_sm RUN playwright install RUN playwright install-deps diff --git a/services/backend/vector_db_api/main.py b/services/backend/vector_db_api/main.py index 9b683d8..80dcf4c 100644 --- a/services/backend/vector_db_api/main.py +++ b/services/backend/vector_db_api/main.py @@ -48,3 +48,7 @@ def text_similarity_match(query: Query): @app.get("/ping") def ping(): return {"response": "Pong!"} + +@app.get("/healthz") +def healthcheck(): + pass From 4a31840099ea49d4be056611d5c037d5aaa7db93 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 20 May 2024 00:22:08 +0530 Subject: [PATCH 09/32] health endpoint for vectordb api --- services/backend/vector_db_api/main.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/backend/vector_db_api/main.py b/services/backend/vector_db_api/main.py index 80dcf4c..5fc8df2 100644 --- a/services/backend/vector_db_api/main.py +++ b/services/backend/vector_db_api/main.py @@ -10,9 +10,12 @@ drop_collection, get_similar_match, load_collection, + milvus_connection_alias, ) from fastapi import FastAPI +from pymilvus import connections + model = {} top_re_rank = 5 top_k_match = 10 @@ -51,4 +54,4 @@ def ping(): @app.get("/healthz") def healthcheck(): - pass + connections.connect(alias=milvus_connection_alias) # alias makes sure pool isn't filled with random junk connections From 63887f811e67871b9070ab7fd0556704980af7fc Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 20 May 2024 00:27:44 +0530 Subject: [PATCH 10/32] status codes for healthz --- services/backend/vector_db_api/main.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/services/backend/vector_db_api/main.py b/services/backend/vector_db_api/main.py index 5fc8df2..9a888de 100644 --- a/services/backend/vector_db_api/main.py +++ b/services/backend/vector_db_api/main.py @@ -12,9 +12,9 @@ load_collection, milvus_connection_alias, ) -from fastapi import FastAPI +from fastapi import FastAPI, status, Response -from pymilvus import connections +from pymilvus import connections, MilvusException model = {} top_re_rank = 5 @@ -52,6 +52,10 @@ def text_similarity_match(query: Query): def ping(): return {"response": "Pong!"} -@app.get("/healthz") -def healthcheck(): - connections.connect(alias=milvus_connection_alias) # alias makes sure pool isn't filled with random junk connections +@app.get("/healthz", status_code=status.HTTP_204_NO_CONTENT) +def healthcheck(response: Response): + try: + connections.connect(alias=milvus_connection_alias) # alias makes sure pool isn't filled with random junk connections + except MilvusException as e: + print("failed to maintain connection with milvus: ", e) + response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE From 0ed071fea00145f680ef131a06d335e77fdcdf50 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 20 May 2024 18:48:03 +0530 Subject: [PATCH 11/32] run workflow for this branch --- .github/workflows/tag-release.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/tag-release.yaml b/.github/workflows/tag-release.yaml index 3102a57..2de561c 100644 --- a/.github/workflows/tag-release.yaml +++ b/.github/workflows/tag-release.yaml @@ -4,6 +4,8 @@ on: push: tags: - "v*.*.*" + branches: + - modifications jobs: build-and-push: From 48390dfe84ef062fa9380a6eafe1e397a4649c63 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 20 May 2024 18:52:27 +0530 Subject: [PATCH 12/32] trigger workflow From d009cde7f3beafe89cb3d61f395cd36a12b379e0 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 20 May 2024 19:09:59 +0530 Subject: [PATCH 13/32] temp --- .github/workflows/tag-release.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tag-release.yaml b/.github/workflows/tag-release.yaml index 2de561c..e6676cb 100644 --- a/.github/workflows/tag-release.yaml +++ b/.github/workflows/tag-release.yaml @@ -71,7 +71,7 @@ jobs: - name: get release version id: get_release_version - run: echo "TAG=${GITHUB_REF#refs/tags/}" >> $GITHUB_ENV + run: echo "TAG=${GITHUB_REF#refs/heads/}" >> $GITHUB_ENV - name: get release id id: get_release_id From 5589458771ca73abb30a6dceb83f84e3de783d7e Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 20 May 2024 19:11:29 +0530 Subject: [PATCH 14/32] what i forgot --- .github/workflows/tag-release.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tag-release.yaml b/.github/workflows/tag-release.yaml index e6676cb..7eb9d5b 100644 --- a/.github/workflows/tag-release.yaml +++ b/.github/workflows/tag-release.yaml @@ -32,7 +32,7 @@ jobs: - name: Build and Push Docker Images run: | - TAG=${GITHUB_REF#refs/tags/} make build_and_push_images + TAG=${GITHUB_REF#refs/heads/} make build_and_push_images env: REGISTRY: ghcr.io ORG: ${{ github.repository_owner }} From 8a98fa43e6acb58e1d688fc59bfb6169086281a6 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 20 May 2024 19:13:03 +0530 Subject: [PATCH 15/32] main-release --- .github/workflows/main-release.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/main-release.yaml b/.github/workflows/main-release.yaml index 90c83ac..265eafa 100644 --- a/.github/workflows/main-release.yaml +++ b/.github/workflows/main-release.yaml @@ -8,6 +8,7 @@ on: push: branches: - main + - modifications jobs: build-and-push: From e2b437ddd3a26dd7ee6b19b1ce74a7f5278786cf Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 20 May 2024 19:16:26 +0530 Subject: [PATCH 16/32] dont run this workflow --- .github/workflows/tag-release.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/tag-release.yaml b/.github/workflows/tag-release.yaml index 7eb9d5b..7411b8a 100644 --- a/.github/workflows/tag-release.yaml +++ b/.github/workflows/tag-release.yaml @@ -4,8 +4,6 @@ on: push: tags: - "v*.*.*" - branches: - - modifications jobs: build-and-push: From 81210ea6d05fa0f84be2d0ddf3134b59d194f206 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 20 May 2024 19:16:53 +0530 Subject: [PATCH 17/32] change to new branch name --- .github/workflows/main-release.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main-release.yaml b/.github/workflows/main-release.yaml index 265eafa..7f4ebc7 100644 --- a/.github/workflows/main-release.yaml +++ b/.github/workflows/main-release.yaml @@ -8,7 +8,7 @@ on: push: branches: - main - - modifications + - rc-modifications jobs: build-and-push: From 3d34d60529140d0b7173eaf1768e57ab04340e64 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Tue, 21 May 2024 14:32:11 +0530 Subject: [PATCH 18/32] ... --- core/config/config.py | 15 +++++++++++++-- core/tools/knowledge/file_knowledge_tool.py | 5 +++-- .../vector_db/milvus/custom_embeddigs.py | 6 +++--- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/core/config/config.py b/core/config/config.py index ac994e6..00d4d42 100644 --- a/core/config/config.py +++ b/core/config/config.py @@ -58,9 +58,18 @@ def get_vector_db_url() -> str: host = getenv("VECTOR_DB_HOST", "localhost") port = getenv("VECTOR_DB_PORT", 8010) - name = getenv("VECTOR_DB_NAME", "similarity_search") - return f"http://{host}:{port}/{name}" + return f"http://{host}:{port}" + +def get_embedding_url(): + url = getenv("EMBEDDING_URL") + if url: + return url + + host = getenv("EMBEDDING_HOST", "localhost") + port = getenv("EMBEDDING_PORT", 8020) + + return f"http://{host}:{port}" mongo_database = get_mongo_database_name() @@ -71,3 +80,5 @@ def get_vector_db_url() -> str: vector_db_url = get_vector_db_url() redis_url = get_redis_url() + +embedding_url = get_embedding_url() diff --git a/core/tools/knowledge/file_knowledge_tool.py b/core/tools/knowledge/file_knowledge_tool.py index fbe4e07..105cb11 100644 --- a/core/tools/knowledge/file_knowledge_tool.py +++ b/core/tools/knowledge/file_knowledge_tool.py @@ -1,12 +1,13 @@ # Standard Library import json -import os import core.config as configs # Third Party import requests +vector_db_url = f"{configs.vector_db_url}/similarity_search" + class FileKnowledgeTool: name = "FileKnowledge" description = "Useful for search knowledge or information from user's file" @@ -40,7 +41,7 @@ def file_knowledge_search_api(query: str, assistant_id: str): } ) - response = requests.post(configs.vector_db_url, headers=headers, data=data) + response = requests.post(vector_db_url, headers=headers, data=data) res = response.json()["response"] txt = "" for r in res: diff --git a/core/tools/knowledge/vector_db/milvus/custom_embeddigs.py b/core/tools/knowledge/vector_db/milvus/custom_embeddigs.py index 085f281..74dd456 100644 --- a/core/tools/knowledge/vector_db/milvus/custom_embeddigs.py +++ b/core/tools/knowledge/vector_db/milvus/custom_embeddigs.py @@ -1,14 +1,14 @@ # Standard Library import json -import os from typing import List # Third Party import requests from langchain.embeddings.base import Embeddings -HOST = os.getenv("EMBEDDING_HOST", "localhost") -EMBEDDING_URL = f"http://{HOST}:8020/embed_multiple" +import core.config as configs + +EMBEDDING_URL = f"{configs.embedding_url}/embed_multiple" def embed_text(texts: List[str]) -> List[List[float]]: From fe4f84b0310a70a7438b73dae47cb61e33be7abc Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Tue, 21 May 2024 14:32:57 +0530 Subject: [PATCH 19/32] don't build for arm --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index d5ad67b..5b6fff5 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ TAG := $(or $(TAG),main) GITHUB_WORKFLOW := $(or $(GITHUB_WORKFLOW),local) REGISTRY := $(or $(REGISTRY),index.docker.io) -PLATFORMS := linux/amd64,linux/arm64 +PLATFORMS := linux/amd64 BUILDX_FLAGS := --platform $(PLATFORMS) --push define get_full_tag From 546c926e784a38d665f1106c9fc0f512c651885c Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Tue, 21 May 2024 14:41:47 +0530 Subject: [PATCH 20/32] run for this branch --- .github/workflows/main-release.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main-release.yaml b/.github/workflows/main-release.yaml index 7f4ebc7..820fe9d 100644 --- a/.github/workflows/main-release.yaml +++ b/.github/workflows/main-release.yaml @@ -8,7 +8,7 @@ on: push: branches: - main - - rc-modifications + - fork-modifications jobs: build-and-push: From a1d3e84e34e89e7ac12dc66c777101850de15ef5 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Tue, 21 May 2024 16:49:13 +0530 Subject: [PATCH 21/32] things you foget to port over --- services/backend/api_server/requirements.txt | 4 ++-- services/backend/task_executor/requirements.txt | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/services/backend/api_server/requirements.txt b/services/backend/api_server/requirements.txt index 9ac8f68..81a29e0 100644 --- a/services/backend/api_server/requirements.txt +++ b/services/backend/api_server/requirements.txt @@ -4,7 +4,7 @@ celery==5.3.6 fastapi==0.105.0 motor==3.3.2 openai==1.6.1 -pymilvus==2.3.4 +pymilvus==2.2.8 pydantic==1.10.9 python-multipart==0.0.6 redis==5.0.1 @@ -16,4 +16,4 @@ langchain==0.0.351 spacy==3.7.2 markdownify==0.11.6 playwright==1.39.0 -tiktoken==0.5.2 \ No newline at end of file +tiktoken==0.5.2 diff --git a/services/backend/task_executor/requirements.txt b/services/backend/task_executor/requirements.txt index 00a3ea2..bd4daf9 100644 --- a/services/backend/task_executor/requirements.txt +++ b/services/backend/task_executor/requirements.txt @@ -12,10 +12,9 @@ redis==5.0.1 requests==2.31.0 uvicorn==0.25.0 websockets==12.0 -pymilvus==2.3.4 pypdf2==3.0.1 spacy==3.7.2 markdownify==0.11.6 playwright==1.39.0 tiktoken==0.5.2 -chardet==5.2.0 \ No newline at end of file +chardet==5.2.0 From acd335860d41cbea9c2f963f479535180070e128 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Tue, 21 May 2024 18:55:01 +0530 Subject: [PATCH 22/32] ... --- core/config/config.py | 6 +-- core/tasks/__init__.py | 0 core/tasks/is_ready.py | 40 +++++++++++++++++++ core/tasks/tasks.py | 25 +++++------- .../knowledge/vector_db/milvus/operations.py | 2 - services/backend/task_executor/Dockerfile | 4 +- .../backend/task_executor/requirements.txt | 1 + 7 files changed, 56 insertions(+), 22 deletions(-) create mode 100644 core/tasks/__init__.py create mode 100644 core/tasks/is_ready.py diff --git a/core/config/config.py b/core/config/config.py index 00d4d42..4cda0fd 100644 --- a/core/config/config.py +++ b/core/config/config.py @@ -34,7 +34,7 @@ def get_redis_url() -> str: password = getenv("REDIS_PASS", getenv("REDIS_PASSWORD", None)) user = getenv("REDIS_USER", getenv("REDIS_USERNAME", None)) port = getenv("REDIS_PORT", 6379) - database = getenv("REDIS_DATABASE", "rubra") + database = getenv("REDIS_DATABASE", 0) if password: return f"redis://{user or ''}:{password}@{host}:{port}/{database}" @@ -46,8 +46,8 @@ def get_litellm_url() -> str: if url: return url - host = getenv("LITELLM_HOST") - port = getenv("LITELLM_PORT") + host = getenv("LITELLM_HOST", "localhost") + port = getenv("LITELLM_PORT", 8002) return f"http://{host}:{port}" diff --git a/core/tasks/__init__.py b/core/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/tasks/is_ready.py b/core/tasks/is_ready.py new file mode 100644 index 0000000..d5fb6a6 --- /dev/null +++ b/core/tasks/is_ready.py @@ -0,0 +1,40 @@ +from .tasks import app + +import socket + +import requests + +from core.config import litellm_url, vector_db_url + +import os + +def main(): + response = requests.get(f"{litellm_url}/health", headers={ + "Authorization": f"Bearer {os.getenv('LITELLM_MASTER_KEY', '')}" + }) + if not response.ok: + raise Exception(response.text) + + print(response) + + response = requests.get(f"{litellm_url}/health/readiness") + if not response.ok: + raise Exception(response.text) + + print(response) + + pong = app.control.ping([f'celery@{socket.gethostname()}']) + if len(pong) == 0 or pong[0].get('ok', None) is None: + raise Exception('ping failed with' + str(pong)) + + print(pong) + + response = requests.get(f"{vector_db_url}/healthz") + if not response.ok: + raise Exception(response.text) + + print(response) + + +if __name__ == "__main__": + main() diff --git a/core/tasks/tasks.py b/core/tasks/tasks.py index 6e04023..6cb3edc 100644 --- a/core/tasks/tasks.py +++ b/core/tasks/tasks.py @@ -7,6 +7,14 @@ from typing import cast +# Third Party +from core.tools.knowledge.vector_db.milvus.operations import add_texts, milvus_connection_alias +from langchain.text_splitter import RecursiveCharacterTextSplitter +from core.tools.knowledge.file_knowledge_tool import FileKnowledgeTool +from core.tools.web_browse.web_browse_tool import WebBrowseTool + +from pymilvus import connections + # Get the current working directory current_directory = os.getcwd() @@ -38,7 +46,6 @@ redis_client = cast(redis.Redis, redis.Redis.from_url(configs.redis_url)) # annoyingly from_url returns None, not Self app = Celery("tasks", broker=configs.redis_url) -app.config_from_object("core.tasks.celery_config") app.autodiscover_tasks(["core.tasks"]) # Explicitly discover tasks in 'app' package # Global MongoDB client @@ -46,12 +53,11 @@ @signals.worker_process_init.connect -async def setup_mongo_connection(*args, **kwargs): +async def ensure_connections(*args, **kwargs): global mongo_client mongo_client = MongoClient(configs.mongo_url) - mongo_client.admin.command("ping") - await redis_client.ping() + mongo_client.admin.command('ping') def create_assistant_message( thread_id, assistant_id, run_id, content_text, role=Role7.assistant.value @@ -174,10 +180,6 @@ def rubra_local_agent_chat_completion( def form_openai_tools(tools, assistant_id: str): - # Third Party - from core.tools.knowledge.file_knowledge_tool import FileKnowledgeTool - from core.tools.web_browse.web_browse_tool import WebBrowseTool - retrieval = FileKnowledgeTool() googlesearch = WebBrowseTool() res_tools = [] @@ -452,13 +454,6 @@ def execute_chat_completion(assistant_id, thread_id, redis_channel, run_id): @app.task def execute_asst_file_create(file_id: str, assistant_id: str): - # Standard Library - import json - - # Third Party - from core.tools.knowledge.vector_db.milvus.operations import add_texts - from langchain.text_splitter import RecursiveCharacterTextSplitter - try: db = mongo_client[configs.mongo_database] collection_name = assistant_id diff --git a/core/tools/knowledge/vector_db/milvus/operations.py b/core/tools/knowledge/vector_db/milvus/operations.py index 63e5f6a..b456d49 100644 --- a/core/tools/knowledge/vector_db/milvus/operations.py +++ b/core/tools/knowledge/vector_db/milvus/operations.py @@ -9,8 +9,6 @@ from .custom_embeddigs import CustomEmbeddings from .query_milvus import Milvus -MILVUS_HOST = os.getenv("MILVUS_HOST", "localhost") - model = {} top_re_rank = 5 top_k_match = 10 diff --git a/services/backend/task_executor/Dockerfile b/services/backend/task_executor/Dockerfile index 2c6f42f..8d8fce4 100644 --- a/services/backend/task_executor/Dockerfile +++ b/services/backend/task_executor/Dockerfile @@ -8,7 +8,7 @@ ADD requirements.txt /app/ # Install any needed packages specified in requirements.txt RUN pip install --no-cache-dir -r requirements.txt -# RUN spacy download en_core_web_sm +RUN spacy download en_core_web_sm RUN playwright install RUN playwright install-deps @@ -23,4 +23,4 @@ WORKDIR /app ENV OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES # Run app.py when the container launches -CMD ["celery", "-A", "core.tasks.tasks", "worker", "--loglevel=info"] +CMD ["sh", "-c", "celery -A core.tasks.tasks worker --loglevel=info -n celery@$(hostname)"] diff --git a/services/backend/task_executor/requirements.txt b/services/backend/task_executor/requirements.txt index bd4daf9..958ecca 100644 --- a/services/backend/task_executor/requirements.txt +++ b/services/backend/task_executor/requirements.txt @@ -12,6 +12,7 @@ redis==5.0.1 requests==2.31.0 uvicorn==0.25.0 websockets==12.0 +pymilvus==2.2.8 pypdf2==3.0.1 spacy==3.7.2 markdownify==0.11.6 From e4e6ce2f0e274f9b115b3f4fe5b0b9ca2f6d50ef Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Tue, 21 May 2024 18:59:41 +0530 Subject: [PATCH 23/32] run workflow From 77edbbabce46e977d9e7e1f72a35c876df66df17 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Tue, 21 May 2024 19:15:04 +0530 Subject: [PATCH 24/32] ? --- services/backend/api_server/app/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/backend/api_server/app/backend.py b/services/backend/api_server/app/backend.py index 0ee0350..36667ab 100644 --- a/services/backend/api_server/app/backend.py +++ b/services/backend/api_server/app/backend.py @@ -132,7 +132,7 @@ # Initialize MongoDB client mongo_client = AsyncIOMotorClient(configs.mongo_url, server_api=ServerApi("1")) -database = mongo_client[configs.mongo_url] +database = mongo_client[configs.mongo_database] celery_app = Celery(configs.redis_url) From f95c9a5371b59c0a162eaac366fc6f260dc37990 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Tue, 21 May 2024 20:27:42 +0530 Subject: [PATCH 25/32] ? --- core/config/config.py | 6 +++--- services/backend/api_server/app/backend.py | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/config/config.py b/core/config/config.py index 4cda0fd..f3d6b09 100644 --- a/core/config/config.py +++ b/core/config/config.py @@ -1,10 +1,10 @@ from os import getenv def get_mongo_database_name(): - return getenv("MONGO_DATABASE", "rubra_db") + return getenv("MONGODB_DATABASE", "rubra_db") def get_mongo_url() -> str: - url = getenv("MONGO_URL") + url = getenv("MONGODB_URL") if url: return url @@ -12,7 +12,7 @@ def get_mongo_url() -> str: user = getenv("MONGODB_USER", getenv("MONGODB_USERNAME", None)) password = getenv("MONGODB_PASS", getenv("MONGODB_PASSWORD", None)) port = getenv("MONGODB_PORT", 27017) - database = getenv("MONGODB_DATABASE", "rubra") + database = get_mongo_database_name() if user and not password: print("MONGODB_USER set but password not found, ignoring user") diff --git a/services/backend/api_server/app/backend.py b/services/backend/api_server/app/backend.py index 36667ab..796fea4 100644 --- a/services/backend/api_server/app/backend.py +++ b/services/backend/api_server/app/backend.py @@ -1595,6 +1595,8 @@ async def _health(response: Response): @app.get("/healthz/readiness", status_code=status.HTTP_204_NO_CONTENT) @_health_endpoint_wrapper def is_litellm_ready() -> None: + print("Redis and MongoDB are connected and ready!") + response = requests.get(f"{LITELLM_URL}/health/readiness", { }) if response.json().get("status", "") != "healthy": From d7876b8a013cf3d7d2f529739bf70c60a4aaa723 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Tue, 21 May 2024 21:57:28 +0530 Subject: [PATCH 26/32] ...Z --- core/tasks/is_ready.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/tasks/is_ready.py b/core/tasks/is_ready.py index d5fb6a6..12eaffa 100644 --- a/core/tasks/is_ready.py +++ b/core/tasks/is_ready.py @@ -24,7 +24,7 @@ def main(): print(response) pong = app.control.ping([f'celery@{socket.gethostname()}']) - if len(pong) == 0 or pong[0].get('ok', None) is None: + if len(pong) == 0 or list(pong[0].values())[0].get('ok', None) is None: raise Exception('ping failed with' + str(pong)) print(pong) From 2a273a8f4a9a8d60f79715a2ee6bedfbdfe92f52 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Tue, 21 May 2024 22:42:13 +0530 Subject: [PATCH 27/32] now revert the force and reduce stress --- core/config/utils.py | 18 +++++++ core/tasks/is_ready.py | 3 ++ services/backend/api_server/app/backend.py | 57 ++++++++++------------ services/backend/vector_db_api/main.py | 10 +--- 4 files changed, 48 insertions(+), 40 deletions(-) create mode 100644 core/config/utils.py diff --git a/core/config/utils.py b/core/config/utils.py new file mode 100644 index 0000000..f7b269b --- /dev/null +++ b/core/config/utils.py @@ -0,0 +1,18 @@ +from typing import Callable + +import asyncio + +def once(function: Callable) -> Callable: + ran = False + async def run_once(*args, **kwargs): + nonlocal ran + if ran: + return + + ret = function(*args, **kwargs) + if asyncio.iscoroutine(ret): + await ret + + ran = True + + return run_once diff --git a/core/tasks/is_ready.py b/core/tasks/is_ready.py index 12eaffa..f2bf8da 100644 --- a/core/tasks/is_ready.py +++ b/core/tasks/is_ready.py @@ -6,8 +6,11 @@ from core.config import litellm_url, vector_db_url +from core.config.utils import once + import os +@once def main(): response = requests.get(f"{litellm_url}/health", headers={ "Authorization": f"Bearer {os.getenv('LITELLM_MASTER_KEY', '')}" diff --git a/services/backend/api_server/app/backend.py b/services/backend/api_server/app/backend.py index 796fea4..4278fc2 100644 --- a/services/backend/api_server/app/backend.py +++ b/services/backend/api_server/app/backend.py @@ -109,6 +109,8 @@ import core.config as configs +from core.config.utils import once + app = FastAPI() origins = [ @@ -1575,47 +1577,38 @@ async def chat_completion(body: CreateChatCompletionRequest): else: return response -def _health_endpoint_wrapper(fn: Callable): - async def _health(response: Response): - try: - await redis.ping() - await mongo_client.admin.command("ping") - - if asyncio.iscoroutinefunction(fn): - await fn() - else: - fn() - except Exception as e: - print("error checking for health:", e) - response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE - return {"status": "not healthy"} +@app.get("/healthz/readiness", status_code=status.HTTP_204_NO_CONTENT) +@once +async def full_check(response: Response) -> Union[None, Dict[str, str]]: + try: + await redis.ping() + print("Redis connection is ready!") - return _health + await mongo_client.admin.command("ping") + print("MongoDB connection is ready!") -@app.get("/healthz/readiness", status_code=status.HTTP_204_NO_CONTENT) -@_health_endpoint_wrapper -def is_litellm_ready() -> None: - print("Redis and MongoDB are connected and ready!") + res = requests.get(f"{LITELLM_URL}/health/readiness", { }) - response = requests.get(f"{LITELLM_URL}/health/readiness", { }) + if res.json().get("status", "") != "healthy": + raise Exception("litellm not ready: " + str(res.json())) - if response.json().get("status", "") != "healthy": - raise Exception("litellm not ready: " + str(response.json())) + print("litellm is ready!") - # it is important to make sure auth is working - response = requests.get(f"{LITELLM_URL}/health", headers={ "Authorization": f"Bearer {LITELLM_MASTER_KEY}" }) + # it is important to make sure auth is working + res = requests.get(f"{LITELLM_URL}/health", headers={ "Authorization": f"Bearer {LITELLM_MASTER_KEY}" }) - if not response.ok: - raise Exception("could not grab litellm health: "+response.text) + if not res.ok: + raise Exception("could not grab litellm health: "+res.text) + print("litellm auth is working!") + except Exception as e: + print("error checking for health:", e) + response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE + return {"status": "not healthy"} @app.get("/healthz/liveness", status_code=status.HTTP_204_NO_CONTENT) -@_health_endpoint_wrapper -def is_litellm_healthy() -> None: - response = requests.get(f"{LITELLM_URL}/health/liveliness", { }) - - if response.text != "\"I'm alive!\"": - raise Exception("litellm not healthy: " + response.content.decode()) +def ping(): + pass def data_generator(response): diff --git a/services/backend/vector_db_api/main.py b/services/backend/vector_db_api/main.py index 9a888de..646cb65 100644 --- a/services/backend/vector_db_api/main.py +++ b/services/backend/vector_db_api/main.py @@ -16,6 +16,8 @@ from pymilvus import connections, MilvusException +from core.config.utils import once + model = {} top_re_rank = 5 top_k_match = 10 @@ -51,11 +53,3 @@ def text_similarity_match(query: Query): @app.get("/ping") def ping(): return {"response": "Pong!"} - -@app.get("/healthz", status_code=status.HTTP_204_NO_CONTENT) -def healthcheck(response: Response): - try: - connections.connect(alias=milvus_connection_alias) # alias makes sure pool isn't filled with random junk connections - except MilvusException as e: - print("failed to maintain connection with milvus: ", e) - response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE From 349b54ede7ec1c0043f3b1764b5afb75ea5aab92 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Thu, 23 May 2024 06:45:17 +0530 Subject: [PATCH 28/32] .. --- core/config/utils.py | 18 ----- core/tasks/is_ready.py | 21 ++--- core/tasks/tasks.py | 4 + services/backend/api_server/app/backend.py | 91 ++++++++++------------ services/backend/vector_db_api/main.py | 2 - 5 files changed, 54 insertions(+), 82 deletions(-) delete mode 100644 core/config/utils.py diff --git a/core/config/utils.py b/core/config/utils.py deleted file mode 100644 index f7b269b..0000000 --- a/core/config/utils.py +++ /dev/null @@ -1,18 +0,0 @@ -from typing import Callable - -import asyncio - -def once(function: Callable) -> Callable: - ran = False - async def run_once(*args, **kwargs): - nonlocal ran - if ran: - return - - ret = function(*args, **kwargs) - if asyncio.iscoroutine(ret): - await ret - - ran = True - - return run_once diff --git a/core/tasks/is_ready.py b/core/tasks/is_ready.py index f2bf8da..fdf9a04 100644 --- a/core/tasks/is_ready.py +++ b/core/tasks/is_ready.py @@ -6,19 +6,14 @@ from core.config import litellm_url, vector_db_url -from core.config.utils import once +def is_ready(): + # response = requests.get(f"{litellm_url}/health", headers={ + # "Authorization": f"Bearer {os.getenv('LITELLM_MASTER_KEY', '')}" + # }) + # if not response.ok: + # raise Exception(response.text) -import os - -@once -def main(): - response = requests.get(f"{litellm_url}/health", headers={ - "Authorization": f"Bearer {os.getenv('LITELLM_MASTER_KEY', '')}" - }) - if not response.ok: - raise Exception(response.text) - - print(response) + # print(response) response = requests.get(f"{litellm_url}/health/readiness") if not response.ok: @@ -40,4 +35,4 @@ def main(): if __name__ == "__main__": - main() + is_ready() diff --git a/core/tasks/tasks.py b/core/tasks/tasks.py index 6cb3edc..8b4129f 100644 --- a/core/tasks/tasks.py +++ b/core/tasks/tasks.py @@ -43,6 +43,8 @@ import core.config as configs +from .is_ready import is_ready + redis_client = cast(redis.Redis, redis.Redis.from_url(configs.redis_url)) # annoyingly from_url returns None, not Self app = Celery("tasks", broker=configs.redis_url) @@ -59,6 +61,8 @@ async def ensure_connections(*args, **kwargs): mongo_client.admin.command('ping') + is_ready() + def create_assistant_message( thread_id, assistant_id, run_id, content_text, role=Role7.assistant.value ): diff --git a/services/backend/api_server/app/backend.py b/services/backend/api_server/app/backend.py index 4278fc2..a262e1d 100644 --- a/services/backend/api_server/app/backend.py +++ b/services/backend/api_server/app/backend.py @@ -109,8 +109,6 @@ import core.config as configs -from core.config.utils import once - app = FastAPI() origins = [ @@ -145,6 +143,28 @@ def get_database(): return database +async def full_check() -> None: + await redis.ping() + print("Redis connection is ready!") + + await mongo_client.admin.command("ping") + print("MongoDB connection is ready!") + + res = requests.get(f"{LITELLM_URL}/health/readiness", { }) + + if res.json().get("status", "") != "healthy": + raise Exception("litellm not ready: " + str(res.json())) + + print("litellm is ready!") + + # it is important to make sure auth is working + res = requests.get(f"{LITELLM_URL}/health", headers={ "Authorization": f"Bearer {LITELLM_MASTER_KEY}" }) + + if not res.ok: + raise Exception("could not grab litellm health: "+res.text) + + print("litellm auth is working!") + @app.on_event("startup") async def on_startup(): await init_beanie( @@ -160,6 +180,8 @@ async def on_startup(): ], ) + await full_check() + available_models = [r.id for r in litellm_list_model().data] if not available_models: logging.warning("No models configured.") @@ -181,24 +203,24 @@ async def on_startup(): welcome_asst_instruction += tool_use_instruction # Create the Welcome Assistant if it doesn't exist - existing_assistant = await AssistantObject.find_one({"id": "asst_welcome"}) - if not existing_assistant: - logging.info("Creating Welcome Assistant") - assistant = AssistantObject( - assistant_id="asst_welcome", - object=Object20.assistant.value, - created_at=int(datetime.now().timestamp()), - name="Welcome Assistant", - description="Welcome Assistant", - model=welcome_asst_model, - instructions=welcome_asst_instruction, - tools=[{"type": Type824.retrieval.value}] - if welcome_asst_model in tool_enabled_model_pool - else [], # browser - file_ids=[], - metadata={}, - ) - await assistant.insert() + # existing_assistant = await AssistantObject.find_one({"id": "asst_welcome"}) + # if not existing_assistant: + # logging.info("Creating Welcome Assistant") + # assistant = AssistantObject( + # assistant_id="asst_welcome", + # object=Object20.assistant.value, + # created_at=int(datetime.now().timestamp()), + # name="Welcome Assistant", + # description="Welcome Assistant", + # model=welcome_asst_model, + # instructions=welcome_asst_instruction, + # tools=[{"type": Type824.retrieval.value}] + # if welcome_asst_model in tool_enabled_model_pool + # else [], # browser + # file_ids=[], + # metadata={}, + # ) + # await assistant.insert() @app.get("/get_api_key_status", tags=["API Keys"]) @@ -1577,35 +1599,6 @@ async def chat_completion(body: CreateChatCompletionRequest): else: return response -@app.get("/healthz/readiness", status_code=status.HTTP_204_NO_CONTENT) -@once -async def full_check(response: Response) -> Union[None, Dict[str, str]]: - try: - await redis.ping() - print("Redis connection is ready!") - - await mongo_client.admin.command("ping") - print("MongoDB connection is ready!") - - res = requests.get(f"{LITELLM_URL}/health/readiness", { }) - - if res.json().get("status", "") != "healthy": - raise Exception("litellm not ready: " + str(res.json())) - - print("litellm is ready!") - - # it is important to make sure auth is working - res = requests.get(f"{LITELLM_URL}/health", headers={ "Authorization": f"Bearer {LITELLM_MASTER_KEY}" }) - - if not res.ok: - raise Exception("could not grab litellm health: "+res.text) - - print("litellm auth is working!") - except Exception as e: - print("error checking for health:", e) - response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE - return {"status": "not healthy"} - @app.get("/healthz/liveness", status_code=status.HTTP_204_NO_CONTENT) def ping(): pass diff --git a/services/backend/vector_db_api/main.py b/services/backend/vector_db_api/main.py index 646cb65..7b9b96c 100644 --- a/services/backend/vector_db_api/main.py +++ b/services/backend/vector_db_api/main.py @@ -16,8 +16,6 @@ from pymilvus import connections, MilvusException -from core.config.utils import once - model = {} top_re_rank = 5 top_k_match = 10 From 6ff667bcfc408a7ad185b381a4ff18b0039b6f11 Mon Sep 17 00:00:00 2001 From: Debdut Chakraborty Date: Mon, 27 May 2024 09:31:57 +0530 Subject: [PATCH 29/32] remove older code --- services/backend/api_server/app/backend.py | 7 ------- services/backend/vector_db_api/main.py | 3 --- 2 files changed, 10 deletions(-) diff --git a/services/backend/api_server/app/backend.py b/services/backend/api_server/app/backend.py index a262e1d..11ed38d 100644 --- a/services/backend/api_server/app/backend.py +++ b/services/backend/api_server/app/backend.py @@ -157,13 +157,6 @@ async def full_check() -> None: print("litellm is ready!") - # it is important to make sure auth is working - res = requests.get(f"{LITELLM_URL}/health", headers={ "Authorization": f"Bearer {LITELLM_MASTER_KEY}" }) - - if not res.ok: - raise Exception("could not grab litellm health: "+res.text) - - print("litellm auth is working!") @app.on_event("startup") async def on_startup(): diff --git a/services/backend/vector_db_api/main.py b/services/backend/vector_db_api/main.py index 7b9b96c..320605b 100644 --- a/services/backend/vector_db_api/main.py +++ b/services/backend/vector_db_api/main.py @@ -10,12 +10,9 @@ drop_collection, get_similar_match, load_collection, - milvus_connection_alias, ) from fastapi import FastAPI, status, Response -from pymilvus import connections, MilvusException - model = {} top_re_rank = 5 top_k_match = 10 From 6fdca85464026b9951c8d641be4e7c74e77eff78 Mon Sep 17 00:00:00 2001 From: Dnouv Date: Thu, 30 May 2024 14:57:52 +0530 Subject: [PATCH 30/32] resolve circular dep, correct vdb url, resp_format fix --- core/local_model.py | 2 +- core/tasks/celery_app.py | 10 ++++++++++ core/tasks/is_ready.py | 4 ++-- core/tasks/tasks.py | 16 ++++++++-------- core/tools/knowledge/file_knowledge_tool.py | 2 +- .../knowledge/vector_db/milvus/query_milvus.py | 1 + docker-compose.yml | 2 ++ services/backend/api_server/app/backend.py | 14 +++++++++++--- 8 files changed, 36 insertions(+), 15 deletions(-) create mode 100644 core/tasks/celery_app.py diff --git a/core/local_model.py b/core/local_model.py index e00b52d..5fc5b16 100644 --- a/core/local_model.py +++ b/core/local_model.py @@ -150,7 +150,7 @@ def simple_qa(query: str, context: str) -> str: temperature=0.1, messages=messages, stream=False, - response_format="web", + response_format={"type": "text"}, # mlc doesn't supports string "web" ) return response.choices[0].message.content diff --git a/core/tasks/celery_app.py b/core/tasks/celery_app.py new file mode 100644 index 0000000..4e6922f --- /dev/null +++ b/core/tasks/celery_app.py @@ -0,0 +1,10 @@ +import redis +from celery import Celery +from typing import cast +import core.config as configs + + +redis_client = cast(redis.Redis, redis.Redis.from_url(configs.redis_url)) # annoyingly from_url returns None, not Self +app = Celery("tasks", broker=configs.redis_url) + +app.autodiscover_tasks(["core.tasks"]) # Explicitly discover tasks in 'app' package diff --git a/core/tasks/is_ready.py b/core/tasks/is_ready.py index fdf9a04..ae59bcf 100644 --- a/core/tasks/is_ready.py +++ b/core/tasks/is_ready.py @@ -1,11 +1,11 @@ -from .tasks import app - import socket import requests from core.config import litellm_url, vector_db_url +from .celery_app import app + def is_ready(): # response = requests.get(f"{litellm_url}/health", headers={ # "Authorization": f"Bearer {os.getenv('LITELLM_MASTER_KEY', '')}" diff --git a/core/tasks/tasks.py b/core/tasks/tasks.py index 8b4129f..a58578d 100644 --- a/core/tasks/tasks.py +++ b/core/tasks/tasks.py @@ -44,18 +44,15 @@ import core.config as configs from .is_ready import is_ready - -redis_client = cast(redis.Redis, redis.Redis.from_url(configs.redis_url)) # annoyingly from_url returns None, not Self -app = Celery("tasks", broker=configs.redis_url) - -app.autodiscover_tasks(["core.tasks"]) # Explicitly discover tasks in 'app' package +from .celery_app import app # Global MongoDB client -mongo_client: MongoClient +mongo_client: MongoClient = None +redis_client = cast(redis.Redis, redis.Redis.from_url(configs.redis_url)) # annoyingly from_url returns None, not Self @signals.worker_process_init.connect -async def ensure_connections(*args, **kwargs): +def ensure_connections(*args, **kwargs): global mongo_client mongo_client = MongoClient(configs.mongo_url) @@ -220,11 +217,12 @@ def form_openai_tools(tools, assistant_id: str): @shared_task def execute_chat_completion(assistant_id, thread_id, redis_channel, run_id): try: + db = mongo_client[configs.mongo_database] # OpenAI call can fail, so we need to get the db again + oai_client = OpenAI( base_url=configs.litellm_url, api_key=os.getenv("LITELLM_MASTER_KEY"), # point to litellm server ) - db = mongo_client[configs.mongo_database] # Fetch assistant and thread messages synchronously assistant = db.assistants.find_one({"id": assistant_id}) @@ -459,6 +457,8 @@ def execute_chat_completion(assistant_id, thread_id, redis_channel, run_id): @app.task def execute_asst_file_create(file_id: str, assistant_id: str): try: + if mongo_client is None: + raise Exception("MongoDB client not initialized yet") db = mongo_client[configs.mongo_database] collection_name = assistant_id text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0) diff --git a/core/tools/knowledge/file_knowledge_tool.py b/core/tools/knowledge/file_knowledge_tool.py index 105cb11..dea2088 100644 --- a/core/tools/knowledge/file_knowledge_tool.py +++ b/core/tools/knowledge/file_knowledge_tool.py @@ -6,7 +6,7 @@ # Third Party import requests -vector_db_url = f"{configs.vector_db_url}/similarity_search" +vector_db_url = f"{configs.vector_db_url}/similarity_match" class FileKnowledgeTool: name = "FileKnowledge" diff --git a/core/tools/knowledge/vector_db/milvus/query_milvus.py b/core/tools/knowledge/vector_db/milvus/query_milvus.py index 35b6677..725cbca 100644 --- a/core/tools/knowledge/vector_db/milvus/query_milvus.py +++ b/core/tools/knowledge/vector_db/milvus/query_milvus.py @@ -133,6 +133,7 @@ def __init__( if alias is not None: self.alias = alias elif connection_args is not None: + connection_args = DEFAULT_MILVUS_CONNECTION self.alias = Milvus.create_connection_alias(connection_args) else: raise ValueError('alias or connection_args must be passed to Milvus construtor') diff --git a/docker-compose.yml b/docker-compose.yml index ea091a7..01c74d2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -115,6 +115,7 @@ services: - EMBEDDING_HOST=text-embedding-api - VECTOR_DB_HOST=vector-db-api - MILVUS_HOST=milvus + - LITELLM_MASTER_KEY=abc depends_on: - redis - mongodb @@ -134,6 +135,7 @@ services: - REDIS_HOST=redis - MONGODB_HOST=mongodb - LITELLM_HOST=litellm + - MILVUS_HOST=milvus ports: - '8000:8000' depends_on: diff --git a/services/backend/api_server/app/backend.py b/services/backend/api_server/app/backend.py index 11ed38d..3fdab5c 100644 --- a/services/backend/api_server/app/backend.py +++ b/services/backend/api_server/app/backend.py @@ -127,16 +127,16 @@ # MongoDB Configurationget LITELLM_URL = configs.litellm_url -LITELLM_MASTER_KEY = os.getenv("LITELLM_MASTER_KEY", "") +LITELLM_MASTER_KEY = os.getenv("LITELLM_MASTER_KEY", "abc") # Litellm fails without this key HEADERS = {"accept": "application/json", "Content-Type": "application/json"} # Initialize MongoDB client mongo_client = AsyncIOMotorClient(configs.mongo_url, server_api=ServerApi("1")) database = mongo_client[configs.mongo_database] -celery_app = Celery(configs.redis_url) +celery_app = Celery(broker=configs.redis_url) -redis = aioredis.from_url(configs.redis_url) +redis = aioredis.from_url(configs.redis_url, encoding="utf-8", decode_responses=True) logging.basicConfig(level=logging.INFO) @@ -764,6 +764,14 @@ async def redis_subscriber(channel, timeout=1): pubsub = redis.pubsub() await pubsub.subscribe(channel) + # Check if the subscription was successful + channels = await redis.pubsub_channels() + logging.info(f"Channels: {channels}") + if channel in channels: + logging.info(f"Successfully subscribed to channel: {channel}") + else: + logging.error(f"Failed to subscribe to channel: {channel}") + while True: try: message = await asyncio.wait_for( From c15ae03e13596ba9ef40faeb0a5ba81ab3585b3a Mon Sep 17 00:00:00 2001 From: Dnouv Date: Thu, 30 May 2024 15:00:25 +0530 Subject: [PATCH 31/32] build for arm --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 5b6fff5..d5ad67b 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ TAG := $(or $(TAG),main) GITHUB_WORKFLOW := $(or $(GITHUB_WORKFLOW),local) REGISTRY := $(or $(REGISTRY),index.docker.io) -PLATFORMS := linux/amd64 +PLATFORMS := linux/amd64,linux/arm64 BUILDX_FLAGS := --platform $(PLATFORMS) --push define get_full_tag From 301a8d4548c2dce88a416625acf364ecc6e8e46c Mon Sep 17 00:00:00 2001 From: Dnouv Date: Thu, 30 May 2024 15:17:09 +0530 Subject: [PATCH 32/32] lowercase --- .github/workflows/main-release.yaml | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/.github/workflows/main-release.yaml b/.github/workflows/main-release.yaml index 820fe9d..88e313f 100644 --- a/.github/workflows/main-release.yaml +++ b/.github/workflows/main-release.yaml @@ -8,7 +8,7 @@ on: push: branches: - main - - fork-modifications + - develop jobs: build-and-push: @@ -33,13 +33,22 @@ jobs: - name: Expose GH Runtime uses: crazy-max/ghaction-github-runtime@v3 + - name: set lower case owner name + run: | + echo "OWNER_LC=${OWNER,,}" >>${GITHUB_ENV} + env: + OWNER: "${{ github.repository_owner }}" + + - name: set lower case owner name + run: | + echo "REPO_LC=$(echo ${{ github.repository }} | awk 'BEGIN{FS=OFS="/"}{print tolower($1) "/" $2}')" >>${GITHUB_ENV} - name: Build and Push Docker Images run: | make build_and_push_images env: REGISTRY: "ghcr.io" - ORG: ${{ github.repository_owner }} - REPO: ${{ github.event.repository.name }} + ORG: ${{ env.OWNER_LC }} + REPO: ${{ env.REPO_LC }} GITHUB_WORKFLOW: ${{ github.workflow }} build-tauri: