diff --git a/src/ai/backend/manager/scheduler/drf.py b/src/ai/backend/manager/scheduler/drf.py index e261f0b2072..85cc3047a1c 100644 --- a/src/ai/backend/manager/scheduler/drf.py +++ b/src/ai/backend/manager/scheduler/drf.py @@ -15,7 +15,6 @@ RoundRobinContext, SessionId, ) -from ai.backend.manager.scheduler.utils import select_agent from ..models import AgentRow, SessionRow from ..models.scaling_group import ScalingGroupOpts @@ -111,14 +110,9 @@ async def _assign_agent( if self.per_user_dominant_share[access_key] < dominant_share_from_request: self.per_user_dominant_share[access_key] = dominant_share_from_request - agent_selection_strategy = self.sgroup_opts.agent_selection_strategy - agent_selection_resource_priority = self.agent_selection_resource_priority - - return await select_agent( + return await self.select_agent( possible_agents, pending_session_or_kernel, - agent_selection_strategy, - agent_selection_resource_priority, roundrobin_context, ) diff --git a/src/ai/backend/manager/scheduler/fifo.py b/src/ai/backend/manager/scheduler/fifo.py index 55c7f7fa938..c4e530878b7 100644 --- a/src/ai/backend/manager/scheduler/fifo.py +++ b/src/ai/backend/manager/scheduler/fifo.py @@ -11,7 +11,6 @@ RoundRobinContext, SessionId, ) -from ai.backend.manager.scheduler.utils import select_agent from ..models import AgentRow, SessionRow from .types import AbstractScheduler, KernelInfo @@ -69,14 +68,9 @@ async def assign_agent_for_session( pending_session: SessionRow, roundrobin_context: Optional[RoundRobinContext] = None, ) -> Optional[AgentId]: - agent_selection_strategy = self.sgroup_opts.agent_selection_strategy - agent_selection_resource_priority = self.agent_selection_resource_priority - - return await select_agent( + return await self.select_agent( agents, pending_session, - agent_selection_strategy, - agent_selection_resource_priority, roundrobin_context, ) @@ -85,14 +79,9 @@ async def assign_agent_for_kernel( agents: Sequence[AgentRow], pending_kernel: KernelInfo, ) -> Optional[AgentId]: - agent_selection_strategy = self.sgroup_opts.agent_selection_strategy - agent_selection_resource_priority = self.agent_selection_resource_priority - - return await select_agent( + return await self.select_agent( agents, pending_kernel, - agent_selection_strategy, - agent_selection_resource_priority, ) @@ -114,14 +103,9 @@ async def assign_agent_for_session( pending_session: SessionRow, roundrobin_context: Optional[RoundRobinContext] = None, ) -> Optional[AgentId]: - agent_selection_strategy = self.sgroup_opts.agent_selection_strategy - agent_selection_resource_priority = self.agent_selection_resource_priority - - return await select_agent( + return await self.select_agent( agents, pending_session, - agent_selection_strategy, - agent_selection_resource_priority, roundrobin_context, ) @@ -130,12 +114,7 @@ async def assign_agent_for_kernel( agents: Sequence[AgentRow], pending_kernel: KernelInfo, ) -> Optional[AgentId]: - agent_selection_strategy = self.sgroup_opts.agent_selection_strategy - agent_selection_resource_priority = self.agent_selection_resource_priority - - return await select_agent( + return await self.select_agent( agents, pending_kernel, - agent_selection_strategy, - agent_selection_resource_priority, ) diff --git a/src/ai/backend/manager/scheduler/types.py b/src/ai/backend/manager/scheduler/types.py index 85b9e488363..ea6c45a86cf 100644 --- a/src/ai/backend/manager/scheduler/types.py +++ b/src/ai/backend/manager/scheduler/types.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import sys import uuid from abc import ABCMeta, abstractmethod from datetime import datetime @@ -29,16 +30,23 @@ from ai.backend.common.types import ( AccessKey, AgentId, + AgentSelectionStrategy, ClusterMode, KernelId, ResourceSlot, RoundRobinContext, + RoundRobinState, SessionId, SessionTypes, SlotName, SlotTypes, VFolderMount, ) +from ai.backend.manager.scheduler.utils import ( + get_num_extras, + get_requested_architecture, + sort_requested_slots_by_priority, +) from ..defs import DEFAULT_ROLE from ..models import AgentRow, KernelRow, SessionRow, kernels, keypairs @@ -459,3 +467,103 @@ async def assign_agent_for_kernel( This may be called multiple times for multi-node multi-container sessions. """ return None + + async def select_agent( + self, + possible_agents: Sequence[AgentRow], + pending_session_or_kernel: SessionRow | KernelInfo, + roundrobin_context: Optional[RoundRobinContext] = None, + ) -> Optional[AgentId]: + """ + Select an agent for the pending session or kernel. + """ + + agent_selection_strategy = self.sgroup_opts.agent_selection_strategy + requested_slots = pending_session_or_kernel.requested_slots + + agent_candidates = [ + agent + for agent in possible_agents + if agent.available_slots - agent.occupied_slots >= requested_slots + ] + + if not agent_candidates: + return None + + resource_priorities = sort_requested_slots_by_priority( + requested_slots, self.agent_selection_resource_priority + ) + + # Note that ROUNDROBIN is not working with the multi-node multi-container session. + # It assumes the pending session type is single-node session. + # Otherwise, it will use 'Dispersed' strategy as default strategy. + if ( + agent_selection_strategy == AgentSelectionStrategy.ROUNDROBIN + and type(pending_session_or_kernel) is KernelInfo + ): + agent_selection_strategy = AgentSelectionStrategy.DISPERSED + + match agent_selection_strategy: + case AgentSelectionStrategy.ROUNDROBIN: + assert type(pending_session_or_kernel) is SessionRow + assert roundrobin_context is not None + sched_ctx = roundrobin_context.sched_ctx + sgroup_name = roundrobin_context.sgroup_name + requested_architecture = get_requested_architecture(pending_session_or_kernel) + + rr_state: RoundRobinState | None = ( + await sched_ctx.registry.shared_config.get_roundrobin_state( + sgroup_name, requested_architecture + ) + ) + + if rr_state is None: + agent_idx = 0 + else: + agent_idx = rr_state.next_index % len(possible_agents) + + # This logic assumes that the list of possible agents is not changed. + # If the list of possible agents is changed, the next agent will be selected at random by agent_idx. + # In this case, we will just use the agent_idx for the simplicity. + chosen_agent = possible_agents[agent_idx] + + rr_state = RoundRobinState((agent_idx + 1) % len(possible_agents)) + + await sched_ctx.registry.shared_config.put_roundrobin_state( + sgroup_name, requested_architecture, rr_state + ) + case AgentSelectionStrategy.LEGACY: + chosen_agent = max( + possible_agents, + key=lambda agent: [ + -get_num_extras(agent, requested_slots), + *[ + agent.available_slots.get(key, -sys.maxsize) + for key in resource_priorities + ], + ], + ) + case AgentSelectionStrategy.CONCENTRATED: + chosen_agent = min( + possible_agents, + key=lambda agent: [ + get_num_extras(agent, requested_slots), + *[ + (agent.available_slots - agent.occupied_slots).get(key, sys.maxsize) + for key in resource_priorities + ], + ], + ) + case AgentSelectionStrategy.DISPERSED | _: + chosen_agent = max( + possible_agents, + key=lambda agent: [ + -get_num_extras(agent, requested_slots), + *[ + (agent.available_slots - agent.occupied_slots).get(key, -sys.maxsize) + for key in resource_priorities + ], + ], + ) + + return chosen_agent.id diff --git a/src/ai/backend/manager/scheduler/utils.py b/src/ai/backend/manager/scheduler/utils.py index ce32a98df20..98fe49d944c 100644 --- a/src/ai/backend/manager/scheduler/utils.py +++ b/src/ai/backend/manager/scheduler/utils.py @@ -1,18 +1,12 @@ import sys from decimal import Decimal -from typing import Optional, Sequence from ai.backend.common.types import ( - AgentId, - AgentSelectionStrategy, ResourceSlot, - RoundRobinContext, - RoundRobinState, ) from ai.backend.manager.api.exceptions import GenericBadRequest from ai.backend.manager.models.agent import AgentRow from ai.backend.manager.models.session import SessionRow -from ai.backend.manager.scheduler.types import KernelInfo def get_slot_index(slotname: str, agent_selection_resource_priority: list[str]) -> int: @@ -65,97 +59,3 @@ def get_requested_architecture(sess_ctx: SessionRow) -> str: "Cannot assign multiple kernels with different architectures' single node session", ) return requested_architectures.pop() - - -async def select_agent( - possible_agents: Sequence[AgentRow], - pending_session_or_kernel: SessionRow | KernelInfo, - agent_selection_strategy: AgentSelectionStrategy, - agent_selection_resource_priority: list[str], - roundrobin_context: Optional[RoundRobinContext] = None, -) -> Optional[AgentId]: - requested_slots = pending_session_or_kernel.requested_slots - - agent_candidates = [ - agent - for agent in possible_agents - if agent.available_slots - agent.occupied_slots >= requested_slots - ] - - if not agent_candidates: - return None - - resource_priorities = sort_requested_slots_by_priority( - requested_slots, agent_selection_resource_priority - ) - - # Note that ROUNDROBIN is not working with the multi-node multi-container session. - # It assumes the pending session type is single-node session. - # Otherwise, it will use 'Dispersed' strategy as default strategy. - if ( - agent_selection_strategy == AgentSelectionStrategy.ROUNDROBIN - and type(pending_session_or_kernel) is KernelInfo - ): - agent_selection_strategy = AgentSelectionStrategy.DISPERSED - - match agent_selection_strategy: - case AgentSelectionStrategy.ROUNDROBIN: - assert type(pending_session_or_kernel) is SessionRow - assert roundrobin_context is not None - sched_ctx = roundrobin_context.sched_ctx - sgroup_name = roundrobin_context.sgroup_name - requested_architecture = get_requested_architecture(pending_session_or_kernel) - - rr_state: RoundRobinState | None = ( - await sched_ctx.registry.shared_config.get_roundrobin_state( - sgroup_name, requested_architecture - ) - ) - - if rr_state is None: - agent_idx = 0 - else: - agent_idx = rr_state.next_index % len(possible_agents) - - # This logic assumes that the list of possible agents is not changed. - # If the list of possible agents is changed, the next agent will be selected at random by agent_idx. - # In this case, we will just use the agent_idx for the simplicity. - chosen_agent = possible_agents[agent_idx] - - rr_state = RoundRobinState((agent_idx + 1) % len(possible_agents)) - - await sched_ctx.registry.shared_config.put_roundrobin_state( - sgroup_name, requested_architecture, rr_state - ) - case AgentSelectionStrategy.LEGACY: - chosen_agent = max( - possible_agents, - key=lambda agent: [ - -get_num_extras(agent, requested_slots), - *[agent.available_slots.get(key, -sys.maxsize) for key in resource_priorities], - ], - ) - case AgentSelectionStrategy.CONCENTRATED: - chosen_agent = min( - possible_agents, - key=lambda agent: [ - get_num_extras(agent, requested_slots), - *[ - (agent.available_slots - agent.occupied_slots).get(key, sys.maxsize) - for key in resource_priorities - ], - ], - ) - case AgentSelectionStrategy.DISPERSED | _: - chosen_agent = max( - possible_agents, - key=lambda agent: [ - -get_num_extras(agent, requested_slots), - *[ - (agent.available_slots - agent.occupied_slots).get(key, -sys.maxsize) - for key in resource_priorities - ], - ], - ) - - return chosen_agent.id