Skip to content

Commit

Permalink
Fix: Retrieve the most recent snapshot and fix un-compression and mounting issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
nesitor committed Jul 26, 2023
1 parent ce883e9 commit e23b2f3
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 11 deletions.
5 changes: 4 additions & 1 deletion vm_supervisor/messages.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import copy
import logging
from typing import List, Tuple

from aiohttp import ClientConnectorError, ClientResponseError
Expand All @@ -9,6 +10,8 @@

from .storage import get_latest_amend, get_message

logger = logging.getLogger(__name__)


async def try_get_message(ref: str) -> ExecutableMessage:
"""Get the message or raise an aiohttp HTTP error"""
Expand Down Expand Up @@ -85,6 +88,6 @@ async def try_get_store_messages_sdk(ref: str) -> List[StoreMessage]:
async with AlephClient(api_server="https://official.aleph.cloud") as client:
response = await client.get_messages(
message_type=MessageType.store,
refs=ref,
refs=[ref],
)
return response.messages
23 changes: 15 additions & 8 deletions vm_supervisor/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from aleph_message.models import ItemHash

from .conf import SnapshotCompressionAlgorithm
from .conf import SnapshotCompressionAlgorithm, settings
from .ipfs import (
ipfs_remove_file,
ipfs_upload_file,
Expand Down Expand Up @@ -105,15 +105,22 @@ async def take_snapshot(self) -> DiskVolumeSnapshot:
return DiskVolumeSnapshot(snapshot)


async def get_last_snapshot_by_ref(ref: str, namespace: str) -> Optional[DiskVolumeSnapshot]:
    """Return the latest snapshot stored under ``ref``, or ``None`` if there is none.

    The store messages referencing ``ref`` are fetched and the first one is
    used. If a file named after that message's item hash already exists in
    the namespace's persistent volume directory, it is reused as-is.
    Otherwise the file is downloaded, moved next to the expected location
    with a ``.gz`` suffix and decompressed.

    Args:
        ref: Reference used to look up the snapshot store messages.
        namespace: Sub-directory of ``settings.PERSISTENT_VOLUMES_DIR`` in
            which the snapshot is looked up and stored.

    Returns:
        The snapshot to restore from, or ``None`` when no store message
        references ``ref``.
    """
    messages = await try_get_store_messages_sdk(ref)
    if not messages:
        return None

    # NOTE(review): taking the first element assumes the API returns the
    # newest message first -- confirm against the messages endpoint ordering.
    message = messages[0]
    logger.debug("Last snapshot message found: %s", message)

    snapshot_path = Path(settings.PERSISTENT_VOLUMES_DIR) / namespace / message.item_hash
    if snapshot_path.is_file():
        # Already present locally: no download needed.
        return DiskVolumeSnapshot(snapshot_path)

    compressed_snapshot_path = Path(f"{snapshot_path}.gz")
    downloaded_snapshot_path = await get_persistent_path(message.item_hash)
    # The namespace directory may not exist yet (e.g. first restore on this
    # host); without it the rename below raises FileNotFoundError.
    compressed_snapshot_path.parent.mkdir(parents=True, exist_ok=True)
    downloaded_snapshot_path.rename(compressed_snapshot_path)

    compressed_snapshot = CompressedDiskVolumeSnapshot(
        compressed_snapshot_path, SnapshotCompressionAlgorithm.gz
    )
    # Presumably writes the decompressed image to the path without the
    # ``.gz`` suffix, i.e. ``snapshot_path`` -- verify in ``decompress``.
    return await compressed_snapshot.decompress(SnapshotCompressionAlgorithm.gz)
9 changes: 8 additions & 1 deletion vm_supervisor/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from datetime import datetime
from pathlib import Path
from shutil import copy2, disk_usage, make_archive
from subprocess import CalledProcessError
from typing import Optional, Union

import aiohttp
Expand Down Expand Up @@ -252,7 +253,13 @@ async def create_mapped_device(device_name: str, table_command: str) -> None:
async def resize_and_tune_file_system(device_path: Path, mount_path: Path) -> None:
    """Assign a new fsid to a BTRFS device and grow its filesystem to the maximum size.

    The device is mounted on ``mount_path`` only for the duration of the
    resize and is unmounted again before returning, including when the
    resize itself fails.

    Args:
        device_path: Path of the BTRFS block device to tune and resize.
        mount_path: Directory on which the device is temporarily mounted.

    Raises:
        CalledProcessError: Presumably raised by ``run_in_subprocess`` when a
            command fails (this is what the mount fallback catches). A failed
            first mount is retried once after clearing the log tree.
    """
    # This tune is needed to assign a random fsid to the BTRFS device to be able to mount it
    await run_in_subprocess(["btrfstune", "-m", str(device_path)])

    mount_command = ["mount", str(device_path), str(mount_path)]
    try:
        await run_in_subprocess(mount_command)
    except CalledProcessError:
        # Sometimes BTRFS is not unmounted cleanly. In that case, try to rescue
        # the device by clearing its log tree, then mount it again.
        await run_in_subprocess(["btrfs", "rescue", "zero-log", str(device_path)])
        await run_in_subprocess(mount_command)

    try:
        await run_in_subprocess(["btrfs", "filesystem", "resize", "max", str(mount_path)])
    finally:
        # Always unmount, so a failed resize does not leave the device mounted.
        await run_in_subprocess(["umount", str(mount_path)])

Expand Down
4 changes: 3 additions & 1 deletion vm_supervisor/vm/firecracker/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ class AlephInstanceResources(AlephFirecrackerResources):
async def download_runtime(self):
ref = f"snapshot_{self.namespace}"
snapshot_path = None
snapshot = await get_last_snapshot_by_ref(ref)
snapshot = await get_last_snapshot_by_ref(ref, self.namespace)
if snapshot:
logger.debug(f"Snapshot found on path {snapshot.path}")
snapshot_path = snapshot.path

self.rootfs_path = await create_devmapper(
self.message_content.rootfs, self.namespace, snapshot_path
)
Expand Down

0 comments on commit e23b2f3

Please sign in to comment.