From d38816df0ba797dfcace43fe57aa4c4928d3fc38 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 27 Nov 2023 12:31:07 +0100 Subject: [PATCH] MongoDB/PyMongo: Add querying capabilities using JessiQL With corresponding improvements, the amalgamated PyMongo driver can now run 95% of the MongoDB "getting started" tutorial successfully. --- cratedb_toolkit/adapter/pymongo/api.py | 9 +- cratedb_toolkit/adapter/pymongo/backlog.md | 33 +-- cratedb_toolkit/adapter/pymongo/collection.py | 61 +++++- cratedb_toolkit/adapter/pymongo/cursor.py | 26 ++- cratedb_toolkit/adapter/pymongo/reactor.py | 73 +++++++ cratedb_toolkit/adapter/pymongo/util.py | 6 +- cratedb_toolkit/sqlalchemy/patch.py | 10 + cratedb_toolkit/util/date.py | 13 ++ cratedb_toolkit/util/pandas.py | 73 ++++++- pyproject.toml | 9 +- tests/adapter/__init__.py | 0 tests/adapter/test_pymongo.py | 200 +++++++++++++++++- 12 files changed, 471 insertions(+), 42 deletions(-) create mode 100644 cratedb_toolkit/adapter/pymongo/reactor.py create mode 100644 cratedb_toolkit/util/date.py create mode 100644 tests/adapter/__init__.py diff --git a/cratedb_toolkit/adapter/pymongo/api.py b/cratedb_toolkit/adapter/pymongo/api.py index af4e4113..6e8b061d 100644 --- a/cratedb_toolkit/adapter/pymongo/api.py +++ b/cratedb_toolkit/adapter/pymongo/api.py @@ -3,7 +3,11 @@ import pymongo.collection from cratedb_toolkit.adapter.pymongo.collection import collection_factory +from cratedb_toolkit.sqlalchemy.patch import patch_types_map from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.pandas import patch_pandas_sqltable_with_extended_mapping + +patch_types_map() class PyMongoCrateDbAdapter: @@ -39,9 +43,10 @@ def configure_sqlalchemy(self): https://cratedb.com/docs/crate/reference/en/latest/general/ddl/column-policy.html#dynamic """ # TODO: Provide unpatching hook. 
- from cratedb_toolkit.util.pandas import patch_pandas_io_sqldatabase_with_dialect_parameters + from cratedb_toolkit.util.pandas import patch_pandas_sqltable_with_dialect_parameters - patch_pandas_io_sqldatabase_with_dialect_parameters(table_kwargs={"crate_column_policy": "'dynamic'"}) + patch_pandas_sqltable_with_dialect_parameters(table_kwargs={"crate_column_policy": "'dynamic'"}) + patch_pandas_sqltable_with_extended_mapping() def activate_pymongo_adapter(self): """ diff --git a/cratedb_toolkit/adapter/pymongo/backlog.md b/cratedb_toolkit/adapter/pymongo/backlog.md index 5ae967f1..d474127a 100644 --- a/cratedb_toolkit/adapter/pymongo/backlog.md +++ b/cratedb_toolkit/adapter/pymongo/backlog.md @@ -1,23 +1,26 @@ -# PyMongo CrateDB Adapter +# PyMongo CrateDB Adapter Backlog + ## Iteration +1 -Make it work. +- Translate query expressions. -- Using individual columns for fields does not work, because `insert_one` works - iteratively, and needs to evolve the table schema gradually. As a consequence, - we need to use `OBJECT(DYNAMIC)` for storing MongoDB fields. -- Add software tests -- Add documentation + - https://github.com/gordonbusyman/mongo-to-sql-converter + - https://github.com/2do2go/json-sql -## Iteration +2 + - https://github.com/kolypto/py-mongosql + - https://github.com/SY-Xuan/mongo2sql + - https://github.com/nsragow/MongoToSqlParse + - https://github.com/sushmitharao2124/MongoToSQLConverter + - https://github.com/Phomint/MongoSQL + +- Add documentation -Translate query expressions. -- https://github.com/gordonbusyman/mongo-to-sql-converter -- https://github.com/2do2go/json-sql +## Done -- https://github.com/SY-Xuan/mongo2sql -- https://github.com/nsragow/MongoToSqlParse -- https://github.com/sushmitharao2124/MongoToSQLConverter -- https://github.com/Phomint/MongoSQL +- Make it work +- Using individual columns for fields does not work, because `insert_one` works + iteratively, and needs to evolve the table schema gradually. 
As a consequence, + we need to use `OBJECT(DYNAMIC)` for storing MongoDB fields. +- Add software tests diff --git a/cratedb_toolkit/adapter/pymongo/collection.py b/cratedb_toolkit/adapter/pymongo/collection.py index 86088d7e..4b58d55d 100644 --- a/cratedb_toolkit/adapter/pymongo/collection.py +++ b/cratedb_toolkit/adapter/pymongo/collection.py @@ -1,17 +1,20 @@ import io import logging -from typing import Any, Mapping, Optional, Union +from collections import abc +from typing import Any, Iterable, Iterator, Mapping, Optional, Union import pandas as pd from bson.raw_bson import RawBSONDocument +from crate.client.sqlalchemy.support import insert_bulk +from pymongo import common from pymongo.client_session import ClientSession from pymongo.collection import Collection from pymongo.cursor import Cursor -from pymongo.results import InsertOneResult +from pymongo.results import InsertManyResult, InsertOneResult from pymongo.typings import _DocumentType from cratedb_toolkit.adapter.pymongo.cursor import cursor_factory -from cratedb_toolkit.adapter.pymongo.util import AmendedObjectId +from cratedb_toolkit.adapter.pymongo.util import AmendedObjectId as ObjectId from cratedb_toolkit.util import DatabaseAdapter logger = logging.getLogger(__name__) @@ -50,12 +53,9 @@ def insert_one( session: Optional[ClientSession] = None, comment: Optional[Any] = None, ) -> InsertOneResult: - logger.debug( - f"Pretending to insert document into MongoDB: database={self.database.name}, collection={self.name}" - ) - logger.debug(f"Document: {document}") + logger.debug(f"Reading document: {document}") data = pd.DataFrame.from_records([document]) - # logger.debug(f"Dataframe: {self.get_df_info()}, {data.tail()}") # noqa: ERA001 + # logger.debug(f"Dataframe: {self.get_df_info(data)}, {data.tail()}") # noqa: ERA001 logger.debug(f"Inserting record into CrateDB: schema={self.database.name}, table={self.name}") object_id_cratedb: Optional[str] = None @@ -101,8 +101,51 @@ def 
insert_returning_id(pd_table, conn, keys, data_iter): if object_id_cratedb is None: raise ValueError("Object may have been created, but there is no object id") - object_id_mongodb = AmendedObjectId.from_str(object_id_cratedb) + object_id_mongodb = ObjectId.from_str(object_id_cratedb) logger.debug(f"Created object with id: {object_id_mongodb!r}") return InsertOneResult(inserted_id=object_id_mongodb, acknowledged=True) + def insert_many( + self, + documents: Iterable[Union[_DocumentType, RawBSONDocument]], + ordered: bool = True, + bypass_document_validation: bool = False, + session: Optional[ClientSession] = None, + comment: Optional[Any] = None, + ) -> InsertManyResult: + if not isinstance(documents, abc.Iterable) or isinstance(documents, abc.Mapping) or not documents: + raise TypeError("documents must be a non-empty list") + inserted_ids: list[ObjectId] = [] + + def gen() -> Iterator[tuple[int, Mapping[str, Any]]]: + """A generator that validates documents and handles _ids.""" + for document in documents: + common.validate_is_document_type("document", document) + if not isinstance(document, RawBSONDocument): + if "_id" not in document: + identifier = ObjectId() + document["_id"] = str(identifier) # type: ignore[index] + inserted_ids.append(identifier) + yield document + + logger.debug("Converting documents") + documents = list(gen()) + + logger.debug(f"Reading documents: {documents}") + data = pd.DataFrame.from_records(documents) + logger.debug(f"Dataframe: {self.get_df_info(data)}, {data.tail()}") # noqa: ERA001 + logger.debug(f"Inserting records into CrateDB: schema={self.database.name}, table={self.name}") + + data.to_sql( + name=self.name, + schema=self.database.name, + con=cratedb.engine, + index=False, + # TODO: Handle `append` vs. `replace`. 
+ if_exists="append", + method=insert_bulk, + ) + + return InsertManyResult(inserted_ids, acknowledged=True) + return AmendedCollection diff --git a/cratedb_toolkit/adapter/pymongo/cursor.py b/cratedb_toolkit/adapter/pymongo/cursor.py index eadd646d..5fecb10f 100644 --- a/cratedb_toolkit/adapter/pymongo/cursor.py +++ b/cratedb_toolkit/adapter/pymongo/cursor.py @@ -1,20 +1,25 @@ import copy import logging +import warnings from collections import deque from typing import Any, Iterable, Mapping, Optional, Union +import sqlalchemy as sa from bson import SON from pymongo import CursorType, helpers from pymongo.client_session import ClientSession from pymongo.collation import validate_collation_or_none from pymongo.collection import Collection -from pymongo.common import validate_is_document_type +from pymongo.common import validate_is_document_type, validate_is_mapping from pymongo.cursor import _QUERY_OPTIONS, Cursor, _Hint, _Sort from pymongo.errors import InvalidOperation from pymongo.message import _GetMore, _Query from pymongo.read_preferences import _ServerMode from pymongo.typings import _Address, _CollationIn, _DocumentType +from pymongo.write_concern import validate_boolean +from sqlalchemy.util import to_list +from cratedb_toolkit.adapter.pymongo.reactor import mongodb_query, table_to_model from cratedb_toolkit.adapter.pymongo.util import AmendedObjectId from cratedb_toolkit.util import DatabaseAdapter @@ -77,7 +82,6 @@ def __init__( self.__explicit_session = False spec: Mapping[str, Any] = filter or {} - """ validate_is_mapping("filter", spec) if not isinstance(skip, int): raise TypeError("skip must be an instance of int") @@ -112,7 +116,6 @@ def __init__( if allow_disk_use is not None: allow_disk_use = validate_boolean("allow_disk_use", allow_disk_use) - """ if projection is not None: projection = helpers._fields_list_to_dict(projection, "projection") @@ -279,6 +282,12 @@ def _refresh(self) -> int: return len(self.__data) + def sort(self, key_or_list: _Hint, 
direction: Optional[Union[int, str]] = None) -> Cursor[_DocumentType]: + """ """ + self.__ordering = to_list(key_or_list) + super().sort(key_or_list, direction=direction) + return self + def __send_message(self, operation: Union[_Query, _GetMore]) -> None: """ Usually sends a query or getmore operation and handles the response to/from a MongoDB server. @@ -287,6 +296,15 @@ def __send_message(self, operation: Union[_Query, _GetMore]) -> None: TODO: OperationFailure / self.close() / PinnedResponse / explain / batching """ + metadata = sa.MetaData(schema=operation.db) + table_name = operation.coll + + table = sa.Table(table_name, metadata, autoload_with=cratedb.engine) + table.append_column(sa.Column("_id", sa.String(), primary_key=True, system=True)) + model = table_to_model(table) + + query = mongodb_query(model=model, filter=self.__spec or {}, sort=self.__ordering or ["_id"]) + """ tbl = f'"{operation.db}"."{operation.coll}"' sql = f"SELECT *, _id FROM {tbl}" # noqa: S608 if self.__ordering is not None: @@ -307,6 +325,8 @@ def __send_message(self, operation: Union[_Query, _GetMore]) -> None: sql += f" LIMIT {operation.limit}" logger.debug(f"Running SQL: {sql}") records = cratedb.run_sql(sql, records=True) + """ + records = query.fetchall(cratedb.engine) for record in records: record["_id"] = AmendedObjectId.from_str(record["_id"]) self.__data = deque(records) diff --git a/cratedb_toolkit/adapter/pymongo/reactor.py b/cratedb_toolkit/adapter/pymongo/reactor.py new file mode 100644 index 00000000..6d81a4e5 --- /dev/null +++ b/cratedb_toolkit/adapter/pymongo/reactor.py @@ -0,0 +1,73 @@ +import typing as t + +import sqlalchemy as sa +from jessiql import Query, QueryObject +from jessiql.exc import InvalidColumnError +from jessiql.typing import SARowDict + + +def table_to_model(table: sa.Table) -> t.Type[sa.orm.Mapper]: + """ + Create SQLAlchemy model class from Table object. 
+ + - https://docs.sqlalchemy.org/en/14/orm/mapping_styles.html#imperative-mapping + - https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html + """ + mapper_registry = sa.orm.registry(metadata=table.metadata) + Surrogate = type("Surrogate", (), {}) + mapper_registry.map_imperatively(Surrogate, table) + return Surrogate + + +def reflect_model(engine: t.Any, metadata: sa.MetaData, table_name: str) -> t.Type[sa.orm.Mapper]: + """ + Create SQLAlchemy model class by reflecting a database table. + """ + table = sa.Table(table_name, metadata, autoload_with=engine) + return table_to_model(table) + + +def mongodb_query( + model: t.Type[sa.orm.Mapper], + select: t.List = None, + filter: t.Dict = None, # noqa: A002 + sort: t.Dict = None, +) -> Query: + """ + Create a JessiQL Query object from an SQLAlchemy model class and typical MongoDB query parameters. + """ + + select = select or list(model._sa_class_manager.keys()) + + filter = filter or {} # noqa: A001 + sort = sort or [] + + # TODO: select, filter, sort, skip, limit + if "_id" in filter: + filter["_id"] = str(filter["_id"]) + query_dict = {"select": select, "filter": filter, "sort": sort} + query_object = QueryObject.from_query_object(query_dict) + + try: + return Query(query=query_object, Model=model) + except InvalidColumnError as ex: + msg = str(ex) + if "Invalid column" in msg and "specified in filter" in msg: + return EmptyQuery() + else: + raise + + +class EmptyQuery(Query): + """ + A surrogate QueryExecutor for propagating back empty results. 
+ """ + + def __init__(self, *args, **kwargs): + self.related_executors = {} + + def _load_results(self, *args, **kwargs) -> t.List[SARowDict]: + return [] + + def _apply_operations_to_results(self, *args, **kwargs) -> t.List[SARowDict]: + return [] diff --git a/cratedb_toolkit/adapter/pymongo/util.py b/cratedb_toolkit/adapter/pymongo/util.py index db1940f3..171068e5 100644 --- a/cratedb_toolkit/adapter/pymongo/util.py +++ b/cratedb_toolkit/adapter/pymongo/util.py @@ -32,7 +32,11 @@ def __repr__(self) -> str: return f"ObjectId('{self!s}')" def __generate(self) -> None: - """Generate a new value for this ObjectId.""" + """ + Generate a new value for this ObjectId. + + TODO: Generate IDs of the same width like CrateDB. + """ # 4 bytes current time oid = struct.pack(">I", int(time.time())) diff --git a/cratedb_toolkit/sqlalchemy/patch.py b/cratedb_toolkit/sqlalchemy/patch.py index 78c89770..ca13af44 100644 --- a/cratedb_toolkit/sqlalchemy/patch.py +++ b/cratedb_toolkit/sqlalchemy/patch.py @@ -1,6 +1,8 @@ import typing as t import sqlalchemy as sa +from crate.client.sqlalchemy.dialect import TYPES_MAP +from sqlalchemy import types as sqltypes def patch_inspector(): @@ -32,3 +34,11 @@ def get_table_names(self, connection: sa.Connection, schema: t.Optional[str] = N return get_table_names_dist(self, connection=connection, schema=schema, **kw) CrateDialect.get_table_names = get_table_names # type: ignore + + +def patch_types_map(): + """ + Register missing timestamp data type. + """ + # TODO: Submit patch to `crate-python`. + TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP diff --git a/cratedb_toolkit/util/date.py b/cratedb_toolkit/util/date.py new file mode 100644 index 00000000..e077cbd3 --- /dev/null +++ b/cratedb_toolkit/util/date.py @@ -0,0 +1,13 @@ +import datetime as dt +import math + + +def truncate_milliseconds(timestamp: dt.datetime): + """ + Downgrade Python datetime objects from microseconds to milliseconds resolution. 
+ + Input: 2023-11-26 21:57:18.537624 + Output: 2023-11-26 21:57:18.537000 + """ + dec, integer = math.modf(timestamp.microsecond / 1000) + return timestamp - dt.timedelta(microseconds=round(dec * 1000)) diff --git a/cratedb_toolkit/util/pandas.py b/cratedb_toolkit/util/pandas.py index 7d683fd2..0bae5e2e 100644 --- a/cratedb_toolkit/util/pandas.py +++ b/cratedb_toolkit/util/pandas.py @@ -1,9 +1,9 @@ import typing as t -from pandas.io.sql import SQLTable +import sqlalchemy as sa -def patch_pandas_io_sqldatabase_with_dialect_parameters(table_kwargs: t.Dict = None): +def patch_pandas_sqltable_with_dialect_parameters(table_kwargs: t.Dict = None): """ When using pandas' `to_sql` function, configure the SQLAlchemy database dialect implementation using custom dialect parameters. @@ -15,6 +15,9 @@ def patch_pandas_io_sqldatabase_with_dialect_parameters(table_kwargs: t.Dict = N table_kwargs = table_kwargs or {} # Provide enhancement code. + # It needs to be inlined in order to obtain `table_kwargs`. + from pandas.io.sql import SQLTable + class SQLTableWithDialectParameters(SQLTable): def _create_table_setup(self): table = super()._create_table_setup() @@ -25,3 +28,69 @@ def _create_table_setup(self): import pandas.io.sql pandas.io.sql.SQLTable = SQLTableWithDialectParameters + + +def patch_pandas_sqltable_with_extended_mapping(): + """ + Improve pandas' `to_sql` function to respect CrateDB's advanced data types. + """ + + # It needs to be inlined in order to be composable with `SQLTableWithDialectParameters`. 
+ from pandas.io.sql import SQLTable + + class SQLTableWithDtypeMappersForCrateDB(SQLTable): + """ + column_names_and_types = [] + if self.index is not None: + for i, idx_label in enumerate(self.index): + idx_type = dtype_mapper(self.frame.index._get_level_values(i)) + column_names_and_types.append((str(idx_label), idx_type, True)) + + column_names_and_types += [ + (str(self.frame.columns[i]), dtype_mapper(self.frame.iloc[:, i]), False) + for i in range(len(self.frame.columns)) + ] + """ + + def _get_column_names_and_types(self, dtype_mapper, errors: str = "raise"): + column_names_and_types = super()._get_column_names_and_types(dtype_mapper) + column_names_and_types_new = [] + for name, sqlalchemy_type, is_index in column_names_and_types: + # TODO: Currently only handles columns. Also make it handle indexes. + if is_index: + if errors == "raise": + raise ValueError("pandas indexes not supported yet") + + else: + values = self.frame._get_value(0, name) + if isinstance(values, list): + first_value = values[0] + first_value_type = type(first_value) + # TODO: Use `boltons.remap`. + sqlalchemy_type = ARRAY_TYPE_MAP.get(first_value_type) + if sqlalchemy_type is None: + raise TypeError(f"Data type not supported yet: List[{first_value_type}]") + + column_names_and_types_new.append((name, sqlalchemy_type, is_index)) + + return column_names_and_types_new + + def _execute_create(self) -> None: + # Prevent `InvalidColumnNameException["_id" conflicts with system column pattern]`. + # TODO: Solve differently. + ddl = self.sql_schema() + ddl = ddl.replace('"_id" STRING,', "") + with self.pd_sql.run_transaction(): + self.pd_sql.con.exec_driver_sql(ddl) + + # Activate enhancement code. 
+ import pandas.io.sql + + pandas.io.sql.SQLTable = SQLTableWithDtypeMappersForCrateDB + + +ARRAY_TYPE_MAP = { + str: sa.ARRAY(sa.TEXT), + int: sa.ARRAY(sa.INTEGER), + float: sa.ARRAY(sa.FLOAT), +} diff --git a/pyproject.toml b/pyproject.toml index 2d5d4be9..d1b9b745 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,10 +90,10 @@ dependencies = [ "colorlog", "crash", "crate[sqlalchemy]>=0.34", - "croud==1.8", + "croud", 'importlib-metadata; python_version <= "3.7"', "python-dotenv<2", - "sqlalchemy>=2", + "sqlalchemy", "sqlparse<0.5", ] [project.optional-dependencies] @@ -123,6 +123,11 @@ mongodb = [ "python-bsonjs<0.4", "rich<14,>=3.3.2", ] +pymongo = [ + "jessiql==1.0.0rc1", + "pymongo<5,>=3.10.1", + "sqlalchemy<2.0", +] release = [ "build<2", "twine<5", diff --git a/tests/adapter/__init__.py b/tests/adapter/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/adapter/test_pymongo.py b/tests/adapter/test_pymongo.py index 6b18f9d1..be1e26b2 100644 --- a/tests/adapter/test_pymongo.py +++ b/tests/adapter/test_pymongo.py @@ -1,10 +1,13 @@ +import datetime as dt import typing as t +from unittest import mock import pymongo import pytest from cratedb_toolkit.adapter.pymongo import PyMongoCrateDbAdapter from cratedb_toolkit.adapter.pymongo.util import AmendedObjectId +from cratedb_toolkit.util.date import truncate_milliseconds from tests.conftest import TESTDRIVE_DATA_SCHEMA, CrateDBFixture pytestmark = pytest.mark.mongodb @@ -35,6 +38,14 @@ def pymongo_client(): ) +@pytest.fixture +def sync_writes(cratedb): + def refresh(): + cratedb.database.refresh_table(f'"{TESTDRIVE_DATA_SCHEMA}"."foobar"') + + return refresh + + def test_pymongo_metadata(pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client: pymongo.MongoClient): """ Verify attribute access to the database and collection handles works well. 
@@ -51,7 +62,7 @@ def test_pymongo_metadata(pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client def test_pymongo_insert_one_single( - pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client: pymongo.MongoClient, cratedb: CrateDBFixture + pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client: pymongo.MongoClient, cratedb: CrateDBFixture, sync_writes ): """ Verify a single basic data insert operation `insert_one` works well. @@ -64,7 +75,9 @@ def test_pymongo_insert_one_single( # TODO: Can this be made type-compatible, by swapping in the surrogate implementation? assert isinstance(inserted_id, AmendedObjectId) - cratedb.database.run_sql(f'REFRESH TABLE "{TESTDRIVE_DATA_SCHEMA}"."foobar";') + # Synchronize write operations. + sync_writes() + results = cratedb.database.run_sql( f'SELECT * FROM "{TESTDRIVE_DATA_SCHEMA}"."foobar" ORDER BY _id;', records=True # noqa: S608 ) @@ -72,7 +85,7 @@ def test_pymongo_insert_one_single( def test_pymongo_insert_one_multiple( - pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client: pymongo.MongoClient, cratedb: CrateDBFixture + pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client: pymongo.MongoClient, cratedb: CrateDBFixture, sync_writes ): """ Verify the basic data insert operation `insert_one` works well, when called multiple times. @@ -88,15 +101,44 @@ def test_pymongo_insert_one_multiple( assert isinstance(inserted_id_x, AmendedObjectId) assert isinstance(inserted_id_y, AmendedObjectId) - cratedb.database.run_sql(f'REFRESH TABLE "{TESTDRIVE_DATA_SCHEMA}"."foobar";') + # Synchronize write operations. 
+ sync_writes() + results = cratedb.database.run_sql( f'SELECT * FROM "{TESTDRIVE_DATA_SCHEMA}"."foobar" ORDER BY _id;', records=True # noqa: S608 ) - assert results == [{"x": 42, "y": None}, {"x": None, "y": 84}] + assert {"x": 42, "y": None} in results + assert {"x": None, "y": 84} in results + + +def test_pymongo_insert_many( + pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client: pymongo.MongoClient, cratedb: CrateDBFixture, sync_writes +): + """ + Verify the basic data insert operation `insert_many` works well. + """ + + # Insert documents. + collection: pymongo.collection.Collection = pymongo_client[TESTDRIVE_DATA_SCHEMA].foobar + result = collection.insert_many([{"x": 42}, {"y": 84}]) + assert len(result.inserted_ids) == 2 + inserted_id_x = result.inserted_ids[0] + inserted_id_y = result.inserted_ids[1] + + # TODO: Can this be made type-compatible, by swapping in the surrogate implementation? + assert isinstance(inserted_id_x, AmendedObjectId) + assert isinstance(inserted_id_y, AmendedObjectId) + + # Synchronize write operations. + sync_writes() + + # Verify documents in database. + assert collection.find_one({"x": 42}) == {"x": 42, "y": None, "_id": mock.ANY} + assert collection.find_one({"y": 84}) == {"x": None, "y": 84, "_id": mock.ANY} def test_pymongo_count_documents( - pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client: pymongo.MongoClient, cratedb: CrateDBFixture + pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client: pymongo.MongoClient, cratedb: CrateDBFixture, sync_writes ): """ Verify the `count_documents` operation works well. @@ -105,6 +147,148 @@ def test_pymongo_count_documents( # Insert records. collection: pymongo.collection.Collection = pymongo_client[TESTDRIVE_DATA_SCHEMA].foobar collection.insert_one({"x": 42}) + collection.insert_one({"y": 42}) + + # Synchronize write operations. 
+ sync_writes() - cratedb.database.run_sql(f'REFRESH TABLE "{TESTDRIVE_DATA_SCHEMA}"."foobar";') - assert collection.count_documents({}) == 1 + # Validate `count_documents` method. + assert collection.count_documents({}) == 2 + assert collection.count_documents({"x": 42}) == 1 + assert collection.count_documents({"y": 42}) == 1 + + assert collection.count_documents({"foo": "bar"}) == 0 + + +def test_pymongo_roundtrip_document( + pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client: pymongo.MongoClient, cratedb: CrateDBFixture, sync_writes +): + """ + Verify a document roundtrip using `insert_one` and `find_one` works well. + """ + + # Define single document to insert. + document_original = { + "author": "Mike", + "text": "My first blog post!", + "tags": ["mongodb", "python", "pymongo"], + # TODO: With timezone. "date": dt.datetime.now(tz=dt.timezone.utc), + "date": dt.datetime.now(), + } + + # Insert records. + collection: pymongo.collection.Collection = pymongo_client[TESTDRIVE_DATA_SCHEMA].foobar + collection.insert_one(document_original) + + # Synchronize write operations. + sync_writes() + + # Getting a Single Document With find_one(). + # https://pymongo.readthedocs.io/en/stable/tutorial.html#getting-a-single-document-with-find-one + document_loaded = collection.find_one({"author": "Mike"}) + del document_loaded["_id"] + + # CrateDB stores timestamps with milliseconds resolution. When comparing + # Python datetime objects loaded from CrateDB, they need to be downgraded + # from microseconds resolution. + document_original["date"] = truncate_milliseconds(document_original["date"]) + document_loaded["date"] = truncate_milliseconds(document_loaded["date"]) + assert document_loaded == document_original + + +def test_pymongo_tutorial( + pymongo_cratedb: PyMongoCrateDbAdapter, pymongo_client: pymongo.MongoClient, cratedb: CrateDBFixture, sync_writes +): + """ + Verify the PyMongo Tutorial works well.
+ + https://pymongo.readthedocs.io/en/stable/tutorial.html + """ + + # Define multiple documents to insert. + post_items = [ + { + "author": "Mike", + "text": "My first blog post!", + "tags": ["mongodb", "python", "pymongo"], + "date": dt.datetime.now(tz=dt.timezone.utc), + }, + { + "author": "Mike", + "text": "Another post!", + "tags": ["bulk", "insert"], + "date": dt.datetime(2009, 11, 12, 11, 14), + }, + { + "author": "Eliot", + "title": "MongoDB is fun", + "text": "and pretty easy too!", + "date": dt.datetime(2009, 11, 10, 10, 45), + }, + ] + + # Bulk Inserts. + # https://pymongo.readthedocs.io/en/stable/tutorial.html#bulk-inserts + posts: pymongo.collection.Collection = pymongo_client[TESTDRIVE_DATA_SCHEMA].foobar + result = posts.insert_many(post_items) + assert len(result.inserted_ids) == 3 + assert isinstance(result.inserted_ids[0], AmendedObjectId) + + # Synchronize write operations. + sync_writes() + + # Getting a Single Document With find_one(). + # https://pymongo.readthedocs.io/en/stable/tutorial.html#getting-a-single-document-with-find-one + document_loaded = posts.find_one({"author": "Mike"}) + assert "mongodb" in document_loaded["tags"] + + # Querying By ObjectId. + # https://pymongo.readthedocs.io/en/stable/tutorial.html#querying-by-objectid + post_id_1 = result.inserted_ids[0] + # Note that an ObjectId is not the same as its string representation. + assert posts.find_one({"_id": str(post_id_1)}) is None + # It is necessary to convert the ObjectId from a string before passing it to find_one. + # FIXME: Solve ObjectId value conversion. + # assert posts.find_one({"_id": post_id_1})["author"] == "Mike" # noqa: ERA001 + + # Querying for More Than One Document. 
+ # https://pymongo.readthedocs.io/en/stable/tutorial.html#querying-for-more-than-one-document + assert list(posts.find({"author": "Mike"})) == [ + { + "author": "Mike", + "text": "My first blog post!", + "tags": mock.ANY, + "date": mock.ANY, + "title": mock.ANY, + "_id": mock.ANY, + }, + { + "author": "Mike", + "text": "Another post!", + "tags": mock.ANY, + "date": mock.ANY, + "title": mock.ANY, + "_id": mock.ANY, + }, + ] + + # Counting. + # https://pymongo.readthedocs.io/en/stable/tutorial.html#counting + assert posts.count_documents({}) == 3 + assert posts.count_documents({"author": "Mike"}) == 2 + + # Range Queries. + # https://pymongo.readthedocs.io/en/stable/tutorial.html#range-queries + d = dt.datetime(2009, 11, 12, 12) + iter_documents = posts.find({"date": {"$lt": d}}).sort("author") + documents = list(iter_documents) + assert documents[0]["author"] == "Eliot" + assert documents[1]["author"] == "Mike" + + # Indexing. + # https://pymongo.readthedocs.io/en/stable/tutorial.html#indexing + """ + db = pymongo_client[TESTDRIVE_DATA_SCHEMA] + result = db.profiles.create_index([("user_id", pymongo.ASCENDING)], unique=True) + assert sorted(list(db.profiles.index_information())) == ['_id_', 'user_id_1'] + """