
Commit

Added possibility of utilising namespaces
mateuszkuprowski committed Jan 3, 2025
1 parent 5914bd9 commit 9de0dd4
Showing 1 changed file with 45 additions and 33 deletions.
unstructured_ingest/v2/processes/connectors/pinecone.py (78 changes: 45 additions & 33 deletions)
@@ -105,7 +105,10 @@ class PineconeUploaderConfig(UploaderConfig):
     )
     namespace: Optional[str] = Field(
         default=None,
-        description="The namespace to write to. If not specified, the default namespace is used",
+        description=(
+            "The namespace to write to. If not specified (None), the Pinecone SDK "
+            "will fall back to the 'default' namespace automatically."
+        ),
     )
     record_id_key: str = Field(
         default=RECORD_ID_LABEL,
@@ -173,49 +176,56 @@ def precheck(self):
             raise DestinationConnectionError(f"failed to validate connection: {e}")

     def pod_delete_by_record_id(self, file_data: FileData) -> None:
+        """Deletion for Pinecone Pod-based index."""
         logger.debug(
-            f"deleting any content with metadata "
+            f"Deleting any content with metadata "
             f"{self.upload_config.record_id_key}={file_data.identifier} "
-            f"from pinecone pod index"
+            f"from Pinecone pod index"
         )
         index = self.connection_config.get_index(pool_threads=MAX_POOL_THREADS)

+        # Build the delete_kwargs, only include 'namespace' if it's not None
         delete_kwargs = {
-            "filter": {self.upload_config.record_id_key: {"$eq": file_data.identifier}}
+            "filter": {self.upload_config.record_id_key: {"$eq": file_data.identifier}},
         }
-        if namespace := self.upload_config.namespace:
-            delete_kwargs["namespace"] = namespace
+        if self.upload_config.namespace is not None:
+            delete_kwargs["namespace"] = self.upload_config.namespace

         resp = index.delete(**delete_kwargs)
         logger.debug(
-            f"deleted any content with metadata "
+            f"Deleted any content with metadata "
             f"{self.upload_config.record_id_key}={file_data.identifier} "
-            f"from pinecone index: {resp}"
+            f"from Pinecone index: {resp}"
         )

     def serverless_delete_by_record_id(self, file_data: FileData) -> None:
+        """Deletion for Pinecone Serverless index."""
         logger.debug(
-            f"deleting any content with metadata "
+            f"Deleting any content with metadata "
             f"{self.upload_config.record_id_key}={file_data.identifier} "
-            f"from pinecone serverless index"
+            f"from Pinecone serverless index"
         )
         index = self.connection_config.get_index(pool_threads=MAX_POOL_THREADS)

+        # Build the list_kwargs, only include 'namespace' if it's not None
         list_kwargs = {"prefix": f"{file_data.identifier}#"}
+        if self.upload_config.namespace is not None:
+            list_kwargs["namespace"] = self.upload_config.namespace
+
         deleted_ids = 0
-        if namespace := self.upload_config.namespace:
-            list_kwargs["namespace"] = namespace
         for ids in index.list(**list_kwargs):
             deleted_ids += len(ids)
             delete_kwargs = {"ids": ids}
-            if namespace := self.upload_config.namespace:
-                delete_kwargs["namespace"] = namespace
-            delete_resp = index.delete(**delete_kwargs)
-            # delete_resp should be an empty dict if there were no errors
-            if delete_resp:
-                logger.error(f"failed to delete batch of ids: {delete_resp}")
+            if self.upload_config.namespace is not None:
+                delete_kwargs["namespace"] = self.upload_config.namespace
+            delete_resp = index.delete(**delete_kwargs)
+            if delete_resp:
+                logger.error(f"Failed to delete batch of IDs: {delete_resp}")

         logger.info(
-            f"deleted {deleted_ids} records with metadata "
+            f"Deleted {deleted_ids} records with metadata "
             f"{self.upload_config.record_id_key}={file_data.identifier} "
-            f"from pinecone index"
+            f"from Pinecone index"
         )

     @requires_dependencies(["pinecone"], extras="pinecone")
@@ -229,26 +239,28 @@ def upsert_batches_async(self, elements_dict: list[dict]):
                 max_batch_size=self.upload_config.batch_size,
             )
         )
-        logger.info(f"split doc with {len(elements_dict)} elements into {len(chunks)} batches")
+        logger.info(f"Split doc with {len(elements_dict)} elements into {len(chunks)} batches")

         max_pool_threads = min(len(chunks), MAX_POOL_THREADS)
-        if self.upload_config.pool_threads:
-            pool_threads = min(self.upload_config.pool_threads, max_pool_threads)
-        else:
-            pool_threads = max_pool_threads
+        pool_threads = min(self.upload_config.pool_threads or max_pool_threads, max_pool_threads)
         index = self.connection_config.get_index(pool_threads=pool_threads)

+        # Build upsert_kwargs for each chunk
+        upsert_kwargs_list = []
+        for chunk in chunks:
+            kwargs = {"vectors": chunk, "async_req": True}
+            if self.upload_config.namespace is not None:
+                kwargs["namespace"] = self.upload_config.namespace
+            upsert_kwargs_list.append(kwargs)
+
         with index:
-            upsert_kwargs = [{"vectors": chunk, "async_req": True} for chunk in chunks]
-            if namespace := self.upload_config.namespace:
-                for kwargs in upsert_kwargs:
-                    kwargs["namespace"] = namespace
-            async_results = [index.upsert(**kwarg) for kwarg in upsert_kwargs]
-            # Wait for and retrieve responses (this raises in case of error)
+            # Execute async upserts
+            async_results = [index.upsert(**kwargs) for kwargs in upsert_kwargs_list]
             try:
                 results = [async_result.get() for async_result in async_results]
             except PineconeApiException as api_error:
-                raise DestinationConnectionError(f"http error: {api_error}") from api_error
-            logger.debug(f"results: {results}")
+                raise DestinationConnectionError(f"HTTP error: {api_error}") from api_error
+            logger.debug(f"Results: {results}")

     def run_data(self, data: list[dict], file_data: FileData, **kwargs: Any) -> None:
         logger.info(
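With this change, a namespace only needs to be set on the uploader config when writing somewhere other than Pinecone's default namespace. A minimal usage sketch (assuming the config's other fields keep their defaults; the namespace value here is purely illustrative):

    from unstructured_ingest.v2.processes.connectors.pinecone import PineconeUploaderConfig

    # Vectors will be written to the "customer-a" namespace; leaving namespace
    # unset (None) preserves the previous behaviour, where the Pinecone SDK
    # falls back to its "default" namespace.
    upload_config = PineconeUploaderConfig(namespace="customer-a")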
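The same conditional-kwargs pattern appears in all three call sites above (delete, list, upsert): the "namespace" key is attached only when a value is configured, so None is never passed through to the SDK. A small stand-alone sketch of that pattern, using a hypothetical helper name:

    from typing import Any, Optional

    def with_namespace(kwargs: dict[str, Any], namespace: Optional[str]) -> dict[str, Any]:
        # Attach 'namespace' only when it is explicitly configured, so a None
        # value never overrides the SDK's own default-namespace handling.
        if namespace is not None:
            kwargs["namespace"] = namespace
        return kwargs

    # e.g. index.delete(**with_namespace({"ids": ids}, upload_config.namespace))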
