Skip to content

Commit

Permalink
MongoDB/PyMongo: Add querying capabilities using JessiQL
Browse files Browse the repository at this point in the history
With corresponding improvements, the amalgamated PyMongo driver can now
run 95% of the MongoDB "getting started" tutorial successfully.
  • Loading branch information
amotl committed Nov 27, 2023
1 parent 2c4b782 commit d38816d
Show file tree
Hide file tree
Showing 12 changed files with 471 additions and 42 deletions.
9 changes: 7 additions & 2 deletions cratedb_toolkit/adapter/pymongo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import pymongo.collection

from cratedb_toolkit.adapter.pymongo.collection import collection_factory
from cratedb_toolkit.sqlalchemy.patch import patch_types_map
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.pandas import patch_pandas_sqltable_with_extended_mapping

patch_types_map()


class PyMongoCrateDbAdapter:
Expand Down Expand Up @@ -39,9 +43,10 @@ def configure_sqlalchemy(self):
https://cratedb.com/docs/crate/reference/en/latest/general/ddl/column-policy.html#dynamic
"""
# TODO: Provide unpatching hook.
from cratedb_toolkit.util.pandas import patch_pandas_io_sqldatabase_with_dialect_parameters
from cratedb_toolkit.util.pandas import patch_pandas_sqltable_with_dialect_parameters

patch_pandas_io_sqldatabase_with_dialect_parameters(table_kwargs={"crate_column_policy": "'dynamic'"})
patch_pandas_sqltable_with_dialect_parameters(table_kwargs={"crate_column_policy": "'dynamic'"})
patch_pandas_sqltable_with_extended_mapping()

def activate_pymongo_adapter(self):
"""
Expand Down
33 changes: 18 additions & 15 deletions cratedb_toolkit/adapter/pymongo/backlog.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
# PyMongo CrateDB Adapter
# PyMongo CrateDB Adapter Backlog


## Iteration +1

Make it work.
- Translate query expressions.

- Using individual columns for fields does not work, because `insert_one` works
iteratively, and needs to evolve the table schema gradually. As a consequence,
we need to use `OBJECT(DYNAMIC)` for storing MongoDB fields.
- Add software tests
- Add documentation
- https://github.com/gordonbusyman/mongo-to-sql-converter
- https://github.com/2do2go/json-sql

## Iteration +2
- https://github.com/kolypto/py-mongosql
- https://github.com/SY-Xuan/mongo2sql
- https://github.com/nsragow/MongoToSqlParse
- https://github.com/sushmitharao2124/MongoToSQLConverter
- https://github.com/Phomint/MongoSQL

- Add documentation

Translate query expressions.

- https://github.com/gordonbusyman/mongo-to-sql-converter
- https://github.com/2do2go/json-sql
## Done

- https://github.com/SY-Xuan/mongo2sql
- https://github.com/nsragow/MongoToSqlParse
- https://github.com/sushmitharao2124/MongoToSQLConverter
- https://github.com/Phomint/MongoSQL
- Make it work
- Using individual columns for fields does not work, because `insert_one` works
iteratively, and needs to evolve the table schema gradually. As a consequence,
we need to use `OBJECT(DYNAMIC)` for storing MongoDB fields.
- Add software tests
61 changes: 52 additions & 9 deletions cratedb_toolkit/adapter/pymongo/collection.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import io
import logging
from typing import Any, Mapping, Optional, Union
from collections import abc
from typing import Any, Iterable, Iterator, Mapping, Optional, Union

import pandas as pd
from bson.raw_bson import RawBSONDocument
from crate.client.sqlalchemy.support import insert_bulk
from pymongo import common
from pymongo.client_session import ClientSession
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from pymongo.results import InsertOneResult
from pymongo.results import InsertManyResult, InsertOneResult
from pymongo.typings import _DocumentType

from cratedb_toolkit.adapter.pymongo.cursor import cursor_factory
from cratedb_toolkit.adapter.pymongo.util import AmendedObjectId
from cratedb_toolkit.adapter.pymongo.util import AmendedObjectId as ObjectId
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -50,12 +53,9 @@ def insert_one(
session: Optional[ClientSession] = None,
comment: Optional[Any] = None,
) -> InsertOneResult:
logger.debug(
f"Pretending to insert document into MongoDB: database={self.database.name}, collection={self.name}"
)
logger.debug(f"Document: {document}")
logger.debug(f"Reading document: {document}")
data = pd.DataFrame.from_records([document])
# logger.debug(f"Dataframe: {self.get_df_info()}, {data.tail()}") # noqa: ERA001
# logger.debug(f"Dataframe: {self.get_df_info(data)}, {data.tail()}") # noqa: ERA001
logger.debug(f"Inserting record into CrateDB: schema={self.database.name}, table={self.name}")

object_id_cratedb: Optional[str] = None
Expand Down Expand Up @@ -101,8 +101,51 @@ def insert_returning_id(pd_table, conn, keys, data_iter):
if object_id_cratedb is None:
raise ValueError("Object may have been created, but there is no object id")

object_id_mongodb = AmendedObjectId.from_str(object_id_cratedb)
object_id_mongodb = ObjectId.from_str(object_id_cratedb)
logger.debug(f"Created object with id: {object_id_mongodb!r}")
return InsertOneResult(inserted_id=object_id_mongodb, acknowledged=True)

def insert_many(
    self,
    documents: Iterable[Union[_DocumentType, RawBSONDocument]],
    ordered: bool = True,
    bypass_document_validation: bool = False,
    session: Optional[ClientSession] = None,
    comment: Optional[Any] = None,
) -> InsertManyResult:
    """
    Insert multiple documents into the CrateDB table backing this collection.

    Each document is validated, documents lacking an `_id` get a freshly
    generated one (stored as its string form), and all records are appended
    to the table `<database name>.<collection name>` in one bulk operation.

    The parameters `ordered`, `bypass_document_validation`, `session`, and
    `comment` are accepted for signature compatibility, but are not used
    by this implementation.

    :param documents: A non-empty iterable of documents. Documents without
        an `_id` are modified in place to carry the generated one.
    :return: Result object listing the identifiers of the processed
        documents, in the order they were provided.
    :raises TypeError: If `documents` is not iterable, is a mapping, or is empty.
    """
    if not isinstance(documents, abc.Iterable) or isinstance(documents, abc.Mapping) or not documents:
        raise TypeError("documents must be a non-empty list")
    inserted_ids: list[Any] = []

    def gen() -> Iterator[Mapping[str, Any]]:
        """A generator that validates documents and handles _ids."""
        for document in documents:
            common.validate_is_document_type("document", document)
            if not isinstance(document, RawBSONDocument):
                if "_id" not in document:
                    identifier = ObjectId()
                    document["_id"] = str(identifier)  # type: ignore[index]
                    inserted_ids.append(identifier)
                else:
                    # Report the caller-supplied identifier, instead of relying
                    # on a local which is only bound when an id was generated.
                    inserted_ids.append(document["_id"])
            yield document

    logger.debug("Converting documents")
    # Materialize into a separate name, so the generator's view of the
    # `documents` argument is never rebound while it is being consumed.
    records = list(gen())

    logger.debug(f"Reading documents: {records}")
    data = pd.DataFrame.from_records(records)
    logger.debug(f"Dataframe: {self.get_df_info(data)}, {data.tail()}")
    logger.debug(f"Inserting records into CrateDB: schema={self.database.name}, table={self.name}")

    # NOTE(review): `cratedb` is not defined in this method; presumably it is
    # the database adapter provided by the enclosing factory -- confirm.
    data.to_sql(
        name=self.name,
        schema=self.database.name,
        con=cratedb.engine,
        index=False,
        # TODO: Handle `append` vs. `replace`.
        if_exists="append",
        method=insert_bulk,
    )

    return InsertManyResult(inserted_ids, acknowledged=True)

return AmendedCollection
26 changes: 23 additions & 3 deletions cratedb_toolkit/adapter/pymongo/cursor.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
import copy
import logging
import warnings
from collections import deque
from typing import Any, Iterable, Mapping, Optional, Union

import sqlalchemy as sa
from bson import SON
from pymongo import CursorType, helpers
from pymongo.client_session import ClientSession
from pymongo.collation import validate_collation_or_none
from pymongo.collection import Collection
from pymongo.common import validate_is_document_type
from pymongo.common import validate_is_document_type, validate_is_mapping
from pymongo.cursor import _QUERY_OPTIONS, Cursor, _Hint, _Sort
from pymongo.errors import InvalidOperation
from pymongo.message import _GetMore, _Query
from pymongo.read_preferences import _ServerMode
from pymongo.typings import _Address, _CollationIn, _DocumentType
from pymongo.write_concern import validate_boolean
from sqlalchemy.util import to_list

from cratedb_toolkit.adapter.pymongo.reactor import mongodb_query, table_to_model
from cratedb_toolkit.adapter.pymongo.util import AmendedObjectId
from cratedb_toolkit.util import DatabaseAdapter

Expand Down Expand Up @@ -77,7 +82,6 @@ def __init__(
self.__explicit_session = False

spec: Mapping[str, Any] = filter or {}
"""
validate_is_mapping("filter", spec)
if not isinstance(skip, int):
raise TypeError("skip must be an instance of int")
Expand Down Expand Up @@ -112,7 +116,6 @@ def __init__(
if allow_disk_use is not None:
allow_disk_use = validate_boolean("allow_disk_use", allow_disk_use)

"""
if projection is not None:
projection = helpers._fields_list_to_dict(projection, "projection")

Expand Down Expand Up @@ -279,6 +282,12 @@ def _refresh(self) -> int:

return len(self.__data)

def sort(self, key_or_list: _Hint, direction: Optional[Union[int, str]] = None) -> Cursor[_DocumentType]:
    """
    Record the requested ordering on this cursor, then delegate to the parent `sort`.

    The ordering kept on this class is `key_or_list` coerced to a list.
    `direction` is only forwarded to the parent implementation.

    NOTE(review): `direction` is not part of the ordering stored here, so
    whether descending sorts take effect on the CrateDB side should be confirmed.

    :return: This cursor, to permit call chaining.
    """
    ordering = to_list(key_or_list)
    self.__ordering = ordering
    super().sort(key_or_list, direction=direction)
    return self

def __send_message(self, operation: Union[_Query, _GetMore]) -> None:
"""
Usually sends a query or getmore operation and handles the response to/from a MongoDB server.
Expand All @@ -287,6 +296,15 @@ def __send_message(self, operation: Union[_Query, _GetMore]) -> None:
TODO: OperationFailure / self.close() / PinnedResponse / explain / batching
"""
metadata = sa.MetaData(schema=operation.db)
table_name = operation.coll

table = sa.Table(table_name, metadata, autoload_with=cratedb.engine)
table.append_column(sa.Column("_id", sa.String(), primary_key=True, system=True))
model = table_to_model(table)

query = mongodb_query(model=model, filter=self.__spec or {}, sort=self.__ordering or ["_id"])
"""
tbl = f'"{operation.db}"."{operation.coll}"'
sql = f"SELECT *, _id FROM {tbl}" # noqa: S608
if self.__ordering is not None:
Expand All @@ -307,6 +325,8 @@ def __send_message(self, operation: Union[_Query, _GetMore]) -> None:
sql += f" LIMIT {operation.limit}"
logger.debug(f"Running SQL: {sql}")
records = cratedb.run_sql(sql, records=True)
"""
records = query.fetchall(cratedb.engine)
for record in records:
record["_id"] = AmendedObjectId.from_str(record["_id"])
self.__data = deque(records)
Expand Down
73 changes: 73 additions & 0 deletions cratedb_toolkit/adapter/pymongo/reactor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import typing as t

import sqlalchemy as sa
from jessiql import Query, QueryObject
from jessiql.exc import InvalidColumnError
from jessiql.typing import SARowDict


def table_to_model(table: sa.Table) -> t.Type[sa.orm.Mapper]:
    """
    Build an SQLAlchemy model class for the given Table object.

    A new, empty class named `Surrogate` is created and mapped imperatively
    onto `table`, using a registry bound to the table's metadata.

    - https://docs.sqlalchemy.org/en/14/orm/mapping_styles.html#imperative-mapping
    - https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html

    :param table: The table to map.
    :return: The newly created and mapped class.
    """
    surrogate_class = type("Surrogate", (), {})
    sa.orm.registry(metadata=table.metadata).map_imperatively(surrogate_class, table)
    return surrogate_class


def reflect_model(engine: t.Any, metadata: sa.MetaData, table_name: str) -> t.Type[sa.orm.Mapper]:
    """
    Reflect a database table and return an SQLAlchemy model class for it.

    :param engine: Passed as `autoload_with` when loading the table definition.
    :param metadata: Metadata collection the reflected table is attached to.
    :param table_name: Name of the table to reflect.
    :return: Model class produced by `table_to_model` for the reflected table.
    """
    return table_to_model(sa.Table(table_name, metadata, autoload_with=engine))


def mongodb_query(
    model: t.Type[sa.orm.Mapper],
    select: t.Optional[t.List] = None,
    filter: t.Optional[t.Dict] = None,  # noqa: A002
    sort: t.Optional[t.List] = None,
) -> Query:
    """
    Create a JessiQL Query object from an SQLAlchemy model class and typical MongoDB query parameters.

    :param model: The mapped class to run the query against.
    :param select: Attribute names to select. Defaults to all keys of the
        model's class manager.
    :param filter: MongoDB-style filter expression. It is never modified;
        an `_id` criterion is converted to its string form on a copy.
    :param sort: Sort specification handed to JessiQL. Defaults to no sorting.
    :return: A JessiQL query, or an `EmptyQuery` when the filter refers to
        a column JessiQL reports as invalid.
    :raises InvalidColumnError: For invalid columns outside of the filter.
    """

    select = select or list(model._sa_class_manager.keys())

    # Work on a shallow copy, so the caller's filter object is left untouched
    # when the `_id` criterion is rewritten below.
    filter_spec = dict(filter or {})
    sort = sort or []

    # TODO: select, filter, sort, skip, limit
    if "_id" in filter_spec:
        filter_spec["_id"] = str(filter_spec["_id"])
    query_dict = {"select": select, "filter": filter_spec, "sort": sort}
    query_object = QueryObject.from_query_object(query_dict)

    try:
        return Query(query=query_object, Model=model)
    except InvalidColumnError as ex:
        msg = str(ex)
        # A filter on an invalid column yields an empty result instead of an
        # error. Presumably this mirrors MongoDB, where filtering on a field
        # no document has simply matches nothing -- confirm.
        if "Invalid column" in msg and "specified in filter" in msg:
            return EmptyQuery()
        raise


class EmptyQuery(Query):
    """
    A stand-in query object which always propagates back an empty result.

    All constructor arguments are ignored, and both result hooks return an
    empty list.
    """

    def __init__(self, *args, **kwargs):
        # The parent initializer is not called; the only attribute set up
        # here is an empty `related_executors` mapping.
        self.related_executors = dict()

    def _load_results(self, *args, **kwargs) -> t.List[SARowDict]:
        # Nothing is loaded from the database.
        return list()

    def _apply_operations_to_results(self, *args, **kwargs) -> t.List[SARowDict]:
        # There are no rows to post-process.
        return list()
6 changes: 5 additions & 1 deletion cratedb_toolkit/adapter/pymongo/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ def __repr__(self) -> str:
return f"ObjectId('{self!s}')"

def __generate(self) -> None:
"""Generate a new value for this ObjectId."""
"""
Generate a new value for this ObjectId.
TODO: Generate IDs of the same width like CrateDB.
"""
# 4 bytes current time
oid = struct.pack(">I", int(time.time()))

Expand Down
10 changes: 10 additions & 0 deletions cratedb_toolkit/sqlalchemy/patch.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import typing as t

import sqlalchemy as sa
from crate.client.sqlalchemy.dialect import TYPES_MAP
from sqlalchemy import types as sqltypes


def patch_inspector():
Expand Down Expand Up @@ -32,3 +34,11 @@ def get_table_names(self, connection: sa.Connection, schema: t.Optional[str] = N
return get_table_names_dist(self, connection=connection, schema=schema, **kw)

CrateDialect.get_table_names = get_table_names # type: ignore


def patch_types_map():
    """
    Register the missing timestamp data type.

    Adds an entry for `timestamp without time zone` to the `TYPES_MAP`
    imported from the CrateDB SQLAlchemy dialect, pointing it to
    SQLAlchemy's `TIMESTAMP` type.
    """
    # TODO: Submit patch to `crate-python`.
    type_name = "timestamp without time zone"
    TYPES_MAP[type_name] = sqltypes.TIMESTAMP
13 changes: 13 additions & 0 deletions cratedb_toolkit/util/date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import datetime as dt
import math


def truncate_milliseconds(timestamp: dt.datetime) -> dt.datetime:
    """
    Downgrade Python datetime objects from microseconds to milliseconds resolution.

    The sub-millisecond part of the timestamp is cut off (not rounded).

    Input:  2023-11-26 21:57:18.537624
    Output: 2023-11-26 21:57:18.537000

    :param timestamp: The datetime object to truncate.
    :return: A datetime with all other fields unchanged, and a microsecond
        component that is a multiple of 1000.
    """
    # Use exact integer arithmetic instead of a float round-trip.
    sub_millisecond = timestamp.microsecond % 1000
    return timestamp.replace(microsecond=timestamp.microsecond - sub_millisecond)
Loading

0 comments on commit d38816d

Please sign in to comment.