Skip to content

Commit

Permalink
Merge pull request #502 from Nitrokey/nkpk-update
Browse files Browse the repository at this point in the history
nkpk: Add fetch-update, validate-update commands
  • Loading branch information
robin-nitrokey authored Feb 7, 2024
2 parents 8e11334 + a4b82ce commit 6f6b041
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 163 deletions.
104 changes: 4 additions & 100 deletions pynitrokey/cli/nk3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# http://opensource.org/licenses/MIT>, at your option. This file may not be
# copied, modified, or distributed except according to those terms.

import os.path
import sys
from typing import List, Optional

Expand All @@ -16,26 +15,17 @@
from pynitrokey.cli import trussed
from pynitrokey.cli.exceptions import CliException
from pynitrokey.cli.trussed.test import TestCase
from pynitrokey.helpers import DownloadProgressBar, check_experimental_flag, local_print
from pynitrokey.helpers import check_experimental_flag
from pynitrokey.nk3 import NK3_DATA
from pynitrokey.nk3.bootloader import Nitrokey3Bootloader
from pynitrokey.nk3.device import Nitrokey3Device
from pynitrokey.nk3.updates import REPOSITORY, get_firmware_update
from pynitrokey.trussed.base import NitrokeyTrussedBase
from pynitrokey.trussed.bootloader import (
Device,
FirmwareContainer,
parse_firmware_image,
)
from pynitrokey.updates import OverwriteError
from pynitrokey.trussed.bootloader import Device


class Context(trussed.Context[Nitrokey3Bootloader, Nitrokey3Device]):
def __init__(self, path: Optional[str]) -> None:
super().__init__(path, Nitrokey3Bootloader, Nitrokey3Device) # type: ignore[type-abstract]

@property
def device_name(self) -> str:
return "Nitrokey 3"
super().__init__(path, Nitrokey3Bootloader, Nitrokey3Device, Device.NITROKEY3, NK3_DATA) # type: ignore[type-abstract]

@property
def test_cases(self) -> list[TestCase]:
Expand Down Expand Up @@ -79,92 +69,6 @@ def _list() -> None:
trussed._list(Context(None))


@nk3.command()
@click.argument("path", default=".")
@click.option(
"-f",
"--force",
is_flag=True,
default=False,
help="Overwrite the firmware image if it already exists",
)
@click.option("--version", help="Download this version instead of the latest one")
def fetch_update(path: str, force: bool, version: Optional[str]) -> None:
"""
Fetches a firmware update for the Nitrokey 3 and stores it at the given path.
If no path is given, the firmware image stored in the current working
directory. If the given path is a directory, the image is stored under
that directory. Otherwise it is written to the path. Existing files are
only overwritten if --force is set.
Per default, the latest firmware release is fetched. If you want to
download a specific version, use the --version option.
"""
try:
release = REPOSITORY.get_release_or_latest(version)
update = get_firmware_update(release)
except Exception as e:
if version:
raise CliException(f"Failed to find firmware update {version}", e)
else:
raise CliException("Failed to find latest firmware update", e)

bar = DownloadProgressBar(desc=update.tag)

try:
if os.path.isdir(path):
path = update.download_to_dir(path, overwrite=force, callback=bar.update)
else:
if not force and os.path.exists(path):
raise OverwriteError(path)
with open(path, "wb") as f:
update.download(f, callback=bar.update)

bar.close()

local_print(f"Successfully downloaded firmware release {update.tag} to {path}")
except OverwriteError as e:
raise CliException(
f"{e.path} already exists. Use --force to overwrite the file.",
support_hint=False,
)
except Exception as e:
raise CliException(f"Failed to download firmware update {update.tag}", e)


@nk3.command()
@click.argument("image", type=click.Path(exists=True, dir_okay=False))
def validate_update(image: str) -> None:
"""
Validates the given firmware image and prints the firmware version and the signer for all
available variants.
"""
container = FirmwareContainer.parse(image, Device.NITROKEY3)
print(f"version: {container.version}")
if container.pynitrokey:
print(f"pynitrokey: >= {container.pynitrokey}")

for variant in container.images:
data = container.images[variant]
try:
metadata = parse_firmware_image(variant, data)
except Exception as e:
raise CliException("Failed to parse and validate firmware image", e)

signed_by = metadata.signed_by or "unsigned"

print(f"variant: {variant.value}")
print(f" version: {metadata.version}")
print(f" signed by: {signed_by}")

if container.version != metadata.version:
raise CliException(
f"The firmware image for the {variant} variant and the release "
f"{container.version} has an unexpected product version ({metadata.version})."
)


@nk3.command()
@click.argument("image", type=click.Path(exists=True, dir_okay=False), required=False)
@click.option(
Expand Down
13 changes: 11 additions & 2 deletions pynitrokey/cli/nkpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,31 @@
# http://opensource.org/licenses/MIT>, at your option. This file may not be
# copied, modified, or distributed except according to those terms.

import re
from typing import Optional, Sequence

import click

from pynitrokey.cli.trussed.test import TestCase
from pynitrokey.helpers import local_print
from pynitrokey.nkpk import NitrokeyPasskeyBootloader, NitrokeyPasskeyDevice
from pynitrokey.nkpk import NKPK_DATA, NitrokeyPasskeyBootloader, NitrokeyPasskeyDevice
from pynitrokey.trussed.base import NitrokeyTrussedBase
from pynitrokey.trussed.bootloader import Device
from pynitrokey.trussed.device import NitrokeyTrussedDevice
from pynitrokey.updates import Repository

from . import trussed


class Context(trussed.Context[NitrokeyPasskeyBootloader, NitrokeyPasskeyDevice]):
def __init__(self, path: Optional[str]) -> None:
super().__init__(path, NitrokeyPasskeyBootloader, NitrokeyPasskeyDevice)
super().__init__(
path,
NitrokeyPasskeyBootloader,
NitrokeyPasskeyDevice,
Device.NITROKEY_PASSKEY,
NKPK_DATA,
)

@property
def test_cases(self) -> list[TestCase]:
Expand Down
137 changes: 123 additions & 14 deletions pynitrokey/cli/trussed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
# copied, modified, or distributed except according to those terms.

import logging
import os.path
from abc import ABC, abstractmethod
from hashlib import sha256
from re import Pattern
from typing import BinaryIO, Callable, Generic, Optional, Sequence, TypeVar

import click
Expand All @@ -19,13 +21,25 @@
from ecdsa import NIST256p, SigningKey

from pynitrokey.cli.exceptions import CliException
from pynitrokey.helpers import Retries, local_print, require_windows_admin
from pynitrokey.helpers import (
DownloadProgressBar,
Retries,
local_print,
require_windows_admin,
)
from pynitrokey.trussed import DeviceData
from pynitrokey.trussed.admin_app import BootMode
from pynitrokey.trussed.base import NitrokeyTrussedBase
from pynitrokey.trussed.bootloader import NitrokeyTrussedBootloader
from pynitrokey.trussed.bootloader import Device as BootloaderDevice
from pynitrokey.trussed.bootloader import (
FirmwareContainer,
NitrokeyTrussedBootloader,
parse_firmware_image,
)
from pynitrokey.trussed.device import NitrokeyTrussedDevice
from pynitrokey.trussed.exceptions import TimeoutException
from pynitrokey.trussed.provisioner_app import ProvisionerApp
from pynitrokey.updates import OverwriteError, Repository

from .test import TestCase

Expand All @@ -42,15 +56,14 @@ def __init__(
path: Optional[str],
bootloader_type: type[Bootloader],
device_type: type[Device],
bootloader_device: BootloaderDevice,
data: DeviceData,
) -> None:
self.path = path
self.bootloader_type = bootloader_type
self.device_type = device_type

@property
@abstractmethod
def device_name(self) -> str:
...
self.bootloader_device = bootloader_device
self.data = data

@property
@abstractmethod
Expand All @@ -76,20 +89,20 @@ def list(self) -> Sequence[NitrokeyTrussedBase]:
return self.list_all()

def connect(self) -> NitrokeyTrussedBase:
return self._select_unique(self.device_name, self.list())
return self._select_unique(self.data.name, self.list())

def connect_device(self) -> Device:
devices = [
device for device in self.list() if isinstance(device, self.device_type)
]
return self._select_unique(self.device_name, devices)
return self._select_unique(self.data.name, devices)

def await_device(
self,
retries: Optional[int] = None,
callback: Optional[Callable[[int, int], None]] = None,
) -> Device:
return self._await(self.device_name, self.device_type, retries, callback)
return self._await(self.data.name, self.device_type, retries, callback)

def await_bootloader(
self,
Expand All @@ -98,7 +111,7 @@ def await_bootloader(
) -> Bootloader:
# mypy does not allow abstract types here, but this is still valid
return self._await(
f"{self.device_name} bootloader", self.bootloader_type, retries, callback
f"{self.data.name} bootloader", self.bootloader_type, retries, callback
)

def _select_unique(self, name: str, devices: Sequence[T]) -> T:
Expand Down Expand Up @@ -146,15 +159,74 @@ def prepare_group() -> None:


def add_commands(group: click.Group) -> None:
group.add_command(fetch_update)
group.add_command(list)
group.add_command(provision)
group.add_command(reboot)
group.add_command(rng)
group.add_command(status)
group.add_command(test)
group.add_command(validate_update)
group.add_command(version)


@click.command()
@click.argument("path", default=".")
@click.option(
"-f",
"--force",
is_flag=True,
default=False,
help="Overwrite the firmware image if it already exists",
)
@click.option("--version", help="Download this version instead of the latest one")
@click.pass_obj
def fetch_update(
ctx: Context[Bootloader, Device], path: str, force: bool, version: Optional[str]
) -> None:
"""
Fetches a firmware update and stores it at the given path.
If no path is given, the firmware image stored in the current working
directory. If the given path is a directory, the image is stored under
that directory. Otherwise it is written to the path. Existing files are
only overwritten if --force is set.
Per default, the latest firmware release is fetched. If you want to
download a specific version, use the --version option.
"""
try:
release = ctx.data.firmware_repository.get_release_or_latest(version)
update = release.require_asset(ctx.data.firmware_pattern)
except Exception as e:
if version:
raise CliException(f"Failed to find firmware update {version}", e)
else:
raise CliException("Failed to find latest firmware update", e)

bar = DownloadProgressBar(desc=update.tag)

try:
if os.path.isdir(path):
path = update.download_to_dir(path, overwrite=force, callback=bar.update)
else:
if not force and os.path.exists(path):
raise OverwriteError(path)
with open(path, "wb") as f:
update.download(f, callback=bar.update)

bar.close()

local_print(f"Successfully downloaded firmware release {update.tag} to {path}")
except OverwriteError as e:
raise CliException(
f"{e.path} already exists. Use --force to overwrite the file.",
support_hint=False,
)
except Exception as e:
raise CliException(f"Failed to download firmware update {update.tag}", e)


@click.command()
@click.pass_obj
def list(ctx: Context[Bootloader, Device]) -> None:
Expand All @@ -163,7 +235,7 @@ def list(ctx: Context[Bootloader, Device]) -> None:


def _list(ctx: Context[Bootloader, Device]) -> None:
local_print(f":: '{ctx.device_name}' keys")
local_print(f":: '{ctx.data.name}' keys")
for device in ctx.list_all():
with device as device:
uuid = device.uuid()
Expand Down Expand Up @@ -432,9 +504,9 @@ def test(

if len(devices) == 0:
log_devices()
raise CliException(f"No connected {ctx.device_name} devices found")
raise CliException(f"No connected {ctx.data.name} devices found")

local_print(f"Found {len(devices)} {ctx.device_name} device(s):")
local_print(f"Found {len(devices)} {ctx.data.name} device(s):")
for device in devices:
local_print(f"- {device.name} at {device.path}")

Expand Down Expand Up @@ -463,6 +535,43 @@ def test(
raise CliException(f"Test failed for {failure} device(s)")


@click.command()
@click.argument("image", type=click.Path(exists=True, dir_okay=False))
@click.pass_obj
def validate_update(ctx: Context[Bootloader, Device], image: str) -> None:
"""
Validates the given firmware image and prints the firmware version and the signer for all
available variants.
"""
try:
container = FirmwareContainer.parse(image, ctx.bootloader_device)
except ValueError as e:
raise CliException("Failed to validate firmware image", e, support_hint=False)

print(f"version: {container.version}")
if container.pynitrokey:
print(f"pynitrokey: >= {container.pynitrokey}")

for variant in container.images:
data = container.images[variant]
try:
metadata = parse_firmware_image(variant, data, ctx.data)
except Exception as e:
raise CliException("Failed to parse and validate firmware image", e)

signed_by = metadata.signed_by or "unsigned"

print(f"variant: {variant.value}")
print(f" version: {metadata.version}")
print(f" signed by: {signed_by}")

if container.version != metadata.version:
raise CliException(
f"The firmware image for the {variant} variant and the release "
f"{container.version} has an unexpected product version ({metadata.version})."
)


@click.command()
@click.pass_obj
def version(ctx: Context[Bootloader, Device]) -> None:
Expand Down
Loading

0 comments on commit 6f6b041

Please sign in to comment.