Skip to content

Commit

Permalink
Support identity reference (release-engineering#35)
Browse files Browse the repository at this point in the history
* Support identity reference

When identity reference is provided it's used for cosign as
--sign-container-identity
  • Loading branch information
midnightercz authored Jun 26, 2024
1 parent c3d9e7b commit 6399420
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/pubtools/sign/operations/containersign.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class ContainerSignOperation(SignOperation):
task_id: str = field(
metadata={"description": "Usually pub task id, serves as identifier for in signing request"}
)
identity_references: List[str] = field(
metadata={"description": "List of references to sign"}, default_factory=list
)

def to_dict(self) -> dict[str, Any]:
"""Return a dict representation of the object."""
Expand Down
36 changes: 32 additions & 4 deletions src/pubtools/sign/signers/cosignsigner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from dataclasses import field, dataclass
import itertools
import json
import logging
from typing import Dict, List, ClassVar, Any, Tuple, Type
Expand Down Expand Up @@ -233,6 +234,7 @@ def container_sign(self: CosignSigner, operation: ContainerSignOperation) -> Sig

outputs = {}
ref_args = {}
identity_args = {}
common_args = [
self.cosign_bin,
"-t",
Expand All @@ -254,14 +256,28 @@ def container_sign(self: CosignSigner, operation: ContainerSignOperation) -> Sig
env_vars = os.environ.copy()
env_vars.update(self.env_variables)
if operation.references:
for ref, digest in zip(operation.references, operation.digests):
for ref, identity, digest in itertools.zip_longest(
operation.references, operation.identity_references, operation.digests, fillvalue=""
):
repo, tag = ref.rsplit(":", 1)
ref_args[f"{repo}@{digest}"] = ["-a", f"tag={tag}", f"{repo}@{digest}"]
if identity:
identity_args[f"{repo}@{digest}"] = ["--sign-container-identity", identity]

else:
for ref_digest in operation.digests:
for ref_digest, identity in itertools.zip_longest(
operation.digests, operation.identity_references, fillvalue=""
):
ref_args[ref_digest] = [ref_digest]
if identity:
repo, digest = ref_digest.rsplit("@", 1)
identity_args[f"{repo}@{digest}"] = ["--sign-container-identity", identity]

for ref, args in ref_args.items():
outputs[ref] = run_command(common_args + args, env=env_vars, tries=self.retries)
_identity_args = identity_args.get(ref, [])
outputs[ref] = run_command(
common_args + _identity_args + args, env=env_vars, tries=self.retries
)

for ref, (stdout, stderr, returncode) in outputs.items():
if returncode != 0:
Expand Down Expand Up @@ -318,6 +334,7 @@ def cosign_container_sign(
config_file: str = "",
digest: List[str] = [],
reference: List[str] = [],
identity: List[str] = [],
) -> Dict[str, Any]:
"""Run containersign operation with cli arguments.
Expand All @@ -326,6 +343,7 @@ def cosign_container_sign(
config_file (str): path to the config file
digest (str): digest of the image to sign
reference (str): reference of the image to sign
identity (str): identity to sign the image with
Returns:
dict: signing result
"""
Expand All @@ -336,13 +354,14 @@ def cosign_container_sign(
operation = ContainerSignOperation(
digests=digest,
references=reference,
identity_references=identity,
signing_key=signing_key,
task_id="",
)
signing_result = cosign_signer.sign(operation)
return {
"signer_result": signing_result.signer_results.to_dict(),
"operation_results": signing_result.operation_result.results, # type: ignore
"operation_results": signing_result.operation_result.results,
"operation": signing_result.operation.to_dict(),
"signing_key": signing_result.operation_result.signing_key,
}
Expand Down Expand Up @@ -384,12 +403,20 @@ def cosign_list_existing_signatures(config_file: str, reference: str) -> Tuple[b
type=str,
help="References which should be signed.",
)
@click.option(
"--identity",
required=False,
multiple=True,
type=str,
help="Identity reference.",
)
@click.option("--raw", default=False, is_flag=True, help="Print raw output instead of json")
def cosign_container_sign_main(
signing_key: str = "",
config_file: str = "",
digest: List[str] = [],
reference: List[str] = [],
identity: List[str] = [],
raw: bool = False,
) -> None:
"""Entry point method for containersign operation."""
Expand All @@ -398,6 +425,7 @@ def cosign_container_sign_main(
config_file=config_file,
digest=digest,
reference=reference,
identity=identity,
)
if not raw:
click.echo(json.dumps(ret))
Expand Down
2 changes: 1 addition & 1 deletion src/pubtools/sign/signers/msgsigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ def msg_container_sign(
signing_result = msg_signer.sign(operation)
return {
"signer_result": signing_result.signer_results.to_dict(),
"operation_results": signing_result.operation_result.results, # type: ignore
"operation_results": signing_result.operation_result.results,
"operation": signing_result.operation.to_dict(),
"signing_key": signing_result.operation_result.signing_key,
}
Expand Down
157 changes: 157 additions & 0 deletions tests/test_cosign_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ def f_expected_container_sign_args(f_config_cosign_signer_ok):
]


@pytest.fixture
def f_expected_container_sign_identity_args(f_config_cosign_signer_ok):
return [
"--signing-key",
"test-signing-key",
"--digest",
"some-digest",
"--reference",
"some-reference",
"--identity",
"some-registry/namespace/repo",
"--config-file",
f_config_cosign_signer_ok,
]


@pytest.fixture
def f_expected_cosign_sign_args():
return [
Expand All @@ -89,6 +105,33 @@ def f_expected_cosign_sign_args():
]


@pytest.fixture
def f_expected_cosign_sign_identity_args():
return [
"/usr/bin/cosign",
"-t",
"30s",
"sign",
"-y",
"--key",
"test-signing-key",
"--allow-http-registry=false",
"--allow-insecure-registry=false",
"--rekor-url",
"https://rekor.sigstore.dev",
"--tlog-upload=true",
"--registry-username",
"some-user",
"--registry-password",
"some-password",
"--sign-container-identity",
"some-registry/namespace/repo",
"-a",
"tag=tag",
"some-registry/namespace/repo@sha256:abcdefg",
]


@pytest.fixture
def f_expected_cosign_triangulate_args():
return [
Expand Down Expand Up @@ -118,6 +161,18 @@ def test_cosign_container_sign(f_cosign_signer, f_expected_container_sign_args):
assert result.exit_code == 0, result.output


def test_cosign_container_identity_sign(f_cosign_signer, f_expected_container_sign_identity_args):
f_cosign_signer.return_value.sign.return_value.signer_results.to_dict.return_value = {
"status": "ok"
}
f_cosign_signer.return_value.sign.return_value.operation_result.results = []
f_cosign_signer.return_value.sign.return_value.operation_result.signing_key = ""
f_cosign_signer.return_value.sign.return_value.operation.to_dict.return_value = {}
result = CliRunner().invoke(cosign_container_sign_main, f_expected_container_sign_identity_args)
print(result.stdout)
assert result.exit_code == 0, result.output


def test_cosign_container_sign_error(f_cosign_signer, f_expected_container_sign_args):
f_cosign_signer.return_value.sign.return_value.signer_results.to_dict.return_value = {
"status": "error",
Expand Down Expand Up @@ -224,6 +279,47 @@ def test_container_sign(f_config_cosign_signer_ok, f_environ, f_expected_cosign_
)


def test_container_sign_identity(
f_config_cosign_signer_ok, f_environ, f_expected_cosign_sign_identity_args
):
container_sign_operation = ContainerSignOperation(
task_id="",
digests=["sha256:abcdefg"],
references=["some-registry/namespace/repo:tag"],
identity_references=["some-registry/namespace/repo"],
signing_key="test-signing-key",
)

with patch("subprocess.Popen") as patched_popen:
patched_popen().returncode = 0
patched_popen().communicate.return_value = ("stdout", "stderr")

signer = CosignSigner()
signer.load_config(load_config(f_config_cosign_signer_ok))
res = signer.container_sign(container_sign_operation)

patched_popen.assert_has_calls(
[
call(
f_expected_cosign_sign_identity_args,
env={"PYTEST_CURRENT_TEST": ANY},
stderr=-1,
stdout=-1,
text=True,
)
]
)

assert res == SigningResults(
signer=signer,
operation=container_sign_operation,
signer_results=CosignSignerResults(status="ok", error_message=""),
operation_result=ContainerSignResult(
results=["stderr"], signing_key="test-signing-key", failed=False
),
)


def test_container_sign_alias(f_config_cosign_signer_aliases, f_environ):
container_sign_operation = ContainerSignOperation(
task_id="",
Expand Down Expand Up @@ -374,6 +470,67 @@ def test_container_sign_digests_only(
)


def test_container_sign_digests_only_indentity(
f_config_cosign_signer_ok, f_environ, f_expected_cosign_sign_args
):
container_sign_operation = ContainerSignOperation(
task_id="",
digests=["some-registry/namespace/repo@sha256:abcdefg"],
references=[],
identity_references=["some-registry/namespace/repo"],
signing_key="test-signing-key",
)

with patch("subprocess.Popen") as patched_popen:
patched_popen().returncode = 0
patched_popen().communicate.return_value = ("stdout", "stderr")

signer = CosignSigner()
signer.load_config(load_config(f_config_cosign_signer_ok))
res = signer.container_sign(container_sign_operation)

patched_popen.assert_has_calls(
[
call(
[
"/usr/bin/cosign",
"-t",
"30s",
"sign",
"-y",
"--key",
"test-signing-key",
"--allow-http-registry=false",
"--allow-insecure-registry=false",
"--rekor-url",
"https://rekor.sigstore.dev",
"--tlog-upload=true",
"--registry-username",
"some-user",
"--registry-password",
"some-password",
"--sign-container-identity",
"some-registry/namespace/repo",
"some-registry/namespace/repo@sha256:abcdefg",
],
env=ANY,
stderr=-1,
stdout=-1,
text=True,
)
]
)

assert res == SigningResults(
signer=signer,
operation=container_sign_operation,
signer_results=CosignSignerResults(status="ok", error_message=""),
operation_result=ContainerSignResult(
results=["stderr"], signing_key="test-signing-key", failed=False
),
)


def test_container_sign_mismatch_refs(f_config_cosign_signer_ok):
container_sign_operation = ContainerSignOperation(
task_id="",
Expand Down
2 changes: 2 additions & 0 deletions tests/test_sign_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ def test_containersign_operation_doc_argument():
"task_id": {
"description": "Usually pub task id, serves as identifier for in signing request"
},
"identity_references": {"description": "List of references to sign"},
},
"examples": {
"digests": "",
"references": "",
"signing_key": "",
"task_id": "",
"identity_references": "",
},
}

Expand Down

0 comments on commit 6399420

Please sign in to comment.