From bcc25674c6918d059f3f1142b69bbed87950abc3 Mon Sep 17 00:00:00 2001
From: Mads Bisgaard <126242332+bisgaard-itis@users.noreply.github.com>
Date: Mon, 11 Nov 2024 15:32:25 +0100
Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Ensure=20chunk=20file=20download?=
 =?UTF-8?q?=20(#206)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 clients/python/requirements/e2e-test.txt    |  1 +
 clients/python/setup.py                     |  1 +
 clients/python/src/osparc/_api_files_api.py | 69 +++++++++++++-----
 clients/python/src/osparc/_http_client.py   | 42 ++++++++++-
 clients/python/src/osparc/_utils.py         | 16 ++---
 clients/python/test/e2e/_utils.py           | 14 ----
 clients/python/test/e2e/conftest.py         | 38 +++++-----
 clients/python/test/e2e/test_files_api.py   | 70 ++++++++++++++-----
 clients/python/test/e2e/test_solvers_api.py |  3 +-
 .../test_osparc/test_async_http_client.py   |  4 +-
 10 files changed, 174 insertions(+), 84 deletions(-)

diff --git a/clients/python/requirements/e2e-test.txt b/clients/python/requirements/e2e-test.txt
index e2c13d97..d19e26e0 100644
--- a/clients/python/requirements/e2e-test.txt
+++ b/clients/python/requirements/e2e-test.txt
@@ -5,6 +5,7 @@ ipykernel
 ipython
 jinja2
 matplotlib
+memory_profiler
 packaging
 pandas
 papermill
diff --git a/clients/python/setup.py b/clients/python/setup.py
index 28e09da5..36499be2 100644
--- a/clients/python/setup.py
+++ b/clients/python/setup.py
@@ -30,6 +30,7 @@
     "tenacity",
     "tqdm>=4.48.0",
     f"osparc_client=={VERSION}",
+    "aiofiles",
 ]
 
 SETUP = dict(
diff --git a/clients/python/src/osparc/_api_files_api.py b/clients/python/src/osparc/_api_files_api.py
index 88bdea55..96be898c 100644
--- a/clients/python/src/osparc/_api_files_api.py
+++ b/clients/python/src/osparc/_api_files_api.py
@@ -4,9 +4,6 @@
 import json
 import logging
 import math
-import random
-import shutil
-import string
 from pathlib import Path
 from typing import Any, Iterator, List, Optional, Tuple, Union
 
@@ -28,6 +25,10 @@
     FileUploadData,
     UploadedPart,
 )
+from urllib.parse import urljoin
+import aiofiles
+from tempfile import NamedTemporaryFile
+import shutil
 from ._utils import (
     DEFAULT_TIMEOUT_SECONDS,
     PaginationGenerator,
@@ -65,25 +66,57 @@ def __getattr__(self, name: str) -> Any:
         return super().__getattribute__(name)
 
     def download_file(
-        self, file_id: str, *, destination_folder: Optional[Path] = None, **kwargs
+        self,
+        file_id: str,
+        *,
+        destination_folder: Optional[Path] = None,
+        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
+        **kwargs,
+    ) -> str:
+        return asyncio.run(
+            self.download_file_async(
+                file_id=file_id,
+                destination_folder=destination_folder,
+                timeout_seconds=timeout_seconds,
+                **kwargs,
+            )
+        )
+
+    async def download_file_async(
+        self,
+        file_id: str,
+        *,
+        destination_folder: Optional[Path] = None,
+        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
+        **kwargs,
     ) -> str:
         if destination_folder is not None and not destination_folder.is_dir():
             raise RuntimeError(
                 f"destination_folder: {destination_folder} must be a directory"
             )
-        downloaded_file: Path = Path(super().download_file(file_id, **kwargs))
-        if destination_folder is not None:
-            dest_file: Path = destination_folder / downloaded_file.name
-            while dest_file.is_file():
-                new_name = (
-                    downloaded_file.stem
-                    + "".join(random.choices(string.ascii_letters, k=8))
-                    + downloaded_file.suffix
+        async with aiofiles.tempfile.NamedTemporaryFile(
+            mode="wb",
+            delete=False,
+        ) as downloaded_file:
+            async with AsyncHttpClient(
+                configuration=self.api_client.configuration, timeout=timeout_seconds
+            ) as session:
+                url = urljoin(
+                    self.api_client.configuration.host, f"/v0/files/{file_id}/content"
                 )
-                dest_file = destination_folder / new_name
-            shutil.move(downloaded_file, dest_file)
-            downloaded_file = dest_file
-        return str(downloaded_file.resolve())
+                async for response in await session.stream(
+                    "GET", url=url, auth=self._auth, follow_redirects=True
+                ):
+                    response.raise_for_status()
+                    async for chunk in response.aiter_bytes():
+                        await downloaded_file.write(chunk)
+        dest_file = f"{downloaded_file.name}"
+        if destination_folder is not None:
+            dest_file = NamedTemporaryFile(dir=destination_folder, delete=False).name
+            shutil.move(
+                f"{downloaded_file.name}", dest_file
+            )  # aiofiles doesn't seem to have an async variant of this
+        return dest_file
 
     def upload_file(
         self,
@@ -105,7 +138,7 @@ async def upload_file_async(
         file = Path(file)
         if not file.is_file():
             raise RuntimeError(f"{file} is not a file")
-        checksum: str = compute_sha256(file)
+        checksum: str = await compute_sha256(file)
         for file_result in self._search_files(
             sha256_checksum=checksum, timeout_seconds=timeout_seconds
         ):
@@ -159,7 +192,7 @@ async def upload_file_async(
             )
             async with AsyncHttpClient(
                 configuration=self.api_client.configuration,
-                request_type="post",
+                method="post",
                 url=links.abort_upload,
                 body=abort_body.to_dict(),
                 base_url=self.api_client.configuration.host,
diff --git a/clients/python/src/osparc/_http_client.py b/clients/python/src/osparc/_http_client.py
index 2aa8bc65..63a3d432 100644
--- a/clients/python/src/osparc/_http_client.py
+++ b/clients/python/src/osparc/_http_client.py
@@ -1,7 +1,16 @@
 from contextlib import suppress
 from datetime import datetime
 from email.utils import parsedate_to_datetime
-from typing import Any, Awaitable, Callable, Dict, Optional, Set
+from typing import (
+    Any,
+    Awaitable,
+    Callable,
+    Dict,
+    Optional,
+    Set,
+    Literal,
+    AsyncGenerator,
+)
 
 import httpx
 import tenacity
@@ -17,14 +26,14 @@ def __init__(
         self,
         *,
         configuration: Configuration,
-        request_type: Optional[str] = None,
+        method: Optional[str] = None,
         url: Optional[str] = None,
         body: Optional[Dict] = None,
         **httpx_async_client_kwargs,
     ):
         self.configuration = configuration
         self._client = httpx.AsyncClient(**httpx_async_client_kwargs)
-        self._callback = getattr(self._client, request_type) if request_type else None
+        self._callback = getattr(self._client, method) if method else None
         self._url = url
         self._body = body
         if self._callback is not None:
@@ -77,6 +86,28 @@ async def _():
 
         return await _()
 
+    async def _stream(
+        self, method: Literal["GET"], url: str, *args, **kwargs
+    ) -> AsyncGenerator[httpx.Response, None]:
+        n_attempts = self.configuration.retries.total
+        assert isinstance(n_attempts, int)
+
+        @tenacity.retry(
+            reraise=True,
+            wait=self._wait_callback,
+            stop=tenacity.stop_after_attempt(n_attempts),
+            retry=tenacity.retry_if_exception_type(httpx.HTTPStatusError),
+        )
+        async def _() -> AsyncGenerator[httpx.Response, None]:
+            async with self._client.stream(
+                method=method, url=url, *args, **kwargs
+            ) as response:
+                if response.status_code in self.configuration.retries.status_forcelist:
+                    response.raise_for_status()
+                yield response
+
+        return _()
+
     async def put(self, *args, **kwargs) -> httpx.Response:
         return await self._request(self._client.put, *args, **kwargs)
 
@@ -92,6 +123,11 @@ async def patch(self, *args, **kwargs) -> httpx.Response:
     async def get(self, *args, **kwargs) -> httpx.Response:
         return await self._request(self._client.get, *args, **kwargs)
 
+    async def stream(
+        self, method: Literal["GET"], url: str, *args, **kwargs
+    ) -> AsyncGenerator[httpx.Response, None]:
+        return await self._stream(method=method, url=url, *args, **kwargs)
+
     def _wait_callback(self, retry_state: tenacity.RetryCallState) -> int:
         assert retry_state.outcome is not None
         if retry_state.outcome and retry_state.outcome.exception():
diff --git a/clients/python/src/osparc/_utils.py b/clients/python/src/osparc/_utils.py
index 5a012826..537f7118 100644
--- a/clients/python/src/osparc/_utils.py
+++ b/clients/python/src/osparc/_utils.py
@@ -16,7 +16,7 @@
     Solver,
     Study,
 )
-
+import aiofiles
 from ._exceptions import RequestError
 
 _KB = 1024  # in bytes
@@ -87,15 +87,15 @@ async def file_chunk_generator(
     bytes_read: int = 0
     file_size: int = file.stat().st_size
     while bytes_read < file_size:
-        with open(file, "rb") as f:
-            f.seek(bytes_read)
+        async with aiofiles.open(file, "rb") as f:
+            await f.seek(bytes_read)
             nbytes = (
                 chunk_size
                 if (bytes_read + chunk_size <= file_size)
                 else (file_size - bytes_read)
             )
             assert nbytes > 0
-            chunk = await asyncio.get_event_loop().run_in_executor(None, f.read, nbytes)
+            chunk = await f.read(nbytes)
             yield chunk, nbytes
             bytes_read += nbytes
 
@@ -109,16 +109,16 @@ async def _fcn_to_coro(callback: Callable[..., S], *args) -> S:
     return result
 
 
-def compute_sha256(file: Path) -> str:
+async def compute_sha256(file: Path) -> str:
     assert file.is_file()
     sha256 = hashlib.sha256()
-    with open(file, "rb") as f:
+    async with aiofiles.open(file, "rb") as f:
         while True:
-            data = f.read(100 * _KB)
+            data = await f.read(100 * _KB)
             if not data:
                 break
             sha256.update(data)
-    return sha256.hexdigest()
+        return sha256.hexdigest()
 
 
 def dev_features_enabled() -> bool:
diff --git a/clients/python/test/e2e/_utils.py b/clients/python/test/e2e/_utils.py
index 83026596..2fc7c516 100644
--- a/clients/python/test/e2e/_utils.py
+++ b/clients/python/test/e2e/_utils.py
@@ -24,20 +24,6 @@ def repo_version() -> Version:
     return Version(version_file.read_text())
 
 
-def skip_if_no_dev_features(test):
-    if (
-        Version(osparc.__version__) < repo_version()
-        or not osparc_dev_features_enabled()
-    ):
-        return pytest.mark.skip(
-            (
-                f"{osparc.__version__=}<{str(repo_version)} "
-                f"or {osparc_dev_features_enabled()=}"
-            )
-        )(test)
-    return test
-
-
 def skip_if_osparc_version(
     *,
     at_least: Optional[Version] = None,
diff --git a/clients/python/test/e2e/conftest.py b/clients/python/test/e2e/conftest.py
index 738146f8..e00da8cb 100644
--- a/clients/python/test/e2e/conftest.py
+++ b/clients/python/test/e2e/conftest.py
@@ -17,6 +17,7 @@
 from numpy import random
 from packaging.version import Version
 from pydantic import ByteSize
+from typing import Callable
 
 try:
     from osparc._settings import ConfigurationEnvVars
@@ -24,11 +25,6 @@
     pass
 
 
-_KB: ByteSize = ByteSize(1024)  # in bytes
-_MB: ByteSize = ByteSize(_KB * 1024)  # in bytes
-_GB: ByteSize = ByteSize(_MB * 1024)  # in bytes
-
-
 # Dictionary to store start times of tests
 _test_start_times = {}
 
@@ -133,20 +129,24 @@ def async_client() -> Iterable[AsyncClient]:
 
 
 @pytest.fixture
-def tmp_file(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> Path:
-    caplog.set_level(logging.INFO)
-    byte_size: ByteSize = 1 * _GB
-    tmp_file = tmp_path / "large_test_file.txt"
-    ss: random.SeedSequence = random.SeedSequence()
-    logging.info("Entropy used to generate random file: %s", f"{ss.entropy}")
-    rng: random.Generator = random.default_rng(ss)
-    tmp_file.write_bytes(rng.bytes(1000))
-    with open(tmp_file, "wb") as f:
-        f.truncate(byte_size)
-    assert (
-        tmp_file.stat().st_size == byte_size
f"Could not create file of size: {byte_size}" - return tmp_file +def create_tmp_file( + tmp_path: Path, caplog: pytest.LogCaptureFixture +) -> Callable[[ByteSize], Path]: + def _generate_file(file_size: ByteSize): + caplog.set_level(logging.INFO) + tmp_file = tmp_path / "large_test_file.txt" + ss: random.SeedSequence = random.SeedSequence() + logging.info("Entropy used to generate random file: %s", f"{ss.entropy}") + rng: random.Generator = random.default_rng(ss) + tmp_file.write_bytes(rng.bytes(1000)) + with open(tmp_file, "wb") as f: + f.truncate(file_size) + assert ( + tmp_file.stat().st_size == file_size + ), f"Could not create file of size: {file_size}" + return tmp_file + + return _generate_file @pytest.fixture diff --git a/clients/python/test/e2e/test_files_api.py b/clients/python/test/e2e/test_files_api.py index 57736dd3..b52221a8 100644 --- a/clients/python/test/e2e/test_files_api.py +++ b/clients/python/test/e2e/test_files_api.py @@ -9,8 +9,13 @@ import osparc import pytest -from _utils import skip_if_no_dev_features -from conftest import _KB +from memory_profiler import memory_usage +from typing import Final, List, Callable +from pydantic import ByteSize + +_KB: ByteSize = ByteSize(1024) # in bytes +_MB: ByteSize = ByteSize(_KB * 1024) # in bytes +_GB: ByteSize = ByteSize(_MB * 1024) # in bytes def _hash_file(file: Path) -> str: @@ -25,30 +30,59 @@ def _hash_file(file: Path) -> str: return sha256.hexdigest() -@skip_if_no_dev_features -def test_upload_file(tmp_file: Path, api_client: osparc.ApiClient) -> None: - """Test that we can upload a file via the multipart upload""" +def test_upload_file( + create_tmp_file: Callable[[ByteSize], Path], api_client: osparc.ApiClient +) -> None: + """Test that we can upload a file via the multipart upload and download it again. Also check RAM usage of upload/download fcns""" + _allowed_ram_usage_in_mb: Final[int] = 300 # 300MB + tmp_file = create_tmp_file(ByteSize(1 * _GB)) + assert ( + tmp_file.stat().st_size > _allowed_ram_usage_in_mb * 1024 * 1024 + ), "For this test to make sense, file size must be larger than allowed ram usage." 
+
+    def max_diff(data: List[int]) -> int:
+        return max(data) - min(data)
+
     tmp_path: Path = tmp_file.parent
     files_api: osparc.FilesApi = osparc.FilesApi(api_client=api_client)
-    uploaded_file1: osparc.File = files_api.upload_file(tmp_file)
-    uploaded_file2: osparc.File = files_api.upload_file(tmp_file)
-    assert (
-        uploaded_file1.id == uploaded_file2.id
-    ), "could not detect that file was already on server"
-    downloaded_file = files_api.download_file(
-        uploaded_file1.id, destination_folder=tmp_path
-    )
-    assert Path(downloaded_file).parent == tmp_path
-    assert _hash_file(Path(downloaded_file)) == _hash_file(tmp_file)
-    files_api.delete_file(uploaded_file1.id)
+    try:
+        upload_ram_usage_in_mb, uploaded_file1 = memory_usage(
+            (files_api.upload_file, (tmp_file,)),  # type: ignore
+            retval=True,
+        )
+        assert (
+            max_diff(upload_ram_usage_in_mb) < _allowed_ram_usage_in_mb
+        ), f"Used more than {_allowed_ram_usage_in_mb=} to upload file of size {tmp_file.stat().st_size=}"
+        uploaded_file2: osparc.File = files_api.upload_file(tmp_file)
+        assert (
+            uploaded_file1.id == uploaded_file2.id
+        ), "could not detect that file was already on server"
+        download_ram_usage_in_mb, downloaded_file = memory_usage(
+            (
+                files_api.download_file,
+                (uploaded_file1.id,),
+                {"destination_folder": tmp_path},
+            ),  # type: ignore
+            retval=True,
+        )
+        assert (
+            max_diff(download_ram_usage_in_mb) < _allowed_ram_usage_in_mb
+        ), f"Used more than {_allowed_ram_usage_in_mb=} to download file of size {Path(downloaded_file).stat().st_size=}"
+        assert Path(downloaded_file).parent == tmp_path
+        assert _hash_file(Path(downloaded_file)) == _hash_file(tmp_file)
+    finally:
+        files_api.delete_file(uploaded_file1.id)
 
 
-@skip_if_no_dev_features
 @pytest.mark.parametrize("use_checksum", [True, False])
 @pytest.mark.parametrize("use_id", [True, False])
 def test_search_files(
-    tmp_file: Path, api_client: osparc.ApiClient, use_checksum: bool, use_id: bool
+    create_tmp_file: Callable[[ByteSize], Path],
+    api_client: osparc.ApiClient,
+    use_checksum: bool,
+    use_id: bool,
 ) -> None:
+    tmp_file = create_tmp_file(ByteSize(1 * _GB))
     checksum: str = _hash_file(tmp_file)
     results: osparc.PaginationGenerator
     files_api: osparc.FilesApi = osparc.FilesApi(api_client=api_client)
diff --git a/clients/python/test/e2e/test_solvers_api.py b/clients/python/test/e2e/test_solvers_api.py
index e0181732..41a5439d 100644
--- a/clients/python/test/e2e/test_solvers_api.py
+++ b/clients/python/test/e2e/test_solvers_api.py
@@ -7,14 +7,13 @@
 import json
 
 import osparc
-from _utils import skip_if_no_dev_features, skip_if_osparc_version
+from _utils import skip_if_osparc_version
 from httpx import AsyncClient
 from packaging.version import Version
 
 DEFAULT_TIMEOUT_SECONDS = 10 * 60  # 10 min
 
 
-@skip_if_no_dev_features
 def test_jobs(api_client: osparc.ApiClient, sleeper: osparc.Solver):
     """Test the jobs method
 
diff --git a/clients/python/test/test_osparc/test_async_http_client.py b/clients/python/test/test_osparc/test_async_http_client.py
index ea247673..4f49e3a0 100644
--- a/clients/python/test/test_osparc/test_async_http_client.py
+++ b/clients/python/test/test_osparc/test_async_http_client.py
@@ -29,7 +29,7 @@ class FakeRetryCallState:
 def test_retry_strategy(cfg: osparc.Configuration, fake_retry_state):
     async_client = AsyncHttpClient(
         configuration=cfg,
-        request_type="get",
+        method="get",
         url="79ae41cc-0d89-4714-ac9d-c23ee1b110ce",
         body={},
     )
@@ -76,7 +76,7 @@ def _side_effect(request: httpx.Request):
     with pytest.raises(httpx.HTTPError):
         async with AsyncHttpClient(
             configuration=cfg,
-            request_type="post",
+            method="post",
             url=_exit_url,
             body=_body,
         ) as session:
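
A minimal usage sketch of the chunked download path introduced above. It is illustrative
only and not part of the patch: only FilesApi.upload_file, FilesApi.download_file,
File.id and FilesApi.delete_file come from the code changed here, while the
osparc.Configuration host/username/password values and file names are placeholder
assumptions.

    from pathlib import Path

    import osparc

    # Placeholder configuration -- replace host/username/password with real credentials.
    cfg = osparc.Configuration(
        host="https://api.osparc.io",
        username="<api-key>",
        password="<api-secret>",
    )
    api_client = osparc.ApiClient(cfg)
    files_api = osparc.FilesApi(api_client)

    # Multipart upload; returns an osparc.File whose id identifies the server-side copy.
    uploaded = files_api.upload_file(Path("large_input.bin"))

    # download_file now streams the response body chunk by chunk into a temporary file
    # and returns the path of the local copy, so the payload never has to fit in RAM.
    local_copy = files_api.download_file(uploaded.id, destination_folder=Path("."))
    assert Path(local_copy).exists()

    files_api.delete_file(uploaded.id)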