diff --git a/changes/1637.fix.md b/changes/1637.fix.md
new file mode 100644
index 0000000000..8b4771c79e
--- /dev/null
+++ b/changes/1637.fix.md
@@ -0,0 +1 @@
+Update GPFS storage client's Quota API parameters and queries.
diff --git a/src/ai/backend/storage/gpfs/__init__.py b/src/ai/backend/storage/gpfs/__init__.py
index 656e3b8d70..96ce7e9025 100644
--- a/src/ai/backend/storage/gpfs/__init__.py
+++ b/src/ai/backend/storage/gpfs/__init__.py
@@ -63,7 +63,7 @@ async def describe_quota_scope(self, quota_scope_id: QuotaScopeID) -> Optional[Q
             return None
 
         quotas = await self.api_client.list_fileset_quotas(self.fs, quota_scope_id.pathname)
-        custom_defined_quotas = [q for q in quotas if not q.defaultQuota]
+        custom_defined_quotas = [q for q in quotas if not q.isDefaultQuota]
         if len(custom_defined_quotas) == 0:
             return QuotaUsage(-1, -1)
         quota_info = custom_defined_quotas[0]
diff --git a/src/ai/backend/storage/gpfs/gpfs_client.py b/src/ai/backend/storage/gpfs/gpfs_client.py
index 0afca270ff..7ac7aafcac 100644
--- a/src/ai/backend/storage/gpfs/gpfs_client.py
+++ b/src/ai/backend/storage/gpfs/gpfs_client.py
@@ -1,9 +1,10 @@
 import contextlib
+import json
 import logging
 import urllib.parse
 from pathlib import Path
 from ssl import SSLContext
-from typing import Any, AsyncIterator, Dict, List, Mapping, Optional
+from typing import Any, AsyncIterator, Callable, Coroutine, Dict, List, Mapping, Optional, TypeAlias
 
 import aiohttp
 from aiohttp import BasicAuth, web
@@ -12,6 +13,7 @@
 from ai.backend.common.logging import BraceStyleAdapter
 from ai.backend.common.types import BinarySize
 
+from ..exception import ExternalError
 from .exceptions import (
     GPFSAPIError,
     GPFSInvalidBodyError,
@@ -33,6 +35,11 @@
 
 log = BraceStyleAdapter(logging.getLogger(__spec__.name))  # type: ignore[name-defined]
 
+# Async callback that receives each API response and either returns it or raises.
+ResponseHandler: TypeAlias = Callable[
+    [aiohttp.ClientResponse], Coroutine[None, None, aiohttp.ClientResponse]
+]
+
 
 def error_handler(inner):
     async def outer(*args, **kwargs):
@@ -46,6 +53,31 @@ async def outer(*args, **kwargs):
     return outer
 
 
+async def base_response_handler(response: aiohttp.ClientResponse) -> aiohttp.ClientResponse:
+    """
+    Return the response unchanged unless it is a 5xx, in which case raise ExternalError.
+
+    4xx responses are passed through as-is, so callers have to check them on their own.
+    """
+    match response.status // 100:
+        case 2:
+            pass
+        case 4:
+            pass
+        case 5:
+            try:
+                data = await response.json()
+                msg_detail = str(data)
+            except (aiohttp.ContentTypeError, json.JSONDecodeError):
+                # response.json() raises ContentTypeError for non-JSON content types
+                # (e.g., an HTML error page) and JSONDecodeError for a malformed body.
+                msg_detail = "Unable to decode response body."
+            raise ExternalError(
+                f"GPFS API server error. (status code: {response.status}, detail: {msg_detail})"
+            )
+    return response
+
+
 class GPFSAPIClient:
     api_endpoint: str
     username: str
@@ -74,7 +106,10 @@ async def _build_request(
         method: str,
         path: str,
         body: Optional[Any] = None,
+        *,
+        err_handler: Optional[ResponseHandler] = None,
     ) -> aiohttp.ClientResponse:
+        response_handler = err_handler or base_response_handler
         match method:
             case "GET":
                 func = sess.get
@@ -90,11 +125,14 @@ async def _build_request(
                 raise GPFSAPIError(f"Unsupported request method {method}")
         try:
             if method == "GET" or method == "DELETE":
-                return await func("/scalemgmt/v2" + path, headers=self._req_header, ssl=self.ssl)
+                response = await func(
+                    "/scalemgmt/v2" + path, headers=self._req_header, ssl=self.ssl
+                )
             else:
-                return await func(
+                response = await func(
                     "/scalemgmt/v2" + path, headers=self._req_header, json=body, ssl=self.ssl
                 )
+            return await response_handler(response)
         except web.HTTPUnauthorized:
             raise GPFSUnauthorizedError
 
@@ -179,11 +217,10 @@ async def list_quotas(
         self, fs_name: str, quota_type: GPFSQuotaType = GPFSQuotaType.FILESET
     ) -> List[GPFSQuota]:
         async with self._build_session() as sess:
-            query = urllib.parse.urlencode({"filter": f"entityType={quota_type}"})
             response = await self._build_request(
                 sess,
                 "GET",
-                f"/filesystems/{fs_name}/quotas?{query}",
+                f"/filesystems/{fs_name}/quotas",
             )
             data = await response.json()
             return [GPFSQuota.from_dict(quota_info) for quota_info in data["quotas"]]
@@ -196,11 +233,12 @@ async def list_fileset_quotas(
         quota_type: GPFSQuotaType = GPFSQuotaType.FILESET,
     ) -> List[GPFSQuota]:
         async with self._build_session() as sess:
-            query = urllib.parse.urlencode({"filter": f"entityType={quota_type}"})
+            # Percent-encode the name so reserved characters cannot break the query string.
+            encoded_name = urllib.parse.quote(fileset_name, safe="")
             response = await self._build_request(
                 sess,
                 "GET",
-                f"/filesystems/{fs_name}/filesets/{fileset_name}/quotas?{query}",
+                f"/filesystems/{fs_name}/quotas?filter=objectName={encoded_name}",
             )
             data = await response.json()
             log.debug("response: {}", data)
@@ -254,9 +292,30 @@ async def create_fileset(
             body["permissions"] = permissions
         if path is not None:
             body["path"] = path.as_posix()
+
+        async def handler(response: aiohttp.ClientResponse) -> aiohttp.ClientResponse:
+            """Pass 200/201/202, tolerate 409 with a warning, raise ExternalError otherwise."""
+            match response.status:
+                case 200 | 201 | 202:
+                    pass
+                case 409:
+                    # NOTE(review): data["jobs"] is read after this; confirm a 409 body has it.
+                    log.warning(
+                        "GPFS fileset already exists. Skip create. (name: {})", fileset_name
+                    )
+                case _:
+                    raise ExternalError(
+                        f"Cannot create GPFS fileset. status code: {response.status}"
+                    )
+            return response
+
         async with self._build_session() as sess:
             response = await self._build_request(
-                sess, "POST", f"/filesystems/{fs_name}/filesets", body
+                sess,
+                "POST",
+                f"/filesystems/{fs_name}/filesets",
+                body,
+                err_handler=handler,
             )
             data = await response.json()
             await self._wait_for_job_done([GPFSJob.from_dict(x) for x in data["jobs"]])
diff --git a/src/ai/backend/storage/gpfs/types.py b/src/ai/backend/storage/gpfs/types.py
index 0a95455e1a..57dcea5662 100644
--- a/src/ai/backend/storage/gpfs/types.py
+++ b/src/ai/backend/storage/gpfs/types.py
@@ -100,9 +100,8 @@ class GPFSFilesystem(DataClassJsonMixin):
 class GPFSQuota(DataClassJsonMixin):
     quotaId: Optional[int]
     filesystemName: Optional[str]
-    filesetName: Optional[str]
     quotaType: Optional[str]
-    objectName: Optional[str]
+    objectName: Optional[str]  # Holds the fileset name when querying fileset quotas.
     objectId: Optional[int]
     blockUsage: Optional[int]
     blockQuota: Optional[int]
@@ -114,7 +113,7 @@ class GPFSQuota(DataClassJsonMixin):
     filesLimit: Optional[int]
     filesInDoubt: Optional[int]
     filesGrace: Optional[str]
-    defaultQuota: Optional[bool]
+    isDefaultQuota: Optional[bool]
 
 
 @dataclass