Skip to content

Commit

Permalink
Restore existing snapshots (#383)
Browse files Browse the repository at this point in the history
* Feature: Restore existing snapshot from reference.

* Feature: Also forget old snapshots before deleting the file.

* Fix: Use the Python SDK instead of the connector to retrieve the snapshot messages by ref.

* Fix: Fixed code quality issues.

* Fix: Save snapshots in the correct path.

* Fix: Retrieve the most recent snapshot and fix decompression and mounting issues.

* Fix: Code quality issues.

* Fix: Fixed missing aioipfs dependency.

* Fix: Downgraded aioipfs version.

* Fix: Added missing python SDK dependency.

* Fix: Added missing wheel dependency for Debian 12.

* Fix: Fixing dependency error for Debian 12.

* Fix: Fixing dependency error for Debian 12.

* Fix: Build dependency error for Debian 12.

---------

Co-authored-by: Andres D. Molins <[email protected]>
  • Loading branch information
nesitor and Andres D. Molins authored Jul 27, 2023
1 parent f36b18b commit fa1e8a7
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 10 deletions.
2 changes: 1 addition & 1 deletion packaging/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ debian-package-code:
cp ../examples/instance_message_from_aleph.json ./aleph-vm/opt/aleph-vm/examples/instance_message_from_aleph.json
cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data
mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.0' 'jwskate==0.8.0' 'eth-account==0.9.0'
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.0' 'jwskate==0.8.0' 'eth-account==0.9.0' 'aioipfs==0.6.2' 'aleph-sdk-python==0.7.0'
python3 -m compileall ./aleph-vm/opt/aleph-vm/

debian-package-resources: firecracker-bins vmlinux
Expand Down
1 change: 1 addition & 0 deletions packaging/debian-12.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ RUN apt-get update && apt-get -y upgrade && apt-get install -y \
git \
curl \
sudo \
libssl-dev build-essential automake pkg-config libtool libffi-dev libgmp-dev libyaml-cpp-dev \
python3-pip \
&& rm -rf /var/lib/apt/lists/*

Expand Down
17 changes: 15 additions & 2 deletions vm_supervisor/messages.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import asyncio
import copy
from typing import Tuple
import logging
from typing import List, Tuple

from aiohttp import ClientConnectorError, ClientResponseError
from aiohttp.web_exceptions import HTTPNotFound, HTTPServiceUnavailable
from aleph_message.models import ExecutableMessage, ItemHash, MessageType
from aleph.sdk.client import AlephClient
from aleph_message.models import ExecutableMessage, ItemHash, MessageType, StoreMessage

from .storage import get_latest_amend, get_message

logger = logging.getLogger(__name__)


async def try_get_message(ref: str) -> ExecutableMessage:
"""Get the message or raise an aiohttp HTTP error"""
Expand Down Expand Up @@ -78,3 +82,12 @@ async def load_updated_message(
message = copy.deepcopy(original_message)
await update_message(message)
return message, original_message


async def try_get_store_messages_sdk(ref: str) -> List[StoreMessage]:
    """Query the Aleph API, through the Python SDK, for STORE messages filtered by `ref`.

    The request is sent to the https://official.aleph.cloud API server with
    `message_type=MessageType.store` and `refs=[ref]`.

    Returns the `messages` attribute of the SDK response.
    """
    query = {"message_type": MessageType.store, "refs": [ref]}
    async with AlephClient(api_server="https://official.aleph.cloud") as client:
        result = await client.get_messages(**query)
        return result.messages
1 change: 1 addition & 0 deletions vm_supervisor/snapshot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async def do_execution_snapshot(

return None
except ValueError:
execution.snapshot_running = False
raise ValueError("Something failed taking an snapshot")


Expand Down
40 changes: 38 additions & 2 deletions vm_supervisor/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@

from aleph_message.models import ItemHash

from .conf import SnapshotCompressionAlgorithm
from .conf import SnapshotCompressionAlgorithm, settings
from .ipfs import (
ipfs_remove_file,
ipfs_upload_file,
send_forget_ipfs_message,
send_store_ipfs_message,
)
from .storage import compress_volume_snapshot, create_volume_snapshot
from .messages import try_get_store_messages_sdk
from .storage import (
compress_volume_snapshot,
create_volume_snapshot,
decompress_volume_snapshot,
get_persistent_path,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -45,6 +51,11 @@ def __init__(
def delete(self) -> None:
    """Remove the file at `self.path`; a file that is already missing is not an error."""
    target = self.path
    target.unlink(missing_ok=True)

async def decompress(self, algorithm: SnapshotCompressionAlgorithm):
    """Decompress the file at `self.path` and wrap the result in a `DiskVolumeSnapshot`.

    `algorithm` is forwarded unchanged to `decompress_volume_snapshot`, whose
    return value becomes the `path` of the returned snapshot object.
    """
    return DiskVolumeSnapshot(
        path=await decompress_volume_snapshot(self.path, algorithm)
    )

async def upload(self, vm_hash: ItemHash) -> ItemHash:
ref = f"snapshot_{vm_hash}"
snapshot_hash = await ipfs_upload_file(self.path)
Expand Down Expand Up @@ -92,3 +103,28 @@ class DiskVolume(DiskVolumeFile):
async def take_snapshot(self) -> DiskVolumeSnapshot:
    """Snapshot the volume at `self.path` and return it as a `DiskVolumeSnapshot`."""
    created = await create_volume_snapshot(self.path)
    volume_snapshot = DiskVolumeSnapshot(created)
    return volume_snapshot


async def get_last_snapshot_by_ref(
    ref: str, namespace: str
) -> Optional[DiskVolumeSnapshot]:
    """Return the snapshot published under `ref`, downloading it when not cached.

    The first STORE message returned for `ref` is used.
    NOTE(review): this assumes the API lists the newest message first — confirm.

    The decompressed snapshot is expected at
    `PERSISTENT_VOLUMES_DIR/<namespace>/<item_hash>`. When that file is
    missing, the item is downloaded, moved next to it with a `.gz` suffix and
    decompressed.

    Args:
        ref: Reference used to look up the STORE messages.
        namespace: Sub-directory of the persistent volumes directory in which
            the snapshot is stored.

    Returns:
        The snapshot, or None when no message exists for `ref`.
    """
    messages = await try_get_store_messages_sdk(ref)
    if not messages:
        return None

    message = messages[0]
    logger.debug(f"Last snapshot message found: {message}")
    namespace_dir = Path(settings.PERSISTENT_VOLUMES_DIR) / namespace
    snapshot_path = namespace_dir / message.item_hash
    if snapshot_path.is_file():
        # Already downloaded and decompressed by a previous run.
        return DiskVolumeSnapshot(snapshot_path)

    # The namespace directory may not exist yet (e.g. first start on this
    # host); without it the rename() below fails with FileNotFoundError.
    namespace_dir.mkdir(parents=True, exist_ok=True)

    compressed_snapshot_path = Path(f"{snapshot_path}.gz")
    downloaded_snapshot_path = await get_persistent_path(message.item_hash)
    downloaded_snapshot_path.rename(compressed_snapshot_path)
    compressed_snapshot = CompressedDiskVolumeSnapshot(
        compressed_snapshot_path, SnapshotCompressionAlgorithm.gz
    )
    return await compressed_snapshot.decompress(SnapshotCompressionAlgorithm.gz)
47 changes: 43 additions & 4 deletions vm_supervisor/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from datetime import datetime
from pathlib import Path
from shutil import copy2, disk_usage, make_archive
from typing import Union
from subprocess import CalledProcessError
from typing import Optional, Union

import aiohttp
from aleph_message.models import (
Expand Down Expand Up @@ -188,6 +189,13 @@ async def get_rootfs_base_path(ref: ItemHash) -> Path:
return cache_path


async def get_persistent_path(ref: str) -> Path:
    """Download the data item `ref` from the connector into the persistent volumes directory.

    The file is fetched from `<CONNECTOR_URL>/download/data/<ref>` and stored
    as `<PERSISTENT_VOLUMES_DIR>/<ref>`; that local path is returned.
    """
    destination = Path(settings.PERSISTENT_VOLUMES_DIR) / ref
    await download_file(
        f"{settings.CONNECTOR_URL}/download/data/{ref}",
        destination,
    )
    return destination


async def create_ext4(path: Path, size_mib: int) -> bool:
if path.is_file():
logger.debug(f"File already exists, skipping ext4 creation on {path}")
Expand Down Expand Up @@ -245,13 +253,21 @@ async def create_mapped_device(device_name: str, table_command: str) -> None:
async def resize_and_tune_file_system(device_path: Path, mount_path: Path) -> None:
# Tunes the BTRFS file system on `device_path`, mounts it on `mount_path`,
# resizes it to "max" and unmounts it again.
# The tune step is needed to assign a random fsid to the BTRFS device so that it can be mounted.
await run_in_subprocess(["btrfstune", "-m", str(device_path)])
# NOTE(review): this unguarded mount looks like the pre-change line of the diff hunk;
# the guarded mount below appears to replace it — confirm before reading both as live code.
await run_in_subprocess(["mount", str(device_path), str(mount_path)])
try:
await run_in_subprocess(["mount", str(device_path), str(mount_path)])
except CalledProcessError:
# Sometimes BTRFS does not unmount cleanly; in those cases, try to rescue the
# device by clearing its log (`btrfs rescue zero-log`) and mount it again.
await run_in_subprocess(["btrfs", "rescue", "zero-log", str(device_path)])
await run_in_subprocess(["mount", str(device_path), str(mount_path)])

# Resize the mounted file system to "max", then release the mount point.
await run_in_subprocess(["btrfs", "filesystem", "resize", "max", str(mount_path)])
await run_in_subprocess(["umount", str(mount_path)])


async def create_devmapper(
volume: Union[PersistentVolume, RootfsVolume], namespace: str
volume: Union[PersistentVolume, RootfsVolume],
namespace: str,
snapshot_path: Optional[Path] = None,
) -> Path:
"""It creates a /dev/mapper/DEVICE inside the VM, that is an extended mapped device of the volume specified.
We follow the steps described here: https://community.aleph.im/t/deploying-mutable-vm-instances-on-aleph/56/2
Expand All @@ -277,7 +293,10 @@ async def create_devmapper(
base_table_command = f"0 {image_block_size} linear {image_loop_device} 0"
await create_mapped_device(image_volume_name, base_table_command)

volume_path = await create_volume_file(volume, namespace)
if snapshot_path:
volume_path = snapshot_path
else:
volume_path = await create_volume_file(volume, namespace)
extended_block_size: int = await get_block_size(volume_path)

mapped_volume_name_base = f"{namespace}_base"
Expand Down Expand Up @@ -368,6 +387,26 @@ async def compress_volume_snapshot(
return new_path


async def decompress_volume_snapshot(
    path: Path,
    algorithm: SnapshotCompressionAlgorithm = SnapshotCompressionAlgorithm.gz,
) -> Path:
    """Decompress the gzip-compressed snapshot at `path` with `gzip -d`.

    Args:
        path: Location of the compressed file, normally ending in ".gz".
        algorithm: Compression algorithm of the file; only gzip is supported.

    Returns:
        `path` with its trailing ".gz" removed, or `path` unchanged when it
        does not end in ".gz".

    Raises:
        NotImplementedError: If `algorithm` is not gzip.
    """
    if algorithm != SnapshotCompressionAlgorithm.gz:
        raise NotImplementedError

    # Strip only the trailing ".gz". Splitting the whole path on its first
    # ".gz" occurrence would truncate paths such as "/vol.gz.d/snap.gz".
    suffix = ".gz"
    file_name = path.name
    if file_name.endswith(suffix) and len(file_name) > len(suffix):
        new_path = path.with_name(file_name[: -len(suffix)])
    else:
        new_path = path

    await run_in_subprocess(["gzip", "-d", str(path)])

    return new_path


def check_disk_space(bytes_to_use: int, path: Union[str, Path] = "/") -> bool:
    """Tell whether the file system holding `path` has at least `bytes_to_use` bytes free.

    Args:
        bytes_to_use: Number of bytes the caller intends to write.
        path: Any path on the file system to inspect. Defaults to "/", which
            keeps the previous behaviour; pass the target directory when the
            data lives on a separate mount.

    Returns:
        True when the free space is greater than or equal to `bytes_to_use`.
    """
    return disk_usage(path).free >= bytes_to_use
10 changes: 9 additions & 1 deletion vm_supervisor/vm/firecracker/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
CompressedDiskVolumeSnapshot,
DiskVolume,
DiskVolumeSnapshot,
get_last_snapshot_by_ref,
)
from vm_supervisor.storage import (
NotEnoughDiskSpace,
Expand All @@ -44,8 +45,15 @@

class AlephInstanceResources(AlephFirecrackerResources):
async def download_runtime(self):
ref = f"snapshot_{self.namespace}"
snapshot_path = None
snapshot = await get_last_snapshot_by_ref(ref, self.namespace)
if snapshot:
logger.debug(f"Snapshot found on path {snapshot.path}")
snapshot_path = snapshot.path

self.rootfs_path = await create_devmapper(
self.message_content.rootfs, self.namespace
self.message_content.rootfs, self.namespace, snapshot_path
)
assert (
self.rootfs_path.is_block_device()
Expand Down

0 comments on commit fa1e8a7

Please sign in to comment.