Skip to content

Commit

Permalink
Add json logs endpoint which allow for past record
Browse files Browse the repository at this point in the history
  • Loading branch information
olethanh committed Nov 8, 2024
1 parent 7461a49 commit b887add
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 6 deletions.
6 changes: 4 additions & 2 deletions src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ class Settings(BaseSettings):
CONNECTIVITY_DNS_HOSTNAME = "example.org"

USE_JAILER = True
# System logs make boot ~2x slower
PRINT_SYSTEM_LOGS = False
# Changelog: PRINT_SYSTEM_LOGS used to print the MicroVM logs together with the supervisor output.
# They are now in separate journald entries; disabling this setting disables the log output of the Firecracker VM
# (only) via the serial console. This breaks the logs endpoint for programs, so disabling it in prod is not recommended.
PRINT_SYSTEM_LOGS = True
IGNORE_TRACEBACK_FROM_DIAGNOSTICS = True
DEBUG_ASYNCIO = False

Expand Down
9 changes: 9 additions & 0 deletions src/aleph/vm/orchestrator/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,12 @@ async def get_execution_records() -> Iterable[ExecutionRecord]:
executions = result.scalars().all()
await session.commit()
return executions


async def get_last_record_for_vm(vm_hash) -> ExecutionRecord:
    """Fetch a single stored execution record for the given VM hash.

    The query filters ``ExecutionRecord`` rows on ``vm_hash`` and keeps at most
    one of them. The result of ``scalar()`` is returned as is, so callers get
    ``None`` when no execution with that hash was ever recorded.

    NOTE(review): the query carries no ORDER BY, so when several records share
    the same hash, which one is returned is left to the database. Confirm that
    this really yields the *last* execution, as the function name promises.
    """
    # Building the statement needs no database connection; only its execution does.
    query = select(ExecutionRecord).where(ExecutionRecord.vm_hash == vm_hash).limit(1)
    async with AsyncSessionMaker() as db_session:
        rows = await db_session.execute(query)
        return rows.scalar()

Check warning on line 126 in src/aleph/vm/orchestrator/metrics.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/metrics.py#L126

Added line #L126 was not covered by tests
2 changes: 2 additions & 0 deletions src/aleph/vm/orchestrator/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
operate_erase,
operate_expire,
operate_logs,
operate_logs_json,
operate_reboot,
operate_stop,
stream_logs,
Expand Down Expand Up @@ -104,6 +105,7 @@ def setup_webapp():
# /control APIs are used to control the VMs and access their logs
web.post("/control/allocation/notify", notify_allocation),
web.get("/control/machine/{ref}/stream_logs", stream_logs),
web.get("/control/machine/{ref}/logs.json", operate_logs_json),
web.get("/control/machine/{ref}/logs", operate_logs),
web.post("/control/machine/{ref}/expire", operate_expire),
web.post("/control/machine/{ref}/stop", operate_stop),
Expand Down
3 changes: 1 addition & 2 deletions src/aleph/vm/orchestrator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from aiohttp import web
from aleph_message.models import (
AlephMessage,
ItemHash,
PaymentType,
ProgramMessage,
parse_message,
Expand All @@ -23,7 +22,7 @@
from aleph.vm.pool import VmPool
from aleph.vm.utils import create_task_log_exceptions

from .messages import get_message_status, load_updated_message
from .messages import get_message_status
from .payment import (
compute_required_balance,
compute_required_flow,
Expand Down
53 changes: 52 additions & 1 deletion src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@
from aleph.vm.conf import settings
from aleph.vm.controllers.qemu.client import QemuVmClient
from aleph.vm.models import VmExecution
from aleph.vm.orchestrator import metrics
from aleph.vm.orchestrator.run import create_vm_execution_or_raise_http_error
from aleph.vm.orchestrator.views.authentication import (
authenticate_websocket_message,
require_jwk_authentication,
)
from aleph.vm.pool import VmPool
from aleph.vm.utils import cors_allow_all, dumps_for_json
from aleph.vm.utils import (
cors_allow_all,
dumps_for_json,
get_message_executable_content,
)
from aleph.vm.utils.logs import get_past_vm_logs

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -116,6 +122,51 @@ async def operate_logs(request: web.Request, authenticated_sender: str) -> web.S
return response


@cors_allow_all
@require_jwk_authentication
async def operate_logs_json(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
    """Return the past logs of a VM as a JSON array (not streaming).

    The VM hash is read from the URL. The matching execution record is looked
    up in the database, the sender is checked against the recorded message,
    and the journal entries of the VM's stdout and stderr are written to the
    response as a JSON list. Each element has the keys ``SYSLOG_IDENTIFIER``,
    ``MESSAGE``, ``file`` (``"stdout"`` or ``"stderr"``) and
    ``__REALTIME_TIMESTAMP``.

    Args:
        request: The incoming HTTP request; ``match_info`` carries the VM ref.
        authenticated_sender: Address of the caller, injected by
            ``require_jwk_authentication``.

    Returns:
        A chunked ``application/json`` stream response holding the log
        entries, or a 403 response when the sender is not authorized.

    Raises:
        aiohttp.web_exceptions.HTTPNotFound: No execution was recorded for
            this VM hash.
    """
    vm_hash = get_itemhash_or_400(request.match_info)

    # This endpoint allows access to the logs of past executions, so we look in the database for any execution
    # with that hash; its message is then used to check the sender's rights.
    record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
    if not record:
        raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")

    # Logs may contain sensitive data: only a sender authorized on the VM's message may read them.
    message = get_message_executable_content(json.loads(record.message))
    if not is_sender_authorized(authenticated_sender, message):
        return web.Response(status=403, body="Unauthorized sender")

    _journal_stdout_name = f"vm-{vm_hash}-stdout"
    _journal_stderr_name = f"vm-{vm_hash}-stderr"

    response = web.StreamResponse()
    response.headers["Transfer-encoding"] = "chunked"
    response.headers["Content-Type"] = "application/json"
    await response.prepare(request)

    # The JSON array is assembled by hand so that entries are sent as they are read
    # instead of buffering the whole journal in memory.
    await response.write(b"[")
    for index, entry in enumerate(get_past_vm_logs(_journal_stdout_name, _journal_stderr_name)):
        if index:
            # Separator goes before every entry but the first one.
            await response.write(b",\n")
        log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == _journal_stdout_name else "stderr"
        msg = {
            "SYSLOG_IDENTIFIER": entry["SYSLOG_IDENTIFIER"],
            "MESSAGE": entry["MESSAGE"],
            "file": log_type,
            "__REALTIME_TIMESTAMP": entry["__REALTIME_TIMESTAMP"],
        }
        await response.write(dumps_for_json(msg).encode())
    await response.write(b"]")

    await response.write_eof()
    return response


async def authenticate_websocket_for_vm_or_403(execution: VmExecution, vm_hash: ItemHash, ws: web.WebSocketResponse):
"""Authenticate a websocket connection.
Expand Down
110 changes: 109 additions & 1 deletion tests/supervisor/views/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import aiohttp
import pytest
from aiohttp.test_utils import TestClient
from aleph_message.models import ItemHash
from aleph_message.models import ItemHash, ProgramMessage

from aleph.vm.conf import settings
from aleph.vm.orchestrator.metrics import ExecutionRecord
from aleph.vm.orchestrator.supervisor import setup_webapp
from aleph.vm.pool import VmPool
from aleph.vm.storage import get_message
Expand Down Expand Up @@ -453,3 +454,110 @@ async def test_websocket_logs_good_auth(aiohttp_client, mocker, patch_datetime_n

await websocket.close()
assert websocket.closed


@pytest.mark.asyncio
async def test_get_past_logs(aiohttp_client, mocker, patch_datetime_now):
    """The logs.json endpoint returns the stored journal entries of a VM as a JSON list.

    Authentication, the database lookup and the journal reader are all mocked,
    so only the view's own formatting of the entries is exercised.
    """
    mock_address = "0x40684b43B88356F62DCc56017547B6A7AC68780B"
    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"

    # Every request is treated as coming from `mock_address`.
    mocker.patch(
        "aleph.vm.orchestrator.views.authentication.authenticate_jwk",
        return_value=mock_address,
    )

    # Execution record the database lookup will pretend to find; its message is owned by `mock_address`.
    stored_record = ExecutionRecord(
        message="""{
"address": "0x40684b43B88356F62DCc56017547B6A7AC68780B",
"time": 1720816744.639107,
"allow_amend": false,
"metadata": null,
"authorized_keys": null,
"variables": null,
"environment": {
"reproducible": false,
"internet": true,
"aleph_api": true,
"shared_cache": false
},
"resources": {
"vcpus": 1,
"memory": 1024,
"seconds": 300,
"published_ports": null
},
"payment": null,
"requirements": null,
"volumes": [
{
"comment": null,
"mount": "/opt/packages",
"ref": "7338478721e2e966da6395dbfa37dab7b017b48da55b1be22d4eccf3487b836c",
"use_latest": true
}
],
"replaces": null,
"type": "vm-function",
"code": {
"encoding": "squashfs",
"entrypoint": "main:app",
"ref": "c4253bf514d2e0a271456c9023c4b3f13f324e53c176e9ec29b98b5972b02bc7",
"interface": null,
"args": null,
"use_latest": true
},
"runtime": {
"ref": "63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696",
"use_latest": true,
"comment": ""
},
"data": null,
"export": null,
"on": {
"http": true,
"message": null,
"persistent": false
}
}"""
    )
    mocker.patch(
        "aleph.vm.orchestrator.metrics.get_last_record_for_vm",
        return_value=stored_record,
    )

    # Two journal entries, one per output stream, as the journal reader would yield them.
    journal_entries = [
        EntryDict(
            SYSLOG_IDENTIFIER=f"vm-{mock_hash}-stdout",
            MESSAGE="logline1",
            __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 2),
        ),
        EntryDict(
            SYSLOG_IDENTIFIER=f"vm-{mock_hash}-stderr",
            MESSAGE="logline2",
            __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 3),
        ),
    ]
    mocker.patch(
        "aleph.vm.orchestrator.views.operator.get_past_vm_logs",
        return_value=journal_entries,
    )

    app = setup_webapp()
    app["vm_pool"] = mocker.MagicMock()
    app["pubsub"] = mocker.MagicMock()
    client = await aiohttp_client(app)

    response = await client.get(f"/control/machine/{mock_hash}/logs.json")
    assert response.status == 200

    expected_entries = [
        {
            "MESSAGE": "logline1",
            "SYSLOG_IDENTIFIER": "vm-fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_-stdout",
            "__REALTIME_TIMESTAMP": "2020-10-12 01:02:00",
            "file": "stdout",
        },
        {
            "MESSAGE": "logline2",
            "SYSLOG_IDENTIFIER": "vm-fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_-stderr",
            "__REALTIME_TIMESTAMP": "2020-10-12 01:03:00",
            "file": "stderr",
        },
    ]
    assert await response.json() == expected_entries

0 comments on commit b887add

Please sign in to comment.