custom supabase checkpointer #2015
Unanswered
kaushalendra-pandey
asked this question in
Q&A
Replies: 2 comments
-
Maybe reference the postgres implmenetation |
Beta Was this translation helpful? Give feedback.
0 replies
-
Try something like this: import os
from typing import Optional
from dotenv import load_dotenv
from psycopg import AsyncConnection
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from api.utils.logger import setup_logger
load_dotenv()
class PGDatabase:
"""
A singleton class for managing PostgreSQL database connections and LangGraph checkpointing.
Uses direct connection for Supabase compatibility.
"""
_instance: Optional["PGDatabase"] = None
def __new__(cls) -> "PGDatabase":
if cls._instance is None:
cls._instance = super(PGDatabase, cls).__new__(cls)
return cls._instance
def __init__(self) -> None:
if not hasattr(self, "initialized"):
self._db_uri = os.getenv("DATABASE_URI")
self.conn: Optional[AsyncConnection] = None
self._checkpointer: Optional[AsyncPostgresSaver] = None
self.logger = setup_logger()
self.initialized = True
async def setup_connection(self) -> None:
"""Initialize the database connection and set up the checkpointer"""
if self.conn:
await self.close_connection()
self.conn = await AsyncConnection.connect(
self._db_uri,
autocommit=True, # Required for concurrent operations
sslmode="require", # Enforce SSL
connect_timeout=10, # Add timeout
)
try:
self._checkpointer = AsyncPostgresSaver(self.conn)
await self._checkpointer.setup()
except Exception as e:
await self.close_connection()
raise e
async def close_connection(self) -> None:
"""Close the database connection and cleanup prepared statements"""
if self.conn:
try:
# Clean up any prepared statements
async with self.conn.cursor() as cur:
await cur.execute("DEALLOCATE ALL")
except Exception as e:
self.logger.warning(f"Error cleaning up prepared statements: {e}")
finally:
await self.conn.close()
self.conn = None
self._checkpointer = None
@property
def checkpointer(self) -> Optional[AsyncPostgresSaver]:
"""Access the LangGraph checkpointer"""
return self._checkpointer
async def __aenter__(self) -> "PGDatabase":
"""Async context manager entry"""
await self.setup_connection()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Async context manager exit"""
await self.close_connection()
if __name__ == "__main__":
import asyncio
async def test_connection():
async with PGDatabase() as db:
print("Database connection established")
# Your database operations here
print("Database connection closed")
asyncio.run(test_connection()) Then use the checkpointer property as in the docs |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
I wanted a create a checkpointer which uses supabase to store data. How can i do that ?
Beta Was this translation helpful? Give feedback.
All reactions