Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Oct 26, 2023
1 parent 0c96c0e commit fec83ce
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 132 deletions.
8 changes: 1 addition & 7 deletions src/ai/backend/manager/scheduler/drf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
RoundRobinContext,
SessionId,
)
from ai.backend.manager.scheduler.utils import select_agent

from ..models import AgentRow, SessionRow
from ..models.scaling_group import ScalingGroupOpts
Expand Down Expand Up @@ -111,14 +110,9 @@ async def _assign_agent(
if self.per_user_dominant_share[access_key] < dominant_share_from_request:
self.per_user_dominant_share[access_key] = dominant_share_from_request

agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
agent_selection_resource_priority = self.agent_selection_resource_priority

return await select_agent(
return await self.select_agent(
possible_agents,
pending_session_or_kernel,
agent_selection_strategy,
agent_selection_resource_priority,
roundrobin_context,
)

Expand Down
29 changes: 4 additions & 25 deletions src/ai/backend/manager/scheduler/fifo.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
RoundRobinContext,
SessionId,
)
from ai.backend.manager.scheduler.utils import select_agent

from ..models import AgentRow, SessionRow
from .types import AbstractScheduler, KernelInfo
Expand Down Expand Up @@ -69,14 +68,9 @@ async def assign_agent_for_session(
pending_session: SessionRow,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
agent_selection_resource_priority = self.agent_selection_resource_priority

return await select_agent(
return await self.select_agent(
agents,
pending_session,
agent_selection_strategy,
agent_selection_resource_priority,
roundrobin_context,
)

Expand All @@ -85,14 +79,9 @@ async def assign_agent_for_kernel(
agents: Sequence[AgentRow],
pending_kernel: KernelInfo,
) -> Optional[AgentId]:
agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
agent_selection_resource_priority = self.agent_selection_resource_priority

return await select_agent(
return await self.select_agent(
agents,
pending_kernel,
agent_selection_strategy,
agent_selection_resource_priority,
)


Expand All @@ -114,14 +103,9 @@ async def assign_agent_for_session(
pending_session: SessionRow,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
agent_selection_resource_priority = self.agent_selection_resource_priority

return await select_agent(
return await self.select_agent(
agents,
pending_session,
agent_selection_strategy,
agent_selection_resource_priority,
roundrobin_context,
)

Expand All @@ -130,12 +114,7 @@ async def assign_agent_for_kernel(
agents: Sequence[AgentRow],
pending_kernel: KernelInfo,
) -> Optional[AgentId]:
agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
agent_selection_resource_priority = self.agent_selection_resource_priority

return await select_agent(
return await self.select_agent(
agents,
pending_kernel,
agent_selection_strategy,
agent_selection_resource_priority,
)
108 changes: 108 additions & 0 deletions src/ai/backend/manager/scheduler/types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import sys
import uuid
from abc import ABCMeta, abstractmethod
from datetime import datetime
Expand Down Expand Up @@ -29,16 +30,23 @@
from ai.backend.common.types import (
AccessKey,
AgentId,
AgentSelectionStrategy,
ClusterMode,
KernelId,
ResourceSlot,
RoundRobinContext,
RoundRobinState,
SessionId,
SessionTypes,
SlotName,
SlotTypes,
VFolderMount,
)
from ai.backend.manager.scheduler.utils import (
get_num_extras,
get_requested_architecture,
sort_requested_slots_by_priority,
)

from ..defs import DEFAULT_ROLE
from ..models import AgentRow, KernelRow, SessionRow, kernels, keypairs
Expand Down Expand Up @@ -459,3 +467,103 @@ async def assign_agent_for_kernel(
This may be called multiple times for multi-node multi-container sessions.
"""
return None

async def select_agent(
self,
possible_agents: Sequence[AgentRow],
pending_session_or_kernel: SessionRow | KernelInfo,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
"""
Select an agent for the pending session or kernel.
"""

agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
requested_slots = pending_session_or_kernel.requested_slots

agent_candidates = [
agent
for agent in possible_agents
if agent.available_slots - agent.occupied_slots >= requested_slots
]

if not agent_candidates:
return None

resource_priorities = sort_requested_slots_by_priority(
requested_slots, self.agent_selection_resource_priority
)

# Note that ROUNDROBIN is not working with the multi-node multi-container session.
# It assumes the pending session type is single-node session.
# Otherwise, it will use 'Dispersed' strategy as default strategy.
if (
agent_selection_strategy == AgentSelectionStrategy.ROUNDROBIN
and type(pending_session_or_kernel) is KernelInfo
):
agent_selection_strategy = AgentSelectionStrategy.DISPERSED

match agent_selection_strategy:
case AgentSelectionStrategy.ROUNDROBIN:
assert type(pending_session_or_kernel) is SessionRow
assert roundrobin_context is not None
sched_ctx = roundrobin_context.sched_ctx
sgroup_name = roundrobin_context.sgroup_name
requested_architecture = get_requested_architecture(pending_session_or_kernel)

rr_state: RoundRobinState | None = (
await sched_ctx.registry.shared_config.get_roundrobin_state(
sgroup_name, requested_architecture
)
)

if rr_state is None:
agent_idx = 0
else:
agent_idx = rr_state.next_index % len(possible_agents)

# This logic assumes that the list of possible agents is not changed.
# If the list of possible agents is changed, the next agent will be selected at random by agent_idx.
# In this case, we will just use the agent_idx for the simplicity.
chosen_agent = possible_agents[agent_idx]

rr_state = RoundRobinState((agent_idx + 1) % len(possible_agents))

await sched_ctx.registry.shared_config.put_roundrobin_state(
sgroup_name, requested_architecture, rr_state
)
case AgentSelectionStrategy.LEGACY:
chosen_agent = max(
possible_agents,
key=lambda agent: [
-get_num_extras(agent, requested_slots),
*[
agent.available_slots.get(key, -sys.maxsize)
for key in resource_priorities
],
],
)
case AgentSelectionStrategy.CONCENTRATED:
chosen_agent = min(
possible_agents,
key=lambda agent: [
get_num_extras(agent, requested_slots),
*[
(agent.available_slots - agent.occupied_slots).get(key, sys.maxsize)
for key in resource_priorities
],
],
)
case AgentSelectionStrategy.DISPERSED | _:
chosen_agent = max(
possible_agents,
key=lambda agent: [
-get_num_extras(agent, requested_slots),
*[
(agent.available_slots - agent.occupied_slots).get(key, -sys.maxsize)
for key in resource_priorities
],
],
)

return chosen_agent.id
100 changes: 0 additions & 100 deletions src/ai/backend/manager/scheduler/utils.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import sys
from decimal import Decimal
from typing import Optional, Sequence

from ai.backend.common.types import (
AgentId,
AgentSelectionStrategy,
ResourceSlot,
RoundRobinContext,
RoundRobinState,
)
from ai.backend.manager.api.exceptions import GenericBadRequest
from ai.backend.manager.models.agent import AgentRow
from ai.backend.manager.models.session import SessionRow
from ai.backend.manager.scheduler.types import KernelInfo


def get_slot_index(slotname: str, agent_selection_resource_priority: list[str]) -> int:
Expand Down Expand Up @@ -65,97 +59,3 @@ def get_requested_architecture(sess_ctx: SessionRow) -> str:
"Cannot assign multiple kernels with different architectures' single node session",
)
return requested_architectures.pop()


async def select_agent(
possible_agents: Sequence[AgentRow],
pending_session_or_kernel: SessionRow | KernelInfo,
agent_selection_strategy: AgentSelectionStrategy,
agent_selection_resource_priority: list[str],
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
requested_slots = pending_session_or_kernel.requested_slots

agent_candidates = [
agent
for agent in possible_agents
if agent.available_slots - agent.occupied_slots >= requested_slots
]

if not agent_candidates:
return None

resource_priorities = sort_requested_slots_by_priority(
requested_slots, agent_selection_resource_priority
)

# Note that ROUNDROBIN is not working with the multi-node multi-container session.
# It assumes the pending session type is single-node session.
# Otherwise, it will use 'Dispersed' strategy as default strategy.
if (
agent_selection_strategy == AgentSelectionStrategy.ROUNDROBIN
and type(pending_session_or_kernel) is KernelInfo
):
agent_selection_strategy = AgentSelectionStrategy.DISPERSED

match agent_selection_strategy:
case AgentSelectionStrategy.ROUNDROBIN:
assert type(pending_session_or_kernel) is SessionRow
assert roundrobin_context is not None
sched_ctx = roundrobin_context.sched_ctx
sgroup_name = roundrobin_context.sgroup_name
requested_architecture = get_requested_architecture(pending_session_or_kernel)

rr_state: RoundRobinState | None = (
await sched_ctx.registry.shared_config.get_roundrobin_state(
sgroup_name, requested_architecture
)
)

if rr_state is None:
agent_idx = 0
else:
agent_idx = rr_state.next_index % len(possible_agents)

# This logic assumes that the list of possible agents is not changed.
# If the list of possible agents is changed, the next agent will be selected at random by agent_idx.
# In this case, we will just use the agent_idx for the simplicity.
chosen_agent = possible_agents[agent_idx]

rr_state = RoundRobinState((agent_idx + 1) % len(possible_agents))

await sched_ctx.registry.shared_config.put_roundrobin_state(
sgroup_name, requested_architecture, rr_state
)
case AgentSelectionStrategy.LEGACY:
chosen_agent = max(
possible_agents,
key=lambda agent: [
-get_num_extras(agent, requested_slots),
*[agent.available_slots.get(key, -sys.maxsize) for key in resource_priorities],
],
)
case AgentSelectionStrategy.CONCENTRATED:
chosen_agent = min(
possible_agents,
key=lambda agent: [
get_num_extras(agent, requested_slots),
*[
(agent.available_slots - agent.occupied_slots).get(key, sys.maxsize)
for key in resource_priorities
],
],
)
case AgentSelectionStrategy.DISPERSED | _:
chosen_agent = max(
possible_agents,
key=lambda agent: [
-get_num_extras(agent, requested_slots),
*[
(agent.available_slots - agent.occupied_slots).get(key, -sys.maxsize)
for key in resource_priorities
],
],
)

return chosen_agent.id

0 comments on commit fec83ce

Please sign in to comment.