Skip to content

Commit

Permalink
feat: Per-image rescan with improved multi-arch support (#1712)
Browse files Browse the repository at this point in the history
  • Loading branch information
achimnol authored Nov 13, 2023
1 parent 90382df commit 5f87043
Show file tree
Hide file tree
Showing 10 changed files with 474 additions and 229 deletions.
1 change: 1 addition & 0 deletions changes/1712.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement per-image metadata sync in the `mgr image rescan` command and deprecate scanning a whole Docker Hub account to avoid the API rate limit
48 changes: 46 additions & 2 deletions src/ai/backend/manager/cli/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
from ai.backend.common import redis_helper
from ai.backend.common.config import redis_config_iv
from ai.backend.common.defs import REDIS_IMAGE_DB, REDIS_LIVE_DB, REDIS_STAT_DB, REDIS_STREAM_DB
from ai.backend.common.etcd import AsyncEtcd, ConfigScopes
from ai.backend.common.exception import ConfigurationError
from ai.backend.common.logging import AbstractLogger, LocalLogger
from ai.backend.common.types import RedisConnectionInfo
from ai.backend.manager.config import SharedConfig

from ..config import LocalConfig
from ..config import LocalConfig, SharedConfig
from ..config import load as load_config


Expand Down Expand Up @@ -61,6 +61,50 @@ def __exit__(self, *exc_info) -> None:
self._logger.__exit__()


@contextlib.asynccontextmanager
async def etcd_ctx(cli_ctx: CLIContext) -> AsyncIterator[AsyncEtcd]:
local_config = cli_ctx.local_config
creds = None
if local_config["etcd"]["user"]:
creds = {
"user": local_config["etcd"]["user"],
"password": local_config["etcd"]["password"],
}
scope_prefix_map = {
ConfigScopes.GLOBAL: "",
# TODO: provide a way to specify other scope prefixes
}
etcd = AsyncEtcd(
local_config["etcd"]["addr"],
local_config["etcd"]["namespace"],
scope_prefix_map,
credentials=creds,
)
try:
yield etcd
finally:
await etcd.close()


@contextlib.asynccontextmanager
async def config_ctx(cli_ctx: CLIContext) -> AsyncIterator[SharedConfig]:
local_config = cli_ctx.local_config
# scope_prefix_map is created inside ConfigServer
shared_config = SharedConfig(
local_config["etcd"]["addr"],
local_config["etcd"]["user"],
local_config["etcd"]["password"],
local_config["etcd"]["namespace"],
)
await shared_config.reload()
raw_redis_config = await shared_config.etcd.get_prefix("config/redis")
local_config["redis"] = redis_config_iv.check(raw_redis_config)
try:
yield shared_config
finally:
await shared_config.close()


@attrs.define(auto_attribs=True, frozen=True, slots=True)
class RedisConnectionSet:
live: RedisConnectionInfo
Expand Down
75 changes: 19 additions & 56 deletions src/ai/backend/manager/cli/etcd.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
from __future__ import annotations

import asyncio
import contextlib
import json
import logging
import sys
from typing import TYPE_CHECKING, AsyncIterator
from typing import TYPE_CHECKING

import click

from ai.backend.cli.types import ExitCode
from ai.backend.common.cli import EnumChoice, MinMaxRange
from ai.backend.common.config import redis_config_iv
from ai.backend.common.etcd import AsyncEtcd, ConfigScopes
from ai.backend.common.etcd import ConfigScopes
from ai.backend.common.etcd import quote as etcd_quote
from ai.backend.common.etcd import unquote as etcd_unquote
from ai.backend.common.logging import BraceStyleAdapter

from ..config import SharedConfig
from .context import etcd_ctx
from .image_impl import alias as alias_impl
from .image_impl import dealias as dealias_impl
from .image_impl import forget_image as forget_image_impl
Expand All @@ -37,50 +35,6 @@ def cli() -> None:
pass


@contextlib.asynccontextmanager
async def etcd_ctx(cli_ctx: CLIContext) -> AsyncIterator[AsyncEtcd]:
local_config = cli_ctx.local_config
creds = None
if local_config["etcd"]["user"]:
creds = {
"user": local_config["etcd"]["user"],
"password": local_config["etcd"]["password"],
}
scope_prefix_map = {
ConfigScopes.GLOBAL: "",
# TODO: provide a way to specify other scope prefixes
}
etcd = AsyncEtcd(
local_config["etcd"]["addr"],
local_config["etcd"]["namespace"],
scope_prefix_map,
credentials=creds,
)
try:
yield etcd
finally:
await etcd.close()


@contextlib.asynccontextmanager
async def config_ctx(cli_ctx: CLIContext) -> AsyncIterator[SharedConfig]:
local_config = cli_ctx.local_config
# scope_prefix_map is created inside ConfigServer
shared_config = SharedConfig(
local_config["etcd"]["addr"],
local_config["etcd"]["user"],
local_config["etcd"]["password"],
local_config["etcd"]["namespace"],
)
await shared_config.reload()
raw_redis_config = await shared_config.etcd.get_prefix("config/redis")
local_config["redis"] = redis_config_iv.check(raw_redis_config)
try:
yield shared_config
finally:
await shared_config.close()


@cli.command()
@click.argument("key")
@click.argument("value")
Expand Down Expand Up @@ -300,7 +254,7 @@ def set_image_resource_limit(
@cli.command()
@click.argument("registry")
@click.pass_obj
def rescan_images(cli_ctx: CLIContext, registry) -> None:
def rescan_images(cli_ctx: CLIContext, registry: str) -> None:
"""
Update the kernel image metadata from all configured docker registries.
Expand All @@ -315,7 +269,7 @@ def rescan_images(cli_ctx: CLIContext, registry) -> None:
@click.argument("target")
@click.argument("architecture")
@click.pass_obj
def alias(cli_ctx, alias, target, architecture) -> None:
def alias(cli_ctx: CLIContext, alias: str, target: str, architecture: str) -> None:
"""Add an image alias from the given alias to the target image reference."""
log.warn("etcd alias command is deprecated, use image alias instead")
asyncio.run(alias_impl(cli_ctx, alias, target, architecture))
Expand All @@ -324,7 +278,7 @@ def alias(cli_ctx, alias, target, architecture) -> None:
@cli.command()
@click.argument("alias")
@click.pass_obj
def dealias(cli_ctx, alias) -> None:
def dealias(cli_ctx: CLIContext, alias: str) -> None:
"""Remove an alias."""
log.warn("etcd dealias command is deprecated, use image dealias instead")
asyncio.run(dealias_impl(cli_ctx, alias))
Expand All @@ -333,7 +287,7 @@ def dealias(cli_ctx, alias) -> None:
@cli.command()
@click.argument("value")
@click.pass_obj
def quote(cli_ctx: CLIContext, value) -> None:
def quote(cli_ctx: CLIContext, value: str) -> None:
"""
Quote the given string for use as a URL piece in etcd keys.
Use this to generate argument inputs for aliases and raw image keys.
Expand All @@ -344,7 +298,7 @@ def quote(cli_ctx: CLIContext, value) -> None:
@cli.command()
@click.argument("value")
@click.pass_obj
def unquote(cli_ctx: CLIContext, value) -> None:
def unquote(cli_ctx: CLIContext, value: str) -> None:
"""
Unquote the given string used as a URL piece in etcd keys.
"""
Expand All @@ -362,7 +316,12 @@ def unquote(cli_ctx: CLIContext, value) -> None:
help="The configuration scope to put the value.",
)
@click.pass_obj
def set_storage_sftp_scaling_group(cli_ctx: CLIContext, proxy, scaling_groups, scope) -> None:
def set_storage_sftp_scaling_group(
cli_ctx: CLIContext,
proxy: str,
scaling_groups: str,
scope: ConfigScopes,
) -> None:
"""
Updates storage proxy node config's SFTP desginated scaling groups.
To enter multiple scaling groups concatenate names with comma(,).
Expand Down Expand Up @@ -392,7 +351,11 @@ async def _impl():
help="The configuration scope to put the value.",
)
@click.pass_obj
def remove_storage_sftp_scaling_group(cli_ctx: CLIContext, proxy, scope) -> None:
def remove_storage_sftp_scaling_group(
cli_ctx: CLIContext,
proxy: str,
scope: ConfigScopes,
) -> None:
"""
Removes storage proxy node config's SFTP desginated scaling groups.
"""
Expand Down
8 changes: 5 additions & 3 deletions src/ai/backend/manager/cli/image.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import asyncio
import logging

Expand Down Expand Up @@ -75,21 +77,21 @@ def set_resource_limit(


@cli.command()
@click.argument("registry", required=False, default="")
@click.argument("registry_or_image", required=False, default="")
@click.option(
"--local",
is_flag=True,
default=False,
help="Scan the local Docker daemon instead of a registry",
)
@click.pass_obj
def rescan(cli_ctx, registry: str, local: bool) -> None:
def rescan(cli_ctx, registry_or_image: str, local: bool) -> None:
"""
Update the kernel image metadata from all configured docker registries.
Pass the name (usually hostname or "lablup") of the Docker registry configured as REGISTRY.
"""
asyncio.run(rescan_images_impl(cli_ctx, registry, local))
asyncio.run(rescan_images_impl(cli_ctx, registry_or_image, local))


@cli.command()
Expand Down
47 changes: 11 additions & 36 deletions src/ai/backend/manager/cli/image_impl.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import contextlib
from __future__ import annotations

import logging
from pprint import pformat, pprint
from typing import AsyncIterator

import click
import sqlalchemy as sa
Expand All @@ -10,40 +10,15 @@

from ai.backend.common import redis_helper
from ai.backend.common.docker import ImageRef
from ai.backend.common.etcd import AsyncEtcd, ConfigScopes
from ai.backend.common.exception import UnknownImageReference
from ai.backend.common.logging import BraceStyleAdapter
from ai.backend.manager.cli.context import CLIContext, redis_ctx
from ai.backend.manager.models.image import ImageAliasRow, ImageRow
from ai.backend.manager.models.image import rescan_images as rescan_images_func
from ai.backend.manager.models.utils import connect_database

log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined]

from ..models.image import ImageAliasRow, ImageRow
from ..models.image import rescan_images as rescan_images_func
from ..models.utils import connect_database
from .context import CLIContext, etcd_ctx, redis_ctx

@contextlib.asynccontextmanager
async def etcd_ctx(cli_ctx: CLIContext) -> AsyncIterator[AsyncEtcd]:
local_config = cli_ctx.local_config
creds = None
if local_config["etcd"]["user"]:
creds = {
"user": local_config["etcd"]["user"],
"password": local_config["etcd"]["password"],
}
scope_prefix_map = {
ConfigScopes.GLOBAL: "",
# TODO: provide a way to specify other scope prefixes
}
etcd = AsyncEtcd(
local_config["etcd"]["addr"],
local_config["etcd"]["namespace"],
scope_prefix_map,
credentials=creds,
)
try:
yield etcd
finally:
await etcd.close()
log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined]


async def list_images(cli_ctx, short, installed_only):
Expand Down Expand Up @@ -175,15 +150,15 @@ async def set_image_resource_limit(
log.exception("An error occurred.")


async def rescan_images(cli_ctx: CLIContext, registry: str, local: bool) -> None:
if not registry and not local:
raise click.BadArgumentUsage("Please specify a valid registry name.")
async def rescan_images(cli_ctx: CLIContext, registry_or_image: str, local: bool) -> None:
if not registry_or_image and not local:
raise click.BadArgumentUsage("Please specify a valid registry or full image name.")
async with (
connect_database(cli_ctx.local_config) as db,
etcd_ctx(cli_ctx) as etcd,
):
try:
await rescan_images_func(etcd, db, registry=registry, local=local)
await rescan_images_func(etcd, db, registry_or_image, local=local)
except Exception:
log.exception("An error occurred.")

Expand Down
Loading

0 comments on commit 5f87043

Please sign in to comment.