From b887add01276f727c97b49a6ec2e1f7fcb2d7ac8 Mon Sep 17 00:00:00 2001
From: Olivier Le Thanh Duong
Date: Tue, 5 Nov 2024 09:50:15 +0100
Subject: [PATCH] Add json logs endpoint which allow for past record

---
 src/aleph/vm/conf.py                        |   6 +-
 src/aleph/vm/orchestrator/metrics.py        |   9 ++
 src/aleph/vm/orchestrator/supervisor.py     |   2 +
 src/aleph/vm/orchestrator/tasks.py          |   3 +-
 src/aleph/vm/orchestrator/views/operator.py |  53 +++++++++-
 tests/supervisor/views/test_operator.py     | 110 +++++++++++++++++++-
 6 files changed, 177 insertions(+), 6 deletions(-)

diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py
index b68ff9e88..01461de44 100644
--- a/src/aleph/vm/conf.py
+++ b/src/aleph/vm/conf.py
@@ -133,8 +133,10 @@ class Settings(BaseSettings):
     CONNECTIVITY_DNS_HOSTNAME = "example.org"
 
     USE_JAILER = True
-    # System logs make boot ~2x slower
-    PRINT_SYSTEM_LOGS = False
+    # Changelog: PRINT_SYSTEM_LOGS used to print the MicroVM logs with the supervisor output.
+    # They are now in separate journald entries; disabling this setting disables the log output of Firecracker VMs (only)
+    # via the serial console. This breaks the logs endpoint for programs, so disabling it in prod is not recommended.
+    PRINT_SYSTEM_LOGS = True
     IGNORE_TRACEBACK_FROM_DIAGNOSTICS = True
     DEBUG_ASYNCIO = False
 
diff --git a/src/aleph/vm/orchestrator/metrics.py b/src/aleph/vm/orchestrator/metrics.py
index 3b8cdf9f3..4ea023615 100644
--- a/src/aleph/vm/orchestrator/metrics.py
+++ b/src/aleph/vm/orchestrator/metrics.py
@@ -115,3 +115,18 @@ async def get_execution_records() -> Iterable[ExecutionRecord]:
         executions = result.scalars().all()
         await session.commit()
         return executions
+
+
+async def get_last_record_for_vm(vm_hash) -> "ExecutionRecord | None":
+    """Get one execution record stored for the given VM hash.
+
+    Returns None when no record matches.
+
+    NOTE(review): the query has no ORDER BY, so when several records share the hash
+    the database picks which one is returned; confirm this is really the last one.
+    """
+    async with AsyncSessionMaker() as session:  # Use AsyncSession in a context manager
+        result = await session.execute(
+            select(ExecutionRecord).where(ExecutionRecord.vm_hash == vm_hash).limit(1)
+        )  # Use execute for querying
+        return result.scalar()
diff --git a/src/aleph/vm/orchestrator/supervisor.py b/src/aleph/vm/orchestrator/supervisor.py
index 118706370..777674c2a 100644
--- a/src/aleph/vm/orchestrator/supervisor.py
+++ b/src/aleph/vm/orchestrator/supervisor.py
@@ -52,6 +52,7 @@
     operate_erase,
     operate_expire,
     operate_logs,
+    operate_logs_json,
     operate_reboot,
     operate_stop,
     stream_logs,
@@ -104,6 +105,7 @@ def setup_webapp():
             # /control APIs are used to control the VMs and access their logs
             web.post("/control/allocation/notify", notify_allocation),
             web.get("/control/machine/{ref}/stream_logs", stream_logs),
+            web.get("/control/machine/{ref}/logs.json", operate_logs_json),
             web.get("/control/machine/{ref}/logs", operate_logs),
             web.post("/control/machine/{ref}/expire", operate_expire),
             web.post("/control/machine/{ref}/stop", operate_stop),
diff --git a/src/aleph/vm/orchestrator/tasks.py b/src/aleph/vm/orchestrator/tasks.py
index c7062d931..921a2265f 100644
--- a/src/aleph/vm/orchestrator/tasks.py
+++ b/src/aleph/vm/orchestrator/tasks.py
@@ -11,7 +11,6 @@
 from aiohttp import web
 from aleph_message.models import (
     AlephMessage,
-    ItemHash,
     PaymentType,
     ProgramMessage,
     parse_message,
@@ -23,7 +22,7 @@
 from aleph.vm.pool import VmPool
 from aleph.vm.utils import create_task_log_exceptions
 
-from .messages import get_message_status, load_updated_message
+from .messages import get_message_status
 from .payment import (
     compute_required_balance,
     compute_required_flow,
diff --git a/src/aleph/vm/orchestrator/views/operator.py b/src/aleph/vm/orchestrator/views/operator.py
index af0e98f45..67f1d59f9 100644
--- a/src/aleph/vm/orchestrator/views/operator.py
+++ b/src/aleph/vm/orchestrator/views/operator.py
@@ -15,13 +15,19 @@
 from aleph.vm.conf import settings
 from aleph.vm.controllers.qemu.client import QemuVmClient
 from aleph.vm.models import VmExecution
+from aleph.vm.orchestrator import metrics
 from aleph.vm.orchestrator.run import create_vm_execution_or_raise_http_error
 from aleph.vm.orchestrator.views.authentication import (
     authenticate_websocket_message,
     require_jwk_authentication,
 )
 from aleph.vm.pool import VmPool
-from aleph.vm.utils import cors_allow_all, dumps_for_json
+from aleph.vm.utils import (
+    cors_allow_all,
+    dumps_for_json,
+    get_message_executable_content,
+)
+from aleph.vm.utils.logs import get_past_vm_logs
 
 logger = logging.getLogger(__name__)
 
@@ -116,6 +122,59 @@ async def operate_logs(request: web.Request, authenticated_sender: str) -> web.S
     return response
 
 
+@cors_allow_all
+@require_jwk_authentication
+async def operate_logs_json(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
+    """Return the past logs of a VM as a JSON array (not streaming).
+
+    The VM is looked up in the execution records rather than in the pool of
+    running VMs, so logs of past executions can be retrieved too.
+
+    Raises HTTPNotFound when no execution record exists for the VM hash and
+    answers 403 when `is_sender_authorized` rejects the authenticated sender.
+    """
+    vm_hash = get_itemhash_or_400(request.match_info)
+
+    # This endpoint allows logs for past executions, so we look into the database for any execution by that hash,
+    # which we can then use to check the access rights of the sender.
+    record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
+    if not record:
+        raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")
+
+    message = get_message_executable_content(json.loads(record.message))
+    if not is_sender_authorized(authenticated_sender, message):
+        return web.Response(status=403, body="Unauthorized sender")
+
+    _journal_stdout_name = f"vm-{vm_hash}-stdout"
+    _journal_stderr_name = f"vm-{vm_hash}-stderr"
+
+    response = web.StreamResponse()
+    response.headers["Transfer-encoding"] = "chunked"
+    response.headers["Content-Type"] = "application/json"
+    await response.prepare(request)
+    await response.write(b"[")
+
+    # Entries are written one by one; the separator precedes every entry but the first
+    # so that the output stays a valid JSON array.
+    first = True
+    for entry in get_past_vm_logs(_journal_stdout_name, _journal_stderr_name):
+        if not first:
+            await response.write(b",\n")
+        first = False
+        log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == _journal_stdout_name else "stderr"
+        msg = {
+            "SYSLOG_IDENTIFIER": entry["SYSLOG_IDENTIFIER"],
+            "MESSAGE": entry["MESSAGE"],
+            "file": log_type,
+            "__REALTIME_TIMESTAMP": entry["__REALTIME_TIMESTAMP"],
+        }
+        await response.write(dumps_for_json(msg).encode())
+    await response.write(b"]")
+
+    await response.write_eof()
+    return response
+
+
 async def authenticate_websocket_for_vm_or_403(execution: VmExecution, vm_hash: ItemHash, ws: web.WebSocketResponse):
     """Authenticate a websocket connection.
 
diff --git a/tests/supervisor/views/test_operator.py b/tests/supervisor/views/test_operator.py
index b8e370de7..c43c4b75d 100644
--- a/tests/supervisor/views/test_operator.py
+++ b/tests/supervisor/views/test_operator.py
@@ -9,9 +9,10 @@
 import aiohttp
 import pytest
 from aiohttp.test_utils import TestClient
-from aleph_message.models import ItemHash
+from aleph_message.models import ItemHash, ProgramMessage
 
 from aleph.vm.conf import settings
+from aleph.vm.orchestrator.metrics import ExecutionRecord
 from aleph.vm.orchestrator.supervisor import setup_webapp
 from aleph.vm.pool import VmPool
 from aleph.vm.storage import get_message
@@ -453,3 +454,115 @@ async def test_websocket_logs_good_auth(aiohttp_client, mocker, patch_datetime_n
 
     await websocket.close()
     assert websocket.closed
+
+
+@pytest.mark.asyncio
+async def test_get_past_logs(aiohttp_client, mocker, patch_datetime_now):
+    """Check that the logs.json endpoint serves the mocked past log entries as a JSON list."""
+    mock_address = "0x40684b43B88356F62DCc56017547B6A7AC68780B"
+    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
+    # Stub out JWK authentication so that it returns mock_address
+    mocker.patch(
+        "aleph.vm.orchestrator.views.authentication.authenticate_jwk",
+        return_value=mock_address,
+    )
+    # The stored execution record carries a message whose "address" is the same as mock_address
+    mocker.patch(
+        "aleph.vm.orchestrator.metrics.get_last_record_for_vm",
+        return_value=ExecutionRecord(
+            message="""{
+    "address": "0x40684b43B88356F62DCc56017547B6A7AC68780B",
+    "time": 1720816744.639107,
+    "allow_amend": false,
+    "metadata": null,
+    "authorized_keys": null,
+    "variables": null,
+    "environment": {
+        "reproducible": false,
+        "internet": true,
+        "aleph_api": true,
+        "shared_cache": false
+    },
+    "resources": {
+        "vcpus": 1,
+        "memory": 1024,
+        "seconds": 300,
+        "published_ports": null
+    },
+    "payment": null,
+    "requirements": null,
+    "volumes": [
+        {
+            "comment": null,
+            "mount": "/opt/packages",
+            "ref": "7338478721e2e966da6395dbfa37dab7b017b48da55b1be22d4eccf3487b836c",
+            "use_latest": true
+        }
+    ],
+    "replaces": null,
+    "type": "vm-function",
+    "code": {
+        "encoding": "squashfs",
+        "entrypoint": "main:app",
+        "ref": "c4253bf514d2e0a271456c9023c4b3f13f324e53c176e9ec29b98b5972b02bc7",
+        "interface": null,
+        "args": null,
+        "use_latest": true
+    },
+    "runtime": {
+        "ref": "63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696",
+        "use_latest": true,
+        "comment": ""
+    },
+    "data": null,
+    "export": null,
+    "on": {
+        "http": true,
+        "message": null,
+        "persistent": false
+    }
+}"""
+        ),
+    )
+    # Two fake log entries, one per stream (stdout first, then stderr)
+    mocker.patch(
+        "aleph.vm.orchestrator.views.operator.get_past_vm_logs",
+        return_value=[
+            EntryDict(
+                SYSLOG_IDENTIFIER=f"vm-{mock_hash}-stdout",
+                MESSAGE="logline1",
+                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 2),
+            ),
+            EntryDict(
+                SYSLOG_IDENTIFIER=f"vm-{mock_hash}-stderr",
+                MESSAGE="logline2",
+                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 3),
+            ),
+        ],
+    )
+
+    app = setup_webapp()
+    pool = mocker.MagicMock()
+    app["vm_pool"] = pool
+    app["pubsub"] = mocker.MagicMock()
+    client = await aiohttp_client(app)
+    response = await client.get(
+        f"/control/machine/{mock_hash}/logs.json",
+    )
+
+    assert response.status == 200
+    # The datetime values of the entries are expected back as strings in the JSON body
+    assert await response.json() == [
+        {
+            "MESSAGE": "logline1",
+            "SYSLOG_IDENTIFIER": "vm-fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_-stdout",
+            "__REALTIME_TIMESTAMP": "2020-10-12 01:02:00",
+            "file": "stdout",
+        },
+        {
+            "MESSAGE": "logline2",
+            "SYSLOG_IDENTIFIER": "vm-fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_-stderr",
+            "__REALTIME_TIMESTAMP": "2020-10-12 01:03:00",
+            "file": "stderr",
+        },
+    ]