diff --git a/apps/pair.py b/apps/pair.py index 39ee4fe0..93459124 100644 --- a/apps/pair.py +++ b/apps/pair.py @@ -24,10 +24,16 @@ from bumble.colors import color from bumble.device import Device, Peer from bumble.transport import open_transport_or_link -from bumble.pairing import PairingDelegate, PairingConfig +from bumble.pairing import OobData, PairingDelegate, PairingConfig +from bumble.smp import OobContext, OobLegacyContext from bumble.smp import error_name as smp_error_name from bumble.keys import JsonKeyStore -from bumble.core import ProtocolError +from bumble.core import ( + AdvertisingData, + ProtocolError, + BT_LE_TRANSPORT, + BT_BR_EDR_TRANSPORT, +) from bumble.gatt import ( GATT_DEVICE_NAME_CHARACTERISTIC, GATT_GENERIC_ACCESS_SERVICE, @@ -60,7 +66,7 @@ async def wait_until_terminated(self): class Delegate(PairingDelegate): def __init__(self, mode, connection, capability_string, do_prompt): super().__init__( - { + io_capability={ 'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY, 'display': PairingDelegate.DISPLAY_OUTPUT_ONLY, 'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, @@ -286,6 +292,7 @@ async def pair( bond, ctkd, io, + oob, prompt, request, print_keys, @@ -343,16 +350,51 @@ async def pair( await device.keystore.print(prefix=color('@@@ ', 'blue')) print(color('@@@-----------------------------------', 'blue')) + # Create an OOB context if needed + if oob: + our_oob_context = OobContext() + shared_data = ( + None + if oob == '-' + else OobData.from_ad(AdvertisingData.from_bytes(bytes.fromhex(oob))) + ) + legacy_context = OobLegacyContext() + oob_contexts = PairingConfig.OobConfig( + our_context=our_oob_context, + peer_data=shared_data, + legacy_context=legacy_context, + ) + oob_data = OobData( + address=device.random_address, + shared_data=shared_data, + legacy_context=legacy_context, + ) + print(color('@@@-----------------------------------', 'yellow')) + print(color('@@@ OOB Data:', 'yellow')) + print(color(f'@@@ {our_oob_context.share()}', 'yellow')) + print(color(f'@@@ TK={legacy_context.tk.hex()}', 'yellow')) + print(color(f'@@@ HEX: ({bytes(oob_data.to_ad()).hex()})', 'yellow')) + print(color('@@@-----------------------------------', 'yellow')) + else: + oob_contexts = None + # Set up a pairing config factory device.pairing_config_factory = lambda connection: PairingConfig( - sc, mitm, bond, Delegate(mode, connection, io, prompt) + sc=sc, + mitm=mitm, + bonding=bond, + oob=oob_contexts, + delegate=Delegate(mode, connection, io, prompt), ) # Connect to a peer or wait for a connection device.on('connection', lambda connection: on_connection(connection, request)) if address_or_name is not None: print(color(f'=== Connecting to {address_or_name}...', 'green')) - connection = await device.connect(address_or_name) + connection = await device.connect( + address_or_name, + transport=BT_LE_TRANSPORT if mode == 'le' else BT_BR_EDR_TRANSPORT, + ) if not request: try: @@ -421,6 +463,14 @@ def emit(self, record): default='display+keyboard', show_default=True, ) +@click.option( + '--oob', + metavar='', + help=( + 'Use OOB pairing with this data from the peer ' + '(use "-" to enable OOB without peer data)' + ), +) @click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request') @click.option( '--request', is_flag=True, help='Request that the connecting peer initiate pairing' @@ -441,6 +491,7 @@ def main( bond, ctkd, io, + oob, prompt, request, print_keys, @@ -464,6 +515,7 @@ def main( bond, ctkd, io, + oob, prompt, request, print_keys, diff --git a/bumble/core.py b/bumble/core.py index 4a67d6ec..2722b870 100644 --- a/bumble/core.py +++ b/bumble/core.py @@ -16,6 +16,7 @@ # Imports # ----------------------------------------------------------------------------- from __future__ import annotations +import enum import struct from typing import List, Optional, Tuple, Union, cast, Dict @@ -1051,3 +1052,13 @@ def __init__(self, tx_phy, rx_phy): def __str__(self): return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})' + + +# ----------------------------------------------------------------------------- +# LE Role +# ----------------------------------------------------------------------------- +class LeRole(enum.IntEnum): + PERIPHERAL_ONLY = 0x00 + CENTRAL_ONLY = 0x01 + BOTH_PERIPHERAL_PREFERRED = 0x02 + BOTH_CENTRAL_PREFERRED = 0x03 diff --git a/bumble/pairing.py b/bumble/pairing.py index 877b7394..5614e84b 100644 --- a/bumble/pairing.py +++ b/bumble/pairing.py @@ -15,7 +15,9 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations import enum +from dataclasses import dataclass from typing import Optional, Tuple from .hci import ( @@ -35,7 +37,60 @@ SMP_ID_KEY_DISTRIBUTION_FLAG, SMP_SIGN_KEY_DISTRIBUTION_FLAG, SMP_LINK_KEY_DISTRIBUTION_FLAG, + OobContext, + OobLegacyContext, + OobSharedData, ) +from .core import AdvertisingData, LeRole + + +# ----------------------------------------------------------------------------- +@dataclass +class OobData: + """OOB data that can be sent from one device to another.""" + + address: Optional[Address] = None + role: Optional[LeRole] = None + shared_data: Optional[OobSharedData] = None + legacy_context: Optional[OobLegacyContext] = None + + @classmethod + def from_ad(cls, ad: AdvertisingData) -> OobData: + instance = cls() + shared_data_c: Optional[bytes] = None + shared_data_r: Optional[bytes] = None + for ad_type, ad_data in ad.ad_structures: + if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS: + instance.address = Address(ad_data) + elif ad_type == AdvertisingData.LE_ROLE: + instance.role = LeRole(ad_data[0]) + elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE: + shared_data_c = ad_data + elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE: + shared_data_r = ad_data + elif ad_type == AdvertisingData.SECURITY_MANAGER_TK_VALUE: + instance.legacy_context = OobLegacyContext(tk=ad_data) + if shared_data_c and shared_data_r: + instance.shared_data = OobSharedData(c=shared_data_c, r=shared_data_r) + + return instance + + def to_ad(self) -> AdvertisingData: + ad_structures = [] + if self.address is not None: + ad_structures.append( + (AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address)) + ) + if self.role is not None: + ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role]))) + if self.shared_data is not None: + ad_structures.extend(self.shared_data.to_ad().ad_structures) + if self.legacy_context is not None: + ad_structures.append( + (AdvertisingData.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk) + ) + + return AdvertisingData(ad_structures) # ----------------------------------------------------------------------------- @@ -173,6 +228,14 @@ class AddressType(enum.IntEnum): PUBLIC = Address.PUBLIC_DEVICE_ADDRESS RANDOM = Address.RANDOM_DEVICE_ADDRESS + @dataclass + class OobConfig: + """Config for OOB pairing.""" + + our_context: Optional[OobContext] + peer_data: Optional[OobSharedData] + legacy_context: Optional[OobLegacyContext] + def __init__( self, sc: bool = True, @@ -180,17 +243,20 @@ def __init__( bonding: bool = True, delegate: Optional[PairingDelegate] = None, identity_address_type: Optional[AddressType] = None, + oob: Optional[OobConfig] = None, ) -> None: self.sc = sc self.mitm = mitm self.bonding = bonding self.delegate = delegate or PairingDelegate() self.identity_address_type = identity_address_type + self.oob = oob def __str__(self) -> str: return ( f'PairingConfig(sc={self.sc}, ' f'mitm={self.mitm}, bonding={self.bonding}, ' f'identity_address_type={self.identity_address_type}, ' - f'delegate[{self.delegate.io_capability}])' + f'delegate[{self.delegate.io_capability}]), ' + f'oob[{self.oob}])' ) diff --git a/bumble/smp.py b/bumble/smp.py index f8bba400..1461969c 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -27,6 +27,7 @@ import asyncio import enum import secrets +from dataclasses import dataclass from typing import ( TYPE_CHECKING, Any, @@ -53,6 +54,7 @@ BT_BR_EDR_TRANSPORT, BT_CENTRAL_ROLE, BT_LE_TRANSPORT, + AdvertisingData, ProtocolError, name_or_number, ) @@ -563,6 +565,54 @@ class PairingMethod(enum.IntEnum): CTKD_OVER_CLASSIC = 4 +# ----------------------------------------------------------------------------- +class OobContext: + """Cryptographic context for LE SC OOB pairing.""" + + ecc_key: crypto.EccKey + r: bytes + + def __init__( + self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None + ) -> None: + self.ecc_key = crypto.EccKey.generate() if ecc_key is None else ecc_key + self.r = crypto.r() if r is None else r + + def share(self) -> OobSharedData: + pkx = bytes(reversed(self.ecc_key.x)) + return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r) + + +# ----------------------------------------------------------------------------- +class OobLegacyContext: + """Cryptographic context for LE Legacy OOB pairing.""" + + tk: bytes + + def __init__(self, tk: Optional[bytes] = None) -> None: + self.tk = crypto.r() if tk is None else tk + + +# ----------------------------------------------------------------------------- +@dataclass +class OobSharedData: + """Shareable data for LE SC OOB pairing.""" + + c: bytes + r: bytes + + def to_ad(self) -> AdvertisingData: + return AdvertisingData( + [ + (AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE, self.c), + (AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r), + ] + ) + + def __str__(self) -> str: + return f'OOB(C={self.c.hex()}, R={self.r.hex()})' + + # ----------------------------------------------------------------------------- class Session: # I/O Capability to pairing method decision matrix @@ -640,8 +690,6 @@ def __init__( self.pres: Optional[bytes] = None self.ea = None self.eb = None - self.tk = bytes(16) - self.r = bytes(16) self.stk = None self.ltk = None self.ltk_ediv = 0 @@ -659,7 +707,7 @@ def __init__( self.peer_bd_addr: Optional[Address] = None self.peer_signature_key = None self.peer_expected_distributions: List[Type[SMP_Command]] = [] - self.dh_key = None + self.dh_key = b'' self.confirm_value = None self.passkey: Optional[int] = None self.passkey_ready = asyncio.Event() @@ -712,8 +760,8 @@ def __init__( self.io_capability = pairing_config.delegate.io_capability self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY - # OOB (not supported yet) - self.oob = False + # OOB + self.oob_data_flag = 0 if pairing_config.oob is None else 1 # Set up addresses self_address = connection.self_address @@ -729,9 +777,37 @@ def __init__( self.ia = bytes(peer_address) self.iat = 1 if peer_address.is_random else 0 + # Select the ECC key, TK and r initial value + if pairing_config.oob: + self.peer_oob_data = pairing_config.oob.peer_data + if pairing_config.sc: + if pairing_config.oob.our_context is None: + raise ValueError( + "oob pairing config requires a context when sc is True" + ) + self.r = pairing_config.oob.our_context.r + self.ecc_key = pairing_config.oob.our_context.ecc_key + if pairing_config.oob.legacy_context is None: + self.tk = None + else: + self.tk = pairing_config.oob.legacy_context.tk + else: + if pairing_config.oob.legacy_context is None: + raise ValueError( + "oob pairing config requires a legacy context when sc is False" + ) + self.r = bytes(16) + self.ecc_key = manager.ecc_key + self.tk = pairing_config.oob.legacy_context.tk + else: + self.peer_oob_data = None + self.r = bytes(16) + self.ecc_key = manager.ecc_key + self.tk = bytes(16) + @property def pkx(self) -> Tuple[bytes, bytes]: - return (bytes(reversed(self.manager.ecc_key.x)), self.peer_public_key_x) + return (bytes(reversed(self.ecc_key.x)), self.peer_public_key_x) @property def pka(self) -> bytes: @@ -768,7 +844,10 @@ def get_long_term_key(self, rand: bytes, ediv: int) -> Optional[bytes]: return None def decide_pairing_method( - self, auth_req: int, initiator_io_capability: int, responder_io_capability: int + self, + auth_req: int, + initiator_io_capability: int, + responder_io_capability: int, ) -> None: if self.connection.transport == BT_BR_EDR_TRANSPORT: self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC @@ -909,7 +988,7 @@ def send_pairing_request_command(self) -> None: command = SMP_Pairing_Request_Command( io_capability=self.io_capability, - oob_data_flag=0, + oob_data_flag=self.oob_data_flag, auth_req=self.auth_req, maximum_encryption_key_size=16, initiator_key_distribution=self.initiator_key_distribution, @@ -921,7 +1000,7 @@ def send_pairing_request_command(self) -> None: def send_pairing_response_command(self) -> None: response = SMP_Pairing_Response_Command( io_capability=self.io_capability, - oob_data_flag=0, + oob_data_flag=self.oob_data_flag, auth_req=self.auth_req, maximum_encryption_key_size=16, initiator_key_distribution=self.initiator_key_distribution, @@ -982,8 +1061,8 @@ def send_pairing_random_command(self) -> None: def send_public_key_command(self) -> None: self.send_command( SMP_Pairing_Public_Key_Command( - public_key_x=bytes(reversed(self.manager.ecc_key.x)), - public_key_y=bytes(reversed(self.manager.ecc_key.y)), + public_key_x=bytes(reversed(self.ecc_key.x)), + public_key_y=bytes(reversed(self.ecc_key.y)), ) ) @@ -1030,7 +1109,6 @@ async def derive_ltk(self) -> None: self.ltk = crypto.h6(ilk, b'brle') def distribute_keys(self) -> None: - # Distribute the keys as required if self.is_initiator: # CTKD: Derive LTK from LinkKey @@ -1296,7 +1374,7 @@ def on_smp_command(self, command: SMP_Command) -> None: try: handler(command) except Exception as error: - logger.warning(f'{color("!!! Exception in handler:", "red")} {error}') + logger.exception(f'{color("!!! Exception in handler:", "red")} {error}') response = SMP_Pairing_Failed_Command( reason=SMP_UNSPECIFIED_REASON_ERROR ) @@ -1333,15 +1411,28 @@ async def on_smp_pairing_request_command_async( self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0) - # Check for OOB - if command.oob_data_flag != 0: - self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) - return + # Infer the pairing method + if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( + not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0) + ): + # Use OOB + self.pairing_method = PairingMethod.OOB + if not self.sc and self.tk is None: + # For legacy OOB, TK is required. + logger.warning("legacy OOB without TK") + self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) + return + if command.oob_data_flag == 0: + # The peer doesn't have OOB data, use r=0 + self.r = bytes(16) + else: + # Decide which pairing method to use from the IO capability + self.decide_pairing_method( + command.auth_req, + command.io_capability, + self.io_capability, + ) - # Decide which pairing method to use - self.decide_pairing_method( - command.auth_req, command.io_capability, self.io_capability - ) logger.debug(f'pairing method: {self.pairing_method.name}') # Key distribution @@ -1390,15 +1481,26 @@ def on_smp_pairing_response_command( self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) - # Check for OOB - if self.sc and command.oob_data_flag: - self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) - return + # Infer the pairing method + if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( + not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0) + ): + # Use OOB + self.pairing_method = PairingMethod.OOB + if not self.sc and self.tk is None: + # For legacy OOB, TK is required. + logger.warning("legacy OOB without TK") + self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) + return + if command.oob_data_flag == 0: + # The peer doesn't have OOB data, use r=0 + self.r = bytes(16) + else: + # Decide which pairing method to use from the IO capability + self.decide_pairing_method( + command.auth_req, self.io_capability, command.io_capability + ) - # Decide which pairing method to use - self.decide_pairing_method( - command.auth_req, self.io_capability, command.io_capability - ) logger.debug(f'pairing method: {self.pairing_method.name}') # Key distribution @@ -1549,12 +1651,13 @@ def on_smp_pairing_random_command_secure_connections( if self.passkey_step < 20: self.send_pairing_confirm_command() return - else: + elif self.pairing_method != PairingMethod.OOB: return else: if self.pairing_method in ( PairingMethod.JUST_WORKS, PairingMethod.NUMERIC_COMPARISON, + PairingMethod.OOB, ): self.send_pairing_random_command() elif self.pairing_method == PairingMethod.PASSKEY: @@ -1591,6 +1694,7 @@ def on_smp_pairing_random_command_secure_connections( if self.pairing_method in ( PairingMethod.JUST_WORKS, PairingMethod.NUMERIC_COMPARISON, + PairingMethod.OOB, ): ra = bytes(16) rb = ra @@ -1599,7 +1703,6 @@ def on_smp_pairing_random_command_secure_connections( ra = self.passkey.to_bytes(16, byteorder='little') rb = ra else: - # OOB not implemented yet return assert self.preq and self.pres @@ -1653,7 +1756,7 @@ def on_smp_pairing_public_key_command( # Compute the DH key self.dh_key = bytes( reversed( - self.manager.ecc_key.dh( + self.ecc_key.dh( bytes(reversed(command.public_key_x)), bytes(reversed(command.public_key_y)), ) @@ -1661,8 +1764,27 @@ def on_smp_pairing_public_key_command( ) logger.debug(f'DH key: {self.dh_key.hex()}') + if self.pairing_method == PairingMethod.OOB: + # Check against shared OOB data + if self.peer_oob_data: + confirm_verifier = crypto.f4( + self.peer_public_key_x, + self.peer_public_key_x, + self.peer_oob_data.r, + bytes(1), + ) + if not self.check_expected_value( + self.peer_oob_data.c, + confirm_verifier, + SMP_CONFIRM_VALUE_FAILED_ERROR, + ): + return + if self.is_initiator: - self.send_pairing_confirm_command() + if self.pairing_method == PairingMethod.OOB: + self.send_pairing_random_command() + else: + self.send_pairing_confirm_command() else: if self.pairing_method == PairingMethod.PASSKEY: self.display_or_input_passkey() @@ -1673,6 +1795,7 @@ def on_smp_pairing_public_key_command( if self.pairing_method in ( PairingMethod.JUST_WORKS, PairingMethod.NUMERIC_COMPARISON, + PairingMethod.OOB, ): # We can now send the confirmation value self.send_pairing_confirm_command() diff --git a/tests/self_test.py b/tests/self_test.py index 98ce5e80..728fbc7e 100644 --- a/tests/self_test.py +++ b/tests/self_test.py @@ -34,6 +34,8 @@ from bumble.smp import ( SMP_PAIRING_NOT_SUPPORTED_ERROR, SMP_CONFIRM_VALUE_FAILED_ERROR, + OobContext, + OobLegacyContext, ) from bumble.core import ProtocolError from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE @@ -575,6 +577,77 @@ async def test_self_smp_public_address(): await _test_self_smp_with_configs(pairing_config, pairing_config) +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_self_smp_oob_sc(): + oob_context_1 = OobContext() + oob_context_2 = OobContext() + + pairing_config_1 = PairingConfig( + mitm=True, + sc=True, + bonding=True, + oob=PairingConfig.OobConfig(oob_context_1, oob_context_2.share(), None), + ) + + pairing_config_2 = PairingConfig( + mitm=True, + sc=True, + bonding=True, + oob=PairingConfig.OobConfig(oob_context_2, oob_context_1.share(), None), + ) + + await _test_self_smp_with_configs(pairing_config_1, pairing_config_2) + + pairing_config_3 = PairingConfig( + mitm=True, + sc=True, + bonding=True, + oob=PairingConfig.OobConfig(oob_context_2, None, None), + ) + + await _test_self_smp_with_configs(pairing_config_1, pairing_config_3) + await _test_self_smp_with_configs(pairing_config_3, pairing_config_1) + + pairing_config_4 = PairingConfig( + mitm=True, + sc=True, + bonding=True, + oob=PairingConfig.OobConfig(oob_context_2, oob_context_2.share(), None), + ) + + with pytest.raises(ProtocolError) as error: + await _test_self_smp_with_configs(pairing_config_1, pairing_config_4) + assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR + + with pytest.raises(ProtocolError): + await _test_self_smp_with_configs(pairing_config_4, pairing_config_1) + assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_self_smp_oob_legacy(): + legacy_context = OobLegacyContext() + + pairing_config_1 = PairingConfig( + mitm=True, + sc=False, + bonding=True, + oob=PairingConfig.OobConfig(None, None, legacy_context), + ) + + pairing_config_2 = PairingConfig( + mitm=True, + sc=True, + bonding=True, + oob=PairingConfig.OobConfig(OobContext(), None, legacy_context), + ) + + await _test_self_smp_with_configs(pairing_config_1, pairing_config_2) + await _test_self_smp_with_configs(pairing_config_2, pairing_config_1) + + # ----------------------------------------------------------------------------- async def run_test_self(): await test_self_connection() @@ -585,6 +658,8 @@ async def run_test_self(): await test_self_smp_wrong_pin() await test_self_smp_over_classic() await test_self_smp_public_address() + await test_self_smp_oob_sc() + await test_self_smp_oob_legacy() # ----------------------------------------------------------------------------- diff --git a/tests/smp_test.py b/tests/smp_test.py index bdfa0217..4bd75228 100644 --- a/tests/smp_test.py +++ b/tests/smp_test.py @@ -17,11 +17,16 @@ # ----------------------------------------------------------------------------- from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1 +from bumble.pairing import OobData, OobSharedData, LeRole +from bumble.hci import Address +from bumble.core import AdvertisingData + # ----------------------------------------------------------------------------- # pylint: disable=invalid-name # ----------------------------------------------------------------------------- + # ----------------------------------------------------------------------------- def reversed_hex(hex_str): return bytes(reversed(bytes.fromhex(hex_str))) @@ -233,6 +238,23 @@ def test_ah(): assert value == expected +# ----------------------------------------------------------------------------- +def test_oob_data(): + oob_data = OobData( + address=Address("F0:F1:F2:F3:F4:F5"), + role=LeRole.BOTH_PERIPHERAL_PREFERRED, + shared_data=OobSharedData(c=bytes([1, 2]), r=bytes([3, 4])), + ) + oob_data_ad = oob_data.to_ad() + oob_data_bytes = bytes(oob_data_ad) + oob_data_ad_parsed = AdvertisingData.from_bytes(oob_data_bytes) + oob_data_parsed = OobData.from_ad(oob_data_ad_parsed) + assert oob_data_parsed.address == oob_data.address + assert oob_data_parsed.role == oob_data.role + assert oob_data_parsed.shared_data.c == oob_data.shared_data.c + assert oob_data_parsed.shared_data.r == oob_data.shared_data.r + + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_ecc() @@ -246,3 +268,4 @@ def test_ah(): test_h6() test_h7() test_ah() + test_oob_data()