diff --git a/CHANGELOG.md b/CHANGELOG.md index 36a59c35d..caa9d87f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.3.12-dev3 + +### Enhancements + +* **Migrate Vectara Destination Connector to v2** + ## 0.3.12-dev2 ### Enhancements @@ -20,7 +26,6 @@ * **Create more reflective custom errors** Provide errors to indicate if the error was due to something user provided or due to a provider issue, applicable to all steps in the pipeline. * **Bypass asyncio exception grouping to return more meaningful errors from OneDrive indexer** - ## 0.3.11 ### Enhancements diff --git a/requirements/connectors/vectara.in b/requirements/connectors/vectara.in index b75587241..647f99538 100644 --- a/requirements/connectors/vectara.in +++ b/requirements/connectors/vectara.in @@ -1,3 +1,5 @@ -c ../common/constraints.txt requests +aiofiles +httpx diff --git a/requirements/connectors/vectara.txt b/requirements/connectors/vectara.txt index 917d5e7bc..b8644ba81 100644 --- a/requirements/connectors/vectara.txt +++ b/requirements/connectors/vectara.txt @@ -8,6 +8,10 @@ idna==3.10 # via requests requests==2.32.3 # via -r ./connectors/vectara.in +aiofiles==24.1.0 + # via -r ./connectors/vectara.in +httpx==0.27.2 + # via -r ./connectors/vectara.in urllib3==1.26.20 # via # -c ./connectors/../common/constraints.txt diff --git a/test/integration/connectors/test_vectara.py b/test/integration/connectors/test_vectara.py new file mode 100644 index 000000000..e8d687f63 --- /dev/null +++ b/test/integration/connectors/test_vectara.py @@ -0,0 +1,270 @@ +import json +import os +import time +from pathlib import Path +from typing import Generator +from uuid import uuid4 + +import pytest +import requests + +from test.integration.connectors.utils.constants import DESTINATION_TAG +from test.integration.utils import requires_env +from unstructured_ingest.v2.interfaces.file_data import FileData, SourceIdentifiers +from unstructured_ingest.v2.logger import logger +from unstructured_ingest.v2.processes.connectors.vectara
import ( + CONNECTOR_TYPE as VECTARA_CONNECTOR_TYPE, +) +from unstructured_ingest.v2.processes.connectors.vectara import ( + VectaraAccessConfig, + VectaraConnectionConfig, + VectaraUploader, + VectaraUploaderConfig, + VectaraUploadStager, + VectaraUploadStagerConfig, +) + + +def validate_upload(response: dict, expected_data: dict): + element_id = expected_data["element_id"] + expected_text = expected_data["text"] + filename = expected_data["metadata"]["filename"] + filetype = expected_data["metadata"]["filetype"] + page_number = expected_data["metadata"]["page_number"] + + response = response["search_results"][0] + + assert response is not None + assert response["text"] == expected_text + assert response["part_metadata"]["element_id"] == element_id + assert response["part_metadata"]["filename"] == filename + assert response["part_metadata"]["filetype"] == filetype + assert response["part_metadata"]["page_number"] == page_number + + +@requires_env("VECTARA_OAUTH_CLIENT_ID", "VECTARA_OAUTH_SECRET", "VECTARA_CUSTOMER_ID") +def _get_jwt_token(): + """Connect to the server and get a JWT token.""" + customer_id = os.environ["VECTARA_CUSTOMER_ID"] + token_endpoint = ( + f"https://vectara-prod-{customer_id}.auth.us-west-2.amazoncognito.com/oauth2/token" + ) + headers = { + "Content-Type": "application/x-www-form-urlencoded", + } + data = { + "grant_type": "client_credentials", + "client_id": os.environ["VECTARA_OAUTH_CLIENT_ID"], + "client_secret": os.environ["VECTARA_OAUTH_SECRET"], + } + + response = requests.post(token_endpoint, headers=headers, data=data) + response.raise_for_status() + response_json = response.json() + + return response_json.get("access_token") + + +def query_data(corpus_key: str, element_id: str) -> dict: + + url = f"https://api.vectara.io/v2/corpora/{corpus_key}/query" + + # the query below requires the corpus to have filter attributes for element_id + + data = json.dumps( + { + "query": "string", + "search": { + "metadata_filter": 
f"part.element_id = '{element_id}'", + "lexical_interpolation": 1, + "limit": 10, + }, + } + ) + + jwt_token = _get_jwt_token() + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Bearer {jwt_token}", + "X-source": "unstructured", + } + + response = requests.post(url, headers=headers, data=data) + response.raise_for_status() + response_json = response.json() + + return response_json + + +def create_corpora(corpus_key: str, corpus_name: str) -> None: + url = "https://api.vectara.io/v2/corpora" + data = json.dumps({"key": corpus_key, "name": corpus_name, "description": "integration test"}) + jwt_token = _get_jwt_token() + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Bearer {jwt_token}", + "X-source": "unstructured", + } + + response = requests.post(url, headers=headers, data=data) + response.raise_for_status() + + +def replace_filter_attributes(corpus_key: str) -> None: + url = f"https://api.vectara.io/v2/corpora/{corpus_key}/replace_filter_attributes" + data = json.dumps( + { + "filter_attributes": [ + {"name": "element_id", "level": "part", "indexed": True, "type": "text"} + ] + } + ) + jwt_token = _get_jwt_token() + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Bearer {jwt_token}", + "X-source": "unstructured", + } + + response = requests.post(url, headers=headers, data=data) + response.raise_for_status() + + +def delete_corpora(corpus_key: str) -> None: + url = f"https://api.vectara.io/v2/corpora/{corpus_key}" + + jwt_token = _get_jwt_token() + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Bearer {jwt_token}", + "X-source": "unstructured", + } + + response = requests.delete(url, headers=headers) + response.raise_for_status() + + +def list_corpora() -> list: + url = "https://api.vectara.io/v2/corpora?limit=100" + jwt_token = _get_jwt_token() + 
headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Bearer {jwt_token}", + "X-source": "unstructured", + } + response = requests.get(url, headers=headers) + response.raise_for_status() + response_json = response.json() + if response_json.get("corpora"): + return [item["key"] for item in response_json.get("corpora")] + else: + return [] + + +def wait_for_ready(corpus_key: str, timeout=60, interval=2) -> None: + def is_ready_status(): + corpora_list = list_corpora() + return corpus_key in corpora_list + + start = time.time() + is_ready = is_ready_status() + while not is_ready and time.time() - start < timeout: + time.sleep(interval) + is_ready = is_ready_status() + if not is_ready: + raise TimeoutError("time out waiting for corpus to be ready") + + +def wait_for_delete(corpus_key: str, timeout=60, interval=2) -> None: + start = time.time() + while time.time() - start < timeout: + corpora_list = list_corpora() + if corpus_key not in corpora_list: + return + time.sleep(interval) + + raise TimeoutError("time out waiting for corpus to delete") + + +@pytest.fixture +def corpora_util() -> Generator[str, None, None]: + random_id = str(uuid4()).split("-")[0] + corpus_key = f"ingest-test-{random_id}" + corpus_name = "ingest-test" + logger.info(f"Creating corpus with key: {corpus_key}") + try: + create_corpora(corpus_key, corpus_name) + replace_filter_attributes(corpus_key) + wait_for_ready(corpus_key=corpus_key) + yield corpus_key + except Exception as e: + logger.error(f"failed to create corpus {corpus_key}: {e}") + finally: + logger.info(f"deleting corpus: {corpus_key}") + delete_corpora(corpus_key) + wait_for_delete(corpus_key=corpus_key) + + +@pytest.mark.asyncio +@pytest.mark.tags(VECTARA_CONNECTOR_TYPE, DESTINATION_TAG, "vectara") +@requires_env("VECTARA_OAUTH_CLIENT_ID", "VECTARA_OAUTH_SECRET", "VECTARA_CUSTOMER_ID") +async def test_vectara_destination( + upload_file: Path, tmp_path: Path, corpora_util: str, 
retries=30, interval=10 +): + corpus_key = corpora_util + connection_kwargs = { + "customer_id": os.environ["VECTARA_CUSTOMER_ID"], + "corpus_key": corpus_key, + } + + oauth_client_id = os.environ["VECTARA_OAUTH_CLIENT_ID"] + oauth_secret = os.environ["VECTARA_OAUTH_SECRET"] + + file_data = FileData( + source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name), + connector_type=VECTARA_CONNECTOR_TYPE, + identifier="mock-file-data", + ) + + stager_config = VectaraUploadStagerConfig(batch_size=10) + stager = VectaraUploadStager(upload_stager_config=stager_config) + new_upload_file = stager.run( + elements_filepath=upload_file, + output_dir=tmp_path, + output_filename=upload_file.name, + file_data=file_data, + ) + + uploader = VectaraUploader( + connection_config=VectaraConnectionConfig( + **connection_kwargs, + access_config=VectaraAccessConfig( + oauth_client_id=oauth_client_id, oauth_secret=oauth_secret + ), + ), + upload_config=VectaraUploaderConfig(), + ) + + with new_upload_file.open() as new_upload_fp: + elements_stager = json.load(new_upload_fp) + + if uploader.is_async(): + await uploader.run_data_async(data=elements_stager, file_data=file_data) + + with upload_file.open() as upload_fp: + elements = json.load(upload_fp) + first_element = elements[0] + + for i in range(retries): + response = query_data(corpus_key, first_element["element_id"]) + if not response["search_results"]: + time.sleep(interval) + else: + break + + validate_upload(response=response, expected_data=first_element) diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 4d8fbb4eb..3ad6f8bfe 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.12-dev2" # pragma: no cover +__version__ = "0.3.12-dev3" # pragma: no cover diff --git a/unstructured_ingest/v2/examples/vectara.py b/unstructured_ingest/v2/examples/vectara.py new file mode 100644 index 
000000000..acd26c1e0 --- /dev/null +++ b/unstructured_ingest/v2/examples/vectara.py @@ -0,0 +1,54 @@ +from pathlib import Path + +from unstructured_ingest.v2.interfaces import ProcessorConfig +from unstructured_ingest.v2.logger import logger +from unstructured_ingest.v2.pipeline.pipeline import Pipeline +from unstructured_ingest.v2.processes.chunker import ChunkerConfig +from unstructured_ingest.v2.processes.connectors.local import ( + LocalConnectionConfig, + LocalDownloaderConfig, + LocalIndexerConfig, +) +from unstructured_ingest.v2.processes.connectors.vectara import ( + CONNECTOR_TYPE, + VectaraAccessConfig, + VectaraConnectionConfig, + VectaraUploaderConfig, + VectaraUploadStagerConfig, +) +from unstructured_ingest.v2.processes.embedder import EmbedderConfig +from unstructured_ingest.v2.processes.partitioner import PartitionerConfig + +base_path = Path(__file__).parent.parent.parent.parent +docs_path = base_path / "example-docs" +work_dir = base_path / "tmp_ingest" / CONNECTOR_TYPE +output_path = work_dir / "output" +download_path = work_dir / "download" + +if __name__ == "__main__": + logger.info(f"writing all content in: {work_dir.resolve()}") + Pipeline.from_configs( + context=ProcessorConfig(work_dir=str(work_dir.resolve())), + indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"), + downloader_config=LocalDownloaderConfig(download_dir=download_path), + source_connection_config=LocalConnectionConfig(), + partitioner_config=PartitionerConfig(strategy="fast"), + chunker_config=ChunkerConfig( + chunking_strategy="by_title", + chunk_include_orig_elements=False, + chunk_max_characters=1500, + chunk_multipage_sections=True, + ), + embedder_config=EmbedderConfig(embedding_provider="huggingface"), + destination_connection_config=VectaraConnectionConfig( + access_config=VectaraAccessConfig( + oauth_client_id="fill oauth_client_id", oauth_secret="fill oauth_secret" + ), + customer_id="fill customer_id", + corpus_name="fill 
corpus_name", + corpus_key="fill corpus_key", + token_url="fill token_url", + ), + stager_config=VectaraUploadStagerConfig(batch_size=10), + uploader_config=VectaraUploaderConfig(), + ).run() diff --git a/unstructured_ingest/v2/processes/connectors/__init__.py b/unstructured_ingest/v2/processes/connectors/__init__.py index e7c16bb01..c5e466d3e 100644 --- a/unstructured_ingest/v2/processes/connectors/__init__.py +++ b/unstructured_ingest/v2/processes/connectors/__init__.py @@ -56,6 +56,8 @@ from .sharepoint import sharepoint_source_entry from .slack import CONNECTOR_TYPE as SLACK_CONNECTOR_TYPE from .slack import slack_source_entry +from .vectara import CONNECTOR_TYPE as VECTARA_CONNECTOR_TYPE +from .vectara import vectara_destination_entry add_source_entry(source_type=ASTRA_DB_CONNECTOR_TYPE, entry=astra_db_source_entry) add_destination_entry(destination_type=ASTRA_DB_CONNECTOR_TYPE, entry=astra_db_destination_entry) @@ -103,6 +105,7 @@ add_source_entry(source_type=SLACK_CONNECTOR_TYPE, entry=slack_source_entry) +add_destination_entry(destination_type=VECTARA_CONNECTOR_TYPE, entry=vectara_destination_entry) add_source_entry(source_type=CONFLUENCE_CONNECTOR_TYPE, entry=confluence_source_entry) add_destination_entry(destination_type=REDIS_CONNECTOR_TYPE, entry=redis_destination_entry) diff --git a/unstructured_ingest/v2/processes/connectors/vectara.py b/unstructured_ingest/v2/processes/connectors/vectara.py new file mode 100644 index 000000000..440e5dbcf --- /dev/null +++ b/unstructured_ingest/v2/processes/connectors/vectara.py @@ -0,0 +1,350 @@ +import asyncio +import json +import uuid +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Mapping, Optional + +from pydantic import Field, Secret + +from unstructured_ingest.error import DestinationConnectionError +from unstructured_ingest.utils.data_prep import flatten_dict +from unstructured_ingest.utils.dep_check import requires_dependencies 
+from unstructured_ingest.v2.interfaces import ( + AccessConfig, + ConnectionConfig, + FileData, + Uploader, + UploaderConfig, + UploadStager, + UploadStagerConfig, +) +from unstructured_ingest.v2.logger import logger +from unstructured_ingest.v2.processes.connector_registry import DestinationRegistryEntry + +BASE_URL = "https://api.vectara.io/v2" + +CONNECTOR_TYPE = "vectara" + + +class VectaraAccessConfig(AccessConfig): + oauth_client_id: str = Field(description="Client ID") + oauth_secret: str = Field(description="Client Secret") + + +class VectaraConnectionConfig(ConnectionConfig): + access_config: Secret[VectaraAccessConfig] + customer_id: str + corpus_name: Optional[str] = None + corpus_key: Optional[str] = None + token_url: str = "https://vectara-prod-{}.auth.us-west-2.amazoncognito.com/oauth2/token" + + +class VectaraUploadStagerConfig(UploadStagerConfig): + pass + + +@dataclass +class VectaraUploadStager(UploadStager): + upload_stager_config: VectaraUploadStagerConfig = field( + default_factory=lambda: VectaraUploadStagerConfig() + ) + + @staticmethod + def conform_dict(data: dict) -> dict: + """ + Prepares dictionary in the format that Vectara requires. + See more detail in https://docs.vectara.com/docs/rest-api/create-corpus-document + + Select which meta-data fields to include and optionally map them to a new format. 
+ remove the "metadata-" prefix from the keys + """ + metadata_map = { + "page_number": "page_number", + "data_source-url": "url", + "filename": "filename", + "filetype": "filetype", + "last_modified": "last_modified", + "element_id": "element_id", + } + md = flatten_dict(data, separator="-", flatten_lists=True) + md = {k.replace("metadata-", ""): v for k, v in md.items()} + md = {metadata_map[k]: v for k, v in md.items() if k in metadata_map} + return md + + def process_whole(self, input_file: Path, output_file: Path, file_data: FileData) -> None: + with input_file.open() as in_f: + elements_contents = json.load(in_f) + + logger.info( + f"Extending {len(elements_contents)} json elements from content in {input_file}" + ) + + conformed_elements = [ + { + "id": str(uuid.uuid4()), + "type": "core", + "metadata": { + "title": file_data.identifier, + }, + "document_parts": [ + { + "text": element.pop("text", None), + "metadata": self.conform_dict(data=element), + } + for element in elements_contents + ], + } + ] + + with open(output_file, "w") as out_f: + json.dump(conformed_elements, out_f, indent=2) + + +class VectaraUploaderConfig(UploaderConfig): + pass + + +@dataclass +class VectaraUploader(Uploader): + + connector_type: str = CONNECTOR_TYPE + upload_config: VectaraUploaderConfig + connection_config: VectaraConnectionConfig + _jwt_token: Optional[str] = field(init=False, default=None) + _jwt_token_expires_ts: Optional[float] = field(init=False, default=None) + + def is_async(self) -> bool: + return True + + def precheck(self) -> None: + try: + self._check_connection_and_corpora() + except Exception as e: + logger.error(f"Failed to validate connection {e}", exc_info=True) + raise DestinationConnectionError(f"failed to validate connection: {e}") + + @property + async def jwt_token_async(self) -> str: + if not self._jwt_token or self._jwt_token_expires_ts - datetime.now().timestamp() <= 60: + self._jwt_token = await self._get_jwt_token_async() + return self._jwt_token 
+ + @property + def jwt_token(self) -> str: + if not self._jwt_token or self._jwt_token_expires_ts - datetime.now().timestamp() <= 60: + self._jwt_token = self._get_jwt_token() + return self._jwt_token + + # Get Oauth2 JWT token + @requires_dependencies(["httpx"], extras="vectara") + async def _get_jwt_token_async(self) -> str: + import httpx + + """Connect to the server and get a JWT token.""" + token_endpoint = self.connection_config.token_url.format(self.connection_config.customer_id) + headers = { + "Content-Type": "application/x-www-form-urlencoded", + } + data = { + "grant_type": "client_credentials", + "client_id": self.connection_config.access_config.get_secret_value().oauth_client_id, + "client_secret": self.connection_config.access_config.get_secret_value().oauth_secret, + } + + async with httpx.AsyncClient() as client: + response = await client.post(token_endpoint, headers=headers, data=data) + response.raise_for_status() + response_json = response.json() + + request_time = datetime.now().timestamp() + self._jwt_token_expires_ts = request_time + response_json.get("expires_in") + + return response_json.get("access_token") + + # Get Oauth2 JWT token + @requires_dependencies(["httpx"], extras="vectara") + def _get_jwt_token(self) -> str: + import httpx + + """Connect to the server and get a JWT token.""" + token_endpoint = self.connection_config.token_url.format(self.connection_config.customer_id) + headers = { + "Content-Type": "application/x-www-form-urlencoded", + } + data = { + "grant_type": "client_credentials", + "client_id": self.connection_config.access_config.get_secret_value().oauth_client_id, + "client_secret": self.connection_config.access_config.get_secret_value().oauth_secret, + } + + with httpx.Client() as client: + response = client.post(token_endpoint, headers=headers, data=data) + response.raise_for_status() + response_json = response.json() + + request_time = datetime.now().timestamp() + self._jwt_token_expires_ts = request_time + 
response_json.get("expires_in") + + return response_json.get("access_token") + + @DestinationConnectionError.wrap + def _check_connection_and_corpora(self) -> None: + """ + Check the connection for Vectara and validate corpus exists. + - If more than one corpus with the same name exists - raise error + - If exactly one corpus exists with this name - use it. + - If does not exist - raise error. + """ + # Get token if not already set + self.jwt_token + + _, list_corpora_response = self._request( + http_method="GET", + endpoint="corpora", + ) + + if self.connection_config.corpus_name: + possible_corpora_keys_names_map = { + corpus.get("key"): corpus.get("name") + for corpus in list_corpora_response.get("corpora") + if corpus.get("name") == self.connection_config.corpus_name + } + + if len(possible_corpora_keys_names_map) > 1: + raise ValueError( + f"Multiple Corpus exist with name {self.connection_config.corpus_name} in dest." + ) + if len(possible_corpora_keys_names_map) == 1: + if not self.connection_config.corpus_key: + self.connection_config.corpus_key = list( + possible_corpora_keys_names_map.keys() + )[0] + elif ( + self.connection_config.corpus_key + != list(possible_corpora_keys_names_map.keys())[0] + ): + raise ValueError("Corpus key does not match provided corpus name.") + else: + raise ValueError( + f"No Corpora exist with name {self.connection_config.corpus_name} in dest." 
+ ) + + @requires_dependencies(["httpx"], extras="vectara") + async def _async_request( + self, + endpoint: str, + http_method: str = "POST", + params: Mapping[str, Any] = None, + data: Mapping[str, Any] = None, + ) -> tuple[bool, dict]: + import httpx + + url = f"{BASE_URL}/{endpoint}" + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Bearer {await self.jwt_token_async}", + "X-source": "unstructured", + } + + async with httpx.AsyncClient() as client: + response = await client.request( + method=http_method, url=url, headers=headers, params=params, json=data + ) + response.raise_for_status() + return response.json() + + @requires_dependencies(["httpx"], extras="vectara") + def _request( + self, + endpoint: str, + http_method: str = "POST", + params: Mapping[str, Any] = None, + data: Mapping[str, Any] = None, + ) -> tuple[bool, dict]: + import httpx + + url = f"{BASE_URL}/{endpoint}" + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Bearer {self.jwt_token}", + "X-source": "unstructured", + } + + with httpx.Client() as client: + response = client.request( + method=http_method, url=url, headers=headers, params=params, json=data + ) + response.raise_for_status() + return response.json() + + async def _delete_doc(self, doc_id: str) -> tuple[bool, dict]: + """ + Delete a document from the Vectara corpus. 
+ """ + + return await self._async_request( + endpoint=f"corpora/{self.connection_config.corpus_key}/documents/{doc_id}", + http_method="DELETE", + ) + + async def _index_document(self, document: Dict[str, Any]) -> None: + """ + Index a document (by uploading it to the Vectara corpus) from the document dictionary + """ + + logger.debug( + f"Indexing document {document['id']} to corpus key {self.connection_config.corpus_key}" + ) + + try: + result = await self._async_request( + endpoint=f"corpora/{self.connection_config.corpus_key}/documents", data=document + ) + except Exception as e: + logger.error(f"exception {e} while indexing document {document['id']}") + return + + if ( + "messages" in result + and result["messages"] + and ( + "ALREADY_EXISTS" in result["messages"] + or ( + "CONFLICT: Indexing doesn't support updating documents." + in result["messages"][0] + ) + ) + ): + logger.info(f"document {document['id']} already exists, re-indexing") + await self._delete_doc(document["id"]) + await self._async_request( + endpoint=f"corpora/{self.connection_config.corpus_key}/documents", data=document + ) + return + + logger.info(f"indexing document {document['id']} succeeded") + + async def run_data_async( + self, + data: list[dict], + file_data: FileData, + **kwargs: Any, + ) -> None: + + logger.info(f"inserting / updating {len(data)} documents to Vectara ") + await asyncio.gather(*(self._index_document(vdoc) for vdoc in data)) + + +vectara_destination_entry = DestinationRegistryEntry( + connection_config=VectaraConnectionConfig, + uploader=VectaraUploader, + uploader_config=VectaraUploaderConfig, + upload_stager=VectaraUploadStager, + upload_stager_config=VectaraUploadStagerConfig, +)