Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement GPU config with get_attached_devices() (#3275) #3297

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/3275.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add several commonly used GPU configuration environment variables that are defined in containers by default: `GPU_TYPE`, `GPU_COUNT`, `GPU_CONFIG`, `GPU_MODEL_NAME` and `TF_GPU_MEMORY_ALLOC`
1 change: 1 addition & 0 deletions src/ai/backend/accelerator/cuda_open/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ async def generate_resource_data(
data["CUDA_GLOBAL_DEVICE_IDS"] = ",".join(
f"{local_idx}:{global_id}" for local_idx, global_id in enumerate(active_device_ids)
)
data["CUDA_RESOURCE_VIRTUALIZED"] = "0"
return data

async def get_docker_networks(
Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/accelerator/mock/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def init(self, context: Optional[Any] = None) -> None:
# Read the configurations.
raw_unit_mem = self.plugin_config.get("unit_mem")
if raw_unit_mem is not None:
unit_mem = int(raw_unit_mem)
unit_mem = int(BinarySize.from_str(raw_unit_mem))
if unit_mem < MIN_MEM_UNIT:
raise InitializationError(f"{self.key} plugin: too small unit_mem")
self._unit_mem = unit_mem
Expand Down
38 changes: 37 additions & 1 deletion src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
AcceleratorMetadata,
AgentId,
AutoPullBehavior,
BinarySize,
ClusterInfo,
ClusterSSHPortMapping,
CommitStatus,
Expand Down Expand Up @@ -1822,7 +1823,7 @@ async def create_kernel(
restarting=restarting,
cluster_ssh_port_mapping=cluster_info.get("cluster_ssh_port_mapping"),
)
environ: MutableMapping[str, str] = {**kernel_config["environ"]}
environ: dict[str, str] = {**kernel_config["environ"]}

# Inject Backend.AI-intrinsic env-variables for gosu
if KernelFeatures.UID_MATCH in ctx.kernel_features:
Expand Down Expand Up @@ -1936,6 +1937,40 @@ async def create_kernel(
devices = await computer_ctx.instance.get_attached_devices(device_alloc)
attached_devices[dev_name] = devices

# Generate GPU config env-vars
has_gpu_config = False
for dev_name, attached_accelerators in attached_devices.items():
if has_gpu_config:
# Generate GPU config for the first-seen accelerator only
continue
if dev_name in (DeviceName("cpu"), DeviceName("mem")):
# Skip intrinsic slots
continue
mem_per_device: list[str] = []
mem_per_device_tf: list[str] = []
# proc_items = [] # (unused yet)
for local_idx, dev_info in enumerate(attached_accelerators):
mem = BinarySize(dev_info["data"].get("mem", 0))
mem_per_device.append(f"{local_idx}:{mem:s}")
mem_in_megibytes = f"{mem // (2**20):d}"
mem_per_device_tf.append(f"{local_idx}:{mem_in_megibytes}")
# The processor count is not used yet!
# NOTE: Keep backward-compatibility with the CUDA plugin ("smp")
# proc = dev_info["data"].get("proc", dev_info["data"].get("smp", 0))
# proc_items.append(f"{local_idx}:{proc}")
if attached_accelerators:
# proc_str = ",".join(proc_items) # (unused yet)
environ["GPU_TYPE"] = dev_name
environ["GPU_MODEL_NAME"] = attached_accelerators[0]["model_name"]
environ["GPU_CONFIG"] = ",".join(mem_per_device)
environ["TF_GPU_MEMORY_ALLOC"] = ",".join(mem_per_device_tf)
environ["GPU_COUNT"] = str(len(attached_accelerators))
environ["N_GPUS"] = str(len(attached_accelerators))
has_gpu_config = True
if not has_gpu_config:
environ["GPU_COUNT"] = "0"
environ["N_GPUS"] = "0"

exposed_ports = [2000, 2001]
service_ports: List[ServicePort] = []
port_map: Dict[str, ServicePort] = {}
Expand Down Expand Up @@ -2050,6 +2085,7 @@ async def create_kernel(

# Store information required for restarts.
# NOTE: kconfig may be updated after restarts.
kernel_config["environ"] = environ
resource_spec.freeze()
await self.restart_kernel__store_config(
kernel_id,
Expand Down
24 changes: 12 additions & 12 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import aiotools
import pkg_resources
import zmq
import zmq.asyncio
from aiodocker.docker import Docker, DockerContainer
from aiodocker.exceptions import DockerError
from aiodocker.types import PortInfo
Expand Down Expand Up @@ -661,8 +662,8 @@ def _write_user_bootstrap_script():
with StringIO() as buf:
resource_spec.write_to_file(buf)
for dev_type, device_alloc in resource_spec.allocations.items():
computer_self = self.computers[dev_type]
kvpairs = await computer_self.instance.generate_resource_data(device_alloc)
device_plugin = self.computers[dev_type].instance
kvpairs = await device_plugin.generate_resource_data(device_alloc)
for k, v in kvpairs.items():
buf.write(f"{k}={v}\n")
await loop.run_in_executor(
Expand All @@ -671,18 +672,17 @@ def _write_user_bootstrap_script():
buf.getvalue().encode("utf8"),
)

docker_creds = self.internal_data.get("docker_credentials")
if docker_creds:
await loop.run_in_executor(
None,
(self.config_dir / "docker-creds.json").write_text,
json.dumps(docker_creds),
)

# TODO: refactor out dotfiles/sshkey initialization to the base agent?

shutil.copyfile(self.config_dir / "environ.txt", self.config_dir / "environ_base.txt")
shutil.copyfile(self.config_dir / "resource.txt", self.config_dir / "resource_base.txt")

# TODO: refactor out dotfiles/sshkey initialization to the base agent?
docker_creds = self.internal_data.get("docker_credentials")
if docker_creds:
await loop.run_in_executor(
None,
(self.config_dir / "docker-creds.json").write_text,
json.dumps(docker_creds),
)
# Create SSH keypair only if ssh_keypair internal_data exists and
# /home/work/.ssh folder is not mounted.
if self.internal_data.get("ssh_keypair"):
Expand Down
44 changes: 24 additions & 20 deletions src/ai/backend/agent/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@
import textwrap
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from collections.abc import (
Collection,
Iterator,
Mapping,
MutableMapping,
Sequence,
)
from decimal import Decimal
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Collection,
Iterator,
List,
Mapping,
MutableMapping,
Optional,
Sequence,
Set,
TextIO,
Tuple,
Type,
TypeAlias,
cast,
)

Expand Down Expand Up @@ -66,8 +66,9 @@

from .agent import ComputerContext

log = BraceStyleAdapter(logging.getLogger(__spec__.name))
DeviceAllocation: TypeAlias = Mapping[SlotName, Mapping[DeviceId, Decimal]]

log = BraceStyleAdapter(logging.getLogger(__spec__.name))
known_slot_types: Mapping[SlotName, SlotTypes] = {}


Expand All @@ -92,7 +93,7 @@ class KernelResourceSpec:
scratch_disk_size: int
"""The size of scratch disk. (not implemented yet)"""

mounts: List["Mount"] = attrs.Factory(list)
mounts: list["Mount"] = attrs.Factory(list)
"""The mounted vfolder list."""

def freeze(self) -> None:
Expand Down Expand Up @@ -260,8 +261,8 @@ def __eq__(self, __o: object) -> bool:

class AbstractComputePlugin(AbstractPlugin, metaclass=ABCMeta):
key: DeviceName = DeviceName("accelerator")
slot_types: Sequence[Tuple[SlotName, SlotTypes]]
exclusive_slot_types: Set[str]
slot_types: Sequence[tuple[SlotName, SlotTypes]]
exclusive_slot_types: set[str]

@abstractmethod
def get_metadata(self) -> AcceleratorMetadata:
Expand Down Expand Up @@ -355,7 +356,7 @@ async def get_hooks(self, distro: str, arch: str) -> Sequence[Path]:
async def generate_docker_args(
self,
docker: aiodocker.docker.Docker,
device_alloc,
device_alloc: DeviceAllocation,
) -> Mapping[str, Any]:
"""
When starting a new container, generate device-specific options for the
Expand All @@ -364,7 +365,7 @@ async def generate_docker_args(
"""
return {}

async def generate_resource_data(self, device_alloc) -> Mapping[str, str]:
async def generate_resource_data(self, device_alloc: DeviceAllocation) -> Mapping[str, str]:
"""
Generate extra resource.txt key-value pair sets to be used by the plugin's
own hook libraries in containers.
Expand All @@ -386,7 +387,7 @@ async def restore_from_container(
@abstractmethod
async def get_attached_devices(
self,
device_alloc: Mapping[SlotName, Mapping[DeviceId, Decimal]],
device_alloc: DeviceAllocation,
) -> Sequence[DeviceModelInfo]:
"""
Make up container-attached device information with allocated device id.
Expand All @@ -398,8 +399,9 @@ async def get_node_hwinfo(self) -> HardwareMetadata:

@abstractmethod
async def get_docker_networks(
self, device_alloc: Mapping[SlotName, Mapping[DeviceId, Decimal]]
) -> List[str]:
self,
device_alloc: DeviceAllocation,
) -> list[str]:
"""
Returns reference string (e.g. Id, name, ...) of docker networks
to attach to container for accelerator to work properly.
Expand All @@ -408,8 +410,10 @@ async def get_docker_networks(

@abstractmethod
async def generate_mounts(
self, source_path: Path, device_alloc: Mapping[SlotName, Mapping[DeviceId, Decimal]]
) -> List[MountInfo]:
self,
source_path: Path,
device_alloc: DeviceAllocation,
) -> list[MountInfo]:
"""
Populates additional files/directories under `source_path`
to mount to container and returns `MountInfo`.
Expand All @@ -427,7 +431,7 @@ def discover_plugins(
plugin_group: str,
allowlist: Optional[set[str]] = None,
blocklist: Optional[set[str]] = None,
) -> Iterator[Tuple[str, Type[AbstractComputePlugin]]]:
) -> Iterator[tuple[str, Type[AbstractComputePlugin]]]:
scanned_plugins = [*super().discover_plugins(plugin_group, allowlist, blocklist)]

def accel_lt_intrinsic(item):
Expand Down
7 changes: 6 additions & 1 deletion src/ai/backend/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,10 +1042,15 @@ class ClusterSSHKeyPair(TypedDict):
private_key: str # PEM-encoded string


class ComputedDeviceCapacity(TypedDict):
    """
    Per-device capacity figures reported by compute plugins for an attached device.

    Both keys are optional (``NotRequired``) because not every accelerator
    plugin exposes them; consumers read them with ``.get()`` and a fallback.
    """

    # Device memory size; read as BinarySize(dev_info["data"].get("mem", 0))
    # when generating the GPU_CONFIG / TF_GPU_MEMORY_ALLOC env-vars.
    mem: NotRequired[BinarySize]
    # Processor count (the CUDA plugin historically reported this as "smp").
    # NOTE(review): not consumed anywhere yet per the agent-side comments.
    proc: NotRequired[int]


class DeviceModelInfo(TypedDict):
    """
    Container-attached device information returned by
    ``AbstractComputePlugin.get_attached_devices()``.
    """

    # Plugin-assigned device identifier; some plugins use plain strings
    # instead of the DeviceId newtype.
    device_id: DeviceId | str
    # Human-readable hardware model name (exported as GPU_MODEL_NAME).
    model_name: str
    data: ComputedDeviceCapacity  # name kept for backward compat with plugins


class KernelCreationResult(TypedDict):
Expand Down
Loading