From 6b65a1955c2f5777acad7cae1ba1f6614e3f858a Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 17 Jan 2024 00:16:01 +0100 Subject: [PATCH] Add re-usable patches and poly-fills from application adapters Sources: MLflow, LangChain, Singer/Meltano, rdflib-sqlalchemy --- CHANGES.md | 1 + src/sqlalchemy_cratedb/support/__init__.py | 8 ++ src/sqlalchemy_cratedb/support/polyfill.py | 128 +++++++++++++++++++++ src/sqlalchemy_cratedb/support/util.py | 32 ++++++ tests/integration.py | 1 + tests/test_support_polyfill.py | 128 +++++++++++++++++++++ 6 files changed, 298 insertions(+) create mode 100644 src/sqlalchemy_cratedb/support/polyfill.py create mode 100644 src/sqlalchemy_cratedb/support/util.py create mode 100644 tests/test_support_polyfill.py diff --git a/CHANGES.md b/CHANGES.md index 3b65ced..6b4ea25 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - Added/reactivated documentation as `sqlalchemy-cratedb` +- Added re-usable patches and poly-fills from application adapters ## 2024/06/13 0.37.0 - Added support for CrateDB's [FLOAT_VECTOR] data type and its accompanying diff --git a/src/sqlalchemy_cratedb/support/__init__.py b/src/sqlalchemy_cratedb/support/__init__.py index efc79d3..c40dbbd 100644 --- a/src/sqlalchemy_cratedb/support/__init__.py +++ b/src/sqlalchemy_cratedb/support/__init__.py @@ -1,5 +1,13 @@ from sqlalchemy_cratedb.support.pandas import insert_bulk +from sqlalchemy_cratedb.support.polyfill import check_uniqueness_factory, refresh_after_dml, \ + patch_autoincrement_timestamp +from sqlalchemy_cratedb.support.util import refresh_table, refresh_dirty __all__ = [ + check_uniqueness_factory, insert_bulk, + patch_autoincrement_timestamp, + refresh_after_dml, + refresh_dirty, + refresh_table, ] diff --git a/src/sqlalchemy_cratedb/support/polyfill.py b/src/sqlalchemy_cratedb/support/polyfill.py new file mode 100644 index 0000000..230af0d --- /dev/null +++ b/src/sqlalchemy_cratedb/support/polyfill.py @@ -0,0 +1,128 @@ +import sqlalchemy as sa +from sqlalchemy.event import listen +import typing as t + +from sqlalchemy_cratedb.support.util import refresh_dirty, refresh_table + + +def patch_autoincrement_timestamp(): + """ + Configure SQLAlchemy model columns with an alternative to `autoincrement=True`. + Use the current timestamp instead. + + This is used by CrateDB's MLflow adapter. + + TODO: Maybe enable through a dialect parameter `crate_polyfill_autoincrement` or such. + """ + import sqlalchemy.sql.schema as schema + + init_dist = schema.Column.__init__ + + def __init__(self, *args, **kwargs): + if "autoincrement" in kwargs: + del kwargs["autoincrement"] + if "default" not in kwargs: + kwargs["default"] = sa.func.now() + init_dist(self, *args, **kwargs) + + schema.Column.__init__ = __init__ # type: ignore[method-assign] + + +def check_uniqueness_factory(sa_entity, *attribute_names): + """ + Run a manual column value uniqueness check on a table, and raise an IntegrityError if applicable. + + CrateDB does not support the UNIQUE constraint on columns. This attempts to emulate it. + + https://github.com/crate/sqlalchemy-cratedb/issues/76 + + This is used by CrateDB's MLflow adapter. + + TODO: Maybe enable through a dialect parameter `crate_polyfill_unique` or such. + """ + + # Synthesize a canonical "name" for the constraint, + # composed of all column names involved. + constraint_name: str = "-".join(attribute_names) + + def check_uniqueness(mapper, connection, target): + from sqlalchemy.exc import IntegrityError + + if isinstance(target, sa_entity): + # TODO: How to use `session.query(SqlExperiment)` here? + stmt = mapper.selectable.select() + for attribute_name in attribute_names: + stmt = stmt.filter(getattr(sa_entity, attribute_name) == getattr(target, attribute_name)) + stmt = stmt.compile(bind=connection.engine) + results = connection.execute(stmt) + if results.rowcount > 0: + raise IntegrityError( + statement=stmt, + params=[], + orig=Exception( + f"DuplicateKeyException in table '{target.__tablename__}' " f"on constraint '{constraint_name}'" + ), + ) + + return check_uniqueness + + +def refresh_after_dml_session(session: sa.orm.Session): + """ + Run `REFRESH TABLE` after each DML operation (INSERT, UPDATE, DELETE). + + CrateDB is eventually consistent, i.e. write operations are not flushed to + disk immediately, so readers may see stale data. In a traditional OLTP-like + application, this is not applicable. + + This SQLAlchemy extension makes sure that data is synchronized after each + operation manipulating data. + + > `after_{insert,update,delete}` events only apply to the session flush operation + > and do not apply to the ORM DML operations described at ORM-Enabled INSERT, + > UPDATE, and DELETE statements. To intercept ORM DML events, use + > `SessionEvents.do_orm_execute().` + > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.MapperEvents.after_insert + + > Intercept statement executions that occur on behalf of an ORM Session object. + > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.SessionEvents.do_orm_execute + + > Execute after flush has completed, but before commit has been called. + > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.SessionEvents.after_flush + + This is used by CrateDB's LangChain adapter. + + TODO: Maybe enable through a dialect parameter `crate_dml_refresh` or such. + """ # noqa: E501 + listen(session, "after_flush", refresh_dirty) + + +def refresh_after_dml_engine(engine: sa.engine.Engine): + """ + Run `REFRESH TABLE` after each DML operation (INSERT, UPDATE, DELETE). + + This is used by CrateDB's Singer/Meltano and `rdflib-sqlalchemy` adapters. + """ + def receive_after_execute( + conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result + ): + if isinstance(clauseelement, (sa.sql.Insert, sa.sql.Update, sa.sql.Delete)): + if not isinstance(clauseelement.table, sa.sql.Join): + full_table_name = f'"{clauseelement.table.name}"' + if clauseelement.table.schema is not None: + full_table_name = f'"{clauseelement.table.schema}".' + full_table_name + refresh_table(conn, full_table_name) + + sa.event.listen(engine, "after_execute", receive_after_execute) + + +def refresh_after_dml(engine_or_session: t.Union[sa.engine.Engine, sa.orm.Session]): + """ + Run `REFRESH TABLE` after each DML operation (INSERT, UPDATE, DELETE). + """ + if isinstance(engine_or_session, sa.engine.Engine): + refresh_after_dml_engine(engine_or_session) + elif isinstance(engine_or_session, (sa.orm.Session, sa.orm.scoping.scoped_session)): + refresh_after_dml_session(engine_or_session) + else: + raise TypeError(f"Unknown type: {type(engine_or_session)}") diff --git a/src/sqlalchemy_cratedb/support/util.py b/src/sqlalchemy_cratedb/support/util.py new file mode 100644 index 0000000..f8695b7 --- /dev/null +++ b/src/sqlalchemy_cratedb/support/util.py @@ -0,0 +1,32 @@ +import itertools +import typing as t + +import sqlalchemy as sa +try: + from sqlalchemy.orm import DeclarativeBase +except: + from sqlalchemy.orm import DeclarativeMeta + + +def refresh_table(connection, target: t.Union[str, "DeclarativeBase", "DeclarativeMeta"]): + """ + Invoke a `REFRESH TABLE` statement. + """ + if hasattr(target, "__tablename__"): + sql = f"REFRESH TABLE {target.__tablename__}" + else: + sql = f"REFRESH TABLE {target}" + connection.execute(sa.text(sql)) + + +def refresh_dirty(session, flush_context=None): + """ + Invoke a `REFRESH TABLE` statement on each table entity flagged as "dirty". + + SQLAlchemy event handler for the 'after_flush' event, + invoking `REFRESH TABLE` on each table which has been modified. + """ + dirty_entities = itertools.chain(session.new, session.dirty, session.deleted) + dirty_classes = {entity.__class__ for entity in dirty_entities} + for class_ in dirty_classes: + refresh_table(session, class_) diff --git a/tests/integration.py b/tests/integration.py index 5e262fc..80d155e 100644 --- a/tests/integration.py +++ b/tests/integration.py @@ -124,6 +124,7 @@ def drop_tables(): "DROP TABLE IF EXISTS archived_tasks", "DROP TABLE IF EXISTS characters", "DROP TABLE IF EXISTS cities", + "DROP TABLE IF EXISTS foobar", "DROP TABLE IF EXISTS locations", "DROP BLOB TABLE IF EXISTS myfiles", "DROP TABLE IF EXISTS search", diff --git a/tests/test_support_polyfill.py b/tests/test_support_polyfill.py new file mode 100644 index 0000000..8b4316c --- /dev/null +++ b/tests/test_support_polyfill.py @@ -0,0 +1,128 @@ +import datetime as dt + +import pytest +import sqlalchemy as sa +from sqlalchemy.event import listen +from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import sessionmaker + +try: + from sqlalchemy.orm import declarative_base +except ImportError: + from sqlalchemy.ext.declarative import declarative_base + +from sqlalchemy_cratedb.support import check_uniqueness_factory, patch_autoincrement_timestamp, refresh_after_dml + + +def test_autoincrement_timestamp(cratedb_service): + """ + Validate autoincrement columns using `sa.DateTime` columns. + + https://github.com/crate/sqlalchemy-cratedb/issues/77 + """ + patch_autoincrement_timestamp() + + engine = cratedb_service.database.engine + session = sessionmaker(bind=engine)() + Base = declarative_base() + + # Define DDL. + class FooBar(Base): + __tablename__ = 'foobar' + id = sa.Column(sa.DateTime, primary_key=True, autoincrement=True) + + Base.metadata.drop_all(engine, checkfirst=True) + Base.metadata.create_all(engine, checkfirst=True) + + # Insert record. + foo_item = FooBar() + session.add(foo_item) + session.commit() + session.execute(sa.text("REFRESH TABLE foobar")) + + # Query record. + query = session.query(FooBar.id) + result = query.first() + + # Compare outcome. + assert result[0].year == dt.datetime.now().year + + +def test_check_uniqueness_factory(cratedb_service): + """ + Validate basic synthetic UNIQUE constraints. + + https://github.com/crate/sqlalchemy-cratedb/issues/76 + """ + + engine = cratedb_service.database.engine + session = sessionmaker(bind=engine)() + Base = declarative_base() + + # Define DDL. + class FooBar(Base): + __tablename__ = 'foobar' + id = sa.Column(sa.String, primary_key=True) + name = sa.Column(sa.String) + + # Add synthetic UNIQUE constraint on `name` column. + listen(FooBar, "before_insert", check_uniqueness_factory(FooBar, "name")) + + Base.metadata.drop_all(engine, checkfirst=True) + Base.metadata.create_all(engine, checkfirst=True) + + # Insert baseline record. + foo_item = FooBar(id="foo", name="foo") + session.add(foo_item) + session.commit() + session.execute(sa.text("REFRESH TABLE foobar")) + + # Insert second record, violating the uniqueness constraint. + bar_item = FooBar(id="bar", name="foo") + session.add(bar_item) + with pytest.raises(IntegrityError) as ex: + session.commit() + assert ex.match("DuplicateKeyException in table 'foobar' on constraint 'name'") + + +@pytest.mark.parametrize("mode", ["engine", "session"]) +def test_refresh_after_dml(cratedb_service, mode): + """ + Validate automatic `REFRESH TABLE` issuing works well. + + https://github.com/crate/sqlalchemy-cratedb/issues/83 + """ + engine = cratedb_service.database.engine + session = sessionmaker(bind=engine)() + Base = declarative_base() + + # Enable automatic refresh. + if mode == "engine": + refresh_after_dml(engine) + elif mode == "session": + refresh_after_dml(session) + else: + raise ValueError(f"Unable to enable automatic refresh with mode: {mode}") + + # Define DDL. + class FooBar(Base): + __tablename__ = 'foobar' + id = sa.Column(sa.String, primary_key=True) + + Base.metadata.drop_all(engine, checkfirst=True) + Base.metadata.create_all(engine, checkfirst=True) + + # Insert baseline record. + foo_item = FooBar(id="foo") + session.add(foo_item) + session.commit() + + # Query record. + query = session.query(FooBar.id) + result = query.first() + + # Sanity checks. + assert result is not None, "Database result is empty. Most probably, `REFRESH TABLE` wasn't issued." + + # Compare outcome. + assert result[0] == "foo"