Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Pinecone implementation nonblocking #321

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions vocode/streaming/agent/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,15 @@ async def get_tracer_name_start(self) -> str:
self.tracer_name_start: str = tracer_name_start
return tracer_name_start

def terminate(self):
    """Tear down the vector DB (when one is configured), then terminate the agent.

    The vector DB's ``tear_down`` is declared ``async``, so calling it from
    this synchronous method only creates a coroutine object. It has to be
    handed to an event loop, otherwise the clean-up never runs.

    Returns:
        Whatever the parent class's ``terminate()`` returns.
    """
    # Local import: the file-level import block is not visible from this span.
    import asyncio

    if (
        hasattr(self.agent_config, "vector_db_config")
        and self.agent_config.vector_db_config
    ):
        self.logger.debug("Terminating vector db")
        pending_tear_down = self.vector_db.tear_down()
        # Stay compatible with a vector DB whose tear_down is synchronous.
        if asyncio.iscoroutine(pending_tear_down):
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop is running in this thread: drive the clean-up to
                # completion right here.
                asyncio.run(pending_tear_down)
            else:
                # Keep a reference on the instance so the task cannot be
                # garbage-collected before it finishes.
                self._vector_db_tear_down_task = loop.create_task(
                    pending_tear_down
                )
    return super().terminate()

async def respond(
self,
human_input,
Expand Down
55 changes: 53 additions & 2 deletions vocode/streaming/vector_db/base_vector_db.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,57 @@
import os
from typing import Iterable, List, Optional, Tuple
import aiohttp
import openai
from langchain.docstore.document import Document

DEFAULT_OPENAI_EMBEDDING_MODEL = "text-embedding-ada-002"


class VectorDB:
async def add_texts(self):
def __init__(
    self,
    aiohttp_session: Optional[aiohttp.ClientSession] = None,
):
    """Store the HTTP session used by this vector DB.

    Args:
        aiohttp_session: Session to reuse. When one is supplied the caller
            stays responsible for closing it; when it is omitted a new
            session is created and marked for closing on tear-down.
    """
    # True only when no usable session was handed in.
    owns_session = not aiohttp_session
    self.aiohttp_session = (
        aiohttp.ClientSession() if owns_session else aiohttp_session
    )
    self.should_close_session_on_tear_down = owns_session

async def create_openai_embedding(
    self, text, model=DEFAULT_OPENAI_EMBEDDING_MODEL
) -> List[float]:
    """Return the OpenAI embedding vector for ``text``.

    When the ``AZURE_OPENAI_TEXT_EMBEDDING_ENGINE`` environment variable is
    set to a non-empty value, that value is sent as ``engine``; otherwise
    ``model`` is sent as ``model``.

    Args:
        text: Input passed as the ``input`` field of the embedding request.
        model: Model name used when no Azure engine is configured.

    Returns:
        The ``embedding`` of the first entry in the response's ``data``.
    """
    azure_engine = os.getenv("AZURE_OPENAI_TEXT_EMBEDDING_ENGINE")
    request_kwargs = {"input": text}
    # Exactly one of "engine" / "model" is sent, never both.
    selector = "engine" if azure_engine else "model"
    request_kwargs[selector] = azure_engine if azure_engine else model

    response = await openai.Embedding.acreate(**request_kwargs)
    first_item = response["data"][0]
    return list(first_item["embedding"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(fast follow) this should have some error handling


# Interface for embedding texts and storing them in the vector DB.
# Parameters: `texts` are the strings to store, `metadatas` and `ids` are
# optional per-text lists, and `namespace` is an optional namespace name.
# The annotated return value is the list of ids.
async def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
namespace: Optional[str] = None,
) -> List[str]:
# The base class has no implementation: this always raises.
raise NotImplementedError

async def similarity_search_with_score(self):
# Interface for looking up the documents most similar to `query`.
# `filter` is an optional metadata filter and `namespace` an optional
# namespace name. The annotated return value is a list of
# (Document, score) pairs.
async def similarity_search_with_score(
self,
query: str,
filter: Optional[dict] = None,
namespace: Optional[str] = None,
) -> List[Tuple[Document, float]]:
# The base class has no implementation: this always raises.
raise NotImplementedError

async def tear_down(self):
    """Close the aiohttp session when this instance is flagged to do so."""
    if not self.should_close_session_on_tear_down:
        # Nothing to release: the flag says the session is not ours to close.
        return
    await self.aiohttp_session.close()
9 changes: 7 additions & 2 deletions vocode/streaming/vector_db/factory.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import logging
from typing import Optional
import aiohttp
from vocode.streaming.models.vector_db import PineconeConfig, VectorDBConfig
from vocode.streaming.vector_db.base_vector_db import VectorDB
from vocode.streaming.vector_db.pinecone import PineconeDB


# Factory that maps a vector DB config object to the matching VectorDB
# implementation.
# NOTE(review): this span looks like an unmarked diff. The one-line
# `create_vector_db` signature and the `PineconeDB(vector_db_config)` return
# appear to be the removed side; confirm against the merged file.
class VectorDBFactory:
def create_vector_db(self, vector_db_config: VectorDBConfig) -> VectorDB:
# Build the VectorDB for `vector_db_config`. `aiohttp_session`, when given,
# is forwarded to the implementation's constructor.
def create_vector_db(
self,
vector_db_config: VectorDBConfig,
aiohttp_session: Optional[aiohttp.ClientSession] = None,
) -> VectorDB:
# PineconeConfig is the only config type handled here.
if isinstance(vector_db_config, PineconeConfig):
return PineconeDB(vector_db_config)
return PineconeDB(vector_db_config, aiohttp_session=aiohttp_session)
# Any other config type is rejected.
raise Exception("Invalid vector db config", vector_db_config.type)
125 changes: 104 additions & 21 deletions vocode/streaming/vector_db/pinecone.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,115 @@
import asyncio
from functools import partial
from langchain.vectorstores import Pinecone
import logging
from typing import Iterable, List, Optional, Tuple
HHousen marked this conversation as resolved.
Show resolved Hide resolved
import uuid
from langchain.docstore.document import Document
from vocode import getenv
from vocode.streaming.models.vector_db import PineconeConfig
from langchain.embeddings.openai import OpenAIEmbeddings
from vocode.streaming.vector_db.base_vector_db import VectorDB

logger = logging.getLogger(__name__)

class PineconeDB(VectorDB):
def __init__(self, config: PineconeConfig) -> None:
import pinecone

class PineconeDB(VectorDB):
# Set up the Pinecone client state: index name, API key, environment and the
# base URL that requests are sent to. Extra positional/keyword arguments are
# passed straight through to VectorDB.__init__.
# NOTE(review): this span looks like an unmarked diff. The `pinecone.init(...)`
# call and the `pinecone.Index` / `OpenAIEmbeddings` / `Pinecone(...)` lines
# appear to be the removed side; confirm against the merged file.
def __init__(self, config: PineconeConfig, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.config = config

pinecone.init(
api_key=getenv("PINECONE_API_KEY"),
environment=getenv("PINECONE_ENVIRONMENT"),
self.index_name = self.config.index
# A truthy getenv(...) result wins; the config value is the fallback.
self.pinecone_api_key = getenv("PINECONE_API_KEY") or self.config.api_key
self.pinecone_environment = (
getenv("PINECONE_ENVIRONMENT") or self.config.api_environment
)
# Base URL built from the index name and environment; the endpoint paths
# are appended to it by the request methods.
self.pinecone_url = (
f"https://{self.index_name}.svc.{self.pinecone_environment}.pinecone.io"
)
index = pinecone.Index(self.config.index)
self.embeddings = OpenAIEmbeddings() # type: ignore
self.vectorstore = Pinecone(index, self.embeddings.embed_query, "text")
# Metadata key under which each document's raw text is stored.
self._text_key = "text"

async def add_texts(self, texts, **kwargs):
func = partial(self.vectorstore.add_texts, texts, **kwargs)
return await asyncio.get_event_loop().run_in_executor(None, func)
async def add_texts(
    self,
    texts: Iterable[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    namespace: Optional[str] = None,
) -> List[str]:
    """Embed ``texts`` and upsert them into the Pinecone index.

    Args:
        texts: Strings to embed and store. Any iterable is accepted,
            including one-shot iterators and generators.
        metadatas: Optional list of metadata dicts, index-aligned with
            ``texts``. The dicts are copied, never modified.
        ids: Optional list of ids, index-aligned with ``texts``. Random
            UUID4 strings are generated when omitted or empty.
        namespace: Optional pinecone namespace to add the texts to.
            ``None`` is sent as the empty string.

    Returns:
        The ids used for the texts, in input order. They are returned even
        when Pinecone reports an error, which is only logged.
    """
    # Adapted from: langchain/vectorstores/pinecone.py. Made langchain implementation async.
    if namespace is None:
        namespace = ""
    # Materialize once: the texts are walked twice (id generation, then
    # embedding), which would silently drop everything for a one-shot iterator.
    texts = list(texts)
    ids = ids or [str(uuid.uuid4()) for _ in texts]
    if not texts:
        # Nothing to embed, so skip the upsert request.
        return ids

    # Embed and create the documents
    docs = []
    for i, text in enumerate(texts):
        embedding = await self.create_openai_embedding(text)
        # Copy so that adding the text key does not mutate the caller's dict.
        metadata = dict(metadatas[i]) if metadatas else {}
        metadata[self._text_key] = text
        docs.append({"id": ids[i], "values": embedding, "metadata": metadata})

    # upsert to Pinecone
    async with self.aiohttp_session.post(
        f"{self.pinecone_url}/vectors/upsert",
        headers={"Api-Key": self.pinecone_api_key},
        json={
            "vectors": docs,
            "namespace": namespace,
        },
    ) as response:
        response_json = await response.json()
        # A "message" field in the response body is treated as an API error.
        if "message" in response_json:
            logger.error("Error upserting vectors: %s", response_json)

    return ids

async def similarity_search_with_score(
    self,
    query: str,
    filter: Optional[dict] = None,
    namespace: Optional[str] = None,
) -> List[Tuple[Document, float]]:
    """Return pinecone documents most similar to query, along with scores.

    Args:
        query: Text to look up documents similar to.
        filter: Dictionary of argument(s) to filter on metadata.
        namespace: Namespace to search in. ``None`` searches the ``''``
            namespace.

    Returns:
        List of ``(Document, score)`` pairs, one per match whose metadata
        carries the stored text. The list is empty when the response has
        no matches, including when Pinecone answers with an error body.
    """
    # Adapted from: langchain/vectorstores/pinecone.py. Made langchain implementation async.
    if namespace is None:
        namespace = ""
    query_obj = await self.create_openai_embedding(query)
    async with self.aiohttp_session.post(
        f"{self.pinecone_url}/query",
        headers={"Api-Key": self.pinecone_api_key},
        json={
            "top_k": self.config.top_k,
            "namespace": namespace,
            "filter": filter,
            "vector": query_obj,
            "includeMetadata": True,
        },
    ) as response:
        results = await response.json()

    # Same error convention as the upsert path: a "message" field in the
    # response body is treated as an API error and logged instead of
    # surfacing later as a KeyError on "matches".
    if "message" in results:
        logger.error("Error querying vectors: %s", results)

    docs = []
    for res in results.get("matches") or []:
        # A match may come back without metadata; treat it as having no text.
        metadata = res.get("metadata") or {}
        if self._text_key in metadata:
            # The stored text becomes the page content; the rest stays metadata.
            text = metadata.pop(self._text_key)
            score = res["score"]
            docs.append((Document(page_content=text, metadata=metadata), score))
        else:
            logger.warning(
                f"Found document with no `{self._text_key}` key. Skipping."
            )
    return docs
Loading