diff --git a/examples/qemu_message_from_aleph.json b/examples/qemu_message_from_aleph.json new file mode 100644 index 000000000..65220c198 --- /dev/null +++ b/examples/qemu_message_from_aleph.json @@ -0,0 +1,66 @@ +{ + "chain": "ETH", + "item_hash": "fake-hash-fake-hash-fake-hash-fake-hash-fake-hash-fake-hash-hash", + "sender": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba", + "type": "INSTANCE", + "channel": "Fun-dApps", + "confirmed": true, + "content": { + "address": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba", + "allow_amend": false, + "variables": { + "VM_CUSTOM_NUMBER": "32" + }, + "environment": { + "reproducible": true, + "internet": true, + "aleph_api": true, + "shared_cache": true, + "hypervisor": "qemu" + }, + "resources": { + "vcpus": 1, + "memory": 512, + "seconds": 30 + }, + "rootfs": { + "parent": { + "ref": "549ec451d9b099cad112d4aaa2c00ac40fb6729a92ff252ff22eef0b5c3cb613", + "use_latest": false + }, + "persistence": "host", + "size_mib": 5000 + }, + "authorized_keys": [ + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDj95BHGUx0/z2G/tTrEi8o49i70xvjcEUdSs3j4A33jE7pAphrfRVbuFMgFubcm8n9r5ftd/H8SjjTL4hY9YvWV5ZuMf92GUga3n4wgevvPlBszYZCy/idxFl0vtHYC1CcK9v4tVb9onhDt8FOJkf2m6PmDyvC+6tl6LwoerXTeeiKr5VnTB4KOBkammtFmix3d1X1SZd/cxdwZIHcQ7BNsqBm2w/YzVba6Z4ZnFUelBkQtMQqNs2aV51O1pFFqtZp2mM71D5d8vn9pOtqJ5QmY5IW6NypcyqKJZg5o6QguK5rdXLkc7AWro27BiaHIENl3w0wazp9EDO9zPAGJ6lz olivier@lanius" + ], + "volumes": [ + { + "mount": "/opt/venv", + "ref": "5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51", + "use_latest": false + }, + { + "comment": "Working data persisted on the VM supervisor, not available on other nodes", + "mount": "/var/lib/example", + "name": "data", + "persistence": "host", + "size_mib": 5 + } + ], + "replaces": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba", + "time": 1619017773.8950517 + }, + "item_content": "{\"address\":\"0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba\",\"allow_amend\":false,\"variables\":{\"VM_CUSTOM_NUMBER\":\"32\"},\"environment\":{\"reproducible\":true,\"internet\":true,\"aleph_api\":true,\"shared_cache\":true},\"resources\":{\"vcpus\":1,\"memory\":128,\"seconds\":30},\"rootfs\":{\"parent\":{\"ref\":\"549ec451d9b099cad112d4aaa2c00ac40fb6729a92ff252ff22eef0b5c3cb613\",\"use_latest\":true},\"persistence\":\"host\",\"size_mib\":20000},\"cloud_config\":{\"password\":\"password\",\"chpasswd\":{\"expire\":\"False\"}},\"volumes\":[{\"mount\":\"/opt/venv\",\"ref\":\"5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51\",\"use_latest\":false},{\"comment\":\"Working data persisted on the VM supervisor, not available on other nodes\",\"mount\":\"/var/lib/example\",\"name\":\"data\",\"persistence\":\"host\",\"size_mib\":5}],\"replaces\":\"0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba\",\"time\":1619017773.8950517}", + "item_type": "inline", + "signature": "0x372da8230552b8c3e65c05b31a0ff3a24666d66c575f8e11019f62579bf48c2b7fe2f0bbe907a2a5bf8050989cdaf8a59ff8a1cbcafcdef0656c54279b4aa0c71b", + "size": 749, + "time": 1619017773.8950577, + "confirmations": [ + { + "chain": "ETH", + "height": 12284734, + "hash": "0x67f2f3cde5e94e70615c92629c70d22dc959a118f46e9411b29659c2fce87cdc" + } + ] +} diff --git a/pyproject.toml b/pyproject.toml index e225f6714..b9dce0891 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ "msgpack~=1.0.7", "packaging~=23.2", "jsonschema==4.19.1", + "qmp==0.0.1" ] [project.urls] diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py index 2044368e2..c11b860da 100644 --- a/src/aleph/vm/conf.py +++ b/src/aleph/vm/conf.py @@ -212,6 +212,8 @@ class Settings(BaseSettings): # hashlib.sha256(b"secret-token").hexdigest() ALLOCATION_TOKEN_HASH = "151ba92f2eb90bce67e912af2f7a5c17d8654b3d29895b042107ea312a7eebda" + ENABLE_QEMU_SUPPORT: bool = Field(default=False) + # Tests on programs FAKE_DATA_PROGRAM: Optional[Path] = None @@ -291,6 +293,18 @@ def check(self): if self.USE_NDP_PROXY: assert is_command_available("ndppd"), "Command `ndppd` not found, run `apt install ndppd`" + # Necessary for cloud-init customisation of instance + assert is_command_available( + "cloud-localds" + ), "Command `cloud-localds` not found, run `apt install cloud-image-utils`" + + if settings.ENABLE_QEMU_SUPPORT: + # Qemu support + assert is_command_available("qemu-img"), "Command `qemu-img` not found, run `apt install qemu-utils`" + assert is_command_available( + "qemu-system-x86_64" + ), "Command `qemu-system-x86_64` not found, run `apt install qemu-system-x86`" + def setup(self): os.makedirs(self.MESSAGE_CACHE, exist_ok=True) os.makedirs(self.CODE_CACHE, exist_ok=True) diff --git a/src/aleph/vm/controllers/firecracker/executable.py b/src/aleph/vm/controllers/firecracker/executable.py index 46055a1cd..ed301cec3 100644 --- a/src/aleph/vm/controllers/firecracker/executable.py +++ b/src/aleph/vm/controllers/firecracker/executable.py @@ -4,7 +4,6 @@ import asyncio import logging -import subprocess from dataclasses import dataclass, field from multiprocessing import Process, set_start_method from os.path import exists, isfile @@ -17,6 +16,7 @@ from aleph.vm.conf import settings from aleph.vm.controllers.firecracker.snapshots import CompressedDiskVolumeSnapshot +from aleph.vm.controllers.interface import AlephControllerInterface from aleph.vm.guest_api.__main__ import run_guest_api from aleph.vm.hypervisors.firecracker.microvm import FirecrackerConfig, MicroVM from aleph.vm.network.firewall import teardown_nftables_for_vm @@ -137,7 +137,7 @@ class VmInitNotConnectedError(Exception): ConfigurationType = TypeVar("ConfigurationType") -class AlephFirecrackerExecutable(Generic[ConfigurationType]): +class AlephFirecrackerExecutable(Generic[ConfigurationType], AlephControllerInterface): vm_id: int vm_hash: ItemHash resources: AlephFirecrackerResources @@ -150,6 +150,7 @@ class AlephFirecrackerExecutable(Generic[ConfigurationType]): guest_api_process: Optional[Process] = None is_instance: bool _firecracker_config: Optional[FirecrackerConfig] = None + support_snapshot: bool def __init__( self, @@ -185,26 +186,6 @@ def __init__( self.guest_api_process = None self._firecracker_config = None - def get_vm_ip(self) -> Optional[str]: - if self.tap_interface: - return self.tap_interface.guest_ip.with_prefixlen - return None - - def get_vm_route(self) -> Optional[str]: - if self.tap_interface: - return str(self.tap_interface.host_ip).split("/", 1)[0] - return None - - def get_vm_ipv6(self) -> Optional[str]: - if self.tap_interface: - return self.tap_interface.guest_ipv6.with_prefixlen - return None - - def get_vm_ipv6_gateway(self) -> Optional[str]: - if self.tap_interface: - return str(self.tap_interface.host_ipv6.ip) - return None - def to_dict(self): """Dict representation of the virtual machine. Used to record resource usage and for JSON serialization.""" if self.fvm.proc and psutil: @@ -300,3 +281,18 @@ async def teardown(self): async def create_snapshot(self) -> CompressedDiskVolumeSnapshot: raise NotImplementedError() + + async def get_log_queue(self) -> asyncio.Queue: + queue: asyncio.Queue = asyncio.Queue(maxsize=1000) + # Limit the number of queues per VM + + if len(self.fvm.log_queues) > 20: + logger.warning("Too many log queues, dropping the oldest one") + self.fvm.log_queues.pop(0) + self.fvm.log_queues.append(queue) + return queue + + async def unregister_queue(self, queue: asyncio.Queue): + if queue in self.fvm.log_queues: + self.fvm.log_queues.remove(queue) + queue.empty() diff --git a/src/aleph/vm/controllers/firecracker/instance.py b/src/aleph/vm/controllers/firecracker/instance.py index 485f5e6a4..8630de72e 100644 --- a/src/aleph/vm/controllers/firecracker/instance.py +++ b/src/aleph/vm/controllers/firecracker/instance.py @@ -58,6 +58,7 @@ class AlephFirecrackerInstance(AlephFirecrackerExecutable): resources: AlephInstanceResources latest_snapshot: Optional[DiskVolumeSnapshot] is_instance = True + support_snapshot = False def __init__( self, diff --git a/src/aleph/vm/controllers/firecracker/program.py b/src/aleph/vm/controllers/firecracker/program.py index c98e817d5..f43f6f4c1 100644 --- a/src/aleph/vm/controllers/firecracker/program.py +++ b/src/aleph/vm/controllers/firecracker/program.py @@ -258,6 +258,7 @@ class AlephFirecrackerProgram(AlephFirecrackerExecutable[ProgramVmConfiguration] vm_configuration: ProgramVmConfiguration | None resources: AlephProgramResources is_instance = False + support_snapshot = False def __init__( self, diff --git a/src/aleph/vm/controllers/firecracker/snapshot_manager.py b/src/aleph/vm/controllers/firecracker/snapshot_manager.py index e3fd42032..2a42774e0 100644 --- a/src/aleph/vm/controllers/firecracker/snapshot_manager.py +++ b/src/aleph/vm/controllers/firecracker/snapshot_manager.py @@ -96,7 +96,7 @@ def run_snapshots(self) -> None: job_thread.start() async def start_for(self, vm: AlephFirecrackerExecutable, frequency: Optional[int] = None) -> None: - if not vm.is_instance: + if not vm.support_snapshot: msg = "Snapshots are not implemented for programs." raise NotImplementedError(msg) diff --git a/src/aleph/vm/controllers/interface.py b/src/aleph/vm/controllers/interface.py new file mode 100644 index 000000000..511cd96d7 --- /dev/null +++ b/src/aleph/vm/controllers/interface.py @@ -0,0 +1,91 @@ +import asyncio +import logging +from abc import ABC +from asyncio.subprocess import Process +from typing import Any, Coroutine, Optional + +from aleph_message.models import ItemHash +from aleph_message.models.execution.environment import MachineResources + +from aleph.vm.controllers.firecracker.snapshots import CompressedDiskVolumeSnapshot +from aleph.vm.network.interfaces import TapInterface + +logger = logging.getLogger(__name__) + + +class AlephControllerInterface(ABC): + vm_id: int + "id in the VMPool, attributed at execution" + vm_hash: ItemHash + "identifier for the VM definition, linked to an Aleph Message" + resources: Any + "local resource for the machine" + enable_console: bool + enable_networking: bool + "enable networking for this VM" + hardware_resources: MachineResources + support_snapshot: bool + "Does this controller support snapshotting" + guest_api_process: Optional[Process] = None + tap_interface: Optional[TapInterface] = None + "Network interface used for this VM" + + def get_vm_ip(self) -> Optional[str]: + if self.tap_interface: + return self.tap_interface.guest_ip.with_prefixlen + return None + + def get_vm_route(self) -> Optional[str]: + if self.tap_interface: + return str(self.tap_interface.host_ip).split("/", 1)[0] + return None + + def get_vm_ipv6(self) -> Optional[str]: + if self.tap_interface: + return self.tap_interface.guest_ipv6.with_prefixlen + return None + + def get_vm_ipv6_gateway(self) -> Optional[str]: + if self.tap_interface: + return str(self.tap_interface.host_ipv6.ip) + return None + + def to_dict(self): + """Dict representation of the virtual machine. Used to record resource usage and for JSON serialization.""" + raise NotImplementedError() + + async def setup(self): + """Configuration done before the VM process is started""" + raise NotImplementedError() + + async def start(self): + """Start the VM process""" + raise NotImplementedError() + + async def wait_for_init(self) -> None: + """Wait for the init process of the virtual machine to be ready. + May be empty.""" + pass + + async def configure(self) -> None: + """Configuration done after the VM process is started""" + raise NotImplementedError() + + async def start_guest_api(self): + raise NotImplementedError() + + async def stop_guest_api(self): + raise NotImplementedError() + + async def teardown(self) -> Coroutine: + raise NotImplementedError() + + async def create_snapshot(self) -> CompressedDiskVolumeSnapshot: + "Must be implement if self.support_snapshot is True" + raise NotImplementedError() + + async def get_log_queue(self) -> asyncio.Queue: + raise NotImplementedError() + + async def unregister_queue(self, queue: asyncio.Queue): + raise NotImplementedError() diff --git a/src/aleph/vm/controllers/qemu/QEMU.md b/src/aleph/vm/controllers/qemu/QEMU.md new file mode 100644 index 000000000..b529ad0b6 --- /dev/null +++ b/src/aleph/vm/controllers/qemu/QEMU.md @@ -0,0 +1,132 @@ +# Qemu support + +## Requirements +Commands : qemu, cloud-ds, qemu-img + +These are installable via +`apt install cloud-image-utils qemu-utils qemu-system-x86` + +At this moment this branch depends on branch `olethanh-qemu-message-format` of aleph-message which add the new temporary format. https://github.com/olethanh/aleph-message/tree/olethanh-qemu-message-format + +The easiest way is to check the branch locally and install it in your venv using `pip install -e .` + +## To test launching a VM instance + +Launch aleph.vm.orchestrator with the following environment variables + + +```environ +ALEPH_VM_FAKE_INSTANCE_BASE=/home/olivier/Projects/qemu-quickstart/jammy-server-cloudimg-amd64.img +ALEPH_VM_FAKE_INSTANCE_MESSAGE=/home/olivier/Projects/aleph/aleph-vm/examples/qemu_message_from_aleph.json +ALEPH_VM_USE_FAKE_INSTANCE_BASE=1 +# set test as the allocation password +ALEPH_VM_ALLOCATION_TOKEN_HASH=9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08 + +``` + +Where `ALEPH_VM_FAKE_INSTANCE_BASE` is the path to the base disk image. You can get the Ubuntu one via: +`wget https://cloud-images.ubuntu.com/jammy/current/jammy-server-cloudimg-amd64.img` + +You can use any base VM image supporting cloud-init. cloud-init support is mandatory because it is used to set up the network. + + +To only launch the VM instance, use the parameter: +`--run-fake-instance` + +You can then try to connect via ssh to it's ip. Wait a minute or so for it to set up properly with the network + +Or launching the whole supervisor server (no params), then launch the VM via http + +```http request +### Start fake VM +POST http://localhost:4020/control/allocations +Content-Type: application/json +X-Auth-Signature: test +Accept: application/json + +{"persistent_vms": [], "instances": ["decadecadecadecadecadecadecadecadecadecadecadecadecadecadecadeca"]} +``` + +After a minutes or two you should be able to SSH into the VM. Check in the log for the VM ip. +If you used an Ubuntu image the username should be ubuntu + +You can then stop the VM using +```http request +### Stop the VM +POST http://localhost:4020/control/machine/decadecadecadecadecadecadecadecadecadecadecadecadecadecadecadeca/stop +Accept: application/json +``` +(you will need to comment @require_jwk_authentication) + +# Connecting to the VM via your own ssh key +In local development, if you want to connect via ssh to the VM and you don't have your + key already included in you base image or inside the aleph message, you can configure it in the following way. + +First set your key in the environment variable ALEPH_VM_DEVELOPER_SSH_KEYS in the json format. You can add it directly in the `.env` file +```env +ALEPH_VM_DEVELOPER_SSH_KEYS=["ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDj95BHGUx0/z2G/tTrEi8o49i70xvjcEUdSs3j4A33jE7pAphrfRVbuFMgFubcm8n9r5ftd/H8SjjTL4hY9YvWV5ZuMf92GUga3n4wgevvPlBszYZCy/idxFl0vtHYC1CcK9v4tVb9onhDt8FOJkf2m6PmDyvC+6tl6LwoerXTeeiKr5VnTB4KOBkammtFmix3d1X1SZd/cxdwZIHcQ7BNsqBm2w/YzVba6Z4ZnFUelBkQtMQqNs2aV51O1pFFqtZp2mM71D5d8vn9pOtqJ5QmY5IW6NypcyqKJZg5o6QguK5rdXLkc7AWro27BiaHIENl3w0wazp9EDO9zPAGJ6lz olivier@lanius"] +``` + +Then pass the `--developer-ssh-keys` as an argument when starting the supervisor. + +Cloud init support for settings the ssh key in the VM image is required, this is the same mechanism and settings as for firecracker program, of course this is not for production use. + +# Check the log via Websocket +You can stream the logs from the VM using, the following python example script. +Caveat: This requires to temporarly disable auth on this endpoint, you need the print system log settings to be active `ALEPH_VM_PRINT_SYSTEM_LOGS=1`. The system only stream new log content from the VM not the old one. +```python +import json +import sys + +import asyncio +import aiohttp + + +def on_message(content): + try: + msg = json.loads(content) + fd = sys.stderr if msg["type"] == "stderr" else sys.stdout + print("<", msg["message"], file=fd, end="") + except: + print("unable to parse", content) + + +async def tail_websocket(url): + async with aiohttp.ClientSession() as session: + async with session.ws_connect(url) as ws: + print(f"connected to {url}") + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + on_message(msg.data) + elif msg.type == aiohttp.WSMsgType.CLOSED: + print("closed") + break + elif msg.type == aiohttp.WSMsgType.ERROR: + print("Error", msg) + break + + +vm_hash = "decadecadecadecadecadecadecadecadecadecadecadecadecadecadecadeca" +url = f"ws://localhost:4020/control/machine/{vm_hash}/logs" +loop = asyncio.get_event_loop() +loop.run_until_complete(tail_websocket(url)) +``` + + +# TODO +- [x] Launch +- [x] Message format +- [x] Network +- [x] Cloud init support +- [x] Download ressource +- [ ] snapshot +- [ ] Multi volume +- [x] fix logs +- [ ] Testing +- [x] Support raw format for base image +- [ ] More testing with different Distro: Fedora, debian, alpine +- [ ] Document for user how to build their own images +- [x] Allow ssh developer key +- [ ] Automated testing in CI +- [x] Output the whole serial console in logs +- [x] Test code for websocket logs diff --git a/src/aleph/vm/controllers/qemu/__init__.py b/src/aleph/vm/controllers/qemu/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/aleph/vm/controllers/qemu/cloudinit.py b/src/aleph/vm/controllers/qemu/cloudinit.py new file mode 100644 index 000000000..ba3e67563 --- /dev/null +++ b/src/aleph/vm/controllers/qemu/cloudinit.py @@ -0,0 +1,136 @@ +import base64 +import json +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Union + +import yaml +from aleph_message.models import ItemHash + +from aleph.vm.conf import settings +from aleph.vm.controllers.interface import AlephControllerInterface +from aleph.vm.hypervisors.firecracker.config import Drive +from aleph.vm.utils import is_command_available, run_in_subprocess + +# https://cloudinit.readthedocs.io/en/latest/reference/datasources/nocloud.html + + +def get_hostname_from_hash(vm_hash: ItemHash) -> str: + item_hash_binary: bytes = base64.b16decode(vm_hash.encode().upper()) + return base64.b32encode(item_hash_binary).decode().strip("=").lower() + + +def encode_user_data(hostname, ssh_authorized_keys) -> bytes: + """Creates user data configuration file for cloud-init tool""" + config: dict[str, Union[str, bool, list[str]]] = { + "hostname": hostname, + "disable_root": False, + "ssh_pwauth": False, + "ssh_authorized_keys": ssh_authorized_keys, + # Avoid the resize error because we already do it on the VM disk creation stage + "resize_rootfs": False, + } + cloud_config_header = "#cloud-config\n" + config_output = yaml.safe_dump(config, default_flow_style=False, sort_keys=False) + content = (cloud_config_header + config_output).encode() + return content + + +def create_metadata_file(hostname, vm_id) -> bytes: + """Creates metadata configuration file for cloud-init tool""" + metadata = { + "instance-id": f"iid-instance-{vm_id}", + "local-hostname": hostname, + } + return json.dumps(metadata).encode() + + +def create_network_file(ip, ipv6, ipv6_gateway, nameservers, route) -> bytes: + """Creates network configuration file for cloud-init tool""" + # TODO : had to change from eth0 -> ens3 for qemu + # check for portable solution + + network = { + "ethernets": { + "ens3": { + "dhcp4": False, + "dhcp6": False, + "addresses": [ip, ipv6], + "gateway4": route, + "gateway6": ipv6_gateway, + "nameservers": { + "addresses": nameservers, + }, + }, + }, + "version": 2, + } + return yaml.safe_dump(network, default_flow_style=False, sort_keys=False).encode() + + +async def create_cloud_init_drive_image( + disk_image_path, hostname, vm_id, ip, ipv6, ipv6_gateway, nameservers, route, ssh_authorized_keys +): + with ( + NamedTemporaryFile() as user_data_config_file, + NamedTemporaryFile() as network_config_file, + NamedTemporaryFile() as metadata_config_file, + ): + user_data = encode_user_data(hostname, ssh_authorized_keys) + user_data_config_file.write(user_data) + user_data_config_file.flush() + network_config = create_network_file(ip, ipv6, ipv6_gateway, nameservers, route) + network_config_file.write(network_config) + network_config_file.flush() + + metadata_config = create_metadata_file(hostname, vm_id) + metadata_config_file.write(metadata_config) + metadata_config_file.flush() + + await run_in_subprocess( + [ + "cloud-localds", + f"--network-config={network_config_file.name}", + str(disk_image_path), + user_data_config_file.name, + metadata_config_file.name, + ] + ) + + +class CloudInitMixin(AlephControllerInterface): + async def _create_cloud_init_drive(self) -> Drive: + """Creates the cloud-init volume to configure and setup the VM""" + # assert self.enable_networking and self.tap_interface, f"Network not enabled for VM {self.vm_id}" + ssh_authorized_keys = self.resources.message_content.authorized_keys or [] + if settings.USE_DEVELOPER_SSH_KEYS: + ssh_authorized_keys += settings.DEVELOPER_SSH_KEYS + ip = self.get_vm_ip() + route = self.get_vm_route() + ipv6 = self.get_vm_ipv6() + ipv6_gateway = self.get_vm_ipv6_gateway() + vm_id = self.vm_id + nameservers = settings.DNS_NAMESERVERS + hostname = get_hostname_from_hash(self.vm_hash) + + disk_image_path: Path = settings.EXECUTION_ROOT / f"cloud-init-{self.vm_hash}.img" + assert is_command_available("cloud-localds") + + await create_cloud_init_drive_image( + disk_image_path, + hostname, + vm_id, + ip, + ipv6, + ipv6_gateway, + nameservers, + route, + ssh_authorized_keys, + ) + + return Drive( + drive_id="Fake", + path_on_host=disk_image_path, + is_root_device=False, + is_read_only=True, + ) diff --git a/src/aleph/vm/controllers/qemu/instance.py b/src/aleph/vm/controllers/qemu/instance.py new file mode 100644 index 000000000..378a64cfb --- /dev/null +++ b/src/aleph/vm/controllers/qemu/instance.py @@ -0,0 +1,322 @@ +import asyncio +import json +import logging +import shutil +import sys +from asyncio import Task +from asyncio.subprocess import Process +from typing import Generic, Optional, TypeVar + +import psutil +import qmp +from aleph_message.models import ItemHash +from aleph_message.models.execution.environment import MachineResources +from aleph_message.models.execution.instance import RootfsVolume +from aleph_message.models.execution.volume import PersistentVolume + +from aleph.vm.conf import settings +from aleph.vm.controllers.firecracker.executable import AlephFirecrackerResources +from aleph.vm.controllers.interface import AlephControllerInterface +from aleph.vm.controllers.qemu.cloudinit import CloudInitMixin +from aleph.vm.network.firewall import teardown_nftables_for_vm +from aleph.vm.network.interfaces import TapInterface +from aleph.vm.storage import get_rootfs_base_path +from aleph.vm.utils import run_in_subprocess + +logger = logging.getLogger(__name__) + + +class AlephQemuResources(AlephFirecrackerResources): + async def download_all(self): + volume = self.message_content.rootfs + parent_image_path = await get_rootfs_base_path(volume.parent.ref) + self.rootfs_path = await self.make_writable_volume(parent_image_path, volume) + return + + async def make_writable_volume(self, parent_image_path, volume: PersistentVolume | RootfsVolume): + "Create a new qcow2 image file based on the passed one, that we give to the VM to write onto" + qemu_img_path = shutil.which("qemu-img") + volume_name = volume.name if isinstance(volume, PersistentVolume) else "rootfs" + + # detect the image format + out_json = await run_in_subprocess([qemu_img_path, "info", str(parent_image_path), "--output=json"]) + out = json.loads(out_json) + parent_format = out.get("format", "") + + dest_path = settings.PERSISTENT_VOLUMES_DIR / self.namespace / f"{volume_name}.qcow2" + dest_path.parent.mkdir(parents=True, exist_ok=True) + + await run_in_subprocess( + [ + qemu_img_path, + "create", + "-f", # Format + "qcow2", + "-F", + parent_format, + "-b", + str(parent_image_path), + str(dest_path), + ] + ) + return dest_path + + +ConfigurationType = TypeVar("ConfigurationType") + + +class AlephQemuInstance(Generic[ConfigurationType], CloudInitMixin, AlephControllerInterface): + vm_id: int + vm_hash: ItemHash + resources: AlephQemuResources + enable_console: bool + enable_networking: bool + hardware_resources: MachineResources + tap_interface: Optional[TapInterface] = None + vm_configuration: Optional[ConfigurationType] + is_instance: bool + qemu_process: Optional[Process] + support_snapshot = False + qmp_socket_path = None + + def __str__(self): + return f"" + + def __init__( + self, + vm_id: int, + vm_hash: ItemHash, + resources: AlephQemuResources, + enable_networking: bool = False, + enable_console: Optional[bool] = None, + hardware_resources: MachineResources = MachineResources(), + tap_interface: Optional[TapInterface] = None, + ): + self.vm_id = vm_id + self.vm_hash = vm_hash + self.resources = resources + if enable_console is None: + enable_console = settings.PRINT_SYSTEM_LOGS + self.enable_console = enable_console + self.enable_networking = enable_networking and settings.ALLOW_VM_NETWORKING + self.hardware_resources = hardware_resources + self.tap_interface = tap_interface + + def to_dict(self): + """Dict representation of the virtual machine. Used to record resource usage and for JSON serialization.""" + if self.qemu_process and psutil: + # The firecracker process is still running and process information can be obtained from `psutil`. + try: + p = psutil.Process(self.qemu_process.pid) + pid_info = { + "status": p.status(), + "create_time": p.create_time(), + "cpu_times": p.cpu_times(), + "cpu_percent": p.cpu_percent(), + "memory_info": p.memory_info(), + "io_counters": p.io_counters(), + "open_files": p.open_files(), + "connections": p.connections(), + "num_threads": p.num_threads(), + "num_ctx_switches": p.num_ctx_switches(), + } + except psutil.NoSuchProcess: + logger.warning("Cannot read process metrics (process %s not found)", self.qemu_process) + pid_info = None + else: + pid_info = None + + return { + "process": pid_info, + **self.__dict__, + } + + async def setup(self): + pass + + async def start(self): + logger.debug(f"Starting Qemu: {self} ") + # Based on the command + # qemu-system-x86_64 -enable-kvm -m 2048 -net nic,model=virtio + # -net tap,ifname=tap0,script=no,downscript=no -drive file=alpine.qcow2,media=disk,if=virtio -nographic + + qemu_path = shutil.which("qemu-system-x86_64") + image_path = self.resources.rootfs_path + vcpu_count = self.hardware_resources.vcpus + mem_size_mib = self.hardware_resources.memory + mem_size_mb = int(mem_size_mib / 1024 / 1024 * 1000 * 1000) + # hardware_resources.published ports -> not implemented at the moment + # hardware_resources.seconds -> only for microvm + + monitor_socket_path = settings.EXECUTION_ROOT / (str(self.vm_id) + "-monitor.socket") + self.qmp_socket_path = qmp_socket_path = settings.EXECUTION_ROOT / (str(self.vm_id) + "-qmp.socket") + + args = [ + qemu_path, + "-enable-kvm", + "-m", + str(mem_size_mb), + "-smp", + str(vcpu_count), + # Disable floppy + "-fda", + "", + # "-snapshot", # Do not save anything to disk + "-drive", + f"file={image_path},media=disk,if=virtio", + # To debug you can pass gtk or curses instead + "-display", + "none", + "--no-reboot", # Rebooting from inside the VM shuts down the machine + # Listen for commands on this socket + "-monitor", + f"unix:{monitor_socket_path},server,nowait", + # Listen for commands on this socket (QMP protocol in json). Supervisor use it to send shutdown or start + # command + "-qmp", + f"unix:{qmp_socket_path},server,nowait", + # Tell to put the output to std fd, so we can include them in the log + "-serial", + "stdio", + # Uncomment for debug + # "-serial", "telnet:localhost:4321,server,nowait", + ] + if self.tap_interface: + interface_name = self.tap_interface.device_name + # script=no, downscript=no tell qemu not to try to set up the network itself + args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={interface_name},script=no,downscript=no"] + + cloud_init_drive = await self._create_cloud_init_drive() + if cloud_init_drive: + args += ["-cdrom", f"{cloud_init_drive.path_on_host}"] + + try: + print(*args) + self.qemu_process = proc = await asyncio.create_subprocess_exec( + *args, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + logger.debug(f"setup done {self}, {proc}") + + async def handle_termination(proc: Process): + await proc.wait() + logger.info(f"{self} Process terminated with {proc.returncode} : {str(args)}") + + loop = asyncio.get_running_loop() + loop.create_task(handle_termination(proc)) + except Exception: + # Stop the VM and clear network interfaces in case any error prevented the start of the virtual machine. + logger.error("VM startup failed, cleaning up network") + if self.enable_networking: + teardown_nftables_for_vm(self.vm_id) + if self.tap_interface: + await self.tap_interface.delete() + raise + + if self.enable_console: + self.process_logs() + + await self.wait_for_init() + logger.debug(f"started qemu vm {self} on {self.get_vm_ip()}") + + async def wait_for_init(self) -> None: + """Wait for the init process of the virtual machine to be ready. + May be empty.""" + + return + + async def configure(self): + "Nothing to configure, we do the configuration via cloud init" + pass + + async def start_guest_api(self): + pass + + async def stop_guest_api(self): + pass + + stdout_task: Optional[Task] = None + stderr_task: Optional[Task] = None + log_queues: list[asyncio.Queue] = [] + + async def teardown(self): + if self.stdout_task: + self.stdout_task.cancel() + if self.stderr_task: + self.stderr_task.cancel() + + self._shutdown() + + if self.enable_networking: + teardown_nftables_for_vm(self.vm_id) + if self.tap_interface: + await self.tap_interface.delete() + await self.stop_guest_api() + + async def _process_stdout(self): + while not self.qemu_process: + await asyncio.sleep(0.01) # Todo: Use signal here + while True: + line = await self.qemu_process.stdout.readline() + if not line: # FD is closed nothing more will come + print(self, "EOF") + return + for queue in self.log_queues: + await queue.put(("stdout", line)) + print(self, "stdout", line.decode().strip()) + + async def _process_stderr(self): + while not self.qemu_process: + await asyncio.sleep(0.01) # Todo: Use signal here + while True: + line = await self.qemu_process.stderr.readline() + if not line: # FD is closed nothing more will come + print(self, "EOF") + return + for queue in self.log_queues: + await queue.put(("stderr", line)) + print(self, "stderr", line.decode().strip(), file=sys.stderr) + + def process_logs(self) -> tuple[Task, Task]: + """Start two tasks to process the stdout and stderr + + It will stream their content to queues registered on self.log_queues + It will also print them""" + + loop = asyncio.get_running_loop() + self.stdout_task = loop.create_task(self._process_stdout()) + self.stderr_task = loop.create_task(self._process_stderr()) + return self.stdout_task, self.stderr_task + + def _get_qmpclient(self) -> Optional[qmp.QEMUMonitorProtocol]: + if not self.qmp_socket_path: + return None + client = qmp.QEMUMonitorProtocol(str(self.qmp_socket_path)) + client.connect() + return client + + def _shutdown(self): + client = self._get_qmpclient() + if client: + resp = client.command("system_powerdown") + if not resp == {}: + logger.warning("unexpected answer from VM", resp) + client.close() + self.qmp_socket_path = None + + async def get_log_queue(self) -> asyncio.Queue: + queue: asyncio.Queue = asyncio.Queue(maxsize=1000) + # Limit the number of queues per VM + if len(self.log_queues) > 20: + logger.warning("Too many log queues, dropping the oldest one") + self.log_queues.pop(0) + self.log_queues.append(queue) + return queue + + async def unregister_queue(self, queue: asyncio.Queue): + if queue in self.log_queues: + self.log_queues.remove(queue) + queue.empty() diff --git a/src/aleph/vm/hypervisors/firecracker/microvm.py b/src/aleph/vm/hypervisors/firecracker/microvm.py index 5b30655da..309c07bff 100644 --- a/src/aleph/vm/hypervisors/firecracker/microvm.py +++ b/src/aleph/vm/hypervisors/firecracker/microvm.py @@ -51,6 +51,10 @@ def system(command): async def setfacl(): + """Give current user permission to access /dev/kvm via acl""" + if os.access("/dev/kvm", os.R_OK | os.W_OK): + return + user = getuid() cmd = f"sudo setfacl -m u:{user}:rw /dev/kvm" proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) diff --git a/src/aleph/vm/models.py b/src/aleph/vm/models.py index a4e3d4d8d..4b70592e9 100644 --- a/src/aleph/vm/models.py +++ b/src/aleph/vm/models.py @@ -4,14 +4,16 @@ from asyncio import Task from dataclasses import dataclass from datetime import datetime, timezone -from typing import TYPE_CHECKING, Optional, Union +from typing import TYPE_CHECKING, Optional from aleph_message.models import ( ExecutableContent, InstanceContent, ItemHash, + MessageType, ProgramContent, ) +from aleph_message.models.execution.environment import HypervisorType from aleph.vm.conf import settings from aleph.vm.controllers.firecracker.executable import AlephFirecrackerExecutable @@ -21,6 +23,8 @@ AlephFirecrackerResources, AlephProgramResources, ) +from aleph.vm.controllers.interface import AlephControllerInterface +from aleph.vm.controllers.qemu.instance import AlephQemuInstance, AlephQemuResources from aleph.vm.network.interfaces import TapInterface from aleph.vm.orchestrator.metrics import ( ExecutionRecord, @@ -34,7 +38,6 @@ if TYPE_CHECKING: from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager - logger = logging.getLogger(__name__) @@ -64,7 +67,7 @@ class VmExecution: original: ExecutableContent message: ExecutableContent resources: Optional[AlephFirecrackerResources] = None - vm: Optional[AlephFirecrackerExecutable] = None + vm: Optional[AlephFirecrackerExecutable | AlephQemuInstance] = None times: VmExecutionTimes @@ -89,6 +92,11 @@ def is_program(self): def is_instance(self): return isinstance(self.message, InstanceContent) + @property + def hypervisor(self): + # default to firecracker for retro compat + return getattr(self.message.environment, "hypervisor", HypervisorType.firecracker) + @property def becomes_ready(self): return self.ready_event.wait @@ -133,24 +141,29 @@ async def prepare(self): return self.times.preparing_at = datetime.now(tz=timezone.utc) + resources = None if self.is_program: resources = AlephProgramResources(self.message, namespace=self.vm_hash) elif self.is_instance: - resources = AlephInstanceResources(self.message, namespace=self.vm_hash) - else: + if self.hypervisor == HypervisorType.firecracker: + resources = AlephInstanceResources(self.message, namespace=self.vm_hash) + elif self.hypervisor == HypervisorType.qemu: + resources = AlephQemuResources(self.message, namespace=self.vm_hash) + + if not resources: msg = "Unknown executable message type" - raise ValueError(msg) + raise ValueError(msg, repr(self.message)) await resources.download_all() self.times.prepared_at = datetime.now(tz=timezone.utc) self.resources = resources - async def create(self, vm_id: int, tap_interface: Optional[TapInterface] = None) -> AlephFirecrackerExecutable: + async def create(self, vm_id: int, tap_interface: Optional[TapInterface] = None) -> AlephControllerInterface: if not self.resources: msg = "Execution resources must be configured first" raise ValueError(msg) self.times.starting_at = datetime.now(tz=timezone.utc) - vm: Union[AlephFirecrackerProgram, AlephFirecrackerInstance] + vm: AlephControllerInterface if self.is_program: assert isinstance(self.resources, AlephProgramResources) self.vm = vm = AlephFirecrackerProgram( @@ -161,17 +174,32 @@ async def create(self, vm_id: int, tap_interface: Optional[TapInterface] = None) hardware_resources=self.message.resources, tap_interface=tap_interface, ) + elif self.is_instance: + if self.hypervisor == HypervisorType.firecracker: + assert isinstance(self.resources, AlephInstanceResources) + self.vm = vm = AlephFirecrackerInstance( + vm_id=vm_id, + vm_hash=self.vm_hash, + resources=self.resources, + enable_networking=self.message.environment.internet, + hardware_resources=self.message.resources, + tap_interface=tap_interface, + ) + elif self.hypervisor == HypervisorType.qemu: + assert isinstance(self.resources, AlephQemuResources) + self.vm = vm = AlephQemuInstance( + vm_id=vm_id, + vm_hash=self.vm_hash, + resources=self.resources, + enable_networking=self.message.environment.internet, + hardware_resources=self.message.resources, + tap_interface=tap_interface, + ) + else: + raise Exception("Unknown VM") else: - assert self.is_instance - assert isinstance(self.resources, AlephInstanceResources) - self.vm = vm = AlephFirecrackerInstance( - vm_id=vm_id, - vm_hash=self.vm_hash, - resources=self.resources, - enable_networking=self.message.environment.internet, - hardware_resources=self.message.resources, - tap_interface=tap_interface, - ) + raise Exception("Unknown VM") + try: await vm.setup() await vm.start() @@ -180,7 +208,8 @@ async def create(self, vm_id: int, tap_interface: Optional[TapInterface] = None) self.times.started_at = datetime.now(tz=timezone.utc) self.ready_event.set() return vm - except Exception: + except Exception as exception: + print(exception) await vm.teardown() raise @@ -235,7 +264,7 @@ async def stop(self): self.cancel_expiration() self.cancel_update() - if isinstance(self.message, InstanceContent): + if self.vm.support_snapshot: await self.snapshot_manager.stop_for(self.vm_hash) def start_watching_for_updates(self, pubsub: PubSub): @@ -288,7 +317,7 @@ async def record_usage(self): io_write_bytes=pid_info["process"]["io_counters"][3], vcpus=self.vm.hardware_resources.vcpus, memory=self.vm.hardware_resources.memory, - network_tap=self.vm.tap_interface.device_name, + network_tap=self.vm.tap_interface.device_name if self.vm.tap_interface else "", ) ) else: diff --git a/src/aleph/vm/orchestrator/cli.py b/src/aleph/vm/orchestrator/cli.py index 4a19d934f..9dbfc93d2 100644 --- a/src/aleph/vm/orchestrator/cli.py +++ b/src/aleph/vm/orchestrator/cli.py @@ -253,6 +253,7 @@ async def run_instances(instances: list[ItemHash]) -> None: await asyncio.gather(*[start_instance(item_hash=instance_id) for instance_id in instances]) await asyncio.Event().wait() # wait forever + # TODO : should we really wait forever? @contextlib.contextmanager diff --git a/src/aleph/vm/orchestrator/views/operator.py b/src/aleph/vm/orchestrator/views/operator.py index 75b9a024b..529d17b06 100644 --- a/src/aleph/vm/orchestrator/views/operator.py +++ b/src/aleph/vm/orchestrator/views/operator.py @@ -61,8 +61,7 @@ async def stream_logs(request: web.Request) -> web.StreamResponse: if execution.vm is None: raise web.HTTPBadRequest(body=f"VM {vm_hash} is not running") - - queue: asyncio.Queue = asyncio.Queue(maxsize=1000) + queue = None try: ws = web.WebSocketResponse() await ws.prepare(request) @@ -82,11 +81,7 @@ async def stream_logs(request: web.Request) -> web.StreamResponse: await ws.send_json({"status": "connected"}) - # Limit the number of queues per VM - if len(execution.vm.fvm.log_queues) > 20: - logger.warning("Too many log queues, dropping the oldest one") - execution.vm.fvm.log_queues.pop(0) - execution.vm.fvm.log_queues.append(queue) + queue = await execution.vm.get_log_queue() while True: log_type, message = await queue.get() @@ -96,9 +91,8 @@ async def stream_logs(request: web.Request) -> web.StreamResponse: finally: await ws.close() finally: - if queue in execution.vm.fvm.log_queues: - execution.vm.fvm.log_queues.remove(queue) - queue.empty() + if queue: + await execution.vm.unregister_queue(queue) @require_jwk_authentication diff --git a/src/aleph/vm/pool.py b/src/aleph/vm/pool.py index 0eb18e592..f1966a360 100644 --- a/src/aleph/vm/pool.py +++ b/src/aleph/vm/pool.py @@ -94,8 +94,9 @@ async def create_a_vm( await execution.create(vm_id=vm_id, tap_interface=tap_interface) + assert execution.vm # Start VM snapshots automatically - if isinstance(message, InstanceContent): + if execution.vm.support_snapshot: await self.snapshot_manager.start_for(vm=execution.vm) return execution