Skip to content

Commit

Permalink
Factorize AstraDB components constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Jan 31, 2024
1 parent c6724a3 commit 532541c
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 167 deletions.
82 changes: 21 additions & 61 deletions libs/community/langchain_community/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@
from langchain_core.outputs import ChatGeneration, Generation
from langchain_core.utils import get_from_env

from langchain_community.utilities.astradb import AstraDBEnvironment
from langchain_community.vectorstores.redis import Redis as RedisVectorstore

logger = logging.getLogger(__file__)

if TYPE_CHECKING:
import momento
from astrapy.db import AstraDB
from cassandra.cluster import Session as CassandraSession


Expand Down Expand Up @@ -1262,7 +1264,7 @@ def __init__(
collection_name: str = ASTRA_DB_CACHE_DEFAULT_COLLECTION_NAME,
token: Optional[str] = None,
api_endpoint: Optional[str] = None,
astra_db_client: Optional[Any] = None, # 'astrapy.db.AstraDB' if passed
astra_db_client: Optional[AstraDB] = None,
namespace: Optional[str] = None,
):
"""
Expand All @@ -1278,39 +1280,17 @@ def __init__(
namespace (Optional[str]): namespace (aka keyspace) where the
collection is created. Defaults to the database's "default namespace".
"""
try:
from astrapy.db import (
AstraDB as LibAstraDB,
)
except (ImportError, ModuleNotFoundError):
raise ImportError(
"Could not import a recent astrapy python package. "
"Please install it with `pip install --upgrade astrapy`."
)
# Conflicting-arg checks:
if astra_db_client is not None:
if token is not None or api_endpoint is not None:
raise ValueError(
"You cannot pass 'astra_db_client' to AstraDB if passing "
"'token' and 'api_endpoint'."
)

self.collection_name = collection_name
self.token = token
self.api_endpoint = api_endpoint
self.namespace = namespace

if astra_db_client is not None:
self.astra_db = astra_db_client
else:
self.astra_db = LibAstraDB(
token=self.token,
api_endpoint=self.api_endpoint,
namespace=self.namespace,
)
astra_env = AstraDBEnvironment(
token=token,
api_endpoint=api_endpoint,
astra_db_client=astra_db_client,
namespace=namespace,
)
self.astra_db = astra_env.astra_db
self.collection = self.astra_db.create_collection(
collection_name=self.collection_name,
collection_name=collection_name,
)
self.collection_name = collection_name

@staticmethod
def _make_id(prompt: str, llm_string: str) -> str:
Expand Down Expand Up @@ -1364,7 +1344,7 @@ def delete_through_llm(
def delete(self, prompt: str, llm_string: str) -> None:
"""Evict from cache if there's an entry."""
doc_id = self._make_id(prompt, llm_string)
return self.collection.delete_one(doc_id)
self.collection.delete_one(doc_id)

def clear(self, **kwargs: Any) -> None:
"""Clear cache. This is for all LLMs at once."""
Expand Down Expand Up @@ -1395,7 +1375,7 @@ def __init__(
collection_name: str = ASTRA_DB_CACHE_DEFAULT_COLLECTION_NAME,
token: Optional[str] = None,
api_endpoint: Optional[str] = None,
astra_db_client: Optional[Any] = None, # 'astrapy.db.AstraDB' if passed
astra_db_client: Optional[AstraDB] = None,
namespace: Optional[str] = None,
embedding: Embeddings,
metric: Optional[str] = None,
Expand Down Expand Up @@ -1423,22 +1403,13 @@ def __init__(
The default score threshold is tuned to the default metric.
Tune it carefully yourself if switching to another distance metric.
"""
try:
from astrapy.db import (
AstraDB as LibAstraDB,
)
except (ImportError, ModuleNotFoundError):
raise ImportError(
"Could not import a recent astrapy python package. "
"Please install it with `pip install --upgrade astrapy`."
)
# Conflicting-arg checks:
if astra_db_client is not None:
if token is not None or api_endpoint is not None:
raise ValueError(
"You cannot pass 'astra_db_client' to AstraDB if passing "
"'token' and 'api_endpoint'."
)
astra_env = AstraDBEnvironment(
token=token,
api_endpoint=api_endpoint,
astra_db_client=astra_db_client,
namespace=namespace,
)
self.astra_db = astra_env.astra_db

self.embedding = embedding
self.metric = metric
Expand All @@ -1457,18 +1428,7 @@ def _cache_embedding(text: str) -> List[float]:
self.embedding_dimension = self._get_embedding_dimension()

self.collection_name = collection_name
self.token = token
self.api_endpoint = api_endpoint
self.namespace = namespace

if astra_db_client is not None:
self.astra_db = astra_db_client
else:
self.astra_db = LibAstraDB(
token=self.token,
api_endpoint=self.api_endpoint,
namespace=self.namespace,
)
self.collection = self.astra_db.create_collection(
collection_name=self.collection_name,
dimension=self.embedding_dimension,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

import json
import time
import typing
from typing import List, Optional
from typing import TYPE_CHECKING, List, Optional

if typing.TYPE_CHECKING:
from astrapy.db import AstraDB as LibAstraDB
from langchain_community.utilities.astradb import AstraDBEnvironment

if TYPE_CHECKING:
from astrapy.db import AstraDB

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
Expand Down Expand Up @@ -42,40 +43,22 @@ def __init__(
collection_name: str = DEFAULT_COLLECTION_NAME,
token: Optional[str] = None,
api_endpoint: Optional[str] = None,
astra_db_client: Optional[LibAstraDB] = None, # type 'astrapy.db.AstraDB'
astra_db_client: Optional[AstraDB] = None,
namespace: Optional[str] = None,
) -> None:
"""Create an Astra DB chat message history."""
try:
from astrapy.db import AstraDB as LibAstraDB
except (ImportError, ModuleNotFoundError):
raise ImportError(
"Could not import a recent astrapy python package. "
"Please install it with `pip install --upgrade astrapy`."
)
astra_env = AstraDBEnvironment(
token=token,
api_endpoint=api_endpoint,
astra_db_client=astra_db_client,
namespace=namespace,
)
self.astra_db = astra_env.astra_db

# Conflicting-arg checks:
if astra_db_client is not None:
if token is not None or api_endpoint is not None:
raise ValueError(
"You cannot pass 'astra_db_client' to AstraDB if passing "
"'token' and 'api_endpoint'."
)
self.collection = self.astra_db.create_collection(collection_name)

self.session_id = session_id
self.collection_name = collection_name
self.token = token
self.api_endpoint = api_endpoint
self.namespace = namespace
if astra_db_client is not None:
self.astra_db = astra_db_client
else:
self.astra_db = LibAstraDB(
token=self.token,
api_endpoint=self.api_endpoint,
namespace=self.namespace,
)
self.collection = self.astra_db.create_collection(self.collection_name)

@property
def messages(self) -> List[BaseMessage]: # type: ignore
Expand Down
90 changes: 34 additions & 56 deletions libs/community/langchain_community/document_loaders/astradb.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
)

from langchain_core.documents import Document
from langchain_core.runnables import run_in_executor

from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utilities.astradb import AstraDBEnvironment

if TYPE_CHECKING:
from astrapy.db import AstraDB, AsyncAstraDB
Expand All @@ -42,69 +44,27 @@ def __init__(
nb_prefetched: int = 1000,
extraction_function: Callable[[Dict], str] = json.dumps,
) -> None:
try:
from astrapy.db import AstraDB
except (ImportError, ModuleNotFoundError):
raise ImportError(
"Could not import a recent astrapy python package. "
"Please install it with `pip install --upgrade astrapy`."
)

# Conflicting-arg checks:
if astra_db_client is not None or async_astra_db_client is not None:
if token is not None or api_endpoint is not None:
raise ValueError(
"You cannot pass 'astra_db_client' or 'async_astra_db_client' to "
"AstraDB if passing 'token' and 'api_endpoint'."
)
astra_env = AstraDBEnvironment(
token=token,
api_endpoint=api_endpoint,
astra_db_client=astra_db_client,
async_astra_db_client=async_astra_db_client,
namespace=namespace,
)
self.astra_env = astra_env
self.collection = astra_env.astra_db.collection(collection_name)
self.collection_name = collection_name
self.filter = filter_criteria
self.projection = projection
self.find_options = find_options or {}
self.nb_prefetched = nb_prefetched
self.extraction_function = extraction_function

astra_db = astra_db_client
async_astra_db = async_astra_db_client

if token and api_endpoint:
astra_db = AstraDB(
token=token,
api_endpoint=api_endpoint,
namespace=namespace,
)
try:
from astrapy.db import AsyncAstraDB

async_astra_db = AsyncAstraDB(
token=token,
api_endpoint=api_endpoint,
namespace=namespace,
)
except (ImportError, ModuleNotFoundError):
pass
if not astra_db and not async_astra_db:
raise ValueError(
"Must provide 'astra_db_client' or 'async_astra_db_client' or 'token' "
"and 'api_endpoint'"
)
self.collection = astra_db.collection(collection_name) if astra_db else None
if async_astra_db:
from astrapy.db import AsyncAstraDBCollection

self.async_collection = AsyncAstraDBCollection(
astra_db=async_astra_db, collection_name=collection_name
)
else:
self.async_collection = None

def load(self) -> List[Document]:
"""Eagerly load the content."""
return list(self.lazy_load())

def lazy_load(self) -> Iterator[Document]:
if not self.collection:
raise ValueError("Missing AstraDB client")
queue = Queue(self.nb_prefetched)
t = threading.Thread(target=self.fetch_results, args=(queue,))
t.start()
Expand All @@ -120,9 +80,27 @@ async def aload(self) -> List[Document]:
return [doc async for doc in self.alazy_load()]

async def alazy_load(self) -> AsyncIterator[Document]:
if not self.async_collection:
raise ValueError("Missing AsyncAstraDB client")
async for doc in self.async_collection.paginated_find(
if not self.astra_env.async_astra_db:
iterator = run_in_executor(
None,
self.collection.paginated_find,
filter=self.filter,
options=self.find_options,
projection=self.projection,
sort=None,
prefetched=True,
)
done = object()
while True:
item = await run_in_executor(None, lambda it: next(it, done), iterator)
if item is done:
break
yield item
return
async_collection = await self.astra_env.async_astra_db.collection(
self.collection_name
)
async for doc in async_collection.paginated_find(
filter=self.filter,
options=self.find_options,
projection=self.projection,
Expand All @@ -132,8 +110,8 @@ async def alazy_load(self) -> AsyncIterator[Document]:
yield Document(
page_content=self.extraction_function(doc),
metadata={
"namespace": self.async_collection.astra_db.namespace,
"api_endpoint": self.async_collection.astra_db.base_url,
"namespace": async_collection.astra_db.namespace,
"api_endpoint": async_collection.astra_db.base_url,
"collection": self.collection_name,
},
)
Expand Down
34 changes: 15 additions & 19 deletions libs/community/langchain_community/storage/astradb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

import base64
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Generic,
Iterator,
Expand All @@ -13,6 +16,11 @@

from langchain_core.stores import BaseStore, ByteStore

from langchain_community.utilities.astradb import AstraDBEnvironment

if TYPE_CHECKING:
from astrapy.db import AstraDB

V = TypeVar("V")


Expand All @@ -22,31 +30,19 @@ def __init__(
collection_name: str,
token: Optional[str] = None,
api_endpoint: Optional[str] = None,
astra_db_client: Optional[Any] = None, # 'astrapy.db.AstraDB' if passed
astra_db_client: Optional[AstraDB] = None,
namespace: Optional[str] = None,
) -> None:
try:
from astrapy.db import AstraDB, AstraDBCollection
except (ImportError, ModuleNotFoundError):
raise ImportError(
"Could not import a recent astrapy python package. "
"Please install it with `pip install --upgrade astrapy`."
)

# Conflicting-arg checks:
if astra_db_client is not None:
if token is not None or api_endpoint is not None:
raise ValueError(
"You cannot pass 'astra_db_client' to AstraDB if passing "
"'token' and 'api_endpoint'."
)

astra_db = astra_db_client or AstraDB(
astra_env = AstraDBEnvironment(
token=token,
api_endpoint=api_endpoint,
astra_db_client=astra_db_client,
namespace=namespace,
)
self.collection = AstraDBCollection(collection_name, astra_db=astra_db)
self.astra_db = astra_env.astra_db
self.collection = self.astra_db.create_collection(
collection_name=collection_name,
)

@abstractmethod
def decode_value(self, value: Any) -> Optional[V]:
Expand Down
Loading

0 comments on commit 532541c

Please sign in to comment.