diff --git a/vm_supervisor/run.py b/vm_supervisor/run.py
index cc7ed7430..4d51fc238 100644
--- a/vm_supervisor/run.py
+++ b/vm_supervisor/run.py
@@ -79,6 +79,7 @@ async def create_vm_execution(vm_hash: ItemHash) -> VmExecution:
     except HostNotFoundError as error:
         logger.exception(error)
         pool.forget_vm(vm_hash=vm_hash)
+        raise HTTPInternalServerError(reason="Host did not respond to ping")
 
     if not execution.vm:
         raise ValueError("The VM has not been created")
@@ -86,6 +87,29 @@ async def create_vm_execution(vm_hash: ItemHash) -> VmExecution:
     return execution
 
 
+async def create_vm_execution_or_raise_http_error(vm_hash: ItemHash) -> VmExecution:
+    try:
+        return await create_vm_execution(vm_hash=vm_hash)
+    except ResourceDownloadError as error:
+        logger.exception(error)
+        pool.forget_vm(vm_hash=vm_hash)
+        raise HTTPBadRequest(reason="Code, runtime or data not available")
+    except FileTooLargeError as error:
+        raise HTTPInternalServerError(reason=error.args[0])
+    except VmSetupError as error:
+        logger.exception(error)
+        pool.forget_vm(vm_hash=vm_hash)
+        raise HTTPInternalServerError(reason="Error during vm initialisation")
+    except MicroVMFailedInit as error:
+        logger.exception(error)
+        pool.forget_vm(vm_hash=vm_hash)
+        raise HTTPInternalServerError(reason="Error during runtime initialisation")
+    except HostNotFoundError as error:
+        logger.exception(error)
+        pool.forget_vm(vm_hash=vm_hash)
+        raise HTTPInternalServerError(reason="Host did not respond to ping")
+
+
 async def run_code_on_request(
     vm_hash: ItemHash, path: str, request: web.Request
 ) -> web.Response:
@@ -96,7 +120,7 @@ async def run_code_on_request(
     execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash)
 
     if not execution:
-        execution = await create_vm_execution(vm_hash=vm_hash)
+        execution = await create_vm_execution_or_raise_http_error(vm_hash=vm_hash)
 
     logger.debug(f"Using vm={execution.vm_id}")
 
@@ -190,7 +214,7 @@ async def run_code_on_request(
     if settings.REUSE_TIMEOUT > 0:
         if settings.WATCH_FOR_UPDATES:
             execution.start_watching_for_updates(pubsub=request.app["pubsub"])
-        execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT)
+        _ = execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT)
     else:
         await execution.stop()
 
@@ -203,7 +227,7 @@ async def run_code_on_event(vm_hash: ItemHash, event, pubsub: PubSub):
     execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash)
 
     if not execution:
-        execution = await create_vm_execution(vm_hash=vm_hash)
+        execution = await create_vm_execution_or_raise_http_error(vm_hash=vm_hash)
 
     logger.debug(f"Using vm={execution.vm_id}")
 
diff --git a/vm_supervisor/views/__init__.py b/vm_supervisor/views/__init__.py
index 5165a5b92..8d0cd0c38 100644
--- a/vm_supervisor/views/__init__.py
+++ b/vm_supervisor/views/__init__.py
@@ -3,15 +3,17 @@
 from hashlib import sha256
 from pathlib import Path
 from string import Template
-from typing import Awaitable, Optional
+from typing import Awaitable, Dict, Optional
 
 import aiodns
 import aiohttp
 from aiohttp import web
 from aiohttp.web_exceptions import HTTPNotFound
+from aleph_message.exceptions import UnknownHashError
 from aleph_message.models import ItemHash
 from pydantic import ValidationError
 
+from firecracker.microvm import MicroVMFailedInit
 from packaging.version import InvalidVersion, Version
 from vm_supervisor import status
 from vm_supervisor.conf import settings
@@ -19,8 +21,15 @@
 from vm_supervisor.pubsub import PubSub
 from vm_supervisor.resources import Allocation
 from vm_supervisor.run import pool, run_code_on_request, start_persistent_vm
-from vm_supervisor.utils import b32_to_b16, dumps_for_json, get_ref_from_dns
+from vm_supervisor.utils import (
+    HostNotFoundError,
+    b32_to_b16,
+    dumps_for_json,
+    get_ref_from_dns,
+)
 from vm_supervisor.version import __version__
+from vm_supervisor.vm.firecracker.executable import ResourceDownloadError, VmSetupError
+from vm_supervisor.vm.firecracker.program import FileTooLargeError
 
 logger = logging.getLogger(__name__)
 
@@ -201,20 +210,7 @@ async def update_allocations(request: web.Request):
 
     pubsub: PubSub = request.app["pubsub"]
 
-    # Start VMs
-    for vm_hash in allocation.persistent_vms:
-        vm_hash = ItemHash(vm_hash)
-        logger.info(f"Starting long running VM {vm_hash}")
-        await start_persistent_vm(vm_hash, pubsub)
-
-    # Start Instances
-    for instance_hash in allocation.instances:
-        instance_hash = ItemHash(instance_hash)
-        logger.info(f"Starting instance {instance_hash}")
-        await start_persistent_vm(instance_hash, pubsub)
-
-    # Stop unscheduled persistent programs and instances.
-    # Instances are also marked with persistent = True.
+    # First free resources from persistent programs and instances that are not scheduled anymore.
     allocations = allocation.persistent_vms | allocation.instances
     for execution in pool.get_persistent_executions():
         if execution.vm_hash not in allocations:
@@ -223,10 +219,65 @@ async def update_allocations(request: web.Request):
             logger.info(f"Stopping {execution}")
             await execution.stop()
             execution.persistent = False
 
+    # Second start persistent VMs and instances sequentially to limit resource usage.
+
+    # Exceptions that can be raised when starting a VM:
+    vm_creation_exceptions = (
+        UnknownHashError,
+        ResourceDownloadError,
+        FileTooLargeError,
+        VmSetupError,
+        MicroVMFailedInit,
+        HostNotFoundError,
+    )
+
+    scheduling_errors: Dict[ItemHash, Exception] = {}
+
+    # Schedule the start of persistent VMs:
+    for vm_hash in allocation.persistent_vms:
+        try:
+            logger.info(f"Starting long running VM '{vm_hash}'")
+            vm_hash = ItemHash(vm_hash)
+            await start_persistent_vm(vm_hash, pubsub)
+        except vm_creation_exceptions as error:
+            logger.exception(error)
+            scheduling_errors[vm_hash] = error
+
+    # Schedule the start of instances:
+    for instance_hash in allocation.instances:
+        logger.info(f"Starting instance '{instance_hash}'")
+        try:
+            instance_hash = ItemHash(instance_hash)
+            await start_persistent_vm(instance_hash, pubsub)
+        except vm_creation_exceptions as error:
+            logger.exception(error)
+            scheduling_errors[instance_hash] = error
+
     # Log unsupported features
     if allocation.on_demand_vms:
         logger.warning("Not supported yet: 'allocation.on_demand_vms'")
     if allocation.jobs:
         logger.warning("Not supported yet: 'allocation.on_demand_vms'")
-    return web.json_response(data={"success": True})
+    failing = set(scheduling_errors.keys())
+    successful = allocations - failing
+
+    status_code: int
+    if not failing:
+        status_code = 200  # OK
+    elif not successful:
+        status_code = 503  # Service Unavailable
+    else:
+        status_code = 207  # Multi-Status
+
+    return web.json_response(
+        data={
+            "success": not failing,
+            "successful": list(successful),
+            "failing": list(failing),
+            "errors": {
+                vm_hash: repr(error) for vm_hash, error in scheduling_errors.items()
+            },
+        },
+        status=status_code,
+    )
diff --git a/vm_supervisor/vm/firecracker/program.py b/vm_supervisor/vm/firecracker/program.py
index 9171bad8f..407a1dcc6 100644
--- a/vm_supervisor/vm/firecracker/program.py
+++ b/vm_supervisor/vm/firecracker/program.py
@@ -34,6 +34,7 @@
 from .executable import (
     AlephFirecrackerExecutable,
     AlephFirecrackerResources,
+    ResourceDownloadError,
     VmInitNotConnected,
     VmSetupError,
     Volume,
@@ -46,19 +47,6 @@ class FileTooLargeError(Exception):
     pass
 
 
-class ResourceDownloadError(ClientResponseError):
-    """An error occurred while downloading a VM resource file"""
-
-    def __init__(self, error: ClientResponseError):
-        super().__init__(
-            request_info=error.request_info,
-            history=error.history,
-            status=error.status,
-            message=error.message,
-            headers=error.headers,
-        )
-
-
 def read_input_data(path_to_data: Optional[Path]) -> Optional[bytes]:
     if not path_to_data:
         return None