From cff1e1ea3739866d10fe99e831c1cbd88fe4d735 Mon Sep 17 00:00:00 2001
From: Nelson Auner
Date: Mon, 29 Jan 2024 18:38:37 -0800
Subject: [PATCH] Support synchronous processing for e.g. AWS Lambdas #11168

---
 .../vectorstores/pinecone.py                  | 52 ++++++++++++-------
 .../vectorstores/test_pinecone.py             | 47 ++++++++++++++++-
 2 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/libs/community/langchain_community/vectorstores/pinecone.py b/libs/community/langchain_community/vectorstores/pinecone.py
index 410d55420fa57..b18c1757ca368 100644
--- a/libs/community/langchain_community/vectorstores/pinecone.py
+++ b/libs/community/langchain_community/vectorstores/pinecone.py
@@ -113,6 +113,7 @@ def add_texts(
         namespace: Optional[str] = None,
         batch_size: int = 32,
         embedding_chunk_size: int = 1000,
+        async_req: bool = True,
         **kwargs: Any,
     ) -> List[str]:
         """Run more texts through the embeddings and add to the vectorstore.
@@ -128,6 +129,7 @@ def add_texts(
             namespace: Optional pinecone namespace to add the texts to.
             batch_size: Batch size to use when adding the texts to the vectorstore.
             embedding_chunk_size: Chunk size to use when embedding the texts.
+            async_req: Whether to use asynchronous requests when adding the texts.
 
         Returns:
             List of ids from adding the texts into the vectorstore.
@@ -142,26 +144,34 @@ def add_texts(
         for metadata, text in zip(metadatas, texts):
             metadata[self._text_key] = text
 
-        # For loops to avoid memory issues and optimize when using HTTP based embeddings
-        # The first loop runs the embeddings, it benefits when using OpenAI embeddings
-        # The second loops runs the pinecone upsert asynchronously.
-        for i in range(0, len(texts), embedding_chunk_size):
-            chunk_texts = texts[i : i + embedding_chunk_size]
-            chunk_ids = ids[i : i + embedding_chunk_size]
-            chunk_metadatas = metadatas[i : i + embedding_chunk_size]
-            embeddings = self._embed_documents(chunk_texts)
-            async_res = [
-                self._index.upsert(
-                    vectors=batch,
-                    namespace=namespace,
-                    async_req=True,
-                    **kwargs,
-                )
-                for batch in batch_iterate(
-                    batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
-                )
-            ]
-            [res.get() for res in async_res]
+        if async_req:
+            # For loops to avoid memory issues and optimize when using HTTP-based embeddings
+            # The first loop runs the embeddings; this mainly helps when using OpenAI embeddings
+            # The second loop runs the pinecone upsert asynchronously.
+            for i in range(0, len(texts), embedding_chunk_size):
+                chunk_texts = texts[i : i + embedding_chunk_size]
+                chunk_ids = ids[i : i + embedding_chunk_size]
+                chunk_metadatas = metadatas[i : i + embedding_chunk_size]
+                embeddings = self._embed_documents(chunk_texts)
+                async_res = [
+                    self._index.upsert(
+                        vectors=batch,
+                        namespace=namespace,
+                        async_req=async_req,
+                        **kwargs,
+                    )
+                    for batch in batch_iterate(
+                        batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
+                    )
+                ]
+                [res.get() for res in async_res]
+        else:
+            self._index.upsert(
+                vectors=list(zip(ids, self._embed_documents(texts), metadatas)),
+                namespace=namespace,
+                async_req=async_req,
+                **kwargs,
+            )
 
         return ids
 
@@ -407,6 +417,7 @@ def from_texts(
         upsert_kwargs: Optional[dict] = None,
         pool_threads: int = 4,
         embeddings_chunk_size: int = 1000,
+        async_req: bool = True,
         **kwargs: Any,
     ) -> Pinecone:
         """Construct Pinecone wrapper from raw documents.
@@ -445,6 +456,7 @@ def from_texts(
             namespace=namespace,
             batch_size=batch_size,
             embedding_chunk_size=embeddings_chunk_size,
+            async_req=async_req,
             **(upsert_kwargs or {}),
         )
         return pinecone

diff --git a/libs/community/tests/integration_tests/vectorstores/test_pinecone.py b/libs/community/tests/integration_tests/vectorstores/test_pinecone.py
index 99a42dc516216..e7bf55231d8ac 100644
--- a/libs/community/tests/integration_tests/vectorstores/test_pinecone.py
+++ b/libs/community/tests/integration_tests/vectorstores/test_pinecone.py
@@ -2,12 +2,13 @@
 import os
 import time
 import uuid
-from typing import TYPE_CHECKING, List
+from typing import TYPE_CHECKING, Generator, List
 
 import numpy as np
 import pytest
 from langchain_core.documents import Document
 
+from langchain_community.document_loaders import TextLoader
 from langchain_community.embeddings import OpenAIEmbeddings
 from langchain_community.vectorstores.pinecone import Pinecone
 
@@ -19,6 +20,30 @@
 dimension = 1536  # dimension of the embeddings
 
 
+@pytest.fixture(scope="module")
+def embedding_openai() -> OpenAIEmbeddings:
+    if not os.environ.get("OPENAI_API_KEY"):
+        raise ValueError("OPENAI_API_KEY is not set")
+    return OpenAIEmbeddings()
+
+
+@pytest.fixture(scope="function")
+def texts() -> Generator[List[str], None, None]:
+    # Load the documents from a file located in the fixtures directory
+    documents = TextLoader(
+        os.path.join(os.path.dirname(__file__), "fixtures", "sharks.txt")
+    ).load()
+
+    yield [doc.page_content for doc in documents]
+
+@pytest.fixture
+def mock_pool_not_supported(mocker):
+    """Simulate the OSError raised when multiprocessing is not supported.
+
+    See https://github.com/langchain-ai/langchain/issues/11168
+    """
+    mocker.patch("multiprocessing.synchronize.SemLock.__init__", side_effect=OSError("[Errno 38] Function not implemented"))
+
 def reset_pinecone() -> None:
     assert os.environ.get("PINECONE_API_KEY") is not None
     assert os.environ.get("PINECONE_ENVIRONMENT") is not None
@@ -285,3 +310,23 @@ def test_from_texts_with_metadatas_benchmark(
 
         query = "What did the president say about Ketanji Brown Jackson"
         _ = docsearch.similarity_search(query, k=1, namespace=namespace_name)
+
+
+    @pytest.mark.usefixtures("mock_pool_not_supported")
+    def test_that_async_req_uses_multiprocessing(self, embedding_openai: OpenAIEmbeddings) -> None:
+        with pytest.raises(OSError):
+            Pinecone.from_texts(
+                texts=["foo", "bar", "baz"] * 32,
+                embedding=embedding_openai,
+                index_name=index_name,
+                async_req=True,
+            )
+
+    @pytest.mark.usefixtures("mock_pool_not_supported")
+    def test_that_async_req_false_enables_singlethreading(self, embedding_openai: OpenAIEmbeddings) -> None:
+        Pinecone.from_texts(
+            texts=["foo", "bar", "baz"],
+            embedding=embedding_openai,
+            index_name=index_name,
+            async_req=False,
+        )
\ No newline at end of file
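
A minimal usage sketch (not part of the patch) of how the new flag would be called from an AWS Lambda handler, where multiprocessing semaphores are unavailable. The Pinecone.from_texts parameters and async_req flag come from the patch above; the handler signature, event shape, index name "langchain-demo", and the assumption that PINECONE_API_KEY, PINECONE_ENVIRONMENT, and OPENAI_API_KEY are already configured are illustrative only:

    from typing import Any, Dict

    from langchain_community.embeddings import OpenAIEmbeddings
    from langchain_community.vectorstores.pinecone import Pinecone


    def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        # Lambda does not implement the POSIX semaphores that the
        # multiprocessing-backed async upsert path relies on, so run
        # the upsert synchronously in a single process.
        Pinecone.from_texts(
            texts=event["texts"],  # hypothetical event payload
            embedding=OpenAIEmbeddings(),
            index_name="langchain-demo",  # hypothetical index name
            async_req=False,
        )
        return {"statusCode": 200}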