From 8b5b9e8a56047692cd8b230bcb01ba215ef7e25a Mon Sep 17 00:00:00 2001 From: Robin Krahl Date: Tue, 14 Nov 2023 10:17:46 +0100 Subject: [PATCH] Update nethsm dependency to 0.5.0 This update pulls in multiple relevant changes, mainly: - The auto-generated client is only loaded if required. This improves execution times significantly, especially for simple commands. - Key certificates are now always stored as raw bytes, not as strings, so the get-certificate and set-certificate commands had to be updated. - Changing the backup and unlock passphrases now requires passing the current passphrase. - Using enums instead of strings. - Using stronger types and dataclasses for some methods. - NetHSM API changes that are not visible in the public Python API. On my machine, this reduces the execution time for simple commands like version or --help from 0.9 s to 0.3 s. --- pynitrokey/cli/nethsm.py | 436 +++++++++++++++++++++++---------------- pyproject.toml | 3 +- 2 files changed, 264 insertions(+), 175 deletions(-) diff --git a/pynitrokey/cli/nethsm.py b/pynitrokey/cli/nethsm.py index 30aaf407..a28a7952 100644 --- a/pynitrokey/cli/nethsm.py +++ b/pynitrokey/cli/nethsm.py @@ -14,16 +14,26 @@ import mimetypes import os.path import sys +from dataclasses import dataclass +from enum import Enum +from typing import Any, Iterable, Iterator, Optional, Protocol, Sequence import click import nethsm as nethsm_sdk +from click import Context +from nethsm import Authentication, NetHSM from nethsm.backup import EncryptedBackup from pynitrokey.cli.exceptions import CliException from pynitrokey.helpers import prompt -def make_enum_type(enum_cls): +class EnumMeta(Protocol): + def __iter__(self) -> Iterator[Enum]: + ... + + +def make_enum_type(enum_cls: EnumMeta) -> click.Choice: return click.Choice([variant.value for variant in enum_cls], case_sensitive=False) @@ -47,35 +57,47 @@ def make_enum_type(enum_cls): SIGN_MODE_TYPE = make_enum_type(nethsm_sdk.SignMode) -def print_row(values, widths): +def prompt_str( + msg: str, default: Optional[str] = None, hide_input: bool = False +) -> str: + value = prompt(msg, default=default, hide_input=hide_input) + assert isinstance(value, str) + return value + + +def print_row(values: Iterable[str], widths: Iterable[int]) -> None: row = [value.ljust(width) for (value, width) in zip(values, widths)] print(*row, sep="\t") -def print_table(headers, data): +def print_table(headers: Sequence[str], data: Iterable[Sequence[Any]]) -> None: widths = [len(header) for header in headers] + str_data = [] for row in data: + str_row = [] for i in range(len(widths)): - row[i] = str(row[i]) + str_row.append(str(row[i])) widths[i] = max(widths[i], len(row[i])) + str_data.append(str_row) print_row(headers, widths) print_row(["-" * width for width in widths], widths) - for row in data: + for row in str_data: print_row(row, widths) +@dataclass +class Config: + host: str + username: Optional[str] + password: Optional[str] + verify_tls: bool + + @click.group() @click.option( "-h", "--host", "host", required=True, help="Set the host of the NetHSM API" ) -@click.option( - "-v", - "--api-version", - "version", - default="v1", - help="Set the version of the NetHSM API", -) @click.option("-u", "--username", "username", help="The NetHSM user name") @click.option("-p", "--password", "password", help="The NetHSM password") @click.option( @@ -84,40 +106,44 @@ def print_table(headers, data): help="Whether to verify the TLS certificate of the NetHSM", ) @click.pass_context -def nethsm(ctx, host, version, username, password, verify_tls): +def nethsm( + ctx: Context, + host: str, + username: Optional[str], + password: Optional[str], + verify_tls: bool, +) -> None: """Interact with NetHSM devices, see subcommands.""" - ctx.ensure_object(dict) - - ctx.obj["NETHSM_HOST"] = host - ctx.obj["NETHSM_VERSION"] = version - ctx.obj["NETHSM_USERNAME"] = username - ctx.obj["NETHSM_PASSWORD"] = password - ctx.obj["NETHSM_VERIFY_TLS"] = verify_tls + ctx.obj = Config( + host=host, username=username, password=password, verify_tls=verify_tls + ) @contextlib.contextmanager -def connect(ctx, require_auth=True): - host = ctx.obj["NETHSM_HOST"] - version = ctx.obj["NETHSM_VERSION"] - username = None - password = None - verify_tls = ctx.obj["NETHSM_VERIFY_TLS"] +def connect(ctx: Context, require_auth: bool = True) -> Iterator[NetHSM]: + config = ctx.obj + assert isinstance(config, Config) + auth = None if require_auth: - username = ctx.obj["NETHSM_USERNAME"] - password = ctx.obj["NETHSM_PASSWORD"] + username = config.username + password = config.password if not username: - username = prompt(f"[auth] User name for NetHSM {host}") + username = prompt_str(f"[auth] User name for NetHSM {config.host}") if not password: - password = prompt( - f"[auth] Password for user {username} on NetHSM {host}", hide_input=True + password = prompt_str( + f"[auth] Password for user {username} on NetHSM {config.host}", + hide_input=True, ) + auth = Authentication(username=username, password=password) - with nethsm_sdk.connect(host, version, username, password, verify_tls) as nethsm: + with nethsm_sdk.connect( + config.host, auth=auth, verify_tls=config.verify_tls + ) as nethsm: try: yield nethsm except nethsm_sdk.NetHSMError as e: - raise click.ClickException(e) + raise click.ClickException(f"NetHSM request failed: {e}") except nethsm_sdk.NetHSMRequestError as e: if e.type == nethsm_sdk.RequestErrorType.SSL_ERROR: raise click.ClickException( @@ -132,11 +158,11 @@ def connect(ctx, require_auth=True): @nethsm.command() @click.argument("passphrase", required=False) @click.pass_context -def unlock(ctx, passphrase): +def unlock(ctx: Context, passphrase: Optional[str]) -> None: """Bring a locked NetHSM into operational state.""" with connect(ctx, require_auth=False) as nethsm: if not passphrase: - passphrase = prompt( + passphrase = prompt_str( f"Unlock passphrase for NetHSM {nethsm.host}", hide_input=True ) nethsm.unlock(passphrase) @@ -145,7 +171,7 @@ def unlock(ctx, passphrase): @nethsm.command() @click.pass_context -def lock(ctx): +def lock(ctx: Context) -> None: """Bring an operational NetHSM into locked state. This command requires authentication as a user with the Administrator @@ -179,7 +205,12 @@ def lock(ctx): help="The system time to set (default: the time of this system)", ) @click.pass_context -def provision(ctx, unlock_passphrase, admin_passphrase, system_time): +def provision( + ctx: Context, + unlock_passphrase: str, + admin_passphrase: str, + system_time: Optional[datetime.datetime], +) -> None: """Initial provisioning of a NetHSM. If the unlock or admin passphrases are not set, they have to be entered @@ -199,7 +230,7 @@ def provision(ctx, unlock_passphrase, admin_passphrase, system_time): help="Also query the real name and role of the user", ) @click.pass_context -def list_users(ctx, details): +def list_users(ctx: Context, details: bool) -> None: """List all users on the NetHSM. This command requires authentication as a user with the Administrator @@ -226,7 +257,7 @@ def list_users(ctx, details): @nethsm.command() @click.argument("user-id") @click.pass_context -def get_user(ctx, user_id): +def get_user(ctx: Context, user_id: str) -> None: """Query the real name and role for a user ID on the NetHSM. This command requires authentication as a user with the Administrator or @@ -253,7 +284,9 @@ def get_user(ctx, user_id): ) @click.option("-u", "--user-id", help="The user ID of the new user") @click.pass_context -def add_user(ctx, real_name, role, passphrase, user_id): +def add_user( + ctx: Context, real_name: str, role: str, passphrase: str, user_id: Optional[str] +) -> None: """Create a new user on the NetHSM. If the real name, role or passphrase are not specified, they have to be @@ -263,14 +296,16 @@ def add_user(ctx, real_name, role, passphrase, user_id): This command requires authentication as a user with the Administrator role.""" with connect(ctx) as nethsm: - user_id = nethsm.add_user(real_name, role, passphrase, user_id) + user_id = nethsm.add_user( + real_name, nethsm_sdk.Role.from_string(role), passphrase, user_id + ) print(f"User {user_id} added to NetHSM {nethsm.host}") @nethsm.command() @click.argument("user-id") @click.pass_context -def delete_user(ctx, user_id): +def delete_user(ctx: Context, user_id: str) -> None: """Delete the user with the given user ID on the NetHSM. This command requires authentication as a user with the Administrator @@ -291,7 +326,7 @@ def delete_user(ctx, user_id): help="The new passphrase of the user", ) @click.pass_context -def set_passphrase(ctx, user_id, passphrase): +def set_passphrase(ctx: Context, user_id: Optional[str], passphrase: str) -> None: """Set the passphrase for the user with the given ID (or the current user). This command requires authentication as a user with the Administrator or @@ -299,7 +334,8 @@ def set_passphrase(ctx, user_id, passphrase): passphrase.""" with connect(ctx) as nethsm: if not user_id: - user_id = nethsm.username + assert nethsm.auth is not None + user_id = nethsm.auth.username nethsm.set_passphrase(user_id, passphrase) print(f"Updated the passphrase for user {user_id} on NetHSM {nethsm.host}") @@ -307,7 +343,7 @@ def set_passphrase(ctx, user_id, passphrase): @nethsm.command() @click.argument("user-id") @click.pass_context -def list_operator_tags(ctx, user_id): +def list_operator_tags(ctx: Context, user_id: str) -> None: """List the tags for an operator user ID on the NetHSM. This command requires authentication as a user with the Administrator role.""" @@ -325,7 +361,7 @@ def list_operator_tags(ctx, user_id): @click.argument("user-id") @click.argument("tag") @click.pass_context -def add_operator_tag(ctx, user_id, tag): +def add_operator_tag(ctx: Context, user_id: str, tag: str) -> None: """Add a tag for an operator user on the NetHSM. This command requires authentication as a user with the Administrator @@ -339,7 +375,7 @@ def add_operator_tag(ctx, user_id, tag): @click.argument("user-id") @click.argument("tag") @click.pass_context -def delete_operator_tag(ctx, user_id, tag): +def delete_operator_tag(ctx: Context, user_id: str, tag: str) -> None: """Delete a tag for an operator user on the NetHSM. This command requires authentication as a user with the Administrator @@ -353,7 +389,7 @@ def delete_operator_tag(ctx, user_id, tag): @click.argument("key_id") @click.argument("tag") @click.pass_context -def add_key_tag(ctx, key_id, tag): +def add_key_tag(ctx: Context, key_id: str, tag: str) -> None: """Add a tag for a key on the NetHSM. This command requires authentication as a user with the Administrator @@ -367,7 +403,7 @@ def add_key_tag(ctx, key_id, tag): @click.argument("key_id") @click.argument("tag") @click.pass_context -def delete_key_tag(ctx, key_id, tag): +def delete_key_tag(ctx: Context, key_id: str, tag: str) -> None: """Delete a tag for a key on the NetHSM. This command requires authentication as a user with the Administrator @@ -379,18 +415,18 @@ def delete_key_tag(ctx, key_id, tag): @nethsm.command() @click.pass_context -def info(ctx): +def info(ctx: Context) -> None: """Query the vendor and product information for a NetHSM.""" with connect(ctx, require_auth=False) as nethsm: - (vendor, product) = nethsm.get_info() + info = nethsm.get_info() print(f"Host: {nethsm.host}") - print(f"Vendor: {vendor}") - print(f"Product: {product}") + print(f"Vendor: {info.vendor}") + print(f"Product: {info.product}") @nethsm.command() @click.pass_context -def state(ctx): +def state(ctx: Context) -> None: """Query the state of a NetHSM.""" with connect(ctx, require_auth=False) as nethsm: state = nethsm.get_state() @@ -400,7 +436,7 @@ def state(ctx): @nethsm.command() @click.argument("length", type=int) @click.pass_context -def random(ctx, length): +def random(ctx: Context, length: int) -> None: """Retrieve random bytes from the NetHSM as a Base64 string. This command requires authentication as a user with the Operator role.""" @@ -410,7 +446,7 @@ def random(ctx, length): @nethsm.command() @click.pass_context -def metrics(ctx): +def metrics(ctx: Context) -> None: """Query the metrics of a NetHSM. This command requires authentication as a user with the Metrics role.""" @@ -433,7 +469,7 @@ def metrics(ctx): help="Filter keys by tags for respective user", ) @click.pass_context -def list_keys(ctx, details, filter): +def list_keys(ctx: Context, details: bool, filter: Optional[str]) -> None: """List all keys on the NetHSM. This command requires authentication as a user with the Administrator or @@ -454,7 +490,7 @@ def list_keys(ctx, details, filter): [ key_id, key.type, - ", ".join(key.mechanisms), + ", ".join([str(m) for m in key.mechanisms]), key.operations, ", ".join(key.tags) if key.tags is not None else "", ] @@ -469,7 +505,7 @@ def list_keys(ctx, details, filter): @click.argument("key_id") @click.option("--public-key", is_flag=True, help="Query the public key as a PEM file") @click.pass_context -def get_key(ctx, key_id, public_key): +def get_key(ctx: Context, key_id: str, public_key: bool) -> None: """Get information about a key on the NetHSM. This command requires authentication as a user with the Administrator or @@ -479,7 +515,7 @@ def get_key(ctx, key_id, public_key): print(nethsm.get_key_public_key(key_id)) else: key = nethsm.get_key(key_id) - mechanisms = ", ".join(key.mechanisms) + mechanisms = ", ".join([str(m) for m in key.mechanisms]) print(f"Key {key_id} on NetHSM {nethsm.host}:") print(f"Type: {key.type}") print(f"Mechanisms: {mechanisms}") @@ -487,18 +523,20 @@ def get_key(ctx, key_id, public_key): if key.tags: tags = ", ".join(key.tags) print(f"Tags: {tags}") - if key.modulus: - print(f"Modulus: {key.modulus}") - if key.public_exponent: - print(f"Public exponent: {key.public_exponent}") - if key.data: - print(f"Data: {key.data}") + + if isinstance(key.public_key, nethsm_sdk.RsaPublicKey): + print(f"Modulus: {key.public_key.modulus}") + print(f"Public exponent: {key.public_key.public_exponent}") + elif isinstance(key.public_key, nethsm_sdk.EcPublicKey): + print(f"Data: {key.public_key.data}") + elif key.public_key is not None: + print(f"Public key: {key.public_key}") @nethsm.command() @click.argument("key-id") @click.pass_context -def delete_key(ctx, key_id): +def delete_key(ctx: Context, key_id: str) -> None: """Delete the key pair with the given key ID on the NetHSM. This command requires authentication as a user with the Administrator @@ -508,7 +546,7 @@ def delete_key(ctx, key_id): print(f"Key {key_id} deleted on NetHSM {nethsm.host}") -def prompt_mechanisms(type): +def prompt_mechanisms(type: str) -> list[str]: # We assume that key type X corresponds to the mechanisms starting with X. # This is no longer true for curves, so we have to adapt the type if type == nethsm_sdk.KeyType.CURVE25519.value: @@ -535,7 +573,7 @@ def prompt_mechanisms(type): ) mechanism_type = click.Choice(available_mechanisms, case_sensitive=False) - mechanisms = [] + mechanisms: list[str] = [] cont = True while cont: default = None @@ -550,6 +588,7 @@ def prompt_mechanisms(type): show_choices=False, show_default=False, ) + assert isinstance(mechanism, str) if mechanism: mechanisms.append(mechanism) else: @@ -610,25 +649,38 @@ def prompt_mechanisms(type): ) @click.pass_context def add_key( - ctx, type, mechanisms, tags, prime_p, prime_q, public_exponent, data, key_id -): + ctx: Context, + type: str, + mechanisms: list[str], + tags: list[str], + prime_p: Optional[str], + prime_q: Optional[str], + public_exponent: Optional[str], + data: Optional[str], + key_id: Optional[str], +) -> None: """Add a key pair on the NetHSM. If the key ID is not set, it is generated by the NetHSM. This command requires authentication as a user with the Administrator role.""" + key_type = nethsm_sdk.KeyType.from_string(type) mechanisms = list(mechanisms) or prompt_mechanisms(type) - if type == "RSA": + private_key: nethsm_sdk.PrivateKey + if key_type == nethsm_sdk.KeyType.RSA: if data: raise click.ClickException("-d/--data must not be set for RSA keys") if not prime_p: - prime_p = prompt("Prime p") + prime_p = prompt_str("Prime p") if not prime_q: - prime_q = prompt("Prime q") + prime_q = prompt_str("Prime q") if not public_exponent: - public_exponent = prompt("Public exponent") + public_exponent = prompt_str("Public exponent") + private_key = nethsm_sdk.RsaPrivateKey( + prime_p=prime_p, prime_q=prime_q, public_exponent=public_exponent + ) else: if prime_p: raise click.ClickException("-p/--prime-p may only be set for RSA keys") @@ -639,18 +691,16 @@ def add_key( "-e/--public-exponent may only be set for RSA keys" ) if not data: - data = prompt("Key data") + data = prompt_str("Key data") + private_key = nethsm_sdk.GenericPrivateKey(data=data) with connect(ctx) as nethsm: key_id = nethsm.add_key( key_id=key_id, - type=type, - mechanisms=mechanisms, + type=key_type, + mechanisms=[nethsm_sdk.KeyMechanism.from_string(m) for m in mechanisms], tags=tags, - prime_p=prime_p, - prime_q=prime_q, - public_exponent=public_exponent, - data=data, + private_key=private_key, ) print(f"Key {key_id} added to NetHSM {nethsm.host}") @@ -685,14 +735,21 @@ def add_key( help="The ID of the generated key", ) @click.pass_context -def generate_key(ctx, type, mechanisms, length, key_id): +def generate_key( + ctx: Context, type: str, mechanisms: list[str], length: int, key_id: Optional[str] +) -> None: """Generate a key pair on the NetHSM. This command requires authentication as a user with the Administrator role.""" mechanisms = list(mechanisms) or prompt_mechanisms(type) with connect(ctx) as nethsm: - key_id = nethsm.generate_key(type, mechanisms, length, key_id) + key_id = nethsm.generate_key( + nethsm_sdk.KeyType.from_string(type), + [nethsm_sdk.KeyMechanism.from_string(m) for m in mechanisms], + length, + key_id, + ) print(f"Key {key_id} generated on NetHSM {nethsm.host}") @@ -706,7 +763,7 @@ def generate_key(ctx, type, mechanisms, length, key_id): @click.option("--public-key", is_flag=True, help="Query the public key") @click.option("--certificate", is_flag=True, help="Query the certificate") @click.pass_context -def get_config(ctx, **kwargs): +def get_config(ctx: Context, **kwargs: bool) -> None: """Query the configuration of a NetHSM. Only the configuration items selected with the corresponding option are @@ -719,18 +776,18 @@ def get_config(ctx, **kwargs): show_all = not any(kwargs.values()) if show_all or kwargs["logging"]: - data = nethsm.get_config_logging() + logging = nethsm.get_config_logging() print(" Logging:") - print(" IP address: ", data.ipAddress) - print(" Port: ", data.port) - print(" Log level: ", data.logLevel) + print(" IP address: ", logging.ip_address) + print(" Port: ", logging.port) + print(" Log level: ", logging.log_level) if show_all or kwargs["network"]: - data = nethsm.get_config_network() + network = nethsm.get_config_network() print(" Network:") - print(" IP address: ", data.ipAddress) - print(" Netmask: ", data.netmask) - print(" Gateway: ", data.gateway) + print(" IP address: ", network.ip_address) + print(" Netmask: ", network.netmask) + print(" Gateway: ", network.gateway) if show_all or kwargs["time"]: time = nethsm.get_config_time() @@ -755,41 +812,72 @@ def get_config(ctx, **kwargs): @nethsm.command() @click.option( - "-p", - "--passphrase", + "-n", + "--new-passphrase", hide_input=True, confirmation_prompt=True, prompt=True, help="The new backup passphrase", ) +@click.option( + "-p", + "--current-passphrase", + help="The current backup passphrase (or an empty string if not set)", +) @click.pass_context -def set_backup_passphrase(ctx, passphrase): +def set_backup_passphrase( + ctx: Context, new_passphrase: str, current_passphrase: Optional[str] +) -> None: """Set the backup passphrase of a NetHSM. + Changing the backup passphrase requires the current passphrase (if set, + empty string otherwise). + This command requires authentication as a user with the Administrator role.""" + if not current_passphrase: + current_passphrase = prompt_str( + "The current backup passphrase (or an empty string if not set)", + hide_input=True, + default="", + ) with connect(ctx) as nethsm: - nethsm.set_backup_passphrase(passphrase) + nethsm.set_backup_passphrase( + new_passphrase=new_passphrase, current_passphrase=current_passphrase + ) print(f"Updated the backup passphrase for NetHSM {nethsm.host}") @nethsm.command() @click.option( - "-p", - "--passphrase", + "-n", + "--new-passphrase", hide_input=True, confirmation_prompt=True, prompt=True, help="The new unlock passphrase", ) +@click.option( + "-p", + "--current-passphrase", + hide_input=True, + prompt=True, + help="The current unlock passphrase", +) @click.pass_context -def set_unlock_passphrase(ctx, passphrase): +def set_unlock_passphrase( + ctx: Context, new_passphrase: str, current_passphrase: str +) -> None: """Set the unlock passphrase of a NetHSM. + Changing the unlock passphrase requires the current passphrase. + This command requires authentication as a user with the Administrator role.""" with connect(ctx) as nethsm: - nethsm.set_unlock_passphrase(passphrase) + nethsm.set_unlock_passphrase( + new_passphrase=new_passphrase, current_passphrase=current_passphrase + ) print(f"Updated the unlock passphrase for NetHSM {nethsm.host}") @@ -815,13 +903,17 @@ def set_unlock_passphrase(ctx, passphrase): required=True, ) @click.pass_context -def set_logging_config(ctx, ip_address, port, log_level): +def set_logging_config( + ctx: Context, ip_address: str, port: int, log_level: str +) -> None: """Set the logging configuration of a NetHSM. This command requires authentication as a user with the Administrator role.""" with connect(ctx) as nethsm: - nethsm.set_logging_config(ip_address, port, log_level) + nethsm.set_logging_config( + ip_address, port, nethsm_sdk.LogLevel.from_string(log_level) + ) print(f"Updated the logging configuration for NetHSM {nethsm.host}") @@ -845,7 +937,9 @@ def set_logging_config(ctx, ip_address, port, log_level): required=True, ) @click.pass_context -def set_network_config(ctx, ip_address, netmask, gateway): +def set_network_config( + ctx: Context, ip_address: str, netmask: str, gateway: str +) -> None: """Set the network configuration of a NetHSM. This command requires authentication as a user with the Administrator @@ -862,7 +956,7 @@ def set_network_config(ctx, ip_address, netmask, gateway): required=False, ) @click.pass_context -def set_time(ctx, time): +def set_time(ctx: Context, time: Optional[datetime.datetime]) -> None: """Set the system time of a NetHSM. If the time is not given as an argument, the system time of this system is used. @@ -882,17 +976,17 @@ def set_time(ctx, time): type=UNATTENDED_BOOT_STATUS_TYPE, ) @click.pass_context -def set_unattended_boot(ctx, status): +def set_unattended_boot(ctx: Context, status: str) -> None: """Set the unattended boot configuration of a NetHSM. This command requires authentication as a user with the Administrator role.""" with connect(ctx) as nethsm: - nethsm.set_unattended_boot(status) + nethsm.set_unattended_boot(nethsm_sdk.UnattendedBootStatus.from_string(status)) print(f"Updated the unattended boot configuration for NetHSM {nethsm.host}") -def get_api_or_key_id(api, key_id): +def get_api_or_key_id(api: bool, key_id: Optional[str]) -> tuple[bool, Optional[str]]: """Helper method for operations that can be executed either for the API certificate or for the certificate stored for a key.""" if api and key_id: @@ -919,15 +1013,11 @@ def get_api_or_key_id(api, key_id): "-a", "--api", is_flag=True, help="Set the certificate for the NetHSM TLS interface" ) @click.option("-k", "--key-id", help="The ID of the key to set the certificate for") -@click.option( - "-m", - "--mime-type", - type=click.Choice(KEY_CERTIFICATE_MIME_TYPES), - help="The MIME type of the certificate (only with --key-id)", -) @click.argument("filename") @click.pass_context -def set_certificate(ctx, api, key_id, mime_type, filename): +def set_certificate( + ctx: Context, api: bool, key_id: Optional[str], filename: str +) -> None: """Set a certificate on the NetHSM. If the --api option is set, the certificate used for the NetHSM TLS interface @@ -940,25 +1030,11 @@ def set_certificate(ctx, api, key_id, mime_type, filename): with connect(ctx) as nethsm: with open(filename, "rb") as f: if key_id: - if not mime_type: - (mime_type, _) = mimetypes.guess_type(filename) - if not mime_type: - raise click.ClickException( - f"Failed to detect MIME type for {filename}. Use --mime-type to " - "explicitly set the MIME type." - ) - if mime_type not in KEY_CERTIFICATE_MIME_TYPES: - raise click.ClickException( - f"Unsupported certificate mime type {mime_type} detected for " - f"{filename}" - ) - nethsm.set_key_certificate(key_id, f, mime_type) + nethsm.set_key_certificate(key_id, f) print( f"Updated the certificate for key {key_id} on NetHSM {nethsm.host}" ) else: - if mime_type: - raise click.ClickException("--mime-type cannot be used with --api") nethsm.set_certificate(f) print(f"Updated the API certificate for NetHSM {nethsm.host}") @@ -969,7 +1045,7 @@ def set_certificate(ctx, api, key_id, mime_type, filename): ) @click.option("-k", "--key-id", help="The ID of the key to get the certificate for") @click.pass_context -def get_certificate(ctx, api, key_id): +def get_certificate(ctx: Context, api: bool, key_id: Optional[str]) -> None: """Get a certificate from the NetHSM. If the --api option is set, the certificate used for the NetHSM TLS interface @@ -983,9 +1059,12 @@ def get_certificate(ctx, api, key_id): with connect(ctx) as nethsm: if key_id: cert = nethsm.get_key_certificate(key_id) + try: + print(cert.decode()) + except UnicodeError: + print(cert) else: - cert = nethsm.get_certificate() - print(cert) + print(nethsm.get_certificate()) @nethsm.command() @@ -996,7 +1075,7 @@ def get_certificate(ctx, api, key_id): help="The ID of the key to delete the certificate for", ) @click.pass_context -def delete_certificate(ctx, key_id): +def delete_certificate(ctx: Context, key_id: str) -> None: """Delete a certificate for a stored key from the NetHSM. This command requires authentication as a user with the Administrator @@ -1024,17 +1103,17 @@ def delete_certificate(ctx, key_id): @click.option("--email-address", default="", prompt=True, help="The email address") @click.pass_context def csr( - ctx, - api, - key_id, - country, - state_or_province, - locality, - organization, - organizational_unit, - common_name, - email_address, -): + ctx: Context, + api: bool, + key_id: Optional[str], + country: str, + state_or_province: str, + locality: str, + organization: str, + organizational_unit: str, + common_name: str, + email_address: str, +) -> None: """Generate a certificate signing request. If the --api option is set, the CSR is generated for the NetHSM, for @@ -1085,12 +1164,13 @@ def csr( help="The length of the generated key", ) @click.pass_context -def generate_tls_key(ctx, type, length): +def generate_tls_key(ctx: Context, type: str, length: Optional[int]) -> None: """Generate key pair for NetHSM TLS interface. This command requires authentication as a user with the Administrator role.""" - if type == "RSA": + key_type = nethsm_sdk.TlsKeyType.from_string(type) + if key_type == nethsm_sdk.TlsKeyType.RSA: if not length: length = click.prompt("Length", type=int) else: @@ -1098,13 +1178,13 @@ def generate_tls_key(ctx, type, length): raise click.ClickException("-l/--length may only be set for RSA keys") with connect(ctx) as nethsm: - nethsm.generate_tls_key(type, length) + nethsm.generate_tls_key(key_type, length) print(f"Key for TLS interface generated on NetHSM {nethsm.host}") @nethsm.command() @click.pass_context -def system_info(ctx): +def system_info(ctx: Context) -> None: """Get system information for a NetHSM instance. This command requires authentication as a user with the Administrator @@ -1121,7 +1201,7 @@ def system_info(ctx): @nethsm.command() @click.argument("filename") @click.pass_context -def backup(ctx, filename): +def backup(ctx: Context, filename: str) -> None: """Make a backup of a NetHSM instance and write it to a file. This command requires authentication as a user with the Backup role.""" @@ -1160,7 +1240,13 @@ def backup(ctx, filename): ) @click.argument("filename") @click.pass_context -def restore(ctx, backup_passphrase, system_time, force, filename): +def restore( + ctx: Context, + backup_passphrase: str, + system_time: Optional[datetime.datetime], + force: bool, + filename: str, +) -> None: """Restore a backup of a NetHSM instance from a file. If the system time is not set, the current system time is used.""" @@ -1194,7 +1280,7 @@ def restore(ctx, backup_passphrase, system_time, force, filename): help="The backup passphrase for decryption (default: only the unencrypted metadata is validated)", ) @click.argument("filename") -def validate_backup(backup_passphrase, filename): +def validate_backup(backup_passphrase: Optional[str], filename: str) -> None: """Validate a NetHSM backup file. Per default, only the metadata of the encrypted backup is validated. If @@ -1231,7 +1317,7 @@ def validate_backup(backup_passphrase, filename): help="The backup passphrase", ) @click.argument("filename") -def export_backup(backup_passphrase, filename): +def export_backup(backup_passphrase: str, filename: str) -> None: """Export the content of a NetHSM backup file. The key-value data stored in the backup file is printed to the standard @@ -1251,20 +1337,20 @@ def export_backup(backup_passphrase, filename): except ValueError as e: raise CliException(f"Failed to decrypt backup content: {e}", support_hint=False) - data = {} - data[".locked-domain-key"] = base64.b64encode(decrypted.domain_key).decode() - data[".version"] = decrypted.version + values: dict[str, Any] = {} + values[".locked-domain-key"] = base64.b64encode(decrypted.domain_key).decode() + values[".version"] = decrypted.version for key, value in decrypted.data.items(): - data[key] = base64.b64encode(value).decode() + values[key] = base64.b64encode(value).decode() - json.dump(data, sys.stdout, indent=4) + json.dump(values, sys.stdout, indent=4) print() @nethsm.command() @click.argument("filename") @click.pass_context -def update(ctx, filename): +def update(ctx: Context, filename: str) -> None: """Load an update to a NetHSM instance. This command requires authentication as a user with the Administrator @@ -1280,7 +1366,7 @@ def update(ctx, filename): @nethsm.command() @click.pass_context -def cancel_update(ctx): +def cancel_update(ctx: Context) -> None: """Cancel a queued update on a NetHSM instance. This command requires authentication as a user with the Administrator @@ -1292,7 +1378,7 @@ def cancel_update(ctx): @nethsm.command() @click.pass_context -def commit_update(ctx): +def commit_update(ctx: Context) -> None: """Commit a queued update on a NetHSM instance. This command requires authentication as a user with the Administrator @@ -1310,7 +1396,7 @@ def commit_update(ctx): help="Force reboot", ) @click.pass_context -def reboot(ctx, force): +def reboot(ctx: Context, force: bool) -> None: """Reboot a NetHSM instance. This command requires authentication as a user with the Administrator @@ -1334,7 +1420,7 @@ def reboot(ctx, force): help="Force shutdown", ) @click.pass_context -def shutdown(ctx, force): +def shutdown(ctx: Context, force: bool) -> None: """Shutdown a NetHSM instance. This command requires authentication as a user with the Administrator @@ -1355,10 +1441,10 @@ def shutdown(ctx, force): "-f", "--force", is_flag=True, - help="Force shutdown", + help="Force factory reset", ) @click.pass_context -def factory_reset(ctx, force): +def factory_reset(ctx: Context, force: bool) -> None: """Perform a factory reset for a NetHSM instance. This command requires authentication as a user with the Administrator @@ -1404,14 +1490,16 @@ def factory_reset(ctx, force): help="The initialization vector", ) @click.pass_context -def encrypt(ctx, key_id, data, mode, iv): +def encrypt(ctx: Context, key_id: str, data: str, mode: str, iv: str) -> None: """Encrypt data with an asymmetric secret key on the NetHSM and print the encrypted message. This command requires authentication as a user with the Operator role.""" with connect(ctx) as nethsm: - encrypted = nethsm.encrypt(key_id, data, mode, iv) - print(f"Encrypted: {encrypted[0]}") - print(f"Initialization vector: {encrypted[1]}") + encrypted = nethsm.encrypt( + key_id, data, nethsm_sdk.EncryptMode.from_string(mode), iv + ) + print(f"Encrypted: {encrypted.encrypted}") + print(f"Initialization vector: {encrypted.iv}") @nethsm.command() @@ -1443,12 +1531,14 @@ def encrypt(ctx, key_id, data, mode, iv): help="The initialization vector", ) @click.pass_context -def decrypt(ctx, key_id, data, mode, iv): +def decrypt(ctx: Context, key_id: str, data: str, mode: str, iv: str) -> None: """Decrypt data with a secret key on the NetHSM and print the decrypted message. This command requires authentication as a user with the Operator role.""" with connect(ctx) as nethsm: - print(nethsm.decrypt(key_id, data, mode, iv)) + print( + nethsm.decrypt(key_id, data, nethsm_sdk.DecryptMode.from_string(mode), iv) + ) @nethsm.command() @@ -1472,10 +1562,10 @@ def decrypt(ctx, key_id, data, mode, iv): help="The sign mode", ) @click.pass_context -def sign(ctx, key_id, data, mode): +def sign(ctx: Context, key_id: str, data: str, mode: str) -> None: """Sign data with a secret key on the NetHSM and print the signature. This command requires authentication as a user with the Operator role.""" with connect(ctx) as nethsm: - signature = nethsm.sign(key_id, data, mode) + signature = nethsm.sign(key_id, data, nethsm_sdk.SignMode.from_string(mode)) print(signature) diff --git a/pyproject.toml b/pyproject.toml index 98259a38..49dc8973 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ "protobuf >=3.17.3, < 4.0.0", "click-aliases", "semver", - "nethsm >= 0.4,<0.5", + "nethsm >=0.5, <0.6", ] dynamic = ["version", "description"] @@ -86,7 +86,6 @@ strict = true [[tool.mypy.overrides]] module = [ "pynitrokey.cli", - "pynitrokey.cli.nethsm", "pynitrokey.cli.pro", "pynitrokey.cli.program", "pynitrokey.cli.start",