diff --git a/examples/uploader/_localrepo.py b/examples/uploader/_localrepo.py index 554c85ba5d..616fcb2096 100644 --- a/examples/uploader/_localrepo.py +++ b/examples/uploader/_localrepo.py @@ -63,7 +63,10 @@ def open(self, role: str) -> Metadata: # if there is a metadata version fetched from remote, use that # HACK: access Updater internals if role in self.updater._trusted_set: - return copy.deepcopy(self.updater._trusted_set[role]) + # NOTE: The original signature wrapper (Metadata) was verified and + # discarded upon inclusion in the trusted set. It is safe to use + # a fresh wrapper. `close` will override existing signatures anyway. + return Metadata(copy.deepcopy(self.updater._trusted_set[role])) # otherwise we're creating metadata from scratch md = Metadata(Targets()) diff --git a/tests/test_api.py b/tests/test_api.py index 1ebece61bb..c1076353dc 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -15,6 +15,7 @@ import unittest from copy import copy, deepcopy from datetime import datetime, timedelta +from pathlib import Path from typing import Any, ClassVar, Dict, Optional from securesystemslib import exceptions as sslib_exceptions @@ -33,6 +34,7 @@ from tests import utils from tuf.api import exceptions +from tuf.api.dsse import SimpleEnvelope from tuf.api.metadata import ( TOP_LEVEL_ROLE_NAMES, DelegatedRole, @@ -1144,6 +1146,95 @@ def test_delegations_get_delegated_role(self) -> None: ) +class TestSimpleEnvelope(unittest.TestCase): + """Tests for public API in 'tuf/api/dsse.py'.""" + + @classmethod + def setUpClass(cls) -> None: + repo_data_dir = Path(utils.TESTS_DIR) / "repository_data" + cls.metadata_dir = repo_data_dir / "repository" / "metadata" + cls.signer_store = {} + for role in [Snapshot, Targets, Timestamp]: + key_path = repo_data_dir / "keystore" / f"{role.type}_key" + key = import_ed25519_privatekey_from_file( + str(key_path), + password="password", + ) + cls.signer_store[role.type] = SSlibSigner(key) + + def test_serialization(self) -> None: + """Basic de/serialization test. + + 1. Load test metadata for each role + 2. Wrap metadata payloads in envelope serializing the payload + 3. Serialize envelope + 4. De-serialize envelope + 5. De-serialize payload + + """ + for role in [Root, Timestamp, Snapshot, Targets]: + metadata_path = self.metadata_dir / f"{role.type}.json" + metadata = Metadata.from_file(str(metadata_path)) + self.assertIsInstance(metadata.signed, role) + + envelope = SimpleEnvelope.from_signed(metadata.signed) + envelope_bytes = envelope.to_bytes() + + envelope2 = SimpleEnvelope.from_bytes(envelope_bytes) + payload = envelope2.get_signed() + self.assertEqual(metadata.signed, payload) + + def test_fail_envelope_serialization(self) -> None: + envelope = SimpleEnvelope(b"foo", "bar", ["baz"]) + with self.assertRaises(SerializationError): + envelope.to_bytes() + + def test_fail_envelope_deserialization(self) -> None: + with self.assertRaises(DeserializationError): + SimpleEnvelope.from_bytes(b"[") + + def test_fail_payload_serialization(self) -> None: + with self.assertRaises(SerializationError): + SimpleEnvelope.from_signed("foo") # type: ignore + + def test_fail_payload_deserialization(self) -> None: + payloads = [b"[", b'{"_type": "foo"}'] + for payload in payloads: + envelope = SimpleEnvelope(payload, "bar", []) + with self.assertRaises(DeserializationError): + envelope.get_signed() + + def test_verify_delegate(self) -> None: + """Basic verification test. + + 1. Load test metadata for each role + 2. 
Wrap non-root payloads in envelope serializing the payload + 3. Sign with correct delegated key + 4. Verify delegate with root + + """ + root_path = self.metadata_dir / "root.json" + root = Metadata[Root].from_file(str(root_path)).signed + + for role in [Timestamp, Snapshot, Targets]: + metadata_path = self.metadata_dir / f"{role.type}.json" + metadata = Metadata.from_file(str(metadata_path)) + self.assertIsInstance(metadata.signed, role) + + signer = self.signer_store[role.type] + self.assertIn( + signer.key_dict["keyid"], root.roles[role.type].keyids + ) + + envelope = SimpleEnvelope.from_signed(metadata.signed) + envelope.sign(signer) + self.assertTrue(len(envelope.signatures) == 1) + + root.verify_delegate( + role.type, envelope.pae(), envelope.signatures_dict + ) + + # Run unit test. if __name__ == "__main__": utils.configure_test_logging(sys.argv) diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 13e7e540f2..b5ab042d7e 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -15,16 +15,22 @@ from tests import utils from tuf.api import exceptions +from tuf.api.dsse import SimpleEnvelope from tuf.api.metadata import ( Metadata, MetaFile, Root, + Signed, Snapshot, Targets, Timestamp, ) from tuf.api.serialization.json import JSONSerializer -from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet +from tuf.ngclient._internal.trusted_metadata_set import ( + TrustedMetadataSet, + _load_from_simple_envelope, +) +from tuf.ngclient.config import EnvelopeType logger = logging.getLogger(__name__) @@ -93,7 +99,9 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: ) def setUp(self) -> None: - self.trusted_set = TrustedMetadataSet(self.metadata[Root.type]) + self.trusted_set = TrustedMetadataSet( + self.metadata[Root.type], EnvelopeType.METADATA + ) def _update_all_besides_targets( self, @@ -132,7 +140,7 @@ def test_update(self) -> None: count = 0 for md in self.trusted_set: - self.assertIsInstance(md, Metadata) + self.assertIsInstance(md, Signed) count += 1 self.assertTrue(count, 6) @@ -149,11 +157,11 @@ def test_update_metadata_output(self) -> None: delegeted_targets_2 = self.trusted_set.update_delegated_targets( self.metadata["role2"], "role2", "role1" ) - self.assertIsInstance(timestamp.signed, Timestamp) - self.assertIsInstance(snapshot.signed, Snapshot) - self.assertIsInstance(targets.signed, Targets) - self.assertIsInstance(delegeted_targets_1.signed, Targets) - self.assertIsInstance(delegeted_targets_2.signed, Targets) + self.assertIsInstance(timestamp, Timestamp) + self.assertIsInstance(snapshot, Snapshot) + self.assertIsInstance(targets, Targets) + self.assertIsInstance(delegeted_targets_1, Targets) + self.assertIsInstance(delegeted_targets_2, Targets) def test_out_of_order_ops(self) -> None: # Update snapshot before timestamp @@ -192,25 +200,40 @@ def test_out_of_order_ops(self) -> None: self.metadata["role1"], "role1", Targets.type ) - def test_root_with_invalid_json(self) -> None: - # Test loading initial root and root update - for test_func in [TrustedMetadataSet, self.trusted_set.update_root]: - # root is not json - with self.assertRaises(exceptions.RepositoryError): - test_func(b"") + def test_bad_initial_root(self) -> None: + # root is not json + with self.assertRaises(exceptions.RepositoryError): + TrustedMetadataSet(b"", EnvelopeType.METADATA) - # root is invalid - root = Metadata.from_bytes(self.metadata[Root.type]) - root.signed.version += 1 - with 
self.assertRaises(exceptions.UnsignedMetadataError): - test_func(root.to_bytes()) + # root is invalid + root = Metadata.from_bytes(self.metadata[Root.type]) + root.signed.version += 1 + with self.assertRaises(exceptions.UnsignedMetadataError): + TrustedMetadataSet(root.to_bytes(), EnvelopeType.METADATA) - # metadata is of wrong type - with self.assertRaises(exceptions.RepositoryError): - test_func(self.metadata[Snapshot.type]) + # metadata is of wrong type + with self.assertRaises(exceptions.RepositoryError): + TrustedMetadataSet( + self.metadata[Snapshot.type], EnvelopeType.METADATA + ) + + def test_bad_root_update(self) -> None: + # root is not json + with self.assertRaises(exceptions.RepositoryError): + self.trusted_set.update_root(b"") + + # root is invalid + root = Metadata.from_bytes(self.metadata[Root.type]) + root.signed.version += 1 + with self.assertRaises(exceptions.UnsignedMetadataError): + self.trusted_set.update_root(root.to_bytes()) + + # metadata is of wrong type + with self.assertRaises(exceptions.RepositoryError): + self.trusted_set.update_root(self.metadata[Snapshot.type]) def test_top_level_md_with_invalid_json(self) -> None: - top_level_md: List[Tuple[bytes, Callable[[bytes], Metadata]]] = [ + top_level_md: List[Tuple[bytes, Callable[[bytes], Signed]]] = [ (self.metadata[Timestamp.type], self.trusted_set.update_timestamp), (self.metadata[Snapshot.type], self.trusted_set.update_snapshot), (self.metadata[Targets.type], self.trusted_set.update_targets), @@ -260,7 +283,7 @@ def root_expired_modifier(root: Root) -> None: # intermediate root can be expired root = self.modify_metadata(Root.type, root_expired_modifier) - tmp_trusted_set = TrustedMetadataSet(root) + tmp_trusted_set = TrustedMetadataSet(root, EnvelopeType.METADATA) # update timestamp to trigger final root expiry check with self.assertRaises(exceptions.ExpiredMetadataError): tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type]) @@ -471,6 +494,52 @@ def target_expired_modifier(target: Targets) -> None: # TODO test updating over initial metadata (new keys, newer timestamp, etc) + def test_load_from_simple_envelope(self) -> None: + """Basic unit test for ``_load_from_simple_envelope`` helper. 
+ + TODO: Test via trusted metadata set tests like for traditional metadata + """ + metadata = Metadata.from_bytes(self.metadata[Root.type]) + root = metadata.signed + envelope = SimpleEnvelope.from_signed(root) + + # Unwrap unsigned envelope without verification + envelope_bytes = envelope.to_bytes() + payload_obj, signed_bytes, signatures = _load_from_simple_envelope( + Root, envelope_bytes + ) + + self.assertEqual(payload_obj, root) + self.assertEqual(signed_bytes, envelope.pae()) + self.assertDictEqual(signatures, {}) + + # Unwrap correctly signed envelope (use default role name) + sig = envelope.sign(self.keystore[Root.type]) + envelope_bytes = envelope.to_bytes() + _, _, signatures = _load_from_simple_envelope( + Root, envelope_bytes, root + ) + self.assertDictEqual(signatures, {sig.keyid: sig}) + + # Load correctly signed envelope (with explicit role name) + _, _, signatures = _load_from_simple_envelope( + Root, envelope.to_bytes(), root, Root.type + ) + self.assertDictEqual(signatures, {sig.keyid: sig}) + + # Fail load envelope with unexpected 'payload_type' + envelope_bad_type = SimpleEnvelope.from_signed(root) + envelope_bad_type.payload_type = "foo" + envelope_bad_type_bytes = envelope_bad_type.to_bytes() + with self.assertRaises(exceptions.RepositoryError): + _load_from_simple_envelope(Root, envelope_bad_type_bytes) + + # Fail load envelope with unexpected payload type + envelope_bad_signed = SimpleEnvelope.from_signed(root) + envelope_bad_signed_bytes = envelope_bad_signed.to_bytes() + with self.assertRaises(exceptions.RepositoryError): + _load_from_simple_envelope(Targets, envelope_bad_signed_bytes) + if __name__ == "__main__": utils.configure_test_logging(sys.argv) diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index 73ca703acc..2ace4bf958 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -285,7 +285,7 @@ def test_updating_root(self) -> None: # Bump root version, resign and refresh self._modify_repository_root(lambda root: None, bump_version=True) self.updater.refresh() - self.assertEqual(self.updater._trusted_set.root.signed.version, 2) + self.assertEqual(self.updater._trusted_set.root.version, 2) def test_missing_targetinfo(self) -> None: self.updater.refresh() diff --git a/tuf/api/_payload.py b/tuf/api/_payload.py new file mode 100644 index 0000000000..84097b6557 --- /dev/null +++ b/tuf/api/_payload.py @@ -0,0 +1,1780 @@ +# Copyright the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + + +"""Helper classes for low-level Metadata API. + +""" +import abc +import fnmatch +import io +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import ( + IO, + Any, + ClassVar, + Dict, + Iterator, + List, + Optional, + Tuple, + TypeVar, + Union, +) + +from securesystemslib import exceptions as sslib_exceptions +from securesystemslib import hash as sslib_hash +from securesystemslib.signer import Key, Signature + +from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError + +_ROOT = "root" +_SNAPSHOT = "snapshot" +_TARGETS = "targets" +_TIMESTAMP = "timestamp" + +# We aim to support SPECIFICATION_VERSION and require the input metadata +# files to have the same major version (the first number) as ours. 
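+# For example, a metadata file with spec_version "1.0.19" or "1.5.0" is
+# accepted by this client, while "2.0.0" is rejected.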
+SPECIFICATION_VERSION = ["1", "0", "31"] +TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS} + +logger = logging.getLogger(__name__) + +# T is a Generic type constraint for container payloads +T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets") + + +class Signed(metaclass=abc.ABCMeta): + """A base class for the signed part of TUF metadata. + + Objects with base class Signed are usually included in a ``Metadata`` object + on the signed attribute. This class provides attributes and methods that + are common for all TUF metadata types (roles). + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Args: + version: Metadata version number. If None, then 1 is assigned. + spec_version: Supported TUF specification version. If None, then the + version currently supported by the library is assigned. + expires: Metadata expiry date. If None, then current date and time is + assigned. + unrecognized_fields: Dictionary of all attributes that are not managed + by TUF Metadata API + + Raises: + ValueError: Invalid arguments. + """ + + # type is required for static reference without changing the API + type: ClassVar[str] = "signed" + + # _type and type are identical: 1st replicates file format, 2nd passes lint + @property + def _type(self) -> str: + return self.type + + @property + def expires(self) -> datetime: + """Get the metadata expiry date. + + # Use 'datetime' module to e.g. expire in seven days from now + obj.expires = utcnow() + timedelta(days=7) + """ + return self._expires + + @expires.setter + def expires(self, value: datetime) -> None: + self._expires = value.replace(microsecond=0) + + # NOTE: Signed is a stupid name, because this might not be signed yet, but + # we keep it to match spec terminology (I often refer to this as "payload", + # or "inner metadata") + def __init__( + self, + version: Optional[int], + spec_version: Optional[str], + expires: Optional[datetime], + unrecognized_fields: Optional[Dict[str, Any]], + ): + if spec_version is None: + spec_version = ".".join(SPECIFICATION_VERSION) + # Accept semver (X.Y.Z) but also X.Y for legacy compatibility + spec_list = spec_version.split(".") + if len(spec_list) not in [2, 3] or not all( + el.isdigit() for el in spec_list + ): + raise ValueError(f"Failed to parse spec_version {spec_version}") + + # major version must match + if spec_list[0] != SPECIFICATION_VERSION[0]: + raise ValueError(f"Unsupported spec_version {spec_version}") + + self.spec_version = spec_version + + self.expires = expires or datetime.utcnow() + + if version is None: + version = 1 + elif version <= 0: + raise ValueError(f"version must be > 0, got {version}") + self.version = version + + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Signed): + return False + + return ( + self.type == other.type + and self.version == other.version + and self.spec_version == other.spec_version + and self.expires == other.expires + and self.unrecognized_fields == other.unrecognized_fields + ) + + @abc.abstractmethod + def to_dict(self) -> Dict[str, Any]: + """Serialize and return a dict representation of self.""" + raise NotImplementedError + + @classmethod + @abc.abstractmethod + def from_dict(cls, signed_dict: Dict[str, Any]) -> "Signed": + """Deserialization helper, creates object from json/dict + representation. 
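+
+        For example (sketch), concrete subclasses are used as
+        ``Root.from_dict(metadata_dict["signed"])``, where the dict is the
+        parsed ``signed`` portion of a metadata file.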
+        """
+        raise NotImplementedError
+
+    @classmethod
+    def _common_fields_from_dict(
+        cls, signed_dict: Dict[str, Any]
+    ) -> Tuple[int, str, datetime]:
+        """Return the common fields of ``Signed`` instances from the passed
+        dict representation, as an ordered tuple to be passed as leading
+        positional arguments to a subclass constructor.
+
+        See ``{Root, Timestamp, Snapshot, Targets}.from_dict``
+        methods for usage.
+
+        """
+        _type = signed_dict.pop("_type")
+        if _type != cls.type:
+            raise ValueError(f"Expected type {cls.type}, got {_type}")
+
+        version = signed_dict.pop("version")
+        spec_version = signed_dict.pop("spec_version")
+        expires_str = signed_dict.pop("expires")
+        # Convert 'expires' TUF metadata string to a datetime object, which is
+        # what the constructor expects and what we store. The inverse operation
+        # is implemented in '_common_fields_to_dict'.
+        expires = datetime.strptime(expires_str, "%Y-%m-%dT%H:%M:%SZ")
+
+        return version, spec_version, expires
+
+    def _common_fields_to_dict(self) -> Dict[str, Any]:
+        """Return a dict representation of common fields of
+        ``Signed`` instances.
+
+        See ``{Root, Timestamp, Snapshot, Targets}.to_dict`` methods for usage.
+
+        """
+        return {
+            "_type": self._type,
+            "version": self.version,
+            "spec_version": self.spec_version,
+            "expires": self.expires.isoformat() + "Z",
+            **self.unrecognized_fields,
+        }
+
+    def is_expired(self, reference_time: Optional[datetime] = None) -> bool:
+        """Check metadata expiration against a reference time.
+
+        Args:
+            reference_time: Time to check expiration date against. A naive
+                datetime in UTC expected. Default is current UTC date and time.
+
+        Returns:
+            ``True`` if expiration time is less than the reference time.
+        """
+        if reference_time is None:
+            reference_time = datetime.utcnow()
+
+        return reference_time >= self.expires
+
+
+class Role:
+    """Container that defines which keys are required to sign role metadata.
+
+    Role defines how many keys are required to successfully sign a role's
+    metadata, and which keys are accepted.
+
+    *All parameters named below are not just constructor arguments but also
+    instance attributes.*
+
+    Args:
+        keyids: The role's signing key identifiers.
+        threshold: Number of keys required to sign this role's metadata.
+        unrecognized_fields: Dictionary of all attributes that are not managed
+            by TUF Metadata API
+
+    Raises:
+        ValueError: Invalid arguments.
+    """
+
+    def __init__(
+        self,
+        keyids: List[str],
+        threshold: int,
+        unrecognized_fields: Optional[Dict[str, Any]] = None,
+    ):
+        if len(set(keyids)) != len(keyids):
+            raise ValueError(f"Nonunique keyids: {keyids}")
+        if threshold < 1:
+            raise ValueError("threshold should be at least 1!")
+        self.keyids = keyids
+        self.threshold = threshold
+        if unrecognized_fields is None:
+            unrecognized_fields = {}
+
+        self.unrecognized_fields = unrecognized_fields
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, Role):
+            return False
+
+        return (
+            self.keyids == other.keyids
+            and self.threshold == other.threshold
+            and self.unrecognized_fields == other.unrecognized_fields
+        )
+
+    @classmethod
+    def from_dict(cls, role_dict: Dict[str, Any]) -> "Role":
+        """Create ``Role`` object from its json/dict representation.
+
+        Raises:
+            ValueError, KeyError: Invalid arguments.
+        """
+        keyids = role_dict.pop("keyids")
+        threshold = role_dict.pop("threshold")
+        # All fields left in the role_dict are unrecognized.
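+        # For example (sketch): {"keyids": ["id1"], "threshold": 1,
+        # "foo": "bar"} becomes Role(["id1"], 1) with
+        # unrecognized_fields == {"foo": "bar"}.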
+ return cls(keyids, threshold, role_dict) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of self.""" + return { + "keyids": self.keyids, + "threshold": self.threshold, + **self.unrecognized_fields, + } + + +@dataclass +class VerificationResult: + """Signature verification result for delegated role metadata. + + Attributes: + threshold: Number of required signatures. + signed: dict of keyid to Key, containing keys that have signed. + unsigned: dict of keyid to Key, containing keys that have not signed. + """ + + threshold: int + signed: Dict[str, Key] + unsigned: Dict[str, Key] + + def __bool__(self) -> bool: + return self.verified + + @property + def verified(self) -> bool: + """True if threshold of signatures is met.""" + return len(self.signed) >= self.threshold + + @property + def missing(self) -> int: + """Number of additional signatures required to reach threshold.""" + return max(0, self.threshold - len(self.signed)) + + +@dataclass +class RootVerificationResult: + """Signature verification result for root metadata. + + Root must be verified by itself and the previous root version. This + dataclass represents both results. For the edge case of first version + of root, these underlying results are identical. + + Note that `signed` and `unsigned` correctness requires the underlying + VerificationResult keys to not conflict (no reusing the same keyid for + different keys). + + Attributes: + first: First underlying VerificationResult + second: Second underlying VerificationResult + """ + + first: VerificationResult + second: VerificationResult + + def __bool__(self) -> bool: + return self.verified + + @property + def verified(self) -> bool: + """True if threshold of signatures is met in both underlying + VerificationResults. + """ + return self.first.verified and self.second.verified + + @property + def signed(self) -> Dict[str, Key]: + """Dictionary of all signing keys that have signed, from both + VerificationResults. + return a union of all signed (in python<3.9 this requires + dict unpacking) + """ + return {**self.first.signed, **self.second.signed} + + @property + def unsigned(self) -> Dict[str, Key]: + """Dictionary of all signing keys that have not signed, from both + VerificationResults. + return a union of all unsigned (in python<3.9 this requires + dict unpacking) + """ + return {**self.first.unsigned, **self.second.unsigned} + + +class _DelegatorMixin(metaclass=abc.ABCMeta): + """Class that implements verify_delegate() for Root and Targets""" + + @abc.abstractmethod + def get_delegated_role(self, delegated_role: str) -> Role: + """Return the role object for the given delegated role. + + Raises ValueError if delegated_role is not actually delegated. + """ + raise NotImplementedError + + @abc.abstractmethod + def get_key(self, keyid: str) -> Key: + """Return the key object for the given keyid. + + Raises ValueError if key is not found. + """ + raise NotImplementedError + + def get_verification_result( + self, + delegated_role: str, + payload: bytes, + signatures: Dict[str, Signature], + ) -> VerificationResult: + """Return signature threshold verification result for delegated role. + + NOTE: Unlike `verify_delegate()` this method does not raise, if the + role metadata is not fully verified. + + Args: + delegated_role: Name of the delegated role to verify + payload: Signed payload bytes for the delegated role + signatures: Signatures over payload bytes + + Raises: + ValueError: no delegation was found for ``delegated_role``. 
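+
+        Example (sketch, assuming ``md`` is a signed ``Metadata`` object):
+        ``result = root.get_verification_result("snapshot",
+        md.signed_bytes, md.signatures)``; then check ``result.verified``
+        or ``result.missing``.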
+ """ + role = self.get_delegated_role(delegated_role) + + signed = {} + unsigned = {} + + for keyid in role.keyids: + try: + key = self.get_key(keyid) + except ValueError: + logger.info("No key for keyid %s", keyid) + continue + + if keyid not in signatures: + unsigned[keyid] = key + logger.info("No signature for keyid %s", keyid) + continue + + sig = signatures[keyid] + try: + key.verify_signature(sig, payload) + signed[keyid] = key + except sslib_exceptions.UnverifiedSignatureError: + unsigned[keyid] = key + logger.info("Key %s failed to verify %s", keyid, delegated_role) + + return VerificationResult(role.threshold, signed, unsigned) + + def verify_delegate( + self, + delegated_role: str, + payload: bytes, + signatures: Dict[str, Signature], + ) -> None: + """Verify signature threshold for delegated role. + + Verify that there are enough valid ``signatures`` over ``payload``, to + meet the threshold of keys for ``delegated_role``, as defined by the + delegator (``self``). + + Args: + delegated_role: Name of the delegated role to verify + payload: Signed payload bytes for the delegated role + signatures: Signatures over payload bytes + + Raises: + UnsignedMetadataError: ``delegated_role`` was not signed with + required threshold of keys for ``role_name``. + ValueError: no delegation was found for ``delegated_role``. + """ + result = self.get_verification_result( + delegated_role, payload, signatures + ) + if not result: + raise UnsignedMetadataError( + f"{delegated_role} was signed by {len(result.signed)}/" + f"{result.threshold} keys" + ) + + +class Root(Signed, _DelegatorMixin): + """A container for the signed part of root metadata. + + Parameters listed below are also instance attributes. + + Args: + version: Metadata version number. Default is 1. + spec_version: Supported TUF specification version. Default is the + version currently supported by the library. + expires: Metadata expiry date. Default is current date and time. + keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``. + Default is empty dictionary. + roles: Dictionary of role names to Roles. Defines which keys are + required to sign the metadata for a specific role. Default is + a dictionary of top level roles without keys and threshold of 1. + consistent_snapshot: ``True`` if repository supports consistent + snapshots. Default is True. + unrecognized_fields: Dictionary of all attributes that are not managed + by TUF Metadata API + + Raises: + ValueError: Invalid arguments. 
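+
+    Example (sketch): ``Root()`` creates version 1 root metadata with the
+    four top-level roles, each with an empty keyid list and threshold 1.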
+ """ + + type = _ROOT + + def __init__( + self, + version: Optional[int] = None, + spec_version: Optional[str] = None, + expires: Optional[datetime] = None, + keys: Optional[Dict[str, Key]] = None, + roles: Optional[Dict[str, Role]] = None, + consistent_snapshot: Optional[bool] = True, + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + super().__init__(version, spec_version, expires, unrecognized_fields) + self.consistent_snapshot = consistent_snapshot + self.keys = keys if keys is not None else {} + + if roles is None: + roles = {r: Role([], 1) for r in TOP_LEVEL_ROLE_NAMES} + elif set(roles) != TOP_LEVEL_ROLE_NAMES: + raise ValueError("Role names must be the top-level metadata roles") + self.roles = roles + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Root): + return False + + return ( + super().__eq__(other) + and self.keys == other.keys + and self.roles == other.roles + and self.consistent_snapshot == other.consistent_snapshot + ) + + @classmethod + def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root": + """Create ``Root`` object from its json/dict representation. + + Raises: + ValueError, KeyError, TypeError: Invalid arguments. + """ + common_args = cls._common_fields_from_dict(signed_dict) + consistent_snapshot = signed_dict.pop("consistent_snapshot", None) + keys = signed_dict.pop("keys") + roles = signed_dict.pop("roles") + + for keyid, key_dict in keys.items(): + keys[keyid] = Key.from_dict(keyid, key_dict) + for role_name, role_dict in roles.items(): + roles[role_name] = Role.from_dict(role_dict) + + # All fields left in the signed_dict are unrecognized. + return cls(*common_args, keys, roles, consistent_snapshot, signed_dict) + + def to_dict(self) -> Dict[str, Any]: + """Return the dict representation of self.""" + root_dict = self._common_fields_to_dict() + keys = {keyid: key.to_dict() for (keyid, key) in self.keys.items()} + roles = {} + for role_name, role in self.roles.items(): + roles[role_name] = role.to_dict() + if self.consistent_snapshot is not None: + root_dict["consistent_snapshot"] = self.consistent_snapshot + + root_dict.update( + { + "keys": keys, + "roles": roles, + } + ) + return root_dict + + def add_key(self, key: Key, role: str) -> None: + """Add new signing key for delegated role ``role``. + + Args: + key: Signing key to be added for ``role``. + role: Name of the role, for which ``key`` is added. + + Raises: + ValueError: If the argument order is wrong or if ``role`` doesn't + exist. + """ + # Verify that our users are not using the old argument order. + if isinstance(role, Key): + raise ValueError("Role must be a string, not a Key instance") + + if role not in self.roles: + raise ValueError(f"Role {role} doesn't exist") + if key.keyid not in self.roles[role].keyids: + self.roles[role].keyids.append(key.keyid) + self.keys[key.keyid] = key + + def revoke_key(self, keyid: str, role: str) -> None: + """Revoke key from ``role`` and updates the key store. + + Args: + keyid: Identifier of the key to be removed for ``role``. + role: Name of the role, for which a signing key is removed. + + Raises: + ValueError: If ``role`` doesn't exist or if ``role`` doesn't include + the key. 
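+
+        Example (sketch): ``root.revoke_key(keyid, "timestamp")`` removes
+        ``keyid`` from the timestamp role, and drops it from ``keys`` only
+        if no other role still references it.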
+ """ + if role not in self.roles: + raise ValueError(f"Role {role} doesn't exist") + if keyid not in self.roles[role].keyids: + raise ValueError(f"Key with id {keyid} is not used by {role}") + self.roles[role].keyids.remove(keyid) + for keyinfo in self.roles.values(): + if keyid in keyinfo.keyids: + return + + del self.keys[keyid] + + def get_delegated_role(self, delegated_role: str) -> Role: + """Return the role object for the given delegated role. + + Raises ValueError if delegated_role is not actually delegated. + """ + if delegated_role not in self.roles: + raise ValueError(f"Delegated role {delegated_role} not found") + + return self.roles[delegated_role] + + def get_key(self, keyid: str) -> Key: # noqa: D102 + if keyid not in self.keys: + raise ValueError(f"Key {keyid} not found") + + return self.keys[keyid] + + def get_root_verification_result( + self, + previous: Optional["Root"], + payload: bytes, + signatures: Dict[str, Signature], + ) -> RootVerificationResult: + """Return signature threshold verification result for two root roles. + + Verify root metadata with two roles (`self` and optionally `previous`). + + If the repository has no root role versions yet, `previous` can be left + None. In all other cases, `previous` must be the previous version of + the Root. + + NOTE: Unlike `verify_delegate()` this method does not raise, if the + root metadata is not fully verified. + + Args: + previous: The previous `Root` to verify payload with, or None + payload: Signed payload bytes for root + signatures: Signatures over payload bytes + + Raises: + ValueError: no delegation was found for ``root`` or given Root + versions are not sequential. + """ + + if previous is None: + previous = self + elif self.version != previous.version + 1: + versions = f"v{previous.version} and v{self.version}" + raise ValueError( + f"Expected sequential root versions, got {versions}." + ) + + return RootVerificationResult( + previous.get_verification_result(Root.type, payload, signatures), + self.get_verification_result(Root.type, payload, signatures), + ) + + +class BaseFile: + """A base class of ``MetaFile`` and ``TargetFile``. + + Encapsulates common static methods for length and hash verification. 
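+
+    For example, ``MetaFile.verify_length_and_hashes(data)`` builds on the
+    ``_verify_length`` and ``_verify_hashes`` helpers defined here.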
+ """ + + @staticmethod + def _verify_hashes( + data: Union[bytes, IO[bytes]], expected_hashes: Dict[str, str] + ) -> None: + """Verify that the hash of ``data`` matches ``expected_hashes``.""" + is_bytes = isinstance(data, bytes) + for algo, exp_hash in expected_hashes.items(): + try: + if is_bytes: + digest_object = sslib_hash.digest(algo) + digest_object.update(data) + else: + # if data is not bytes, assume it is a file object + digest_object = sslib_hash.digest_fileobject(data, algo) + except ( + sslib_exceptions.UnsupportedAlgorithmError, + sslib_exceptions.FormatError, + ) as e: + raise LengthOrHashMismatchError( + f"Unsupported algorithm '{algo}'" + ) from e + + observed_hash = digest_object.hexdigest() + if observed_hash != exp_hash: + raise LengthOrHashMismatchError( + f"Observed hash {observed_hash} does not match " + f"expected hash {exp_hash}" + ) + + @staticmethod + def _verify_length( + data: Union[bytes, IO[bytes]], expected_length: int + ) -> None: + """Verify that the length of ``data`` matches ``expected_length``.""" + if isinstance(data, bytes): + observed_length = len(data) + else: + # if data is not bytes, assume it is a file object + data.seek(0, io.SEEK_END) + observed_length = data.tell() + + if observed_length != expected_length: + raise LengthOrHashMismatchError( + f"Observed length {observed_length} does not match " + f"expected length {expected_length}" + ) + + @staticmethod + def _validate_hashes(hashes: Dict[str, str]) -> None: + if not hashes: + raise ValueError("Hashes must be a non empty dictionary") + for key, value in hashes.items(): + if not (isinstance(key, str) and isinstance(value, str)): + raise TypeError("Hashes items must be strings") + + @staticmethod + def _validate_length(length: int) -> None: + if length < 0: + raise ValueError(f"Length must be >= 0, got {length}") + + @staticmethod + def _get_length_and_hashes( + data: Union[bytes, IO[bytes]], hash_algorithms: Optional[List[str]] + ) -> Tuple[int, Dict[str, str]]: + """Calculate length and hashes of ``data``.""" + if isinstance(data, bytes): + length = len(data) + else: + data.seek(0, io.SEEK_END) + length = data.tell() + + hashes = {} + + if hash_algorithms is None: + hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM] + + for algorithm in hash_algorithms: + try: + if isinstance(data, bytes): + digest_object = sslib_hash.digest(algorithm) + digest_object.update(data) + else: + digest_object = sslib_hash.digest_fileobject( + data, algorithm + ) + except ( + sslib_exceptions.UnsupportedAlgorithmError, + sslib_exceptions.FormatError, + ) as e: + raise ValueError(f"Unsupported algorithm '{algorithm}'") from e + + hashes[algorithm] = digest_object.hexdigest() + + return (length, hashes) + + +class MetaFile(BaseFile): + """A container with information about a particular metadata file. + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Args: + version: Version of the metadata file. + length: Length of the metadata file in bytes. + hashes: Dictionary of hash algorithm names to hashes of the metadata + file content. + unrecognized_fields: Dictionary of all attributes that are not managed + by TUF Metadata API + + Raises: + ValueError, TypeError: Invalid arguments. 
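+
+    Example (sketch): ``MetaFile(version=2, length=42,
+    hashes={"sha256": "<hex digest>"})``; ``length`` and ``hashes`` are
+    optional and only verified when present.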
+ """ + + def __init__( + self, + version: int = 1, + length: Optional[int] = None, + hashes: Optional[Dict[str, str]] = None, + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + if version <= 0: + raise ValueError(f"Metafile version must be > 0, got {version}") + if length is not None: + self._validate_length(length) + if hashes is not None: + self._validate_hashes(hashes) + + self.version = version + self.length = length + self.hashes = hashes + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, MetaFile): + return False + + return ( + self.version == other.version + and self.length == other.length + and self.hashes == other.hashes + and self.unrecognized_fields == other.unrecognized_fields + ) + + @classmethod + def from_dict(cls, meta_dict: Dict[str, Any]) -> "MetaFile": + """Create ``MetaFile`` object from its json/dict representation. + + Raises: + ValueError, KeyError: Invalid arguments. + """ + version = meta_dict.pop("version") + length = meta_dict.pop("length", None) + hashes = meta_dict.pop("hashes", None) + + # All fields left in the meta_dict are unrecognized. + return cls(version, length, hashes, meta_dict) + + @classmethod + def from_data( + cls, + version: int, + data: Union[bytes, IO[bytes]], + hash_algorithms: List[str], + ) -> "MetaFile": + """Creates MetaFile object from bytes. + This constructor should only be used if hashes are wanted. + By default, MetaFile(ver) should be used. + Args: + version: Version of the metadata file. + data: Metadata bytes that the metafile represents. + hash_algorithms: Hash algorithms to create the hashes with. If not + specified, the securesystemslib default hash algorithm is used. + + Raises: + ValueError: The hash algorithms list contains an unsupported + algorithm. + """ + length, hashes = cls._get_length_and_hashes(data, hash_algorithms) + return cls(version, length, hashes) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of self.""" + res_dict: Dict[str, Any] = { + "version": self.version, + **self.unrecognized_fields, + } + + if self.length is not None: + res_dict["length"] = self.length + + if self.hashes is not None: + res_dict["hashes"] = self.hashes + + return res_dict + + def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None: + """Verify that the length and hashes of ``data`` match expected values. + + Args: + data: File object or its content in bytes. + + Raises: + LengthOrHashMismatchError: Calculated length or hashes do not + match expected values or hash algorithm is not supported. + """ + if self.length is not None: + self._verify_length(data, self.length) + + if self.hashes is not None: + self._verify_hashes(data, self.hashes) + + +class Timestamp(Signed): + """A container for the signed part of timestamp metadata. + + TUF file format uses a dictionary to contain the snapshot information: + this is not the case with ``Timestamp.snapshot_meta`` which is a + ``MetaFile``. + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Args: + version: Metadata version number. Default is 1. + spec_version: Supported TUF specification version. Default is the + version currently supported by the library. + expires: Metadata expiry date. Default is current date and time. 
+ unrecognized_fields: Dictionary of all attributes that are not managed + by TUF Metadata API + snapshot_meta: Meta information for snapshot metadata. Default is a + MetaFile with version 1. + + Raises: + ValueError: Invalid arguments. + """ + + type = _TIMESTAMP + + def __init__( + self, + version: Optional[int] = None, + spec_version: Optional[str] = None, + expires: Optional[datetime] = None, + snapshot_meta: Optional[MetaFile] = None, + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + super().__init__(version, spec_version, expires, unrecognized_fields) + self.snapshot_meta = snapshot_meta or MetaFile(1) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Timestamp): + return False + + return ( + super().__eq__(other) and self.snapshot_meta == other.snapshot_meta + ) + + @classmethod + def from_dict(cls, signed_dict: Dict[str, Any]) -> "Timestamp": + """Create ``Timestamp`` object from its json/dict representation. + + Raises: + ValueError, KeyError: Invalid arguments. + """ + common_args = cls._common_fields_from_dict(signed_dict) + meta_dict = signed_dict.pop("meta") + snapshot_meta = MetaFile.from_dict(meta_dict["snapshot.json"]) + # All fields left in the timestamp_dict are unrecognized. + return cls(*common_args, snapshot_meta, signed_dict) + + def to_dict(self) -> Dict[str, Any]: + """Return the dict representation of self.""" + res_dict = self._common_fields_to_dict() + res_dict["meta"] = {"snapshot.json": self.snapshot_meta.to_dict()} + return res_dict + + +class Snapshot(Signed): + """A container for the signed part of snapshot metadata. + + Snapshot contains information about all target Metadata files. + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Args: + version: Metadata version number. Default is 1. + spec_version: Supported TUF specification version. Default is the + version currently supported by the library. + expires: Metadata expiry date. Default is current date and time. + unrecognized_fields: Dictionary of all attributes that are not managed + by TUF Metadata API + meta: Dictionary of targets filenames to ``MetaFile`` objects. Default + is a dictionary with a Metafile for "snapshot.json" version 1. + + Raises: + ValueError: Invalid arguments. + """ + + type = _SNAPSHOT + + def __init__( + self, + version: Optional[int] = None, + spec_version: Optional[str] = None, + expires: Optional[datetime] = None, + meta: Optional[Dict[str, MetaFile]] = None, + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + super().__init__(version, spec_version, expires, unrecognized_fields) + self.meta = meta if meta is not None else {"targets.json": MetaFile(1)} + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Snapshot): + return False + + return super().__eq__(other) and self.meta == other.meta + + @classmethod + def from_dict(cls, signed_dict: Dict[str, Any]) -> "Snapshot": + """Create ``Snapshot`` object from its json/dict representation. + + Raises: + ValueError, KeyError: Invalid arguments. + """ + common_args = cls._common_fields_from_dict(signed_dict) + meta_dicts = signed_dict.pop("meta") + meta = {} + for meta_path, meta_dict in meta_dicts.items(): + meta[meta_path] = MetaFile.from_dict(meta_dict) + # All fields left in the snapshot_dict are unrecognized. 
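+        # For example (sketch): after the pops above, a parsed
+        # {"targets.json": {"version": 3}} entry has become
+        # {"targets.json": MetaFile(3)}, and any leftover fields travel
+        # in signed_dict.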
+        return cls(*common_args, meta, signed_dict)
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Return the dict representation of self."""
+        snapshot_dict = self._common_fields_to_dict()
+        meta_dict = {}
+        for meta_path, meta_info in self.meta.items():
+            meta_dict[meta_path] = meta_info.to_dict()
+
+        snapshot_dict["meta"] = meta_dict
+        return snapshot_dict
+
+
+class DelegatedRole(Role):
+    """A container with information about a delegated role.
+
+    A delegation can happen in two ways:
+
+        - ``paths`` is set: delegates targets matching any path pattern in
+          ``paths``
+        - ``path_hash_prefixes`` is set: delegates targets whose target path
+          hash starts with any of the prefixes in ``path_hash_prefixes``
+
+        ``paths`` and ``path_hash_prefixes`` are mutually exclusive: exactly
+        one of them must be set.
+
+    *All parameters named below are not just constructor arguments but also
+    instance attributes.*
+
+    Args:
+        name: Delegated role name.
+        keyids: Delegated role signing key identifiers.
+        threshold: Number of keys required to sign this role's metadata.
+        terminating: ``True`` if this delegation terminates a target lookup.
+        paths: Path patterns. See note above.
+        path_hash_prefixes: Hash prefixes. See note above.
+        unrecognized_fields: Dictionary of all attributes that are not managed
+            by TUF Metadata API.
+
+    Raises:
+        ValueError: Invalid arguments.
+    """
+
+    def __init__(
+        self,
+        name: str,
+        keyids: List[str],
+        threshold: int,
+        terminating: bool,
+        paths: Optional[List[str]] = None,
+        path_hash_prefixes: Optional[List[str]] = None,
+        unrecognized_fields: Optional[Dict[str, Any]] = None,
+    ):
+        super().__init__(keyids, threshold, unrecognized_fields)
+        self.name = name
+        self.terminating = terminating
+        exclusive_vars = [paths, path_hash_prefixes]
+        if sum(1 for var in exclusive_vars if var is not None) != 1:
+            raise ValueError(
+                "Only one of (paths, path_hash_prefixes) must be set"
+            )
+
+        if paths is not None and any(not isinstance(p, str) for p in paths):
+            raise ValueError("Paths must be strings")
+        if path_hash_prefixes is not None and any(
+            not isinstance(p, str) for p in path_hash_prefixes
+        ):
+            raise ValueError("Path_hash_prefixes must be strings")
+
+        self.paths = paths
+        self.path_hash_prefixes = path_hash_prefixes
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, DelegatedRole):
+            return False
+
+        return (
+            super().__eq__(other)
+            and self.name == other.name
+            and self.terminating == other.terminating
+            and self.paths == other.paths
+            and self.path_hash_prefixes == other.path_hash_prefixes
+        )
+
+    @classmethod
+    def from_dict(cls, role_dict: Dict[str, Any]) -> "DelegatedRole":
+        """Create ``DelegatedRole`` object from its json/dict representation.
+
+        Raises:
+            ValueError, KeyError, TypeError: Invalid arguments.
+        """
+        name = role_dict.pop("name")
+        keyids = role_dict.pop("keyids")
+        threshold = role_dict.pop("threshold")
+        terminating = role_dict.pop("terminating")
+        paths = role_dict.pop("paths", None)
+        path_hash_prefixes = role_dict.pop("path_hash_prefixes", None)
+        # All fields left in the role_dict are unrecognized.
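+        # For example (sketch): {"name": "role1", "keyids": ["id1"],
+        # "threshold": 1, "terminating": false, "paths": ["dir/*"]} maps
+        # to DelegatedRole("role1", ["id1"], 1, False, ["dir/*"]).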
+        return cls(
+            name,
+            keyids,
+            threshold,
+            terminating,
+            paths,
+            path_hash_prefixes,
+            role_dict,
+        )
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Return the dict representation of self."""
+        base_role_dict = super().to_dict()
+        res_dict = {
+            "name": self.name,
+            "terminating": self.terminating,
+            **base_role_dict,
+        }
+        if self.paths is not None:
+            res_dict["paths"] = self.paths
+        elif self.path_hash_prefixes is not None:
+            res_dict["path_hash_prefixes"] = self.path_hash_prefixes
+        return res_dict
+
+    @staticmethod
+    def _is_target_in_pathpattern(targetpath: str, pathpattern: str) -> bool:
+        """Determine whether ``targetpath`` matches the ``pathpattern``."""
+        # We need to make sure that targetpath and pathpattern are pointing to
+        # the same directory as fnmatch doesn't treat "/" as a special symbol.
+        target_parts = targetpath.split("/")
+        pattern_parts = pathpattern.split("/")
+        if len(target_parts) != len(pattern_parts):
+            return False
+
+        # Every part in the pathpattern could include a glob pattern, so each
+        # pair of target and pathpattern parts must match.
+        for target_dir, pattern_dir in zip(target_parts, pattern_parts):
+            if not fnmatch.fnmatch(target_dir, pattern_dir):
+                return False
+
+        return True
+
+    def is_delegated_path(self, target_filepath: str) -> bool:
+        """Determine whether the given ``target_filepath`` is in one of
+        the paths that ``DelegatedRole`` is trusted to provide.
+
+        The ``target_filepath`` and the ``DelegatedRole`` paths are expected to
+        be in their canonical forms, so e.g. "a/b" instead of "a//b". Only "/"
+        is supported as target path separator. Leading separators are not
+        handled as special cases (see `TUF specification on targetpath
+        `_).
+
+        Args:
+            target_filepath: URL path to a target file, relative to a base
+                targets URL.
+        """
+
+        if self.path_hash_prefixes is not None:
+            # Calculate the hash of the filepath
+            # to determine in which bin to find the target.
+            digest_object = sslib_hash.digest(algorithm="sha256")
+            digest_object.update(target_filepath.encode("utf-8"))
+            target_filepath_hash = digest_object.hexdigest()
+
+            for path_hash_prefix in self.path_hash_prefixes:
+                if target_filepath_hash.startswith(path_hash_prefix):
+                    return True
+
+        elif self.paths is not None:
+            for pathpattern in self.paths:
+                # A delegated role path may be an explicit path or glob
+                # pattern (Unix shell-style wildcards).
+                if self._is_target_in_pathpattern(target_filepath, pathpattern):
+                    return True
+
+        return False
+
+
+class SuccinctRoles(Role):
+    """Succinctly defines a hash bin delegation graph.
+
+    A ``SuccinctRoles`` object describes a delegation graph that covers all
+    targets, distributing them uniformly over the delegated roles (i.e. bins)
+    in the graph.
+
+    The total number of bins is 2 to the power of the passed ``bit_length``.
+
+    Bin names are the concatenation of the passed ``name_prefix`` and a
+    zero-padded hex representation of the bin index separated by a hyphen.
+
+    The passed ``keyids`` and ``threshold`` are used for each bin, and each
+    bin is 'terminating'.
+
+    For details: https://github.com/theupdateframework/taps/blob/master/tap15.md
+
+    Args:
+        keyids: Signing key identifiers for any bin metadata.
+        threshold: Number of keys required to sign any bin metadata.
+        bit_length: Number of bits between 1 and 32.
+        name_prefix: Prefix of all bin names.
+        unrecognized_fields: Dictionary of all attributes that are not managed
+            by TUF Metadata API.
+
+    Raises:
+        ValueError, TypeError, AttributeError: Invalid arguments.
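+
+    Example (sketch): ``SuccinctRoles(["id1"], 1, bit_length=8,
+    name_prefix="bins")`` describes 256 terminating bin roles named
+    "bins-00" through "bins-ff".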
+ """ + + def __init__( + self, + keyids: List[str], + threshold: int, + bit_length: int, + name_prefix: str, + unrecognized_fields: Optional[Dict[str, Any]] = None, + ) -> None: + super().__init__(keyids, threshold, unrecognized_fields) + + if bit_length <= 0 or bit_length > 32: + raise ValueError("bit_length must be between 1 and 32") + if not isinstance(name_prefix, str): + raise ValueError("name_prefix must be a string") + + self.bit_length = bit_length + self.name_prefix = name_prefix + + # Calculate the suffix_len value based on the total number of bins in + # hex. If bit_length = 10 then number_of_bins = 1024 or bin names will + # have a suffix between "000" and "3ff" in hex and suffix_len will be 3 + # meaning the third bin will have a suffix of "003". + self.number_of_bins = 2**bit_length + # suffix_len is calculated based on "number_of_bins - 1" as the name + # of the last bin contains the number "number_of_bins -1" as a suffix. + self.suffix_len = len(f"{self.number_of_bins-1:x}") + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, SuccinctRoles): + return False + + return ( + super().__eq__(other) + and self.bit_length == other.bit_length + and self.name_prefix == other.name_prefix + ) + + @classmethod + def from_dict(cls, role_dict: Dict[str, Any]) -> "SuccinctRoles": + """Create ``SuccinctRoles`` object from its json/dict representation. + + Raises: + ValueError, KeyError, AttributeError, TypeError: Invalid arguments. + """ + keyids = role_dict.pop("keyids") + threshold = role_dict.pop("threshold") + bit_length = role_dict.pop("bit_length") + name_prefix = role_dict.pop("name_prefix") + # All fields left in the role_dict are unrecognized. + return cls(keyids, threshold, bit_length, name_prefix, role_dict) + + def to_dict(self) -> Dict[str, Any]: + """Return the dict representation of self.""" + base_role_dict = super().to_dict() + return { + "bit_length": self.bit_length, + "name_prefix": self.name_prefix, + **base_role_dict, + } + + def get_role_for_target(self, target_filepath: str) -> str: + """Calculate the name of the delegated role responsible for + ``target_filepath``. + + The target at path ``target_filepath`` is assigned to a bin by casting + the left-most ``bit_length`` of bits of the file path hash digest to + int, using it as bin index between 0 and ``2**bit_length - 1``. + + Args: + target_filepath: URL path to a target file, relative to a base + targets URL. + """ + hasher = sslib_hash.digest(algorithm="sha256") + hasher.update(target_filepath.encode("utf-8")) + + # We can't ever need more than 4 bytes (32 bits). + hash_bytes = hasher.digest()[:4] + # Right shift hash bytes, so that we only have the leftmost + # bit_length bits that we care about. + shift_value = 32 - self.bit_length + bin_number = int.from_bytes(hash_bytes, byteorder="big") >> shift_value + # Add zero padding if necessary and cast to hex the suffix. + suffix = f"{bin_number:0{self.suffix_len}x}" + return f"{self.name_prefix}-{suffix}" + + def get_roles(self) -> Iterator[str]: + """Yield the names of all different delegated roles one by one.""" + for i in range(0, self.number_of_bins): + suffix = f"{i:0{self.suffix_len}x}" + yield f"{self.name_prefix}-{suffix}" + + def is_delegated_role(self, role_name: str) -> bool: + """Determine whether the given ``role_name`` is in one of + the delegated roles that ``SuccinctRoles`` represents. + + Args: + role_name: The name of the role to check against. 
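+
+        For example, with ``bit_length=8`` and ``name_prefix="bins"``,
+        "bins-a1" returns True, while "bins-123" and "other-a1" return
+        False.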
+ """ + desired_prefix = self.name_prefix + "-" + + if not role_name.startswith(desired_prefix): + return False + + suffix = role_name[len(desired_prefix) :] + if len(suffix) != self.suffix_len: + return False + + try: + # make sure suffix is hex value + num = int(suffix, 16) + except ValueError: + return False + + return 0 <= num < self.number_of_bins + + +class Delegations: + """A container object storing information about all delegations. + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Args: + keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``. + roles: Ordered dictionary of role names to DelegatedRoles instances. It + defines which keys are required to sign the metadata for a specific + role. The roles order also defines the order that role delegations + are considered during target searches. + succinct_roles: Contains succinct information about hash bin + delegations. Note that succinct roles is not a TUF specification + feature yet and setting `succinct_roles` to a value makes the + resulting metadata non-compliant. The metadata will not be accepted + as valid by specification compliant clients such as those built with + python-tuf <= 1.1.0. For more information see: https://github.com/theupdateframework/taps/blob/master/tap15.md + unrecognized_fields: Dictionary of all attributes that are not managed + by TUF Metadata API + + Exactly one of ``roles`` and ``succinct_roles`` must be set. + + Raises: + ValueError: Invalid arguments. + """ + + def __init__( + self, + keys: Dict[str, Key], + roles: Optional[Dict[str, DelegatedRole]] = None, + succinct_roles: Optional[SuccinctRoles] = None, + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + self.keys = keys + if sum(1 for v in [roles, succinct_roles] if v is not None) != 1: + raise ValueError("One of roles and succinct_roles must be set") + + if roles is not None: + for role in roles: + if not role or role in TOP_LEVEL_ROLE_NAMES: + raise ValueError( + "Delegated roles cannot be empty or use top-level " + "role names" + ) + + self.roles = roles + self.succinct_roles = succinct_roles + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Delegations): + return False + + all_attributes_check = ( + self.keys == other.keys + and self.roles == other.roles + and self.succinct_roles == other.succinct_roles + and self.unrecognized_fields == other.unrecognized_fields + ) + + if self.roles is not None and other.roles is not None: + all_attributes_check = ( + all_attributes_check + # Order of the delegated roles matters (see issue #1788). + and list(self.roles.items()) == list(other.roles.items()) + ) + + return all_attributes_check + + @classmethod + def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations": + """Create ``Delegations`` object from its json/dict representation. + + Raises: + ValueError, KeyError, TypeError: Invalid arguments. 
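+
+        For example (sketch): a dict of the form ``{"keys": {...},
+        "roles": [{"name": "role1", ...}]}``; duplicate role names are
+        rejected, and "roles" and "succinct_roles" are mutually exclusive.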
+ """ + keys = delegations_dict.pop("keys") + keys_res = {} + for keyid, key_dict in keys.items(): + keys_res[keyid] = Key.from_dict(keyid, key_dict) + roles = delegations_dict.pop("roles", None) + roles_res: Optional[Dict[str, DelegatedRole]] = None + + if roles is not None: + roles_res = {} + for role_dict in roles: + new_role = DelegatedRole.from_dict(role_dict) + if new_role.name in roles_res: + raise ValueError(f"Duplicate role {new_role.name}") + roles_res[new_role.name] = new_role + + succinct_roles_dict = delegations_dict.pop("succinct_roles", None) + succinct_roles_info = None + if succinct_roles_dict is not None: + succinct_roles_info = SuccinctRoles.from_dict(succinct_roles_dict) + + # All fields left in the delegations_dict are unrecognized. + return cls(keys_res, roles_res, succinct_roles_info, delegations_dict) + + def to_dict(self) -> Dict[str, Any]: + """Return the dict representation of self.""" + keys = {keyid: key.to_dict() for keyid, key in self.keys.items()} + res_dict: Dict[str, Any] = { + "keys": keys, + **self.unrecognized_fields, + } + if self.roles is not None: + roles = [role_obj.to_dict() for role_obj in self.roles.values()] + res_dict["roles"] = roles + elif self.succinct_roles is not None: + res_dict["succinct_roles"] = self.succinct_roles.to_dict() + + return res_dict + + def get_roles_for_target( + self, target_filepath: str + ) -> Iterator[Tuple[str, bool]]: + """Given ``target_filepath`` get names and terminating status of all + delegated roles who are responsible for it. + + Args: + target_filepath: URL path to a target file, relative to a base + targets URL. + """ + if self.roles is not None: + for role in self.roles.values(): + if role.is_delegated_path(target_filepath): + yield role.name, role.terminating + + elif self.succinct_roles is not None: + # We consider all succinct_roles as terminating. + # For more information read TAP 15. + yield self.succinct_roles.get_role_for_target(target_filepath), True + + +class TargetFile(BaseFile): + """A container with information about a particular target file. + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Args: + length: Length of the target file in bytes. + hashes: Dictionary of hash algorithm names to hashes of the target + file content. + path: URL path to a target file, relative to a base targets URL. + unrecognized_fields: Dictionary of all attributes that are not managed + by TUF Metadata API + + Raises: + ValueError, TypeError: Invalid arguments. + """ + + def __init__( + self, + length: int, + hashes: Dict[str, str], + path: str, + unrecognized_fields: Optional[Dict[str, Any]] = None, + ): + self._validate_length(length) + self._validate_hashes(hashes) + + self.length = length + self.hashes = hashes + self.path = path + if unrecognized_fields is None: + unrecognized_fields = {} + + self.unrecognized_fields = unrecognized_fields + + @property + def custom(self) -> Any: + """Get implementation specific data related to the target. + + python-tuf does not use or validate this data. 
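+
+        For example, a target dict of ``{"length": 4, "hashes": {...},
+        "custom": {"type": "firmware"}}`` yields ``custom ==
+        {"type": "firmware"}``.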
+ """ + return self.unrecognized_fields.get("custom") + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, TargetFile): + return False + + return ( + self.length == other.length + and self.hashes == other.hashes + and self.path == other.path + and self.unrecognized_fields == other.unrecognized_fields + ) + + @classmethod + def from_dict(cls, target_dict: Dict[str, Any], path: str) -> "TargetFile": + """Create ``TargetFile`` object from its json/dict representation. + + Raises: + ValueError, KeyError, TypeError: Invalid arguments. + """ + length = target_dict.pop("length") + hashes = target_dict.pop("hashes") + + # All fields left in the target_dict are unrecognized. + return cls(length, hashes, path, target_dict) + + def to_dict(self) -> Dict[str, Any]: + """Return the JSON-serializable dictionary representation of self.""" + return { + "length": self.length, + "hashes": self.hashes, + **self.unrecognized_fields, + } + + @classmethod + def from_file( + cls, + target_file_path: str, + local_path: str, + hash_algorithms: Optional[List[str]] = None, + ) -> "TargetFile": + """Create ``TargetFile`` object from a file. + + Args: + target_file_path: URL path to a target file, relative to a base + targets URL. + local_path: Local path to target file content. + hash_algorithms: Hash algorithms to calculate hashes with. If not + specified the securesystemslib default hash algorithm is used. + + Raises: + FileNotFoundError: The file doesn't exist. + ValueError: The hash algorithms list contains an unsupported + algorithm. + """ + with open(local_path, "rb") as file: + return cls.from_data(target_file_path, file, hash_algorithms) + + @classmethod + def from_data( + cls, + target_file_path: str, + data: Union[bytes, IO[bytes]], + hash_algorithms: Optional[List[str]] = None, + ) -> "TargetFile": + """Create ``TargetFile`` object from bytes. + + Args: + target_file_path: URL path to a target file, relative to a base + targets URL. + data: Target file content. + hash_algorithms: Hash algorithms to create the hashes with. If not + specified the securesystemslib default hash algorithm is used. + + Raises: + ValueError: The hash algorithms list contains an unsupported + algorithm. + """ + length, hashes = cls._get_length_and_hashes(data, hash_algorithms) + return cls(length, hashes, target_file_path) + + def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None: + """Verify that length and hashes of ``data`` match expected values. + + Args: + data: Target file object or its content in bytes. + + Raises: + LengthOrHashMismatchError: Calculated length or hashes do not + match expected values or hash algorithm is not supported. + """ + self._verify_length(data, self.length) + self._verify_hashes(data, self.hashes) + + def get_prefixed_paths(self) -> List[str]: + """ + Return hash-prefixed URL path fragments for the target file path. + """ + paths = [] + parent, sep, name = self.path.rpartition("/") + for hash_value in self.hashes.values(): + paths.append(f"{parent}{sep}{hash_value}.{name}") + + return paths + + +class Targets(Signed, _DelegatorMixin): + """A container for the signed part of targets metadata. + + Targets contains verifying information about target files and also + delegates responsibility to other Targets roles. + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Args: + version: Metadata version number. Default is 1. + spec_version: Supported TUF specification version. 
+            version currently supported by the library.
+        expires: Metadata expiry date. Default is current date and time.
+        targets: Dictionary of target filenames to TargetFiles. Default is an
+            empty dictionary.
+        delegations: Defines how this Targets delegates responsibility to other
+            Targets Metadata files. Default is None.
+        unrecognized_fields: Dictionary of all attributes that are not managed
+            by TUF Metadata API
+
+    Raises:
+        ValueError: Invalid arguments.
+    """
+
+    type = _TARGETS
+
+    def __init__(
+        self,
+        version: Optional[int] = None,
+        spec_version: Optional[str] = None,
+        expires: Optional[datetime] = None,
+        targets: Optional[Dict[str, TargetFile]] = None,
+        delegations: Optional[Delegations] = None,
+        unrecognized_fields: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        super().__init__(version, spec_version, expires, unrecognized_fields)
+        self.targets = targets if targets is not None else {}
+        self.delegations = delegations
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, Targets):
+            return False
+
+        return (
+            super().__eq__(other)
+            and self.targets == other.targets
+            and self.delegations == other.delegations
+        )
+
+    @classmethod
+    def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets":
+        """Create ``Targets`` object from its json/dict representation.
+
+        Raises:
+            ValueError, KeyError, TypeError: Invalid arguments.
+        """
+        common_args = cls._common_fields_from_dict(signed_dict)
+        targets = signed_dict.pop(_TARGETS)
+        try:
+            delegations_dict = signed_dict.pop("delegations")
+        except KeyError:
+            delegations = None
+        else:
+            delegations = Delegations.from_dict(delegations_dict)
+        res_targets = {}
+        for target_path, target_info in targets.items():
+            res_targets[target_path] = TargetFile.from_dict(
+                target_info, target_path
+            )
+        # All fields left in the signed_dict are unrecognized.
+        return cls(*common_args, res_targets, delegations, signed_dict)
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Return the dict representation of self."""
+        targets_dict = self._common_fields_to_dict()
+        targets = {}
+        for target_path, target_file_obj in self.targets.items():
+            targets[target_path] = target_file_obj.to_dict()
+        targets_dict[_TARGETS] = targets
+        if self.delegations is not None:
+            targets_dict["delegations"] = self.delegations.to_dict()
+        return targets_dict
+
+    def add_key(self, key: Key, role: Optional[str] = None) -> None:
+        """Add new signing key for delegated role ``role``.
+
+        If succinct_roles is used then the ``role`` argument is not required.
+
+        Args:
+            key: Signing key to be added for ``role``.
+            role: Name of the role, for which ``key`` is added.
+
+        Raises:
+            ValueError: If the argument order is wrong, if there are no
+                delegated roles, or if ``role`` is not delegated by this
+                Targets.
+        """
+        # Verify that our users are not using the old argument order.
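+        # (Earlier versions of this method took the role name as the first
+        # argument; a Key instance in the ``role`` position is assumed to
+        # mean the caller still uses that legacy calling convention.)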
+        if isinstance(role, Key):
+            raise ValueError("Role must be a string, not a Key instance")
+
+        if self.delegations is None:
+            raise ValueError(f"Delegated role {role} doesn't exist")
+
+        if self.delegations.roles is not None:
+            if role not in self.delegations.roles:
+                raise ValueError(f"Delegated role {role} doesn't exist")
+            if key.keyid not in self.delegations.roles[role].keyids:
+                self.delegations.roles[role].keyids.append(key.keyid)
+
+        elif self.delegations.succinct_roles is not None:
+            if key.keyid not in self.delegations.succinct_roles.keyids:
+                self.delegations.succinct_roles.keyids.append(key.keyid)
+
+        self.delegations.keys[key.keyid] = key
+
+    def revoke_key(self, keyid: str, role: Optional[str] = None) -> None:
+        """Revoke key from delegated role ``role`` and update the delegations
+        key store.
+
+        If succinct_roles is used then the ``role`` argument is not required.
+
+        Args:
+            keyid: Identifier of the key to be removed for ``role``.
+            role: Name of the role, for which a signing key is removed.
+
+        Raises:
+            ValueError: If there are no delegated roles, if ``role`` is not
+                delegated by this ``Targets``, if the key is not used by
+                ``role``, or if the key with id ``keyid`` is not used by
+                succinct roles.
+        """
+        if self.delegations is None:
+            raise ValueError(f"Delegated role {role} doesn't exist")
+
+        if self.delegations.roles is not None:
+            if role not in self.delegations.roles:
+                raise ValueError(f"Delegated role {role} doesn't exist")
+            if keyid not in self.delegations.roles[role].keyids:
+                raise ValueError(f"Key with id {keyid} is not used by {role}")
+
+            self.delegations.roles[role].keyids.remove(keyid)
+            for keyinfo in self.delegations.roles.values():
+                if keyid in keyinfo.keyids:
+                    return
+
+        elif self.delegations.succinct_roles is not None:
+            if keyid not in self.delegations.succinct_roles.keyids:
+                raise ValueError(
+                    f"Key with id {keyid} is not used by succinct_roles"
+                )
+
+            self.delegations.succinct_roles.keyids.remove(keyid)
+
+        del self.delegations.keys[keyid]
+
+    def get_delegated_role(self, delegated_role: str) -> Role:
+        """Return the role object for the given delegated role.
+
+        Raises ValueError if delegated_role is not actually delegated.
+        """
+        if self.delegations is None:
+            raise ValueError("No delegations found")
+
+        role: Optional[Role] = None
+        if self.delegations.roles is not None:
+            role = self.delegations.roles.get(delegated_role)
+        elif self.delegations.succinct_roles is not None:
+            succinct = self.delegations.succinct_roles
+            if succinct.is_delegated_role(delegated_role):
+                role = succinct
+
+        if not role:
+            raise ValueError(f"Delegated role {delegated_role} not found")
+
+        return role
+
+    def get_key(self, keyid: str) -> Key:  # noqa: D102
+        if self.delegations is None:
+            raise ValueError("No delegations found")
+        if keyid not in self.delegations.keys:
+            raise ValueError(f"Key {keyid} not found")
+
+        return self.delegations.keys[keyid]
diff --git a/tuf/api/dsse.py b/tuf/api/dsse.py
new file mode 100644
index 0000000000..bcdc84b9b3
--- /dev/null
+++ b/tuf/api/dsse.py
@@ -0,0 +1,159 @@
+"""Low-level TUF DSSE API. (experimental!)
+
+"""
+import json
+from typing import Dict, Generic, Type, cast
+
+from securesystemslib.dsse import Envelope as BaseSimpleEnvelope
+
+# Expose all payload classes to use API independently of ``tuf.api.metadata``.
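+# The import is deliberately re-exporting; the "noqa: F401" marker below
+# silences the unused-import lint warning this would otherwise raise.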
+from tuf.api._payload import (  # noqa: F401
+    _ROOT,
+    _SNAPSHOT,
+    _TARGETS,
+    _TIMESTAMP,
+    SPECIFICATION_VERSION,
+    TOP_LEVEL_ROLE_NAMES,
+    BaseFile,
+    DelegatedRole,
+    Delegations,
+    MetaFile,
+    Role,
+    Root,
+    RootVerificationResult,
+    Signed,
+    Snapshot,
+    SuccinctRoles,
+    T,
+    TargetFile,
+    Targets,
+    Timestamp,
+    VerificationResult,
+)
+from tuf.api.serialization import DeserializationError, SerializationError
+
+
+class SimpleEnvelope(Generic[T], BaseSimpleEnvelope):
+    """Dead Simple Signing Envelope (DSSE) for TUF payloads.
+
+    * Sign with ``self.sign()`` (inherited).
+    * Verify with ``verify_delegate`` on a ``Root`` or ``Targets``
+      object::
+
+        delegator.verify_delegate(
+            role_name,
+            envelope.pae(),  # Note how we don't pass ``envelope.payload``!
+            envelope.signatures_dict,
+        )
+
+    Attributes:
+        payload: Serialized payload bytes.
+        payload_type: Payload string identifier.
+        signatures: List of ``Signature`` objects.
+        signatures_dict: Ordered dictionary of keyids to ``Signature`` objects.
+
+    """
+
+    _DEFAULT_PAYLOAD_TYPE = "application/vnd.tuf+json"
+
+    @property
+    def signatures_dict(self) -> Dict:
+        """Convenience alias for ``self.signatures`` mapped to keyids."""
+        # TODO: Propose changing ``signatures`` list to dict upstream
+        return {sig.keyid: sig for sig in self.signatures}
+
+    @classmethod
+    def from_bytes(cls, data: bytes) -> "SimpleEnvelope[T]":
+        """Load envelope from JSON bytes.
+
+        NOTE: Unlike ``tuf.api.metadata.Metadata.from_bytes``, this method
+        does not deserialize the contained payload. Use ``self.get_signed`` to
+        deserialize the payload into a ``Signed`` object.
+
+        Args:
+            data: envelope JSON bytes.
+
+        Raises:
+            tuf.api.serialization.DeserializationError:
+                data cannot be deserialized.
+
+        Returns:
+            TUF ``SimpleEnvelope`` object.
+        """
+        try:
+            envelope_dict = json.loads(data.decode())
+            envelope = SimpleEnvelope.from_dict(envelope_dict)
+
+        except Exception as e:
+            raise DeserializationError from e
+
+        return envelope
+
+    def to_bytes(self) -> bytes:
+        """Return envelope as JSON bytes.
+
+        NOTE: Unlike ``tuf.api.metadata.Metadata.to_bytes``, this method does
+        not serialize the payload. Use ``SimpleEnvelope.from_signed`` to
+        serialize a ``Signed`` object and wrap it in a SimpleEnvelope.
+
+        Raises:
+            tuf.api.serialization.SerializationError:
+                self cannot be serialized.
+        """
+        try:
+            envelope_dict = self.to_dict()
+            json_bytes = json.dumps(envelope_dict).encode()
+
+        except Exception as e:
+            raise SerializationError from e
+
+        return json_bytes
+
+    @classmethod
+    def from_signed(cls, signed: T) -> "SimpleEnvelope[T]":
+        """Serialize payload as JSON bytes and wrap in envelope.
+
+        Args:
+            signed: ``Signed`` object.
+
+        Raises:
+            tuf.api.serialization.SerializationError:
+                The signed object cannot be serialized.
+        """
+        try:
+            signed_dict = signed.to_dict()
+            json_bytes = json.dumps(signed_dict).encode()
+
+        except Exception as e:
+            raise SerializationError from e
+
+        return cls(json_bytes, cls._DEFAULT_PAYLOAD_TYPE, [])
+
+    def get_signed(self) -> T:
+        """Extract and deserialize payload JSON bytes from envelope.
+
+        Raises:
+            tuf.api.serialization.DeserializationError:
+                The signed object cannot be deserialized.
+        """
+
+        try:
+            payload_dict = json.loads(self.payload.decode())
+
+            # TODO: can we move this to tuf.api._payload?
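+            # Dispatch on the spec-defined "_type" field to pick the concrete
+            # Signed subclass for deserialization; unknown values fall through
+            # to the ValueError below.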
+            _type = payload_dict["_type"]
+            if _type == _TARGETS:
+                inner_cls: Type[Signed] = Targets
+            elif _type == _SNAPSHOT:
+                inner_cls = Snapshot
+            elif _type == _TIMESTAMP:
+                inner_cls = Timestamp
+            elif _type == _ROOT:
+                inner_cls = Root
+            else:
+                raise ValueError(f'unrecognized role type "{_type}"')
+
+        except Exception as e:
+            raise DeserializationError from e
+
+        return cast(T, inner_cls.from_dict(payload_dict))
diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py
index 3f385efa86..ae42a3f539 100644
--- a/tuf/api/metadata.py
+++ b/tuf/api/metadata.py
@@ -29,58 +29,50 @@
 A basic example of repository implementation using the Metadata is available in
 `examples/repo_example `_.
 """
-import abc
-import fnmatch
-import io
 import logging
 import tempfile
-from dataclasses import dataclass
-from datetime import datetime
-from typing import (
-    IO,
-    Any,
-    ClassVar,
-    Dict,
-    Generic,
-    Iterator,
-    List,
-    Optional,
-    Tuple,
-    Type,
-    TypeVar,
-    Union,
-    cast,
-)
+from typing import Any, Dict, Generic, Optional, Type, cast
 
-from securesystemslib import exceptions as sslib_exceptions
-from securesystemslib import hash as sslib_hash
-from securesystemslib.signer import Key, Signature, Signer
+from securesystemslib.signer import Signature, Signer
 from securesystemslib.storage import FilesystemBackend, StorageBackendInterface
 from securesystemslib.util import persist_temp_file
 
-from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
+# Expose payload classes via ``tuf.api.metadata`` to maintain the API,
+# even if they are unused in the local scope.
+from tuf.api._payload import (  # noqa: F401
+    _ROOT,
+    _SNAPSHOT,
+    _TARGETS,
+    _TIMESTAMP,
+    SPECIFICATION_VERSION,
+    TOP_LEVEL_ROLE_NAMES,
+    BaseFile,
+    DelegatedRole,
+    Delegations,
+    Key,
+    LengthOrHashMismatchError,
+    MetaFile,
+    Role,
+    Root,
+    RootVerificationResult,
+    Signed,
+    Snapshot,
+    SuccinctRoles,
+    T,
+    TargetFile,
+    Targets,
+    Timestamp,
+    VerificationResult,
+)
+from tuf.api.exceptions import UnsignedMetadataError
 from tuf.api.serialization import (
     MetadataDeserializer,
     MetadataSerializer,
     SignedSerializer,
 )
 
-_ROOT = "root"
-_SNAPSHOT = "snapshot"
-_TARGETS = "targets"
-_TIMESTAMP = "timestamp"
-
-
 logger = logging.getLogger(__name__)
 
-# We aim to support SPECIFICATION_VERSION and require the input metadata
-# files to have the same major version (the first number) as ours.
-SPECIFICATION_VERSION = ["1", "0", "31"]
-TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS}
-
-# T is a Generic type constraint for Metadata.signed
-T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")
-
 
 class Metadata(Generic[T]):
     """A container for signed TUF metadata.
@@ -418,1737 +410,3 @@ def verify_delegate(
         self.signed.verify_delegate(
             delegated_role, payload, delegated_metadata.signatures
         )
-
-
-class Signed(metaclass=abc.ABCMeta):
-    """A base class for the signed part of TUF metadata.
-
-    Objects with base class Signed are usually included in a ``Metadata`` object
-    on the signed attribute. This class provides attributes and methods that
-    are common for all TUF metadata types (roles).
-
-    *All parameters named below are not just constructor arguments but also
-    instance attributes.*
-
-    Args:
-        version: Metadata version number. If None, then 1 is assigned.
-        spec_version: Supported TUF specification version. If None, then the
-            version currently supported by the library is assigned.
-        expires: Metadata expiry date. If None, then current date and time is
-            assigned.
- unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API - - Raises: - ValueError: Invalid arguments. - """ - - # type is required for static reference without changing the API - type: ClassVar[str] = "signed" - - # _type and type are identical: 1st replicates file format, 2nd passes lint - @property - def _type(self) -> str: - return self.type - - @property - def expires(self) -> datetime: - """Get the metadata expiry date. - - # Use 'datetime' module to e.g. expire in seven days from now - obj.expires = utcnow() + timedelta(days=7) - """ - return self._expires - - @expires.setter - def expires(self, value: datetime) -> None: - self._expires = value.replace(microsecond=0) - - # NOTE: Signed is a stupid name, because this might not be signed yet, but - # we keep it to match spec terminology (I often refer to this as "payload", - # or "inner metadata") - def __init__( - self, - version: Optional[int], - spec_version: Optional[str], - expires: Optional[datetime], - unrecognized_fields: Optional[Dict[str, Any]], - ): - if spec_version is None: - spec_version = ".".join(SPECIFICATION_VERSION) - # Accept semver (X.Y.Z) but also X.Y for legacy compatibility - spec_list = spec_version.split(".") - if len(spec_list) not in [2, 3] or not all( - el.isdigit() for el in spec_list - ): - raise ValueError(f"Failed to parse spec_version {spec_version}") - - # major version must match - if spec_list[0] != SPECIFICATION_VERSION[0]: - raise ValueError(f"Unsupported spec_version {spec_version}") - - self.spec_version = spec_version - - self.expires = expires or datetime.utcnow() - - if version is None: - version = 1 - elif version <= 0: - raise ValueError(f"version must be > 0, got {version}") - self.version = version - - if unrecognized_fields is None: - unrecognized_fields = {} - - self.unrecognized_fields = unrecognized_fields - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Signed): - return False - - return ( - self.type == other.type - and self.version == other.version - and self.spec_version == other.spec_version - and self.expires == other.expires - and self.unrecognized_fields == other.unrecognized_fields - ) - - @abc.abstractmethod - def to_dict(self) -> Dict[str, Any]: - """Serialize and return a dict representation of self.""" - raise NotImplementedError - - @classmethod - @abc.abstractmethod - def from_dict(cls, signed_dict: Dict[str, Any]) -> "Signed": - """Deserialization helper, creates object from json/dict - representation. - """ - raise NotImplementedError - - @classmethod - def _common_fields_from_dict( - cls, signed_dict: Dict[str, Any] - ) -> Tuple[int, str, datetime]: - """Return common fields of ``Signed`` instances from the passed dict - representation, and returns an ordered list to be passed as leading - positional arguments to a subclass constructor. - - See ``{Root, Timestamp, Snapshot, Targets}.from_dict`` - methods for usage. - - """ - _type = signed_dict.pop("_type") - if _type != cls.type: - raise ValueError(f"Expected type {cls.type}, got {_type}") - - version = signed_dict.pop("version") - spec_version = signed_dict.pop("spec_version") - expires_str = signed_dict.pop("expires") - # Convert 'expires' TUF metadata string to a datetime object, which is - # what the constructor expects and what we store. The inverse operation - # is implemented in '_common_fields_to_dict'. 
- expires = datetime.strptime(expires_str, "%Y-%m-%dT%H:%M:%SZ") - - return version, spec_version, expires - - def _common_fields_to_dict(self) -> Dict[str, Any]: - """Return a dict representation of common fields of - ``Signed`` instances. - - See ``{Root, Timestamp, Snapshot, Targets}.to_dict`` methods for usage. - - """ - return { - "_type": self._type, - "version": self.version, - "spec_version": self.spec_version, - "expires": self.expires.isoformat() + "Z", - **self.unrecognized_fields, - } - - def is_expired(self, reference_time: Optional[datetime] = None) -> bool: - """Check metadata expiration against a reference time. - - Args: - reference_time: Time to check expiration date against. A naive - datetime in UTC expected. Default is current UTC date and time. - - Returns: - ``True`` if expiration time is less than the reference time. - """ - if reference_time is None: - reference_time = datetime.utcnow() - - return reference_time >= self.expires - - -class Role: - """Container that defines which keys are required to sign roles metadata. - - Role defines how many keys are required to successfully sign the roles - metadata, and which keys are accepted. - - *All parameters named below are not just constructor arguments but also - instance attributes.* - - Args: - keyids: Roles signing key identifiers. - threshold: Number of keys required to sign this role's metadata. - unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API - - Raises: - ValueError: Invalid arguments. - """ - - def __init__( - self, - keyids: List[str], - threshold: int, - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - if len(set(keyids)) != len(keyids): - raise ValueError(f"Nonunique keyids: {keyids}") - if threshold < 1: - raise ValueError("threshold should be at least 1!") - self.keyids = keyids - self.threshold = threshold - if unrecognized_fields is None: - unrecognized_fields = {} - - self.unrecognized_fields = unrecognized_fields - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Role): - return False - - return ( - self.keyids == other.keyids - and self.threshold == other.threshold - and self.unrecognized_fields == other.unrecognized_fields - ) - - @classmethod - def from_dict(cls, role_dict: Dict[str, Any]) -> "Role": - """Create ``Role`` object from its json/dict representation. - - Raises: - ValueError, KeyError: Invalid arguments. - """ - keyids = role_dict.pop("keyids") - threshold = role_dict.pop("threshold") - # All fields left in the role_dict are unrecognized. - return cls(keyids, threshold, role_dict) - - def to_dict(self) -> Dict[str, Any]: - """Return the dictionary representation of self.""" - return { - "keyids": self.keyids, - "threshold": self.threshold, - **self.unrecognized_fields, - } - - -@dataclass -class VerificationResult: - """Signature verification result for delegated role metadata. - - Attributes: - threshold: Number of required signatures. - signed: dict of keyid to Key, containing keys that have signed. - unsigned: dict of keyid to Key, containing keys that have not signed. 
- """ - - threshold: int - signed: Dict[str, Key] - unsigned: Dict[str, Key] - - def __bool__(self) -> bool: - return self.verified - - @property - def verified(self) -> bool: - """True if threshold of signatures is met.""" - return len(self.signed) >= self.threshold - - @property - def missing(self) -> int: - """Number of additional signatures required to reach threshold.""" - return max(0, self.threshold - len(self.signed)) - - -@dataclass -class RootVerificationResult: - """Signature verification result for root metadata. - - Root must be verified by itself and the previous root version. This - dataclass represents both results. For the edge case of first version - of root, these underlying results are identical. - - Note that `signed` and `unsigned` correctness requires the underlying - VerificationResult keys to not conflict (no reusing the same keyid for - different keys). - - Attributes: - first: First underlying VerificationResult - second: Second underlying VerificationResult - """ - - first: VerificationResult - second: VerificationResult - - def __bool__(self) -> bool: - return self.verified - - @property - def verified(self) -> bool: - """True if threshold of signatures is met in both underlying - VerificationResults. - """ - return self.first.verified and self.second.verified - - @property - def signed(self) -> Dict[str, Key]: - """Dictionary of all signing keys that have signed, from both - VerificationResults. - return a union of all signed (in python<3.9 this requires - dict unpacking) - """ - return {**self.first.signed, **self.second.signed} - - @property - def unsigned(self) -> Dict[str, Key]: - """Dictionary of all signing keys that have not signed, from both - VerificationResults. - return a union of all unsigned (in python<3.9 this requires - dict unpacking) - """ - return {**self.first.unsigned, **self.second.unsigned} - - -class _DelegatorMixin(metaclass=abc.ABCMeta): - """Class that implements verify_delegate() for Root and Targets""" - - @abc.abstractmethod - def get_delegated_role(self, delegated_role: str) -> Role: - """Return the role object for the given delegated role. - - Raises ValueError if delegated_role is not actually delegated. - """ - raise NotImplementedError - - @abc.abstractmethod - def get_key(self, keyid: str) -> Key: - """Return the key object for the given keyid. - - Raises ValueError if key is not found. - """ - raise NotImplementedError - - def get_verification_result( - self, - delegated_role: str, - payload: bytes, - signatures: Dict[str, Signature], - ) -> VerificationResult: - """Return signature threshold verification result for delegated role. - - NOTE: Unlike `verify_delegate()` this method does not raise, if the - role metadata is not fully verified. - - Args: - delegated_role: Name of the delegated role to verify - payload: Signed payload bytes for the delegated role - signatures: Signatures over payload bytes - - Raises: - ValueError: no delegation was found for ``delegated_role``. 
- """ - role = self.get_delegated_role(delegated_role) - - signed = {} - unsigned = {} - - for keyid in role.keyids: - try: - key = self.get_key(keyid) - except ValueError: - logger.info("No key for keyid %s", keyid) - continue - - if keyid not in signatures: - unsigned[keyid] = key - logger.info("No signature for keyid %s", keyid) - continue - - sig = signatures[keyid] - try: - key.verify_signature(sig, payload) - signed[keyid] = key - except sslib_exceptions.UnverifiedSignatureError: - unsigned[keyid] = key - logger.info("Key %s failed to verify %s", keyid, delegated_role) - - return VerificationResult(role.threshold, signed, unsigned) - - def verify_delegate( - self, - delegated_role: str, - payload: bytes, - signatures: Dict[str, Signature], - ) -> None: - """Verify signature threshold for delegated role. - - Verify that there are enough valid ``signatures`` over ``payload``, to - meet the threshold of keys for ``delegated_role``, as defined by the - delegator (``self``). - - Args: - delegated_role: Name of the delegated role to verify - payload: Signed payload bytes for the delegated role - signatures: Signatures over payload bytes - - Raises: - UnsignedMetadataError: ``delegated_role`` was not signed with - required threshold of keys for ``role_name``. - ValueError: no delegation was found for ``delegated_role``. - """ - result = self.get_verification_result( - delegated_role, payload, signatures - ) - if not result: - raise UnsignedMetadataError( - f"{delegated_role} was signed by {len(result.signed)}/" - f"{result.threshold} keys" - ) - - -class Root(Signed, _DelegatorMixin): - """A container for the signed part of root metadata. - - Parameters listed below are also instance attributes. - - Args: - version: Metadata version number. Default is 1. - spec_version: Supported TUF specification version. Default is the - version currently supported by the library. - expires: Metadata expiry date. Default is current date and time. - keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``. - Default is empty dictionary. - roles: Dictionary of role names to Roles. Defines which keys are - required to sign the metadata for a specific role. Default is - a dictionary of top level roles without keys and threshold of 1. - consistent_snapshot: ``True`` if repository supports consistent - snapshots. Default is True. - unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API - - Raises: - ValueError: Invalid arguments. 
- """ - - type = _ROOT - - def __init__( - self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - keys: Optional[Dict[str, Key]] = None, - roles: Optional[Dict[str, Role]] = None, - consistent_snapshot: Optional[bool] = True, - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - super().__init__(version, spec_version, expires, unrecognized_fields) - self.consistent_snapshot = consistent_snapshot - self.keys = keys if keys is not None else {} - - if roles is None: - roles = {r: Role([], 1) for r in TOP_LEVEL_ROLE_NAMES} - elif set(roles) != TOP_LEVEL_ROLE_NAMES: - raise ValueError("Role names must be the top-level metadata roles") - self.roles = roles - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Root): - return False - - return ( - super().__eq__(other) - and self.keys == other.keys - and self.roles == other.roles - and self.consistent_snapshot == other.consistent_snapshot - ) - - @classmethod - def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root": - """Create ``Root`` object from its json/dict representation. - - Raises: - ValueError, KeyError, TypeError: Invalid arguments. - """ - common_args = cls._common_fields_from_dict(signed_dict) - consistent_snapshot = signed_dict.pop("consistent_snapshot", None) - keys = signed_dict.pop("keys") - roles = signed_dict.pop("roles") - - for keyid, key_dict in keys.items(): - keys[keyid] = Key.from_dict(keyid, key_dict) - for role_name, role_dict in roles.items(): - roles[role_name] = Role.from_dict(role_dict) - - # All fields left in the signed_dict are unrecognized. - return cls(*common_args, keys, roles, consistent_snapshot, signed_dict) - - def to_dict(self) -> Dict[str, Any]: - """Return the dict representation of self.""" - root_dict = self._common_fields_to_dict() - keys = {keyid: key.to_dict() for (keyid, key) in self.keys.items()} - roles = {} - for role_name, role in self.roles.items(): - roles[role_name] = role.to_dict() - if self.consistent_snapshot is not None: - root_dict["consistent_snapshot"] = self.consistent_snapshot - - root_dict.update( - { - "keys": keys, - "roles": roles, - } - ) - return root_dict - - def add_key(self, key: Key, role: str) -> None: - """Add new signing key for delegated role ``role``. - - Args: - key: Signing key to be added for ``role``. - role: Name of the role, for which ``key`` is added. - - Raises: - ValueError: If the argument order is wrong or if ``role`` doesn't - exist. - """ - # Verify that our users are not using the old argument order. - if isinstance(role, Key): - raise ValueError("Role must be a string, not a Key instance") - - if role not in self.roles: - raise ValueError(f"Role {role} doesn't exist") - if key.keyid not in self.roles[role].keyids: - self.roles[role].keyids.append(key.keyid) - self.keys[key.keyid] = key - - def revoke_key(self, keyid: str, role: str) -> None: - """Revoke key from ``role`` and updates the key store. - - Args: - keyid: Identifier of the key to be removed for ``role``. - role: Name of the role, for which a signing key is removed. - - Raises: - ValueError: If ``role`` doesn't exist or if ``role`` doesn't include - the key. 
- """ - if role not in self.roles: - raise ValueError(f"Role {role} doesn't exist") - if keyid not in self.roles[role].keyids: - raise ValueError(f"Key with id {keyid} is not used by {role}") - self.roles[role].keyids.remove(keyid) - for keyinfo in self.roles.values(): - if keyid in keyinfo.keyids: - return - - del self.keys[keyid] - - def get_delegated_role(self, delegated_role: str) -> Role: - """Return the role object for the given delegated role. - - Raises ValueError if delegated_role is not actually delegated. - """ - if delegated_role not in self.roles: - raise ValueError(f"Delegated role {delegated_role} not found") - - return self.roles[delegated_role] - - def get_key(self, keyid: str) -> Key: # noqa: D102 - if keyid not in self.keys: - raise ValueError(f"Key {keyid} not found") - - return self.keys[keyid] - - def get_root_verification_result( - self, - previous: Optional["Root"], - payload: bytes, - signatures: Dict[str, Signature], - ) -> RootVerificationResult: - """Return signature threshold verification result for two root roles. - - Verify root metadata with two roles (`self` and optionally `previous`). - - If the repository has no root role versions yet, `previous` can be left - None. In all other cases, `previous` must be the previous version of - the Root. - - NOTE: Unlike `verify_delegate()` this method does not raise, if the - root metadata is not fully verified. - - Args: - previous: The previous `Root` to verify payload with, or None - payload: Signed payload bytes for root - signatures: Signatures over payload bytes - - Raises: - ValueError: no delegation was found for ``root`` or given Root - versions are not sequential. - """ - - if previous is None: - previous = self - elif self.version != previous.version + 1: - versions = f"v{previous.version} and v{self.version}" - raise ValueError( - f"Expected sequential root versions, got {versions}." - ) - - return RootVerificationResult( - previous.get_verification_result(Root.type, payload, signatures), - self.get_verification_result(Root.type, payload, signatures), - ) - - -class BaseFile: - """A base class of ``MetaFile`` and ``TargetFile``. - - Encapsulates common static methods for length and hash verification. 
- """ - - @staticmethod - def _verify_hashes( - data: Union[bytes, IO[bytes]], expected_hashes: Dict[str, str] - ) -> None: - """Verify that the hash of ``data`` matches ``expected_hashes``.""" - is_bytes = isinstance(data, bytes) - for algo, exp_hash in expected_hashes.items(): - try: - if is_bytes: - digest_object = sslib_hash.digest(algo) - digest_object.update(data) - else: - # if data is not bytes, assume it is a file object - digest_object = sslib_hash.digest_fileobject(data, algo) - except ( - sslib_exceptions.UnsupportedAlgorithmError, - sslib_exceptions.FormatError, - ) as e: - raise LengthOrHashMismatchError( - f"Unsupported algorithm '{algo}'" - ) from e - - observed_hash = digest_object.hexdigest() - if observed_hash != exp_hash: - raise LengthOrHashMismatchError( - f"Observed hash {observed_hash} does not match " - f"expected hash {exp_hash}" - ) - - @staticmethod - def _verify_length( - data: Union[bytes, IO[bytes]], expected_length: int - ) -> None: - """Verify that the length of ``data`` matches ``expected_length``.""" - if isinstance(data, bytes): - observed_length = len(data) - else: - # if data is not bytes, assume it is a file object - data.seek(0, io.SEEK_END) - observed_length = data.tell() - - if observed_length != expected_length: - raise LengthOrHashMismatchError( - f"Observed length {observed_length} does not match " - f"expected length {expected_length}" - ) - - @staticmethod - def _validate_hashes(hashes: Dict[str, str]) -> None: - if not hashes: - raise ValueError("Hashes must be a non empty dictionary") - for key, value in hashes.items(): - if not (isinstance(key, str) and isinstance(value, str)): - raise TypeError("Hashes items must be strings") - - @staticmethod - def _validate_length(length: int) -> None: - if length < 0: - raise ValueError(f"Length must be >= 0, got {length}") - - @staticmethod - def _get_length_and_hashes( - data: Union[bytes, IO[bytes]], hash_algorithms: Optional[List[str]] - ) -> Tuple[int, Dict[str, str]]: - """Calculate length and hashes of ``data``.""" - if isinstance(data, bytes): - length = len(data) - else: - data.seek(0, io.SEEK_END) - length = data.tell() - - hashes = {} - - if hash_algorithms is None: - hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM] - - for algorithm in hash_algorithms: - try: - if isinstance(data, bytes): - digest_object = sslib_hash.digest(algorithm) - digest_object.update(data) - else: - digest_object = sslib_hash.digest_fileobject( - data, algorithm - ) - except ( - sslib_exceptions.UnsupportedAlgorithmError, - sslib_exceptions.FormatError, - ) as e: - raise ValueError(f"Unsupported algorithm '{algorithm}'") from e - - hashes[algorithm] = digest_object.hexdigest() - - return (length, hashes) - - -class MetaFile(BaseFile): - """A container with information about a particular metadata file. - - *All parameters named below are not just constructor arguments but also - instance attributes.* - - Args: - version: Version of the metadata file. - length: Length of the metadata file in bytes. - hashes: Dictionary of hash algorithm names to hashes of the metadata - file content. - unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API - - Raises: - ValueError, TypeError: Invalid arguments. 
- """ - - def __init__( - self, - version: int = 1, - length: Optional[int] = None, - hashes: Optional[Dict[str, str]] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - if version <= 0: - raise ValueError(f"Metafile version must be > 0, got {version}") - if length is not None: - self._validate_length(length) - if hashes is not None: - self._validate_hashes(hashes) - - self.version = version - self.length = length - self.hashes = hashes - if unrecognized_fields is None: - unrecognized_fields = {} - - self.unrecognized_fields = unrecognized_fields - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, MetaFile): - return False - - return ( - self.version == other.version - and self.length == other.length - and self.hashes == other.hashes - and self.unrecognized_fields == other.unrecognized_fields - ) - - @classmethod - def from_dict(cls, meta_dict: Dict[str, Any]) -> "MetaFile": - """Create ``MetaFile`` object from its json/dict representation. - - Raises: - ValueError, KeyError: Invalid arguments. - """ - version = meta_dict.pop("version") - length = meta_dict.pop("length", None) - hashes = meta_dict.pop("hashes", None) - - # All fields left in the meta_dict are unrecognized. - return cls(version, length, hashes, meta_dict) - - @classmethod - def from_data( - cls, - version: int, - data: Union[bytes, IO[bytes]], - hash_algorithms: List[str], - ) -> "MetaFile": - """Creates MetaFile object from bytes. - This constructor should only be used if hashes are wanted. - By default, MetaFile(ver) should be used. - Args: - version: Version of the metadata file. - data: Metadata bytes that the metafile represents. - hash_algorithms: Hash algorithms to create the hashes with. If not - specified, the securesystemslib default hash algorithm is used. - - Raises: - ValueError: The hash algorithms list contains an unsupported - algorithm. - """ - length, hashes = cls._get_length_and_hashes(data, hash_algorithms) - return cls(version, length, hashes) - - def to_dict(self) -> Dict[str, Any]: - """Return the dictionary representation of self.""" - res_dict: Dict[str, Any] = { - "version": self.version, - **self.unrecognized_fields, - } - - if self.length is not None: - res_dict["length"] = self.length - - if self.hashes is not None: - res_dict["hashes"] = self.hashes - - return res_dict - - def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None: - """Verify that the length and hashes of ``data`` match expected values. - - Args: - data: File object or its content in bytes. - - Raises: - LengthOrHashMismatchError: Calculated length or hashes do not - match expected values or hash algorithm is not supported. - """ - if self.length is not None: - self._verify_length(data, self.length) - - if self.hashes is not None: - self._verify_hashes(data, self.hashes) - - -class Timestamp(Signed): - """A container for the signed part of timestamp metadata. - - TUF file format uses a dictionary to contain the snapshot information: - this is not the case with ``Timestamp.snapshot_meta`` which is a - ``MetaFile``. - - *All parameters named below are not just constructor arguments but also - instance attributes.* - - Args: - version: Metadata version number. Default is 1. - spec_version: Supported TUF specification version. Default is the - version currently supported by the library. - expires: Metadata expiry date. Default is current date and time. 
- unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API - snapshot_meta: Meta information for snapshot metadata. Default is a - MetaFile with version 1. - - Raises: - ValueError: Invalid arguments. - """ - - type = _TIMESTAMP - - def __init__( - self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - snapshot_meta: Optional[MetaFile] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - super().__init__(version, spec_version, expires, unrecognized_fields) - self.snapshot_meta = snapshot_meta or MetaFile(1) - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Timestamp): - return False - - return ( - super().__eq__(other) and self.snapshot_meta == other.snapshot_meta - ) - - @classmethod - def from_dict(cls, signed_dict: Dict[str, Any]) -> "Timestamp": - """Create ``Timestamp`` object from its json/dict representation. - - Raises: - ValueError, KeyError: Invalid arguments. - """ - common_args = cls._common_fields_from_dict(signed_dict) - meta_dict = signed_dict.pop("meta") - snapshot_meta = MetaFile.from_dict(meta_dict["snapshot.json"]) - # All fields left in the timestamp_dict are unrecognized. - return cls(*common_args, snapshot_meta, signed_dict) - - def to_dict(self) -> Dict[str, Any]: - """Return the dict representation of self.""" - res_dict = self._common_fields_to_dict() - res_dict["meta"] = {"snapshot.json": self.snapshot_meta.to_dict()} - return res_dict - - -class Snapshot(Signed): - """A container for the signed part of snapshot metadata. - - Snapshot contains information about all target Metadata files. - - *All parameters named below are not just constructor arguments but also - instance attributes.* - - Args: - version: Metadata version number. Default is 1. - spec_version: Supported TUF specification version. Default is the - version currently supported by the library. - expires: Metadata expiry date. Default is current date and time. - unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API - meta: Dictionary of targets filenames to ``MetaFile`` objects. Default - is a dictionary with a Metafile for "snapshot.json" version 1. - - Raises: - ValueError: Invalid arguments. - """ - - type = _SNAPSHOT - - def __init__( - self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - meta: Optional[Dict[str, MetaFile]] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - super().__init__(version, spec_version, expires, unrecognized_fields) - self.meta = meta if meta is not None else {"targets.json": MetaFile(1)} - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Snapshot): - return False - - return super().__eq__(other) and self.meta == other.meta - - @classmethod - def from_dict(cls, signed_dict: Dict[str, Any]) -> "Snapshot": - """Create ``Snapshot`` object from its json/dict representation. - - Raises: - ValueError, KeyError: Invalid arguments. - """ - common_args = cls._common_fields_from_dict(signed_dict) - meta_dicts = signed_dict.pop("meta") - meta = {} - for meta_path, meta_dict in meta_dicts.items(): - meta[meta_path] = MetaFile.from_dict(meta_dict) - # All fields left in the snapshot_dict are unrecognized. 
- return cls(*common_args, meta, signed_dict) - - def to_dict(self) -> Dict[str, Any]: - """Return the dict representation of self.""" - snapshot_dict = self._common_fields_to_dict() - meta_dict = {} - for meta_path, meta_info in self.meta.items(): - meta_dict[meta_path] = meta_info.to_dict() - - snapshot_dict["meta"] = meta_dict - return snapshot_dict - - -class DelegatedRole(Role): - """A container with information about a delegated role. - - A delegation can happen in two ways: - - - ``paths`` is set: delegates targets matching any path pattern in - ``paths`` - - ``path_hash_prefixes`` is set: delegates targets whose target path - hash starts with any of the prefixes in ``path_hash_prefixes`` - - ``paths`` and ``path_hash_prefixes`` are mutually exclusive: - both cannot be set, at least one of them must be set. - - *All parameters named below are not just constructor arguments but also - instance attributes.* - - Args: - name: Delegated role name. - keyids: Delegated role signing key identifiers. - threshold: Number of keys required to sign this role's metadata. - terminating: ``True`` if this delegation terminates a target lookup. - paths: Path patterns. See note above. - path_hash_prefixes: Hash prefixes. See note above. - unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API. - - Raises: - ValueError: Invalid arguments. - """ - - def __init__( - self, - name: str, - keyids: List[str], - threshold: int, - terminating: bool, - paths: Optional[List[str]] = None, - path_hash_prefixes: Optional[List[str]] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - super().__init__(keyids, threshold, unrecognized_fields) - self.name = name - self.terminating = terminating - exclusive_vars = [paths, path_hash_prefixes] - if sum(1 for var in exclusive_vars if var is not None) != 1: - raise ValueError( - "Only one of (paths, path_hash_prefixes) must be set" - ) - - if paths is not None and any(not isinstance(p, str) for p in paths): - raise ValueError("Paths must be strings") - if path_hash_prefixes is not None and any( - not isinstance(p, str) for p in path_hash_prefixes - ): - raise ValueError("Path_hash_prefixes must be strings") - - self.paths = paths - self.path_hash_prefixes = path_hash_prefixes - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, DelegatedRole): - return False - - return ( - super().__eq__(other) - and self.name == other.name - and self.terminating == other.terminating - and self.paths == other.paths - and self.path_hash_prefixes == other.path_hash_prefixes - ) - - @classmethod - def from_dict(cls, role_dict: Dict[str, Any]) -> "DelegatedRole": - """Create ``DelegatedRole`` object from its json/dict representation. - - Raises: - ValueError, KeyError, TypeError: Invalid arguments. - """ - name = role_dict.pop("name") - keyids = role_dict.pop("keyids") - threshold = role_dict.pop("threshold") - terminating = role_dict.pop("terminating") - paths = role_dict.pop("paths", None) - path_hash_prefixes = role_dict.pop("path_hash_prefixes", None) - # All fields left in the role_dict are unrecognized. 
- return cls( - name, - keyids, - threshold, - terminating, - paths, - path_hash_prefixes, - role_dict, - ) - - def to_dict(self) -> Dict[str, Any]: - """Return the dict representation of self.""" - base_role_dict = super().to_dict() - res_dict = { - "name": self.name, - "terminating": self.terminating, - **base_role_dict, - } - if self.paths is not None: - res_dict["paths"] = self.paths - elif self.path_hash_prefixes is not None: - res_dict["path_hash_prefixes"] = self.path_hash_prefixes - return res_dict - - @staticmethod - def _is_target_in_pathpattern(targetpath: str, pathpattern: str) -> bool: - """Determine whether ``targetpath`` matches the ``pathpattern``.""" - # We need to make sure that targetpath and pathpattern are pointing to - # the same directory as fnmatch doesn't threat "/" as a special symbol. - target_parts = targetpath.split("/") - pattern_parts = pathpattern.split("/") - if len(target_parts) != len(pattern_parts): - return False - - # Every part in the pathpattern could include a glob pattern, that's why - # each of the target and pathpattern parts should match. - for target_dir, pattern_dir in zip(target_parts, pattern_parts): - if not fnmatch.fnmatch(target_dir, pattern_dir): - return False - - return True - - def is_delegated_path(self, target_filepath: str) -> bool: - """Determine whether the given ``target_filepath`` is in one of - the paths that ``DelegatedRole`` is trusted to provide. - - The ``target_filepath`` and the ``DelegatedRole`` paths are expected to - be in their canonical forms, so e.g. "a/b" instead of "a//b" . Only "/" - is supported as target path separator. Leading separators are not - handled as special cases (see `TUF specification on targetpath - `_). - - Args: - target_filepath: URL path to a target file, relative to a base - targets URL. - """ - - if self.path_hash_prefixes is not None: - # Calculate the hash of the filepath - # to determine in which bin to find the target. - digest_object = sslib_hash.digest(algorithm="sha256") - digest_object.update(target_filepath.encode("utf-8")) - target_filepath_hash = digest_object.hexdigest() - - for path_hash_prefix in self.path_hash_prefixes: - if target_filepath_hash.startswith(path_hash_prefix): - return True - - elif self.paths is not None: - for pathpattern in self.paths: - # A delegated role path may be an explicit path or glob - # pattern (Unix shell-style wildcards). - if self._is_target_in_pathpattern(target_filepath, pathpattern): - return True - - return False - - -class SuccinctRoles(Role): - """Succinctly defines a hash bin delegation graph. - - A ``SuccinctRoles`` object describes a delegation graph that covers all - targets, distributing them uniformly over the delegated roles (i.e. bins) - in the graph. - - The total number of bins is 2 to the power of the passed ``bit_length``. - - Bin names are the concatenation of the passed ``name_prefix`` and a - zero-padded hex representation of the bin index separated by a hyphen. - - The passed ``keyids`` and ``threshold`` is used for each bin, and each bin - is 'terminating'. - - For details: https://github.com/theupdateframework/taps/blob/master/tap15.md - - Args: - keyids: Signing key identifiers for any bin metadata. - threshold: Number of keys required to sign any bin metadata. - bit_length: Number of bits between 1 and 32. - name_prefix: Prefix of all bin names. - unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API. - - Raises: - ValueError, TypeError, AttributeError: Invalid arguments. 
- """ - - def __init__( - self, - keyids: List[str], - threshold: int, - bit_length: int, - name_prefix: str, - unrecognized_fields: Optional[Dict[str, Any]] = None, - ) -> None: - super().__init__(keyids, threshold, unrecognized_fields) - - if bit_length <= 0 or bit_length > 32: - raise ValueError("bit_length must be between 1 and 32") - if not isinstance(name_prefix, str): - raise ValueError("name_prefix must be a string") - - self.bit_length = bit_length - self.name_prefix = name_prefix - - # Calculate the suffix_len value based on the total number of bins in - # hex. If bit_length = 10 then number_of_bins = 1024 or bin names will - # have a suffix between "000" and "3ff" in hex and suffix_len will be 3 - # meaning the third bin will have a suffix of "003". - self.number_of_bins = 2**bit_length - # suffix_len is calculated based on "number_of_bins - 1" as the name - # of the last bin contains the number "number_of_bins -1" as a suffix. - self.suffix_len = len(f"{self.number_of_bins-1:x}") - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, SuccinctRoles): - return False - - return ( - super().__eq__(other) - and self.bit_length == other.bit_length - and self.name_prefix == other.name_prefix - ) - - @classmethod - def from_dict(cls, role_dict: Dict[str, Any]) -> "SuccinctRoles": - """Create ``SuccinctRoles`` object from its json/dict representation. - - Raises: - ValueError, KeyError, AttributeError, TypeError: Invalid arguments. - """ - keyids = role_dict.pop("keyids") - threshold = role_dict.pop("threshold") - bit_length = role_dict.pop("bit_length") - name_prefix = role_dict.pop("name_prefix") - # All fields left in the role_dict are unrecognized. - return cls(keyids, threshold, bit_length, name_prefix, role_dict) - - def to_dict(self) -> Dict[str, Any]: - """Return the dict representation of self.""" - base_role_dict = super().to_dict() - return { - "bit_length": self.bit_length, - "name_prefix": self.name_prefix, - **base_role_dict, - } - - def get_role_for_target(self, target_filepath: str) -> str: - """Calculate the name of the delegated role responsible for - ``target_filepath``. - - The target at path ``target_filepath`` is assigned to a bin by casting - the left-most ``bit_length`` of bits of the file path hash digest to - int, using it as bin index between 0 and ``2**bit_length - 1``. - - Args: - target_filepath: URL path to a target file, relative to a base - targets URL. - """ - hasher = sslib_hash.digest(algorithm="sha256") - hasher.update(target_filepath.encode("utf-8")) - - # We can't ever need more than 4 bytes (32 bits). - hash_bytes = hasher.digest()[:4] - # Right shift hash bytes, so that we only have the leftmost - # bit_length bits that we care about. - shift_value = 32 - self.bit_length - bin_number = int.from_bytes(hash_bytes, byteorder="big") >> shift_value - # Add zero padding if necessary and cast to hex the suffix. - suffix = f"{bin_number:0{self.suffix_len}x}" - return f"{self.name_prefix}-{suffix}" - - def get_roles(self) -> Iterator[str]: - """Yield the names of all different delegated roles one by one.""" - for i in range(0, self.number_of_bins): - suffix = f"{i:0{self.suffix_len}x}" - yield f"{self.name_prefix}-{suffix}" - - def is_delegated_role(self, role_name: str) -> bool: - """Determine whether the given ``role_name`` is in one of - the delegated roles that ``SuccinctRoles`` represents. - - Args: - role_name: The name of the role to check against. 
- """ - desired_prefix = self.name_prefix + "-" - - if not role_name.startswith(desired_prefix): - return False - - suffix = role_name[len(desired_prefix) :] - if len(suffix) != self.suffix_len: - return False - - try: - # make sure suffix is hex value - num = int(suffix, 16) - except ValueError: - return False - - return 0 <= num < self.number_of_bins - - -class Delegations: - """A container object storing information about all delegations. - - *All parameters named below are not just constructor arguments but also - instance attributes.* - - Args: - keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``. - roles: Ordered dictionary of role names to DelegatedRoles instances. It - defines which keys are required to sign the metadata for a specific - role. The roles order also defines the order that role delegations - are considered during target searches. - succinct_roles: Contains succinct information about hash bin - delegations. Note that succinct roles is not a TUF specification - feature yet and setting `succinct_roles` to a value makes the - resulting metadata non-compliant. The metadata will not be accepted - as valid by specification compliant clients such as those built with - python-tuf <= 1.1.0. For more information see: https://github.com/theupdateframework/taps/blob/master/tap15.md - unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API - - Exactly one of ``roles`` and ``succinct_roles`` must be set. - - Raises: - ValueError: Invalid arguments. - """ - - def __init__( - self, - keys: Dict[str, Key], - roles: Optional[Dict[str, DelegatedRole]] = None, - succinct_roles: Optional[SuccinctRoles] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - self.keys = keys - if sum(1 for v in [roles, succinct_roles] if v is not None) != 1: - raise ValueError("One of roles and succinct_roles must be set") - - if roles is not None: - for role in roles: - if not role or role in TOP_LEVEL_ROLE_NAMES: - raise ValueError( - "Delegated roles cannot be empty or use top-level " - "role names" - ) - - self.roles = roles - self.succinct_roles = succinct_roles - if unrecognized_fields is None: - unrecognized_fields = {} - - self.unrecognized_fields = unrecognized_fields - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Delegations): - return False - - all_attributes_check = ( - self.keys == other.keys - and self.roles == other.roles - and self.succinct_roles == other.succinct_roles - and self.unrecognized_fields == other.unrecognized_fields - ) - - if self.roles is not None and other.roles is not None: - all_attributes_check = ( - all_attributes_check - # Order of the delegated roles matters (see issue #1788). - and list(self.roles.items()) == list(other.roles.items()) - ) - - return all_attributes_check - - @classmethod - def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations": - """Create ``Delegations`` object from its json/dict representation. - - Raises: - ValueError, KeyError, TypeError: Invalid arguments. 
- """ - keys = delegations_dict.pop("keys") - keys_res = {} - for keyid, key_dict in keys.items(): - keys_res[keyid] = Key.from_dict(keyid, key_dict) - roles = delegations_dict.pop("roles", None) - roles_res: Optional[Dict[str, DelegatedRole]] = None - - if roles is not None: - roles_res = {} - for role_dict in roles: - new_role = DelegatedRole.from_dict(role_dict) - if new_role.name in roles_res: - raise ValueError(f"Duplicate role {new_role.name}") - roles_res[new_role.name] = new_role - - succinct_roles_dict = delegations_dict.pop("succinct_roles", None) - succinct_roles_info = None - if succinct_roles_dict is not None: - succinct_roles_info = SuccinctRoles.from_dict(succinct_roles_dict) - - # All fields left in the delegations_dict are unrecognized. - return cls(keys_res, roles_res, succinct_roles_info, delegations_dict) - - def to_dict(self) -> Dict[str, Any]: - """Return the dict representation of self.""" - keys = {keyid: key.to_dict() for keyid, key in self.keys.items()} - res_dict: Dict[str, Any] = { - "keys": keys, - **self.unrecognized_fields, - } - if self.roles is not None: - roles = [role_obj.to_dict() for role_obj in self.roles.values()] - res_dict["roles"] = roles - elif self.succinct_roles is not None: - res_dict["succinct_roles"] = self.succinct_roles.to_dict() - - return res_dict - - def get_roles_for_target( - self, target_filepath: str - ) -> Iterator[Tuple[str, bool]]: - """Given ``target_filepath`` get names and terminating status of all - delegated roles who are responsible for it. - - Args: - target_filepath: URL path to a target file, relative to a base - targets URL. - """ - if self.roles is not None: - for role in self.roles.values(): - if role.is_delegated_path(target_filepath): - yield role.name, role.terminating - - elif self.succinct_roles is not None: - # We consider all succinct_roles as terminating. - # For more information read TAP 15. - yield self.succinct_roles.get_role_for_target(target_filepath), True - - -class TargetFile(BaseFile): - """A container with information about a particular target file. - - *All parameters named below are not just constructor arguments but also - instance attributes.* - - Args: - length: Length of the target file in bytes. - hashes: Dictionary of hash algorithm names to hashes of the target - file content. - path: URL path to a target file, relative to a base targets URL. - unrecognized_fields: Dictionary of all attributes that are not managed - by TUF Metadata API - - Raises: - ValueError, TypeError: Invalid arguments. - """ - - def __init__( - self, - length: int, - hashes: Dict[str, str], - path: str, - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - self._validate_length(length) - self._validate_hashes(hashes) - - self.length = length - self.hashes = hashes - self.path = path - if unrecognized_fields is None: - unrecognized_fields = {} - - self.unrecognized_fields = unrecognized_fields - - @property - def custom(self) -> Any: - """Get implementation specific data related to the target. - - python-tuf does not use or validate this data. 
- """ - return self.unrecognized_fields.get("custom") - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, TargetFile): - return False - - return ( - self.length == other.length - and self.hashes == other.hashes - and self.path == other.path - and self.unrecognized_fields == other.unrecognized_fields - ) - - @classmethod - def from_dict(cls, target_dict: Dict[str, Any], path: str) -> "TargetFile": - """Create ``TargetFile`` object from its json/dict representation. - - Raises: - ValueError, KeyError, TypeError: Invalid arguments. - """ - length = target_dict.pop("length") - hashes = target_dict.pop("hashes") - - # All fields left in the target_dict are unrecognized. - return cls(length, hashes, path, target_dict) - - def to_dict(self) -> Dict[str, Any]: - """Return the JSON-serializable dictionary representation of self.""" - return { - "length": self.length, - "hashes": self.hashes, - **self.unrecognized_fields, - } - - @classmethod - def from_file( - cls, - target_file_path: str, - local_path: str, - hash_algorithms: Optional[List[str]] = None, - ) -> "TargetFile": - """Create ``TargetFile`` object from a file. - - Args: - target_file_path: URL path to a target file, relative to a base - targets URL. - local_path: Local path to target file content. - hash_algorithms: Hash algorithms to calculate hashes with. If not - specified the securesystemslib default hash algorithm is used. - - Raises: - FileNotFoundError: The file doesn't exist. - ValueError: The hash algorithms list contains an unsupported - algorithm. - """ - with open(local_path, "rb") as file: - return cls.from_data(target_file_path, file, hash_algorithms) - - @classmethod - def from_data( - cls, - target_file_path: str, - data: Union[bytes, IO[bytes]], - hash_algorithms: Optional[List[str]] = None, - ) -> "TargetFile": - """Create ``TargetFile`` object from bytes. - - Args: - target_file_path: URL path to a target file, relative to a base - targets URL. - data: Target file content. - hash_algorithms: Hash algorithms to create the hashes with. If not - specified the securesystemslib default hash algorithm is used. - - Raises: - ValueError: The hash algorithms list contains an unsupported - algorithm. - """ - length, hashes = cls._get_length_and_hashes(data, hash_algorithms) - return cls(length, hashes, target_file_path) - - def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None: - """Verify that length and hashes of ``data`` match expected values. - - Args: - data: Target file object or its content in bytes. - - Raises: - LengthOrHashMismatchError: Calculated length or hashes do not - match expected values or hash algorithm is not supported. - """ - self._verify_length(data, self.length) - self._verify_hashes(data, self.hashes) - - def get_prefixed_paths(self) -> List[str]: - """ - Return hash-prefixed URL path fragments for the target file path. - """ - paths = [] - parent, sep, name = self.path.rpartition("/") - for hash_value in self.hashes.values(): - paths.append(f"{parent}{sep}{hash_value}.{name}") - - return paths - - -class Targets(Signed, _DelegatorMixin): - """A container for the signed part of targets metadata. - - Targets contains verifying information about target files and also - delegates responsibility to other Targets roles. - - *All parameters named below are not just constructor arguments but also - instance attributes.* - - Args: - version: Metadata version number. Default is 1. - spec_version: Supported TUF specification version. 
-
-
-class Targets(Signed, _DelegatorMixin):
-    """A container for the signed part of targets metadata.
-
-    Targets contains verifying information about target files and also
-    delegates responsibility to other Targets roles.
-
-    *All parameters named below are not just constructor arguments but also
-    instance attributes.*
-
-    Args:
-        version: Metadata version number. Default is 1.
-        spec_version: Supported TUF specification version. Default is the
-            version currently supported by the library.
-        expires: Metadata expiry date. Default is current date and time.
-        targets: Dictionary of target filenames to TargetFiles. Default is an
-            empty dictionary.
-        delegations: Defines how this Targets delegates responsibility to other
-            Targets Metadata files. Default is None.
-        unrecognized_fields: Dictionary of all attributes that are not managed
-            by TUF Metadata API
-
-    Raises:
-        ValueError: Invalid arguments.
-    """
-
-    type = _TARGETS
-
-    def __init__(
-        self,
-        version: Optional[int] = None,
-        spec_version: Optional[str] = None,
-        expires: Optional[datetime] = None,
-        targets: Optional[Dict[str, TargetFile]] = None,
-        delegations: Optional[Delegations] = None,
-        unrecognized_fields: Optional[Dict[str, Any]] = None,
-    ) -> None:
-        super().__init__(version, spec_version, expires, unrecognized_fields)
-        self.targets = targets if targets is not None else {}
-        self.delegations = delegations
-
-    def __eq__(self, other: Any) -> bool:
-        if not isinstance(other, Targets):
-            return False
-
-        return (
-            super().__eq__(other)
-            and self.targets == other.targets
-            and self.delegations == other.delegations
-        )
-
-    @classmethod
-    def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets":
-        """Create ``Targets`` object from its json/dict representation.
-
-        Raises:
-            ValueError, KeyError, TypeError: Invalid arguments.
-        """
-        common_args = cls._common_fields_from_dict(signed_dict)
-        targets = signed_dict.pop(_TARGETS)
-        try:
-            delegations_dict = signed_dict.pop("delegations")
-        except KeyError:
-            delegations = None
-        else:
-            delegations = Delegations.from_dict(delegations_dict)
-        res_targets = {}
-        for target_path, target_info in targets.items():
-            res_targets[target_path] = TargetFile.from_dict(
-                target_info, target_path
-            )
-        # All fields left in the targets_dict are unrecognized.
-        return cls(*common_args, res_targets, delegations, signed_dict)
-
-    def to_dict(self) -> Dict[str, Any]:
-        """Return the dict representation of self."""
-        targets_dict = self._common_fields_to_dict()
-        targets = {}
-        for target_path, target_file_obj in self.targets.items():
-            targets[target_path] = target_file_obj.to_dict()
-        targets_dict[_TARGETS] = targets
-        if self.delegations is not None:
-            targets_dict["delegations"] = self.delegations.to_dict()
-        return targets_dict
-
-    def add_key(self, key: Key, role: Optional[str] = None) -> None:
-        """Add new signing key for delegated role ``role``.
-
-        If succinct_roles is used then the ``role`` argument is not required.
-
-        Args:
-            key: Signing key to be added for ``role``.
-            role: Name of the role, for which ``key`` is added.
-
-        Raises:
-            ValueError: If the argument order is wrong or if there are no
-                delegated roles or if ``role`` is not delegated by this Target.
-        """
-        # Verify that our users are not using the old argument order.
-        if isinstance(role, Key):
-            raise ValueError("Role must be a string, not a Key instance")
-
-        if self.delegations is None:
-            raise ValueError(f"Delegated role {role} doesn't exist")
-
-        if self.delegations.roles is not None:
-            if role not in self.delegations.roles:
-                raise ValueError(f"Delegated role {role} doesn't exist")
-            if key.keyid not in self.delegations.roles[role].keyids:
-                self.delegations.roles[role].keyids.append(key.keyid)
-
-        elif self.delegations.succinct_roles is not None:
-            if key.keyid not in self.delegations.succinct_roles.keyids:
-                self.delegations.succinct_roles.keyids.append(key.keyid)
-
-        self.delegations.keys[key.keyid] = key
-
-    def revoke_key(self, keyid: str, role: Optional[str] = None) -> None:
-        """Revokes key from delegated role ``role`` and updates the delegations
-        key store.
-
-        If succinct_roles is used then the ``role`` argument is not required.
-
-        Args:
-            keyid: Identifier of the key to be removed for ``role``.
-            role: Name of the role, for which a signing key is removed.
-
-        Raises:
-            ValueError: If there are no delegated roles or if ``role`` is not
-                delegated by this ``Target`` or if key is not used by ``role``
-                or if key with id ``keyid`` is not used by succinct roles.
-        """
-        if self.delegations is None:
-            raise ValueError(f"Delegated role {role} doesn't exist")
-
-        if self.delegations.roles is not None:
-            if role not in self.delegations.roles:
-                raise ValueError(f"Delegated role {role} doesn't exist")
-            if keyid not in self.delegations.roles[role].keyids:
-                raise ValueError(f"Key with id {keyid} is not used by {role}")
-
-            self.delegations.roles[role].keyids.remove(keyid)
-            for keyinfo in self.delegations.roles.values():
-                if keyid in keyinfo.keyids:
-                    return
-
-        elif self.delegations.succinct_roles is not None:
-            if keyid not in self.delegations.succinct_roles.keyids:
-                raise ValueError(
-                    f"Key with id {keyid} is not used by succinct_roles"
-                )
-
-            self.delegations.succinct_roles.keyids.remove(keyid)
-
-        del self.delegations.keys[keyid]
-
-    def get_delegated_role(self, delegated_role: str) -> Role:
-        """Return the role object for the given delegated role.
-
-        Raises ValueError if delegated_role is not actually delegated.
-        """
-        if self.delegations is None:
-            raise ValueError("No delegations found")
-
-        role: Optional[Role] = None
-        if self.delegations.roles is not None:
-            role = self.delegations.roles.get(delegated_role)
-        elif self.delegations.succinct_roles is not None:
-            succinct = self.delegations.succinct_roles
-            if succinct.is_delegated_role(delegated_role):
-                role = succinct
-
-        if not role:
-            raise ValueError(f"Delegated role {delegated_role} not found")
-
-        return role
-
-    def get_key(self, keyid: str) -> Key:  # noqa: D102
-        if self.delegations is None:
-            raise ValueError("No delegations found")
-        if keyid not in self.delegations.keys:
-            raise ValueError(f"Key {keyid} not found")
-
-        return self.delegations.keys[keyid]
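
One subtlety in the ``revoke_key`` implementation above: a key is dropped from ``delegations.keys`` only once no remaining role references it. A sketch of that flow, with ``targets_md``, ``key`` and the role names purely hypothetical:

    targets_md.add_key(key, "role1")
    targets_md.add_key(key, "role2")
    targets_md.revoke_key(key.keyid, "role1")  # still used by role2, kept in keys
    targets_md.revoke_key(key.keyid, "role2")  # last reference gone, removed from keys
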
diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py
index b9a8c3587a..87c42b05bf 100644
--- a/tuf/ngclient/_internal/trusted_metadata_set.py
+++ b/tuf/ngclient/_internal/trusted_metadata_set.py
@@ -13,6 +13,8 @@
 (``trusted_set[Root.type]``) or, in the case of top-level metadata, using the
 helper properties (``trusted_set.root``).
 
+Signatures are verified and discarded upon inclusion into the trusted set.
+
 The rules that ``TrustedMetadataSet`` follows for top-level metadata are
  * Metadata must be loaded in order:
    root -> timestamp -> snapshot -> targets -> (delegated targets).
@@ -32,10 +34,10 @@
 
 >>> # Load local root (RepositoryErrors here stop the update)
 >>> with open(root_path, "rb") as f:
->>>     trusted_set = TrustedMetadataSet(f.read())
+>>>     trusted_set = TrustedMetadataSet(f.read(), EnvelopeType.METADATA)
 >>>
 >>> # update root from remote until no more are available
->>> with download(Root.type, trusted_set.root.signed.version + 1) as f:
+>>> with download(Root.type, trusted_set.root.version + 1) as f:
 >>>     trusted_set.update_root(f.read())
 >>>
 >>> # load local timestamp, then update from remote
@@ -62,13 +64,27 @@
 import datetime
 import logging
 from collections import abc
-from typing import Dict, Iterator, Optional
+from typing import Dict, Iterator, Optional, Tuple, Type, Union, cast
+
+from securesystemslib.signer import Signature
 
 from tuf.api import exceptions
-from tuf.api.metadata import Metadata, Root, Snapshot, Targets, Timestamp
+from tuf.api.dsse import SimpleEnvelope
+from tuf.api.metadata import (
+    Metadata,
+    Root,
+    Signed,
+    Snapshot,
+    T,
+    Targets,
+    Timestamp,
+)
+from tuf.ngclient.config import EnvelopeType
 
 logger = logging.getLogger(__name__)
 
+Delegator = Union[Root, Targets]
+
 
 class TrustedMetadataSet(abc.Mapping):
     """Internal class to keep track of trusted metadata in ``Updater``.
@@ -79,63 +95,70 @@ class TrustedMetadataSet(abc.Mapping):
     what is updated.
     """
 
-    def __init__(self, root_data: bytes):
+    def __init__(self, root_data: bytes, envelope_type: EnvelopeType):
         """Initialize ``TrustedMetadataSet`` by loading trusted root metadata.
 
         Args:
             root_data: Trusted root metadata as bytes. Note that this metadata
                 will only be verified by itself: it is the source of trust for
                 all metadata in the ``TrustedMetadataSet``
+            envelope_type: Configures deserialization and verification mode of
+                TUF metadata.
 
         Raises:
             RepositoryError: Metadata failed to load or verify. The actual
                 error type and content will contain more details.
         """
-        self._trusted_set: Dict[str, Metadata] = {}
+        self._trusted_set: Dict[str, Signed] = {}
         self.reference_time = datetime.datetime.utcnow()
 
+        if envelope_type is EnvelopeType.SIMPLE:
+            self._load_data = _load_from_simple_envelope
+        else:
+            self._load_data = _load_from_metadata
+
         # Load and validate the local root metadata. Valid initial trusted root
         # metadata is required
         logger.debug("Updating initial trusted root")
         self._load_trusted_root(root_data)
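
The constructor above installs one of two module-level loader functions, so the ``update_*`` methods below stay format-agnostic: both loaders accept ``(role_cls, data, delegator=None, role_name=None)`` and return a ``(signed, payload_bytes, signatures)`` tuple. The same call then works for either wire format, e.g.:

    # identical whether `data` holds canonical-JSON metadata or a DSSE envelope
    new_timestamp, _, _ = self._load_data(Timestamp, data, self.root)
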
""" return iter(self._trusted_set.values()) # Helper properties for top level metadata @property - def root(self) -> Metadata[Root]: - """Get current root ``Metadata``.""" - return self._trusted_set[Root.type] + def root(self) -> Root: + """Get current root.""" + return cast(Root, self._trusted_set[Root.type]) @property - def timestamp(self) -> Metadata[Timestamp]: - """Get current timestamp ``Metadata``.""" - return self._trusted_set[Timestamp.type] + def timestamp(self) -> Timestamp: + """Get current timestamp.""" + return cast(Timestamp, self._trusted_set[Timestamp.type]) @property - def snapshot(self) -> Metadata[Snapshot]: - """Get current snapshot ``Metadata``.""" - return self._trusted_set[Snapshot.type] + def snapshot(self) -> Snapshot: + """Get current snapshot.""" + return cast(Snapshot, self._trusted_set[Snapshot.type]) @property - def targets(self) -> Metadata[Targets]: - """Get current top-level targets ``Metadata``.""" - return self._trusted_set[Targets.type] + def targets(self) -> Targets: + """Get current top-level targets.""" + return cast(Targets, self._trusted_set[Targets.type]) # Methods for updating metadata - def update_root(self, data: bytes) -> Metadata[Root]: + def update_root(self, data: bytes) -> Root: """Verify and load ``data`` as new root metadata. Note that an expired intermediate root is considered valid: expiry is @@ -150,41 +173,30 @@ def update_root(self, data: bytes) -> Metadata[Root]: error type and content will contain more details. Returns: - Deserialized and verified root ``Metadata`` object + Deserialized and verified ``Root`` object """ if Timestamp.type in self._trusted_set: raise RuntimeError("Cannot update root after timestamp") logger.debug("Updating root") - new_root = Metadata[Root].from_bytes(data) - - if new_root.signed.type != Root.type: - raise exceptions.RepositoryError( - f"Expected 'root', got '{new_root.signed.type}'" - ) - - # Verify that new root is signed by trusted root - self.root.signed.verify_delegate( - Root.type, new_root.signed_bytes, new_root.signatures + new_root, new_root_bytes, new_root_signatures = self._load_data( + Root, data, self.root ) - - if new_root.signed.version != self.root.signed.version + 1: + if new_root.version != self.root.version + 1: raise exceptions.BadVersionNumberError( - f"Expected root version {self.root.signed.version + 1}" - f" instead got version {new_root.signed.version}" + f"Expected root version {self.root.version + 1}" + f" instead got version {new_root.version}" ) # Verify that new root is signed by itself - new_root.signed.verify_delegate( - Root.type, new_root.signed_bytes, new_root.signatures - ) + new_root.verify_delegate(Root.type, new_root_bytes, new_root_signatures) self._trusted_set[Root.type] = new_root - logger.debug("Updated root v%d", new_root.signed.version) + logger.debug("Updated root v%d", new_root.version) return new_root - def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: + def update_timestamp(self, data: bytes) -> Timestamp: """Verify and load ``data`` as new timestamp metadata. Note that an intermediate timestamp is allowed to be expired: @@ -204,44 +216,35 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: more details. Returns: - Deserialized and verified timestamp ``Metadata`` object + Deserialized and verified ``Timestamp`` object """ if Snapshot.type in self._trusted_set: raise RuntimeError("Cannot update timestamp after snapshot") # client workflow 5.3.10: Make sure final root is not expired. 
 
-    def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
+    def update_timestamp(self, data: bytes) -> Timestamp:
         """Verify and load ``data`` as new timestamp metadata.
 
         Note that an intermediate timestamp is allowed to be expired:
@@ -204,44 +216,35 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
             more details.
 
         Returns:
-            Deserialized and verified timestamp ``Metadata`` object
+            Deserialized and verified ``Timestamp`` object
         """
         if Snapshot.type in self._trusted_set:
             raise RuntimeError("Cannot update timestamp after snapshot")
 
         # client workflow 5.3.10: Make sure final root is not expired.
-        if self.root.signed.is_expired(self.reference_time):
+        if self.root.is_expired(self.reference_time):
             raise exceptions.ExpiredMetadataError("Final root.json is expired")
         # No need to check for 5.3.11 (fast forward attack recovery):
         # timestamp/snapshot can not yet be loaded at this point
 
-        new_timestamp = Metadata[Timestamp].from_bytes(data)
-
-        if new_timestamp.signed.type != Timestamp.type:
-            raise exceptions.RepositoryError(
-                f"Expected 'timestamp', got '{new_timestamp.signed.type}'"
-            )
-
-        self.root.signed.verify_delegate(
-            Timestamp.type, new_timestamp.signed_bytes, new_timestamp.signatures
-        )
+        new_timestamp, _, _ = self._load_data(Timestamp, data, self.root)
 
         # If an existing trusted timestamp is updated,
         # check for a rollback attack
         if Timestamp.type in self._trusted_set:
             # Prevent rolling back timestamp version
-            if new_timestamp.signed.version < self.timestamp.signed.version:
+            if new_timestamp.version < self.timestamp.version:
                 raise exceptions.BadVersionNumberError(
-                    f"New timestamp version {new_timestamp.signed.version} must"
-                    f" be >= {self.timestamp.signed.version}"
+                    f"New timestamp version {new_timestamp.version} must"
+                    f" be >= {self.timestamp.version}"
                 )
             # Keep using old timestamp if versions are equal.
-            if new_timestamp.signed.version == self.timestamp.signed.version:
+            if new_timestamp.version == self.timestamp.version:
                 raise exceptions.EqualVersionNumberError()
 
             # Prevent rolling back snapshot version
-            snapshot_meta = self.timestamp.signed.snapshot_meta
-            new_snapshot_meta = new_timestamp.signed.snapshot_meta
+            snapshot_meta = self.timestamp.snapshot_meta
+            new_snapshot_meta = new_timestamp.snapshot_meta
             if new_snapshot_meta.version < snapshot_meta.version:
                 raise exceptions.BadVersionNumberError(
                     f"New snapshot version must be >= {snapshot_meta.version}"
@@ -252,7 +255,7 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
         # protection of new timestamp: expiry is checked in update_snapshot()
 
         self._trusted_set[Timestamp.type] = new_timestamp
-        logger.debug("Updated timestamp v%d", new_timestamp.signed.version)
+        logger.debug("Updated timestamp v%d", new_timestamp.version)
 
         # timestamp is loaded: raise if it is not valid _final_ timestamp
         self._check_final_timestamp()
@@ -262,12 +265,12 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
 
     def _check_final_timestamp(self) -> None:
         """Raise if timestamp is expired."""
-        if self.timestamp.signed.is_expired(self.reference_time):
+        if self.timestamp.is_expired(self.reference_time):
             raise exceptions.ExpiredMetadataError("timestamp.json is expired")
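
The checks above are the client-side rollback protection in action. A sketch with hypothetical pre-serialized timestamp bytes:

    trusted_set.update_timestamp(timestamp_v5_bytes)  # accepted
    trusted_set.update_timestamp(timestamp_v4_bytes)  # raises BadVersionNumberError
    trusted_set.update_timestamp(timestamp_v5_bytes)  # raises EqualVersionNumberError,
                                                      # the trusted copy is kept
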
 
     def update_snapshot(
         self, data: bytes, trusted: Optional[bool] = False
-    ) -> Metadata[Snapshot]:
+    ) -> Snapshot:
         """Verify and load ``data`` as new snapshot metadata.
 
         Note that an intermediate snapshot is allowed to be expired and version
@@ -293,7 +296,7 @@ def update_snapshot(
             The actual error type and content will contain more details.
 
         Returns:
-            Deserialized and verified snapshot ``Metadata`` object
+            Deserialized and verified ``Snapshot`` object
         """
         if Timestamp.type not in self._trusted_set:
@@ -305,31 +308,22 @@ def update_snapshot(
         # Snapshot cannot be loaded if final timestamp is expired
         self._check_final_timestamp()
 
-        snapshot_meta = self.timestamp.signed.snapshot_meta
+        snapshot_meta = self.timestamp.snapshot_meta
 
         # Verify non-trusted data against the hashes in timestamp, if any.
         # Trusted snapshot data has already been verified once.
         if not trusted:
             snapshot_meta.verify_length_and_hashes(data)
 
-        new_snapshot = Metadata[Snapshot].from_bytes(data)
-
-        if new_snapshot.signed.type != Snapshot.type:
-            raise exceptions.RepositoryError(
-                f"Expected 'snapshot', got '{new_snapshot.signed.type}'"
-            )
-
-        self.root.signed.verify_delegate(
-            Snapshot.type, new_snapshot.signed_bytes, new_snapshot.signatures
-        )
+        new_snapshot, _, _ = self._load_data(Snapshot, data, self.root)
 
         # version not checked against meta version to allow old snapshot to be
         # used in rollback protection: it is checked when targets is updated
 
         # If an existing trusted snapshot is updated, check for rollback attack
         if Snapshot.type in self._trusted_set:
-            for filename, fileinfo in self.snapshot.signed.meta.items():
-                new_fileinfo = new_snapshot.signed.meta.get(filename)
+            for filename, fileinfo in self.snapshot.meta.items():
+                new_fileinfo = new_snapshot.meta.get(filename)
 
                 # Prevent removal of any metadata in meta
                 if new_fileinfo is None:
@@ -348,7 +342,7 @@ def update_snapshot(
             # protection of new snapshot: it is checked when targets is updated
 
         self._trusted_set[Snapshot.type] = new_snapshot
-        logger.debug("Updated snapshot v%d", new_snapshot.signed.version)
+        logger.debug("Updated snapshot v%d", new_snapshot.version)
 
         # snapshot is loaded, but we raise if it's not valid _final_ snapshot
         self._check_final_snapshot()
@@ -358,16 +352,16 @@ def update_snapshot(
 
     def _check_final_snapshot(self) -> None:
         """Raise if snapshot is expired or meta version does not match."""
-        if self.snapshot.signed.is_expired(self.reference_time):
+        if self.snapshot.is_expired(self.reference_time):
             raise exceptions.ExpiredMetadataError("snapshot.json is expired")
-        snapshot_meta = self.timestamp.signed.snapshot_meta
-        if self.snapshot.signed.version != snapshot_meta.version:
+        snapshot_meta = self.timestamp.snapshot_meta
+        if self.snapshot.version != snapshot_meta.version:
             raise exceptions.BadVersionNumberError(
                 f"Expected snapshot version {snapshot_meta.version}, "
-                f"got {self.snapshot.signed.version}"
+                f"got {self.snapshot.version}"
             )
 
-    def update_targets(self, data: bytes) -> Metadata[Targets]:
+    def update_targets(self, data: bytes) -> Targets:
         """Verify and load ``data`` as new top-level targets metadata.
 
         Args:
@@ -378,13 +372,13 @@ def update_targets(self, data: bytes) -> Metadata[Targets]:
             error type and content will contain more details.
 
         Returns:
-            Deserialized and verified targets ``Metadata`` object
+            Deserialized and verified ``Targets`` object
         """
         return self.update_delegated_targets(data, Targets.type, Root.type)
 
     def update_delegated_targets(
         self, data: bytes, role_name: str, delegator_name: str
-    ) -> Metadata[Targets]:
+    ) -> Targets:
         """Verify and load ``data`` as new metadata for target ``role_name``.
 
         Args:
@@ -398,7 +392,7 @@ def update_delegated_targets(
             error type and content will contain more details.
 
         Returns:
-            Deserialized and verified targets ``Metadata`` object
+            Deserialized and verified ``Targets`` object
         """
         if Snapshot.type not in self._trusted_set:
             raise RuntimeError("Cannot load targets before snapshot")
@@ -407,14 +401,14 @@ def update_delegated_targets(
         # does not match meta version in timestamp
         self._check_final_snapshot()
 
-        delegator: Optional[Metadata] = self.get(delegator_name)
+        delegator: Optional[Delegator] = self.get(delegator_name)
         if delegator is None:
             raise RuntimeError("Cannot load targets before delegator")
 
         logger.debug("Updating %s delegated by %s", role_name, delegator_name)
 
         # Verify against the hashes in snapshot, if any
-        meta = self.snapshot.signed.meta.get(f"{role_name}.json")
+        meta = self.snapshot.meta.get(f"{role_name}.json")
         if meta is None:
             raise exceptions.RepositoryError(
                 f"Snapshot does not contain information for '{role_name}'"
@@ -422,24 +416,17 @@ def update_delegated_targets(
 
         meta.verify_length_and_hashes(data)
 
-        new_delegate = Metadata[Targets].from_bytes(data)
-
-        if new_delegate.signed.type != Targets.type:
-            raise exceptions.RepositoryError(
-                f"Expected 'targets', got '{new_delegate.signed.type}'"
-            )
-
-        delegator.signed.verify_delegate(
-            role_name, new_delegate.signed_bytes, new_delegate.signatures
+        new_delegate, _, _ = self._load_data(
+            Targets, data, delegator, role_name
         )
 
-        version = new_delegate.signed.version
+        version = new_delegate.version
         if version != meta.version:
             raise exceptions.BadVersionNumberError(
                 f"Expected {role_name} v{meta.version}, got v{version}."
             )
 
-        if new_delegate.signed.is_expired(self.reference_time):
+        if new_delegate.is_expired(self.reference_time):
             raise exceptions.ExpiredMetadataError(f"New {role_name} is expired")
 
         self._trusted_set[role_name] = new_delegate
@@ -453,16 +440,73 @@ def _load_trusted_root(self, data: bytes) -> None:
         Note that an expired initial root is considered valid: expiry is
         only checked for the final root in ``update_timestamp()``.
         """
-        new_root = Metadata[Root].from_bytes(data)
+        new_root, new_root_bytes, new_root_signatures = self._load_data(
+            Root, data
         )
+        new_root.verify_delegate(Root.type, new_root_bytes, new_root_signatures)
 
-        if new_root.signed.type != Root.type:
-            raise exceptions.RepositoryError(
-                f"Expected 'root', got '{new_root.signed.type}'"
-            )
+        self._trusted_set[Root.type] = new_root
+        logger.debug("Loaded trusted root v%d", new_root.version)
+
+
+def _load_from_metadata(
+    role: Type[T],
+    data: bytes,
+    delegator: Optional[Delegator] = None,
+    role_name: Optional[str] = None,
+) -> Tuple[T, bytes, Dict[str, Signature]]:  # noqa: D102
+    """Load traditional metadata bytes, and extract and verify payload.
 
-        new_root.signed.verify_delegate(
-            Root.type, new_root.signed_bytes, new_root.signatures
+    If no delegator is passed, verification is skipped. Returns a tuple of
+    deserialized payload, signed payload bytes, and signatures.
+ """ + md = Metadata[T].from_bytes(data) + + if md.signed.type != role.type: + raise exceptions.RepositoryError( + f"Expected '{role.type}', got '{md.signed.type}'" ) - self._trusted_set[Root.type] = new_root - logger.debug("Loaded trusted root v%d", new_root.signed.version) + if delegator: + if role_name is None: + role_name = role.type + + delegator.verify_delegate(role_name, md.signed_bytes, md.signatures) + + return md.signed, md.signed_bytes, md.signatures + + +def _load_from_simple_envelope( + role: Type[T], + data: bytes, + delegator: Optional[Delegator] = None, + role_name: Optional[str] = None, +) -> Tuple[T, bytes, Dict[str, Signature]]: # noqa: D102 + """Load simple envelope bytes, and extract and verify payload. + + If no delegator is passed, verification is skipped. Returns a tuple of + deserialized payload, signed payload bytes, and signatures. + """ + + envelope = SimpleEnvelope[T].from_bytes(data) + + if envelope.payload_type != SimpleEnvelope._DEFAULT_PAYLOAD_TYPE: + raise exceptions.RepositoryError( + f"Expected '{SimpleEnvelope._DEFAULT_PAYLOAD_TYPE}', " + f"got '{envelope.payload_type}'" + ) + + if delegator: + if role_name is None: + role_name = role.type + delegator.verify_delegate( + role_name, envelope.pae(), envelope.signatures_dict + ) + + signed = envelope.get_signed() + if signed.type != role.type: + raise exceptions.RepositoryError( + f"Expected '{role.type}', got '{signed.type}'" + ) + + return signed, envelope.pae(), envelope.signatures_dict diff --git a/tuf/ngclient/config.py b/tuf/ngclient/config.py index 5027994278..3ef294063b 100644 --- a/tuf/ngclient/config.py +++ b/tuf/ngclient/config.py @@ -5,6 +5,20 @@ """ from dataclasses import dataclass +from enum import Flag, unique + + +@unique +class EnvelopeType(Flag): + """Configures deserialization and verification mode of TUF metadata. + + Args: + METADATA: Traditional canonical JSON -based TUF Metadata. + SIMPLE: Dead Simple Signing Envelope. (experimental) + """ + + METADATA = 1 + SIMPLE = 2 @dataclass @@ -23,7 +37,9 @@ class UpdaterConfig: are used, target download URLs are formed by prefixing the filename with a hash digest of file content by default. This can be overridden by setting ``prefix_targets_with_hash`` to ``False``. - + envelope_type: Configures deserialization and verification mode of TUF + metadata. Per default, it is treated as traditional canonical JSON + -based TUF Metadata. 
""" max_root_rotations: int = 32 @@ -33,3 +49,4 @@ class UpdaterConfig: snapshot_max_length: int = 2000000 # bytes targets_max_length: int = 5000000 # bytes prefix_targets_with_hash: bool = True + envelope_type: EnvelopeType = EnvelopeType.METADATA diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index ca41b2b566..2cfccc661e 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -41,20 +41,13 @@ import os import shutil import tempfile -from typing import Optional, Set +from typing import Optional, Set, cast from urllib import parse from tuf.api import exceptions -from tuf.api.metadata import ( - Metadata, - Root, - Snapshot, - TargetFile, - Targets, - Timestamp, -) +from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set -from tuf.ngclient.config import UpdaterConfig +from tuf.ngclient.config import EnvelopeType, UpdaterConfig from tuf.ngclient.fetcher import FetcherInterface logger = logging.getLogger(__name__) @@ -101,10 +94,20 @@ def __init__( # Read trusted local root metadata data = self._load_local_metadata(Root.type) - self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data) self._fetcher = fetcher or requests_fetcher.RequestsFetcher() self.config = config or UpdaterConfig() + supported_envelopes = [EnvelopeType.METADATA, EnvelopeType.SIMPLE] + if self.config.envelope_type not in supported_envelopes: + raise ValueError( + f"config: envelope_type must be one of {supported_envelopes}, " + f"got '{self.config.envelope_type}'" + ) + + self._trusted_set = trusted_metadata_set.TrustedMetadataSet( + data, self.config.envelope_type + ) + def refresh(self) -> None: """Refresh top-level metadata. @@ -244,7 +247,7 @@ def download_target( target_base_url = _ensure_trailing_slash(target_base_url) target_filepath = targetinfo.path - consistent_snapshot = self._trusted_set.root.signed.consistent_snapshot + consistent_snapshot = self._trusted_set.root.consistent_snapshot if consistent_snapshot and self.config.prefix_targets_with_hash: hashes = list(targetinfo.hashes.values()) dirname, sep, basename = target_filepath.rpartition("/") @@ -310,7 +313,7 @@ def _load_root(self) -> None: """ # Update the root role - lower_bound = self._trusted_set.root.signed.version + 1 + lower_bound = self._trusted_set.root.version + 1 upper_bound = lower_bound + self.config.max_root_rotations for next_version in range(lower_bound, upper_bound): @@ -361,22 +364,22 @@ def _load_snapshot(self) -> None: # Local snapshot does not exist or is invalid: update from remote logger.debug("Local snapshot not valid as final: %s", e) - snapshot_meta = self._trusted_set.timestamp.signed.snapshot_meta + snapshot_meta = self._trusted_set.timestamp.snapshot_meta length = snapshot_meta.length or self.config.snapshot_max_length version = None - if self._trusted_set.root.signed.consistent_snapshot: + if self._trusted_set.root.consistent_snapshot: version = snapshot_meta.version data = self._download_metadata(Snapshot.type, length, version) self._trusted_set.update_snapshot(data) self._persist_metadata(Snapshot.type, data) - def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]: + def _load_targets(self, role: str, parent_role: str) -> Targets: """Load local (and if needed remote) metadata for ``role``.""" # Avoid loading 'role' more than once during "get_targetinfo" if role in self._trusted_set: - return self._trusted_set[role] + return cast(Targets, self._trusted_set[role]) try: data = 
diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py
index ca41b2b566..2cfccc661e 100644
--- a/tuf/ngclient/updater.py
+++ b/tuf/ngclient/updater.py
@@ -41,20 +41,13 @@
 import os
 import shutil
 import tempfile
-from typing import Optional, Set
+from typing import Optional, Set, cast
 from urllib import parse
 
 from tuf.api import exceptions
-from tuf.api.metadata import (
-    Metadata,
-    Root,
-    Snapshot,
-    TargetFile,
-    Targets,
-    Timestamp,
-)
+from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp
 from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set
-from tuf.ngclient.config import UpdaterConfig
+from tuf.ngclient.config import EnvelopeType, UpdaterConfig
 from tuf.ngclient.fetcher import FetcherInterface
 
 logger = logging.getLogger(__name__)
@@ -101,10 +94,20 @@ def __init__(
         # Read trusted local root metadata
         data = self._load_local_metadata(Root.type)
-        self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data)
         self._fetcher = fetcher or requests_fetcher.RequestsFetcher()
         self.config = config or UpdaterConfig()
+        supported_envelopes = [EnvelopeType.METADATA, EnvelopeType.SIMPLE]
+        if self.config.envelope_type not in supported_envelopes:
+            raise ValueError(
+                f"config: envelope_type must be one of {supported_envelopes}, "
+                f"got '{self.config.envelope_type}'"
+            )
+
+        self._trusted_set = trusted_metadata_set.TrustedMetadataSet(
+            data, self.config.envelope_type
+        )
+
     def refresh(self) -> None:
         """Refresh top-level metadata.
@@ -244,7 +247,7 @@ def download_target(
         target_base_url = _ensure_trailing_slash(target_base_url)
 
         target_filepath = targetinfo.path
-        consistent_snapshot = self._trusted_set.root.signed.consistent_snapshot
+        consistent_snapshot = self._trusted_set.root.consistent_snapshot
         if consistent_snapshot and self.config.prefix_targets_with_hash:
             hashes = list(targetinfo.hashes.values())
             dirname, sep, basename = target_filepath.rpartition("/")
@@ -310,7 +313,7 @@ def _load_root(self) -> None:
         """
 
         # Update the root role
-        lower_bound = self._trusted_set.root.signed.version + 1
+        lower_bound = self._trusted_set.root.version + 1
         upper_bound = lower_bound + self.config.max_root_rotations
 
         for next_version in range(lower_bound, upper_bound):
@@ -361,22 +364,22 @@ def _load_snapshot(self) -> None:
             # Local snapshot does not exist or is invalid: update from remote
             logger.debug("Local snapshot not valid as final: %s", e)
 
-        snapshot_meta = self._trusted_set.timestamp.signed.snapshot_meta
+        snapshot_meta = self._trusted_set.timestamp.snapshot_meta
         length = snapshot_meta.length or self.config.snapshot_max_length
         version = None
-        if self._trusted_set.root.signed.consistent_snapshot:
+        if self._trusted_set.root.consistent_snapshot:
             version = snapshot_meta.version
 
         data = self._download_metadata(Snapshot.type, length, version)
         self._trusted_set.update_snapshot(data)
         self._persist_metadata(Snapshot.type, data)
 
-    def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]:
+    def _load_targets(self, role: str, parent_role: str) -> Targets:
         """Load local (and if needed remote) metadata for ``role``."""
         # Avoid loading 'role' more than once during "get_targetinfo"
         if role in self._trusted_set:
-            return self._trusted_set[role]
+            return cast(Targets, self._trusted_set[role])
 
         try:
             data = self._load_local_metadata(role)
@@ -389,7 +392,7 @@ def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]:
             # Local 'role' does not exist or is invalid: update from remote
             logger.debug("Failed to load local %s: %s", role, e)
 
-        snapshot = self._trusted_set.snapshot.signed
+        snapshot = self._trusted_set.snapshot
         metainfo = snapshot.meta.get(f"{role}.json")
         if metainfo is None:
             raise exceptions.RepositoryError(
@@ -398,7 +401,7 @@ def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]:
 
         length = metainfo.length or self.config.targets_max_length
         version = None
-        if self._trusted_set.root.signed.consistent_snapshot:
+        if self._trusted_set.root.consistent_snapshot:
             version = metainfo.version
 
         data = self._download_metadata(role, length, version)
@@ -438,7 +441,7 @@ def _preorder_depth_first_walk(
 
         # The metadata for 'role_name' must be downloaded/updated before
         # its targets, delegations, and child roles can be inspected.
-        targets = self._load_targets(role_name, parent_role).signed
+        targets = self._load_targets(role_name, parent_role)
 
         target = targets.targets.get(target_filepath)
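
Putting the pieces together, a sketch of wiring the new config field into an ``Updater`` (paths and URLs are placeholders; ``Updater`` is assumed to keep its existing constructor arguments and package-level export):

    from tuf.ngclient import Updater
    from tuf.ngclient.config import EnvelopeType, UpdaterConfig

    # The constructor validates config.envelope_type and raises ValueError
    # for unsupported values, per the check added above.
    updater = Updater(
        metadata_dir="/tmp/tuf-metadata",
        metadata_base_url="https://example.com/metadata/",
        config=UpdaterConfig(envelope_type=EnvelopeType.SIMPLE),
    )
    updater.refresh()
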