From 3ee9c74a834e16b6820a515d193d03412b98a96b Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Wed, 10 Jan 2024 17:14:25 +0900 Subject: [PATCH 01/13] fix: update session's occupying slots when kernel starts --- src/ai/backend/manager/models/session.py | 57 ++++++++++++++++++++++++ src/ai/backend/manager/registry.py | 11 ++--- tests/manager/test_registry.py | 6 +-- 3 files changed, 64 insertions(+), 10 deletions(-) diff --git a/src/ai/backend/manager/models/session.py b/src/ai/backend/manager/models/session.py index bd23e11eef..0e20a709b1 100644 --- a/src/ai/backend/manager/models/session.py +++ b/src/ai/backend/manager/models/session.py @@ -784,6 +784,63 @@ async def _check_and_update() -> SessionStatus | None: return await execute_with_retry(_check_and_update) + @classmethod + async def finalize_running( + cls, + db: ExtendedAsyncSAEngine, + session_id: SessionId, + kernel_actual_allocs: ResourceSlot, + ) -> SessionStatus: + """ + Update session's occupying resource slots and finalize session status to RUNNING. + + Check status of session's sibling kernels and transit the status of session. + Return the current status of session. + """ + now = datetime.now(tzutc()) + + async def _check_and_update() -> SessionStatus: + async with db.begin_session() as db_session: + session_query = ( + sa.select(SessionRow) + .where(SessionRow.id == session_id) + .with_for_update() + .options( + noload("*"), + load_only( + SessionRow.status, + SessionRow.occupying_slots, + ), + selectinload(SessionRow.kernels).options( + noload("*"), load_only(KernelRow.status, KernelRow.cluster_role) + ), + ) + ) + session_row: SessionRow = (await db_session.scalars(session_query)).first() + determined_status = determine_session_status(session_row.kernels) + + update_values = { + "occupying_slots": session_row.occupying_slots + kernel_actual_allocs, + } + if determined_status != session_row.status: + update_values["status"] = determined_status + update_values["status_history"] = sql_json_merge( + SessionRow.status_history, + (), + { + determined_status.name: now.isoformat(), + }, + ) + if determined_status in (SessionStatus.CANCELLED, SessionStatus.TERMINATED): + update_values["terminated_at"] = now + update_query = ( + sa.update(SessionRow).where(SessionRow.id == session_id).values(**update_values) + ) + await db_session.execute(update_query) + return determined_status + + return await execute_with_retry(_check_and_update) + @staticmethod async def set_session_status( db: ExtendedAsyncSAEngine, diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py index 8158a992a6..f3545a016d 100644 --- a/src/ai/backend/manager/registry.py +++ b/src/ai/backend/manager/registry.py @@ -1514,7 +1514,7 @@ def convert_resource_spec_to_resource_slot( total_allocs.append(Decimal(BinarySize.from_str(allocation))) else: # maybe Decimal("Infinity"), etc. total_allocs.append(Decimal(allocation)) - slots[slot_name] = str(sum(total_allocs)) + slots[slot_name] = sum(total_allocs) return slots async def finalize_running( @@ -1549,13 +1549,10 @@ async def finalize_running( ), } self._kernel_actual_allocated_resources[kernel_id] = actual_allocs - kernel_did_update = await KernelRow.update_kernel( - self.db, kernel_id, new_status, update_data=update_data + await KernelRow.update_kernel(self.db, kernel_id, new_status, update_data=update_data) + new_session_status = await SessionRow.finalize_running( + self.db, session_id, actual_allocs ) - if not kernel_did_update: - return - - new_session_status = await SessionRow.transit_session_status(self.db, session_id) if new_session_status is None or new_session_status != SessionStatus.RUNNING: return query = ( diff --git a/tests/manager/test_registry.py b/tests/manager/test_registry.py index 58c6f1a390..9e651e9655 100644 --- a/tests/manager/test_registry.py +++ b/tests/manager/test_registry.py @@ -173,7 +173,7 @@ async def test_convert_resource_spec_to_resource_slot( }, } converted_allocations = registry.convert_resource_spec_to_resource_slot(allocations) - assert converted_allocations["cuda.shares"] == "4.5" + assert converted_allocations["cuda.shares"] == 4.5 allocations = { "cpu": { SlotName("cpu"): { @@ -189,5 +189,5 @@ async def test_convert_resource_spec_to_resource_slot( }, } converted_allocations = registry.convert_resource_spec_to_resource_slot(allocations) - assert converted_allocations["cpu"] == "4" - assert converted_allocations["ram"] == str(Decimal(BinarySize.from_str("1g")) * 3) + assert converted_allocations["cpu"] == 4 + assert converted_allocations["ram"] == Decimal(BinarySize.from_str("1g")) * 3 From c76c17822620c36578669c19c69c4484a2f4a3f6 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Wed, 10 Jan 2024 17:30:02 +0900 Subject: [PATCH 02/13] add news fragment --- changes/1832.fix.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/1832.fix.md diff --git a/changes/1832.fix.md b/changes/1832.fix.md new file mode 100644 index 0000000000..38c826e181 --- /dev/null +++ b/changes/1832.fix.md @@ -0,0 +1 @@ +Update `occupying_slots` field of `sessions` table when kernel starts. From b147512d3bec841d8f7602b6744c73b889d2e13a Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 29 Mar 2024 13:35:40 +0900 Subject: [PATCH 03/13] revert convert_resource_spec_to_resource_slot to return stringified ResourceSlot --- src/ai/backend/manager/registry.py | 2 +- tests/manager/test_registry.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py index b0081c1622..89f1e03bc7 100644 --- a/src/ai/backend/manager/registry.py +++ b/src/ai/backend/manager/registry.py @@ -1523,7 +1523,7 @@ def convert_resource_spec_to_resource_slot( total_allocs.append(Decimal(BinarySize.from_str(allocation))) else: # maybe Decimal("Infinity"), etc. total_allocs.append(Decimal(allocation)) - slots[slot_name] = sum(total_allocs) + slots[slot_name] = str(sum(total_allocs)) return slots async def finalize_running( diff --git a/tests/manager/test_registry.py b/tests/manager/test_registry.py index 9e651e9655..58c6f1a390 100644 --- a/tests/manager/test_registry.py +++ b/tests/manager/test_registry.py @@ -173,7 +173,7 @@ async def test_convert_resource_spec_to_resource_slot( }, } converted_allocations = registry.convert_resource_spec_to_resource_slot(allocations) - assert converted_allocations["cuda.shares"] == 4.5 + assert converted_allocations["cuda.shares"] == "4.5" allocations = { "cpu": { SlotName("cpu"): { @@ -189,5 +189,5 @@ async def test_convert_resource_spec_to_resource_slot( }, } converted_allocations = registry.convert_resource_spec_to_resource_slot(allocations) - assert converted_allocations["cpu"] == 4 - assert converted_allocations["ram"] == Decimal(BinarySize.from_str("1g")) * 3 + assert converted_allocations["cpu"] == "4" + assert converted_allocations["ram"] == str(Decimal(BinarySize.from_str("1g")) * 3) From f7f03a1de2a6b192a3d192020a09d4cf1c37b955 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 29 Mar 2024 13:52:05 +0900 Subject: [PATCH 04/13] convert value of occupying_slots to Decimal when sum session's all kernel slots --- src/ai/backend/common/types.py | 5 +++++ src/ai/backend/manager/models/session.py | 17 ++++++++++++----- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/ai/backend/common/types.py b/src/ai/backend/common/types.py index ebeb67ee82..d2941d6b61 100644 --- a/src/ai/backend/common/types.py +++ b/src/ai/backend/common/types.py @@ -591,6 +591,11 @@ def __format__(self, format_spec): class ResourceSlot(UserDict): + """ + key: `str` type slot name. + value: `str` or `Decimal` type value. Do not convert this to `float` or `int`. + """ + __slots__ = ("data",) def __init__(self, *args, **kwargs) -> None: diff --git a/src/ai/backend/manager/models/session.py b/src/ai/backend/manager/models/session.py index e5e8701704..dd0f04a56c 100644 --- a/src/ai/backend/manager/models/session.py +++ b/src/ai/backend/manager/models/session.py @@ -16,6 +16,7 @@ Optional, Sequence, Union, + cast, ) from uuid import UUID @@ -816,12 +817,18 @@ async def _check_and_update() -> SessionStatus: ), ) ) - session_row: SessionRow = (await db_session.scalars(session_query)).first() - determined_status = determine_session_status(session_row.kernels) - - update_values = { - "occupying_slots": session_row.occupying_slots + kernel_actual_allocs, + result = (await db_session.scalars(session_query)).first() + session_row = cast(SessionRow, result) + session_occupying_slots = cast(ResourceSlot, session_row.occupying_slots) + session_occupying_slots.sync_keys(kernel_actual_allocs) + for key, val in session_occupying_slots.items(): + session_occupying_slots[key] = str( + Decimal(val) + Decimal(kernel_actual_allocs[key]) + ) + update_values: dict[str, Any] = { + "occupying_slots": session_occupying_slots, } + determined_status = determine_session_status(session_row.kernels) if determined_status != session_row.status: update_values["status"] = determined_status update_values["status_history"] = sql_json_merge( From a68f7d0fa73429d1560e95b1efbf9322692ea617 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 29 Mar 2024 04:55:04 +0000 Subject: [PATCH 05/13] chore: update GraphQL schema dump --- src/ai/backend/manager/api/schema.graphql | 200 ++++++++++++++++++---- 1 file changed, 171 insertions(+), 29 deletions(-) diff --git a/src/ai/backend/manager/api/schema.graphql b/src/ai/backend/manager/api/schema.graphql index 82d08d96e9..823fd7365a 100644 --- a/src/ai/backend/manager/api/schema.graphql +++ b/src/ai/backend/manager/api/schema.graphql @@ -8,7 +8,7 @@ type Queries { node( """The ID of the object""" id: ID! - ): AsyncNode + ): Node agent(agent_id: String!): Agent agent_list(limit: Int!, offset: Int!, filter: String, order: String, scaling_group: String, status: String): AgentList agents(scaling_group: String, status: String): [Agent] @@ -24,8 +24,14 @@ type Queries { group_nodes(filter: String, order: String, offset: Int, before: String, after: String, first: Int, last: Int): GroupConnection group(id: UUID!, domain_name: String): Group groups_by_name(name: String!, domain_name: String): [Group] - groups(domain_name: String, is_active: Boolean): [Group] - image(reference: String!, architecture: String = "aarch64"): Image + groups( + domain_name: String + is_active: Boolean + + """Added since 24.03.0. Available values: GENERAL, MODEL_STORE""" + type: [String] = ["GENERAL"] + ): [Group] + image(reference: String!, architecture: String = "x86_64"): Image images(is_installed: Boolean, is_operation: Boolean): [Image] user(domain_name: String, email: String): User user_from_uuid(domain_name: String, user_id: ID): User @@ -78,13 +84,19 @@ type Queries { quota_scope(storage_host_name: String!, quota_scope_id: String!): QuotaScope container_registry(hostname: String!): ContainerRegistry container_registries: [ContainerRegistry] + + """Added in 24.03.0.""" + model_card(id: String!): ModelCard + + """Added in 24.03.0.""" + model_cards(filter: String, order: String, offset: Int, before: String, after: String, first: Int, last: Int): ModelCardConnection } """ This GraphQL Relay Node extension is for running asynchronous resolvers and fine-grained handling of global id. Refer to: https://github.com/graphql-python/graphene/blob/master/graphene/relay/node.py """ -interface AsyncNode { +interface Node { """The ID of the object""" id: ID! } @@ -218,7 +230,7 @@ type Domain { scaling_groups: [String] } -type GroupNode implements AsyncNode { +type GroupNode implements Node { """The ID of the object""" id: ID! name: String @@ -272,7 +284,7 @@ type UserEdge { cursor: String! } -type UserNode implements AsyncNode { +type UserNode implements Node { """The ID of the object""" id: ID! @@ -338,6 +350,9 @@ type Group { allowed_vfolder_hosts: JSONString integration_id: String resource_policy: String + + """Added since 24.03.0.""" + type: String scaling_groups: [String] } @@ -397,6 +412,11 @@ type User implements Item { totp_activated: Boolean totp_activated_at: DateTime sudo_session_enabled: Boolean + + """ + Added in 24.03.0. Used as the default authentication credential for password-based logins and sets the user's total resource usage limit. User's main_access_key cannot be deleted, and only super-admin can replace main_access_key. + """ + main_access_key: String groups: [UserGroup] } @@ -529,9 +549,9 @@ type KeyPairResourcePolicy { max_containers_per_session: Int idle_timeout: BigInt allowed_vfolder_hosts: JSONString - max_vfolder_count: Int @deprecated(reason: "Deprecated since 23.09.4.") - max_vfolder_size: BigInt @deprecated(reason: "Deprecated since 23.09.4.") - max_quota_scope_size: BigInt @deprecated(reason: "Deprecated since 23.09.4.") + max_vfolder_count: Int @deprecated(reason: "Deprecated since 23.09.4") + max_vfolder_size: BigInt @deprecated(reason: "Deprecated since 23.09.4") + max_quota_scope_size: BigInt @deprecated(reason: "Deprecated since 23.09.4") } type UserResourcePolicy { @@ -544,7 +564,12 @@ type UserResourcePolicy { """Added since 24.03.1. Limitation of the quota size of user vfolders.""" max_quota_scope_size: BigInt - max_vfolder_size: BigInt @deprecated(reason: "Deprecated since 23.09.1.") + max_vfolder_size: BigInt @deprecated(reason: "Deprecated since 23.09.1") + + """ + Added since 23.09.10. Maximum available number of sessions per single model service which the user is in charge of. + """ + max_session_count_per_model_session: Int } type ProjectResourcePolicy { @@ -557,7 +582,7 @@ type ProjectResourcePolicy { """Added since 24.03.1. Limitation of the quota size of project vfolders.""" max_quota_scope_size: BigInt - max_vfolder_size: BigInt @deprecated(reason: "Deprecated since 23.09.1.") + max_vfolder_size: BigInt @deprecated(reason: "Deprecated since 23.09.1") } type ResourcePreset { @@ -697,7 +722,10 @@ type PredefinedAtomicPermission { type Endpoint implements Item { id: ID endpoint_id: UUID - image: String + image: String @deprecated(reason: "Deprecated since 23.09.9; use `image_object`") + + """Added at 23.09.9""" + image_object: ImageNode domain: String project: String resource_group: String @@ -705,8 +733,20 @@ type Endpoint implements Item { url: String model: UUID model_mount_destiation: String - created_user: UUID - session_owner: UUID + created_user: UUID @deprecated(reason: "Deprecated since 23.09.8; use `created_user_id`") + + """Added at 23.09.8""" + created_user_email: String + + """Added at 23.09.8""" + created_user_id: UUID + session_owner: UUID @deprecated(reason: "Deprecated since 23.09.8; use `session_owner_id`") + + """Added at 23.09.8""" + session_owner_email: String + + """Added at 23.09.8""" + session_owner_id: UUID tag: String startup_command: String bootstrap_script: String @@ -727,6 +767,22 @@ type Endpoint implements Item { errors: [InferenceSessionError!]! } +type ImageNode implements Node { + """The ID of the object""" + id: ID! + name: String + humanized_name: String + tag: String + registry: String + architecture: String + is_local: Boolean + digest: String + labels: [KVPair] + size_bytes: BigInt + resource_limits: [ResourceLimit] + supported_accelerators: [String] +} + type Routing implements Item { id: ID routing_id: UUID @@ -795,12 +851,6 @@ type ContainerRegistry implements Node { config: ContainerRegistryConfig } -"""An object with an ID""" -interface Node { - """The ID of the object""" - id: ID! -} - type ContainerRegistryConfig { url: String! type: String! @@ -810,6 +860,58 @@ type ContainerRegistryConfig { ssl_verify: Boolean } +type ModelCard implements Node { + """The ID of the object""" + id: ID! + name: String + vfolder: VirtualFolder + author: String + + """Human readable name of the model.""" + title: String + version: String + + """The time the model was created.""" + created_at: DateTime + + """The last time the model was modified.""" + modified_at: DateTime + description: String + task: String + category: String + architecture: String + framework: [String] + label: [String] + license: String + min_resource: JSONString + readme: String + + """ + Type (mostly extension of the filename) of the README file. e.g. md, rst, txt, ... + """ + readme_filetype: String +} + +type ModelCardConnection { + """Pagination data for this connection.""" + pageInfo: PageInfo! + + """Contains the nodes in this connection.""" + edges: [ModelCardEdge]! + + """Total count of the GQL nodes of the query.""" + count: Int +} + +"""A Relay edge containing a `ModelCard` and its cursor.""" +type ModelCardEdge { + """The item at the end of the edge""" + node: ModelCard + + """A cursor for use in pagination""" + cursor: String! +} + """All available GraphQL mutations.""" type Mutations { modify_agent(id: String!, props: ModifyAgentInput!): ModifyAgent @@ -871,9 +973,9 @@ type Mutations { rescan_images(registry: String): RescanImages preload_image(references: [String]!, target_agents: [String]!): PreloadImage unload_image(references: [String]!, target_agents: [String]!): UnloadImage - modify_image(architecture: String = "aarch64", props: ModifyImageInput!, target: String!): ModifyImage - forget_image(architecture: String = "aarch64", reference: String!): ForgetImage - alias_image(alias: String!, architecture: String = "aarch64", target: String!): AliasImage + modify_image(architecture: String = "x86_64", props: ModifyImageInput!, target: String!): ModifyImage + forget_image(architecture: String = "x86_64", reference: String!): ForgetImage + alias_image(alias: String!, architecture: String = "x86_64", target: String!): AliasImage dealias_image(alias: String!): DealiasImage clear_images(registry: String): ClearImages create_keypair_resource_policy(name: String!, props: CreateKeyPairResourcePolicyInput!): CreateKeyPairResourcePolicy @@ -904,6 +1006,7 @@ type Mutations { create_container_registry(hostname: String!, props: CreateContainerRegistryInput!): CreateContainerRegistry modify_container_registry(hostname: String!, props: ModifyContainerRegistryInput!): ModifyContainerRegistry delete_container_registry(hostname: String!): DeleteContainerRegistry + modify_endpoint(endpoint_id: UUID!, props: ModifyEndpointInput!): ModifyEndpoint } type ModifyAgent { @@ -971,6 +1074,8 @@ type CreateGroup { } input GroupInput { + """Added since 24.03.0. Available values: GENERAL, MODEL_STORE""" + type: String = "GENERAL" description: String = "" is_active: Boolean = true domain_name: String! @@ -1061,6 +1166,7 @@ input ModifyUserInput { totp_activated: Boolean resource_policy: String sudo_session_enabled: Boolean + main_access_key: String } """ @@ -1212,9 +1318,9 @@ input CreateKeyPairResourcePolicyInput { max_containers_per_session: Int! idle_timeout: BigInt! allowed_vfolder_hosts: JSONString - max_vfolder_count: Int @deprecated(reason: "Deprecated since 23.09.4.") - max_vfolder_size: BigInt @deprecated(reason: "Deprecated since 23.09.4.") - max_quota_scope_size: BigInt @deprecated(reason: "Deprecated since 23.09.4.") + max_vfolder_count: Int @deprecated(reason: "Deprecated since 23.09.4") + max_vfolder_size: BigInt @deprecated(reason: "Deprecated since 23.09.4") + max_quota_scope_size: BigInt @deprecated(reason: "Deprecated since 23.09.4") } type ModifyKeyPairResourcePolicy { @@ -1231,9 +1337,9 @@ input ModifyKeyPairResourcePolicyInput { max_containers_per_session: Int idle_timeout: BigInt allowed_vfolder_hosts: JSONString - max_vfolder_count: Int @deprecated(reason: "Deprecated since 23.09.4.") - max_vfolder_size: BigInt @deprecated(reason: "Deprecated since 23.09.4.") - max_quota_scope_size: BigInt @deprecated(reason: "Deprecated since 23.09.4.") + max_vfolder_count: Int @deprecated(reason: "Deprecated since 23.09.4") + max_vfolder_size: BigInt @deprecated(reason: "Deprecated since 23.09.4") + max_quota_scope_size: BigInt @deprecated(reason: "Deprecated since 23.09.4") } type DeleteKeyPairResourcePolicy { @@ -1253,6 +1359,11 @@ input CreateUserResourcePolicyInput { """Added since 24.03.1. Limitation of the quota size of user vfolders.""" max_quota_scope_size: BigInt + + """ + Added since 24.03.1. Maximum available number of sessions per single model service which the user is in charge of. + """ + max_session_count_per_model_session: Int } type ModifyUserResourcePolicy { @@ -1266,6 +1377,11 @@ input ModifyUserResourcePolicyInput { """Added since 24.03.1. Limitation of the quota size of user vfolders.""" max_quota_scope_size: BigInt + + """ + Added since 24.03.1. Maximum available number of sessions per single model service which the user is in charge of. + """ + max_session_count_per_model_session: Int } type DeleteUserResourcePolicy { @@ -1453,4 +1569,30 @@ input ModifyContainerRegistryInput { type DeleteContainerRegistry { container_registry: ContainerRegistry +} + +type ModifyEndpoint { + ok: Boolean + msg: String + + """Added at 23.09.8""" + endpoint: Endpoint +} + +input ModifyEndpointInput { + resource_slots: JSONString + resource_opts: JSONString + cluster_mode: String + cluster_size: Int + desired_session_count: Int + image: ImageRefType + name: String + resource_group: String + open_to_public: Boolean +} + +input ImageRefType { + name: String! + registry: String + architecture: String } \ No newline at end of file From dde28a78f6ba428a35ee197128bf2d984d94e4e6 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 29 Mar 2024 14:34:24 +0900 Subject: [PATCH 06/13] classmethod to function --- src/ai/backend/manager/models/session.py | 125 +++++++++++------------ src/ai/backend/manager/registry.py | 5 +- 2 files changed, 64 insertions(+), 66 deletions(-) diff --git a/src/ai/backend/manager/models/session.py b/src/ai/backend/manager/models/session.py index dd0f04a56c..cf69dc5173 100644 --- a/src/ai/backend/manager/models/session.py +++ b/src/ai/backend/manager/models/session.py @@ -785,69 +785,6 @@ async def _check_and_update() -> SessionStatus | None: return await execute_with_retry(_check_and_update) - @classmethod - async def finalize_running( - cls, - db: ExtendedAsyncSAEngine, - session_id: SessionId, - kernel_actual_allocs: ResourceSlot, - ) -> SessionStatus: - """ - Update session's occupying resource slots and finalize session status to RUNNING. - - Check status of session's sibling kernels and transit the status of session. - Return the current status of session. - """ - now = datetime.now(tzutc()) - - async def _check_and_update() -> SessionStatus: - async with db.begin_session() as db_session: - session_query = ( - sa.select(SessionRow) - .where(SessionRow.id == session_id) - .with_for_update() - .options( - noload("*"), - load_only( - SessionRow.status, - SessionRow.occupying_slots, - ), - selectinload(SessionRow.kernels).options( - noload("*"), load_only(KernelRow.status, KernelRow.cluster_role) - ), - ) - ) - result = (await db_session.scalars(session_query)).first() - session_row = cast(SessionRow, result) - session_occupying_slots = cast(ResourceSlot, session_row.occupying_slots) - session_occupying_slots.sync_keys(kernel_actual_allocs) - for key, val in session_occupying_slots.items(): - session_occupying_slots[key] = str( - Decimal(val) + Decimal(kernel_actual_allocs[key]) - ) - update_values: dict[str, Any] = { - "occupying_slots": session_occupying_slots, - } - determined_status = determine_session_status(session_row.kernels) - if determined_status != session_row.status: - update_values["status"] = determined_status - update_values["status_history"] = sql_json_merge( - SessionRow.status_history, - (), - { - determined_status.name: now.isoformat(), - }, - ) - if determined_status in (SessionStatus.CANCELLED, SessionStatus.TERMINATED): - update_values["terminated_at"] = now - update_query = ( - sa.update(SessionRow).where(SessionRow.id == session_id).values(**update_values) - ) - await db_session.execute(update_query) - return determined_status - - return await execute_with_retry(_check_and_update) - @staticmethod async def set_session_status( db: ExtendedAsyncSAEngine, @@ -1143,6 +1080,68 @@ async def get_sgroup_managed_sessions( return result.scalars().all() +async def finalize_running( + db: ExtendedAsyncSAEngine, + session_id: SessionId, + kernel_actual_allocs: ResourceSlot, +) -> SessionStatus: + """ + Update session's occupying resource slots and finalize session status to RUNNING. + + Check status of session's sibling kernels and transit the status of session. + Return the current status of session. + """ + now = datetime.now(tzutc()) + + async def _check_and_update() -> SessionStatus: + async with db.begin_session() as db_session: + session_query = ( + sa.select(SessionRow) + .where(SessionRow.id == session_id) + .with_for_update() + .options( + noload("*"), + load_only( + SessionRow.status, + SessionRow.occupying_slots, + ), + selectinload(SessionRow.kernels).options( + noload("*"), load_only(KernelRow.status, KernelRow.cluster_role) + ), + ) + ) + result = (await db_session.scalars(session_query)).first() + session_row = cast(SessionRow, result) + session_occupying_slots = cast(ResourceSlot, session_row.occupying_slots) + session_occupying_slots.sync_keys(kernel_actual_allocs) + for key, val in session_occupying_slots.items(): + session_occupying_slots[key] = str( + Decimal(val) + Decimal(kernel_actual_allocs[key]) + ) + update_values: dict[str, Any] = { + "occupying_slots": session_occupying_slots, + } + determined_status = determine_session_status(session_row.kernels) + if determined_status != session_row.status: + update_values["status"] = determined_status + update_values["status_history"] = sql_json_merge( + SessionRow.status_history, + (), + { + determined_status.name: now.isoformat(), + }, + ) + if determined_status in (SessionStatus.CANCELLED, SessionStatus.TERMINATED): + update_values["terminated_at"] = now + update_query = ( + sa.update(SessionRow).where(SessionRow.id == session_id).values(**update_values) + ) + await db_session.execute(update_query) + return determined_status + + return await execute_with_retry(_check_and_update) + + class SessionDependencyRow(Base): __tablename__ = "session_dependencies" session_id = sa.Column( diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py index 89f1e03bc7..b2db0e778f 100644 --- a/src/ai/backend/manager/registry.py +++ b/src/ai/backend/manager/registry.py @@ -163,6 +163,7 @@ scaling_groups, verify_vfolder_name, ) +from .models.session import finalize_running as finalize_session_running from .models.utils import ( ExtendedAsyncSAEngine, execute_with_retry, @@ -1559,9 +1560,7 @@ async def finalize_running( } self._kernel_actual_allocated_resources[kernel_id] = actual_allocs await KernelRow.update_kernel(self.db, kernel_id, new_status, update_data=update_data) - new_session_status = await SessionRow.finalize_running( - self.db, session_id, actual_allocs - ) + new_session_status = await finalize_session_running(self.db, session_id, actual_allocs) if new_session_status is None or new_session_status != SessionStatus.RUNNING: return query = ( From 296bd2ed0d2dc1cb814c1e44f08e812dbdec3987 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Mon, 1 Apr 2024 17:32:42 +0900 Subject: [PATCH 07/13] resolve occupying_slots GQL field from row instead of sibling kernels --- src/ai/backend/manager/models/session.py | 34 ++---------------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/src/ai/backend/manager/models/session.py b/src/ai/backend/manager/models/session.py index cf69dc5173..605d8273c8 100644 --- a/src/ai/backend/manager/models/session.py +++ b/src/ai/backend/manager/models/session.py @@ -39,7 +39,6 @@ SessionId, SessionResult, SessionTypes, - SlotName, VFolderMount, ) @@ -1322,6 +1321,8 @@ def parse_row(cls, ctx: GraphQueryContext, row: Row) -> Mapping[str, Any]: "service_ports": row.main_kernel.service_ports, "mounts": [mount.name for mount in row.vfolder_mounts], "vfolder_mounts": row.vfolder_mounts, + "occupying_slots": row.occupying_slots.to_json(), + "occupied_slots": row.occupying_slots.to_json(), # statistics "num_queries": row.num_queries, } @@ -1333,23 +1334,6 @@ def from_row(cls, ctx: GraphQueryContext, row: Row) -> ComputeSession | None: props = cls.parse_row(ctx, row) return cls(**props) - async def resolve_occupying_slots(self, info: graphene.ResolveInfo) -> Mapping[str, Any]: - """ - Calculate the sum of occupying resource slots of all sub-kernels, - and return the JSON-serializable object from the sum result. - """ - graph_ctx: GraphQueryContext = info.context - loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "ComputeContainer.by_session") - containers = await loader.load(self.session_id) - zero = ResourceSlot() - return sum( - ( - ResourceSlot({SlotName(k): Decimal(v) for k, v in c.occupied_slots.items()}) - for c in containers - ), - start=zero, - ).to_json() - async def resolve_inference_metrics( self, info: graphene.ResolveInfo ) -> Optional[Mapping[str, Any]]: @@ -1359,20 +1343,6 @@ async def resolve_inference_metrics( ) return await loader.load(self.id) - # legacy - async def resolve_occupied_slots(self, info: graphene.ResolveInfo) -> Mapping[str, Any]: - graph_ctx: GraphQueryContext = info.context - loader = graph_ctx.dataloader_manager.get_loader(graph_ctx, "ComputeContainer.by_session") - containers = await loader.load(self.session_id) - zero = ResourceSlot() - return sum( - ( - ResourceSlot({SlotName(k): Decimal(v) for k, v in c.occupied_slots.items()}) - for c in containers - ), - start=zero, - ).to_json() - async def resolve_containers( self, info: graphene.ResolveInfo, From c0cfd295e7ffe4941ba387e5c5ac873df125c82c Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Mon, 1 Apr 2024 18:31:20 +0900 Subject: [PATCH 08/13] add alembic migration to sync occupying_slots --- ...ync_session_occupying_slots_to_sibling_.py | 114 ++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py diff --git a/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py b/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py new file mode 100644 index 0000000000..668a87a429 --- /dev/null +++ b/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py @@ -0,0 +1,114 @@ +"""sync_session_occupying_slots_to_sibling_kernels + +Revision ID: 679e5721e94d +Revises: 75ea2b136830 +Create Date: 2024-04-01 17:34:33.480996 + +""" + +import textwrap +from typing import Any, cast + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql as pgsql +from sqlalchemy.orm import Session, registry, relationship, selectinload +from sqlalchemy.sql import text + +from ai.backend.common.types import ResourceSlot +from ai.backend.manager.models.base import GUID, IDColumn, ResourceSlotColumn, convention + +# revision identifiers, used by Alembic. +revision = "679e5721e94d" +down_revision = "75ea2b136830" +branch_labels = None +depends_on = None + +metadata = sa.MetaData(naming_convention=convention) +mapper_registry = registry(metadata=metadata) +Base: Any = mapper_registry.generate_base() + +PAGE_SIZE = 100 + + +class SessionRow(Base): + __tablename__ = "sessions" + __table_args__ = {"extend_existing": True} + + id = IDColumn() + cluster_size = sa.Column("cluster_size", sa.Integer, nullable=False, default=1) + starts_at = sa.Column("starts_at", sa.DateTime(timezone=True), nullable=True, default=sa.null()) + status_history = sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()) + occupying_slots = sa.Column("occupying_slots", ResourceSlotColumn(), nullable=False) + + kernels = relationship("KernelRow") + + +class KernelRow(Base): + __tablename__ = "kernels" + __table_args__ = {"extend_existing": True} + + id = IDColumn() + session_id = sa.Column( + "session_id", + GUID, + sa.ForeignKey("sessions.id"), + unique=False, + index=True, + nullable=False, + ) + occupied_slots = sa.Column("occupied_slots", ResourceSlotColumn(), nullable=False) + + +def _sync_single_kernel_cluster_session(): + conn = op.get_bind() + sync_stmt = textwrap.dedent( + """ + UPDATE sessions + SET occupying_slots = kernels.occupied_slots + FROM kernels + WHERE sessions.id = kernels.session_id + AND sessions.cluster_size = 1; + """ + ) + conn.execute(text(sync_stmt)) + + +def _sync_multi_kernel_cluster_session(): + db_sess = Session(op.get_bind()) + + while True: + select_stmt = ( + sa.select(SessionRow) + .where( + (SessionRow.cluster_size != 1) + & (SessionRow.occupying_slots == {}) + & (SessionRow.status_history.op("?")("RUNNING")) + & (sa.not_(SessionRow.status_history.op("?")("ERROR"))) + ) + .limit(PAGE_SIZE) + .options(selectinload(SessionRow.kernels)) + ) + session_list = cast(list[SessionRow], db_sess.scalars(select_stmt).all()) + if not session_list: + return + + update_stmt = ( + sa.update(SessionRow) + .where(SessionRow.id == sa.bindparam("session_id")) + .values(occupying_slots=sa.bindparam("occupying_slots")) + ) + data = [] + for session in session_list: + occupying_slots = sum([k.occupied_slots for k in session.kernels], start=ResourceSlot()) + data.append({"session_id": session.id, "occupying_slots": occupying_slots}) + db_sess.execute(update_stmt, data) + + +def upgrade(): + _sync_single_kernel_cluster_session() + _sync_multi_kernel_cluster_session() + + +def downgrade(): + pass From 401319323cfa126e433b789d994f7da7cac12f33 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 5 Apr 2024 23:26:47 +0900 Subject: [PATCH 09/13] update alembic migration --- .../679e5721e94d_sync_session_occupying_slots_to_sibling_.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py b/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py index 668a87a429..675994d68e 100644 --- a/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py +++ b/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py @@ -1,7 +1,7 @@ """sync_session_occupying_slots_to_sibling_kernels Revision ID: 679e5721e94d -Revises: 75ea2b136830 +Revises: 857b763b8618 Create Date: 2024-04-01 17:34:33.480996 """ @@ -20,7 +20,7 @@ # revision identifiers, used by Alembic. revision = "679e5721e94d" -down_revision = "75ea2b136830" +down_revision = "857b763b8618" branch_labels = None depends_on = None From b710811d403f23d9b269fd860c072aabfe761be8 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Mon, 10 Jun 2024 10:33:00 +0900 Subject: [PATCH 10/13] update alembic migration --- .../679e5721e94d_sync_session_occupying_slots_to_sibling_.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py b/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py index 675994d68e..ae01075068 100644 --- a/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py +++ b/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py @@ -1,7 +1,7 @@ """sync_session_occupying_slots_to_sibling_kernels Revision ID: 679e5721e94d -Revises: 857b763b8618 +Revises: f56a82d0ac9f Create Date: 2024-04-01 17:34:33.480996 """ @@ -20,7 +20,7 @@ # revision identifiers, used by Alembic. revision = "679e5721e94d" -down_revision = "857b763b8618" +down_revision = "f56a82d0ac9f" branch_labels = None depends_on = None From d38ef739c7ddb85097fe4c07125122eb1990dc8a Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Mon, 10 Jun 2024 10:42:49 +0900 Subject: [PATCH 11/13] update news fragment --- changes/1832.fix.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/1832.fix.md b/changes/1832.fix.md index 38c826e181..0d3985c2ec 100644 --- a/changes/1832.fix.md +++ b/changes/1832.fix.md @@ -1 +1 @@ -Update `occupying_slots` field of `sessions` table when kernel starts. +Do not omit to update session's occupying resources to DB when a kernel starts. From bde8d203dd00223d962a3e11772d9a2319b770a9 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 14 Jun 2024 14:37:58 +0900 Subject: [PATCH 12/13] load kernels.occupied_slots only when migration --- .../679e5721e94d_sync_session_occupying_slots_to_sibling_.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py b/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py index ae01075068..b96cd09b99 100644 --- a/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py +++ b/src/ai/backend/manager/models/alembic/versions/679e5721e94d_sync_session_occupying_slots_to_sibling_.py @@ -12,7 +12,7 @@ import sqlalchemy as sa from alembic import op from sqlalchemy.dialects import postgresql as pgsql -from sqlalchemy.orm import Session, registry, relationship, selectinload +from sqlalchemy.orm import Session, load_only, registry, relationship, selectinload from sqlalchemy.sql import text from ai.backend.common.types import ResourceSlot @@ -84,10 +84,9 @@ def _sync_multi_kernel_cluster_session(): (SessionRow.cluster_size != 1) & (SessionRow.occupying_slots == {}) & (SessionRow.status_history.op("?")("RUNNING")) - & (sa.not_(SessionRow.status_history.op("?")("ERROR"))) ) .limit(PAGE_SIZE) - .options(selectinload(SessionRow.kernels)) + .options(selectinload(SessionRow.kernels).options(load_only(KernelRow.occupied_slots))) ) session_list = cast(list[SessionRow], db_sess.scalars(select_stmt).all()) if not session_list: From 63b9dde0e14459899564a3232f9ee998bd862ffc Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Sun, 16 Jun 2024 12:59:03 +0900 Subject: [PATCH 13/13] revert usage of SessionRow.finalize_runnning() --- src/ai/backend/manager/models/session.py | 65 ------------------------ src/ai/backend/manager/registry.py | 26 ++++++++-- 2 files changed, 23 insertions(+), 68 deletions(-) diff --git a/src/ai/backend/manager/models/session.py b/src/ai/backend/manager/models/session.py index 772e040e69..fc7f7a14ed 100644 --- a/src/ai/backend/manager/models/session.py +++ b/src/ai/backend/manager/models/session.py @@ -5,7 +5,6 @@ import logging from contextlib import asynccontextmanager as actxmgr from datetime import datetime -from decimal import Decimal from typing import ( TYPE_CHECKING, Any, @@ -16,7 +15,6 @@ Optional, Sequence, Union, - cast, ) from uuid import UUID @@ -35,7 +33,6 @@ AccessKey, ClusterMode, KernelId, - ResourceSlot, SessionId, SessionResult, SessionTypes, @@ -1082,68 +1079,6 @@ async def get_sgroup_managed_sessions( return result.scalars().all() -async def finalize_running( - db: ExtendedAsyncSAEngine, - session_id: SessionId, - kernel_actual_allocs: ResourceSlot, -) -> SessionStatus: - """ - Update session's occupying resource slots and finalize session status to RUNNING. - - Check status of session's sibling kernels and transit the status of session. - Return the current status of session. - """ - now = datetime.now(tzutc()) - - async def _check_and_update() -> SessionStatus: - async with db.begin_session() as db_session: - session_query = ( - sa.select(SessionRow) - .where(SessionRow.id == session_id) - .with_for_update() - .options( - noload("*"), - load_only( - SessionRow.status, - SessionRow.occupying_slots, - ), - selectinload(SessionRow.kernels).options( - noload("*"), load_only(KernelRow.status, KernelRow.cluster_role) - ), - ) - ) - result = (await db_session.scalars(session_query)).first() - session_row = cast(SessionRow, result) - session_occupying_slots = cast(ResourceSlot, session_row.occupying_slots) - session_occupying_slots.sync_keys(kernel_actual_allocs) - for key, val in session_occupying_slots.items(): - session_occupying_slots[key] = str( - Decimal(val) + Decimal(kernel_actual_allocs[key]) - ) - update_values: dict[str, Any] = { - "occupying_slots": session_occupying_slots, - } - determined_status = determine_session_status(session_row.kernels) - if determined_status != session_row.status: - update_values["status"] = determined_status - update_values["status_history"] = sql_json_merge( - SessionRow.status_history, - (), - { - determined_status.name: now.isoformat(), - }, - ) - if determined_status in (SessionStatus.CANCELLED, SessionStatus.TERMINATED): - update_values["terminated_at"] = now - update_query = ( - sa.update(SessionRow).where(SessionRow.id == session_id).values(**update_values) - ) - await db_session.execute(update_query) - return determined_status - - return await execute_with_retry(_check_and_update) - - class SessionDependencyRow(Base): __tablename__ = "session_dependencies" session_id = sa.Column( diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py index c18bfdf51b..9b5453121f 100644 --- a/src/ai/backend/manager/registry.py +++ b/src/ai/backend/manager/registry.py @@ -164,10 +164,10 @@ scaling_groups, verify_vfolder_name, ) -from .models.session import finalize_running as finalize_session_running from .models.utils import ( ExtendedAsyncSAEngine, execute_with_retry, + execute_with_txn_retry, is_db_retry_error, reenter_txn, reenter_txn_session, @@ -1582,8 +1582,28 @@ async def finalize_running( ), } self._kernel_actual_allocated_resources[kernel_id] = actual_allocs - await KernelRow.update_kernel(self.db, kernel_id, new_status, update_data=update_data) - new_session_status = await finalize_session_running(self.db, session_id, actual_allocs) + + async def _update_session_occupying_slots(db_session: AsyncSession) -> None: + _stmt = sa.select(SessionRow).where(SessionRow.id == session_id) + session_row = cast(SessionRow | None, await db_session.scalar(_stmt)) + if session_row is None: + raise SessionNotFound(f"Failed to fetch session (id:{session_id})") + session_occupying_slots = ResourceSlot.from_json({**session_row.occupying_slots}) + session_occupying_slots.sync_keys(actual_allocs) + for key, val in session_occupying_slots.items(): + session_occupying_slots[key] = str(Decimal(val) + Decimal(actual_allocs[key])) + session_row.occupying_slots = session_occupying_slots + + async with self.db.connect() as db_conn: + await execute_with_txn_retry( + _update_session_occupying_slots, self.db.begin_session, db_conn + ) + kernel_did_update = await KernelRow.update_kernel( + self.db, kernel_id, new_status, update_data=update_data + ) + if not kernel_did_update: + return + new_session_status = await SessionRow.transit_session_status(self.db, session_id) if new_session_status is None or new_session_status != SessionStatus.RUNNING: return query = (