Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Oct 26, 2023
1 parent 5316e98 commit 3f8d232
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 211 deletions.
4 changes: 4 additions & 0 deletions src/ai/backend/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,10 @@ def as_trafaret(cls) -> t.Trafaret:
)


# This is only used when AgentSelectionStrategy is ROUNDROBIN.
RoundRobinContext = namedtuple("RoundRobinContext", ["sgroup_name", "sched_ctx"])


# States of the round-robin scheduler for each resource group and architecture.
RoundRobinStates: TypeAlias = dict[str, dict[str, RoundRobinState]]

Expand Down
37 changes: 18 additions & 19 deletions src/ai/backend/manager/scheduler/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
ClusterMode,
RedisConnectionInfo,
ResourceSlot,
RoundRobinContext,
SessionId,
aobject,
)
Expand Down Expand Up @@ -123,13 +124,14 @@ def load_scheduler(
name: str,
sgroup_opts: ScalingGroupOpts,
scheduler_config: dict[str, Any],
agent_selection_resource_priority: list[str],
) -> AbstractScheduler:
entry_prefix = "backendai_scheduler_v10"
for entrypoint in scan_entrypoints(entry_prefix):
if entrypoint.name == name:
log.debug('loading scheduler plugin "{}" from {}', name, entrypoint.module)
scheduler_cls = entrypoint.load()
return scheduler_cls(sgroup_opts, scheduler_config)
return scheduler_cls(sgroup_opts, scheduler_config, agent_selection_resource_priority)
raise ImportError("Cannot load the scheduler plugin", name)


Expand Down Expand Up @@ -345,8 +347,18 @@ async def _load_scheduler(
global_scheduler_opts = self.shared_config["plugins"]["scheduler"].get(
scheduler_name, {}
)

agent_selection_resource_priority = self.local_config["manager"][
"agent-selection-resource-priority"
]

scheduler_specific_config = {**global_scheduler_opts, **sgroup_opts.config}
return load_scheduler(scheduler_name, sgroup_opts, scheduler_specific_config)
return load_scheduler(
scheduler_name,
sgroup_opts,
scheduler_specific_config,
agent_selection_resource_priority,
)

async def _schedule_in_sgroup(
self,
Expand Down Expand Up @@ -637,18 +649,13 @@ async def _update_session_status_data() -> None:
),
)

agent_selection_resource_priority = self.local_config["manager"][
"agent-selection-resource-priority"
]

if schedulable_sess.cluster_mode == ClusterMode.SINGLE_NODE:
await self._schedule_single_node_session(
sched_ctx,
scheduler,
sgroup_name,
candidate_agents,
schedulable_sess,
agent_selection_resource_priority,
check_results,
)
elif schedulable_sess.cluster_mode == ClusterMode.MULTI_NODE:
Expand All @@ -658,7 +665,6 @@ async def _update_session_status_data() -> None:
sgroup_name,
candidate_agents,
schedulable_sess,
agent_selection_resource_priority,
check_results,
)
else:
Expand Down Expand Up @@ -698,7 +704,6 @@ async def _schedule_single_node_session(
sgroup_name: str,
candidate_agents: Sequence[AgentRow],
sess_ctx: SessionRow,
agent_selection_resource_priority: list[str],
check_results: List[Tuple[str, Union[Exception, PredicateResult]]],
) -> None:
"""
Expand Down Expand Up @@ -774,11 +779,10 @@ async def _schedule_single_node_session(
cand_agent_id = await scheduler.assign_agent_for_session(
compatible_candidate_agents,
sess_ctx,
scheduler.sgroup_opts.agent_selection_strategy,
agent_selection_resource_priority,
sgroup_name,
sched_ctx,
requested_architecture,
RoundRobinContext(
sgroup_name=sgroup_name,
sched_ctx=sched_ctx,
),
)
if cand_agent_id is None:
raise InstanceNotAvailable(
Expand Down Expand Up @@ -915,7 +919,6 @@ async def _schedule_multi_node_session(
sgroup_name: str,
candidate_agents: Sequence[AgentRow],
sess_ctx: SessionRow,
agent_selection_resource_priority: list[str],
check_results: List[Tuple[str, Union[Exception, PredicateResult]]],
) -> None:
"""
Expand Down Expand Up @@ -994,10 +997,6 @@ async def _schedule_multi_node_session(
agent_id = await scheduler.assign_agent_for_kernel(
available_candidate_agents,
kernel,
scheduler.sgroup_opts.agent_selection_strategy,
agent_selection_resource_priority,
sgroup_name,
sched_ctx,
)
if agent_id is None:
raise InstanceNotAvailable(
Expand Down
48 changes: 16 additions & 32 deletions src/ai/backend/manager/scheduler/drf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
from ai.backend.common.types import (
AccessKey,
AgentId,
AgentSelectionStrategy,
ResourceSlot,
RoundRobinContext,
SessionId,
)
from ai.backend.manager.scheduler.utils import select_agent

from ..models import AgentRow, SessionRow
from ..models.scaling_group import ScalingGroupOpts
from .types import AbstractScheduler, KernelInfo, SchedulingContext
from .types import AbstractScheduler, KernelInfo

log = BraceStyleAdapter(logging.getLogger("ai.backend.manager.scheduler"))

Expand All @@ -29,8 +29,13 @@ class DRFScheduler(AbstractScheduler):
per_user_dominant_share: Dict[AccessKey, Decimal]
total_capacity: ResourceSlot

def __init__(self, sgroup_opts: ScalingGroupOpts, config: Mapping[str, Any]) -> None:
super().__init__(sgroup_opts, config)
def __init__(
self,
sgroup_opts: ScalingGroupOpts,
config: Mapping[str, Any],
agent_selection_resource_priority: list[str],
) -> None:
super().__init__(sgroup_opts, config, agent_selection_resource_priority)
self.per_user_dominant_share = defaultdict(lambda: Decimal(0))

def pick_session(
Expand Down Expand Up @@ -80,11 +85,7 @@ async def _assign_agent(
self,
possible_agents: Sequence[AgentRow],
pending_session_or_kernel: SessionRow | KernelInfo,
agent_selection_strategy: AgentSelectionStrategy,
agent_selection_resource_priority: list[str],
sgroup_name: Optional[str] = None,
sched_ctx: Optional[SchedulingContext] = None,
requested_architecture: Optional[str] = None,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
# If some predicate checks for a picked session fail,
# this method is NOT called at all for the picked session.
Expand All @@ -110,52 +111,35 @@ async def _assign_agent(
if self.per_user_dominant_share[access_key] < dominant_share_from_request:
self.per_user_dominant_share[access_key] = dominant_share_from_request

agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
agent_selection_resource_priority = self.agent_selection_resource_priority

return await select_agent(
possible_agents,
pending_session_or_kernel,
agent_selection_strategy,
agent_selection_resource_priority,
sgroup_name,
sched_ctx,
requested_architecture,
roundrobin_context,
)

async def assign_agent_for_session(
self,
possible_agents: Sequence[AgentRow],
pending_session: SessionRow,
agent_selection_strategy: AgentSelectionStrategy,
agent_selection_resource_priority: list[str],
sgroup_name: Optional[str] = None,
sched_ctx: Optional[SchedulingContext] = None,
requested_architecture: Optional[str] = None,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
return await self._assign_agent(
possible_agents,
pending_session,
agent_selection_strategy,
agent_selection_resource_priority,
sgroup_name,
sched_ctx,
requested_architecture,
roundrobin_context,
)

async def assign_agent_for_kernel(
self,
possible_agents: Sequence[AgentRow],
pending_kernel: KernelInfo,
agent_selection_strategy: AgentSelectionStrategy,
agent_selection_resource_priority: list[str],
sgroup_name: Optional[str] = None,
sched_ctx: Optional[SchedulingContext] = None,
requested_architecture: Optional[str] = None,
) -> Optional[AgentId]:
return await self._assign_agent(
possible_agents,
pending_kernel,
agent_selection_strategy,
agent_selection_resource_priority,
sgroup_name,
sched_ctx,
requested_architecture,
)
56 changes: 20 additions & 36 deletions src/ai/backend/manager/scheduler/fifo.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

from ai.backend.common.types import (
AgentId,
AgentSelectionStrategy,
ResourceSlot,
RoundRobinContext,
SessionId,
)
from ai.backend.manager.scheduler.utils import select_agent

from ..models import AgentRow, SessionRow
from .types import AbstractScheduler, KernelInfo, SchedulingContext
from .types import AbstractScheduler, KernelInfo


def get_num_extras(agent: AgentRow, requested_slots: ResourceSlot) -> int:
Expand Down Expand Up @@ -67,40 +67,32 @@ async def assign_agent_for_session(
self,
agents: Sequence[AgentRow],
pending_session: SessionRow,
agent_selection_strategy: AgentSelectionStrategy,
agent_selection_resource_priority: list[str],
sgroup_name: Optional[str] = None,
sched_ctx: Optional[SchedulingContext] = None,
requested_architecture: Optional[str] = None,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
agent_selection_resource_priority = self.agent_selection_resource_priority

return await select_agent(
agents,
pending_session,
agent_selection_strategy,
agent_selection_resource_priority,
sgroup_name,
sched_ctx,
requested_architecture,
roundrobin_context,
)

async def assign_agent_for_kernel(
self,
agents: Sequence[AgentRow],
kernel_info: KernelInfo,
agent_selection_strategy: AgentSelectionStrategy,
agent_selection_resource_priority: list[str],
sgroup_name: Optional[str] = None,
sched_ctx: Optional[SchedulingContext] = None,
requested_architecture: Optional[str] = None,
pending_kernel: KernelInfo,
) -> Optional[AgentId]:
agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
agent_selection_resource_priority = self.agent_selection_resource_priority

return await select_agent(
agents,
kernel_info,
pending_kernel,
agent_selection_strategy,
agent_selection_resource_priority,
sgroup_name,
sched_ctx,
requested_architecture,
)


Expand All @@ -120,38 +112,30 @@ async def assign_agent_for_session(
self,
agents: Sequence[AgentRow],
pending_session: SessionRow,
agent_selection_strategy: AgentSelectionStrategy,
agent_selection_resource_priority: list[str],
sgroup_name: Optional[str] = None,
sched_ctx: Optional[SchedulingContext] = None,
requested_architecture: Optional[str] = None,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
agent_selection_resource_priority = self.agent_selection_resource_priority

return await select_agent(
agents,
pending_session,
agent_selection_strategy,
agent_selection_resource_priority,
sgroup_name,
sched_ctx,
requested_architecture,
roundrobin_context,
)

async def assign_agent_for_kernel(
self,
agents: Sequence[AgentRow],
pending_kernel: KernelInfo,
agent_selection_strategy: AgentSelectionStrategy,
agent_selection_resource_priority: list[str],
sgroup_name: Optional[str] = None,
sched_ctx: Optional[SchedulingContext] = None,
requested_architecture: Optional[str] = None,
) -> Optional[AgentId]:
agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
agent_selection_resource_priority = self.agent_selection_resource_priority

return await select_agent(
agents,
pending_kernel,
agent_selection_strategy,
agent_selection_resource_priority,
sgroup_name,
sched_ctx,
requested_architecture,
)
Loading

0 comments on commit 3f8d232

Please sign in to comment.