[ROCK-13112] Add opensearch chain example (#4)
mleida-stratio authored Nov 15, 2024
1 parent 1c57dcc commit a785c73
Showing 21 changed files with 6,957 additions and 8 deletions.
4 changes: 2 additions & 2 deletions Jenkinsfile
@@ -25,8 +25,8 @@ hose {
"sonar.exclusions": "*/tests/**,*/scripts/**,*/pytest-coverage.xml",
"sonar.tests": ".",
"sonar.test.inclusions": "*/tests/**",
-"sonar.python.coverage.reportPaths": "basic-actor-chain-example/pytest-coverage.xml,virtualizer-chain-example/pytest-coverage.xml",
-"sonar.python.pylint.reportPaths": "basic-actor-chain-example/pylint-report.txt,virtualizer-chain-example/pylint-report.txt",
+"sonar.python.coverage.reportPaths": "basic-actor-chain-example/pytest-coverage.xml,virtualizer-chain-example/pytest-coverage.xml,opensearch-chain-example/pytest-coverage.xml",
+"sonar.python.pylint.reportPaths": "basic-actor-chain-example/pylint-report.txt,virtualizer-chain-example/pylint-report.txt,opensearch-chain-example/pylint-report.txt",
"sonar.scm.disabled": "true"
]
)
2 changes: 1 addition & 1 deletion README.md
@@ -7,7 +7,7 @@ There are four basic chains that you can use as a starting point to create your
# TODO: Add the links to the chains once merged
* [Basic Actor Chain](basic-actor-chain-example/README.md): Examples of a chain that implements a basic actor and invokes it.
* [Stratio Virtualizer Chain](./virtualizer-chain-example/README.md): Example of a GenAI chain that connects to the Stratio Virtualizer service to perform a query.
-* [Opensearch Chain](./README.md): Example of a GenAI chain that connects to Opensearch service and processes the result of a search.
+* [Opensearch Chain](opensearch-chain-example/README.md): Example of a GenAI chain that connects to Opensearch service and processes the result of a search.
* [Memory Chain](./README.md): Example of a GenAI chain that persist the user's conversation in order to remember the context.

Please check the readme of each chain for more information.
4 changes: 2 additions & 2 deletions basic-actor-chain-example/README.md
@@ -28,8 +28,8 @@ An example of request body for the invoke POST is the following:
"metadata": {
"__genai_state": {
"client_auth_type": "mtls",
-"client_user_id": "Alice",
-"client_tenant": "s000001"
+"client_user_id": "your-user",
+"client_tenant": "your-tenant"
}
}
}
7 changes: 4 additions & 3 deletions basic-actor-chain-example/basic_actor_chain_example/main.py
@@ -25,8 +25,8 @@ def main():
"metadata": {
"__genai_state": {
"client_auth_type": "mtls",
-"client_user_id": "Alice",
-"client_tenant": "s000001"
+"client_user_id": "your-user",
+"client_tenant": "your-tenant"
}
}
}
@@ -49,5 +49,6 @@ def main():


if __name__ == "__main__":
-    # Before running this script, refer to the README.md file to know how to set up your environment correctly in order to communicate with the Stratio GenAI Gateway
+    # Before running this script, refer to the README.md file to know how to set up
+    # your environment correctly in order to communicate with the Stratio GenAI Gateway
    main()
90 changes: 90 additions & 0 deletions opensearch-chain-example/README.md
@@ -0,0 +1,90 @@
# Example chain that connects to Opensearch service

This is an example of a GenAI chain that connects to Opensearch service and processes the result of a search.

For the specific case of this example chain, we developed an OpenSearch utility service that connects to an OpenSearch service and performs a search on a specific index and table.

In this example, we assume that an external process created the index using the name of the database
and added documents by analyzing the data in the tables and indexing the selected columns with all their possible values,
creating documents with the following fields:

* _table_: the table name,
* _column_: the column name,
* _value_: the value of the column.

The query coded in the service returns the first documents that match the search value in a specific table and column.
If any result is found, the chain presents the first of these values as its result.
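
As a rough illustration of the search described above, the sketch below builds a query body that filters by table and column and matches on the value, then picks the first hit. It is not the code of the `OpenSearchService` class shipped with this example: the function names `build_filter_query` and `first_value` are hypothetical, and the sketch assumes that the `table` and `column` fields are mapped as keywords so that `term` filters apply.

```python
from typing import Any, Dict, Optional


def build_filter_query(
    table: str, column: str, value: str, min_score: float
) -> Dict[str, Any]:
    """Build a search body restricted to one table/column (hypothetical helper).

    Assumes documents with the fields `table`, `column` and `value`.
    """
    return {
        # Hits scoring below this threshold are discarded by the search engine.
        "min_score": min_score,
        "query": {
            "bool": {
                # Filters restrict the candidates without affecting the score.
                "filter": [
                    {"term": {"table": table}},
                    {"term": {"column": column}},
                ],
                # The match clause scores documents against the search value.
                "must": [{"match": {"value": value}}],
            }
        },
    }


def first_value(search_result: Dict[str, Any]) -> Optional[str]:
    """Return the `value` field of the first hit, or None when there are no hits."""
    hits = search_result.get("hits", {}).get("hits", [])
    if not hits:
        return None
    return hits[0]["_source"]["value"]
```

The body returned by `build_filter_query` could be passed to a search call on the index named after the database, and `first_value` mirrors how the chain reads `hits.hits[0]._source.value` from the response.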

## Local deployment

Please refer to the main [README.md](../README.md) for instructions on how to set up the development environment.

You need to specify the OpenSearch instance that the chain will connect to. This is normally specified in the deployment configuration of the chain when registering it in *Stratio GenAI API*. While developing locally, you run your chain in a standalone server, which is started by running the `main.py` script. This script obtains the OpenSearch URL from the `OPENSEARCH_URL` environment variable, so you should set it to the correct value before starting the chain. If you access OpenSearch through the *Stratio GenAI Developer Proxy*, it would be something like:
```
$ export OPENSEARCH_URL="https://genai-developer-proxy-loadbalancer.s000001-genai.k8s.fifteen.labs.stratio.com:8080/service/opensearch"
```
Also, make sure that you have set the environment variables with your certificates so that the Vault client does not try to connect to Vault:
```
$ export VAULT_LOCAL_CLIENT_CERT="/path/to/cert.crt"
$ export VAULT_LOCAL_CLIENT_KEY="/path/to/private-key.key"
$ export VAULT_LOCAL_CA_CERTS="/path/to/ca-cert.crt"
```
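
Before starting the chain, it can help to confirm that the variables above are actually set. The following is a minimal sketch of such a check; the helper `missing_env_vars` and the `REQUIRED_ENV_VARS` tuple are hypothetical and not part of this example chain or of `genai_core`.

```python
import os
from typing import List, Mapping

# Variables this README asks you to export for a local run.
REQUIRED_ENV_VARS = (
    "OPENSEARCH_URL",
    "VAULT_LOCAL_CLIENT_CERT",
    "VAULT_LOCAL_CLIENT_KEY",
    "VAULT_LOCAL_CA_CERTS",
)


def missing_env_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of the required variables that are unset or empty."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

Calling `missing_env_vars()` with no arguments inspects the current process environment; an empty list means all four variables have a value.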

Finally, you can run the chain locally by calling the `main.py` script in the Poetry environment:

```
$ poetry run python opensearch_chain_example/main.py
```

In case you want to debug the chain, you can run it in PyCharm as explained in the main [README.md](../README.md) file.

Once started, the chain exposes a Swagger UI at the following URL: `http://0.0.0.0:8080/`.

You can test your chain either via the Swagger UI exposed by the local chain server, or with curl.

An example of request body for the invoke POST is the following:

```json
{
  "input": {
    "search_value": "value_to_search",
    "collection_name": "index_name",
    "table_value": "table_name",
    "column_value": "column_name"
  },
  "config": {
    "metadata": {
      "__genai_state": {
        "client_auth_type": "mtls",
        "client_user_id": "your-user",
        "client_tenant": "your-tenant"
      }
    }
  }
}
```

Note that the values in the example provided should be adapted to the data present in the OpenSearch service.

The `"config"` key with the extra metadata is normally added by GenAI API before passing the input to the chain,
but while developing locally you should add it by hand.
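
If you prefer to send the request from Python rather than from the Swagger UI or curl, the sketch below builds the same body with the standard library. The `/invoke` path is an assumption based on the "invoke POST" mentioned above; check the Swagger UI of your local server for the exact route. The function name `build_invoke_request` is hypothetical.

```python
import json
import urllib.request


def build_invoke_request(
    base_url: str,
    search_value: str,
    collection_name: str,
    table_value: str,
    column_value: str,
    user: str = "your-user",
    tenant: str = "your-tenant",
) -> urllib.request.Request:
    """Build a POST request carrying the invoke body shown in this README."""
    body = {
        "input": {
            "search_value": search_value,
            "collection_name": collection_name,
            "table_value": table_value,
            "column_value": column_value,
        },
        # Added by hand only for local development (see the note above).
        "config": {
            "metadata": {
                "__genai_state": {
                    "client_auth_type": "mtls",
                    "client_user_id": user,
                    "client_tenant": tenant,
                }
            }
        },
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/invoke",  # assumed route
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# To send it against a running local server:
#   with urllib.request.urlopen(request) as response:
#       print(response.read().decode("utf-8"))
```

Building the request separately from sending it keeps the body easy to inspect or log before any network call is made.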

### Run tests

Run in PyCharm:

* Execute the /tests folder. It works in debug mode too.

Run in the terminal:

* Execute `poetry run pytest`
* Only unit tests: `poetry run pytest tests/unit`
* Only integration tests: `poetry run pytest tests/integration`

### Code quality

Run in the terminal:

* To format the code execute `poetry run black ./`
* To lint the code execute `poetry run pylint './**/'`
* To check the types execute `poetry run mypy ./`
Empty file.
160 changes: 160 additions & 0 deletions opensearch-chain-example/opensearch_chain_example/chain.py
@@ -0,0 +1,160 @@
"""
© 2024 Stratio Big Data Inc., Sucursal en España. All rights reserved.
This software – including all its source code – contains proprietary
information of Stratio Big Data Inc., Sucursal en España and
may not be revealed, sold, transferred, modified, distributed or
otherwise made available, licensed or sublicensed to third parties;
nor reverse engineered, disassembled or decompiled, without express
written authorization from Stratio Big Data Inc., Sucursal en España.
"""
from abc import ABC
from typing import Optional

from genai_core.chain.base import BaseGenAiChain
from genai_core.clients.vault.vault_client import VaultClient
from genai_core.logger.logger import log
from genai_core.runnables.common_runnables import runnable_extract_genai_auth
from langchain_core.runnables import Runnable, chain

from opensearch_chain_example.constants.constants import (
    OPENSEARCH_SEARCH_VALUE_KEY,
    OPENSEARCH_COLLECTION_NAME_KEY,
    OPENSEARCH_TABLE_VALUE_KEY,
    OPENSEARCH_COLUMN_VALUE_KEY,
    OPENSEARCH_RESULT_KEY,
    OPENSEARCH_NO_RESULTS,
)
from opensearch_chain_example.services.opensearch_service import OpenSearchService


class OpenSearchChain(BaseGenAiChain, ABC):
    """
    Example of a GenAI Chain that interacts with OpenSearch service to obtain and process the result of a search.
    """

    def __init__(
        self, opensearch_url: Optional[str] = None, opensearch_min_score: int = 5
    ):
        log.info("Preparing OpenSearch Example chain")
        self.opensearch_min_score = opensearch_min_score
        self.opensearch_service = self._init_opensearch(opensearch_url)
        log.info("OpenSearch Example chain ready!")

    def _init_opensearch(self, opensearch_url: str) -> OpenSearchService:
        """
        This method initializes the OpenSearch service client to interact with an OpenSearch
        service.
        Args:
            opensearch_url (str): The URL of the OpenSearch instance.
        Returns:
            OpenSearchService: The OpenSearch service client instance.
        """
        # Get the certificates needed to connect to OpenSearch
        cert, key, ca = self._init_credentials()
        # Init OpenSearch
        try:
            log.info(f"Trying to connect with OpenSearch {opensearch_url}...")
            opensearch_service = OpenSearchService(
                opensearch_url=opensearch_url,
                ca_certs=ca,
                client_cert=cert,
                client_key=key,
            )
            # Connectivity check: fails if the request errors or returns no aliases
            assert opensearch_service.client.indices.get_alias("*")
            log.info("Connected with OpenSearch")
        except Exception as error:
            error_msg = f"Unable to init OpenSearch Chain. Unable to validate connection with OpenSearch. Error: {error}"
            log.error(error_msg)
            raise RuntimeError(error_msg) from error
        return opensearch_service

    def chain(self) -> Runnable:
        """
        Returns a Langchain Runnable with an invoke method. When invoking the chain,
        the body of the request will be passed to the invoke method.
        :return: A Runnable instance representing the chain.
        """

        @chain
        def _ask_opensearch(chain_data: dict) -> dict:
            """
            This method queries the OpenSearch service with the user request and returns the response.
            The response is stored in the chain_data dictionary with the key 'opensearch_result'.
            Args:
                chain_data (dict): The input data for the chain.
            Returns:
                dict: The chain data with the response from the OpenSearch service.
            """
            try:
                search_value = chain_data[OPENSEARCH_SEARCH_VALUE_KEY]
                collection_name = chain_data[OPENSEARCH_COLLECTION_NAME_KEY]
                table_value = chain_data[OPENSEARCH_TABLE_VALUE_KEY]
                column_value = chain_data[OPENSEARCH_COLUMN_VALUE_KEY]

                log.info(
                    f"Searching for value '{search_value}' in {collection_name},{table_value},{column_value}",
                    chain_data,
                )
                search_result = self.opensearch_service.search_filter_values(
                    collection_name,
                    table_value,
                    column_value,
                    search_value,
                    self.opensearch_min_score,
                )
                if len(search_result["hits"]["hits"]) == 0:
                    log.info(
                        OPENSEARCH_NO_RESULTS,
                        chain_data,
                    )
                    chain_data[OPENSEARCH_RESULT_KEY] = OPENSEARCH_NO_RESULTS
                else:
                    first_value = search_result["hits"]["hits"][0]["_source"]["value"]
                    chain_data[
                        OPENSEARCH_RESULT_KEY
                    ] = f"For the requested search value '{search_value}' in the column '{column_value}' of the table '{table_value}', the first result is '{first_value}'."

            except Exception as e:
                log.error(
                    f"Unable to search index. Exception: {e}",
                    chain_data,
                )
                chain_data[
                    OPENSEARCH_RESULT_KEY
                ] = f"Unable to search index. Exception: {e}"
            return chain_data

        return runnable_extract_genai_auth() | _ask_opensearch

    @staticmethod
    def _init_credentials():
        """
        This method obtains and sets the certificates needed to access OpenSearch service.
        In production, the certificates are obtained from Vault, but for local development, you can
        define the following environment variables and the VaultClient will use those to obtain the
        certificates instead of trying to access Vault:
            VAULT_LOCAL_CLIENT_CERT
            VAULT_LOCAL_CLIENT_KEY
            VAULT_LOCAL_CA_CERTS
        For the production case, where the chain is executed inside GenAI API, you don't need to
        explicitly pass the Vault connection details (host, port and token) since these fields are
        inferred from environment variables that are automatically set by GenAI API.
        """
        try:
            vault = VaultClient()
            cert, key = vault.get_service_certificate_pem_paths()
            ca = vault.get_ca_bundle_pem_path()
            return cert, key, ca
        except Exception as e:
            error_msg = f"Unable to init OpenSearch Chain. Unable to init vault and load credentials. Error: {e}"
            log.error(error_msg)
            raise RuntimeError(error_msg) from e
Empty file.
@@ -0,0 +1,16 @@
"""
© 2024 Stratio Big Data Inc., Sucursal en España. All rights reserved.
This software – including all its source code – contains proprietary
information of Stratio Big Data Inc., Sucursal en España and
may not be revealed, sold, transferred, modified, distributed or
otherwise made available, licensed or sublicensed to third parties;
nor reverse engineered, disassembled or decompiled, without express
written authorization from Stratio Big Data Inc., Sucursal en España.
"""
OPENSEARCH_SEARCH_VALUE_KEY = "search_value"
OPENSEARCH_COLLECTION_NAME_KEY = "collection_name"
OPENSEARCH_TABLE_VALUE_KEY = "table_value"
OPENSEARCH_COLUMN_VALUE_KEY = "column_value"
OPENSEARCH_RESULT_KEY = "opensearch_result"
OPENSEARCH_NO_RESULTS = "No results found"
63 changes: 63 additions & 0 deletions opensearch-chain-example/opensearch_chain_example/main.py
@@ -0,0 +1,63 @@
"""
© 2024 Stratio Big Data Inc., Sucursal en España. All rights reserved.
This software – including all its source code – contains proprietary
information of Stratio Big Data Inc., Sucursal en España and
may not be revealed, sold, transferred, modified, distributed or
otherwise made available, licensed or sublicensed to third parties;
nor reverse engineered, disassembled or decompiled, without express
written authorization from Stratio Big Data Inc., Sucursal en España.
"""
import os

from genai_core.server.server import GenAiServer


def main():
    """
    Starts a stand-alone GenAI-api-like server with the chain loaded so that it can be easily executed locally.
    Note that the chain will need access to an OpenSearch server, which should be accessible from your local machine.
    The OpenSearchService class provided in this example is a simple service to interact with an OpenSearch instance
    and should be adapted to your specific use case.
    The URL of the OpenSearch instance should be provided in the OPENSEARCH_URL environment variable (see README.md for more information).
    An example of a JSON body that works with our sample chain, to send in the invoke POST, is
    ```json
    {
      "input": {
        "search_value":"Scott",
        "collection_name":"semantic_banking_customer_product360",
        "table_value":"customer",
        "column_value":"Full_Name"
      },
      "config": {
        "metadata": {
          "__genai_state": {
            "client_auth_type": "mtls",
            "client_user_id": "your-user",
            "client_tenant": "your-tenant"
          }
        }
      }
    }
    ```
    The "config" -> "metadata" -> "__genai_state" is only needed to test while developing locally.
    In a real environment, GenAI API automatically adds those fields from the auth info before
    passing the data to the chain.
    """
    app = GenAiServer(
        module_name="opensearch_chain_example.chain",
        class_name="OpenSearchChain",
        config={
            # The OPENSEARCH_URL environment variable needs to be set
            # with the OpenSearch service URL (see README.md for more information):
            "opensearch_url": os.getenv("OPENSEARCH_URL"),
            "opensearch_min_score": 30,
        },
    )
    app.start_server()


if __name__ == "__main__":
    # Before running this script, refer to the README.md file to know how to set up
    # your environment correctly in order to communicate with the OpenSearch service
    main()
Empty file.