From 874799efb063561af7ea46d12ae1aa0b2dd45009 Mon Sep 17 00:00:00 2001 From: octodog Date: Tue, 24 Dec 2024 13:32:53 +0900 Subject: [PATCH] refactor: Container preparation step in Agent (#3267) (#3295) Co-authored-by: Joongi Kim --- src/ai/backend/agent/agent.py | 4 ++-- src/ai/backend/agent/docker/agent.py | 13 ++----------- src/ai/backend/agent/dummy/agent.py | 3 +-- src/ai/backend/agent/kubernetes/agent.py | 3 +-- src/ai/backend/agent/resources.py | 6 +----- tests/agent/test_resources.py | 4 ---- 6 files changed, 7 insertions(+), 26 deletions(-) diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index a8a0b2c992..313a82c428 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -334,7 +334,7 @@ def get_runner_mount( raise NotImplementedError @abstractmethod - async def spawn( + async def prepare_container( self, resource_spec: KernelResourceSpec, environ: Mapping[str, str], @@ -2068,7 +2068,7 @@ async def create_kernel( "kernel starting with resource spec: \n{0}", pretty(attrs.asdict(resource_spec)), ) - kernel_obj: KernelObjectType = await ctx.spawn( + kernel_obj: KernelObjectType = await ctx.prepare_container( resource_spec, environ, service_ports, diff --git a/src/ai/backend/agent/docker/agent.py b/src/ai/backend/agent/docker/agent.py index 62ef38b27b..f7837e8029 100644 --- a/src/ai/backend/agent/docker/agent.py +++ b/src/ai/backend/agent/docker/agent.py @@ -272,7 +272,6 @@ async def prepare_resource_spec(self) -> Tuple[KernelResourceSpec, Optional[Mapp current_resource_slots.set(known_slot_types) slots = slots.normalize_slots(ignore_unknown=True) resource_spec = KernelResourceSpec( - container_id="", allocations={}, slots={**slots}, # copy mounts=[], @@ -623,7 +622,7 @@ async def generate_accelerator_mounts( src_path.mkdir() return await computer.generate_mounts(src_path, device_alloc) - async def spawn( + async def prepare_container( self, resource_spec: KernelResourceSpec, environ: Mapping[str, str], @@ -975,19 +974,11 @@ async def _rollback_container_creation() -> None: ) assert container is not None cid = cast(str, container._id) - resource_spec.container_id = cid - # Write resource.txt again to update the container id. - with open(self.config_dir / "resource.txt", "w") as f: - await loop.run_in_executor(None, resource_spec.write_to_file, f) async with AsyncFileWriter( target_filename=self.config_dir / "resource.txt", access_mode="a", ) as writer: - for dev_name, device_alloc in resource_spec.allocations.items(): - computer_ctx = self.computers[dev_name] - kvpairs = await computer_ctx.instance.generate_resource_data(device_alloc) - for k, v in kvpairs.items(): - await writer.write(f"{k}={v}\n") + await writer.write(f"CID={cid}\n") except asyncio.CancelledError: if container is not None: diff --git a/src/ai/backend/agent/dummy/agent.py b/src/ai/backend/agent/dummy/agent.py index 1d5b1e06be..8e4431e749 100644 --- a/src/ai/backend/agent/dummy/agent.py +++ b/src/ai/backend/agent/dummy/agent.py @@ -98,7 +98,6 @@ async def prepare_resource_spec( current_resource_slots.set(known_slot_types) slots = slots.normalize_slots(ignore_unknown=True) resource_spec = KernelResourceSpec( - container_id="", allocations={}, slots={**slots}, # copy mounts=[], @@ -161,7 +160,7 @@ def get_runner_mount( ): return Mount(MountTypes.BIND, Path(), Path()) - async def spawn( + async def prepare_container( self, resource_spec: KernelResourceSpec, environ: Mapping[str, str], diff --git a/src/ai/backend/agent/kubernetes/agent.py b/src/ai/backend/agent/kubernetes/agent.py index 0eb85487d3..9f903fc9f0 100644 --- a/src/ai/backend/agent/kubernetes/agent.py +++ b/src/ai/backend/agent/kubernetes/agent.py @@ -182,7 +182,6 @@ def _kernel_resource_spec_read(): current_resource_slots.set(known_slot_types) slots = slots.normalize_slots(ignore_unknown=True) resource_spec = KernelResourceSpec( - container_id="", allocations={}, slots={**slots}, # copy mounts=[], @@ -521,7 +520,7 @@ async def generate_deployment_object( }, } - async def spawn( + async def prepare_container( self, resource_spec: KernelResourceSpec, environ: Mapping[str, str], diff --git a/src/ai/backend/agent/resources.py b/src/ai/backend/agent/resources.py index dfd1201791..31ba144547 100644 --- a/src/ai/backend/agent/resources.py +++ b/src/ai/backend/agent/resources.py @@ -81,9 +81,6 @@ class KernelResourceSpec: while kernel containers are running. """ - container_id: str - """The container ID to refer inside containers.""" - slots: Mapping[SlotName, str] """Stores the original user-requested resource slots.""" @@ -114,7 +111,7 @@ def write_to_string(self) -> str: mounts_str = ",".join(map(str, self.mounts)) slots_str = json.dumps({k: str(v) for k, v in self.slots.items()}) - resource_str = f"CID={self.container_id}\n" + resource_str = "" resource_str += f"SCRATCH_SIZE={BinarySize(self.scratch_disk_size):m}\n" resource_str += f"MOUNTS={mounts_str}\n" resource_str += f"SLOTS={slots_str}\n" @@ -180,7 +177,6 @@ def read_from_string(cls, text: str) -> "KernelResourceSpec": allocations[device_name][slot_name] = per_device_alloc mounts = [Mount.from_str(m) for m in kvpairs["MOUNTS"].split(",") if m] return cls( - container_id=kvpairs.get("CID", "unknown"), scratch_disk_size=BinarySize.finite_from_str(kvpairs["SCRATCH_SIZE"]), allocations=dict(allocations), slots=ResourceSlot(json.loads(kvpairs["SLOTS"])), diff --git a/tests/agent/test_resources.py b/tests/agent/test_resources.py index fba4b2e8ec..c50985dd58 100644 --- a/tests/agent/test_resources.py +++ b/tests/agent/test_resources.py @@ -222,7 +222,6 @@ async def test_allocate_rollback(monkeypatch): affinity_policy = AffinityPolicy.PREFER_SINGLE_NODE resource_spec = resources.KernelResourceSpec( - "a0001", ResourceSlot.from_json({ "cpu": "1", "mem": "512", @@ -242,7 +241,6 @@ async def test_allocate_rollback(monkeypatch): DeviceId("root") ] == Decimal(512) resource_spec = resources.KernelResourceSpec( - "a0001", ResourceSlot.from_json({ "cpu": "1", "mem": "1024", # should fail to allocate @@ -273,7 +271,6 @@ async def test_allocate_rollback(monkeypatch): monkeypatch.setattr(resources.copy, "deepcopy", lambda o: o) resource_spec = resources.KernelResourceSpec( - "a0001", ResourceSlot.from_json({ "cpu": "1", "mem": "512", @@ -293,7 +290,6 @@ async def test_allocate_rollback(monkeypatch): DeviceId("root") ] == Decimal(512) resource_spec = resources.KernelResourceSpec( - "a0001", ResourceSlot.from_json({ "cpu": "1", "mem": "1024", # should fail to allocate