From 713f133202c0541dc3ca52e83993e6fc9e45a3b0 Mon Sep 17 00:00:00 2001 From: Wilfried BARADAT Date: Mon, 7 Aug 2023 10:35:09 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=8F=97=EF=B8=8F(backends)=20add=20close?= =?UTF-8?q?=20method=20to=20sync=20backends?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Synchronous backends such as Elasticsearch or Mongo need their connections to be closed when they are no longer used. This commit adds an abstract close method to the BaseDataBackend interface and implements it for backends that need it. --- src/ralph/backends/data/async_es.py | 3 +- src/ralph/backends/data/async_mongo.py | 2 +- src/ralph/backends/data/base.py | 10 +- src/ralph/backends/data/clickhouse.py | 17 +++ src/ralph/backends/data/es.py | 17 +++ src/ralph/backends/data/fs.py | 6 + src/ralph/backends/data/ldp.py | 6 + src/ralph/backends/data/mongo.py | 23 +++- src/ralph/backends/data/s3.py | 24 +++- src/ralph/backends/data/swift.py | 17 +++ tests/backends/data/test_async_es.py | 20 ++- tests/backends/data/test_async_mongo.py | 13 +- tests/backends/data/test_base.py | 6 + tests/backends/data/test_clickhouse.py | 57 +++++++++ tests/backends/data/test_es.py | 78 ++++++++++++ tests/backends/data/test_fs.py | 12 +- tests/backends/data/test_ldp.py | 10 ++ tests/backends/data/test_mongo.py | 55 ++++++++ tests/backends/data/test_s3.py | 159 ++++++++++++++++-------- tests/backends/data/test_swift.py | 70 ++++++++++- tests/backends/lrs/test_clickhouse.py | 6 + tests/backends/lrs/test_es.py | 11 ++ tests/backends/lrs/test_mongo.py | 6 + 23 files changed, 557 insertions(+), 71 deletions(-) diff --git a/src/ralph/backends/data/async_es.py b/src/ralph/backends/data/async_es.py index b53b0a256..3b39d7fcd 100644 --- a/src/ralph/backends/data/async_es.py +++ b/src/ralph/backends/data/async_es.py @@ -264,9 +264,10 @@ async def close(self) -> None: """Close the AsyncElasticsearch client. Raise: - BackendException: If a failure during the close operation occurs. + BackendException: If a failure occurs during the close operation. """ if not self._client: + logger.warning("No backend client to close.") return try: diff --git a/src/ralph/backends/data/async_mongo.py b/src/ralph/backends/data/async_mongo.py index 8e1eb6738..a13d54324 100644 --- a/src/ralph/backends/data/async_mongo.py +++ b/src/ralph/backends/data/async_mongo.py @@ -256,7 +256,7 @@ async def close(self) -> None: """Close the AsyncIOMotorClient client. Raise: - BackendException: If a failure during the close operation occurs. + BackendException: If a failure occurs during the close operation. """ try: self.client.close() diff --git a/src/ralph/backends/data/base.py b/src/ralph/backends/data/base.py index 4d6ac2b0a..da20328f3 100644 --- a/src/ralph/backends/data/base.py +++ b/src/ralph/backends/data/base.py @@ -227,6 +227,14 @@ def write( # pylint: disable=too-many-arguments BackendParameterException: If a backend argument value is not valid. """ + @abstractmethod + def close(self) -> None: + """Close the data backend client. + + Raise: + BackendException: If a failure occurs during the close operation. + """ + class BaseAsyncDataBackend(ABC): """Base async data backend interface.""" @@ -381,5 +389,5 @@ async def close(self) -> None: """Close the data backend client. Raise: - BackendException: If a failure during the close operation occurs. + BackendException: If a failure occurs during the close operation.
""" diff --git a/src/ralph/backends/data/clickhouse.py b/src/ralph/backends/data/clickhouse.py index b417b18cf..5b5c09a4c 100755 --- a/src/ralph/backends/data/clickhouse.py +++ b/src/ralph/backends/data/clickhouse.py @@ -382,6 +382,23 @@ def write( # pylint: disable=too-many-arguments return count + def close(self) -> None: + """Close the ClickHouse backend client. + + Raise: + BackendException: If a failure occurs during the close operation. + """ + if not self._client: + logger.warning("No backend client to close.") + return + + try: + self.client.close() + except ClickHouseError as error: + msg = "Failed to close ClickHouse client: %s" + logger.error(msg, error) + raise BackendException(msg % error) from error + @staticmethod def _to_insert_tuples( data: Iterable[dict], diff --git a/src/ralph/backends/data/es.py b/src/ralph/backends/data/es.py index d9b1174d4..b7a7a9662 100644 --- a/src/ralph/backends/data/es.py +++ b/src/ralph/backends/data/es.py @@ -344,6 +344,23 @@ def write( # pylint: disable=too-many-arguments raise BackendException(msg % (error, details, count)) from error return count + def close(self) -> None: + """Close the Elasticsearch backend client. + + Raise: + BackendException: If a failure occurs during the close operation. + """ + if not self._client: + logger.warning("No backend client to close.") + return + + try: + self.client.close() + except TransportError as error: + msg = "Failed to close Elasticsearch client: %s" + logger.error(msg, error) + raise BackendException(msg % error) from error + @staticmethod def to_documents( data: Iterable[dict], diff --git a/src/ralph/backends/data/fs.py b/src/ralph/backends/data/fs.py index 1e3479ce8..8bba06374 100644 --- a/src/ralph/backends/data/fs.py +++ b/src/ralph/backends/data/fs.py @@ -308,6 +308,12 @@ def write( # pylint: disable=too-many-arguments ) return 1 + def close(self) -> None: + """FS backend has nothing to close, this method is not implemented.""" + msg = "FS data backend does not support `close` method" + logger.error(msg) + raise NotImplementedError(msg) + @staticmethod def _read_raw(file: IO, chunk_size: int, _ignore_errors: bool) -> Iterator[bytes]: """Read the `file` in chunks of size `chunk_size` and yield them.""" diff --git a/src/ralph/backends/data/ldp.py b/src/ralph/backends/data/ldp.py index 6f39d0b19..cfa8cf18d 100644 --- a/src/ralph/backends/data/ldp.py +++ b/src/ralph/backends/data/ldp.py @@ -226,6 +226,12 @@ def write( # pylint: disable=too-many-arguments logger.error(msg, target) raise NotImplementedError(msg % target) + def close(self) -> None: + """LDP client does not support close, this method is not implemented.""" + msg = "LDP data backend does not support `close` method" + logger.error(msg) + raise NotImplementedError(msg) + def _get_archive_endpoint(self, stream_id: Union[None, str] = None) -> str: """Return OVH's archive endpoint.""" stream_id = stream_id if stream_id else self.stream_id diff --git a/src/ralph/backends/data/mongo.py b/src/ralph/backends/data/mongo.py index 96f8704ee..432952678 100644 --- a/src/ralph/backends/data/mongo.py +++ b/src/ralph/backends/data/mongo.py @@ -16,7 +16,13 @@ from pydantic import Json, MongoDsn, constr from pymongo import MongoClient, ReplaceOne from pymongo.collection import Collection -from pymongo.errors import BulkWriteError, ConnectionFailure, InvalidName, PyMongoError +from pymongo.errors import ( + BulkWriteError, + ConnectionFailure, + InvalidName, + InvalidOperation, + PyMongoError, +) from ralph.backends.data.base import ( BaseDataBackend, @@ 
-113,7 +119,7 @@ def status(self) -> DataBackendStatus: # Check MongoDB connection. try: self.client.admin.command("ping") - except ConnectionFailure as error: + except (ConnectionFailure, InvalidOperation) as error: logger.error("Failed to connect to MongoDB: %s", error) return DataBackendStatus.AWAY @@ -296,6 +302,19 @@ def write( # pylint: disable=too-many-arguments return count + def close(self) -> None: + """Close the MongoDB backend client. + + Raise: + BackendException: If a failure occurs during the close operation. + """ + try: + self.client.close() + except PyMongoError as error: + msg = "Failed to close MongoDB client: %s" + logger.error(msg, error) + raise BackendException(msg % error) from error + @staticmethod def iter_by_batch(data: Iterable[dict], chunk_size: int): """Iterate over `data` Iterable and yield batches of size `chunk_size`.""" diff --git a/src/ralph/backends/data/s3.py b/src/ralph/backends/data/s3.py index 33c57f175..c20521d80 100644 --- a/src/ralph/backends/data/s3.py +++ b/src/ralph/backends/data/s3.py @@ -11,6 +11,7 @@ from boto3.s3.transfer import TransferConfig from botocore.exceptions import ( ClientError, + EndpointConnectionError, ParamValidationError, ReadTimeoutError, ResponseStreamingError, @@ -102,7 +103,7 @@ def status(self) -> DataBackendStatus: """ try: self.client.head_bucket(Bucket=self.default_bucket_name) - except ClientError: + except (ClientError, EndpointConnectionError): return DataBackendStatus.ERROR return DataBackendStatus.OK @@ -197,7 +198,7 @@ def read( try: response = self.client.get_object(Bucket=target, Key=query.query_string) - except ClientError as err: + except (ClientError, EndpointConnectionError) as err: error_msg = err.response["Error"]["Message"] msg = "Failed to download %s: %s" logger.error(msg, query.query_string, error_msg) @@ -321,7 +322,7 @@ def write( # pylint: disable=too-many-arguments Config=TransferConfig(multipart_chunksize=chunk_size), ) response = self.client.head_object(Bucket=target_bucket, Key=target_object) - except (ClientError, ParamValidationError) as exc: + except (ClientError, ParamValidationError, EndpointConnectionError) as exc: msg = "Failed to upload %s" logger.error(msg, target) raise BackendException(msg % target) from exc @@ -340,6 +341,23 @@ def write( # pylint: disable=too-many-arguments return counter["count"] + def close(self) -> None: + """Close the S3 backend client. + + Raise: + BackendException: If a failure occurs during the close operation. + """ + if not self._client: + logger.warning("No backend client to close.") + return + + try: + self.client.close() + except ClientError as error: + msg = "Failed to close S3 backend client: %s" + logger.error(msg, error) + raise BackendException(msg % error) from error + @staticmethod def _read_raw( obj: StreamingBody, chunk_size: int, _ignore_errors: bool diff --git a/src/ralph/backends/data/swift.py b/src/ralph/backends/data/swift.py index 12764c6e4..18516d570 100644 --- a/src/ralph/backends/data/swift.py +++ b/src/ralph/backends/data/swift.py @@ -346,6 +346,23 @@ def write( # pylint: disable=too-many-arguments, disable=too-many-branches ) return count + def close(self) -> None: + """Close the Swift backend client. + + Raise: + BackendException: If a failure occurs during the close operation. 
+ """ + if not self._connection: + logger.warning("No backend client to close.") + return + + try: + self.connection.close() + except ClientException as error: + msg = "Failed to close Swift backend client: %s" + logger.error(msg, error) + raise BackendException(msg % error) from error + def _details(self, container: str, name: str): """Return `name` object details from `container`.""" try: diff --git a/tests/backends/data/test_async_es.py b/tests/backends/data/test_async_es.py index 3b121cf1b..a18745897 100644 --- a/tests/backends/data/test_async_es.py +++ b/tests/backends/data/test_async_es.py @@ -825,7 +825,7 @@ async def test_backends_data_async_es_data_backend_write_method_with_datastream( @pytest.mark.anyio -async def test_backends_data_es_data_backend_close_method( +async def test_backends_data_es_data_backend_close_method_with_failure( async_es_backend, monkeypatch ): """Test the `AsyncESDataBackend.close` method.""" @@ -840,3 +840,21 @@ async def mock_connection_error(): with pytest.raises(BackendException, match="Failed to close Elasticsearch client"): await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_es_data_backend_close_method(async_es_backend, caplog): + """Test the `AsyncESDataBackend.close` method.""" + + # No client instantiated + backend = async_es_backend() + await backend.close() + backend._client = None # pylint: disable=protected-access + with caplog.at_level(logging.WARNING): + await backend.close() + + assert ( + "ralph.backends.data.async_es", + logging.WARNING, + "No backend client to close.", + ) in caplog.record_tuples diff --git a/tests/backends/data/test_async_mongo.py b/tests/backends/data/test_async_mongo.py index 12782d984..3f916b449 100644 --- a/tests/backends/data/test_async_mongo.py +++ b/tests/backends/data/test_async_mongo.py @@ -1056,7 +1056,7 @@ async def test_backends_data_async_mongo_data_backend_write_method_with_custom_c @pytest.mark.anyio -async def test_backends_data_async_mongo_data_backend_close( +async def test_backends_data_async_mongo_data_backend_close_method_with_failure( async_mongo_backend, monkeypatch, caplog ): """Test the `AsyncMongoDataBackend.close` method, given a failed close, @@ -1084,3 +1084,14 @@ def close(): logging.ERROR, "Failed to close AsyncIOMotorClient: Close failure", ) in caplog.record_tuples + + +@pytest.mark.anyio +async def test_backends_data_async_mongo_data_backend_close_method(async_mongo_backend): + """Test the `AsyncMongoDataBackend.close` method.""" + + backend = async_mongo_backend() + + # Not possible to connect to client after closing it + await backend.close() + assert await backend.status() == DataBackendStatus.AWAY diff --git a/tests/backends/data/test_base.py b/tests/backends/data/test_base.py index deacdddfd..63ca68df2 100644 --- a/tests/backends/data/test_base.py +++ b/tests/backends/data/test_base.py @@ -39,6 +39,9 @@ def list(self): # pylint: disable=arguments-differ,missing-function-docstring def write(self): # pylint: disable=arguments-differ,missing-function-docstring pass + def close(self): # pylint: disable=arguments-differ,missing-function-docstring + pass + MockBaseDataBackend().read(query=value) @@ -80,6 +83,9 @@ def list(self): # pylint: disable=arguments-differ,missing-function-docstring def write(self): # pylint: disable=arguments-differ,missing-function-docstring pass + def close(self): # pylint: disable=arguments-differ,missing-function-docstring + pass + with pytest.raises(BackendParameterException, match=error): with caplog.at_level(logging.ERROR): 
MockBaseDataBackend().read(query=value) diff --git a/tests/backends/data/test_clickhouse.py b/tests/backends/data/test_clickhouse.py index c0876ce37..6d5bc4f40 100644 --- a/tests/backends/data/test_clickhouse.py +++ b/tests/backends/data/test_clickhouse.py @@ -51,6 +51,7 @@ def test_backends_data_clickhouse_data_backend_default_instantiation(monkeypatch assert backend.event_table_name == "xapi_events_all" assert backend.default_chunk_size == 500 assert backend.locale_encoding == "utf8" + backend.close() def test_backends_data_clickhouse_data_backend_instantiation_with_settings(): @@ -75,6 +76,7 @@ def test_backends_data_clickhouse_data_backend_instantiation_with_settings(): assert backend.event_table_name == CLICKHOUSE_TEST_TABLE_NAME assert backend.default_chunk_size == 1000 assert backend.locale_encoding == "utf-16" + backend.close() def test_backends_data_clickhouse_data_backend_status( @@ -93,6 +95,7 @@ def mock_query(*_, **__): monkeypatch.setattr(backend.client, "query", mock_query) assert backend.status() == DataBackendStatus.AWAY + backend.close() def test_backends_data_clickhouse_data_backend_read_method_with_raw_output( @@ -130,6 +133,7 @@ def test_backends_data_clickhouse_data_backend_read_method_with_raw_output( assert len(results) == 3 assert isinstance(results[0], bytes) assert json.loads(results[0])["event"] == statements[0] + backend.close() # pylint: disable=unused-argument @@ -203,6 +207,7 @@ def test_backends_data_clickhouse_data_backend_read_method_with_a_custom_query( results = list(backend.read(query=query)) assert len(results) == 1 assert results[0]["event"] == statements[1] + backend.close() def test_backends_data_clickhouse_data_backend_read_method_with_failures( @@ -276,6 +281,7 @@ def mock_query(*_, **__): logging.ERROR, msg, ) in caplog.record_tuples + backend.close() def test_backends_data_clickhouse_data_backend_list_method( @@ -287,6 +293,7 @@ def test_backends_data_clickhouse_data_backend_list_method( assert list(backend.list(details=True)) == [{"name": CLICKHOUSE_TEST_TABLE_NAME}] assert list(backend.list(details=False)) == [CLICKHOUSE_TEST_TABLE_NAME] + backend.close() def test_backends_data_clickhouse_data_backend_list_method_with_failure( @@ -315,6 +322,7 @@ def mock_query(*_, **__): logging.ERROR, msg, ) in caplog.record_tuples + backend.close() def test_backends_data_clickhouse_data_backend_write_method_with_invalid_timestamp( @@ -343,6 +351,7 @@ def test_backends_data_clickhouse_data_backend_write_method_with_invalid_timesta match=msg, ): backend.write(statements, ignore_errors=False) + backend.close() def test_backends_data_clickhouse_data_backend_write_method_no_timestamp( @@ -388,6 +397,7 @@ def test_backends_data_clickhouse_data_backend_write_method_no_timestamp( logging.ERROR, f"Statement {statement} has an invalid 'id' or 'timestamp' field", ) not in caplog.record_tuples + backend.close() def test_backends_data_clickhouse_data_backend_write_method_with_duplicated_key( @@ -412,6 +422,7 @@ def test_backends_data_clickhouse_data_backend_write_method_with_duplicated_key( with pytest.raises(BackendException, match="Duplicate IDs found in batch"): backend.write(statements, ignore_errors=False) + backend.close() def test_backends_data_clickhouse_data_backend_write_method_chunks_on_error( @@ -435,6 +446,7 @@ def test_backends_data_clickhouse_data_backend_write_method_chunks_on_error( {"id": dupe_id, **timestamp}, ] assert backend.write(statements, ignore_errors=True) == 0 + backend.close() def test_backends_data_clickhouse_data_backend_write_method( @@ 
-472,6 +484,7 @@ def test_backends_data_clickhouse_data_backend_write_method( assert result[1]["event_id"] == native_statements[1]["id"] assert result[1]["emission_time"] == native_statements[1]["timestamp"] assert result[1]["event"] == statements[1] + backend.close() def test_backends_data_clickhouse_data_backend_write_method_bytes( @@ -514,6 +527,7 @@ def test_backends_data_clickhouse_data_backend_write_method_bytes( assert result[1]["event_id"] == native_statements[1]["id"] assert result[1]["emission_time"] == native_statements[1]["timestamp"] assert result[1]["event"] == statements[1] + backend.close() def test_backends_data_clickhouse_data_backend_write_method_bytes_failed( @@ -545,6 +559,7 @@ def test_backends_data_clickhouse_data_backend_write_method_bytes_failed( result = clickhouse.query(sql).result_set assert result[0][0] == 0 + backend.close() def test_backends_data_clickhouse_data_backend_write_method_empty( @@ -563,6 +578,7 @@ def test_backends_data_clickhouse_data_backend_write_method_empty( result = clickhouse.query(sql).result_set assert result[0][0] == 0 + backend.close() def test_backends_data_clickhouse_data_backend_write_method_wrong_operation_type( @@ -589,6 +605,7 @@ def test_backends_data_clickhouse_data_backend_write_method_wrong_operation_type match=f"{BaseOperationType.APPEND.name} operation_type is not allowed.", ): backend.write(data=statements, operation_type=BaseOperationType.APPEND) + backend.close() def test_backends_data_clickhouse_data_backend_write_method_with_custom_chunk_size( @@ -626,3 +643,43 @@ def test_backends_data_clickhouse_data_backend_write_method_with_custom_chunk_si assert result[1]["event_id"] == native_statements[1]["id"] assert result[1]["emission_time"] == native_statements[1]["timestamp"] assert result[1]["event"] == statements[1] + backend.close() + + +def test_backends_data_clickhouse_data_backend_close_method_with_failure( + clickhouse_backend, monkeypatch +): + """Test the `ClickHouseDataBackend.close` method with failure.""" + + backend = clickhouse_backend() + + def mock_connection_error(): + """ClickHouse client close mock that raises a connection error.""" + raise ClickHouseError("", (Exception("Mocked connection error"),)) + + monkeypatch.setattr(backend.client, "close", mock_connection_error) + + with pytest.raises(BackendException, match="Failed to close ClickHouse client"): + backend.close() + + +def test_backends_data_clickhouse_data_backend_close_method(clickhouse_backend, caplog): + """Test the `ClickHouseDataBackend.close` method.""" + + backend = clickhouse_backend() + + # Not possible to connect to client after closing it + backend.close() + assert backend.status() == DataBackendStatus.AWAY + + # No client instantiated + backend = clickhouse_backend() + backend._client = None # pylint: disable=protected-access + with caplog.at_level(logging.WARNING): + backend.close() + + assert ( + "ralph.backends.data.clickhouse", + logging.WARNING, + "No backend client to close.", + ) in caplog.record_tuples diff --git a/tests/backends/data/test_es.py b/tests/backends/data/test_es.py index ed0b116e6..0e3f06072 100644 --- a/tests/backends/data/test_es.py +++ b/tests/backends/data/test_es.py @@ -108,6 +108,8 @@ def test_backends_data_es_data_backend_instantiation_with_settings(): except Exception as err: # pylint:disable=broad-except pytest.fail(f"Two ESDataBackends should not raise exceptions: {err}") + backend.close() + def test_backends_data_es_data_backend_status_method(monkeypatch, es_backend, caplog): """Test the 
`ESDataBackend.status` method.""" @@ -157,6 +159,8 @@ def mock_connection_error(): "Exception(Mocked connection error)", ) in caplog.record_tuples + backend.close() + @pytest.mark.parametrize( "exception, error", @@ -189,6 +193,8 @@ def mock_get(index): f"Failed to read indices: {error}", ) in caplog.record_tuples + backend.close() + def test_backends_data_es_data_backend_list_method_without_history( es_backend, monkeypatch @@ -208,6 +214,8 @@ def mock_get(index): assert isinstance(result, Iterable) assert list(result) == list(indices.keys()) + backend.close() + def test_backends_data_es_data_backend_list_method_with_details( es_backend, monkeypatch @@ -229,6 +237,8 @@ def mock_get(index): {"index_2": {"info_2": "baz"}}, ] + backend.close() + def test_backends_data_es_data_backend_list_method_with_history( es_backend, caplog, monkeypatch @@ -247,6 +257,8 @@ def test_backends_data_es_data_backend_list_method_with_history( "The `new` argument is ignored", ) in caplog.record_tuples + backend.close() + @pytest.mark.parametrize( "exception, error", @@ -300,6 +312,8 @@ def mock_es_search_open_pit(**kwargs): "Failed to open Elasticsearch point in time: %s" % error.replace("\\", ""), ) in caplog.record_tuples + backend.close() + def test_backends_data_es_data_backend_read_method_with_ignore_errors( es, es_backend, monkeypatch, caplog @@ -319,6 +333,8 @@ def test_backends_data_es_data_backend_read_method_with_ignore_errors( "The `ignore_errors` argument is ignored", ) in caplog.record_tuples + backend.close() + def test_backends_data_es_data_backend_read_method_with_raw_ouput(es, es_backend): """Test the `ESDataBackend.read` method with `raw_output` set to `True`.""" @@ -331,6 +347,8 @@ def test_backends_data_es_data_backend_read_method_with_raw_ouput(es, es_backend assert isinstance(hit, bytes) assert json.loads(hit).get("_source") == documents[i] + backend.close() + def test_backends_data_es_data_backend_read_method_without_raw_ouput(es, es_backend): """Test the `ESDataBackend.read` method with `raw_output` set to `False`.""" @@ -343,6 +361,8 @@ def test_backends_data_es_data_backend_read_method_without_raw_ouput(es, es_back assert isinstance(hit, dict) assert hit.get("_source") == documents[i] + backend.close() + def test_backends_data_es_data_backend_read_method_with_query(es, es_backend, caplog): """Test the `ESDataBackend.read` method with a query.""" @@ -402,6 +422,8 @@ def test_backends_data_es_data_backend_read_method_with_query(es, es_backend, ca "'type': 'value_error.extra'}]", ) in caplog.record_tuples + backend.close() + def test_backends_data_es_data_backend_write_method_with_create_operation( es, es_backend, caplog @@ -449,6 +471,8 @@ def test_backends_data_es_data_backend_write_method_with_create_operation( hits = list(backend.read()) assert [hit["_source"] for hit in hits] == [{"value": str(idx)} for idx in range(9)] + backend.close() + def test_backends_data_es_data_backend_write_method_with_delete_operation( es, @@ -474,6 +498,8 @@ def test_backends_data_es_data_backend_write_method_with_delete_operation( assert len(hits) == 7 assert sorted([hit["_source"]["id"] for hit in hits]) == list(range(3, 10)) + backend.close() + def test_backends_data_es_data_backend_write_method_with_update_operation( es, @@ -518,6 +544,8 @@ def test_backends_data_es_data_backend_write_method_with_update_operation( map(lambda x: str(x + 10), range(10)) ) + backend.close() + def test_backends_data_es_data_backend_write_method_with_append_operation( es_backend, caplog @@ -537,6 +565,8 @@ def 
test_backends_data_es_data_backend_write_method_with_append_operation( "Append operation_type is not supported.", ) in caplog.record_tuples + backend.close() + def test_backends_data_es_data_backend_write_method_with_target(es, es_backend): """Test the `ESDataBackend.write` method, given a target index, should insert @@ -570,6 +600,8 @@ def get_data(): {"value": "2"}, ] + backend.close() + def test_backends_data_es_data_backend_write_method_without_ignore_errors( es, es_backend, caplog @@ -639,6 +671,8 @@ def test_backends_data_es_data_backend_write_method_without_ignore_errors( hits = list(backend.read()) assert len(hits) == 5 + backend.close() + def test_backends_data_es_data_backend_write_method_with_ignore_errors(es, es_backend): """Test the `ESDataBackend.write` method with `ignore_errors` set to `True`, given @@ -676,6 +710,8 @@ def test_backends_data_es_data_backend_write_method_with_ignore_errors(es, es_ba assert len(hits) == 11 assert [hit["_source"] for hit in hits[9:]] == [{"foo": "bar"}, {"foo": "baz"}] + backend.close() + def test_backends_data_es_data_backend_write_method_with_datastream( es_data_stream, es_backend @@ -693,3 +729,45 @@ def test_backends_data_es_data_backend_write_method_with_datastream( hits = list(backend.read()) assert len(hits) == 10 assert sorted([hit["_source"]["id"] for hit in hits]) == list(range(10)) + + backend.close() + + +def test_backends_data_es_data_backend_close_method_with_failure( + es_backend, monkeypatch +): + """Test the `ESDataBackend.close` method.""" + + backend = es_backend() + + def mock_connection_error(): + """ES client close mock that raises a connection error.""" + raise ESConnectionError("", (Exception("Mocked connection error"),)) + + monkeypatch.setattr(backend.client, "close", mock_connection_error) + + with pytest.raises(BackendException, match="Failed to close Elasticsearch client"): + backend.close() + + +def test_backends_data_es_data_backend_close_method(es_backend, caplog): + """Test the `ESDataBackend.close` method.""" + + backend = es_backend() + backend.status() + + # Not possible to connect to client after closing it + backend.close() + assert backend.status() == DataBackendStatus.AWAY + + # No client instantiated + backend = es_backend() + backend._client = None # pylint: disable=protected-access + with caplog.at_level(logging.WARNING): + backend.close() + + assert ( + "ralph.backends.data.es", + logging.WARNING, + "No backend client to close.", + ) in caplog.record_tuples diff --git a/tests/backends/data/test_fs.py b/tests/backends/data/test_fs.py index 51779a34f..9d6133e72 100644 --- a/tests/backends/data/test_fs.py +++ b/tests/backends/data/test_fs.py @@ -1,4 +1,4 @@ -"""Tests for Ralph fs data backend""" +"""Tests for Ralph fs data backend""" # pylint: disable = too-many-lines import json import logging import os @@ -996,3 +996,13 @@ def test_backends_data_fs_data_backend_write_method_without_target( "timestamp": frozen_now, }, ] + + +def test_backends_data_fs_data_backend_close_method(fs_backend): + """Test that the `FSDataBackend.close` method raises an error.""" + + backend = fs_backend() + + error = "FS data backend does not support `close` method" + with pytest.raises(NotImplementedError, match=error): + backend.close() diff --git a/tests/backends/data/test_ldp.py b/tests/backends/data/test_ldp.py index 20740980c..a80e7a8b8 100644 --- a/tests/backends/data/test_ldp.py +++ b/tests/backends/data/test_ldp.py @@ -698,3 +698,13 @@ def mock_post(url): backend = ldp_backend() monkeypatch.setattr(backend.client,
"post", mock_post) assert backend._url(archive_name) == archive_url + + +def test_backends_data_ldp_data_backend_close_method(ldp_backend): + """Test that the `LDPDataBackend.close` method raise an error.""" + + backend = ldp_backend() + + error = "LDP data backend does not support `close` method" + with pytest.raises(NotImplementedError, match=error): + backend.close() diff --git a/tests/backends/data/test_mongo.py b/tests/backends/data/test_mongo.py index 25d5d6049..2c19b2220 100644 --- a/tests/backends/data/test_mongo.py +++ b/tests/backends/data/test_mongo.py @@ -51,6 +51,7 @@ def test_backends_data_mongo_data_backend_default_instantiation(monkeypatch, fs) assert backend.settings.CLIENT_OPTIONS == MongoClientOptions() assert backend.settings.DEFAULT_CHUNK_SIZE == 500 assert backend.settings.LOCALE_ENCODING == "utf8" + backend.close() def test_backends_data_mongo_data_backend_instantiation_with_settings(): @@ -75,6 +76,7 @@ def test_backends_data_mongo_data_backend_instantiation_with_settings(): MongoDataBackend(settings) except Exception as err: # pylint:disable=broad-except pytest.fail(f"Two MongoDataBackends should not raise exceptions: {err}") + backend.close() def test_backends_data_mongo_data_backend_status_with_connection_failure( @@ -163,6 +165,7 @@ def test_backends_data_mongo_data_backend_status_with_ok_status(mongo_backend): """ backend = mongo_backend() assert backend.status() == DataBackendStatus.OK + backend.close() @pytest.mark.parametrize("invalid_character", [" ", ".", "/", '"']) @@ -182,6 +185,7 @@ def test_backends_data_mongo_data_backend_list_method_with_invalid_target( list(backend.list(f"foo{invalid_character}bar")) assert ("ralph.backends.data.mongo", logging.ERROR, msg) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_list_method_with_failure( @@ -203,6 +207,7 @@ def list_collections(): list(backend.list()) assert ("ralph.backends.data.mongo", logging.ERROR, msg) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_list_method_without_history( @@ -221,6 +226,7 @@ def test_backends_data_mongo_data_backend_list_method_without_history( sorted([MONGO_TEST_COLLECTION, "bar", "baz"]) ) assert not list(backend.list("non_existent_database")) + backend.close() def test_backends_data_mongo_data_backend_list_method_with_history( @@ -238,6 +244,7 @@ def test_backends_data_mongo_data_backend_list_method_with_history( logging.WARNING, "The `new` argument is ignored", ) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_read_method_with_raw_output( @@ -262,6 +269,7 @@ def test_backends_data_mongo_data_backend_read_method_with_raw_output( assert list(backend.read(raw_output=True, target="foobar")) == expected[:2] assert list(backend.read(raw_output=True, chunk_size=2)) == expected assert list(backend.read(raw_output=True, chunk_size=1000)) == expected + backend.close() def test_backends_data_mongo_data_backend_read_method_without_raw_output( @@ -286,6 +294,7 @@ def test_backends_data_mongo_data_backend_read_method_without_raw_output( assert list(backend.read(target="foobar")) == expected[:2] assert list(backend.read(chunk_size=2)) == expected assert list(backend.read(chunk_size=1000)) == expected + backend.close() @pytest.mark.parametrize( @@ -313,6 +322,7 @@ def test_backends_data_mongo_data_backend_read_method_with_invalid_target( list(backend.read(target=invalid_target)) assert ("ralph.backends.data.mongo", logging.ERROR, msg) in caplog.record_tuples + backend.close() def 
test_backends_data_mongo_data_backend_read_method_with_failure( @@ -336,6 +346,7 @@ def mock_find(batch_size, query=None): list(backend.read()) assert ("ralph.backends.data.mongo", logging.ERROR, msg) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_read_method_with_ignore_errors( @@ -370,6 +381,7 @@ def test_backends_data_mongo_data_backend_read_method_with_ignore_errors( "Failed to convert document to bytes: " "Object of type ObjectId is not JSON serializable", ) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_read_method_without_ignore_errors( @@ -414,6 +426,7 @@ def test_backends_data_mongo_data_backend_read_method_without_ignore_errors( error_log = ("ralph.backends.data.mongo", logging.ERROR, msg) assert len(list(filter(lambda x: x == error_log, caplog.record_tuples))) == 4 + backend.close() @pytest.mark.parametrize( @@ -454,6 +467,7 @@ def test_backends_data_mongo_data_backend_read_method_with_query( assert list(backend.read(query=query)) == expected assert list(backend.read(query=query, chunk_size=1)) == expected assert list(backend.read(query=query, chunk_size=1000)) == expected + backend.close() def test_backends_data_mongo_data_backend_write_method_with_target( @@ -480,6 +494,7 @@ def test_backends_data_mongo_data_backend_write_method_with_target( "_id": "62b9ce92fcde2b2edba56bf4", "_source": {"id": "bar", **timestamp}, } + backend.close() def test_backends_data_mongo_data_backend_write_method_without_target( @@ -502,6 +517,7 @@ def test_backends_data_mongo_data_backend_write_method_without_target( "_id": "62b9ce92fcde2b2edba56bf4", "_source": {"id": "bar", **timestamp}, } + backend.close() def test_backends_data_mongo_data_backend_write_method_with_duplicated_key_error( @@ -555,6 +571,7 @@ def test_backends_data_mongo_data_backend_write_method_with_duplicated_key_error logging.ERROR, exception_info.value.args[0], ) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_write_method_with_delete_operation( @@ -582,6 +599,7 @@ def test_backends_data_mongo_data_backend_write_method_with_delete_operation( binary_documents = [json.dumps(documents[2]).encode("utf8")] assert backend.write(binary_documents, operation_type=BaseOperationType.DELETE) == 1 assert not list(backend.read()) + backend.close() def test_backends_data_mongo_data_backend_write_method_with_delete_operation_failure( @@ -615,6 +633,7 @@ def test_backends_data_mongo_data_backend_write_method_with_delete_operation_fai ) assert ("ralph.backends.data.mongo", logging.WARNING, msg) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_write_method_with_update_operation( @@ -651,6 +670,7 @@ def test_backends_data_mongo_data_backend_write_method_with_update_operation( "_id": "62b9ce922c26b46b68ffc68f", "_source": {"id": "foo", "new_field": "bar"}, } + backend.close() def test_backends_data_mongo_data_backend_write_method_with_update_operation_failure( @@ -708,6 +728,7 @@ def test_backends_data_mongo_data_backend_write_method_with_update_operation_fai logging.ERROR, exception_info.value.args[0], ) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_write_method_with_append_operation( @@ -723,6 +744,7 @@ def test_backends_data_mongo_data_backend_write_method_with_append_operation( backend.write(data=[], operation_type=BaseOperationType.APPEND) assert ("ralph.backends.data.mongo", logging.ERROR, msg) in caplog.record_tuples + backend.close() def 
test_backends_data_mongo_data_backend_write_method_with_create_operation( @@ -741,6 +763,7 @@ def test_backends_data_mongo_data_backend_write_method_with_create_operation( results = backend.read() assert next(results)["_source"]["timestamp"] == documents[0]["timestamp"] assert next(results)["_source"]["timestamp"] == documents[1]["timestamp"] + backend.close() @pytest.mark.parametrize( @@ -777,6 +800,7 @@ def test_backends_data_mongo_data_backend_write_method_with_invalid_documents( assert backend.write([document], ignore_errors=True) == 0 assert ("ralph.backends.data.mongo", logging.WARNING, error) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_write_method_with_unparsable_documents( @@ -801,6 +825,7 @@ def test_backends_data_mongo_data_backend_write_method_with_unparsable_documents assert backend.write([b"not valid JSON!"], ignore_errors=True) == 0 assert ("ralph.backends.data.mongo", logging.WARNING, msg) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_write_method_with_no_data( @@ -813,6 +838,7 @@ def test_backends_data_mongo_data_backend_write_method_with_no_data( msg = "Data Iterator is empty; skipping write to target." assert ("ralph.backends.data.mongo", logging.INFO, msg) in caplog.record_tuples + backend.close() def test_backends_data_mongo_data_backend_write_method_with_custom_chunk_size( @@ -870,3 +896,32 @@ def test_backends_data_mongo_data_backend_write_method_with_custom_chunk_size( {"_id": "62b9ce92fcde2b2edba56bf4", "_source": {"id": "bar", **new_timestamp}}, {"_id": "62b9ce92baa5a0964d3320fb", "_source": {"id": "baz", **new_timestamp}}, ] + backend.close() + + +def test_backends_data_mongo_data_backend_close_method_with_failure( + mongo_backend, monkeypatch +): + """Test the `MongoDataBackend.close` method.""" + + backend = mongo_backend() + + def mock_connection_error(): + """Mongo client close mock that raises a connection error.""" + raise PyMongoError("", (Exception("Mocked connection error"),)) + + monkeypatch.setattr(backend.client, "close", mock_connection_error) + + with pytest.raises(BackendException, match="Failed to close MongoDB client"): + backend.close() + + +def test_backends_data_mongo_data_backend_close_method(mongo_backend): + """Test the `MongoDataBackend.close` method.""" + + backend = mongo_backend() + + # Not possible to connect to client after closing it, as pymongo + # raises InvalidOperation on a closed client + backend.close() + assert backend.status() == DataBackendStatus.AWAY diff --git a/tests/backends/data/test_s3.py b/tests/backends/data/test_s3.py index d8bfc4a3a..67ac83953 100644 --- a/tests/backends/data/test_s3.py +++ b/tests/backends/data/test_s3.py @@ -72,12 +72,16 @@ def test_backends_data_s3_data_backend_status_method(s3_backend): # Regions outside of us-east-1 require the appropriate LocationConstraint s3_client = boto3.client("s3", region_name="us-east-1") - assert s3_backend().status() == DataBackendStatus.ERROR + backend = s3_backend() + assert backend.status() == DataBackendStatus.ERROR + backend.close() bucket_name = "bucket_name" s3_client.create_bucket(Bucket=bucket_name) - assert s3_backend().status() == DataBackendStatus.OK + backend = s3_backend() + assert backend.status() == DataBackendStatus.OK + backend.close() @mock_s3 @@ -117,9 +121,9 @@ def test_backends_data_s3_data_backend_list_should_yield_archive_names( {"name": "2022-10-01.gz"}, ] - s3 = s3_backend() + backend = s3_backend() - s3.history.extend( + backend.history.extend( [ {"id": "bucket_name/2022-04-29.gz",
"backend": "s3", "command": "read"}, {"id": "bucket_name/2022-04-30.gz", "backend": "s3", "command": "read"}, @@ -127,15 +131,16 @@ def test_backends_data_s3_data_backend_list_should_yield_archive_names( ) try: - response_list = s3.list() - response_list_new = s3.list(new=True) - response_list_details = s3.list(details=True) + response_list = backend.list() + response_list_new = backend.list(new=True) + response_list_details = backend.list(details=True) except Exception: # pylint:disable=broad-except pytest.fail("S3 backend should not raise exception on successful list") assert list(response_list) == [x["name"] for x in listing] assert list(response_list_new) == ["2022-10-01.gz"] assert [x["Key"] for x in response_list_details] == [x["name"] for x in listing] + backend.close() @mock_s3 @@ -153,15 +158,16 @@ def test_backends_data_s3_list_on_empty_bucket_should_do_nothing( listing = [] - s3 = s3_backend() + backend = s3_backend() - s3.clean_history(lambda *_: True) + backend.clean_history(lambda *_: True) try: - response_list = s3.list() + response_list = backend.list() except Exception: # pylint:disable=broad-except pytest.fail("S3 backend should not raise exception on successful list") assert list(response_list) == [x["name"] for x in listing] + backend.close() @mock_s3 @@ -184,19 +190,19 @@ def test_backends_data_s3_list_with_failed_connection_should_log_the_error( Body=json.dumps({"id": "1", "foo": "bar"}), ) - s3 = s3_backend() + backend = s3_backend() - s3.clean_history(lambda *_: True) + backend.clean_history(lambda *_: True) msg = "Failed to list the bucket wrong_name: The specified bucket does not exist" with caplog.at_level(logging.ERROR): with pytest.raises(BackendException, match=msg): - next(s3.list(target="wrong_name")) + next(backend.list(target="wrong_name")) with pytest.raises(BackendException, match=msg): - next(s3.list(target="wrong_name", new=True)) + next(backend.list(target="wrong_name", new=True)) with pytest.raises(BackendException, match=msg): - next(s3.list(target="wrong_name", details=True)) + next(backend.list(target="wrong_name", details=True)) assert ( list( @@ -207,6 +213,7 @@ def test_backends_data_s3_list_with_failed_connection_should_log_the_error( ) == [("ralph.backends.data.s3", logging.ERROR, msg)] * 3 ) + backend.close() @mock_s3 @@ -242,11 +249,11 @@ def test_backends_data_s3_read_with_valid_name_should_write_to_history( freezed_now = datetime.datetime.now(tz=datetime.timezone.utc).isoformat() monkeypatch.setattr("ralph.backends.data.s3.now", lambda: freezed_now) - s3 = s3_backend() - s3.clean_history(lambda *_: True) + backend = s3_backend() + backend.clean_history(lambda *_: True) list( - s3.read( + backend.read( query="2022-09-29.gz", target=bucket_name, chunk_size=1000, @@ -260,10 +267,10 @@ def test_backends_data_s3_read_with_valid_name_should_write_to_history( "id": f"{bucket_name}/2022-09-29.gz", "size": len(raw_body), "timestamp": freezed_now, - } in s3.history + } in backend.history list( - s3.read( + backend.read( query="2022-09-30.gz", raw_output=False, ) @@ -275,7 +282,8 @@ def test_backends_data_s3_read_with_valid_name_should_write_to_history( "id": f"{bucket_name}/2022-09-30.gz", "size": len(json_body), "timestamp": freezed_now, - } in s3.history + } in backend.history + backend.close() @mock_s3 @@ -302,8 +310,8 @@ def test_backends_data_s3_read_with_invalid_output_should_log_the_error( with caplog.at_level(logging.ERROR): with pytest.raises(BackendException): - s3 = s3_backend() - list(s3.read(query="2022-09-29.gz", raw_output=False)) + 
backend = s3_backend() + list(backend.read(query="2022-09-29.gz", raw_output=False)) assert ( "ralph.backends.data.s3", @@ -311,7 +319,8 @@ def test_backends_data_s3_read_with_invalid_output_should_log_the_error( "Raised error: Expecting value: line 1 column 1 (char 0)", ) in caplog.record_tuples - s3.clean_history(lambda *_: True) + backend.clean_history(lambda *_: True) + backend.close() @mock_s3 @@ -339,8 +348,8 @@ def test_backends_data_s3_read_with_invalid_name_should_log_the_error( with caplog.at_level(logging.ERROR): with pytest.raises(BackendParameterException): - s3 = s3_backend() - list(s3.read(query=None, target=bucket_name)) + backend = s3_backend() + list(backend.read(query=None, target=bucket_name)) assert ( "ralph.backends.data.s3", @@ -348,7 +357,8 @@ def test_backends_data_s3_read_with_invalid_name_should_log_the_error( "Invalid query. The query should be a valid object name.", ) in caplog.record_tuples - s3.clean_history(lambda *_: True) + backend.clean_history(lambda *_: True) + backend.close() @mock_s3 @@ -376,9 +386,9 @@ def test_backends_data_s3_read_with_wrong_name_should_log_the_error( with caplog.at_level(logging.ERROR): with pytest.raises(BackendException): - s3 = s3_backend() - s3.clean_history(lambda *_: True) - list(s3.read(query="invalid_name.gz", target=bucket_name)) + backend = s3_backend() + backend.clean_history(lambda *_: True) + list(backend.read(query="invalid_name.gz", target=bucket_name)) assert ( "ralph.backends.data.s3", @@ -386,7 +396,8 @@ def test_backends_data_s3_read_with_wrong_name_should_log_the_error( "Failed to download invalid_name.gz: The specified key does not exist.", ) in caplog.record_tuples - assert s3.history == [] + assert backend.history == [] + backend.close() @mock_s3 @@ -418,17 +429,18 @@ def mock_read_raw(*args, **kwargs): with caplog.at_level(logging.ERROR): with pytest.raises(BackendException): - s3 = s3_backend() - monkeypatch.setattr(s3, "_read_raw", mock_read_raw) - s3.clean_history(lambda *_: True) - list(s3.read(query=object_name, target=bucket_name, raw_output=True)) + backend = s3_backend() + monkeypatch.setattr(backend, "_read_raw", mock_read_raw) + backend.clean_history(lambda *_: True) + list(backend.read(query=object_name, target=bucket_name, raw_output=True)) assert ( "ralph.backends.data.s3", logging.ERROR, f"Failed to read chunk from object {object_name}", ) in caplog.record_tuples - assert s3.history == [] + assert backend.history == [] + backend.close() @pytest.mark.parametrize( @@ -462,9 +474,9 @@ def test_backends_data_s3_write_method_with_parameter_error( with caplog.at_level(logging.ERROR): with pytest.raises(BackendException): - s3 = s3_backend() - s3.clean_history(lambda *_: True) - s3.write( + backend = s3_backend() + backend.clean_history(lambda *_: True) + backend.write( data=some_content, target=object_name, operation_type=operation_type ) @@ -474,7 +486,8 @@ def test_backends_data_s3_write_method_with_parameter_error( ) assert ("ralph.backends.data.s3", logging.ERROR, msg) in caplog.record_tuples - assert s3.history == [] + assert backend.history == [] + backend.close() @pytest.mark.parametrize( @@ -494,6 +507,7 @@ def test_backends_data_s3_data_backend_write_method_with_append_or_delete_operat match=f"{operation_type.name} operation_type is not allowed.", ): backend.write(data=[b"foo"], operation_type=operation_type) + backend.close() @pytest.mark.parametrize( @@ -520,10 +534,10 @@ def test_backends_data_s3_write_method_with_create_index_operation( object_name = "new-archive.gz" some_content = 
b"some contents in the stream file to upload" data = [some_content, some_content, some_content] - s3 = s3_backend() - s3.clean_history(lambda *_: True) + backend = s3_backend() + backend.clean_history(lambda *_: True) - response = s3.write( + response = backend.write( data=data, target=object_name, operation_type=operation_type, @@ -537,13 +551,13 @@ def test_backends_data_s3_write_method_with_create_index_operation( "id": f"{bucket_name}/{object_name}", "size": len(some_content) * 3, "timestamp": freezed_now, - } in s3.history + } in backend.history object_name = "new-archive2.gz" other_content = {"some": "content"} data = [other_content, other_content] - response = s3.write( + response = backend.write( data=data, target=object_name, operation_type=operation_type, @@ -557,9 +571,9 @@ def test_backends_data_s3_write_method_with_create_index_operation( "id": f"{bucket_name}/{object_name}", "size": len(bytes(f"{json.dumps(other_content)}\n", encoding="utf8")) * 2, "timestamp": freezed_now, - } in s3.history + } in backend.history - assert list(s3.read(query=object_name, raw_output=False)) == data + assert list(backend.read(query=object_name, raw_output=False)) == data object_name = "new-archive3.gz" date = datetime.datetime(2023, 6, 30, 8, 42, 15, 554892) @@ -571,7 +585,7 @@ def test_backends_data_s3_write_method_with_create_index_operation( with caplog.at_level(logging.ERROR): # Without ignoring error with pytest.raises(BackendException, match=error): - response = s3.write( + response = backend.write( data=data, target=object_name, operation_type=operation_type, @@ -579,7 +593,7 @@ def test_backends_data_s3_write_method_with_create_index_operation( ) # Ignoring error - response = s3.write( + response = backend.write( data=data, target=object_name, operation_type=operation_type, @@ -601,6 +615,7 @@ def test_backends_data_s3_write_method_with_create_index_operation( ] * 2 ) + backend.close() @mock_s3 @@ -618,13 +633,14 @@ def test_backends_data_s3_write_method_with_no_data_should_skip( object_name = "new-archive.gz" - s3 = s3_backend() - response = s3.write( + backend = s3_backend() + response = backend.write( data=[], target=object_name, operation_type=BaseOperationType.CREATE, ) assert response == 0 + backend.close() @mock_s3 @@ -647,12 +663,47 @@ def test_backends_data_s3_write_method_with_failure_should_log_the_error( def raise_client_error(*args, **kwargs): raise ClientError({"Error": {}}, "error") - s3 = s3_backend() - s3.client.put_object = raise_client_error + backend = s3_backend() + backend.client.put_object = raise_client_error with pytest.raises(BackendException, match=error): - s3.write( + backend.write( data=[body], target=object_name, operation_type=BaseOperationType.CREATE, ) + backend.close() + + +def test_backends_data_s3_data_backend_close_method_with_failure( + s3_backend, monkeypatch +): + """Test the `S3DataBackend.close` method.""" + + backend = s3_backend() + + def mock_connection_error(): + """S3 backend client close mock that raises a connection error.""" + raise ClientError({"Error": {}}, "error") + + monkeypatch.setattr(backend.client, "close", mock_connection_error) + + with pytest.raises(BackendException, match="Failed to close S3 backend client"): + backend.close() + + +@mock_s3 +def test_backends_data_s3_data_backend_close_method(s3_backend, caplog): + """Test the `S3DataBackend.close` method.""" + + # No client instantiated + backend = s3_backend() + backend._client = None # pylint: disable=protected-access + with caplog.at_level(logging.WARNING): + 
backend.close() + + assert ( + "ralph.backends.data.s3", + logging.WARNING, + "No backend client to close.", + ) in caplog.record_tuples diff --git a/tests/backends/data/test_swift.py b/tests/backends/data/test_swift.py index c37fb6045..f0f8fa67b 100644 --- a/tests/backends/data/test_swift.py +++ b/tests/backends/data/test_swift.py @@ -51,6 +51,7 @@ def test_backends_data_swift_data_backend_default_instantiation(monkeypatch, fs) assert backend.options["user_domain_name"] == "Default" assert backend.default_container is None assert backend.locale_encoding == "utf8" + backend.close() def test_backends_data_swift_data_backend_instantiation_with_settings(fs): @@ -85,6 +86,7 @@ def test_backends_data_swift_data_backend_instantiation_with_settings(fs): SwiftDataBackend(settings_) except Exception as err: # pylint:disable=broad-except pytest.fail(f"Two SwiftDataBackends should not raise exceptions: {err}") + backend.close() def test_backends_data_swift_data_backend_status_method_with_error_status( @@ -101,17 +103,18 @@ def mock_failed_head_account(*args, **kwargs): # pylint:disable=unused-argument raise ClientException(error) - swift = swift_backend() - monkeypatch.setattr(swift.connection, "head_account", mock_failed_head_account) + backend = swift_backend() + monkeypatch.setattr(backend.connection, "head_account", mock_failed_head_account) with caplog.at_level(logging.ERROR): - assert swift.status() == DataBackendStatus.ERROR + assert backend.status() == DataBackendStatus.ERROR assert ( "ralph.backends.data.swift", logging.ERROR, f"Unable to connect to the Swift account: {error}", ) in caplog.record_tuples + backend.close() def test_backends_data_swift_data_backend_status_method_with_ok_status( @@ -124,13 +127,16 @@ def test_backends_data_swift_data_backend_status_method_with_ok_status( def mock_successful_head_account(*args, **kwargs): # pylint:disable=unused-argument return 1 - swift = swift_backend() - monkeypatch.setattr(swift.connection, "head_account", mock_successful_head_account) + backend = swift_backend() + monkeypatch.setattr( + backend.connection, "head_account", mock_successful_head_account + ) with caplog.at_level(logging.ERROR): - assert swift.status() == DataBackendStatus.OK + assert backend.status() == DataBackendStatus.OK assert caplog.record_tuples == [] + backend.close() def test_backends_data_swift_data_backend_list_method( @@ -188,6 +194,7 @@ def mock_head_object(container, obj): # pylint:disable=unused-argument assert list(backend.list()) == [x["name"] for x in listing] assert list(backend.list(new=True)) == ["2020-05-01.gz"] assert list(backend.list(details=True)) == listing + backend.close() def test_backends_data_swift_data_backend_list_with_failed_details( @@ -226,6 +233,7 @@ def mock_head_object(*args, **kwargs): # pylint:disable=unused-argument next(backend.list(details=True)) assert ("ralph.backends.data.swift", logging.ERROR, msg) in caplog.record_tuples + backend.close() def test_backends_data_swift_data_backend_list_with_failed_connection( @@ -254,6 +262,7 @@ def mock_get_container(*args, **kwargs): # pylint:disable=unused-argument next(backend.list(details=True)) assert ("ralph.backends.data.swift", logging.ERROR, msg) in caplog.record_tuples + backend.close() def test_backends_data_swift_data_backend_read_method_with_raw_output( @@ -314,6 +323,7 @@ def mock_get_object(*args, **kwargs): # pylint:disable=unused-argument "timestamp": frozen_now, }, ] + backend.close() def test_backends_data_swift_data_backend_read_method_without_raw_output( @@ -352,6 +362,7 @@ 
def mock_get_object(*args, **kwargs): # pylint:disable=unused-argument "timestamp": frozen_now, } ] + backend.close() def test_backends_data_swift_data_backend_read_method_with_invalid_query(swift_backend): @@ -363,6 +374,7 @@ def test_backends_data_swift_data_backend_read_method_with_invalid_query(swift_b error = "Invalid query. The query should be a valid archive name" with pytest.raises(BackendParameterException, match=error): list(backend.read()) + backend.close() def test_backends_data_swift_data_backend_read_method_with_ignore_errors( @@ -409,6 +421,7 @@ def mock_get_object_2(*args, **kwargs): # pylint:disable=unused-argument result = backend.read(ignore_errors=True, query="2020-06-02.gz") assert isinstance(result, Iterable) assert list(result) == [valid_dictionary] + backend.close() def test_backends_data_swift_data_backend_read_method_without_ignore_errors( @@ -465,6 +478,7 @@ def mock_get_object_2(*args, **kwargs): # pylint:disable=unused-argument assert isinstance(result, Iterable) with pytest.raises(BackendException, match="Raised error:"): next(result) + backend.close() def test_backends_data_swift_data_backend_read_method_with_failed_connection( @@ -488,6 +502,7 @@ def mock_failed_get_object(*args, **kwargs): # pylint:disable=unused-argument next(result) assert ("ralph.backends.data.swift", logging.ERROR, msg) in caplog.record_tuples + backend.close() @pytest.mark.parametrize( @@ -521,6 +536,7 @@ def mock_get_container(*args, **kwargs): # pylint:disable=unused-argument # When the `write` method fails, then no entry should be added to the history. assert not sorted(backend.history, key=itemgetter("id")) + backend.close() def test_backends_data_swift_data_backend_write_method_with_failed_connection( @@ -553,6 +569,7 @@ def mock_head_object(*args, **kwargs): # pylint:disable=unused-argument # When the `write` method fails, then no entry should be added to the history. assert not sorted(backend.history, key=itemgetter("id")) + backend.close() @pytest.mark.parametrize( @@ -582,6 +599,7 @@ def test_backends_data_swift_data_backend_write_method_with_invalid_operation( # When the `write` method fails, then no entry should be added to the history. 
assert not sorted(backend.history, key=itemgetter("id")) + backend.close() def test_backends_data_swift_data_backend_write_method_without_target( @@ -638,3 +656,43 @@ def mock_head_object(*args, **kwargs): # pylint:disable=unused-argument "timestamp": frozen_now, } ] + backend.close() + + +def test_backends_data_swift_data_backend_close_method_with_failure( + swift_backend, monkeypatch +): + """Test the `SwiftDataBackend.close` method.""" + + backend = swift_backend() + + def mock_connection_error(): + """Swift backend connection close mock that raises a connection error.""" + raise ClientException({"Error": {}}, "error") + + monkeypatch.setattr(backend.connection, "close", mock_connection_error) + + with pytest.raises(BackendException, match="Failed to close Swift backend client"): + backend.close() + + +def test_backends_data_swift_data_backend_close_method(swift_backend, caplog): + """Test the `SwiftDataBackend.close` method.""" + + backend = swift_backend() + + # Not possible to connect to client after closing it + backend.close() + assert backend.status() == DataBackendStatus.ERROR + + # No client instantiated + backend = swift_backend() + backend._connection = None # pylint: disable=protected-access + with caplog.at_level(logging.WARNING): + backend.close() + + assert ( + "ralph.backends.data.swift", + logging.WARNING, + "No backend client to close.", + ) in caplog.record_tuples diff --git a/tests/backends/lrs/test_clickhouse.py b/tests/backends/lrs/test_clickhouse.py index d5bd79e9f..7b44246ac 100644 --- a/tests/backends/lrs/test_clickhouse.py +++ b/tests/backends/lrs/test_clickhouse.py @@ -218,6 +218,7 @@ def mock_read(query, target, ignore_errors): monkeypatch.setattr(backend, "read", mock_read) backend.query_statements(StatementParameters(**params)) + backend.close() def test_backends_lrs_clickhouse_lrs_backend_query_statements( @@ -251,6 +252,7 @@ def test_backends_lrs_clickhouse_lrs_backend_query_statements( StatementParameters(statementId=test_id, limit=10) ) assert result.statements == statements + backend.close() def test_backends_lrs_clickhouse_lrs_backend__find(clickhouse, clickhouse_lrs_backend): @@ -279,6 +281,7 @@ def test_backends_lrs_clickhouse_lrs_backend__find(clickhouse, clickhouse_lrs_ba # Check the expected search query results. result = backend.query_statements(StatementParameters()) assert result.statements == statements + backend.close() def test_backends_lrs_clickhouse_lrs_backend_query_statements_by_ids( @@ -310,6 +313,7 @@ def test_backends_lrs_clickhouse_lrs_backend_query_statements_by_ids( # Check the expected search query results. 
result = list(backend.query_statements_by_ids([test_id])) assert result[0]["event"] == statements[0] + backend.close() def test_backends_lrs_clickhouse_lrs_backend_query_statements_client_failure( @@ -338,6 +342,7 @@ def mock_query(*args, **kwargs): logging.ERROR, "Failed to read from ClickHouse", ) in caplog.record_tuples + backend.close() def test_backends_lrs_clickhouse_lrs_backend_query_statements_by_ids_client_failure( @@ -366,3 +371,4 @@ def mock_query(*args, **kwargs): logging.ERROR, "Failed to read from ClickHouse", ) in caplog.record_tuples + backend.close() diff --git a/tests/backends/lrs/test_es.py b/tests/backends/lrs/test_es.py index 89dbf5b45..91bb56f31 100644 --- a/tests/backends/lrs/test_es.py +++ b/tests/backends/lrs/test_es.py @@ -280,6 +280,8 @@ def mock_read(query, chunk_size): assert result.pit_id == "foo_pit_id" assert result.search_after == "bar_search_after|baz_search_after" + backend.close() + def test_backends_lrs_es_lrs_backend_query_statements(es, es_lrs_backend): """Test the `ESLRSBackend.query_statements` method, given a query, @@ -297,6 +299,8 @@ def test_backends_lrs_es_lrs_backend_query_statements(es, es_lrs_backend): assert result.statements == documents assert re.match(r"[0-9]+\|0", result.search_after) + backend.close() + def test_backends_lrs_es_lrs_backend_query_statements_with_search_query_failure( es, es_lrs_backend, monkeypatch, caplog @@ -324,6 +328,8 @@ def mock_read(**_): "Failed to read from Elasticsearch", ) in caplog.record_tuples + backend.close() + def test_backends_lrs_es_lrs_backend_query_statements_by_ids_with_search_query_failure( es, es_lrs_backend, monkeypatch, caplog @@ -351,6 +357,8 @@ def mock_search(**_): "Failed to read from Elasticsearch", ) in caplog.record_tuples + backend.close() + def test_backends_lrs_es_lrs_backend_query_statements_by_ids_with_multiple_indexes( es, es_forwarding, es_lrs_backend @@ -387,3 +395,6 @@ def test_backends_lrs_es_lrs_backend_query_statements_by_ids_with_multiple_index assert not list(backend_1.query_statements_by_ids(["2"])) assert not list(backend_2.query_statements_by_ids(["1"])) assert list(backend_2.query_statements_by_ids(["2"])) == [index_2_document] + + backend_1.close() + backend_2.close() diff --git a/tests/backends/lrs/test_mongo.py b/tests/backends/lrs/test_mongo.py index 2effe53d2..85edc3f0d 100644 --- a/tests/backends/lrs/test_mongo.py +++ b/tests/backends/lrs/test_mongo.py @@ -242,6 +242,7 @@ def mock_read(query, chunk_size): assert result.statements == [{}] assert not result.pit_id assert result.search_after == "search_after_id" + backend.close() def test_backends_lrs_mongo_lrs_backend_query_statements_with_success( @@ -285,6 +286,7 @@ def test_backends_lrs_mongo_lrs_backend_query_statements_with_success( assert statement_query_result.statements == [ {"id": "62b9ce922c26b46b68ffc68f", **timestamp, **meta} ] + backend.close() def test_backends_lrs_mongo_lrs_backend_query_statements_with_query_failure( @@ -314,6 +316,7 @@ def mock_read(**_): logging.ERROR, "Failed to read from MongoDB", ) in caplog.record_tuples + backend.close() def test_backends_lrs_mongo_lrs_backend_query_statements_by_ids_with_query_failure( @@ -343,6 +346,7 @@ def mock_read(**_): logging.ERROR, "Failed to read from MongoDB", ) in caplog.record_tuples + backend.close() def test_backends_lrs_mongo_lrs_backend_query_statements_by_ids_with_two_collections( @@ -368,3 +372,5 @@ def test_backends_lrs_mongo_lrs_backend_query_statements_by_ids_with_two_collect assert not list(backend_1.query_statements_by_ids(["2"])) 
assert not list(backend_2.query_statements_by_ids(["1"])) assert list(backend_2.query_statements_by_ids(["2"])) == [{"id": "2", **timestamp}] + backend_1.close() + backend_2.close()
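Usage sketch (illustrative only, not part of the diff above): with close on the BaseDataBackend interface, calling code can release a sync backend's connection deterministically once it is done reading. The dump_records helper below is hypothetical; it assumes only the read/close contract shown in src/ralph/backends/data/base.py above.

    from ralph.backends.data.base import BaseDataBackend


    def dump_records(backend: BaseDataBackend) -> list:
        """Read every record from `backend`, always closing its client."""
        try:
            return list(backend.read())
        finally:
            # FS and LDP backends raise NotImplementedError here (see their
            # `close` implementations above), so callers handling
            # heterogeneous backends may need to guard this call.
            backend.close()

Note that backends holding no persistent connection (FS, LDP) raise NotImplementedError rather than silently succeeding, which keeps any accidental reliance on close explicit.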