feat/vectara-destination-to-v2 (#158)
* vectara v2 still work in progress

* taking errors out

* fix lint

* linting

* order imports

* ruff --fix

* fix version

* fix PR issues raised by Roman

* ruff

* working get metadata on stager

* remove comments

* change conform dict to the correct way and include vdoc in the stager part

* take path out of file, per Potter's chat, to leave only url even if empty

* fix Potter's comments

* change version

* version update

* make tidy

* add secret to access config

* make tidy

* .

* get secret value

* change version

* add async

* change import

* vectara requirements

* make tidy

* make tidy

* fix PR comments

* vectara example to be able to debug async

* no wait warn

* improving logging

* precheck without async.

* make tidy

* lint

* linting

* some fixes for vectara

* migrate to vectara v2 api

* add integration test for vectara

* fix syntax

* divide elements to batches

* Add retry logic to document query

* change integration test to regular function

* clean up corpus after integration test

* fix syntax error

* update connection config in example

* Remove unnecessary var

* remove batch_size because the Vectara API does not support batch indexing of documents (see the indexing sketch below)

* remove asyncio.run to avoid conflict with async context

* update stager to reflect new structure

* update uploader to reflect new structure

* fix syntax

---------

Co-authored-by: Bryan Chen <[email protected]>
guilherme-uns and bryan-unstructured authored Dec 20, 2024
1 parent a0923db commit 5914bd9
Showing 8 changed files with 688 additions and 2 deletions.
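One of the commits above removes batch_size because the v2 API indexes a single document per request. Below is a minimal sketch of what that per-document call looks like, assuming the documented POST /v2/corpora/{corpus_key}/documents endpoint and a JWT obtained the same way as in the integration test further down; index_document is a hypothetical helper, not part of this change:

import requests

def index_document(corpus_key: str, jwt_token: str, document: dict) -> dict:
    # One request per document: v2 indexing accepts a single document per call,
    # which is why the uploader no longer exposes a batch_size option.
    url = f"https://api.vectara.io/v2/corpora/{corpus_key}/documents"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": f"Bearer {jwt_token}",
    }
    response = requests.post(url, headers=headers, json=document)
    response.raise_for_status()
    return response.json()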
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -1,3 +1,9 @@
## 0.3.12-dev3

### Enhancements

* **Migrate Vectara Destination Connector to v2**

## 0.3.12-dev2

### Enhancements
@@ -20,7 +26,6 @@
* **Create more reflective custom errors** Provide errors to indicate if the error was due to something user provided or due to a provider issue, applicable to all steps in the pipeline.
* **Bypass asyncio exception grouping to return more meaningful errors from OneDrive indexer**


## 0.3.11

### Enhancements
2 changes: 2 additions & 0 deletions requirements/connectors/vectara.in
@@ -1,3 +1,5 @@
-c ../common/constraints.txt

requests
aiofiles
httpx
2 changes: 2 additions & 0 deletions requirements/connectors/vectara.txt
@@ -8,6 +8,8 @@ idna==3.10
# via requests
requests==2.32.3
# via -r ./connectors/vectara.in
aiofiles==24.1.0
# via -r ./connectors/vectara.in
urllib3==1.26.20
# via
# -c ./connectors/../common/constraints.txt
270 changes: 270 additions & 0 deletions test/integration/connectors/test_vectara.py
@@ -0,0 +1,270 @@
import json
import os
import time
from pathlib import Path
from typing import Generator
from uuid import uuid4

import pytest
import requests

from test.integration.connectors.utils.constants import DESTINATION_TAG
from test.integration.utils import requires_env
from unstructured_ingest.v2.interfaces.file_data import FileData, SourceIdentifiers
from unstructured_ingest.v2.logger import logger
from unstructured_ingest.v2.processes.connectors.vectara import (
CONNECTOR_TYPE as VECTARA_CONNECTOR_TYPE,
)
from unstructured_ingest.v2.processes.connectors.vectara import (
VectaraAccessConfig,
VectaraConnectionConfig,
VectaraUploader,
VectaraUploaderConfig,
VectaraUploadStager,
VectaraUploadStagerConfig,
)


def validate_upload(response: dict, expected_data: dict):
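    """Assert that the top search result matches the expected element's text, element_id, and file metadata."""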
element_id = expected_data["element_id"]
expected_text = expected_data["text"]
filename = expected_data["metadata"]["filename"]
filetype = expected_data["metadata"]["filetype"]
page_number = expected_data["metadata"]["page_number"]

response = response["search_results"][0]

assert response is not None
assert response["text"] == expected_text
assert response["part_metadata"]["element_id"] == element_id
assert response["part_metadata"]["filename"] == filename
assert response["part_metadata"]["filetype"] == filetype
assert response["part_metadata"]["page_number"] == page_number


@requires_env("VECTARA_OAUTH_CLIENT_ID", "VECTARA_OAUTH_SECRET", "VECTARA_CUSTOMER_ID")
def _get_jwt_token():
"""Connect to the server and get a JWT token."""
customer_id = os.environ["VECTARA_CUSTOMER_ID"]
token_endpoint = (
f"https://vectara-prod-{customer_id}.auth.us-west-2.amazoncognito.com/oauth2/token"
)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
}
data = {
"grant_type": "client_credentials",
"client_id": os.environ["VECTARA_OAUTH_CLIENT_ID"],
"client_secret": os.environ["VECTARA_OAUTH_SECRET"],
}

response = requests.post(token_endpoint, headers=headers, data=data)
response.raise_for_status()
response_json = response.json()

return response_json.get("access_token")


def query_data(corpus_key: str, element_id: str) -> dict:
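    """Query the corpus for parts whose part.element_id metadata matches element_id."""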

url = f"https://api.vectara.io/v2/corpora/{corpus_key}/query"

# the query below requires the corpus to have filter attributes for element_id

data = json.dumps(
{
"query": "string",
"search": {
"metadata_filter": f"part.element_id = '{element_id}'",
"lexical_interpolation": 1,
"limit": 10,
},
}
)

jwt_token = _get_jwt_token()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {jwt_token}",
"X-source": "unstructured",
}

response = requests.post(url, headers=headers, data=data)
response.raise_for_status()
response_json = response.json()

return response_json


def create_corpora(corpus_key: str, corpus_name: str) -> None:
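    """Create a corpus with the given key and name via the v2 corpora endpoint."""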
url = "https://api.vectara.io/v2/corpora"
data = json.dumps({"key": corpus_key, "name": corpus_name, "description": "integration test"})
jwt_token = _get_jwt_token()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {jwt_token}",
"X-source": "unstructured",
}

response = requests.post(url, headers=headers, data=data)
response.raise_for_status()


def replace_filter_attributes(corpus_key: str) -> None:
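    """Register element_id as an indexed part-level filter attribute so query_data's metadata_filter can match on it."""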
url = f"https://api.vectara.io/v2/corpora/{corpus_key}/replace_filter_attributes"
data = json.dumps(
{
"filter_attributes": [
{"name": "element_id", "level": "part", "indexed": True, "type": "text"}
]
}
)
jwt_token = _get_jwt_token()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {jwt_token}",
"X-source": "unstructured",
}

response = requests.post(url, headers=headers, data=data)
response.raise_for_status()


def delete_corpora(corpus_key: str) -> None:
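    """Delete the corpus identified by corpus_key."""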
url = f"https://api.vectara.io/v2/corpora/{corpus_key}"

jwt_token = _get_jwt_token()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {jwt_token}",
"X-source": "unstructured",
}

response = requests.delete(url, headers=headers)
response.raise_for_status()


def list_corpora() -> list:
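    """Return the keys of up to 100 corpora visible to the authenticated client."""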
url = "https://api.vectara.io/v2/corpora?limit=100"
jwt_token = _get_jwt_token()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {jwt_token}",
"X-source": "unstructured",
}
response = requests.get(url, headers=headers)
response.raise_for_status()
response_json = response.json()
if response_json.get("corpora"):
return [item["key"] for item in response_json.get("corpora")]
else:
return []


def wait_for_ready(corpus_key: str, timeout=60, interval=2) -> None:
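    """Poll list_corpora until corpus_key appears, or raise TimeoutError after timeout seconds."""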
def is_ready_status():
corpora_list = list_corpora()
return corpus_key in corpora_list

start = time.time()
is_ready = is_ready_status()
while not is_ready and time.time() - start < timeout:
time.sleep(interval)
is_ready = is_ready_status()
if not is_ready:
raise TimeoutError("time out waiting for corpus to be ready")


def wait_for_delete(corpus_key: str, timeout=60, interval=2) -> None:
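    """Poll list_corpora until corpus_key disappears, or raise TimeoutError after timeout seconds."""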
start = time.time()
while time.time() - start < timeout:
corpora_list = list_corpora()
if corpus_key not in corpora_list:
return
time.sleep(interval)

raise TimeoutError("time out waiting for corpus to delete")


@pytest.fixture
def corpora_util() -> Generator[str, None, None]:
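    """Create a uniquely keyed test corpus with an element_id filter attribute, yield its key, and delete it afterwards."""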
random_id = str(uuid4()).split("-")[0]
corpus_key = f"ingest-test-{random_id}"
corpus_name = "ingest-test"
logger.info(f"Creating corpus with key: {corpus_key}")
try:
create_corpora(corpus_key, corpus_name)
replace_filter_attributes(corpus_key)
wait_for_ready(corpus_key=corpus_key)
yield corpus_key
except Exception as e:
logger.error(f"failed to create corpus {corpus_key}: {e}")
finally:
logger.info(f"deleting corpus: {corpus_key}")
delete_corpora(corpus_key)
wait_for_delete(corpus_key=corpus_key)


@pytest.mark.asyncio
@pytest.mark.tags(VECTARA_CONNECTOR_TYPE, DESTINATION_TAG, "vectara")
@requires_env("VECTARA_OAUTH_CLIENT_ID", "VECTARA_OAUTH_SECRET", "VECTARA_CUSTOMER_ID")
async def test_vectara_destination(
upload_file: Path, tmp_path: Path, corpora_util: str, retries=30, interval=10
):
corpus_key = corpora_util
connection_kwargs = {
"customer_id": os.environ["VECTARA_CUSTOMER_ID"],
"corpus_key": corpus_key,
}

oauth_client_id = os.environ["VECTARA_OAUTH_CLIENT_ID"]
oauth_secret = os.environ["VECTARA_OAUTH_SECRET"]

file_data = FileData(
source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name),
connector_type=VECTARA_CONNECTOR_TYPE,
identifier="mock-file-data",
)

stager_config = VectaraUploadStagerConfig(batch_size=10)
stager = VectaraUploadStager(upload_stager_config=stager_config)
new_upload_file = stager.run(
elements_filepath=upload_file,
output_dir=tmp_path,
output_filename=upload_file.name,
file_data=file_data,
)

uploader = VectaraUploader(
connection_config=VectaraConnectionConfig(
**connection_kwargs,
access_config=VectaraAccessConfig(
oauth_client_id=oauth_client_id, oauth_secret=oauth_secret
),
),
upload_config=VectaraUploaderConfig(),
)

with new_upload_file.open() as new_upload_fp:
elements_stager = json.load(new_upload_fp)

if uploader.is_async():
await uploader.run_data_async(data=elements_stager, file_data=file_data)

with upload_file.open() as upload_fp:
elements = json.load(upload_fp)
first_element = elements[0]

for i in range(retries):
response = query_data(corpus_key, first_element["element_id"])
if not response["search_results"]:
time.sleep(interval)
else:
break

validate_upload(response=response, expected_data=first_element)
2 changes: 1 addition & 1 deletion unstructured_ingest/__version__.py
@@ -1 +1 @@
__version__ = "0.3.12-dev2" # pragma: no cover
__version__ = "0.3.12-dev3" # pragma: no cover
54 changes: 54 additions & 0 deletions unstructured_ingest/v2/examples/vectara.py
@@ -0,0 +1,54 @@
from pathlib import Path

from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.logger import logger
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
from unstructured_ingest.v2.processes.connectors.local import (
LocalConnectionConfig,
LocalDownloaderConfig,
LocalIndexerConfig,
)
from unstructured_ingest.v2.processes.connectors.vectara import (
CONNECTOR_TYPE,
VectaraAccessConfig,
VectaraConnectionConfig,
VectaraUploaderConfig,
VectaraUploadStagerConfig,
)
from unstructured_ingest.v2.processes.embedder import EmbedderConfig
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

base_path = Path(__file__).parent.parent.parent.parent
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest" / CONNECTOR_TYPE
output_path = work_dir / "output"
download_path = work_dir / "download"

if __name__ == "__main__":
logger.info(f"writing all content in: {work_dir.resolve()}")
Pipeline.from_configs(
context=ProcessorConfig(work_dir=str(work_dir.resolve())),
indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"),
downloader_config=LocalDownloaderConfig(download_dir=download_path),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(strategy="fast"),
chunker_config=ChunkerConfig(
chunking_strategy="by_title",
chunk_include_orig_elements=False,
chunk_max_characters=1500,
chunk_multipage_sections=True,
),
embedder_config=EmbedderConfig(embedding_provider="huggingface"),
destination_connection_config=VectaraConnectionConfig(
access_config=VectaraAccessConfig(
oauth_client_id="fill oauth_client_id", oauth_secret="fill oauth_secret"
),
customer_id="fill customer_id",
corpus_name="fill corpus_name",
corpus_key="fill corpus_key",
token_url="fill token_url",
),
stager_config=VectaraUploadStagerConfig(batch_size=10),
uploader_config=VectaraUploaderConfig(),
).run()
3 changes: 3 additions & 0 deletions unstructured_ingest/v2/processes/connectors/__init__.py
@@ -56,6 +56,8 @@
from .sharepoint import sharepoint_source_entry
from .slack import CONNECTOR_TYPE as SLACK_CONNECTOR_TYPE
from .slack import slack_source_entry
from .vectara import CONNECTOR_TYPE as VECTARA_CONNECTOR_TYPE
from .vectara import vectara_destination_entry

add_source_entry(source_type=ASTRA_DB_CONNECTOR_TYPE, entry=astra_db_source_entry)
add_destination_entry(destination_type=ASTRA_DB_CONNECTOR_TYPE, entry=astra_db_destination_entry)
@@ -103,6 +105,7 @@

add_source_entry(source_type=SLACK_CONNECTOR_TYPE, entry=slack_source_entry)

add_destination_entry(destination_type=VECTARA_CONNECTOR_TYPE, entry=vectara_destination_entry)
add_source_entry(source_type=CONFLUENCE_CONNECTOR_TYPE, entry=confluence_source_entry)

add_destination_entry(destination_type=REDIS_CONNECTOR_TYPE, entry=redis_destination_entry)