Skip to content

Commit

Permalink
feat: Implement GPU config with get_attached_devices() (#3275) (#3297)
Browse files Browse the repository at this point in the history
Co-authored-by: Joongi Kim <[email protected]>
  • Loading branch information
lablup-octodog and achimnol authored Dec 24, 2024
1 parent 874799e commit 650e052
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 35 deletions.
1 change: 1 addition & 0 deletions changes/3275.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add several commonly used GPU configuration environment variables that are defined in containers by default: `GPU_TYPE`, `GPU_COUNT`, `GPU_CONFIG`, `GPU_MODEL_NAME`, and `TF_GPU_MEMORY_ALLOC`
1 change: 1 addition & 0 deletions src/ai/backend/accelerator/cuda_open/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ async def generate_resource_data(
data["CUDA_GLOBAL_DEVICE_IDS"] = ",".join(
f"{local_idx}:{global_id}" for local_idx, global_id in enumerate(active_device_ids)
)
data["CUDA_RESOURCE_VIRTUALIZED"] = "0"
return data

async def get_docker_networks(
Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/accelerator/mock/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def init(self, context: Optional[Any] = None) -> None:
# Read the configurations.
raw_unit_mem = self.plugin_config.get("unit_mem")
if raw_unit_mem is not None:
unit_mem = int(raw_unit_mem)
unit_mem = int(BinarySize.from_str(raw_unit_mem))
if unit_mem < MIN_MEM_UNIT:
raise InitializationError(f"{self.key} plugin: too small unit_mem")
self._unit_mem = unit_mem
Expand Down
38 changes: 37 additions & 1 deletion src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
AcceleratorMetadata,
AgentId,
AutoPullBehavior,
BinarySize,
ClusterInfo,
ClusterSSHPortMapping,
CommitStatus,
Expand Down Expand Up @@ -1822,7 +1823,7 @@ async def create_kernel(
restarting=restarting,
cluster_ssh_port_mapping=cluster_info.get("cluster_ssh_port_mapping"),
)
environ: MutableMapping[str, str] = {**kernel_config["environ"]}
environ: dict[str, str] = {**kernel_config["environ"]}

# Inject Backend.AI-intrinsic env-variables for gosu
if KernelFeatures.UID_MATCH in ctx.kernel_features:
Expand Down Expand Up @@ -1936,6 +1937,40 @@ async def create_kernel(
devices = await computer_ctx.instance.get_attached_devices(device_alloc)
attached_devices[dev_name] = devices

# Generate GPU config env-vars
has_gpu_config = False
for dev_name, attached_accelerators in attached_devices.items():
if has_gpu_config:
# Generate GPU config for the first-seen accelerator only
continue
if dev_name in (DeviceName("cpu"), DeviceName("mem")):
# Skip intrinsic slots
continue
mem_per_device: list[str] = []
mem_per_device_tf: list[str] = []
# proc_items = [] # (unused yet)
for local_idx, dev_info in enumerate(attached_accelerators):
mem = BinarySize(dev_info["data"].get("mem", 0))
mem_per_device.append(f"{local_idx}:{mem:s}")
mem_in_megibytes = f"{mem // (2**20):d}"
mem_per_device_tf.append(f"{local_idx}:{mem_in_megibytes}")
# The processor count is not used yet!
# NOTE: Keep backward-compatibility with the CUDA plugin ("smp")
# proc = dev_info["data"].get("proc", dev_info["data"].get("smp", 0))
# proc_items.append(f"{local_idx}:{proc}")
if attached_accelerators:
# proc_str = ",".join(proc_items) # (unused yet)
environ["GPU_TYPE"] = dev_name
environ["GPU_MODEL_NAME"] = attached_accelerators[0]["model_name"]
environ["GPU_CONFIG"] = ",".join(mem_per_device)
environ["TF_GPU_MEMORY_ALLOC"] = ",".join(mem_per_device_tf)
environ["GPU_COUNT"] = str(len(attached_accelerators))
environ["N_GPUS"] = str(len(attached_accelerators))
has_gpu_config = True
if not has_gpu_config:
environ["GPU_COUNT"] = "0"
environ["N_GPUS"] = "0"

exposed_ports = [2000, 2001]
service_ports: List[ServicePort] = []
port_map: Dict[str, ServicePort] = {}
Expand Down Expand Up @@ -2050,6 +2085,7 @@ async def create_kernel(

# Store information required for restarts.
# NOTE: kconfig may be updated after restarts.
kernel_config["environ"] = environ
resource_spec.freeze()
await self.restart_kernel__store_config(
kernel_id,
Expand Down
24 changes: 12 additions & 12 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import aiotools
import pkg_resources
import zmq
import zmq.asyncio
from aiodocker.docker import Docker, DockerContainer
from aiodocker.exceptions import DockerError
from aiodocker.types import PortInfo
Expand Down Expand Up @@ -661,8 +662,8 @@ def _write_user_bootstrap_script():
with StringIO() as buf:
resource_spec.write_to_file(buf)
for dev_type, device_alloc in resource_spec.allocations.items():
computer_self = self.computers[dev_type]
kvpairs = await computer_self.instance.generate_resource_data(device_alloc)
device_plugin = self.computers[dev_type].instance
kvpairs = await device_plugin.generate_resource_data(device_alloc)
for k, v in kvpairs.items():
buf.write(f"{k}={v}\n")
await loop.run_in_executor(
Expand All @@ -671,18 +672,17 @@ def _write_user_bootstrap_script():
buf.getvalue().encode("utf8"),
)

docker_creds = self.internal_data.get("docker_credentials")
if docker_creds:
await loop.run_in_executor(
None,
(self.config_dir / "docker-creds.json").write_text,
json.dumps(docker_creds),
)

# TODO: refactor out dotfiles/sshkey initialization to the base agent?

shutil.copyfile(self.config_dir / "environ.txt", self.config_dir / "environ_base.txt")
shutil.copyfile(self.config_dir / "resource.txt", self.config_dir / "resource_base.txt")

# TODO: refactor out dotfiles/sshkey initialization to the base agent?
docker_creds = self.internal_data.get("docker_credentials")
if docker_creds:
await loop.run_in_executor(
None,
(self.config_dir / "docker-creds.json").write_text,
json.dumps(docker_creds),
)
# Create SSH keypair only if ssh_keypair internal_data exists and
# /home/work/.ssh folder is not mounted.
if self.internal_data.get("ssh_keypair"):
Expand Down
44 changes: 24 additions & 20 deletions src/ai/backend/agent/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@
import textwrap
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from collections.abc import (
Collection,
Iterator,
Mapping,
MutableMapping,
Sequence,
)
from decimal import Decimal
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Collection,
Iterator,
List,
Mapping,
MutableMapping,
Optional,
Sequence,
Set,
TextIO,
Tuple,
Type,
TypeAlias,
cast,
)

Expand Down Expand Up @@ -66,8 +66,9 @@

from .agent import ComputerContext

log = BraceStyleAdapter(logging.getLogger(__spec__.name))
DeviceAllocation: TypeAlias = Mapping[SlotName, Mapping[DeviceId, Decimal]]

log = BraceStyleAdapter(logging.getLogger(__spec__.name))
known_slot_types: Mapping[SlotName, SlotTypes] = {}


Expand All @@ -92,7 +93,7 @@ class KernelResourceSpec:
scratch_disk_size: int
"""The size of scratch disk. (not implemented yet)"""

mounts: List["Mount"] = attrs.Factory(list)
mounts: list["Mount"] = attrs.Factory(list)
"""The mounted vfolder list."""

def freeze(self) -> None:
Expand Down Expand Up @@ -260,8 +261,8 @@ def __eq__(self, __o: object) -> bool:

class AbstractComputePlugin(AbstractPlugin, metaclass=ABCMeta):
key: DeviceName = DeviceName("accelerator")
slot_types: Sequence[Tuple[SlotName, SlotTypes]]
exclusive_slot_types: Set[str]
slot_types: Sequence[tuple[SlotName, SlotTypes]]
exclusive_slot_types: set[str]

@abstractmethod
def get_metadata(self) -> AcceleratorMetadata:
Expand Down Expand Up @@ -355,7 +356,7 @@ async def get_hooks(self, distro: str, arch: str) -> Sequence[Path]:
async def generate_docker_args(
self,
docker: aiodocker.docker.Docker,
device_alloc,
device_alloc: DeviceAllocation,
) -> Mapping[str, Any]:
"""
When starting a new container, generate device-specific options for the
Expand All @@ -364,7 +365,7 @@ async def generate_docker_args(
"""
return {}

async def generate_resource_data(self, device_alloc) -> Mapping[str, str]:
async def generate_resource_data(self, device_alloc: DeviceAllocation) -> Mapping[str, str]:
"""
Generate extra resource.txt key-value pair sets to be used by the plugin's
own hook libraries in containers.
Expand All @@ -386,7 +387,7 @@ async def restore_from_container(
@abstractmethod
async def get_attached_devices(
self,
device_alloc: Mapping[SlotName, Mapping[DeviceId, Decimal]],
device_alloc: DeviceAllocation,
) -> Sequence[DeviceModelInfo]:
"""
Make up container-attached device information with allocated device id.
Expand All @@ -398,8 +399,9 @@ async def get_node_hwinfo(self) -> HardwareMetadata:

@abstractmethod
async def get_docker_networks(
self, device_alloc: Mapping[SlotName, Mapping[DeviceId, Decimal]]
) -> List[str]:
self,
device_alloc: DeviceAllocation,
) -> list[str]:
"""
Returns reference string (e.g. Id, name, ...) of docker networks
to attach to container for accelerator to work properly.
Expand All @@ -408,8 +410,10 @@ async def get_docker_networks(

@abstractmethod
async def generate_mounts(
self, source_path: Path, device_alloc: Mapping[SlotName, Mapping[DeviceId, Decimal]]
) -> List[MountInfo]:
self,
source_path: Path,
device_alloc: DeviceAllocation,
) -> list[MountInfo]:
"""
Populates additional files/directories under `source_path`
to mount to container and returns `MountInfo`.
Expand All @@ -427,7 +431,7 @@ def discover_plugins(
plugin_group: str,
allowlist: Optional[set[str]] = None,
blocklist: Optional[set[str]] = None,
) -> Iterator[Tuple[str, Type[AbstractComputePlugin]]]:
) -> Iterator[tuple[str, Type[AbstractComputePlugin]]]:
scanned_plugins = [*super().discover_plugins(plugin_group, allowlist, blocklist)]

def accel_lt_intrinsic(item):
Expand Down
7 changes: 6 additions & 1 deletion src/ai/backend/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,10 +1042,15 @@ class ClusterSSHKeyPair(TypedDict):
private_key: str # PEM-encoded string


# Capacity figures reported by a compute plugin for one attached device.
# Both keys are optional (NotRequired) because not every accelerator
# plugin reports them; consumers use .get() with a default.
class ComputedDeviceCapacity(TypedDict):
    # Device memory size; formatted/converted via BinarySize by consumers.
    mem: NotRequired[BinarySize]
    # Processor count — presumably SMP/core count (the CUDA plugin
    # historically used the key "smp"); TODO confirm exact semantics.
    proc: NotRequired[int]


# Per-device information for a container-attached accelerator, as produced
# by a compute plugin's get_attached_devices().
class DeviceModelInfo(TypedDict):
    # Identifier of the physical/logical device; plugins may supply either
    # a DeviceId or a plain string.
    device_id: DeviceId | str
    # Human-readable model name (e.g. exposed as GPU_MODEL_NAME).
    model_name: str
    # NOTE(review): the next two lines are diff residue — the first is the
    # pre-change declaration, superseded by the typed one that follows.
    data: Mapping[str, Any]
    data: ComputedDeviceCapacity  # name kept for backward compat with plugins


class KernelCreationResult(TypedDict):
Expand Down

0 comments on commit 650e052

Please sign in to comment.