Skip to content

Commit

Permalink
Distinguish compatible_agents and possible_agents
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Mar 27, 2024
1 parent 54ab1a3 commit 3819025
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 26 deletions.
20 changes: 14 additions & 6 deletions src/ai/backend/manager/scheduler/drf.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,25 @@ def pick_session(

async def _assign_agent(
self,
possible_agents: Sequence[AgentRow],
compatible_agents: Sequence[AgentRow],
pending_session_or_kernel: SessionRow | KernelRow,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
# If some predicate checks for a picked session fail,
# this method is NOT called at all for the picked session.
# In such case, we just skip updating self.per_user_dominant_share state
# and the scheduler dispatcher continues to pick another session within the same scaling group.

access_key = pending_session_or_kernel.access_key
requested_slots = pending_session_or_kernel.requested_slots

if not [
agent
for agent in compatible_agents
if agent.available_slots - agent.occupied_slots >= requested_slots
]:
return None

# We have one or more agents that can host the picked session.

# Update the dominant share.
Expand All @@ -111,30 +119,30 @@ async def _assign_agent(
self.per_user_dominant_share[access_key] = dominant_share_from_request

return await self.select_agent(
possible_agents,
compatible_agents,
pending_session_or_kernel,
False,
roundrobin_context,
)

async def assign_agent_for_session(
self,
possible_agents: Sequence[AgentRow],
compatible_agents: Sequence[AgentRow],
pending_session: SessionRow,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
return await self._assign_agent(
possible_agents,
compatible_agents,
pending_session,
roundrobin_context,
)

async def assign_agent_for_kernel(
self,
possible_agents: Sequence[AgentRow],
compatible_agents: Sequence[AgentRow],
pending_kernel: KernelRow,
) -> Optional[AgentId]:
return await self._assign_agent(
possible_agents,
compatible_agents,
pending_kernel,
)
16 changes: 8 additions & 8 deletions src/ai/backend/manager/scheduler/fifo.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,24 @@ def pick_session(

async def assign_agent_for_session(
self,
agents: Sequence[AgentRow],
compatible_agents: Sequence[AgentRow],
pending_session: SessionRow,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
return await self.select_agent(
agents,
compatible_agents,
pending_session,
True,
roundrobin_context,
)

async def assign_agent_for_kernel(
self,
agents: Sequence[AgentRow],
compatible_agents: Sequence[AgentRow],
pending_kernel: KernelRow,
) -> Optional[AgentId]:
return await self.select_agent(
agents,
compatible_agents,
pending_kernel,
True,
)
Expand All @@ -99,24 +99,24 @@ def pick_session(

async def assign_agent_for_session(
self,
agents: Sequence[AgentRow],
compatible_agents: Sequence[AgentRow],
pending_session: SessionRow,
roundrobin_context: Optional[RoundRobinContext] = None,
) -> Optional[AgentId]:
return await self.select_agent(
agents,
compatible_agents,
pending_session,
True,
roundrobin_context,
)

async def assign_agent_for_kernel(
self,
agents: Sequence[AgentRow],
compatible_agents: Sequence[AgentRow],
pending_kernel: KernelRow,
) -> Optional[AgentId]:
return await self.select_agent(
agents,
compatible_agents,
pending_kernel,
True,
)
24 changes: 12 additions & 12 deletions src/ai/backend/manager/scheduler/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ def pick_session(
@abstractmethod
async def assign_agent_for_session(
self,
possible_agents: Sequence[AgentRow],
compatible_agents: Sequence[AgentRow],
pending_session: SessionRow,
roundrobin_context: Optional[
RoundRobinContext
Expand All @@ -459,7 +459,7 @@ async def assign_agent_for_session(
@abstractmethod
async def assign_agent_for_kernel(
self,
possible_agents: Sequence[AgentRow],
compatible_agents: Sequence[AgentRow],
pending_kernel: KernelRow,
) -> Optional[AgentId]:
"""
Expand All @@ -470,7 +470,7 @@ async def assign_agent_for_kernel(

async def select_agent(
self,
possible_agents: Sequence[AgentRow],
compatible_agents: Sequence[AgentRow],
pending_session_or_kernel: SessionRow | KernelRow,
use_num_extras: bool,
roundrobin_context: Optional[RoundRobinContext] = None,
Expand All @@ -482,13 +482,13 @@ async def select_agent(
agent_selection_strategy = self.sgroup_opts.agent_selection_strategy
requested_slots = pending_session_or_kernel.requested_slots

agent_candidates = [
possible_candidates = [
agent
for agent in possible_agents
for agent in compatible_agents
if agent.available_slots - agent.occupied_slots >= requested_slots
]

if not agent_candidates:
if not possible_candidates:
return None

resource_priorities = sort_requested_slots_by_priority(
Expand Down Expand Up @@ -521,21 +521,21 @@ async def select_agent(
if rr_state is None:
agent_idx = 0
else:
agent_idx = rr_state.next_index % len(possible_agents)
agent_idx = rr_state.next_index % len(possible_candidates)

# This logic assumes that the list of possible agents is not changed.
# If the list of possible agents is changed, the next agent will be selected at random by agent_idx.
# In this case, we will just use the agent_idx for the simplicity.
chosen_agent = possible_agents[agent_idx]
chosen_agent = possible_candidates[agent_idx]

rr_state = RoundRobinState((agent_idx + 1) % len(possible_agents))
rr_state = RoundRobinState((agent_idx + 1) % len(possible_candidates))

await sched_ctx.registry.shared_config.put_roundrobin_state(
sgroup_name, requested_architecture, rr_state
)
case AgentSelectionStrategy.LEGACY:
chosen_agent = max(
possible_agents,
possible_candidates,
key=lambda agent: [
-get_num_extras(agent, requested_slots) if use_num_extras else 0,
*[
Expand All @@ -546,7 +546,7 @@ async def select_agent(
)
case AgentSelectionStrategy.CONCENTRATED:
chosen_agent = min(
possible_agents,
possible_candidates,
key=lambda agent: [
get_num_extras(agent, requested_slots) if use_num_extras else 0,
*[
Expand All @@ -557,7 +557,7 @@ async def select_agent(
)
case AgentSelectionStrategy.DISPERSED | _:
chosen_agent = max(
possible_agents,
possible_candidates,
key=lambda agent: [
-get_num_extras(agent, requested_slots) if use_num_extras else 0,
*[
Expand Down

0 comments on commit 3819025

Please sign in to comment.