From 9ee1b52aeb32e5eb72558851018abf2a5dcd2219 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 9 Oct 2024 07:24:33 +0200 Subject: [PATCH] IO: Improve `BulkProcessor` when running per-record operations ... by also checking `rowcount` for handling `INSERT OK, 0 rows` responses. --- CHANGES.md | 2 ++ cratedb_toolkit/io/core.py | 23 +++++++++----- cratedb_toolkit/io/dynamodb/copy.py | 3 -- cratedb_toolkit/io/mongodb/copy.py | 3 -- cratedb_toolkit/sqlalchemy/__init__.py | 0 cratedb_toolkit/sqlalchemy/patch.py | 19 ----------- pyproject.toml | 2 +- tests/io/mongodb/test_copy.py | 44 +++++++++++++++++++++++++- 8 files changed, 61 insertions(+), 35 deletions(-) delete mode 100644 cratedb_toolkit/sqlalchemy/__init__.py delete mode 100644 cratedb_toolkit/sqlalchemy/patch.py diff --git a/CHANGES.md b/CHANGES.md index 1ea9bfba..8ae1dbed 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,8 @@ ## Unreleased +- IO: Improved `BulkProcessor` when running per-record operations by + also checking `rowcount` for handling `INSERT OK, 0 rows` responses ## 2024/10/01 v0.0.27 - MongoDB: Updated to pymongo 4.9 diff --git a/cratedb_toolkit/io/core.py b/cratedb_toolkit/io/core.py index 157e1e6d..91ca6cd2 100644 --- a/cratedb_toolkit/io/core.py +++ b/cratedb_toolkit/io/core.py @@ -49,7 +49,7 @@ def failed_records(self) -> t.List[t.Dict[str, t.Any]]: return [] errors: t.List[t.Dict[str, t.Any]] = [] for record, status in zip(self.parameters, self.cratedb_bulk_result): - if status["rowcount"] == -2: + if status["rowcount"] != 1: errors.append(record) return errors @@ -143,12 +143,17 @@ def start(self) -> BulkMetrics: try: cursor = self.connection.execute(statement=statement, parameters=operation.parameters) self.connection.commit() - cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None) - bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result) - failed_records = bulk_response.failed_records - count_success_local = bulk_response.success_count - self._metrics.count_success_total += bulk_response.success_count - self.progress_bar and self.progress_bar.update(n=bulk_response.success_count) + if cursor.rowcount > 0: + cratedb_bulk_result = getattr(cursor.context, "last_result", None) + bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result) + failed_records = bulk_response.failed_records + count_success_local = bulk_response.success_count + self._metrics.count_success_total += bulk_response.success_count + self.progress_bar and self.progress_bar.update(n=bulk_response.success_count) + else: + failed_records = operation.parameters + count_success_local = 0 + self.progress_bar and self.progress_bar.update(n=1) # When a batch is of size one, an exception is raised. # Just signal the same condition as if a batch would have failed. @@ -165,8 +170,10 @@ def start(self) -> BulkMetrics: ) for record in failed_records: try: - self.connection.execute(statement=statement, parameters=record) + cursor = self.connection.execute(statement=statement, parameters=record) self.connection.commit() + if cursor.rowcount != 1: + raise IOError("Record has not been processed") self._metrics.count_success_total += 1 except Exception as ex: logger.error(f"Operation failed: {ex}") diff --git a/cratedb_toolkit/io/dynamodb/copy.py b/cratedb_toolkit/io/dynamodb/copy.py index c3850597..1bd16222 100644 --- a/cratedb_toolkit/io/dynamodb/copy.py +++ b/cratedb_toolkit/io/dynamodb/copy.py @@ -10,7 +10,6 @@ from cratedb_toolkit.io.core import BulkProcessor from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter from cratedb_toolkit.model import DatabaseAddress -from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany from cratedb_toolkit.util import DatabaseAdapter from cratedb_toolkit.util.data import asbool @@ -30,8 +29,6 @@ def __init__( progress: bool = False, debug: bool = True, ): - monkeypatch_executemany() - cratedb_address = DatabaseAddress.from_string(cratedb_url) cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() cratedb_table = cratedb_table_address.fullname diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index 27ce6bac..d15fb26b 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -13,7 +13,6 @@ from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.model import DatabaseAddress -from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany from cratedb_toolkit.util import DatabaseAdapter logger = logging.getLogger(__name__) @@ -33,8 +32,6 @@ def __init__( progress: bool = False, debug: bool = True, ): - monkeypatch_executemany() - self.mongodb_uri = URL(mongodb_url) self.cratedb_uri = URL(cratedb_url) diff --git a/cratedb_toolkit/sqlalchemy/__init__.py b/cratedb_toolkit/sqlalchemy/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/cratedb_toolkit/sqlalchemy/patch.py b/cratedb_toolkit/sqlalchemy/patch.py deleted file mode 100644 index 9537f5bc..00000000 --- a/cratedb_toolkit/sqlalchemy/patch.py +++ /dev/null @@ -1,19 +0,0 @@ -from sqlalchemy_cratedb import dialect - - -def do_executemany(self, cursor, statement, parameters, context=None): - """ - Improved version of `do_executemany` that stores its response into the request context instance. - - TODO: Refactor this to `sqlalchemy_cratedb.CrateDialect`. - """ - result = cursor.executemany(statement, parameters) - if context is not None: - context.last_executemany_result = result - - -def monkeypatch_executemany(): - """ - Enable improved version of `do_executemany`. - """ - dialect.do_executemany = do_executemany diff --git a/pyproject.toml b/pyproject.toml index afb7e26c..8e422ae1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,7 @@ dependencies = [ "python-dotenv<2", "python-slugify<9", "pyyaml<7", - "sqlalchemy-cratedb>=0.37,<1", + "sqlalchemy-cratedb>=0.40,<1", "sqlparse<0.6", "tqdm<5", 'typing-extensions<5; python_version <= "3.7"', diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py index 7c9a11d6..1a16a3f3 100644 --- a/tests/io/mongodb/test_copy.py +++ b/tests/io/mongodb/test_copy.py @@ -325,7 +325,7 @@ def test_mongodb_copy_filesystem_bson(caplog, cratedb): assert timestamp_type == "bigint" -def test_mongodb_copy_http_json_relaxed(caplog, cratedb): +def test_mongodb_copy_http_json_relaxed_books(caplog, cratedb): """ Verify MongoDB Extended JSON -> CrateDB data transfer, when source file is on HTTP. """ @@ -355,3 +355,45 @@ def test_mongodb_copy_http_json_relaxed(caplog, cratedb): "Charlie Collins", "Robi Sen", ] + + +def test_mongodb_copy_http_json_relaxed_products(caplog, cratedb): + """ + Verify MongoDB Extended JSON -> CrateDB data transfer, when source file is on HTTP. + + `datasets/products.json` includes one invalid record. + """ + + # Define source and target URLs. + json_resource = "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/products.json" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Run transfer command. + jqlang_transformation = """ + .[] |= ( + select(true) + | if (.for) then .for |= to_array end + | if (.type) then .type |= to_array end + | if (.limits.data.n) then .limits.data.n |= tostring end + | if (.limits.sms.n) then .limits.sms.n |= tostring end + | if (.limits.voice.n) then .limits.voice.n |= tostring end + ) + """ + transformation = TransformationProject().add( + CollectionTransformation( + address=CollectionAddress(container="datasets", name="products"), + pre=MokshaTransformation().jq(jqlang_transformation), + ) + ) + mongodb_copy(json_resource, cratedb_url, transformation=transformation) + + # Verify metadata in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 10 + + # Verify content in target database. + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo WHERE data['_id'] = 'ac3';", records=True) + assert results[0]["data"]["name"] == "AC3 Phone" + + assert "Bulk processor metrics: BulkMetrics(count_success_total=10, count_error_total=1" in caplog.text