From 2c5f3472a9ee5208b1e46aed8ab23f7b6364da91 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Sat, 30 Dec 2023 15:51:17 +0800 Subject: [PATCH] CSIP: Encrypted SIRK implementation --- .vscode/settings.json | 1 + bumble/profiles/csip.py | 69 ++++++++++++++++++++++++++++++++++++----- tests/csip_test.py | 21 +++++++++---- 3 files changed, 77 insertions(+), 14 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index b564a38b..bd953cab 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -22,6 +22,7 @@ "cmac", "CONNECTIONLESS", "csip", + "csis", "csrcs", "CVSD", "datagram", diff --git a/bumble/profiles/csip.py b/bumble/profiles/csip.py index cb17f48f..0878a603 100644 --- a/bumble/profiles/csip.py +++ b/bumble/profiles/csip.py @@ -19,8 +19,9 @@ from __future__ import annotations import enum import struct -from typing import Optional +from typing import Optional, Tuple +from bumble import att from bumble import core from bumble import crypto from bumble import device @@ -31,6 +32,9 @@ # ----------------------------------------------------------------------------- # Constants # ----------------------------------------------------------------------------- +SET_IDENTITY_RESOLVING_KEY_LENGTH = 16 + + class SirkType(enum.IntEnum): '''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.''' @@ -66,6 +70,10 @@ def k1(n: bytes, salt: bytes, p: bytes) -> bytes: def sef(k: bytes, r: bytes) -> bytes: ''' Coordinated Set Identification Service - 4.5 SIRK encryption function sef. + + SIRK decryption function sdf shares the same algorithm. The only difference is that argument r is: + * Plaintext in encryption + * Cipher in decryption ''' return crypto.xor(k1(k, s1(b'SIRKenc'[::-1]), b'csis'[::-1]), r) @@ -105,6 +113,11 @@ def __init__( set_member_lock: Optional[MemberLock] = None, set_member_rank: Optional[int] = None, ) -> None: + if len(set_identity_resolving_key) != SET_IDENTITY_RESOLVING_KEY_LENGTH: + raise ValueError( + f'Invalid SIRK length {len(set_identity_resolving_key)}, expected {SET_IDENTITY_RESOLVING_KEY_LENGTH}' + ) + characteristics = [] self.set_identity_resolving_key = set_identity_resolving_key @@ -113,7 +126,7 @@ def __init__( uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC, properties=gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.NOTIFY, - permissions=gatt.Characteristic.Permissions.READABLE, + permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, value=gatt.CharacteristicValue(read=self.on_sirk_read), ) characteristics.append(self.set_identity_resolving_key_characteristic) @@ -123,7 +136,7 @@ def __init__( uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC, properties=gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.NOTIFY, - permissions=gatt.Characteristic.Permissions.READABLE, + permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, value=struct.pack('B', coordinated_set_size), ) characteristics.append(self.coordinated_set_size_characteristic) @@ -134,7 +147,7 @@ def __init__( properties=gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.NOTIFY | gatt.Characteristic.Properties.WRITE, - permissions=gatt.Characteristic.Permissions.READABLE + permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION | gatt.Characteristic.Permissions.WRITEABLE, value=struct.pack('B', set_member_lock), ) @@ -145,18 +158,32 @@ def __init__( uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC, properties=gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.NOTIFY, - permissions=gatt.Characteristic.Permissions.READABLE, + permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, value=struct.pack('B', set_member_rank), ) characteristics.append(self.set_member_rank_characteristic) super().__init__(characteristics) - def on_sirk_read(self, _connection: Optional[device.Connection]) -> bytes: + async def on_sirk_read(self, connection: Optional[device.Connection]) -> bytes: if self.set_identity_resolving_key_type == SirkType.PLAINTEXT: - return bytes([SirkType.PLAINTEXT]) + self.set_identity_resolving_key + sirk_bytes = self.set_identity_resolving_key else: - raise NotImplementedError('TODO: Pending async Characteristic read.') + assert connection + + if connection.transport == core.BT_LE_TRANSPORT: + key = await connection.device.get_long_term_key( + connection_handle=connection.handle, rand=b'', ediv=0 + ) + else: + key = await connection.device.get_link_key(connection.peer_address) + + if not key: + raise RuntimeError('LTK or LinkKey is not present') + + sirk_bytes = sef(key, self.set_identity_resolving_key) + + return bytes([self.set_identity_resolving_key_type]) + sirk_bytes def get_advertising_data(self) -> bytes: return bytes( @@ -203,3 +230,29 @@ def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC ): self.set_member_rank = characteristics[0] + + async def read_set_identity_resolving_key(self) -> Tuple[SirkType, bytes]: + '''Reads SIRK and decrypts if encrypted.''' + response = await self.set_identity_resolving_key.read_value() + if len(response) != SET_IDENTITY_RESOLVING_KEY_LENGTH + 1: + raise RuntimeError('Invalid SIRK value') + + sirk_type = SirkType(response[0]) + if sirk_type == SirkType.PLAINTEXT: + sirk = response[1:] + else: + connection = self.service_proxy.client.connection + device = connection.device + if connection.transport == core.BT_LE_TRANSPORT: + key = await device.get_long_term_key( + connection_handle=connection.handle, rand=b'', ediv=0 + ) + else: + key = await device.get_link_key(connection.peer_address) + + if not key: + raise RuntimeError('LTK or LinkKey is not present') + + sirk = sef(key, response[1:]) + + return (sirk_type, sirk) diff --git a/tests/csip_test.py b/tests/csip_test.py index 5899d81f..b34c4268 100644 --- a/tests/csip_test.py +++ b/tests/csip_test.py @@ -20,6 +20,7 @@ import pytest import struct import logging +from unittest import mock from bumble import device from bumble.profiles import csip @@ -68,14 +69,18 @@ def test_sef(): # ----------------------------------------------------------------------------- @pytest.mark.asyncio -async def test_csis(): +@pytest.mark.parametrize( + 'sirk_type,', [(csip.SirkType.ENCRYPTED), (csip.SirkType.PLAINTEXT)] +) +async def test_csis(sirk_type): SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa') + LTK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa') devices = TwoDevices() devices[0].add_service( csip.CoordinatedSetIdentificationService( set_identity_resolving_key=SIRK, - set_identity_resolving_key_type=csip.SirkType.PLAINTEXT, + set_identity_resolving_key_type=sirk_type, coordinated_set_size=2, set_member_lock=csip.MemberLock.UNLOCKED, set_member_rank=0, @@ -83,15 +88,19 @@ async def test_csis(): ) await devices.setup_connection() + + # Mock encryption. + devices.connections[0].encryption = 1 + devices.connections[1].encryption = 1 + devices[0].get_long_term_key = mock.AsyncMock(return_value=LTK) + devices[1].get_long_term_key = mock.AsyncMock(return_value=LTK) + peer = device.Peer(devices.connections[1]) csis_client = await peer.discover_service_and_create_proxy( csip.CoordinatedSetIdentificationProxy ) - assert ( - await csis_client.set_identity_resolving_key.read_value() - == bytes([csip.SirkType.PLAINTEXT]) + SIRK - ) + assert await csis_client.read_set_identity_resolving_key() == (sirk_type, SIRK) assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2) assert await csis_client.set_member_lock.read_value() == struct.pack( 'B', csip.MemberLock.UNLOCKED