diff --git a/.github/workflows/xtest.yml b/.github/workflows/xtest.yml index a4acb7eb..0b7ed95d 100644 --- a/.github/workflows/xtest.yml +++ b/.github/workflows/xtest.yml @@ -187,67 +187,3 @@ jobs: working-directory: otdftests/xtest env: PLATFORM_DIR: '../../${{ steps.run-platform.outputs.platform-working-dir }}' -###### TODO: move these unbound tests to v2 platform -# unbound-test-js: -# timeout-minutes: 60 -# runs-on: ubuntu-latest -# defaults: -# run: -# working-directory: xtest -# permissions: -# contents: read -# packages: read -# strategy: -# matrix: -# kasversion: [ python-kas, go-kas ] -# steps: -# - uses: actions/checkout@v3 -# - name: Set kas-related environment variable -# shell: bash -# run: echo "KAS_VERSION=${{ matrix.kasversion }}" >> $GITHUB_ENV -# - name: Set up Node 18 -# uses: actions/setup-node@v3 -# with: -# node-version: "18.x" -# registry-url: https://npm.pkg.github.com -# - name: Set up Python 3.10 -# uses: actions/setup-python@v4 -# with: -# python-version: "3.10" -# # todo: install and activate virtual env for python? -# - name: update packages -# run: |- -# npm ci -# npm install @opentdf/cli@${{ github.event.client_payload.version }} @opentdf/client@${{ github.event.client_payload.version }} -# npm list -# pip3 install -r requirements.txt -# env: -# NODE_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }} -# - uses: yokawasa/action-setup-kube-tools@v0.9.2 -# with: -# setup-tools: | -# kubectl -# helm -# tilt -# # This should be in sync with the minikube-deployed kube version below -# kubectl: "1.24.1" -# helm: "3.9.2" -# tilt: "0.31.2" -# - run: | -# kubectl version --client -# kustomize version -# tilt version -# - name: start minikube -# id: minikube -# uses: medyagh/setup-minikube@master -# with: -# minikube-version: 1.26.0 -# # This should be in sync with the setup-tools version above -# kubernetes-version: 1.24.1 -# - name: Run tilt -# run: |- -# [[ -z "${{github.event.inputs.backendVersion}}" ]] && export BACKEND_LATEST_VERSION=$(skopeo list-tags docker://ghcr.io/opentdf/charts/backend \ -# | python3 -c "import sys, json; sys.stdout.write([tag for tag in json.load(sys.stdin)['Tags'] if not tag.endswith('.sig')][-1])") || export BACKEND_LATEST_VERSION="${{github.event.inputs.backendVersion}}" -# echo "Testing Backend [$BACKEND_LATEST_VERSION]">>$GITHUB_STEP_SUMMARY -# kubectl version -# tilt ci -f Tiltfile.unbound-js-sdk diff --git a/xtest/sdk/py/test_unbound_policy.py b/xtest/sdk/py/test_unbound_policy.py deleted file mode 100644 index 3d2a65de..00000000 --- a/xtest/sdk/py/test_unbound_policy.py +++ /dev/null @@ -1,121 +0,0 @@ -import json -import logging -import os -import sys -import zipfile - -from opentdf import TDFClient, OIDCCredentials, LogLevel, TDFStorageType - -OIDC_ENDPONT = "http://localhost:65432/" -KAS_URL = "http://localhost:65432/api/kas" -OIDC_CONFIGURATION_URL = ( - "http://localhost:65432/auth/realms/tdf/.well-known/openid-configuration" -) - - -def extract_policy_binding(zip_file_path): - with open(zip_file_path, "r") as manifest: - manifest_data = json.load(manifest) - return manifest_data["encryptionInformation"]["keyAccess"][0]["policyBinding"] - - -def update_policy_binding(zip_file_path, new_policy_binding): - with open(zip_file_path, "r") as manifest: - manifest_data = json.load(manifest) - manifest_data["encryptionInformation"]["keyAccess"][0][ - "policyBinding" - ] = new_policy_binding - - with open(zip_file_path, "w") as manifest: - json.dump(manifest_data, manifest) - - -def main(): - # Create OIDC credentials object - oidc_creds = OIDCCredentials(OIDC_CONFIGURATION_URL) - oidc_creds.set_client_id_and_client_secret( - client_id="tdf-client", client_secret="123-456" - ) - - client = TDFClient(oidc_credentials=oidc_creds, kas_url=KAS_URL) - client.enable_console_logging(LogLevel.Debug) - cwd = os.getcwd() - sample_txt = os.path.join(cwd, "sample.txt") - with open(sample_txt, "w") as file: - file.write("Virtru") - - sampleTxtStorage = TDFStorageType() - sampleTxtStorage.set_tdf_storage_file_type(sample_txt) - tdf_no_policy = os.path.join(cwd, "sample_nopolicy.txt.tdf") - - client.encrypt_file(sampleTxtStorage, tdf_no_policy) - - client.add_data_attribute( - "https://example.com/attr/Classification/value/M", KAS_URL - ) - tdf_with_policy = os.path.join(cwd, "sample_policy.txt.tdf") - client.encrypt_file(sampleTxtStorage, tdf_with_policy) - - # Unzip the first zip file - tdf_nopolicy_folder = os.path.join(cwd, "no_policy") - with zipfile.ZipFile(tdf_no_policy, "r") as zip_ref: - zip_ref.extractall(tdf_nopolicy_folder) - logging.info("Unzipped 1st tdf file: %s", tdf_no_policy) - - # Unzip the second zip file - tdf_with_policy_folder = os.path.join(cwd, "with_policy") - with zipfile.ZipFile(tdf_with_policy, "r") as zip_ref: - zip_ref.extractall(tdf_with_policy_folder) - logging.info("Unzipped 2nd tdf file: %s", tdf_with_policy) - - # Get the policy binding from the second zip file - fullpath = os.path.join(tdf_with_policy_folder, "0.manifest.json") - policy_binding = extract_policy_binding(fullpath) - - # Update the policy binding in the first zip file - fullpath = os.path.join(tdf_nopolicy_folder, "0.manifest.json") - update_policy_binding(fullpath, policy_binding) - logging.info("Updated policy binding in file: %s", fullpath) - - # Zip the updated files - updated_tdf_file = "updated_file.tdf" - with zipfile.ZipFile(updated_tdf_file, "w") as zip_ref: - for folder_name, subfolders, filenames in os.walk(tdf_nopolicy_folder): - for filename in filenames: - file_path = os.path.join(folder_name, filename) - zip_ref.write( - file_path, os.path.relpath(file_path, tdf_nopolicy_folder) - ) - logging.info("Added file to zip: %s", file_path) - - sampleTdfStorage = TDFStorageType() - sampleTdfStorage.set_tdf_storage_file_type(os.path.join(cwd, updated_tdf_file)) - - if os.path.exists(os.path.join(cwd, "sample_policy.txt")): - os.remove(os.path.join(cwd, "sample_policy.txt")) - - try: - client.decrypt_file(sampleTdfStorage, os.path.join(cwd, "sample_policy.txt")) - if os.path.exists(os.path.join(cwd, "sample_policy.txt")): - logging.error( - "Decryption was successful, but it was not expected due to a broken policy." - ) - sys.exit(1) - else: - logging.error("Broken policy ignored") - sys.exit(0) - except Exception as e: - error_message = f"An error occurred: {e}" - if "[403] Error: [Invalid Binding]" in str(e): - logging.warning("Expected - Invalid binding error occurred: %s", e) - print("Invalid binding error occurred.") - elif "desc = bad request" in error_message: - logging.warning("Expected - Policy HMAC mismatch error occurred: %s", e) - print("Policy HMAC mismatch error occurred.") - else: - print("Unexpected error: %s" % sys.exc_info()[0]) - raise - - -if __name__ == "__main__": - main() diff --git a/xtest/tdfs.py b/xtest/tdfs.py index 1ea60010..6448e185 100644 --- a/xtest/tdfs.py +++ b/xtest/tdfs.py @@ -1,3 +1,5 @@ +import base64 +from collections.abc import Callable import logging import os import subprocess @@ -22,6 +24,25 @@ } +class DataAttribute(BaseModel): + attribute: str + isDefault: bool | None = None + displayName: str | None = None + pubKey: str + kasUrl: str + schemaVersion: str | None = None + + +class PolicyBody(BaseModel): + dataAttributes: list[DataAttribute] | None = None + dissem: list[str] | None = None + + +class Policy(BaseModel): + uuid: str + body: PolicyBody + + class PolicyBinding(BaseModel): alg: str hash: str @@ -80,6 +101,16 @@ class EncryptionInformation(BaseModel): method: EncryptionMethod integrityInformation: Integrity + @property + def policy_object(self) -> Policy: + b = base64.b64decode(self.policy) + return Policy.model_validate_json(b) + + @policy_object.setter + def policy_object(self, value: Policy): + b = value.model_dump_json().encode() + self.policy = base64.b64encode(b).decode() + class Manifest(BaseModel): encryptionInformation: EncryptionInformation @@ -92,6 +123,30 @@ def manifest(tdf_file: str) -> Manifest: return Manifest.model_validate_json(manifestEntry.read()) +# Create a modified variant of a TDF by manipulating its manifest +def update_manifest( + scenario_name: str, tdf_file: str, manifest_change: Callable[[Manifest], Manifest] +) -> str: + # get the parent directory of the tdf file + tmp_dir = os.path.dirname(tdf_file) + fname = os.path.basename(tdf_file).split(".")[0] + unzipped_dir = os.path.join(tmp_dir, f"{fname}-{scenario_name}-unzipped") + with zipfile.ZipFile(tdf_file, "r") as zipped: + zipped.extractall(unzipped_dir) + with open(os.path.join(unzipped_dir, "0.manifest.json"), "r") as manifest_file: + manifest_data = Manifest.model_validate_json(manifest_file.read()) + new_manifest_data = manifest_change(manifest_data) + with open(os.path.join(unzipped_dir, "0.manifest.json"), "w") as manifest_file: + manifest_file.write(new_manifest_data.model_dump_json()) + outfile = os.path.join(tmp_dir, f"{fname}-{scenario_name}.tdf") + with zipfile.ZipFile(outfile, "w") as zipped: + for folder_name, _, filenames in os.walk(unzipped_dir): + for filename in filenames: + file_path = os.path.join(folder_name, filename) + zipped.write(file_path, os.path.relpath(file_path, unzipped_dir)) + return outfile + + def encrypt( sdk, pt_file, @@ -132,7 +187,7 @@ def decrypt(sdk, ct_file, rt_file, fmt="nano"): fmt, ] logger.info(f"dec [{' '.join(c)}]") - subprocess.check_call(c) + subprocess.check_output(c, stderr=subprocess.STDOUT) def supports(sdk: sdk_type, feature: feature_type) -> bool: diff --git a/xtest/test_tdfs.py b/xtest/test_tdfs.py index b929b8b9..16ec3272 100644 --- a/xtest/test_tdfs.py +++ b/xtest/test_tdfs.py @@ -1,5 +1,6 @@ import filecmp import os +import subprocess import pytest @@ -11,10 +12,41 @@ counter = 0 -def test_tdf(encrypt_sdk, decrypt_sdk, pt_file, tmp_dir, container): +def doEncryptWith( + pt_file: str, encrypt_sdk: str, container: str, tmp_dir: str, use_ecdsa: bool +) -> str: global counter counter = (counter or 0) + 1 c = counter + container_id = f"{encrypt_sdk}-{container}" + if container_id in cipherTexts: + return cipherTexts[container_id] + ct_file = f"{tmp_dir}test-{encrypt_sdk}-{c}.{container}" + tdfs.encrypt( + encrypt_sdk, + pt_file, + ct_file, + mime_type="text/plain", + fmt=container, + use_ecdsa_binding=use_ecdsa, + ) + if container == "ztdf": + manifest = tdfs.manifest(ct_file) + assert manifest.payload.isEncrypted + elif container == "nano": + with open(ct_file, "rb") as f: + envelope = nano.parse(f.read()) + assert envelope.header.version.version == 12 + assert envelope.header.binding_mode.use_ecdsa_binding == use_ecdsa + if envelope.header.kas.kid is not None: + # from xtest/platform/opentdf.yaml + expected_kid = b"ec1" + b"\0" * 5 + assert envelope.header.kas.kid == expected_kid + cipherTexts[container_id] = ct_file + return ct_file + + +def test_tdf(encrypt_sdk, decrypt_sdk, pt_file, tmp_dir, container): use_ecdsa = False if container == "nano-with-ecdsa": if not tdfs.supports(encrypt_sdk, "nano_ecdsa"): @@ -23,32 +55,31 @@ def test_tdf(encrypt_sdk, decrypt_sdk, pt_file, tmp_dir, container): ) container = "nano" use_ecdsa = True - container_id = f"{encrypt_sdk}-{container}" - if container_id not in cipherTexts: - ct_file = f"{tmp_dir}test-{encrypt_sdk}-{c}.{container}" - tdfs.encrypt( - encrypt_sdk, - pt_file, - ct_file, - mime_type="text/plain", - fmt=container, - use_ecdsa_binding=use_ecdsa, - ) - if container == "ztdf": - manifest = tdfs.manifest(ct_file) - assert manifest.payload.isEncrypted - elif container == "nano": - with open(ct_file, "rb") as f: - envelope = nano.parse(f.read()) - assert envelope.header.version.version == 12 - assert envelope.header.binding_mode.use_ecdsa_binding == use_ecdsa - if envelope.header.kas.kid is not None: - # from xtest/platform/opentdf.yaml - expected_kid = b"ec1" + b"\0" * 5 - assert envelope.header.kas.kid == expected_kid - cipherTexts[container_id] = ct_file - ct_file = cipherTexts[container_id] + ct_file = doEncryptWith(pt_file, encrypt_sdk, container, tmp_dir, use_ecdsa) assert os.path.isfile(ct_file) - rt_file = f"{tmp_dir}test-{c}.untdf" + fname = os.path.basename(ct_file).split(".")[0] + rt_file = f"{tmp_dir}test-{fname}.untdf" tdfs.decrypt(decrypt_sdk, ct_file, rt_file, container) assert filecmp.cmp(pt_file, rt_file) + + +def breakBinding(manifest: tdfs.Manifest) -> tdfs.Manifest: + # base64 decode policy from manifest.encryptionInformation.policy + p = manifest.encryptionInformation.policy_object + p.body.dataAttributes = [] + p.body.dissem = ["yves@dropp.er"] + manifest.encryptionInformation.policy_object = p + return manifest + + +def test_tdf_with_unbound_policy(encrypt_sdk, decrypt_sdk, pt_file, tmp_dir): + ct_file = doEncryptWith(pt_file, encrypt_sdk, "ztdf", tmp_dir, False) + assert os.path.isfile(ct_file) + b_file = tdfs.update_manifest("unbound_policy", ct_file, breakBinding) + fname = os.path.basename(b_file).split(".")[0] + rt_file = f"{tmp_dir}test-{fname}.untdf" + try: + tdfs.decrypt(decrypt_sdk, b_file, rt_file, "ztdf") + assert False, "decrypt succeeded unexpectedly" + except subprocess.CalledProcessError as exc: + assert b"wrap" in exc.output