Skip to content

Commit

Permalink
🏗️(backends) add close method to sync backends
Browse files Browse the repository at this point in the history
Synchronous backends such as Elasticsearch or Mongo need their connection to
be closed when finished. This commits adds an abstract method close to the
BaseDataBackend interface, and implements it for backends that need it.
  • Loading branch information
wilbrdt committed Sep 6, 2023
1 parent 9dca245 commit 713f133
Show file tree
Hide file tree
Showing 23 changed files with 557 additions and 71 deletions.
3 changes: 2 additions & 1 deletion src/ralph/backends/data/async_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,10 @@ async def close(self) -> None:
"""Close the AsyncElasticsearch client.
Raise:
BackendException: If a failure during the close operation occurs.
BackendException: If a failure occurs during the close operation.
"""
if not self._client:
logger.warning("No backend client to close.")
return

try:
Expand Down
2 changes: 1 addition & 1 deletion src/ralph/backends/data/async_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ async def close(self) -> None:
"""Close the AsyncIOMotorClient client.
Raise:
BackendException: If a failure during the close operation occurs.
BackendException: If a failure occurs during the close operation.
"""
try:
self.client.close()
Expand Down
10 changes: 9 additions & 1 deletion src/ralph/backends/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ def write( # pylint: disable=too-many-arguments
BackendParameterException: If a backend argument value is not valid.
"""

@abstractmethod
def close(self) -> None:
"""Close the data backend client.
Raise:
BackendException: If a failure occurs during the close operation.
"""


class BaseAsyncDataBackend(ABC):
"""Base async data backend interface."""
Expand Down Expand Up @@ -381,5 +389,5 @@ async def close(self) -> None:
"""Close the data backend client.
Raise:
BackendException: If a failure during the close operation occurs.
BackendException: If a failure occurs during the close operation.
"""
17 changes: 17 additions & 0 deletions src/ralph/backends/data/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,23 @@ def write( # pylint: disable=too-many-arguments

return count

def close(self) -> None:
"""Close the ClickHouse backend client.
Raise:
BackendException: If a failure occurs during the close operation.
"""
if not self._client:
logger.warning("No backend client to close.")
return

try:
self.client.close()
except ClickHouseError as error:
msg = "Failed to close ClickHouse client: %s"
logger.error(msg, error)
raise BackendException(msg % error) from error

@staticmethod
def _to_insert_tuples(
data: Iterable[dict],
Expand Down
17 changes: 17 additions & 0 deletions src/ralph/backends/data/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,23 @@ def write( # pylint: disable=too-many-arguments
raise BackendException(msg % (error, details, count)) from error
return count

def close(self) -> None:
"""Close the Elasticsearch backend client.
Raise:
BackendException: If a failure occurs during the close operation.
"""
if not self._client:
logger.warning("No backend client to close.")
return

try:
self.client.close()
except TransportError as error:
msg = "Failed to close Elasticsearch client: %s"
logger.error(msg, error)
raise BackendException(msg % error) from error

@staticmethod
def to_documents(
data: Iterable[dict],
Expand Down
6 changes: 6 additions & 0 deletions src/ralph/backends/data/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ def write( # pylint: disable=too-many-arguments
)
return 1

def close(self) -> None:
"""FS backend has nothing to close, this method is not implemented."""
msg = "FS data backend does not support `close` method"
logger.error(msg)
raise NotImplementedError(msg)

@staticmethod
def _read_raw(file: IO, chunk_size: int, _ignore_errors: bool) -> Iterator[bytes]:
"""Read the `file` in chunks of size `chunk_size` and yield them."""
Expand Down
6 changes: 6 additions & 0 deletions src/ralph/backends/data/ldp.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ def write( # pylint: disable=too-many-arguments
logger.error(msg, target)
raise NotImplementedError(msg % target)

def close(self) -> None:
"""LDP client does not support close, this method is not implemented."""
msg = "LDP data backend does not support `close` method"
logger.error(msg)
raise NotImplementedError(msg)

def _get_archive_endpoint(self, stream_id: Union[None, str] = None) -> str:
"""Return OVH's archive endpoint."""
stream_id = stream_id if stream_id else self.stream_id
Expand Down
23 changes: 21 additions & 2 deletions src/ralph/backends/data/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
from pydantic import Json, MongoDsn, constr
from pymongo import MongoClient, ReplaceOne
from pymongo.collection import Collection
from pymongo.errors import BulkWriteError, ConnectionFailure, InvalidName, PyMongoError
from pymongo.errors import (
BulkWriteError,
ConnectionFailure,
InvalidName,
InvalidOperation,
PyMongoError,
)

from ralph.backends.data.base import (
BaseDataBackend,
Expand Down Expand Up @@ -113,7 +119,7 @@ def status(self) -> DataBackendStatus:
# Check MongoDB connection.
try:
self.client.admin.command("ping")
except ConnectionFailure as error:
except (ConnectionFailure, InvalidOperation) as error:
logger.error("Failed to connect to MongoDB: %s", error)
return DataBackendStatus.AWAY

Expand Down Expand Up @@ -296,6 +302,19 @@ def write( # pylint: disable=too-many-arguments

return count

def close(self) -> None:
"""Close the MongoDB backend client.
Raise:
BackendException: If a failure occurs during the close operation.
"""
try:
self.client.close()
except PyMongoError as error:
msg = "Failed to close MongoDB client: %s"
logger.error(msg, error)
raise BackendException(msg % error) from error

@staticmethod
def iter_by_batch(data: Iterable[dict], chunk_size: int):
"""Iterate over `data` Iterable and yield batches of size `chunk_size`."""
Expand Down
24 changes: 21 additions & 3 deletions src/ralph/backends/data/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import (
ClientError,
EndpointConnectionError,
ParamValidationError,
ReadTimeoutError,
ResponseStreamingError,
Expand Down Expand Up @@ -102,7 +103,7 @@ def status(self) -> DataBackendStatus:
"""
try:
self.client.head_bucket(Bucket=self.default_bucket_name)
except ClientError:
except (ClientError, EndpointConnectionError):
return DataBackendStatus.ERROR

return DataBackendStatus.OK
Expand Down Expand Up @@ -197,7 +198,7 @@ def read(

try:
response = self.client.get_object(Bucket=target, Key=query.query_string)
except ClientError as err:
except (ClientError, EndpointConnectionError) as err:
error_msg = err.response["Error"]["Message"]
msg = "Failed to download %s: %s"
logger.error(msg, query.query_string, error_msg)
Expand Down Expand Up @@ -321,7 +322,7 @@ def write( # pylint: disable=too-many-arguments
Config=TransferConfig(multipart_chunksize=chunk_size),
)
response = self.client.head_object(Bucket=target_bucket, Key=target_object)
except (ClientError, ParamValidationError) as exc:
except (ClientError, ParamValidationError, EndpointConnectionError) as exc:
msg = "Failed to upload %s"
logger.error(msg, target)
raise BackendException(msg % target) from exc
Expand All @@ -340,6 +341,23 @@ def write( # pylint: disable=too-many-arguments

return counter["count"]

def close(self) -> None:
"""Close the S3 backend client.
Raise:
BackendException: If a failure occurs during the close operation.
"""
if not self._client:
logger.warning("No backend client to close.")
return

try:
self.client.close()
except ClientError as error:
msg = "Failed to close S3 backend client: %s"
logger.error(msg, error)
raise BackendException(msg % error) from error

@staticmethod
def _read_raw(
obj: StreamingBody, chunk_size: int, _ignore_errors: bool
Expand Down
17 changes: 17 additions & 0 deletions src/ralph/backends/data/swift.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,23 @@ def write( # pylint: disable=too-many-arguments, disable=too-many-branches
)
return count

def close(self) -> None:
"""Close the Swift backend client.
Raise:
BackendException: If a failure occurs during the close operation.
"""
if not self._connection:
logger.warning("No backend client to close.")
return

try:
self.connection.close()
except ClientException as error:
msg = "Failed to close Swift backend client: %s"
logger.error(msg, error)
raise BackendException(msg % error) from error

def _details(self, container: str, name: str):
"""Return `name` object details from `container`."""
try:
Expand Down
20 changes: 19 additions & 1 deletion tests/backends/data/test_async_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ async def test_backends_data_async_es_data_backend_write_method_with_datastream(


@pytest.mark.anyio
async def test_backends_data_es_data_backend_close_method(
async def test_backends_data_es_data_backend_close_method_with_failure(
async_es_backend, monkeypatch
):
"""Test the `AsyncESDataBackend.close` method."""
Expand All @@ -840,3 +840,21 @@ async def mock_connection_error():

with pytest.raises(BackendException, match="Failed to close Elasticsearch client"):
await backend.close()


@pytest.mark.anyio
async def test_backends_data_es_data_backend_close_method(async_es_backend, caplog):
"""Test the `AsyncESDataBackend.close` method."""

# No client instantiated
backend = async_es_backend()
await backend.close()
backend._client = None # pylint: disable=protected-access
with caplog.at_level(logging.WARNING):
await backend.close()

assert (
"ralph.backends.data.async_es",
logging.WARNING,
"No backend client to close.",
) in caplog.record_tuples
13 changes: 12 additions & 1 deletion tests/backends/data/test_async_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ async def test_backends_data_async_mongo_data_backend_write_method_with_custom_c


@pytest.mark.anyio
async def test_backends_data_async_mongo_data_backend_close(
async def test_backends_data_async_mongo_data_backend_close_method_with_failure(
async_mongo_backend, monkeypatch, caplog
):
"""Test the `AsyncMongoDataBackend.close` method, given a failed close,
Expand Down Expand Up @@ -1084,3 +1084,14 @@ def close():
logging.ERROR,
"Failed to close AsyncIOMotorClient: Close failure",
) in caplog.record_tuples


@pytest.mark.anyio
async def test_backends_data_async_mongo_data_backend_close_method(async_mongo_backend):
"""Test the `AsyncMongoDataBackend.close` method."""

backend = async_mongo_backend()

# Not possible to connect to client after closing it
await backend.close()
assert await backend.status() == DataBackendStatus.AWAY
6 changes: 6 additions & 0 deletions tests/backends/data/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def list(self): # pylint: disable=arguments-differ,missing-function-docstring
def write(self): # pylint: disable=arguments-differ,missing-function-docstring
pass

def close(self): # pylint: disable=arguments-differ,missing-function-docstring
pass

MockBaseDataBackend().read(query=value)


Expand Down Expand Up @@ -80,6 +83,9 @@ def list(self): # pylint: disable=arguments-differ,missing-function-docstring
def write(self): # pylint: disable=arguments-differ,missing-function-docstring
pass

def close(self): # pylint: disable=arguments-differ,missing-function-docstring
pass

with pytest.raises(BackendParameterException, match=error):
with caplog.at_level(logging.ERROR):
MockBaseDataBackend().read(query=value)
Expand Down
Loading

0 comments on commit 713f133

Please sign in to comment.