From 5011e779606b610aedf9241d1490132a6f7300f4 Mon Sep 17 00:00:00 2001 From: iabdalkader Date: Thu, 10 Oct 2024 15:39:55 +0200 Subject: [PATCH] ussl: Add support for sign callback on MicroPython. Signed-off-by: iabdalkader --- src/arduino_iot_cloud/ussl.py | 78 ++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 15 deletions(-) diff --git a/src/arduino_iot_cloud/ussl.py b/src/arduino_iot_cloud/ussl.py index 058532a..2628eb9 100644 --- a/src/arduino_iot_cloud/ussl.py +++ b/src/arduino_iot_cloud/ussl.py @@ -9,22 +9,56 @@ import ssl import sys import logging -import binascii +try: + from micropython import const +except (ImportError, AttributeError): + def const(x): + return x pkcs11 = None +se_dev = None # Default engine and provider. _ENGINE_PATH = "/usr/lib/engines-3/libpkcs11.so" _MODULE_PATH = "/usr/lib/softhsm/libsofthsm2.so" -# Reference EC key for NXP's PlugNTrust -_EC_REF_KEY = binascii.unhexlify( - b"3041020100301306072a8648ce3d020106082a8648ce3d03010704273025" - b"0201010420100000000000000000000000000000000000ffffffffa5a6b5" - b"b6a5a6b5b61000" +# Reference EC key for the SE. +_EC_REF_KEY = const( + b"\x30\x41\x02\x01\x00\x30\x13\x06\x07\x2A\x86\x48\xCE\x3D\x02\x01" + b"\x06\x08\x2A\x86\x48\xCE\x3D\x03\x01\x07\x04\x27\x30\x25\x02\x01" + b"\x01\x04\x20\xA5\xA6\xB5\xB6\xA5\xA6\xB5\xB6\x00\x00\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF" + b"\xFF\xFF\xFF" ) +def log_level_enabled(level): + return logging.getLogger().isEnabledFor(level) + + +def ecdsa_sign_callback(key, data): + if log_level_enabled(logging.DEBUG): + key_hex = "".join("%02X" % b for b in key) + logging.debug(f"ecdsa_sign_callback key:{key_hex}") + + if key[0:8] != b"\xA5\xA6\xB5\xB6\xA5\xA6\xB5\xB6": + if log_level_enabled(logging.DEBUG): + logging.debug("ecdsa_sign_callback falling back to default sign") + return None + + obj_id = int.from_bytes(key[-4:], "big") + if log_level_enabled(logging.DEBUG): + logging.debug(f"ecdsa_sign_callback oid: 0x{obj_id:02X}") + + # Sign data on SE using reference key object id. + sig = se_dev.sign(obj_id, data) + if log_level_enabled(logging.DEBUG): + sig_hex = "".join("%02X" % b for b in sig) + logging.debug(f"ecdsa_sign_callback sig: {sig_hex}") + logging.info("Signed using secure element") + return sig + + def wrap_socket(sock, ssl_params={}): keyfile = ssl_params.get("keyfile", None) certfile = ssl_params.get("certfile", None) @@ -33,16 +67,27 @@ def wrap_socket(sock, ssl_params={}): ciphers = ssl_params.get("ciphers", None) verify = ssl_params.get("verify_mode", ssl.CERT_NONE) hostname = ssl_params.get("server_hostname", None) - micropython = sys.implementation.name == "micropython" - if keyfile is not None and "token" in keyfile and micropython: - # Create a reference EC key for NXP EdgeLock device. - objid = int(keyfile.split("=")[1], 16).to_bytes(4, "big") - keyfile = _EC_REF_KEY[0:53] + objid + _EC_REF_KEY[57:] - # Load the certificate from the secure element (when supported). - # import cryptoki - # with cryptoki.open() as token: - # cert = token.read(0x65, 412) + se_key_token = keyfile is not None and "token" in keyfile + se_crt_token = certfile is not None and "token" in certfile + sys_micropython = sys.implementation.name == "micropython" + + if sys_micropython and (se_key_token or se_crt_token): + import se05x + + # Create and initialize SE05x device. + global se_dev + if se_dev is None: + se_dev = se05x.SE05X() + + if se_key_token: + # Create a reference key for the secure element. + obj_id = int(keyfile.split("=")[1], 16) + keyfile = _EC_REF_KEY[0:-4] + obj_id.to_bytes(4, "big") + + if se_crt_token: + # Load the certificate from the secure element. + certfile = se_dev.read(0x65, 412) if keyfile is None or "token" not in keyfile: # Use MicroPython/CPython SSL to wrap socket. @@ -58,6 +103,9 @@ def wrap_socket(sock, ssl_params={}): ctx.set_ciphers(ciphers) if cafile is not None or cadata is not None: ctx.load_verify_locations(cafile=cafile, cadata=cadata) + if sys_micropython and se_key_token: + # Set alternate ECDSA sign function. + ctx._context.ecdsa_sign_callback = ecdsa_sign_callback return ctx.wrap_socket(sock, server_hostname=hostname) else: # Use M2Crypto to load key and cert from HSM.