From 555b9d98ed9d11deff6ddf63b44e9efc7531cc21 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Mon, 18 Dec 2023 09:49:57 -0800 Subject: [PATCH] format (+3 squashed commits) Squashed commits: [60e610f] wip [eeab73d] wip [3cdd5b8] basic first pass --- apps/bench.py | 4 +- bumble/device.py | 1040 ++++++++++++++++--------- bumble/hci.py | 172 ++-- bumble/host.py | 204 ++++- bumble/l2cap.py | 4 +- bumble/utils.py | 6 +- examples/run_advertiser.py | 14 +- examples/run_cig_setup.py | 11 +- examples/run_extended_advertiser.py | 34 +- examples/run_extended_advertiser_2.py | 100 +++ examples/run_unicast_server.py | 10 +- tests/device_test.py | 83 +- tests/hci_test.py | 10 +- 13 files changed, 1115 insertions(+), 577 deletions(-) create mode 100644 examples/run_extended_advertiser_2.py diff --git a/apps/bench.py b/apps/bench.py index a98adc47..b307df26 100644 --- a/apps/bench.py +++ b/apps/bench.py @@ -82,8 +82,8 @@ DEFAULT_RFCOMM_UUID = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE' DEFAULT_L2CAP_PSM = 1234 DEFAULT_L2CAP_MAX_CREDITS = 128 -DEFAULT_L2CAP_MTU = 1022 -DEFAULT_L2CAP_MPS = 1024 +DEFAULT_L2CAP_MTU = 1024 +DEFAULT_L2CAP_MPS = 1022 DEFAULT_LINGER_TIME = 1.0 diff --git a/bumble/device.py b/bumble/device.py index f0f4ee18..91d229e3 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -22,7 +22,7 @@ import asyncio import logging from contextlib import asynccontextmanager, AsyncExitStack, closing -from dataclasses import dataclass +from dataclasses import dataclass, field from collections.abc import Iterable from typing import ( Any, @@ -34,13 +34,14 @@ Tuple, Type, TypeVar, - Set, Union, cast, overload, TYPE_CHECKING, ) +from pyee import EventEmitter + from .colors import color from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU from .gatt import Characteristic, Descriptor, Service @@ -48,6 +49,7 @@ HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, HCI_CENTRAL_ROLE, + HCI_PERIPHERAL_ROLE, HCI_COMMAND_STATUS_PENDING, HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE, HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, @@ -61,7 +63,6 @@ HCI_LE_1M_PHY_BIT, HCI_LE_2M_PHY, HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE, - HCI_LE_CLEAR_RESOLVING_LIST_COMMAND, HCI_LE_CODED_PHY, HCI_LE_CODED_PHY_BIT, HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE, @@ -79,14 +80,12 @@ HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, HCI_SUCCESS, HCI_WRITE_LE_HOST_SUPPORT_COMMAND, - Address, HCI_Accept_Connection_Request_Command, HCI_Authentication_Requested_Command, HCI_Command_Status_Event, HCI_Constant, HCI_Create_Connection_Cancel_Command, HCI_Create_Connection_Command, - HCI_Create_Connection_Command, HCI_Disconnect_Command, HCI_Encryption_Change_Event, HCI_Error, @@ -124,6 +123,7 @@ HCI_LE_Set_Extended_Advertising_Enable_Command, HCI_LE_Set_Extended_Advertising_Parameters_Command, HCI_LE_Set_Host_Feature_Command, + HCI_LE_Set_Periodic_Advertising_Enable_Command, HCI_LE_Set_PHY_Command, HCI_LE_Set_Random_Address_Command, HCI_LE_Set_Scan_Enable_Command, @@ -151,7 +151,9 @@ HCI_Write_Scan_Enable_Command, HCI_Write_Secure_Connections_Host_Support_Command, HCI_Write_Simple_Pairing_Mode_Command, + Address, OwnAddressType, + Phy, phy_list_to_bits, ) from .host import Host @@ -234,6 +236,9 @@ DEVICE_DEFAULT_L2CAP_COC_MTU = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU DEVICE_DEFAULT_L2CAP_COC_MPS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS +DEVICE_DEFAULT_ADVERTISING_TX_POWER = ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE +) # fmt: on # pylint: enable=line-too-long @@ -436,6 +441,10 @@ def is_directed(self): AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY, ) + @property + def is_high_duty_cycle_directed_connectable(self): + return self == AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY + # ----------------------------------------------------------------------------- @dataclass @@ -443,30 +452,333 @@ class LegacyAdvertiser: device: Device advertising_type: AdvertisingType own_address_type: OwnAddressType + peer_address: Address auto_restart: bool - advertising_data: Optional[bytes] - scan_response_data: Optional[bytes] + + async def start(self) -> None: + # Set/update the advertising data if the advertising type allows it + if self.advertising_type.has_data: + await self.device.send_command( + HCI_LE_Set_Advertising_Data_Command( + advertising_data=self.device.advertising_data + ), + check_result=True, + ) + + # Set/update the scan response data if the advertising is scannable + if self.advertising_type.is_scannable: + await self.device.send_command( + HCI_LE_Set_Scan_Response_Data_Command( + scan_response_data=self.device.scan_response_data + ), + check_result=True, + ) + + # Set the advertising parameters + await self.device.send_command( + HCI_LE_Set_Advertising_Parameters_Command( + advertising_interval_min=self.device.advertising_interval_min, + advertising_interval_max=self.device.advertising_interval_max, + advertising_type=int(self.advertising_type), + own_address_type=self.own_address_type, + peer_address_type=self.peer_address.address_type, + peer_address=self.peer_address, + advertising_channel_map=7, + advertising_filter_policy=0, + ), + check_result=True, + ) + + # Enable advertising + await self.device.send_command( + HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), + check_result=True, + ) async def stop(self) -> None: - await self.device.stop_legacy_advertising() + # Disable advertising + await self.device.send_command( + HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), + check_result=True, + ) + + +# ----------------------------------------------------------------------------- +@dataclass +class AdvertisingEventProperties: + is_connectable: bool = True + is_scannable: bool = False + is_directed: bool = False + is_high_duty_cycle_directed_connectable: bool = False + is_legacy: bool = False + is_anonymous: bool = False + include_tx_power: bool = False + + def __int__(self) -> int: + properties = ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(0) + ) + if self.is_connectable: + properties |= properties.CONNECTABLE_ADVERTISING + if self.is_scannable: + properties |= properties.SCANNABLE_ADVERTISING + if self.is_directed: + properties |= properties.DIRECTED_ADVERTISING + if self.is_high_duty_cycle_directed_connectable: + properties |= properties.HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING + if self.is_legacy: + properties |= properties.USE_LEGACY_ADVERTISING_PDUS + if self.is_anonymous: + properties |= properties.ANONYMOUS_ADVERTISING + if self.include_tx_power: + properties |= properties.INCLUDE_TX_POWER + + return int(properties) + + @staticmethod + def from_advertising_type( + advertising_type: AdvertisingType, + ) -> AdvertisingEventProperties: + return AdvertisingEventProperties( + is_connectable=advertising_type.is_connectable, + is_scannable=advertising_type.is_scannable, + is_directed=advertising_type.is_directed, + is_high_duty_cycle_directed_connectable=advertising_type.is_high_duty_cycle_directed_connectable, + is_legacy=True, + is_anonymous=False, + include_tx_power=False, + ) + + +# ----------------------------------------------------------------------------- +# TODO: replace with typing.TypeAlias when the code base is all Python >= 3.10 +AdvertisingChannelMap = HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap # ----------------------------------------------------------------------------- @dataclass -class ExtendedAdvertiser(CompositeEventEmitter): +class AdvertisingParameters: + # pylint: disable=line-too-long + advertising_event_properties: AdvertisingEventProperties = field( + default_factory=AdvertisingEventProperties + ) + primary_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL + primary_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL + primary_advertising_channel_map: HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap = ( + AdvertisingChannelMap.CHANNEL_37 + | AdvertisingChannelMap.CHANNEL_38 + | AdvertisingChannelMap.CHANNEL_39 + ) + own_address_type: OwnAddressType = OwnAddressType.RANDOM + peer_address: Address = Address.ANY + advertising_filter_policy: int = 0 + advertising_tx_power: int = DEVICE_DEFAULT_ADVERTISING_TX_POWER + primary_advertising_phy: Phy = Phy.LE_1M + secondary_advertising_max_skip: int = 0 + secondary_advertising_phy: Phy = Phy.LE_1M + advertising_sid: int = 0 + enable_scan_request_notifications: bool = False + primary_advertising_phy_options: int = 0 + secondary_advertising_phy_options: int = 0 + + +# ----------------------------------------------------------------------------- +@dataclass +class PeriodicAdvertisingParameters: + # TODO implement this class + pass + + +# ----------------------------------------------------------------------------- +@dataclass +class AdvertisingSet(EventEmitter): device: Device - handle: int - advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties - own_address_type: OwnAddressType + advertising_handle: int auto_restart: bool - advertising_data: Optional[bytes] - scan_response_data: Optional[bytes] + random_address: Optional[Address] + advertising_parameters: AdvertisingParameters + advertising_data: bytes + scan_response_data: bytes + periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] + periodic_advertising_data: bytes + selected_tx_power: int = 0 + enabled: bool = False def __post_init__(self) -> None: super().__init__() + async def set_advertising_parameters( + self, advertising_parameters: AdvertisingParameters + ) -> None: + # Sanity check + if ( + not advertising_parameters.advertising_event_properties.is_legacy + and advertising_parameters.advertising_event_properties.is_connectable + and advertising_parameters.advertising_event_properties.is_scannable + ): + raise ValueError( + "non-legacy extended advertising event properties may not be both " + "connectable and scannable" + ) + + response = await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Parameters_Command( + advertising_handle=self.advertising_handle, + advertising_event_properties=int( + advertising_parameters.advertising_event_properties + ), + primary_advertising_interval_min=( + int(advertising_parameters.primary_advertising_interval_min / 0.625) + ), + primary_advertising_interval_max=( + int(advertising_parameters.primary_advertising_interval_min / 0.625) + ), + primary_advertising_channel_map=int( + advertising_parameters.primary_advertising_channel_map + ), + own_address_type=advertising_parameters.own_address_type, + peer_address_type=advertising_parameters.peer_address.address_type, + peer_address=advertising_parameters.peer_address, + advertising_tx_power=advertising_parameters.advertising_tx_power, + advertising_filter_policy=( + advertising_parameters.advertising_filter_policy + ), + primary_advertising_phy=advertising_parameters.primary_advertising_phy, + secondary_advertising_max_skip=( + advertising_parameters.secondary_advertising_max_skip + ), + secondary_advertising_phy=( + advertising_parameters.secondary_advertising_phy + ), + advertising_sid=advertising_parameters.advertising_sid, + scan_request_notification_enable=( + 1 if advertising_parameters.enable_scan_request_notifications else 0 + ), + ), + check_result=True, + ) + self.selected_tx_power = response.return_parameters.selected_tx_power + self.advertising_parameters = advertising_parameters + + async def set_advertising_data(self, advertising_data: bytes) -> None: + # pylint: disable=line-too-long + await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Data_Command( + advertising_handle=self.advertising_handle, + operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, + advertising_data=advertising_data, + ), + check_result=True, + ) + self.advertising_data = advertising_data + + async def set_scan_response_data(self, scan_response_data: bytes) -> None: + if ( + scan_response_data + and not self.advertising_parameters.advertising_event_properties.is_scannable + ): + logger.warning( + "ignoring attempt to set non-empty scan response data on non-scannable " + "advertising set" + ) + return + + # pylint: disable=line-too-long + await self.device.send_command( + HCI_LE_Set_Extended_Scan_Response_Data_Command( + advertising_handle=self.advertising_handle, + operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, + scan_response_data=scan_response_data, + ), + check_result=True, + ) + self.scan_response_data = scan_response_data + + async def set_periodic_advertising_parameters( + self, advertising_parameters: PeriodicAdvertisingParameters + ) -> None: + # TODO: send command + self.periodic_advertising_parameters = advertising_parameters + + async def set_periodic_advertising_data(self, advertising_data: bytes) -> None: + # TODO: send command + self.periodic_advertising_data = advertising_data + + async def set_random_address(self, random_address: Address) -> None: + await self.device.send_command( + HCI_LE_Set_Advertising_Set_Random_Address_Command( + advertising_handle=self.advertising_handle, + random_address=(random_address or self.device.random_address), + ), + check_result=True, + ) + + async def start( + self, duration: float = 0.0, max_advertising_events: int = 0 + ) -> None: + await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Enable_Command( + enable=1, + advertising_handles=[self.advertising_handle], + durations=[round(duration * 100)], + max_extended_advertising_events=[max_advertising_events], + ), + check_result=True, + ) + self.enabled = True + + self.emit('start') + + async def start_periodic(self, include_adi: bool = False) -> None: + await self.device.send_command( + HCI_LE_Set_Periodic_Advertising_Enable_Command( + enable=1 | (2 if include_adi else 0), + advertising_handles=self.advertising_handle, + ), + check_result=True, + ) + + self.emit('start_periodic') + async def stop(self) -> None: - await self.device.stop_extended_advertising(self.handle) + await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Enable_Command( + enable=0, + advertising_handles=[self.advertising_handle], + durations=[0], + max_extended_advertising_events=[0], + ), + check_result=True, + ) + self.enabled = False + + self.emit('stop') + + async def stop_periodic(self) -> None: + await self.device.send_command( + HCI_LE_Set_Periodic_Advertising_Enable_Command( + enable=0, + advertising_handles=self.advertising_handle, + ), + check_result=True, + ) + + self.emit('stop_periodic') + + async def remove(self) -> None: + await self.device.send_command( + HCI_LE_Remove_Advertising_Set_Command( + advertising_handle=self.advertising_handle + ), + check_result=True, + ) + del self.device.extended_advertising_sets[self.advertising_handle] + + def on_termination(self, status: int) -> None: + self.enabled = False + self.emit('termination', status) # ----------------------------------------------------------------------------- @@ -690,9 +1002,6 @@ class Connection(CompositeEventEmitter): gatt_client: gatt_client.Client pairing_peer_io_capability: Optional[int] pairing_peer_authentication_requirements: Optional[int] - advertiser_after_disconnection: Union[ - LegacyAdvertiser, ExtendedAdvertiser, None - ] = None @composite_listener class Listener: @@ -922,7 +1231,8 @@ def __str__(self): return ( f'Connection(handle=0x{self.handle:04X}, ' f'role={self.role_name}, ' - f'address={self.peer_address})' + f'self_address={self.self_address}, ' + f'peer_address={self.peer_address})' ) @@ -1032,7 +1342,7 @@ def with_connection_from_handle(function): @functools.wraps(function) def wrapper(self, connection_handle, *args, **kwargs): if (connection := self.lookup_connection(connection_handle)) is None: - raise ValueError(f"no connection for handle: 0x{connection_handle:04x}") + raise ValueError(f'no connection for handle: 0x{connection_handle:04x}') return function(self, connection, *args, **kwargs) return wrapper @@ -1099,7 +1409,6 @@ class Device(CompositeEventEmitter): advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] config: DeviceConfiguration legacy_advertiser: Optional[LegacyAdvertiser] - extended_advertisers: Dict[int, ExtendedAdvertiser] sco_links: Dict[int, ScoLink] cis_links: Dict[int, CisLink] _pending_cis: Dict[int, Tuple[int, int]] @@ -1201,8 +1510,6 @@ def __init__( self.classic_pending_accepts = { Address.ANY: [] } # Futures, by BD address OR [Futures] for Address.ANY - self.legacy_advertiser = None - self.extended_advertisers = {} # Own address type cache self.connect_own_address_type = None @@ -1215,10 +1522,6 @@ def __init__( self.name = config.name self.random_address = config.address self.class_of_device = config.class_of_device - self.scan_response_data = config.scan_response_data - self.advertising_data = config.advertising_data - self.advertising_interval_min = config.advertising_interval_min - self.advertising_interval_max = config.advertising_interval_max self.keystore = None self.irk = config.irk self.le_enabled = config.le_enabled @@ -1233,6 +1536,22 @@ def __init__( self.classic_accept_any = config.classic_accept_any self.address_resolution_offload = config.address_resolution_offload + # Extended advertising. + self.extended_advertising_sets: Dict[int, AdvertisingSet] = {} + + # Legacy advertising. + # The advertising and scan response data, as well as the advertising interval + # values are stored as properties of this object for convenience so that they + # can be initialized from a config object, and for backward compatibility for + # client code that may set those values directly before calling + # start_advertising(). + self.legacy_advertising_set: Optional[AdvertisingSet] = None + self.legacy_advertiser: Optional[LegacyAdvertiser] = None + self.advertising_data = config.advertising_data + self.scan_response_data = config.scan_response_data + self.advertising_interval_min = config.advertising_interval_min + self.advertising_interval_max = config.advertising_interval_max + for service in config.gatt_services: characteristics = [] for characteristic in service.get("characteristics", []): @@ -1241,7 +1560,8 @@ def __init__( # Leave this check until 5/25/2023 if descriptor.get("permission", False): raise Exception( - "Error parsing Device Config's GATT Services. The key 'permission' must be renamed to 'permissions'" + "Error parsing Device Config's GATT Services. " + "The key 'permission' must be renamed to 'permissions'" ) new_descriptor = Descriptor( attribute_type=descriptor["descriptor_type"], @@ -1610,290 +1930,224 @@ def supports_le_phy(self, phy): if phy not in feature_map: raise ValueError('invalid PHY') - return self.host.supports_le_feature(feature_map[phy]) + return self.supports_le_feature(feature_map[phy]) - @deprecated("Please use start_legacy_advertising.") - async def start_advertising( - self, - advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, - target: Optional[Address] = None, - own_address_type: int = OwnAddressType.RANDOM, - auto_restart: bool = False, - ) -> None: - await self.start_legacy_advertising( - advertising_type=advertising_type, - target=target, - own_address_type=OwnAddressType(own_address_type), - auto_restart=auto_restart, + @property + def supports_le_extended_advertising(self): + return self.supports_le_feature( + HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE ) - async def start_legacy_advertising( + async def start_advertising( self, advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, target: Optional[Address] = None, - own_address_type: OwnAddressType = OwnAddressType.RANDOM, + own_address_type: int = OwnAddressType.RANDOM, auto_restart: bool = False, advertising_data: Optional[bytes] = None, scan_response_data: Optional[bytes] = None, - ) -> LegacyAdvertiser: - """Starts an legacy advertisement. + advertising_interval_min: Optional[int] = None, + advertising_interval_max: Optional[int] = None, + ) -> None: + """Start legacy advertising. - Args: - advertising_type: Advertising type passed to HCI_LE_Set_Advertising_Parameters_Command. - target: Directed advertising target. Directed type should be set in advertising_type arg. - own_address_type: own address type to use in the advertising. - auto_restart: whether the advertisement will be restarted after disconnection. - scan_response_data: raw scan response. - advertising_data: raw advertising data. + If the controller supports it, extended advertising commands with legacy PDUs + will be used to advertise. If not, legacy advertising commands will be used. - Returns: - LegacyAdvertiser object containing the metadata of advertisement. + Args: + advertising_type: + Type of advertising events. + target: + Peer address for directed advertising target. + (Ignored if `advertising_type` is not directed) + own_address_type: + Own address type to use in the advertising. + auto_restart: + Whether the advertisement will be restarted after disconnection. + advertising_data: + Raw advertising data. If None, the value of the property + self.advertising_data will be used. + scan_response_data: + Raw scan response. If None, the value of the property + self.scan_response_data will be used. + advertising_interval_min: + Minimum advertising interval, in milliseconds. If None, the value of the + property self.advertising_interval_min will be used. + advertising_interval_max: + Maximum advertising interval, in milliseconds. If None, the value of the + property self.advertising_interval_max will be used. """ - if self.extended_advertisers: - logger.warning( - 'Trying to start Legacy and Extended Advertising at the same time!' - ) - - # If we're advertising, stop first - if self.legacy_advertiser: - await self.stop_advertising() - - # Set/update the advertising data if the advertising type allows it - if advertising_type.has_data: - await self.send_command( - HCI_LE_Set_Advertising_Data_Command( - advertising_data=advertising_data or self.advertising_data or b'' - ), - check_result=True, - ) - - # Set/update the scan response data if the advertising is scannable - if advertising_type.is_scannable: - await self.send_command( - HCI_LE_Set_Scan_Response_Data_Command( - scan_response_data=scan_response_data - or self.scan_response_data - or b'' - ), - check_result=True, - ) + # Update backing properties. + if advertising_data is not None: + self.advertising_data = advertising_data + if scan_response_data is not None: + self.scan_response_data = scan_response_data + if advertising_interval_min is not None: + self.advertising_interval_min = advertising_interval_min + if advertising_interval_max is not None: + self.advertising_interval_max = advertising_interval_max # Decide what peer address to use if advertising_type.is_directed: if target is None: - raise ValueError('directed advertising requires a target address') - + raise ValueError('directed advertising requires a target') peer_address = target - peer_address_type = target.address_type else: - peer_address = Address('00:00:00:00:00:00') - peer_address_type = Address.PUBLIC_DEVICE_ADDRESS + peer_address = Address.ANY + + # If we're already advertising, stop now because we'll be re-creating + # a new advertiser or advertising set. + await self.stop_advertising() + assert self.legacy_advertiser is None + assert self.legacy_advertising_set is None + + if self.supports_le_extended_advertising: + # Use extended advertising commands with legacy PDUs. + self.legacy_advertising_set = await self.create_advertising_set( + auto_restart=auto_restart, + random_address=self.random_address, + advertising_parameters=AdvertisingParameters( + advertising_event_properties=( + AdvertisingEventProperties.from_advertising_type( + advertising_type + ) + ), + primary_advertising_interval_min=self.advertising_interval_min, + primary_advertising_interval_max=self.advertising_interval_max, + own_address_type=OwnAddressType(own_address_type), + peer_address=peer_address, + ), + advertising_data=( + self.advertising_data if advertising_type.has_data else b'' + ), + scan_response_data=( + self.scan_response_data if advertising_type.is_scannable else b'' + ), + ) - # Set the advertising parameters - await self.send_command( - HCI_LE_Set_Advertising_Parameters_Command( - advertising_interval_min=self.advertising_interval_min, - advertising_interval_max=self.advertising_interval_max, - advertising_type=int(advertising_type), - own_address_type=own_address_type, - peer_address_type=peer_address_type, + await self.legacy_advertising_set.start() + else: + # Use legacy commands. + self.legacy_advertiser = LegacyAdvertiser( + device=self, + advertising_type=advertising_type, + own_address_type=OwnAddressType(own_address_type), peer_address=peer_address, - advertising_channel_map=7, - advertising_filter_policy=0, - ), - check_result=True, - ) - - # Enable advertising - await self.send_command( - HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), - check_result=True, - ) + auto_restart=auto_restart, + ) - self.legacy_advertiser = LegacyAdvertiser( - device=self, - advertising_type=advertising_type, - own_address_type=own_address_type, - auto_restart=auto_restart, - advertising_data=advertising_data, - scan_response_data=scan_response_data, - ) - return self.legacy_advertiser + await self.legacy_advertiser.start() - @deprecated("Please use stop_legacy_advertising.") async def stop_advertising(self) -> None: - await self.stop_legacy_advertising() - - async def stop_legacy_advertising(self) -> None: + """Stop legacy advertising.""" # Disable advertising - if self.legacy_advertiser: - await self.send_command( - HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), - check_result=True, - ) - + if self.legacy_advertising_set: + await self.legacy_advertising_set.stop() + self.legacy_advertising_set = None + elif self.legacy_advertiser: + await self.legacy_advertiser.stop() self.legacy_advertiser = None - @experimental('Extended Advertising is still experimental - Might be changed soon.') - async def start_extended_advertising( + async def create_advertising_set( self, - advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING, - target: Address = Address.ANY, - own_address_type: OwnAddressType = OwnAddressType.RANDOM, - auto_restart: bool = True, - advertising_data: Optional[bytes] = None, - scan_response_data: Optional[bytes] = None, - ) -> ExtendedAdvertiser: - """Starts an extended advertising set. - - Args: - advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command - target: Directed advertising target. Directed property should be set in advertising_properties arg. - own_address_type: own address type to use in the advertising. - auto_restart: whether the advertisement will be restarted after disconnection. - advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent. - scan_response_data: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent. - - Returns: - ExtendedAdvertiser object containing the metadata of advertisement. - """ - if self.legacy_advertiser: - logger.warning( - 'Trying to start Legacy and Extended Advertising at the same time!' + advertising_parameters: Optional[AdvertisingParameters] = None, + random_address: Optional[Address] = None, + advertising_data: bytes = b'', + scan_response_data: bytes = b'', + periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] = None, + periodic_advertising_data: bytes = b'', + auto_restart: bool = False, + ) -> AdvertisingSet: + # Allocate a new handle + try: + advertising_handle = next( + handle + for handle in range( + DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, + DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, + ) + if handle not in self.extended_advertising_sets ) + except StopIteration as exc: + raise RuntimeError("all valid advertising handles already in use") from exc - adv_handle = -1 - # Find a free handle - for i in range( - DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, - DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, + # Instantiate default values + if advertising_parameters is None: + advertising_parameters = AdvertisingParameters() + + # Use the device's random address if a random address is needed but none was + # provided. + if ( + advertising_parameters.own_address_type + in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) + and random_address is None ): - if i not in self.extended_advertisers: - adv_handle = i - break + random_address = self.random_address + + # Create the object that represents the set. + advertising_set = AdvertisingSet( + device=self, + advertising_handle=advertising_handle, + auto_restart=auto_restart, + random_address=random_address, + advertising_parameters=advertising_parameters, + advertising_data=advertising_data, + scan_response_data=scan_response_data, + periodic_advertising_parameters=periodic_advertising_parameters, + periodic_advertising_data=periodic_advertising_data, + ) - if adv_handle == -1: - raise InvalidStateError('No available advertising set.') + # Create the set in the controller. + await advertising_set.set_advertising_parameters(advertising_parameters) + # Update the set in the controller. try: - # Set the advertising parameters - await self.send_command( - HCI_LE_Set_Extended_Advertising_Parameters_Command( - advertising_handle=adv_handle, - advertising_event_properties=advertising_properties, - primary_advertising_interval_min=self.advertising_interval_min, - primary_advertising_interval_max=self.advertising_interval_max, - primary_advertising_channel_map=( - HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_37 - | HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_38 - | HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_39 - ), - own_address_type=own_address_type, - peer_address_type=target.address_type, - peer_address=target, - advertising_tx_power=7, - advertising_filter_policy=0, - primary_advertising_phy=1, # LE 1M - secondary_advertising_max_skip=0, - secondary_advertising_phy=1, # LE 1M - advertising_sid=0, - scan_request_notification_enable=0, - ), - check_result=True, - ) + if random_address: + await advertising_set.set_random_address(random_address) - # Set the advertising data if present - if advertising_data is not None: - await self.send_command( - HCI_LE_Set_Extended_Advertising_Data_Command( - advertising_handle=adv_handle, - operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, - fragment_preference=0x01, # Should not fragment - advertising_data=advertising_data, - ), - check_result=True, - ) + if advertising_data: + await advertising_set.set_advertising_data(advertising_data) - # Set the scan response if present - if scan_response_data is not None: - await self.send_command( - HCI_LE_Set_Extended_Scan_Response_Data_Command( - advertising_handle=adv_handle, - operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, - fragment_preference=0x01, # Should not fragment - scan_response_data=scan_response_data, - ), - check_result=True, - ) + if scan_response_data: + await advertising_set.set_scan_response_data(scan_response_data) - if own_address_type in ( - OwnAddressType.RANDOM, - OwnAddressType.RESOLVABLE_OR_RANDOM, - ): - await self.send_command( - HCI_LE_Set_Advertising_Set_Random_Address_Command( - advertising_handle=adv_handle, - random_address=self.random_address, - ), - check_result=True, - ) + if periodic_advertising_parameters: + # TODO: call LE Set Periodic Advertising Parameters command + pass + + if periodic_advertising_data: + # TODO: call LE Set Periodic Advertising Data command + pass - # Enable advertising - await self.send_command( - HCI_LE_Set_Extended_Advertising_Enable_Command( - enable=1, - advertising_handles=[adv_handle], - durations=[0], # Forever - max_extended_advertising_events=[0], # Infinite - ), - check_result=True, - ) except HCI_Error as error: - # When any step fails, cleanup the advertising handle. + # Remove the advertising set so that it doesn't stay dangling in the + # controller. await self.send_command( - HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), + HCI_LE_Remove_Advertising_Set_Command( + advertising_handle=advertising_data + ), check_result=False, ) raise error - advertiser = self.extended_advertisers[adv_handle] = ExtendedAdvertiser( - device=self, - handle=adv_handle, - advertising_properties=advertising_properties, - own_address_type=own_address_type, - auto_restart=auto_restart, - advertising_data=advertising_data, - scan_response_data=scan_response_data, - ) - return advertiser - - @experimental('Extended Advertising is still experimental - Might be changed soon.') - async def stop_extended_advertising(self, adv_handle: int) -> None: - """Stops an extended advertising set. + # Remember the set. + self.extended_advertising_sets[advertising_handle] = advertising_set - Args: - adv_handle: Handle of the advertising set to stop. - """ - # Disable advertising - await self.send_command( - HCI_LE_Set_Extended_Advertising_Enable_Command( - enable=0, - advertising_handles=[adv_handle], - durations=[0], - max_extended_advertising_events=[0], - ), - check_result=True, - ) - # Remove advertising set - await self.send_command( - HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), - check_result=True, - ) - del self.extended_advertisers[adv_handle] + return advertising_set @property def is_advertising(self): - return self.legacy_advertiser or self.extended_advertisers + if self.legacy_advertiser: + return True + + if self.legacy_advertising_set and self.legacy_advertising_set.enabled: + return True + + for advertising_set in self.extended_advertising_sets.values(): + if advertising_set.enabled: + return True + + return False async def start_scanning( self, @@ -1920,9 +2174,7 @@ async def start_scanning( self.advertisement_accumulators = {} # Enable scanning - if not legacy and self.supports_le_feature( - HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE - ): + if not legacy and self.supports_le_extended_advertising: # Set the scanning parameters scan_type = ( HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING @@ -1998,9 +2250,9 @@ async def start_scanning( self.scanning_is_passive = not active self.scanning = True - async def stop_scanning(self) -> None: + async def stop_scanning(self, legacy: bool = False) -> None: # Disable scanning - if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE): + if not legacy and self.supports_le_extended_advertising: await self.send_command( HCI_LE_Set_Extended_Scan_Enable_Command( enable=0, filter_duplicates=0, duration=0, period=0 @@ -3193,6 +3445,73 @@ async def indicate_subscriber(self, connection, attribute, value=None, force=Fal async def indicate_subscribers(self, attribute, value=None, force=False): await self.gatt_server.indicate_subscribers(attribute, value, force) + @host_event_handler + def on_advertising_set_termination( + self, + status, + advertising_handle, + connection_handle, + number_of_completed_extended_advertising_events, + ): + if not ( + advertising_set := self.extended_advertising_sets.get(advertising_handle) + ): + logger.warning(f'advertising set {advertising_handle} not found') + return + + advertising_set.on_termination(status) + + if status != HCI_SUCCESS: + logger.debug( + f'advertising set {advertising_handle} ' + f'terminated with status {status}' + ) + return + + if not (connection := self.lookup_connection(connection_handle)): + logger.warning(f'no connection for handle 0x{connection_handle:04x}') + return + + # Update the connection address. + connection.self_address = ( + advertising_set.random_address + if advertising_set.advertising_parameters.own_address_type + in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) + else self.public_address + ) + + # Setup auto-restart of the advertising set if needed. + if advertising_set.auto_restart: + connection.once( + 'disconnection', + lambda _: self.abort_on('flush', advertising_set.start()), + ) + + self.emit_le_connection(connection) + + def emit_le_connection(self, connection: Connection) -> None: + # If supported, read which PHY we're connected with before + # notifying listeners of the new connection. + if self.host.supports_command(HCI_LE_READ_PHY_COMMAND): + + async def read_phy(): + result = await self.send_command( + HCI_LE_Read_PHY_Command(connection_handle=connection.handle), + check_result=True, + ) + connection.phy = ConnectionPHY( + result.return_parameters.tx_phy, result.return_parameters.rx_phy + ) + # Emit an event to notify listeners of the new connection + self.emit('connection', connection) + + # Do so asynchronously to not block the current event handler + connection.abort_on('disconnection', read_phy()) + + return + + self.emit('connection', connection) + @host_event_handler def on_connection( self, @@ -3211,8 +3530,6 @@ def on_connection( 'new connection reuses the same handle as a previous connection' ) - peer_resolvable_address = None - if transport == BT_BR_EDR_TRANSPORT: # Create a new connection connection = self.pending_connections.pop(peer_address) @@ -3221,76 +3538,75 @@ def on_connection( # Emit an event to notify listeners of the new connection self.emit('connection', connection) - else: - # Resolve the peer address if we can - if self.address_resolver: - if peer_address.is_resolvable: - resolved_address = self.address_resolver.resolve(peer_address) - if resolved_address is not None: - logger.debug(f'*** Address resolved as {resolved_address}') - peer_resolvable_address = peer_address - peer_address = resolved_address - - # Guess which own address type is used for this connection. - # This logic is somewhat correct but may need to be improved - # when multiple advertising are run simultaneously. - advertiser = None - if self.connect_own_address_type is not None: - own_address_type = self.connect_own_address_type - elif self.legacy_advertiser: - own_address_type = self.legacy_advertiser.own_address_type - # Store advertiser for restarting - it's only required for legacy, since - # extended advertisement produces HCI_Advertising_Set_Terminated. - if self.legacy_advertiser.auto_restart: - advertiser = self.legacy_advertiser - else: - # For extended advertisement, determining own address type later. - own_address_type = OwnAddressType.RANDOM - if own_address_type in ( - OwnAddressType.PUBLIC, - OwnAddressType.RESOLVABLE_OR_PUBLIC, - ): - self_address = self.public_address - else: - self_address = self.random_address - - # Create a new connection - connection = Connection( - self, - connection_handle, - transport, - self_address, - peer_address, - peer_resolvable_address, - role, - connection_parameters, - ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), - ) - connection.advertiser_after_disconnection = advertiser - self.connections[connection_handle] = connection + return - # If supported, read which PHY we're connected with before - # notifying listeners of the new connection. - if self.host.supports_command(HCI_LE_READ_PHY_COMMAND): + # Resolve the peer address if we can + peer_resolvable_address = None + if self.address_resolver: + if peer_address.is_resolvable: + resolved_address = self.address_resolver.resolve(peer_address) + if resolved_address is not None: + logger.debug(f'*** Address resolved as {resolved_address}') + peer_resolvable_address = peer_address + peer_address = resolved_address + + self_address = None + if role == HCI_CENTRAL_ROLE: + own_address_type = self.connect_own_address_type + assert own_address_type + else: + if self.supports_le_extended_advertising: + # We'll know the address when the advertising set terminates, + # Use a temporary placeholder value for self_address. + self_address = Address.ANY_RANDOM + else: + # We were connected via a legacy advertisement. + if self.legacy_advertiser: + own_address_type = self.legacy_advertiser.own_address_type + else: + # This should not happen, but just in case, pick a default. + logger.warning("connection without an advertiser") + self_address = self.random_address + + if self_address is None: + self_address = ( + self.public_address + if own_address_type + in ( + OwnAddressType.PUBLIC, + OwnAddressType.RESOLVABLE_OR_PUBLIC, + ) + else self.random_address + ) - async def read_phy(): - result = await self.send_command( - HCI_LE_Read_PHY_Command(connection_handle=connection_handle), - check_result=True, - ) - connection.phy = ConnectionPHY( - result.return_parameters.tx_phy, result.return_parameters.rx_phy - ) - # Emit an event to notify listeners of the new connection - self.emit('connection', connection) + # Create a connection. + connection = Connection( + self, + connection_handle, + transport, + self_address, + peer_address, + peer_resolvable_address, + role, + connection_parameters, + ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), + ) + self.connections[connection_handle] = connection - # Do so asynchronously to not block the current event handler - connection.abort_on('disconnection', read_phy()) + if ( + role == HCI_PERIPHERAL_ROLE + and self.legacy_advertiser + and self.legacy_advertiser.auto_restart + ): + connection.once( + 'disconnection', + lambda _: self.abort_on('flush', self.legacy_advertiser.start()), + ) - else: - # Emit an event to notify listeners of the new connection - self.emit('connection', connection) + if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising: + # We can emit now, we have all the info we need + self.emit_le_connection(connection) @host_event_handler def on_connection_failure(self, transport, peer_address, error_code): @@ -3362,32 +3678,6 @@ def on_disconnection(self, connection_handle: int, reason: int) -> None: # Cleanup subsystems that maintain per-connection state self.gatt_server.on_disconnection(connection) - - # Restart advertising if auto-restart is enabled - if advertiser := connection.advertiser_after_disconnection: - logger.debug('restarting advertising') - if isinstance(advertiser, LegacyAdvertiser): - self.abort_on( - 'flush', - self.start_legacy_advertising( - advertising_type=advertiser.advertising_type, - own_address_type=advertiser.own_address_type, - advertising_data=advertiser.advertising_data, - scan_response_data=advertiser.scan_response_data, - auto_restart=True, - ), - ) - elif isinstance(advertiser, ExtendedAdvertiser): - self.abort_on( - 'flush', - self.start_extended_advertising( - advertising_properties=advertiser.advertising_properties, - own_address_type=advertiser.own_address_type, - advertising_data=advertiser.advertising_data, - scan_response_data=advertiser.scan_response_data, - auto_restart=True, - ), - ) elif sco_link := self.sco_links.pop(connection_handle, None): sco_link.emit('disconnection', reason) elif cis_link := self.cis_links.pop(connection_handle, None): @@ -3710,30 +4000,6 @@ def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> N if sco_link := self.sco_links.get(sco_handle, None): sco_link.emit('pdu', packet) - # [LE only] - @host_event_handler - @experimental('Only for testing') - def on_advertising_set_termination( - self, - status: int, - advertising_handle: int, - connection_handle: int, - ) -> None: - if status == HCI_SUCCESS: - connection = self.lookup_connection(connection_handle) - if advertiser := self.extended_advertisers.pop(advertising_handle, None): - if connection: - if advertiser.auto_restart: - connection.advertiser_after_disconnection = advertiser - if advertiser.own_address_type in ( - OwnAddressType.PUBLIC, - OwnAddressType.RESOLVABLE_OR_PUBLIC, - ): - connection.self_address = self.public_address - else: - connection.self_address = self.random_address - advertiser.emit('termination', status) - # [LE only] @host_event_handler @with_connection_from_handle diff --git a/bumble/hci.py b/bumble/hci.py index 25d3ec28..f366428b 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -23,7 +23,7 @@ import logging import secrets import struct -from typing import Any, Dict, Callable, Optional, Type, Union, List +from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union from bumble import crypto from .colors import color @@ -223,41 +223,47 @@ def phy_list_to_bits(phys): # HCI Subevent Codes -HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01 -HCI_LE_ADVERTISING_REPORT_EVENT = 0x02 -HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03 -HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04 -HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05 -HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06 -HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07 -HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08 -HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09 -HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A -HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B -HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C -HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D -HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E -HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F -HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10 -HCI_LE_SCAN_TIMEOUT_EVENT = 0x11 -HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12 -HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13 -HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14 -HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15 -HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16 -HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17 -HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18 -HCI_LE_CIS_ESTABLISHED_EVENT = 0X19 -HCI_LE_CIS_REQUEST_EVENT = 0X1A -HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B -HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C -HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D -HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E -HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F -HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20 -HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21 -HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22 -HCI_LE_SUBRATE_CHANGE_EVENT = 0X23 +HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01 +HCI_LE_ADVERTISING_REPORT_EVENT = 0x02 +HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03 +HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04 +HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05 +HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06 +HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07 +HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08 +HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09 +HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A +HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B +HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C +HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D +HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E +HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F +HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10 +HCI_LE_SCAN_TIMEOUT_EVENT = 0x11 +HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12 +HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13 +HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14 +HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15 +HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16 +HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17 +HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18 +HCI_LE_CIS_ESTABLISHED_EVENT = 0X19 +HCI_LE_CIS_REQUEST_EVENT = 0X1A +HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B +HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C +HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D +HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E +HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F +HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20 +HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21 +HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22 +HCI_LE_SUBRATE_CHANGE_EVENT = 0X23 +HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_V2_EVENT = 0X24 +HCI_LE_PERIODIC_ADVERTISING_REPORT_V2_EVENT = 0X25 +HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_V2_EVENT = 0X26 +HCI_LE_PERIODIC_ADVERTISING_SUBEVENT_DATA_REQUEST_EVENT = 0X27 +HCI_LE_PERIODIC_ADVERTISING_RESPONSE_REPORT_EVENT = 0X28 +HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT = 0X29 # HCI Command @@ -650,47 +656,6 @@ def phy_list_to_bits(phys): # Command Status codes HCI_COMMAND_STATUS_PENDING = 0 -# LE Event Masks -HCI_LE_CONNECTION_COMPLETE_EVENT_MASK = (1 << 0) -HCI_LE_ADVERTISING_REPORT_EVENT_MASK = (1 << 1) -HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT_MASK = (1 << 2) -HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT_MASK = (1 << 3) -HCI_LE_LONG_TERM_KEY_REQUEST_EVENT_MASK = (1 << 4) -HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT_MASK = (1 << 5) -HCI_LE_DATA_LENGTH_CHANGE_EVENT_MASK = (1 << 6) -HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT_MASK = (1 << 7) -HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT_MASK = (1 << 8) -HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT_MASK = (1 << 9) -HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT_MASK = (1 << 10) -HCI_LE_PHY_UPDATE_COMPLETE_EVENT_MASK = (1 << 11) -HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT_MASK = (1 << 12) -HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT_MASK = (1 << 13) -HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT_MASK = (1 << 14) -HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT_MASK = (1 << 15) -HCI_LE_EXTENDED_SCAN_TIMEOUT_EVENT_MASK = (1 << 16) -HCI_LE_EXTENDED_ADVERTISING_SET_TERMINATED_EVENT_MASK = (1 << 17) -HCI_LE_SCAN_REQUEST_RECEIVED_EVENT_MASK = (1 << 18) -HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT_MASK = (1 << 19) -HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT_MASK = (1 << 20) -HCI_LE_CONNECTION_IQ_REPORT_EVENT_MASK = (1 << 21) -HCI_LE_CTE_REQUEST_FAILED_EVENT_MASK = (1 << 22) -HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT_MASK = (1 << 23) -HCI_LE_CIS_ESTABLISHED_EVENT_MASK = (1 << 24) -HCI_LE_CIS_REQUEST_EVENT_MASK = (1 << 25) -HCI_LE_CREATE_BIG_COMPLETE_EVENT_MASK = (1 << 26) -HCI_LE_TERMINATE_BIG_COMPLETE_EVENT_MASK = (1 << 27) -HCI_LE_BIG_SYNC_ESTABLISHED_EVENT_MASK = (1 << 28) -HCI_LE_BIG_SYNC_LOST_EVENT_MASK = (1 << 29) -HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT_MASK = (1 << 30) -HCI_LE_PATH_LOSS_THRESHOLD_EVENT_MASK = (1 << 31) -HCI_LE_TRANSMIT_POWER_REPORTING_EVENT_MASK = (1 << 32) -HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT_MASK = (1 << 33) -HCI_LE_SUBRATE_CHANGE_EVENT_MASK = (1 << 34) - -HCI_LE_EVENT_MASK_NAMES = { - mask: mask_name for (mask_name, mask) in globals().items() - if mask_name.startswith('HCI_LE_') and mask_name.endswith('_EVENT_MASK') -} # ACL HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0 @@ -732,15 +697,15 @@ def phy_list_to_bits(phys): class Phy(enum.IntEnum): - LE_1M = 0x01 - LE_2M = 0x02 - LE_CODED = 0x03 + LE_1M = HCI_LE_1M_PHY + LE_2M = HCI_LE_2M_PHY + LE_CODED = HCI_LE_CODED_PHY class PhyBit(enum.IntFlag): - LE_1M = 0b00000001 - LE_2M = 0b00000010 - LE_CODED = 0b00000100 + LE_1M = 1 << HCI_LE_1M_PHY_BIT + LE_2M = 1 << HCI_LE_2M_PHY_BIT + LE_CODED = 1 << HCI_LE_CODED_PHY_BIT # Connection Parameters @@ -2857,6 +2822,20 @@ class HCI_Set_Event_Mask_Command(HCI_Command): See Bluetooth spec @ 7.3.1 Set Event Mask Command ''' + @staticmethod + def mask(event_codes: Iterable[int]) -> bytes: + ''' + Compute the event mask value for a list of events. + ''' + # NOTE: this implementation takes advantage of the fact that as of version 5.4 + # of the core specification, the bit number for each event code is equal to one + # less than the event code. + # If future versions of the specification deviate from that, a different + # implementation would be needed. + return sum((1 << event_code - 1) for event_code in event_codes).to_bytes( + 8, 'little' + ) + # ----------------------------------------------------------------------------- @HCI_Command.command() @@ -3359,6 +3338,20 @@ class HCI_LE_Set_Event_Mask_Command(HCI_Command): See Bluetooth spec @ 7.8.1 LE Set Event Mask Command ''' + @staticmethod + def mask(event_codes: Iterable[int]) -> bytes: + ''' + Compute the event mask value for a list of events. + ''' + # NOTE: this implementation takes advantage of the fact that as of version 5.4 + # of the core specification, the bit number for each event code is equal to one + # less than the event code. + # If future versions of the specification deviate from that, a different + # implementation would be needed. + return sum((1 << event_code - 1) for event_code in event_codes).to_bytes( + 8, 'little' + ) + # ----------------------------------------------------------------------------- @HCI_Command.command( @@ -3966,13 +3959,16 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): ('advertising_sid', 1), ('scan_request_notification_enable', 1), ], - return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx__power', 1)], + return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx_power', 1)], ) class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command): ''' See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command ''' + TX_POWER_NO_PREFERENCE = 0x7F + SHOULD_NOT_FRAGMENT = 0x01 + class AdvertisingProperties(enum.IntFlag): CONNECTABLE_ADVERTISING = 1 << 0 SCANNABLE_ADVERTISING = 1 << 1 @@ -4217,7 +4213,7 @@ def __str__(self): ('scanning_filter_policy:', self.scanning_filter_policy), ('scanning_phys: ', ','.join(scanning_phys_strs)), ] - for (i, scanning_phy_str) in enumerate(scanning_phys_strs): + for i, scanning_phy_str in enumerate(scanning_phys_strs): fields.append( ( f'{scanning_phy_str}.scan_type: ', @@ -4360,7 +4356,7 @@ def __str__(self): ('peer_address: ', str(self.peer_address)), ('initiating_phys: ', ','.join(initiating_phys_strs)), ] - for (i, initiating_phys_str) in enumerate(initiating_phys_strs): + for i, initiating_phys_str in enumerate(initiating_phys_strs): fields.append( ( f'{initiating_phys_str}.scan_interval: ', @@ -5242,7 +5238,7 @@ def __str__(self): ('status', 1), ('advertising_handle', 1), ('connection_handle', 2), - ('number_completed_extended_advertising_events', 1), + ('num_completed_extended_advertising_events', 1), ] ) class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event): @@ -6183,7 +6179,7 @@ def from_bytes(packet: bytes) -> HCI_IsoDataPacket: if ts_flag: if not should_include_sdu_info: - logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}') + logger.warning(f'Timestamp included when pb_flag={bin(pb_flag)}') time_stamp, *_ = struct.unpack_from(' Optional[TransportSink]: return self.hci_sink @@ -731,6 +928,7 @@ def on_hci_le_advertising_set_terminated_event(self, event): event.status, event.advertising_handle, event.connection_handle, + event.num_completed_extended_advertising_events, ) def on_hci_le_cis_request_event(self, event): diff --git a/bumble/l2cap.py b/bumble/l2cap.py index ce3385df..7dc45d8e 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -151,8 +151,8 @@ L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU = 23 L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS = 23 L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS = 65533 -L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2046 -L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2048 +L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048 +L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2046 L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256 L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01 diff --git a/bumble/utils.py b/bumble/utils.py index 81e150cc..80c5647f 100644 --- a/bumble/utils.py +++ b/bumble/utils.py @@ -223,13 +223,13 @@ def listener(self, listener): if self._listener: # Call the deregistration methods for each base class that has them for cls in self._listener.__class__.mro(): - if hasattr(cls, '_bumble_register_composite'): - cls._bumble_deregister_composite(listener, self) + if '_bumble_register_composite' in cls.__dict__: + cls._bumble_deregister_composite(self._listener, self) self._listener = listener if listener: # Call the registration methods for each base class that has them for cls in listener.__class__.mro(): - if hasattr(cls, '_bumble_deregister_composite'): + if '_bumble_deregister_composite' in cls.__dict__: cls._bumble_register_composite(listener, self) diff --git a/examples/run_advertiser.py b/examples/run_advertiser.py index 56b1b8bd..fb594267 100644 --- a/examples/run_advertiser.py +++ b/examples/run_advertiser.py @@ -19,9 +19,11 @@ import logging import sys import os +import struct + +from bumble.core import AdvertisingData from bumble.device import AdvertisingType, Device from bumble.hci import Address - from bumble.transport import open_transport_or_link @@ -52,6 +54,16 @@ async def main(): print('<<< connected') device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + + if advertising_type.is_scannable: + device.scan_response_data = bytes( + AdvertisingData( + [ + (AdvertisingData.APPEARANCE, struct.pack(' None: devices[1].cis_enabled = True await asyncio.gather(*[device.power_on() for device in devices]) - await devices[0].start_extended_advertising( - advertising_properties=( - HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING - ), - own_address_type=OwnAddressType.PUBLIC, - ) + advertising_set = await devices[0].create_advertising_set() + await advertising_set.start() connection = await devices[1].connect( devices[0].public_address, own_address_type=OwnAddressType.PUBLIC diff --git a/examples/run_extended_advertiser.py b/examples/run_extended_advertiser.py index 20b0b341..bcd08474 100644 --- a/examples/run_extended_advertiser.py +++ b/examples/run_extended_advertiser.py @@ -19,8 +19,13 @@ import logging import sys import os -from bumble.device import AdvertisingType, Device -from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command +from bumble.device import ( + AdvertisingParameters, + AdvertisingEventProperties, + AdvertisingType, + Device, +) +from bumble.hci import Address from bumble.transport import open_transport_or_link @@ -35,20 +40,16 @@ async def main() -> None: return if len(sys.argv) >= 4: - advertising_properties = ( - HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( - int(sys.argv[3]) - ) + advertising_properties = AdvertisingEventProperties.from_advertising_type( + AdvertisingType(int(sys.argv[3])) ) else: - advertising_properties = ( - HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING - ) + advertising_properties = AdvertisingEventProperties() if len(sys.argv) >= 5: - target = Address(sys.argv[4]) + peer_address = Address(sys.argv[4]) else: - target = Address.ANY + peer_address = Address.ANY print('<<< connecting to HCI...') async with await open_transport_or_link(sys.argv[2]) as hci_transport: @@ -58,9 +59,14 @@ async def main() -> None: sys.argv[1], hci_transport.source, hci_transport.sink ) await device.power_on() - await device.start_extended_advertising( - advertising_properties=advertising_properties, target=target - ) + await ( + await device.create_advertising_set( + advertising_parameters=AdvertisingParameters( + advertising_event_properties=advertising_properties, + peer_address=peer_address, + ) + ) + ).start() await hci_transport.source.terminated diff --git a/examples/run_extended_advertiser_2.py b/examples/run_extended_advertiser_2.py new file mode 100644 index 00000000..8ea94442 --- /dev/null +++ b/examples/run_extended_advertiser_2.py @@ -0,0 +1,100 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import logging +import sys +import os +from bumble.device import AdvertisingParameters, AdvertisingEventProperties, Device +from bumble.hci import Address +from bumble.core import AdvertisingData +from bumble.transport import open_transport_or_link + + +# ----------------------------------------------------------------------------- +async def main() -> None: + if len(sys.argv) < 3: + print('Usage: run_extended_advertiser_2.py ') + print('example: run_extended_advertiser_2.py device1.json usb:0') + return + + print('<<< connecting to HCI...') + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + print('<<< connected') + + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) + await device.power_on() + + if not device.supports_le_extended_advertising: + print("Device does not support extended adverising") + return + + print("Max advertising sets:", device.host.number_of_supported_advertising_sets) + print( + "Max advertising data length:", device.host.maximum_advertising_data_length + ) + + if device.host.number_of_supported_advertising_sets >= 1: + advertising_data1 = AdvertisingData( + [(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 1".encode("utf-8"))] + ) + + set1 = await device.create_advertising_set( + advertising_data=bytes(advertising_data1), + ) + print("Selected TX power 1:", set1.selected_tx_power) + await set1.start() + + advertising_data2 = AdvertisingData( + [(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 2".encode("utf-8"))] + ) + + if device.host.number_of_supported_advertising_sets >= 2: + set2 = await device.create_advertising_set( + random_address=Address("F0:F0:F0:F0:F0:F1"), + advertising_parameters=AdvertisingParameters(), + advertising_data=bytes(advertising_data2), + auto_restart=True, + ) + print("Selected TX power 2:", set2.selected_tx_power) + await set2.start() + + if device.host.number_of_supported_advertising_sets >= 3: + scan_response_data3 = AdvertisingData( + [(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 3".encode("utf-8"))] + ) + + set3 = await device.create_advertising_set( + random_address=Address("F0:F0:F0:F0:F0:F2"), + advertising_parameters=AdvertisingParameters( + advertising_event_properties=AdvertisingEventProperties( + is_connectable=False, is_scannable=True + ) + ), + scan_response_data=bytes(scan_response_data3), + ) + print("Selected TX power 3:", set2.selected_tx_power) + await set3.start() + + await hci_transport.source.terminated + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main()) diff --git a/examples/run_unicast_server.py b/examples/run_unicast_server.py index 35e124d4..db0d82b6 100644 --- a/examples/run_unicast_server.py +++ b/examples/run_unicast_server.py @@ -22,13 +22,12 @@ import struct import secrets from bumble.core import AdvertisingData -from bumble.device import Device, CisLink +from bumble.device import Device, CisLink, AdvertisingParameters from bumble.hci import ( CodecID, CodingFormat, OwnAddressType, HCI_IsoDataPacket, - HCI_LE_Set_Extended_Advertising_Parameters_Command, ) from bumble.profiles.bap import ( CodecSpecificCapabilities, @@ -179,13 +178,10 @@ def on_cis(cis_link: CisLink): device.once('cis_establishment', on_cis) - await device.start_extended_advertising( - advertising_properties=( - HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING - ), - own_address_type=OwnAddressType.RANDOM, + advertising_set = await device.create_advertising_set( advertising_data=advertising_data, ) + await advertising_set.start() await hci_transport.source.terminated diff --git a/tests/device_test.py b/tests/device_test.py index d51431f5..e5e7892d 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -28,7 +28,7 @@ BT_PERIPHERAL_ROLE, ConnectionParameters, ) -from bumble.device import Connection, Device +from bumble.device import AdvertisingParameters, Connection, Device from bumble.host import Host from bumble.hci import ( HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, @@ -245,12 +245,12 @@ async def test_legacy_advertising(): device = Device(host=mock.AsyncMock(Host)) # Start advertising - advertiser = await device.start_legacy_advertising() - assert device.legacy_advertiser + await device.start_advertising() + assert device.is_advertising # Stop advertising - await advertiser.stop() - assert not device.legacy_advertiser + await device.stop_advertising() + assert not device.is_advertising # ----------------------------------------------------------------------------- @@ -264,7 +264,7 @@ async def test_legacy_advertising_connection(own_address_type): peer_address = Address('F0:F1:F2:F3:F4:F5') # Start advertising - advertiser = await device.start_legacy_advertising() + await device.start_advertising() device.on_connection( 0x0001, BT_LE_TRANSPORT, @@ -292,7 +292,7 @@ async def test_legacy_advertising_connection(own_address_type): async def test_legacy_advertising_disconnection(auto_restart): device = Device(host=mock.AsyncMock(spec=Host)) peer_address = Address('F0:F1:F2:F3:F4:F5') - advertiser = await device.start_legacy_advertising(auto_restart=auto_restart) + await device.start_advertising(auto_restart=auto_restart) device.on_connection( 0x0001, BT_LE_TRANSPORT, @@ -301,20 +301,14 @@ async def test_legacy_advertising_disconnection(auto_restart): ConnectionParameters(0, 0, 0), ) - device.start_legacy_advertising = mock.AsyncMock() + device.start_advertising = mock.AsyncMock() device.on_disconnection(0x0001, 0) if auto_restart: - device.start_legacy_advertising.assert_called_with( - advertising_type=advertiser.advertising_type, - own_address_type=advertiser.own_address_type, - auto_restart=advertiser.auto_restart, - advertising_data=advertiser.advertising_data, - scan_response_data=advertiser.scan_response_data, - ) + assert device.is_advertising else: - device.start_legacy_advertising.assert_not_called() + not device.is_advertising # ----------------------------------------------------------------------------- @@ -323,12 +317,14 @@ async def test_extended_advertising(): device = Device(host=mock.AsyncMock(Host)) # Start advertising - advertiser = await device.start_extended_advertising() - assert device.extended_advertisers + advertising_set = await device.create_advertising_set() + await advertising_set.start() + assert device.extended_advertising_sets + assert advertising_set.enabled # Stop advertising - await advertiser.stop() - assert not device.extended_advertisers + await advertising_set.stop() + assert not advertising_set.enabled # ----------------------------------------------------------------------------- @@ -340,9 +336,10 @@ async def test_extended_advertising(): async def test_extended_advertising_connection(own_address_type): device = Device(host=mock.AsyncMock(spec=Host)) peer_address = Address('F0:F1:F2:F3:F4:F5') - advertiser = await device.start_extended_advertising( - own_address_type=own_address_type + advertising_set = await device.create_advertising_set( + advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) ) + await advertising_set.start() device.on_connection( 0x0001, BT_LE_TRANSPORT, @@ -352,8 +349,9 @@ async def test_extended_advertising_connection(own_address_type): ) device.on_advertising_set_termination( HCI_SUCCESS, - advertiser.handle, + advertising_set.advertising_handle, 0x0001, + 0, ) if own_address_type == OwnAddressType.PUBLIC: @@ -366,45 +364,6 @@ async def test_extended_advertising_connection(own_address_type): await asyncio.sleep(0.0001) -# ----------------------------------------------------------------------------- -@pytest.mark.parametrize( - 'auto_restart,', - (True, False), -) -@pytest.mark.asyncio -async def test_extended_advertising_disconnection(auto_restart): - device = Device(host=mock.AsyncMock(spec=Host)) - peer_address = Address('F0:F1:F2:F3:F4:F5') - advertiser = await device.start_extended_advertising(auto_restart=auto_restart) - device.on_connection( - 0x0001, - BT_LE_TRANSPORT, - peer_address, - BT_PERIPHERAL_ROLE, - ConnectionParameters(0, 0, 0), - ) - device.on_advertising_set_termination( - HCI_SUCCESS, - advertiser.handle, - 0x0001, - ) - - device.start_extended_advertising = mock.AsyncMock() - - device.on_disconnection(0x0001, 0) - - if auto_restart: - device.start_extended_advertising.assert_called_with( - advertising_properties=advertiser.advertising_properties, - own_address_type=advertiser.own_address_type, - auto_restart=advertiser.auto_restart, - advertising_data=advertiser.advertising_data, - scan_response_data=advertiser.scan_response_data, - ) - else: - device.start_extended_advertising.assert_not_called() - - # ----------------------------------------------------------------------------- def test_gatt_services_with_gas(): device = Device(host=Host(None, None)) diff --git a/tests/hci_test.py b/tests/hci_test.py index 1504f200..72f4022d 100644 --- a/tests/hci_test.py +++ b/tests/hci_test.py @@ -23,6 +23,8 @@ HCI_LE_READ_BUFFER_SIZE_COMMAND, HCI_RESET_COMMAND, HCI_SUCCESS, + HCI_LE_CONNECTION_COMPLETE_EVENT, + HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT, Address, CodingFormat, CodecID, @@ -274,8 +276,14 @@ def test_HCI_Set_Event_Mask_Command(): # ----------------------------------------------------------------------------- def test_HCI_LE_Set_Event_Mask_Command(): command = HCI_LE_Set_Event_Mask_Command( - le_event_mask=bytes.fromhex('0011223344556677') + le_event_mask=HCI_LE_Set_Event_Mask_Command.mask( + [ + HCI_LE_CONNECTION_COMPLETE_EVENT, + HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT, + ] + ) ) + assert command.le_event_mask == bytes.fromhex('0100000000010000') basic_check(command)