
Support synchronous processing for e.g. AWS Lambdas #11168
nelsonauner committed Jan 30, 2024
1 parent 2db79ab commit cff1e1e
Showing 2 changed files with 78 additions and 21 deletions.
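For orientation, a minimal usage sketch of the new flag (not part of the commit; the handler, index name, and environment variable are illustrative assumptions): on runtimes such as AWS Lambda, where Python's multiprocessing is unavailable, passing async_req=False keeps the upsert on the calling thread.

# Hypothetical AWS Lambda handler; the index name env var is a placeholder.
import os

from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores.pinecone import Pinecone


def handler(event: dict, context: object) -> dict:
    texts = event.get("texts", [])
    # async_req=False skips the multiprocessing-based async upsert path,
    # which fails on AWS Lambda ("[Errno 38] Function not implemented").
    Pinecone.from_texts(
        texts=texts,
        embedding=OpenAIEmbeddings(),
        index_name=os.environ["PINECONE_INDEX_NAME"],
        async_req=False,
    )
    return {"indexed": len(texts)}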
52 changes: 32 additions & 20 deletions libs/community/langchain_community/vectorstores/pinecone.py
@@ -113,6 +113,7 @@ def add_texts(
        namespace: Optional[str] = None,
        batch_size: int = 32,
        embedding_chunk_size: int = 1000,
        async_req: bool = True,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.
@@ -128,6 +129,7 @@ def add_texts(
            namespace: Optional pinecone namespace to add the texts to.
            batch_size: Batch size to use when adding the texts to the vectorstore.
            embedding_chunk_size: Chunk size to use when embedding the texts.
            async_req: Whether to use asynchronous requests when adding the texts.
        Returns:
            List of ids from adding the texts into the vectorstore.
@@ -142,26 +144,34 @@ def add_texts(
        for metadata, text in zip(metadatas, texts):
            metadata[self._text_key] = text

        # For loops to avoid memory issues and optimize when using HTTP based embeddings
        # The first loop runs the embeddings, it benefits when using OpenAI embeddings
        # The second loop runs the pinecone upsert asynchronously.
        for i in range(0, len(texts), embedding_chunk_size):
            chunk_texts = texts[i : i + embedding_chunk_size]
            chunk_ids = ids[i : i + embedding_chunk_size]
            chunk_metadatas = metadatas[i : i + embedding_chunk_size]
            embeddings = self._embed_documents(chunk_texts)
            async_res = [
                self._index.upsert(
                    vectors=batch,
                    namespace=namespace,
                    async_req=True,
                    **kwargs,
                )
                for batch in batch_iterate(
                    batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
                )
            ]
            [res.get() for res in async_res]
        if async_req:
            # For loops to avoid memory issues and optimize when using HTTP based embeddings
            # The first loop runs the embeddings, it benefits when using OpenAI embeddings
            # The second loop runs the pinecone upsert asynchronously.
            for i in range(0, len(texts), embedding_chunk_size):
                chunk_texts = texts[i : i + embedding_chunk_size]
                chunk_ids = ids[i : i + embedding_chunk_size]
                chunk_metadatas = metadatas[i : i + embedding_chunk_size]
                embeddings = self._embed_documents(chunk_texts)
                async_res = [
                    self._index.upsert(
                        vectors=batch,
                        namespace=namespace,
                        async_req=async_req,
                        **kwargs,
                    )
                    for batch in batch_iterate(
                        batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
                    )
                ]
                [res.get() for res in async_res]
        else:
            self._index.upsert(
                vectors=list(zip(ids, self._embed_documents(texts), metadatas)),
                namespace=namespace,
                async_req=async_req,
                **kwargs,
            )

        return ids
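As a side note, the nested loops above follow a generic chunk-then-batch pattern. The stand-alone sketch below (stand-in embed/upsert callables, not the library's API) shows the same shape: the outer loop bounds memory by embedding embedding_chunk_size texts at a time, while the inner loop sends batch_size-sized upserts.

# Self-contained illustration of the chunk-then-batch pattern; embed and
# upsert are placeholder callables, not Pinecone or LangChain APIs.
from typing import Callable, Iterable, Iterator, List, Sequence


def batches(size: int, items: Iterable) -> Iterator[list]:
    """Yield lists of at most `size` items (roughly what batch_iterate does above)."""
    batch: list = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def upsert_in_chunks(
    texts: Sequence[str],
    embed: Callable[[Sequence[str]], List[List[float]]],
    upsert: Callable[[list], None],
    embedding_chunk_size: int = 1000,
    batch_size: int = 32,
) -> None:
    for i in range(0, len(texts), embedding_chunk_size):
        # Outer loop: embed a bounded chunk so all embeddings never sit in memory at once.
        chunk = texts[i : i + embedding_chunk_size]
        vectors = list(zip(chunk, embed(chunk)))  # pair each text with its embedding
        # Inner loop: push the chunk to the index in small batches.
        for batch in batches(batch_size, vectors):
            upsert(batch)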

@@ -407,6 +417,7 @@ def from_texts(
        upsert_kwargs: Optional[dict] = None,
        pool_threads: int = 4,
        embeddings_chunk_size: int = 1000,
        async_req: bool = True,
        **kwargs: Any,
    ) -> Pinecone:
        """Construct Pinecone wrapper from raw documents.
@@ -445,6 +456,7 @@
            namespace=namespace,
            batch_size=batch_size,
            embedding_chunk_size=embeddings_chunk_size,
            async_req=async_req,
            **(upsert_kwargs or {}),
        )
        return pinecone
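For comparison, a sketch of the unchanged default path (placeholder index name; assumes Pinecone and OpenAI credentials are configured in the environment): leaving async_req at its default of True preserves the existing asynchronous upsert behaviour.

# Default asynchronous path, unchanged by this commit; "langchain-demo" is a
# placeholder index name.
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores.pinecone import Pinecone

docsearch = Pinecone.from_texts(
    texts=["foo", "bar", "baz"],
    embedding=OpenAIEmbeddings(),
    index_name="langchain-demo",
)
docs = docsearch.similarity_search("foo", k=1)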
libs/community/tests/integration_tests/vectorstores/test_pinecone.py
@@ -2,12 +2,13 @@
import os
import time
import uuid
from typing import TYPE_CHECKING, List
from typing import TYPE_CHECKING, List, Generator

import numpy as np
import pytest
from langchain_core.documents import Document

from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores.pinecone import Pinecone

@@ -19,6 +20,30 @@
dimension = 1536 # dimension of the embeddings


@pytest.fixture(scope="module")
def embedding_openai() -> OpenAIEmbeddings:
    if not os.environ.get("OPENAI_API_KEY"):
        raise ValueError("OPENAI_API_KEY is not set")
    return OpenAIEmbeddings()


@pytest.fixture(scope="function")
def texts() -> Generator[List[str], None, None]:
    # Load the documents from a file located in the fixtures directory
    documents = TextLoader(
        os.path.join(os.path.dirname(__file__), "fixtures", "sharks.txt")
    ).load()

    yield [doc.page_content for doc in documents]

@pytest.fixture
def mock_pool_not_supported(mocker):
    """
    Simulate the error thrown when multiprocessing is not supported (e.g. on AWS Lambda).
    See https://github.com/langchain-ai/langchain/issues/11168
    """
    mocker.patch(
        "multiprocessing.synchronize.SemLock.__init__",
        side_effect=OSError("[Errno 38] Function not implemented"),
    )
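For background (a general Python behaviour, not specific to this repo), the fixture mimics runtimes without POSIX semaphore support, such as AWS Lambda, where constructing any multiprocessing primitive fails because SemLock cannot be allocated. A stand-alone reproduction might look like this:

# Stand-alone illustration of the failure the fixture simulates. On runtimes
# without POSIX semaphores (e.g. AWS Lambda, which lacks /dev/shm), creating
# multiprocessing primitives raises OSError.
import multiprocessing

try:
    multiprocessing.Lock()  # internally a multiprocessing.synchronize.SemLock
    print("multiprocessing primitives are available")
except OSError as exc:
    # Typically "[Errno 38] Function not implemented"
    print(f"multiprocessing is not supported here: {exc}")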

def reset_pinecone() -> None:
    assert os.environ.get("PINECONE_API_KEY") is not None
    assert os.environ.get("PINECONE_ENVIRONMENT") is not None
@@ -285,3 +310,23 @@ def test_from_texts_with_metadatas_benchmark(

        query = "What did the president say about Ketanji Brown Jackson"
        _ = docsearch.similarity_search(query, k=1, namespace=namespace_name)


    @pytest.mark.usefixtures("mock_pool_not_supported")
    def test_that_async_req_uses_multiprocessing(
        self, embedding_openai: OpenAIEmbeddings
    ) -> None:
        with pytest.raises(OSError):
            Pinecone.from_texts(
                texts=["foo", "bar", "baz"] * 32,
                embedding=embedding_openai,
                index_name=index_name,
                async_req=True,
            )

    @pytest.mark.usefixtures("mock_pool_not_supported")
    def test_that_async_req_false_enables_singlethreading(
        self, embedding_openai: OpenAIEmbeddings
    ) -> None:
        Pinecone.from_texts(
            texts=["foo", "bar", "baz"],
            embedding=embedding_openai,
            index_name=index_name,
            async_req=False,
        )
