diff --git a/bumble/device.py b/bumble/device.py index d4db4962..46ddc48e 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -23,7 +23,7 @@ import logging import secrets from contextlib import asynccontextmanager, AsyncExitStack, closing -from dataclasses import dataclass +from dataclasses import dataclass, field from collections.abc import Iterable from typing import ( Any, @@ -41,6 +41,8 @@ TYPE_CHECKING, ) +from pyee import EventEmitter + from .colors import color from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU from .gatt import Characteristic, Descriptor, Service @@ -48,6 +50,7 @@ HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, HCI_CENTRAL_ROLE, + HCI_PERIPHERAL_ROLE, HCI_COMMAND_STATUS_PENDING, HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, HCI_DISPLAY_YES_NO_IO_CAPABILITY, @@ -74,7 +77,6 @@ HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, HCI_SUCCESS, HCI_WRITE_LE_HOST_SUPPORT_COMMAND, - Address, HCI_Accept_Connection_Request_Command, HCI_Authentication_Requested_Command, HCI_Command_Status_Event, @@ -120,6 +122,7 @@ HCI_LE_Set_Extended_Advertising_Enable_Command, HCI_LE_Set_Extended_Advertising_Parameters_Command, HCI_LE_Set_Host_Feature_Command, + HCI_LE_Set_Periodic_Advertising_Enable_Command, HCI_LE_Set_PHY_Command, HCI_LE_Set_Random_Address_Command, HCI_LE_Set_Scan_Enable_Command, @@ -147,9 +150,11 @@ HCI_Write_Scan_Enable_Command, HCI_Write_Secure_Connections_Host_Support_Command, HCI_Write_Simple_Pairing_Mode_Command, + Address, OwnAddressType, LeFeature, LeFeatureMask, + Phy, phy_list_to_bits, ) from .host import Host @@ -232,10 +237,16 @@ DEVICE_DEFAULT_L2CAP_COC_MTU = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU DEVICE_DEFAULT_L2CAP_COC_MPS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS +DEVICE_DEFAULT_ADVERTISING_TX_POWER = ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE +) # fmt: on # pylint: enable=line-too-long +# As specified in 7.8.56 LE Set Extended Advertising Enable command +DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION = 1.28 + # ----------------------------------------------------------------------------- # Classes @@ -423,6 +434,10 @@ def is_directed(self) -> bool: AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY, ) + @property + def is_high_duty_cycle_directed_connectable(self): + return self == AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY + # ----------------------------------------------------------------------------- @dataclass @@ -430,30 +445,344 @@ class LegacyAdvertiser: device: Device advertising_type: AdvertisingType own_address_type: OwnAddressType + peer_address: Address auto_restart: bool - advertising_data: Optional[bytes] - scan_response_data: Optional[bytes] + + async def start(self) -> None: + # Set/update the advertising data if the advertising type allows it + if self.advertising_type.has_data: + await self.device.send_command( + HCI_LE_Set_Advertising_Data_Command( + advertising_data=self.device.advertising_data + ), + check_result=True, + ) + + # Set/update the scan response data if the advertising is scannable + if self.advertising_type.is_scannable: + await self.device.send_command( + HCI_LE_Set_Scan_Response_Data_Command( + scan_response_data=self.device.scan_response_data + ), + check_result=True, + ) + + # Set the advertising parameters + await self.device.send_command( + HCI_LE_Set_Advertising_Parameters_Command( + advertising_interval_min=self.device.advertising_interval_min, + advertising_interval_max=self.device.advertising_interval_max, + advertising_type=int(self.advertising_type), + own_address_type=self.own_address_type, + peer_address_type=self.peer_address.address_type, + peer_address=self.peer_address, + advertising_channel_map=7, + advertising_filter_policy=0, + ), + check_result=True, + ) + + # Enable advertising + await self.device.send_command( + HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), + check_result=True, + ) async def stop(self) -> None: - await self.device.stop_legacy_advertising() + # Disable advertising + await self.device.send_command( + HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), + check_result=True, + ) # ----------------------------------------------------------------------------- @dataclass -class ExtendedAdvertiser(CompositeEventEmitter): +class AdvertisingEventProperties: + is_connectable: bool = True + is_scannable: bool = False + is_directed: bool = False + is_high_duty_cycle_directed_connectable: bool = False + is_legacy: bool = False + is_anonymous: bool = False + include_tx_power: bool = False + + def __int__(self) -> int: + properties = ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(0) + ) + if self.is_connectable: + properties |= properties.CONNECTABLE_ADVERTISING + if self.is_scannable: + properties |= properties.SCANNABLE_ADVERTISING + if self.is_directed: + properties |= properties.DIRECTED_ADVERTISING + if self.is_high_duty_cycle_directed_connectable: + properties |= properties.HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING + if self.is_legacy: + properties |= properties.USE_LEGACY_ADVERTISING_PDUS + if self.is_anonymous: + properties |= properties.ANONYMOUS_ADVERTISING + if self.include_tx_power: + properties |= properties.INCLUDE_TX_POWER + + return int(properties) + + @classmethod + def from_advertising_type( + cls: Type[AdvertisingEventProperties], + advertising_type: AdvertisingType, + ) -> AdvertisingEventProperties: + return cls( + is_connectable=advertising_type.is_connectable, + is_scannable=advertising_type.is_scannable, + is_directed=advertising_type.is_directed, + is_high_duty_cycle_directed_connectable=advertising_type.is_high_duty_cycle_directed_connectable, + is_legacy=True, + is_anonymous=False, + include_tx_power=False, + ) + + +# ----------------------------------------------------------------------------- +# TODO: replace with typing.TypeAlias when the code base is all Python >= 3.10 +AdvertisingChannelMap = HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap + + +# ----------------------------------------------------------------------------- +@dataclass +class AdvertisingParameters: + # pylint: disable=line-too-long + advertising_event_properties: AdvertisingEventProperties = field( + default_factory=AdvertisingEventProperties + ) + primary_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL + primary_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL + primary_advertising_channel_map: HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap = ( + AdvertisingChannelMap.CHANNEL_37 + | AdvertisingChannelMap.CHANNEL_38 + | AdvertisingChannelMap.CHANNEL_39 + ) + own_address_type: OwnAddressType = OwnAddressType.RANDOM + peer_address: Address = Address.ANY + advertising_filter_policy: int = 0 + advertising_tx_power: int = DEVICE_DEFAULT_ADVERTISING_TX_POWER + primary_advertising_phy: Phy = Phy.LE_1M + secondary_advertising_max_skip: int = 0 + secondary_advertising_phy: Phy = Phy.LE_1M + advertising_sid: int = 0 + enable_scan_request_notifications: bool = False + primary_advertising_phy_options: int = 0 + secondary_advertising_phy_options: int = 0 + + +# ----------------------------------------------------------------------------- +@dataclass +class PeriodicAdvertisingParameters: + # TODO implement this class + pass + + +# ----------------------------------------------------------------------------- +@dataclass +class AdvertisingSet(EventEmitter): device: Device - handle: int - advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties - own_address_type: OwnAddressType + advertising_handle: int auto_restart: bool - advertising_data: Optional[bytes] - scan_response_data: Optional[bytes] + random_address: Optional[Address] + advertising_parameters: AdvertisingParameters + advertising_data: bytes + scan_response_data: bytes + periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] + periodic_advertising_data: bytes + selected_tx_power: int = 0 + enabled: bool = False def __post_init__(self) -> None: super().__init__() + async def set_advertising_parameters( + self, advertising_parameters: AdvertisingParameters + ) -> None: + # Compliance check + if ( + not advertising_parameters.advertising_event_properties.is_legacy + and advertising_parameters.advertising_event_properties.is_connectable + and advertising_parameters.advertising_event_properties.is_scannable + ): + logger.warning( + "non-legacy extended advertising event properties may not be both " + "connectable and scannable" + ) + + response = await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Parameters_Command( + advertising_handle=self.advertising_handle, + advertising_event_properties=int( + advertising_parameters.advertising_event_properties + ), + primary_advertising_interval_min=( + int(advertising_parameters.primary_advertising_interval_min / 0.625) + ), + primary_advertising_interval_max=( + int(advertising_parameters.primary_advertising_interval_min / 0.625) + ), + primary_advertising_channel_map=int( + advertising_parameters.primary_advertising_channel_map + ), + own_address_type=advertising_parameters.own_address_type, + peer_address_type=advertising_parameters.peer_address.address_type, + peer_address=advertising_parameters.peer_address, + advertising_tx_power=advertising_parameters.advertising_tx_power, + advertising_filter_policy=( + advertising_parameters.advertising_filter_policy + ), + primary_advertising_phy=advertising_parameters.primary_advertising_phy, + secondary_advertising_max_skip=( + advertising_parameters.secondary_advertising_max_skip + ), + secondary_advertising_phy=( + advertising_parameters.secondary_advertising_phy + ), + advertising_sid=advertising_parameters.advertising_sid, + scan_request_notification_enable=( + 1 if advertising_parameters.enable_scan_request_notifications else 0 + ), + ), + check_result=True, + ) + self.selected_tx_power = response.return_parameters.selected_tx_power + self.advertising_parameters = advertising_parameters + + async def set_advertising_data(self, advertising_data: bytes) -> None: + # pylint: disable=line-too-long + await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Data_Command( + advertising_handle=self.advertising_handle, + operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, + advertising_data=advertising_data, + ), + check_result=True, + ) + self.advertising_data = advertising_data + + async def set_scan_response_data(self, scan_response_data: bytes) -> None: + # pylint: disable=line-too-long + if ( + scan_response_data + and not self.advertising_parameters.advertising_event_properties.is_scannable + ): + logger.warning( + "ignoring attempt to set non-empty scan response data on non-scannable " + "advertising set" + ) + return + + await self.device.send_command( + HCI_LE_Set_Extended_Scan_Response_Data_Command( + advertising_handle=self.advertising_handle, + operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, + scan_response_data=scan_response_data, + ), + check_result=True, + ) + self.scan_response_data = scan_response_data + + async def set_periodic_advertising_parameters( + self, advertising_parameters: PeriodicAdvertisingParameters + ) -> None: + # TODO: send command + self.periodic_advertising_parameters = advertising_parameters + + async def set_periodic_advertising_data(self, advertising_data: bytes) -> None: + # TODO: send command + self.periodic_advertising_data = advertising_data + + async def set_random_address(self, random_address: Address) -> None: + await self.device.send_command( + HCI_LE_Set_Advertising_Set_Random_Address_Command( + advertising_handle=self.advertising_handle, + random_address=(random_address or self.device.random_address), + ), + check_result=True, + ) + + async def start( + self, duration: float = 0.0, max_advertising_events: int = 0 + ) -> None: + """ + Start advertising. + + Args: + duration: How long to advertise for, in seconds. Use 0 (the default) for + an unlimited duration, unless this advertising set is a High Duty Cycle + Directed Advertisement type. + max_advertising_events: Maximum number of events to advertise for. Use 0 + (the default) for an unlimited number of advertisements. + """ + await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Enable_Command( + enable=1, + advertising_handles=[self.advertising_handle], + durations=[round(duration * 100)], + max_extended_advertising_events=[max_advertising_events], + ), + check_result=True, + ) + self.enabled = True + + self.emit('start') + + async def start_periodic(self, include_adi: bool = False) -> None: + await self.device.send_command( + HCI_LE_Set_Periodic_Advertising_Enable_Command( + enable=1 | (2 if include_adi else 0), + advertising_handles=self.advertising_handle, + ), + check_result=True, + ) + + self.emit('start_periodic') + async def stop(self) -> None: - await self.device.stop_extended_advertising(self.handle) + await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Enable_Command( + enable=0, + advertising_handles=[self.advertising_handle], + durations=[0], + max_extended_advertising_events=[0], + ), + check_result=True, + ) + self.enabled = False + + self.emit('stop') + + async def stop_periodic(self) -> None: + await self.device.send_command( + HCI_LE_Set_Periodic_Advertising_Enable_Command( + enable=0, + advertising_handles=self.advertising_handle, + ), + check_result=True, + ) + + self.emit('stop_periodic') + + async def remove(self) -> None: + await self.device.send_command( + HCI_LE_Remove_Advertising_Set_Command( + advertising_handle=self.advertising_handle + ), + check_result=True, + ) + del self.device.extended_advertising_sets[self.advertising_handle] + + def on_termination(self, status: int) -> None: + self.enabled = False + self.emit('termination', status) # ----------------------------------------------------------------------------- @@ -678,9 +1007,6 @@ class Connection(CompositeEventEmitter): gatt_client: gatt_client.Client pairing_peer_io_capability: Optional[int] pairing_peer_authentication_requirements: Optional[int] - advertiser_after_disconnection: Union[ - LegacyAdvertiser, ExtendedAdvertiser, None - ] = None @composite_listener class Listener: @@ -920,7 +1246,8 @@ def __str__(self): return ( f'Connection(handle=0x{self.handle:04X}, ' f'role={self.role_name}, ' - f'address={self.peer_address})' + f'self_address={self.self_address}, ' + f'peer_address={self.peer_address})' ) @@ -1033,7 +1360,7 @@ def with_connection_from_handle(function): @functools.wraps(function) def wrapper(self, connection_handle, *args, **kwargs): if (connection := self.lookup_connection(connection_handle)) is None: - raise ValueError(f"no connection for handle: 0x{connection_handle:04x}") + raise ValueError(f'no connection for handle: 0x{connection_handle:04x}') return function(self, connection, *args, **kwargs) return wrapper @@ -1100,7 +1427,6 @@ class Device(CompositeEventEmitter): advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] config: DeviceConfiguration legacy_advertiser: Optional[LegacyAdvertiser] - extended_advertisers: Dict[int, ExtendedAdvertiser] sco_links: Dict[int, ScoLink] cis_links: Dict[int, CisLink] _pending_cis: Dict[int, Tuple[int, int]] @@ -1202,8 +1528,6 @@ def __init__( self.classic_pending_accepts = { Address.ANY: [] } # Futures, by BD address OR [Futures] for Address.ANY - self.legacy_advertiser = None - self.extended_advertisers = {} # Own address type cache self.connect_own_address_type = None @@ -1216,10 +1540,6 @@ def __init__( self.name = config.name self.random_address = config.address self.class_of_device = config.class_of_device - self.scan_response_data = config.scan_response_data - self.advertising_data = config.advertising_data - self.advertising_interval_min = config.advertising_interval_min - self.advertising_interval_max = config.advertising_interval_max self.keystore = None self.irk = config.irk self.le_enabled = config.le_enabled @@ -1234,6 +1554,22 @@ def __init__( self.classic_accept_any = config.classic_accept_any self.address_resolution_offload = config.address_resolution_offload + # Extended advertising. + self.extended_advertising_sets: Dict[int, AdvertisingSet] = {} + + # Legacy advertising. + # The advertising and scan response data, as well as the advertising interval + # values are stored as properties of this object for convenience so that they + # can be initialized from a config object, and for backward compatibility for + # client code that may set those values directly before calling + # start_advertising(). + self.legacy_advertising_set: Optional[AdvertisingSet] = None + self.legacy_advertiser: Optional[LegacyAdvertiser] = None + self.advertising_data = config.advertising_data + self.scan_response_data = config.scan_response_data + self.advertising_interval_min = config.advertising_interval_min + self.advertising_interval_max = config.advertising_interval_max + for service in config.gatt_services: characteristics = [] for characteristic in service.get("characteristics", []): @@ -1242,7 +1578,8 @@ def __init__( # Leave this check until 5/25/2023 if descriptor.get("permission", False): raise Exception( - "Error parsing Device Config's GATT Services. The key 'permission' must be renamed to 'permissions'" + "Error parsing Device Config's GATT Services. " + "The key 'permission' must be renamed to 'permissions'" ) new_descriptor = Descriptor( attribute_type=descriptor["descriptor_type"], @@ -1619,290 +1956,264 @@ def supports_le_phy(self, phy): if phy not in feature_map: raise ValueError('invalid PHY') - return self.host.supports_le_features(feature_map[phy]) + return self.supports_le_features(feature_map[phy]) + + @property + def supports_le_extended_advertising(self): + return self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING) - @deprecated("Please use start_legacy_advertising.") async def start_advertising( self, advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, target: Optional[Address] = None, own_address_type: int = OwnAddressType.RANDOM, auto_restart: bool = False, - ) -> None: - await self.start_legacy_advertising( - advertising_type=advertising_type, - target=target, - own_address_type=OwnAddressType(own_address_type), - auto_restart=auto_restart, - ) - - async def start_legacy_advertising( - self, - advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, - target: Optional[Address] = None, - own_address_type: OwnAddressType = OwnAddressType.RANDOM, - auto_restart: bool = False, advertising_data: Optional[bytes] = None, scan_response_data: Optional[bytes] = None, - ) -> LegacyAdvertiser: - """Starts an legacy advertisement. + advertising_interval_min: Optional[int] = None, + advertising_interval_max: Optional[int] = None, + ) -> None: + """Start legacy advertising. - Args: - advertising_type: Advertising type passed to HCI_LE_Set_Advertising_Parameters_Command. - target: Directed advertising target. Directed type should be set in advertising_type arg. - own_address_type: own address type to use in the advertising. - auto_restart: whether the advertisement will be restarted after disconnection. - scan_response_data: raw scan response. - advertising_data: raw advertising data. + If the controller supports it, extended advertising commands with legacy PDUs + will be used to advertise. If not, legacy advertising commands will be used. - Returns: - LegacyAdvertiser object containing the metadata of advertisement. + Args: + advertising_type: + Type of advertising events. + target: + Peer address for directed advertising target. + (Ignored if `advertising_type` is not directed) + own_address_type: + Own address type to use in the advertising. + auto_restart: + Whether the advertisement will be restarted after disconnection. + advertising_data: + Raw advertising data. If None, the value of the property + self.advertising_data will be used. + scan_response_data: + Raw scan response. If None, the value of the property + self.scan_response_data will be used. + advertising_interval_min: + Minimum advertising interval, in milliseconds. If None, the value of the + property self.advertising_interval_min will be used. + advertising_interval_max: + Maximum advertising interval, in milliseconds. If None, the value of the + property self.advertising_interval_max will be used. """ - if self.extended_advertisers: - logger.warning( - 'Trying to start Legacy and Extended Advertising at the same time!' - ) - - # If we're advertising, stop first - if self.legacy_advertiser: - await self.stop_advertising() - - # Set/update the advertising data if the advertising type allows it - if advertising_type.has_data: - await self.send_command( - HCI_LE_Set_Advertising_Data_Command( - advertising_data=advertising_data or self.advertising_data or b'' - ), - check_result=True, - ) - - # Set/update the scan response data if the advertising is scannable - if advertising_type.is_scannable: - await self.send_command( - HCI_LE_Set_Scan_Response_Data_Command( - scan_response_data=scan_response_data - or self.scan_response_data - or b'' - ), - check_result=True, - ) + # Update backing properties. + if advertising_data is not None: + self.advertising_data = advertising_data + if scan_response_data is not None: + self.scan_response_data = scan_response_data + if advertising_interval_min is not None: + self.advertising_interval_min = advertising_interval_min + if advertising_interval_max is not None: + self.advertising_interval_max = advertising_interval_max # Decide what peer address to use if advertising_type.is_directed: if target is None: - raise ValueError('directed advertising requires a target address') - + raise ValueError('directed advertising requires a target') peer_address = target - peer_address_type = target.address_type else: peer_address = Address.ANY - peer_address_type = Address.ANY.address_type - # Set the advertising parameters - await self.send_command( - HCI_LE_Set_Advertising_Parameters_Command( - advertising_interval_min=self.advertising_interval_min, - advertising_interval_max=self.advertising_interval_max, - advertising_type=int(advertising_type), - own_address_type=own_address_type, - peer_address_type=peer_address_type, + # If we're already advertising, stop now because we'll be re-creating + # a new advertiser or advertising set. + await self.stop_advertising() + assert self.legacy_advertiser is None + assert self.legacy_advertising_set is None + + if self.supports_le_extended_advertising: + # Use extended advertising commands with legacy PDUs. + self.legacy_advertising_set = await self.create_advertising_set( + auto_start=True, + auto_restart=auto_restart, + random_address=self.random_address, + advertising_parameters=AdvertisingParameters( + advertising_event_properties=( + AdvertisingEventProperties.from_advertising_type( + advertising_type + ) + ), + primary_advertising_interval_min=self.advertising_interval_min, + primary_advertising_interval_max=self.advertising_interval_max, + own_address_type=OwnAddressType(own_address_type), + peer_address=peer_address, + ), + advertising_data=( + self.advertising_data if advertising_type.has_data else b'' + ), + scan_response_data=( + self.scan_response_data if advertising_type.is_scannable else b'' + ), + ) + else: + # Use legacy commands. + self.legacy_advertiser = LegacyAdvertiser( + device=self, + advertising_type=advertising_type, + own_address_type=OwnAddressType(own_address_type), peer_address=peer_address, - advertising_channel_map=7, - advertising_filter_policy=0, - ), - check_result=True, - ) + auto_restart=auto_restart, + ) - # Enable advertising - await self.send_command( - HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), - check_result=True, - ) + await self.legacy_advertiser.start() - self.legacy_advertiser = LegacyAdvertiser( - device=self, - advertising_type=advertising_type, - own_address_type=own_address_type, - auto_restart=auto_restart, - advertising_data=advertising_data, - scan_response_data=scan_response_data, - ) - return self.legacy_advertiser - - @deprecated("Please use stop_legacy_advertising.") async def stop_advertising(self) -> None: - await self.stop_legacy_advertising() - - async def stop_legacy_advertising(self) -> None: + """Stop legacy advertising.""" # Disable advertising - if self.legacy_advertiser: - await self.send_command( - HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), - check_result=True, - ) - + if self.legacy_advertising_set: + await self.legacy_advertising_set.stop() + self.legacy_advertising_set = None + elif self.legacy_advertiser: + await self.legacy_advertiser.stop() self.legacy_advertiser = None - @experimental('Extended Advertising is still experimental - Might be changed soon.') - async def start_extended_advertising( + async def create_advertising_set( self, - advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING, - target: Address = Address.ANY, - own_address_type: OwnAddressType = OwnAddressType.RANDOM, - auto_restart: bool = True, - advertising_data: Optional[bytes] = None, - scan_response_data: Optional[bytes] = None, - ) -> ExtendedAdvertiser: - """Starts an extended advertising set. + advertising_parameters: Optional[AdvertisingParameters] = None, + random_address: Optional[Address] = None, + advertising_data: bytes = b'', + scan_response_data: bytes = b'', + periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] = None, + periodic_advertising_data: bytes = b'', + auto_start: bool = True, + auto_restart: bool = False, + ) -> AdvertisingSet: + """ + Create an advertising set. + + This method allows the creation of advertising sets for controllers that + support extended advertising. Args: - advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command - target: Directed advertising target. Directed property should be set in advertising_properties arg. - own_address_type: own address type to use in the advertising. - auto_restart: whether the advertisement will be restarted after disconnection. - advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent. - scan_response_data: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent. + advertising_parameters: + The parameters to use for this set. If None, default parameters are used. + random_address: + The random address to use (only relevant when the parameters specify that + own_address_type is random). + advertising_data: + Initial value for the set's advertising data. + scan_response_data: + Initial value for the set's scan response data. + periodic_advertising_parameters: + The parameters to use for periodic advertising (if needed). + periodic_advertising_data: + Initial value for the set's periodic advertising data. + auto_start: + True if the set should be automatically started upon creation. + auto_restart: + True if the set should be automatically restated after a disconnection. Returns: - ExtendedAdvertiser object containing the metadata of advertisement. + An AdvertisingSet instance. """ - if self.legacy_advertiser: - logger.warning( - 'Trying to start Legacy and Extended Advertising at the same time!' + # Allocate a new handle + try: + advertising_handle = next( + handle + for handle in range( + DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, + DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, + ) + if handle not in self.extended_advertising_sets ) + except StopIteration as exc: + raise RuntimeError("all valid advertising handles already in use") from exc + + # Instantiate default values + if advertising_parameters is None: + advertising_parameters = AdvertisingParameters() - adv_handle = -1 - # Find a free handle - for i in range( - DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, - DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, + # Use the device's random address if a random address is needed but none was + # provided. + if ( + advertising_parameters.own_address_type + in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) + and random_address is None ): - if i not in self.extended_advertisers: - adv_handle = i - break + random_address = self.random_address + + # Create the object that represents the set. + advertising_set = AdvertisingSet( + device=self, + advertising_handle=advertising_handle, + auto_restart=auto_restart, + random_address=random_address, + advertising_parameters=advertising_parameters, + advertising_data=advertising_data, + scan_response_data=scan_response_data, + periodic_advertising_parameters=periodic_advertising_parameters, + periodic_advertising_data=periodic_advertising_data, + ) - if adv_handle == -1: - raise InvalidStateError('No available advertising set.') + # Create the set in the controller. + await advertising_set.set_advertising_parameters(advertising_parameters) + # Update the set in the controller. try: - # Set the advertising parameters - await self.send_command( - HCI_LE_Set_Extended_Advertising_Parameters_Command( - advertising_handle=adv_handle, - advertising_event_properties=advertising_properties, - primary_advertising_interval_min=self.advertising_interval_min, - primary_advertising_interval_max=self.advertising_interval_max, - primary_advertising_channel_map=( - HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_37 - | HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_38 - | HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_39 - ), - own_address_type=own_address_type, - peer_address_type=target.address_type, - peer_address=target, - advertising_tx_power=7, - advertising_filter_policy=0, - primary_advertising_phy=1, # LE 1M - secondary_advertising_max_skip=0, - secondary_advertising_phy=1, # LE 1M - advertising_sid=0, - scan_request_notification_enable=0, - ), - check_result=True, - ) + if random_address: + await advertising_set.set_random_address(random_address) - # Set the advertising data if present - if advertising_data is not None: - await self.send_command( - HCI_LE_Set_Extended_Advertising_Data_Command( - advertising_handle=adv_handle, - operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, - fragment_preference=0x01, # Should not fragment - advertising_data=advertising_data, - ), - check_result=True, - ) + if advertising_data: + await advertising_set.set_advertising_data(advertising_data) - # Set the scan response if present - if scan_response_data is not None: - await self.send_command( - HCI_LE_Set_Extended_Scan_Response_Data_Command( - advertising_handle=adv_handle, - operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, - fragment_preference=0x01, # Should not fragment - scan_response_data=scan_response_data, - ), - check_result=True, - ) + if scan_response_data: + await advertising_set.set_scan_response_data(scan_response_data) - if own_address_type in ( - OwnAddressType.RANDOM, - OwnAddressType.RESOLVABLE_OR_RANDOM, - ): - await self.send_command( - HCI_LE_Set_Advertising_Set_Random_Address_Command( - advertising_handle=adv_handle, - random_address=self.random_address, - ), - check_result=True, - ) + if periodic_advertising_parameters: + # TODO: call LE Set Periodic Advertising Parameters command + raise NotImplementedError('periodic advertising not yet supported') + + if periodic_advertising_data: + # TODO: call LE Set Periodic Advertising Data command + raise NotImplementedError('periodic advertising not yet supported') - # Enable advertising - await self.send_command( - HCI_LE_Set_Extended_Advertising_Enable_Command( - enable=1, - advertising_handles=[adv_handle], - durations=[0], # Forever - max_extended_advertising_events=[0], # Infinite - ), - check_result=True, - ) except HCI_Error as error: - # When any step fails, cleanup the advertising handle. + # Remove the advertising set so that it doesn't stay dangling in the + # controller. await self.send_command( - HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), + HCI_LE_Remove_Advertising_Set_Command( + advertising_handle=advertising_data + ), check_result=False, ) raise error - advertiser = self.extended_advertisers[adv_handle] = ExtendedAdvertiser( - device=self, - handle=adv_handle, - advertising_properties=advertising_properties, - own_address_type=own_address_type, - auto_restart=auto_restart, - advertising_data=advertising_data, - scan_response_data=scan_response_data, - ) - return advertiser + # Remember the set. + self.extended_advertising_sets[advertising_handle] = advertising_set - @experimental('Extended Advertising is still experimental - Might be changed soon.') - async def stop_extended_advertising(self, adv_handle: int) -> None: - """Stops an extended advertising set. + # Try to start the set if requested. + if auto_start: + try: + # pylint: disable=line-too-long + duration = ( + DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION + if advertising_parameters.advertising_event_properties.is_high_duty_cycle_directed_connectable + else 0 + ) + await advertising_set.start(duration=duration) + except Exception as error: + logger.exception(f'failed to start advertising set: {error}') + await advertising_set.remove() + raise - Args: - adv_handle: Handle of the advertising set to stop. - """ - # Disable advertising - await self.send_command( - HCI_LE_Set_Extended_Advertising_Enable_Command( - enable=0, - advertising_handles=[adv_handle], - durations=[0], - max_extended_advertising_events=[0], - ), - check_result=True, - ) - # Remove advertising set - await self.send_command( - HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), - check_result=True, - ) - del self.extended_advertisers[adv_handle] + return advertising_set @property def is_advertising(self): - return self.legacy_advertiser or self.extended_advertisers + if self.legacy_advertiser: + return True + + if self.legacy_advertising_set and self.legacy_advertising_set.enabled: + return True + + return any( + advertising_set.enabled + for advertising_set in self.extended_advertising_sets.values() + ) async def start_scanning( self, @@ -1929,9 +2240,7 @@ async def start_scanning( self.advertisement_accumulators = {} # Enable scanning - if not legacy and self.supports_le_features( - LeFeatureMask.LE_EXTENDED_ADVERTISING - ): + if not legacy and self.supports_le_extended_advertising: # Set the scanning parameters scan_type = ( HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING @@ -2007,9 +2316,9 @@ async def start_scanning( self.scanning_is_passive = not active self.scanning = True - async def stop_scanning(self) -> None: + async def stop_scanning(self, legacy: bool = False) -> None: # Disable scanning - if self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING): + if not legacy and self.supports_le_extended_advertising: await self.send_command( HCI_LE_Set_Extended_Scan_Enable_Command( enable=0, filter_duplicates=0, duration=0, period=0 @@ -3224,6 +3533,76 @@ async def indicate_subscriber(self, connection, attribute, value=None, force=Fal async def indicate_subscribers(self, attribute, value=None, force=False): await self.gatt_server.indicate_subscribers(attribute, value, force) + @host_event_handler + def on_advertising_set_termination( + self, + status, + advertising_handle, + connection_handle, + number_of_completed_extended_advertising_events, + ): + if not ( + advertising_set := ( + self.extended_advertising_sets.get(advertising_handle) + or self.legacy_advertising_set + ) + ): + logger.warning(f'advertising set {advertising_handle} not found') + return + + advertising_set.on_termination(status) + + if status != HCI_SUCCESS: + logger.debug( + f'advertising set {advertising_handle} ' + f'terminated with status {status}' + ) + return + + if not (connection := self.lookup_connection(connection_handle)): + logger.warning(f'no connection for handle 0x{connection_handle:04x}') + return + + # Update the connection address. + connection.self_address = ( + advertising_set.random_address + if advertising_set.advertising_parameters.own_address_type + in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) + else self.public_address + ) + + # Setup auto-restart of the advertising set if needed. + if advertising_set.auto_restart: + connection.once( + 'disconnection', + lambda _: self.abort_on('flush', advertising_set.start()), + ) + + self._emit_le_connection(connection) + + def _emit_le_connection(self, connection: Connection) -> None: + # If supported, read which PHY we're connected with before + # notifying listeners of the new connection. + if self.host.supports_command(HCI_LE_READ_PHY_COMMAND): + + async def read_phy(): + result = await self.send_command( + HCI_LE_Read_PHY_Command(connection_handle=connection.handle), + check_result=True, + ) + connection.phy = ConnectionPHY( + result.return_parameters.tx_phy, result.return_parameters.rx_phy + ) + # Emit an event to notify listeners of the new connection + self.emit('connection', connection) + + # Do so asynchronously to not block the current event handler + connection.abort_on('disconnection', read_phy()) + + return + + self.emit('connection', connection) + @host_event_handler def on_connection( self, @@ -3242,8 +3621,6 @@ def on_connection( 'new connection reuses the same handle as a previous connection' ) - peer_resolvable_address = None - if transport == BT_BR_EDR_TRANSPORT: # Create a new connection connection = self.pending_connections.pop(peer_address) @@ -3252,76 +3629,76 @@ def on_connection( # Emit an event to notify listeners of the new connection self.emit('connection', connection) - else: - # Resolve the peer address if we can - if self.address_resolver: - if peer_address.is_resolvable: - resolved_address = self.address_resolver.resolve(peer_address) - if resolved_address is not None: - logger.debug(f'*** Address resolved as {resolved_address}') - peer_resolvable_address = peer_address - peer_address = resolved_address - - # Guess which own address type is used for this connection. - # This logic is somewhat correct but may need to be improved - # when multiple advertising are run simultaneously. - advertiser = None - if self.connect_own_address_type is not None: - own_address_type = self.connect_own_address_type - elif self.legacy_advertiser: - own_address_type = self.legacy_advertiser.own_address_type - # Store advertiser for restarting - it's only required for legacy, since - # extended advertisement produces HCI_Advertising_Set_Terminated. - if self.legacy_advertiser.auto_restart: - advertiser = self.legacy_advertiser - else: - # For extended advertisement, determining own address type later. - own_address_type = OwnAddressType.RANDOM - - if own_address_type in ( - OwnAddressType.PUBLIC, - OwnAddressType.RESOLVABLE_OR_PUBLIC, - ): - self_address = self.public_address - else: - self_address = self.random_address - # Create a new connection - connection = Connection( - self, - connection_handle, - transport, - self_address, - peer_address, - peer_resolvable_address, - role, - connection_parameters, - ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), - ) - connection.advertiser_after_disconnection = advertiser - self.connections[connection_handle] = connection + return - # If supported, read which PHY we're connected with before - # notifying listeners of the new connection. - if self.host.supports_command(HCI_LE_READ_PHY_COMMAND): + # Resolve the peer address if we can + peer_resolvable_address = None + if self.address_resolver: + if peer_address.is_resolvable: + resolved_address = self.address_resolver.resolve(peer_address) + if resolved_address is not None: + logger.debug(f'*** Address resolved as {resolved_address}') + peer_resolvable_address = peer_address + peer_address = resolved_address + + self_address = None + if role == HCI_CENTRAL_ROLE: + own_address_type = self.connect_own_address_type + assert own_address_type is not None + else: + if self.supports_le_extended_advertising: + # We'll know the address when the advertising set terminates, + # Use a temporary placeholder value for self_address. + self_address = Address.ANY_RANDOM + else: + # We were connected via a legacy advertisement. + if self.legacy_advertiser: + own_address_type = self.legacy_advertiser.own_address_type + self.legacy_advertiser = None + else: + # This should not happen, but just in case, pick a default. + logger.warning("connection without an advertiser") + self_address = self.random_address + + if self_address is None: + self_address = ( + self.public_address + if own_address_type + in ( + OwnAddressType.PUBLIC, + OwnAddressType.RESOLVABLE_OR_PUBLIC, + ) + else self.random_address + ) - async def read_phy(): - result = await self.send_command( - HCI_LE_Read_PHY_Command(connection_handle=connection_handle), - check_result=True, - ) - connection.phy = ConnectionPHY( - result.return_parameters.tx_phy, result.return_parameters.rx_phy - ) - # Emit an event to notify listeners of the new connection - self.emit('connection', connection) + # Create a connection. + connection = Connection( + self, + connection_handle, + transport, + self_address, + peer_address, + peer_resolvable_address, + role, + connection_parameters, + ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), + ) + self.connections[connection_handle] = connection - # Do so asynchronously to not block the current event handler - connection.abort_on('disconnection', read_phy()) + if ( + role == HCI_PERIPHERAL_ROLE + and self.legacy_advertiser + and self.legacy_advertiser.auto_restart + ): + connection.once( + 'disconnection', + lambda _: self.abort_on('flush', self.legacy_advertiser.start()), + ) - else: - # Emit an event to notify listeners of the new connection - self.emit('connection', connection) + if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising: + # We can emit now, we have all the info we need + self._emit_le_connection(connection) @host_event_handler def on_connection_failure(self, transport, peer_address, error_code): @@ -3406,32 +3783,6 @@ def on_disconnection(self, connection_handle: int, reason: int) -> None: # Cleanup subsystems that maintain per-connection state self.gatt_server.on_disconnection(connection) - - # Restart advertising if auto-restart is enabled - if advertiser := connection.advertiser_after_disconnection: - logger.debug('restarting advertising') - if isinstance(advertiser, LegacyAdvertiser): - self.abort_on( - 'flush', - self.start_legacy_advertising( - advertising_type=advertiser.advertising_type, - own_address_type=advertiser.own_address_type, - advertising_data=advertiser.advertising_data, - scan_response_data=advertiser.scan_response_data, - auto_restart=True, - ), - ) - elif isinstance(advertiser, ExtendedAdvertiser): - self.abort_on( - 'flush', - self.start_extended_advertising( - advertising_properties=advertiser.advertising_properties, - own_address_type=advertiser.own_address_type, - advertising_data=advertiser.advertising_data, - scan_response_data=advertiser.scan_response_data, - auto_restart=True, - ), - ) elif sco_link := self.sco_links.pop(connection_handle, None): sco_link.emit('disconnection', reason) elif cis_link := self.cis_links.pop(connection_handle, None): @@ -3754,30 +4105,6 @@ def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> N if sco_link := self.sco_links.get(sco_handle): sco_link.emit('pdu', packet) - # [LE only] - @host_event_handler - @experimental('Only for testing') - def on_advertising_set_termination( - self, - status: int, - advertising_handle: int, - connection_handle: int, - ) -> None: - if status == HCI_SUCCESS: - connection = self.lookup_connection(connection_handle) - if advertiser := self.extended_advertisers.pop(advertising_handle, None): - if connection: - if advertiser.auto_restart: - connection.advertiser_after_disconnection = advertiser - if advertiser.own_address_type in ( - OwnAddressType.PUBLIC, - OwnAddressType.RESOLVABLE_OR_PUBLIC, - ): - connection.self_address = self.public_address - else: - connection.self_address = self.random_address - advertiser.emit('termination', status) - # [LE only] @host_event_handler @with_connection_from_handle diff --git a/bumble/gatt_client.py b/bumble/gatt_client.py index d5a8ec7c..2079a658 100644 --- a/bumble/gatt_client.py +++ b/bumble/gatt_client.py @@ -1068,7 +1068,7 @@ def on_gatt_pdu(self, att_pdu: ATT_PDU) -> None: logger.warning('!!! unexpected response, there is no pending request') return - # Sanity check: the response should match the pending request unless it is + # The response should match the pending request unless it is # an error response if att_pdu.op_code != ATT_ERROR_RESPONSE: expected_response_name = self.pending_request.name.replace( diff --git a/bumble/gatt_server.py b/bumble/gatt_server.py index d574d520..3be41851 100644 --- a/bumble/gatt_server.py +++ b/bumble/gatt_server.py @@ -328,7 +328,7 @@ def write_cccd( f'handle=0x{characteristic.handle:04X}: {value.hex()}' ) - # Sanity check + # Check parameters if len(value) != 2: logger.warning('CCCD value not 2 bytes long') return diff --git a/bumble/hci.py b/bumble/hci.py index 66ce415c..5d22ea07 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -23,7 +23,7 @@ import logging import secrets import struct -from typing import Any, Dict, Callable, Optional, Type, Union, List +from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union from bumble import crypto from .colors import color @@ -223,41 +223,47 @@ def phy_list_to_bits(phys): # HCI Subevent Codes -HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01 -HCI_LE_ADVERTISING_REPORT_EVENT = 0x02 -HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03 -HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04 -HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05 -HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06 -HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07 -HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08 -HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09 -HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A -HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B -HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C -HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D -HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E -HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F -HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10 -HCI_LE_SCAN_TIMEOUT_EVENT = 0x11 -HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12 -HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13 -HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14 -HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15 -HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16 -HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17 -HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18 -HCI_LE_CIS_ESTABLISHED_EVENT = 0X19 -HCI_LE_CIS_REQUEST_EVENT = 0X1A -HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B -HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C -HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D -HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E -HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F -HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20 -HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21 -HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22 -HCI_LE_SUBRATE_CHANGE_EVENT = 0X23 +HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01 +HCI_LE_ADVERTISING_REPORT_EVENT = 0x02 +HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03 +HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04 +HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05 +HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06 +HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07 +HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08 +HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09 +HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A +HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B +HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C +HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D +HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E +HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F +HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10 +HCI_LE_SCAN_TIMEOUT_EVENT = 0x11 +HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12 +HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13 +HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14 +HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15 +HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16 +HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17 +HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18 +HCI_LE_CIS_ESTABLISHED_EVENT = 0X19 +HCI_LE_CIS_REQUEST_EVENT = 0X1A +HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B +HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C +HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D +HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E +HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F +HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20 +HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21 +HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22 +HCI_LE_SUBRATE_CHANGE_EVENT = 0X23 +HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_V2_EVENT = 0X24 +HCI_LE_PERIODIC_ADVERTISING_REPORT_V2_EVENT = 0X25 +HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_V2_EVENT = 0X26 +HCI_LE_PERIODIC_ADVERTISING_SUBEVENT_DATA_REQUEST_EVENT = 0X27 +HCI_LE_PERIODIC_ADVERTISING_RESPONSE_REPORT_EVENT = 0X28 +HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT = 0X29 # HCI Command @@ -650,47 +656,6 @@ def phy_list_to_bits(phys): # Command Status codes HCI_COMMAND_STATUS_PENDING = 0 -# LE Event Masks -HCI_LE_CONNECTION_COMPLETE_EVENT_MASK = (1 << 0) -HCI_LE_ADVERTISING_REPORT_EVENT_MASK = (1 << 1) -HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT_MASK = (1 << 2) -HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT_MASK = (1 << 3) -HCI_LE_LONG_TERM_KEY_REQUEST_EVENT_MASK = (1 << 4) -HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT_MASK = (1 << 5) -HCI_LE_DATA_LENGTH_CHANGE_EVENT_MASK = (1 << 6) -HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT_MASK = (1 << 7) -HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT_MASK = (1 << 8) -HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT_MASK = (1 << 9) -HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT_MASK = (1 << 10) -HCI_LE_PHY_UPDATE_COMPLETE_EVENT_MASK = (1 << 11) -HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT_MASK = (1 << 12) -HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT_MASK = (1 << 13) -HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT_MASK = (1 << 14) -HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT_MASK = (1 << 15) -HCI_LE_EXTENDED_SCAN_TIMEOUT_EVENT_MASK = (1 << 16) -HCI_LE_EXTENDED_ADVERTISING_SET_TERMINATED_EVENT_MASK = (1 << 17) -HCI_LE_SCAN_REQUEST_RECEIVED_EVENT_MASK = (1 << 18) -HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT_MASK = (1 << 19) -HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT_MASK = (1 << 20) -HCI_LE_CONNECTION_IQ_REPORT_EVENT_MASK = (1 << 21) -HCI_LE_CTE_REQUEST_FAILED_EVENT_MASK = (1 << 22) -HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT_MASK = (1 << 23) -HCI_LE_CIS_ESTABLISHED_EVENT_MASK = (1 << 24) -HCI_LE_CIS_REQUEST_EVENT_MASK = (1 << 25) -HCI_LE_CREATE_BIG_COMPLETE_EVENT_MASK = (1 << 26) -HCI_LE_TERMINATE_BIG_COMPLETE_EVENT_MASK = (1 << 27) -HCI_LE_BIG_SYNC_ESTABLISHED_EVENT_MASK = (1 << 28) -HCI_LE_BIG_SYNC_LOST_EVENT_MASK = (1 << 29) -HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT_MASK = (1 << 30) -HCI_LE_PATH_LOSS_THRESHOLD_EVENT_MASK = (1 << 31) -HCI_LE_TRANSMIT_POWER_REPORTING_EVENT_MASK = (1 << 32) -HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT_MASK = (1 << 33) -HCI_LE_SUBRATE_CHANGE_EVENT_MASK = (1 << 34) - -HCI_LE_EVENT_MASK_NAMES = { - mask: mask_name for (mask_name, mask) in globals().items() - if mask_name.startswith('HCI_LE_') and mask_name.endswith('_EVENT_MASK') -} # ACL HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0 @@ -732,15 +697,15 @@ def phy_list_to_bits(phys): class Phy(enum.IntEnum): - LE_1M = 0x01 - LE_2M = 0x02 - LE_CODED = 0x03 + LE_1M = HCI_LE_1M_PHY + LE_2M = HCI_LE_2M_PHY + LE_CODED = HCI_LE_CODED_PHY class PhyBit(enum.IntFlag): - LE_1M = 0b00000001 - LE_2M = 0b00000010 - LE_CODED = 0b00000100 + LE_1M = 1 << HCI_LE_1M_PHY_BIT + LE_2M = 1 << HCI_LE_2M_PHY_BIT + LE_CODED = 1 << HCI_LE_CODED_PHY_BIT # Connection Parameters @@ -2910,6 +2875,20 @@ class HCI_Set_Event_Mask_Command(HCI_Command): See Bluetooth spec @ 7.3.1 Set Event Mask Command ''' + @staticmethod + def mask(event_codes: Iterable[int]) -> bytes: + ''' + Compute the event mask value for a list of events. + ''' + # NOTE: this implementation takes advantage of the fact that as of version 5.4 + # of the core specification, the bit number for each event code is equal to one + # less than the event code. + # If future versions of the specification deviate from that, a different + # implementation would be needed. + return sum((1 << event_code - 1) for event_code in event_codes).to_bytes( + 8, 'little' + ) + # ----------------------------------------------------------------------------- @HCI_Command.command() @@ -3433,6 +3412,20 @@ class HCI_LE_Set_Event_Mask_Command(HCI_Command): See Bluetooth spec @ 7.8.1 LE Set Event Mask Command ''' + @staticmethod + def mask(event_codes: Iterable[int]) -> bytes: + ''' + Compute the event mask value for a list of events. + ''' + # NOTE: this implementation takes advantage of the fact that as of version 5.4 + # of the core specification, the bit number for each event code is equal to one + # less than the event code. + # If future versions of the specification deviate from that, a different + # implementation would be needed. + return sum((1 << event_code - 1) for event_code in event_codes).to_bytes( + 8, 'little' + ) + # ----------------------------------------------------------------------------- @HCI_Command.command( @@ -4040,13 +4033,16 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): ('advertising_sid', 1), ('scan_request_notification_enable', 1), ], - return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx__power', 1)], + return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx_power', 1)], ) class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command): ''' See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command ''' + TX_POWER_NO_PREFERENCE = 0x7F + SHOULD_NOT_FRAGMENT = 0x01 + class AdvertisingProperties(enum.IntFlag): CONNECTABLE_ADVERTISING = 1 << 0 SCANNABLE_ADVERTISING = 1 << 1 @@ -4291,7 +4287,7 @@ def __str__(self): ('scanning_filter_policy:', self.scanning_filter_policy), ('scanning_phys: ', ','.join(scanning_phys_strs)), ] - for (i, scanning_phy_str) in enumerate(scanning_phys_strs): + for i, scanning_phy_str in enumerate(scanning_phys_strs): fields.append( ( f'{scanning_phy_str}.scan_type: ', @@ -4434,7 +4430,7 @@ def __str__(self): ('peer_address: ', str(self.peer_address)), ('initiating_phys: ', ','.join(initiating_phys_strs)), ] - for (i, initiating_phys_str) in enumerate(initiating_phys_strs): + for i, initiating_phys_str in enumerate(initiating_phys_strs): fields.append( ( f'{initiating_phys_str}.scan_interval: ', @@ -5321,7 +5317,7 @@ def __str__(self): ('status', 1), ('advertising_handle', 1), ('connection_handle', 2), - ('number_completed_extended_advertising_events', 1), + ('num_completed_extended_advertising_events', 1), ] ) class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event): @@ -6262,7 +6258,7 @@ def from_bytes(packet: bytes) -> HCI_IsoDataPacket: if ts_flag: if not should_include_sdu_info: - logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}') + logger.warning(f'Timestamp included when pb_flag={bin(pb_flag)}') time_stamp, *_ = struct.unpack_from(' None: self.current_data = None self.l2cap_pdu_length = 0 else: - # Sanity check + # Compliance check if len(self.current_data) > self.l2cap_pdu_length + 4: logger.warning('!!! ACL data exceeds L2CAP PDU') self.current_data = None diff --git a/bumble/host.py b/bumble/host.py index cbf14bcc..52a9a73a 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -28,59 +28,15 @@ from bumble.l2cap import L2CAP_PDU from bumble.snoop import Snooper from bumble import drivers - -from .hci import ( - Address, - HCI_ACL_DATA_PACKET, - HCI_COMMAND_PACKET, - HCI_EVENT_PACKET, - HCI_ISO_DATA_PACKET, - HCI_LE_READ_BUFFER_SIZE_COMMAND, - HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND, - HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND, - HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND, - HCI_READ_BUFFER_SIZE_COMMAND, - HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND, - HCI_RESET_COMMAND, - HCI_SUCCESS, - HCI_SUPPORTED_COMMANDS_FLAGS, - HCI_SYNCHRONOUS_DATA_PACKET, - HCI_VERSION_BLUETOOTH_CORE_4_0, - HCI_AclDataPacket, - HCI_AclDataPacketAssembler, - HCI_Command, - HCI_Command_Complete_Event, - HCI_Constant, - HCI_Error, - HCI_Event, - HCI_IsoDataPacket, - HCI_LE_Long_Term_Key_Request_Negative_Reply_Command, - HCI_LE_Long_Term_Key_Request_Reply_Command, - HCI_LE_Read_Buffer_Size_Command, - HCI_LE_Read_Local_Supported_Features_Command, - HCI_LE_Read_Suggested_Default_Data_Length_Command, - HCI_LE_Remote_Connection_Parameter_Request_Reply_Command, - HCI_LE_Set_Event_Mask_Command, - HCI_LE_Write_Suggested_Default_Data_Length_Command, - HCI_Link_Key_Request_Negative_Reply_Command, - HCI_Link_Key_Request_Reply_Command, - HCI_Packet, - HCI_Read_Buffer_Size_Command, - HCI_Read_Local_Supported_Commands_Command, - HCI_Read_Local_Version_Information_Command, - HCI_Reset_Command, - HCI_Set_Event_Mask_Command, - HCI_SynchronousDataPacket, - LeFeatureMask, -) -from .core import ( +from bumble import hci +from bumble.core import ( BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT, ConnectionPHY, ConnectionParameters, ) -from .utils import AbortableEventEmitter -from .transport.common import TransportLostError +from bumble.utils import AbortableEventEmitter +from bumble.transport.common import TransportLostError if TYPE_CHECKING: from .transport.common import TransportSink, TransportSource @@ -100,15 +56,15 @@ def __init__( self, max_packet_size: int, max_in_flight: int, - send: Callable[[HCI_Packet], None], + send: Callable[[hci.HCI_Packet], None], ) -> None: self.max_packet_size = max_packet_size self.max_in_flight = max_in_flight self.in_flight = 0 self.send = send - self.packets: Deque[HCI_AclDataPacket] = collections.deque() + self.packets: Deque[hci.HCI_AclDataPacket] = collections.deque() - def enqueue(self, packet: HCI_AclDataPacket) -> None: + def enqueue(self, packet: hci.HCI_AclDataPacket) -> None: self.packets.appendleft(packet) self.check_queue() @@ -140,11 +96,13 @@ def on_packets_completed(self, packet_count: int) -> None: # ----------------------------------------------------------------------------- class Connection: - def __init__(self, host: Host, handle: int, peer_address: Address, transport: int): + def __init__( + self, host: Host, handle: int, peer_address: hci.Address, transport: int + ): self.host = host self.handle = handle self.peer_address = peer_address - self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) + self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu) self.transport = transport acl_packet_queue: Optional[AclPacketQueue] = ( host.le_acl_packet_queue @@ -154,7 +112,7 @@ def __init__(self, host: Host, handle: int, peer_address: Address, transport: in assert acl_packet_queue self.acl_packet_queue = acl_packet_queue - def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None: + def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None: self.assembler.feed_packet(packet) def on_acl_pdu(self, pdu: bytes) -> None: @@ -165,14 +123,14 @@ def on_acl_pdu(self, pdu: bytes) -> None: # ----------------------------------------------------------------------------- @dataclasses.dataclass class ScoLink: - peer_address: Address + peer_address: hci.Address handle: int # ----------------------------------------------------------------------------- @dataclasses.dataclass class CisLink: - peer_address: Address + peer_address: hci.Address handle: int @@ -188,7 +146,7 @@ class Host(AbortableEventEmitter): long_term_key_provider: Optional[ Callable[[int, bytes, int], Awaitable[Optional[bytes]]] ] - link_key_provider: Optional[Callable[[Address], Awaitable[Optional[bytes]]]] + link_key_provider: Optional[Callable[[hci.Address], Awaitable[Optional[bytes]]]] def __init__( self, @@ -204,6 +162,8 @@ def __init__( self.sco_links = {} # SCO links, by connection handle self.pending_command = None self.pending_response = None + self.number_of_supported_advertising_sets = 0 + self.maximum_advertising_data_length = 31 self.local_version = None self.local_supported_commands = bytes(64) self.local_le_features = 0 @@ -223,7 +183,7 @@ def __init__( def find_connection_by_bd_addr( self, - bd_addr: Address, + bd_addr: hci.Address, transport: Optional[int] = None, check_address_type: bool = False, ) -> Optional[Connection]: @@ -265,49 +225,139 @@ async def reset(self, driver_factory=drivers.get_driver_for_host): # Send a reset command unless a driver has already done so. if reset_needed: - await self.send_command(HCI_Reset_Command(), check_result=True) + await self.send_command(hci.HCI_Reset_Command(), check_result=True) self.ready = True response = await self.send_command( - HCI_Read_Local_Supported_Commands_Command(), check_result=True + hci.HCI_Read_Local_Supported_Commands_Command(), check_result=True ) self.local_supported_commands = response.return_parameters.supported_commands - if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND): + if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND): response = await self.send_command( - HCI_LE_Read_Local_Supported_Features_Command(), check_result=True + hci.HCI_LE_Read_Local_Supported_Features_Command(), check_result=True ) self.local_le_features = struct.unpack( ' Optional[TransportSink]: return self.hci_sink @@ -394,7 +468,7 @@ def set_packet_source(self, source: TransportSource) -> None: source.set_packet_sink(self) self.hci_metadata = getattr(source, 'metadata', self.hci_metadata) - def send_hci_packet(self, packet: HCI_Packet) -> None: + def send_hci_packet(self, packet: hci.HCI_Packet) -> None: logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {packet}') if self.snooper: self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER) @@ -425,11 +499,12 @@ async def send_command(self, command, check_result=False): else: status = response.return_parameters.status - if status != HCI_SUCCESS: + if status != hci.HCI_SUCCESS: logger.warning( - f'{command.name} failed ({HCI_Constant.error_name(status)})' + f'{command.name} failed ' + f'({hci.HCI_Constant.error_name(status)})' ) - raise HCI_Error(status) + raise hci.HCI_Error(status) return response except Exception as error: @@ -442,8 +517,8 @@ async def send_command(self, command, check_result=False): self.pending_response = None # Use this method to send a command from a task - def send_command_sync(self, command: HCI_Command) -> None: - async def send_command(command: HCI_Command) -> None: + def send_command_sync(self, command: hci.HCI_Command) -> None: + async def send_command(command: hci.HCI_Command) -> None: await self.send_command(command) asyncio.create_task(send_command(command)) @@ -468,7 +543,7 @@ def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None: pb_flag = 0 while bytes_remaining: data_total_length = min(bytes_remaining, packet_queue.max_packet_size) - acl_packet = HCI_AclDataPacket( + acl_packet = hci.HCI_AclDataPacket( connection_handle=connection_handle, pb_flag=pb_flag, bc_flag=0, @@ -483,7 +558,7 @@ def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None: def supports_command(self, command): # Find the support flag position for this command - for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS): + for octet, flags in enumerate(hci.HCI_SUPPORTED_COMMANDS_FLAGS): for flag_position, value in enumerate(flags): if value == command: # Check if the flag is set @@ -498,16 +573,16 @@ def supports_command(self, command): def supported_commands(self): commands = [] for octet, flags in enumerate(self.local_supported_commands): - if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS): + if octet < len(hci.HCI_SUPPORTED_COMMANDS_FLAGS): for flag in range(8): if flags & (1 << flag) != 0: - command = HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag] + command = hci.HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag] if command is not None: commands.append(command) return commands - def supports_le_features(self, feature: LeFeatureMask) -> bool: + def supports_le_features(self, feature: hci.LeFeatureMask) -> bool: return (self.local_le_features & feature) == feature @property @@ -518,10 +593,10 @@ def supported_le_features(self): # Packet Sink protocol (packets coming from the controller via HCI) def on_packet(self, packet: bytes) -> None: - hci_packet = HCI_Packet.from_bytes(packet) + hci_packet = hci.HCI_Packet.from_bytes(packet) if self.ready or ( - isinstance(hci_packet, HCI_Command_Complete_Event) - and hci_packet.command_opcode == HCI_RESET_COMMAND + isinstance(hci_packet, hci.HCI_Command_Complete_Event) + and hci_packet.command_opcode == hci.HCI_RESET_COMMAND ): self.on_hci_packet(hci_packet) else: @@ -534,44 +609,44 @@ def on_transport_lost(self): self.emit('flush') - def on_hci_packet(self, packet: HCI_Packet) -> None: + def on_hci_packet(self, packet: hci.HCI_Packet) -> None: logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}') if self.snooper: self.snooper.snoop(bytes(packet), Snooper.Direction.CONTROLLER_TO_HOST) # If the packet is a command, invoke the handler for this packet - if packet.hci_packet_type == HCI_COMMAND_PACKET: - self.on_hci_command_packet(cast(HCI_Command, packet)) - elif packet.hci_packet_type == HCI_EVENT_PACKET: - self.on_hci_event_packet(cast(HCI_Event, packet)) - elif packet.hci_packet_type == HCI_ACL_DATA_PACKET: - self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet)) - elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET: - self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet)) - elif packet.hci_packet_type == HCI_ISO_DATA_PACKET: - self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet)) + if packet.hci_packet_type == hci.HCI_COMMAND_PACKET: + self.on_hci_command_packet(cast(hci.HCI_Command, packet)) + elif packet.hci_packet_type == hci.HCI_EVENT_PACKET: + self.on_hci_event_packet(cast(hci.HCI_Event, packet)) + elif packet.hci_packet_type == hci.HCI_ACL_DATA_PACKET: + self.on_hci_acl_data_packet(cast(hci.HCI_AclDataPacket, packet)) + elif packet.hci_packet_type == hci.HCI_SYNCHRONOUS_DATA_PACKET: + self.on_hci_sco_data_packet(cast(hci.HCI_SynchronousDataPacket, packet)) + elif packet.hci_packet_type == hci.HCI_ISO_DATA_PACKET: + self.on_hci_iso_data_packet(cast(hci.HCI_IsoDataPacket, packet)) else: logger.warning(f'!!! unknown packet type {packet.hci_packet_type}') - def on_hci_command_packet(self, command: HCI_Command) -> None: + def on_hci_command_packet(self, command: hci.HCI_Command) -> None: logger.warning(f'!!! unexpected command packet: {command}') - def on_hci_event_packet(self, event: HCI_Event) -> None: + def on_hci_event_packet(self, event: hci.HCI_Event) -> None: handler_name = f'on_{event.name.lower()}' handler = getattr(self, handler_name, self.on_hci_event) handler(event) - def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None: + def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None: # Look for the connection to which this data belongs if connection := self.connections.get(packet.connection_handle): connection.on_hci_acl_data_packet(packet) - def on_hci_sco_data_packet(self, packet: HCI_SynchronousDataPacket) -> None: + def on_hci_sco_data_packet(self, packet: hci.HCI_SynchronousDataPacket) -> None: # Experimental self.emit('sco_packet', packet.connection_handle, packet) - def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None: + def on_hci_iso_data_packet(self, packet: hci.HCI_IsoDataPacket) -> None: # Experimental self.emit('iso_packet', packet.connection_handle, packet) @@ -635,11 +710,11 @@ def on_hci_connection_request_event(self, event): def on_hci_le_connection_complete_event(self, event): # Check if this is a cancellation - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: # Create/update the connection logger.debug( f'### LE CONNECTION: [0x{event.connection_handle:04X}] ' - f'{event.peer_address} as {HCI_Constant.role_name(event.role)}' + f'{event.peer_address} as {hci.HCI_Constant.role_name(event.role)}' ) connection = self.connections.get(event.connection_handle) @@ -679,7 +754,7 @@ def on_hci_le_enhanced_connection_complete_event(self, event): self.on_hci_le_connection_complete_event(event) def on_hci_connection_complete_event(self, event): - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: # Create/update the connection logger.debug( f'### BR/EDR CONNECTION: [0x{event.connection_handle:04X}] ' @@ -726,7 +801,7 @@ def on_hci_disconnection_complete_event(self, event): logger.warning('!!! DISCONNECTION COMPLETE: unknown handle') return - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: logger.debug( f'### DISCONNECTION: [0x{handle:04X}] ' f'{connection.peer_address} ' @@ -735,7 +810,9 @@ def on_hci_disconnection_complete_event(self, event): # Notify the listeners self.emit('disconnection', handle, event.reason) - ( + + # Remove the handle reference + _ = ( self.connections.pop(handle, 0) or self.cis_links.pop(handle, 0) or self.sco_links.pop(handle, 0) @@ -752,7 +829,7 @@ def on_hci_le_connection_update_complete_event(self, event): return # Notify the client - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: connection_parameters = ConnectionParameters( event.connection_interval, event.peripheral_latency, @@ -772,7 +849,7 @@ def on_hci_le_phy_update_complete_event(self, event): return # Notify the client - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy) self.emit('connection_phy_update', connection.handle, connection_phy) else: @@ -791,6 +868,7 @@ def on_hci_le_advertising_set_terminated_event(self, event): event.status, event.advertising_handle, event.connection_handle, + event.num_completed_extended_advertising_events, ) def on_hci_le_cis_request_event(self, event): @@ -804,10 +882,10 @@ def on_hci_le_cis_request_event(self, event): def on_hci_le_cis_established_event(self, event): # The remaining parameters are unused for now. - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: self.cis_links[event.connection_handle] = CisLink( handle=event.connection_handle, - peer_address=Address.ANY, + peer_address=hci.Address.ANY, ) self.emit('cis_establishment', event.connection_handle) else: @@ -823,7 +901,7 @@ def on_hci_le_remote_connection_parameter_request_event(self, event): # For now, just accept everything # TODO: delegate the decision self.send_command_sync( - HCI_LE_Remote_Connection_Parameter_Request_Reply_Command( + hci.HCI_LE_Remote_Connection_Parameter_Request_Reply_Command( connection_handle=event.connection_handle, interval_min=event.interval_min, interval_max=event.interval_max, @@ -854,12 +932,12 @@ async def send_long_term_key(): ), ) if long_term_key: - response = HCI_LE_Long_Term_Key_Request_Reply_Command( + response = hci.HCI_LE_Long_Term_Key_Request_Reply_Command( connection_handle=event.connection_handle, long_term_key=long_term_key, ) else: - response = HCI_LE_Long_Term_Key_Request_Negative_Reply_Command( + response = hci.HCI_LE_Long_Term_Key_Request_Negative_Reply_Command( connection_handle=event.connection_handle ) @@ -868,7 +946,7 @@ async def send_long_term_key(): asyncio.create_task(send_long_term_key()) def on_hci_synchronous_connection_complete_event(self, event): - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: # Create/update the connection logger.debug( f'### SCO CONNECTION: [0x{event.connection_handle:04X}] ' @@ -897,16 +975,16 @@ def on_hci_synchronous_connection_changed_event(self, event): pass def on_hci_role_change_event(self, event): - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: logger.debug( f'role change for {event.bd_addr}: ' - f'{HCI_Constant.role_name(event.new_role)}' + f'{hci.HCI_Constant.role_name(event.new_role)}' ) self.emit('role_change', event.bd_addr, event.new_role) else: logger.debug( f'role change for {event.bd_addr} failed: ' - f'{HCI_Constant.error_name(event.status)}' + f'{hci.HCI_Constant.error_name(event.status)}' ) self.emit('role_change_failure', event.bd_addr, event.status) @@ -922,7 +1000,7 @@ def on_hci_le_data_length_change_event(self, event): def on_hci_authentication_complete_event(self, event): # Notify the client - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: self.emit('connection_authentication', event.connection_handle) else: self.emit( @@ -933,7 +1011,7 @@ def on_hci_authentication_complete_event(self, event): def on_hci_encryption_change_event(self, event): # Notify the client - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: self.emit( 'connection_encryption_change', event.connection_handle, @@ -946,7 +1024,7 @@ def on_hci_encryption_change_event(self, event): def on_hci_encryption_key_refresh_complete_event(self, event): # Notify the client - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: self.emit('connection_encryption_key_refresh', event.connection_handle) else: self.emit( @@ -967,16 +1045,16 @@ def on_hci_page_scan_repetition_mode_change_event(self, event): def on_hci_link_key_notification_event(self, event): logger.debug( f'link key for {event.bd_addr}: {event.link_key.hex()}, ' - f'type={HCI_Constant.link_key_type_name(event.key_type)}' + f'type={hci.HCI_Constant.link_key_type_name(event.key_type)}' ) self.emit('link_key', event.bd_addr, event.link_key, event.key_type) def on_hci_simple_pairing_complete_event(self, event): logger.debug( f'simple pairing complete for {event.bd_addr}: ' - f'status={HCI_Constant.status_name(event.status)}' + f'status={hci.HCI_Constant.status_name(event.status)}' ) - if event.status == HCI_SUCCESS: + if event.status == hci.HCI_SUCCESS: self.emit('classic_pairing', event.bd_addr) else: self.emit('classic_pairing_failure', event.bd_addr, event.status) @@ -996,11 +1074,11 @@ async def send_link_key(): self.link_key_provider(event.bd_addr), ) if link_key: - response = HCI_Link_Key_Request_Reply_Command( + response = hci.HCI_Link_Key_Request_Reply_Command( bd_addr=event.bd_addr, link_key=link_key ) else: - response = HCI_Link_Key_Request_Negative_Reply_Command( + response = hci.HCI_Link_Key_Request_Negative_Reply_Command( bd_addr=event.bd_addr ) @@ -1057,7 +1135,7 @@ def on_hci_extended_inquiry_result_event(self, event): ) def on_hci_remote_name_request_complete_event(self, event): - if event.status != HCI_SUCCESS: + if event.status != hci.HCI_SUCCESS: self.emit('remote_name_failure', event.bd_addr, event.status) else: utf8_name = event.remote_name @@ -1075,7 +1153,7 @@ def on_hci_remote_host_supported_features_notification_event(self, event): ) def on_hci_le_read_remote_features_complete_event(self, event): - if event.status != HCI_SUCCESS: + if event.status != hci.HCI_SUCCESS: self.emit( 'le_remote_features_failure', event.connection_handle, event.status ) diff --git a/bumble/l2cap.py b/bumble/l2cap.py index c1fbdf62..f91a269f 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -208,7 +208,7 @@ class L2CAP_PDU: @staticmethod def from_bytes(data: bytes) -> L2CAP_PDU: - # Sanity check + # Check parameters if len(data) < 4: raise ValueError('not enough data for L2CAP header') diff --git a/bumble/utils.py b/bumble/utils.py index 7d6db5ab..e6aae4d2 100644 --- a/bumble/utils.py +++ b/bumble/utils.py @@ -226,13 +226,13 @@ def listener(self, listener): if self._listener: # Call the deregistration methods for each base class that has them for cls in self._listener.__class__.mro(): - if hasattr(cls, '_bumble_register_composite'): - cls._bumble_deregister_composite(listener, self) + if '_bumble_register_composite' in cls.__dict__: + cls._bumble_deregister_composite(self._listener, self) self._listener = listener if listener: # Call the registration methods for each base class that has them for cls in listener.__class__.mro(): - if hasattr(cls, '_bumble_deregister_composite'): + if '_bumble_deregister_composite' in cls.__dict__: cls._bumble_register_composite(listener, self) diff --git a/examples/run_advertiser.py b/examples/run_advertiser.py index 56b1b8bd..fb594267 100644 --- a/examples/run_advertiser.py +++ b/examples/run_advertiser.py @@ -19,9 +19,11 @@ import logging import sys import os +import struct + +from bumble.core import AdvertisingData from bumble.device import AdvertisingType, Device from bumble.hci import Address - from bumble.transport import open_transport_or_link @@ -52,6 +54,16 @@ async def main(): print('<<< connected') device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + + if advertising_type.is_scannable: + device.scan_response_data = bytes( + AdvertisingData( + [ + (AdvertisingData.APPEARANCE, struct.pack(' None: devices[1].cis_enabled = True await asyncio.gather(*[device.power_on() for device in devices]) - await devices[0].start_extended_advertising( - advertising_properties=( - HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING - ), - own_address_type=OwnAddressType.PUBLIC, - ) + advertising_set = await devices[0].create_advertising_set() connection = await devices[1].connect( devices[0].public_address, own_address_type=OwnAddressType.PUBLIC diff --git a/examples/run_csis_servers.py b/examples/run_csis_servers.py index 88d49a16..98535233 100644 --- a/examples/run_csis_servers.py +++ b/examples/run_csis_servers.py @@ -98,13 +98,7 @@ async def main() -> None: ) + csis.get_advertising_data() ) - await device.start_extended_advertising( - advertising_properties=( - HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING - ), - own_address_type=OwnAddressType.RANDOM, - advertising_data=advertising_data, - ) + await device.create_advertising_set(advertising_data=advertising_data) await asyncio.gather( *[hci_transport.source.terminated for hci_transport in hci_transports] diff --git a/examples/run_extended_advertiser.py b/examples/run_extended_advertiser.py index 20b0b341..6605cfa9 100644 --- a/examples/run_extended_advertiser.py +++ b/examples/run_extended_advertiser.py @@ -19,8 +19,13 @@ import logging import sys import os -from bumble.device import AdvertisingType, Device -from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command +from bumble.device import ( + AdvertisingParameters, + AdvertisingEventProperties, + AdvertisingType, + Device, +) +from bumble.hci import Address from bumble.transport import open_transport_or_link @@ -35,20 +40,16 @@ async def main() -> None: return if len(sys.argv) >= 4: - advertising_properties = ( - HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( - int(sys.argv[3]) - ) + advertising_properties = AdvertisingEventProperties.from_advertising_type( + AdvertisingType(int(sys.argv[3])) ) else: - advertising_properties = ( - HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING - ) + advertising_properties = AdvertisingEventProperties() if len(sys.argv) >= 5: - target = Address(sys.argv[4]) + peer_address = Address(sys.argv[4]) else: - target = Address.ANY + peer_address = Address.ANY print('<<< connecting to HCI...') async with await open_transport_or_link(sys.argv[2]) as hci_transport: @@ -58,8 +59,11 @@ async def main() -> None: sys.argv[1], hci_transport.source, hci_transport.sink ) await device.power_on() - await device.start_extended_advertising( - advertising_properties=advertising_properties, target=target + await device.create_advertising_set( + advertising_parameters=AdvertisingParameters( + advertising_event_properties=advertising_properties, + peer_address=peer_address, + ) ) await hci_transport.source.terminated diff --git a/examples/run_extended_advertiser_2.py b/examples/run_extended_advertiser_2.py new file mode 100644 index 00000000..735e1c5f --- /dev/null +++ b/examples/run_extended_advertiser_2.py @@ -0,0 +1,99 @@ +# Copyright 2021-2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import logging +import sys +import os +from bumble.device import AdvertisingParameters, AdvertisingEventProperties, Device +from bumble.hci import Address +from bumble.core import AdvertisingData +from bumble.transport import open_transport_or_link + + +# ----------------------------------------------------------------------------- +async def main() -> None: + if len(sys.argv) < 3: + print('Usage: run_extended_advertiser_2.py ') + print('example: run_extended_advertiser_2.py device1.json usb:0') + return + + print('<<< connecting to HCI...') + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + print('<<< connected') + + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) + await device.power_on() + + if not device.supports_le_extended_advertising: + print("Device does not support extended advertising") + return + + print("Max advertising sets:", device.host.number_of_supported_advertising_sets) + print( + "Max advertising data length:", device.host.maximum_advertising_data_length + ) + + if device.host.number_of_supported_advertising_sets >= 1: + advertising_data1 = AdvertisingData( + [(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 1".encode("utf-8"))] + ) + + set1 = await device.create_advertising_set( + advertising_data=bytes(advertising_data1), + ) + print("Selected TX power 1:", set1.selected_tx_power) + + advertising_data2 = AdvertisingData( + [(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 2".encode("utf-8"))] + ) + + if device.host.number_of_supported_advertising_sets >= 2: + set2 = await device.create_advertising_set( + random_address=Address("F0:F0:F0:F0:F0:F1"), + advertising_parameters=AdvertisingParameters(), + advertising_data=bytes(advertising_data2), + auto_start=False, + auto_restart=True, + ) + print("Selected TX power 2:", set2.selected_tx_power) + await set2.start() + + if device.host.number_of_supported_advertising_sets >= 3: + scan_response_data3 = AdvertisingData( + [(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 3".encode("utf-8"))] + ) + + set3 = await device.create_advertising_set( + random_address=Address("F0:F0:F0:F0:F0:F2"), + advertising_parameters=AdvertisingParameters( + advertising_event_properties=AdvertisingEventProperties( + is_connectable=False, is_scannable=True + ) + ), + scan_response_data=bytes(scan_response_data3), + ) + print("Selected TX power 3:", set2.selected_tx_power) + + await hci_transport.source.terminated + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main()) diff --git a/examples/run_unicast_server.py b/examples/run_unicast_server.py index 35e124d4..7418b8bf 100644 --- a/examples/run_unicast_server.py +++ b/examples/run_unicast_server.py @@ -22,13 +22,12 @@ import struct import secrets from bumble.core import AdvertisingData -from bumble.device import Device, CisLink +from bumble.device import Device, CisLink, AdvertisingParameters from bumble.hci import ( CodecID, CodingFormat, OwnAddressType, HCI_IsoDataPacket, - HCI_LE_Set_Extended_Advertising_Parameters_Command, ) from bumble.profiles.bap import ( CodecSpecificCapabilities, @@ -179,11 +178,7 @@ def on_cis(cis_link: CisLink): device.once('cis_establishment', on_cis) - await device.start_extended_advertising( - advertising_properties=( - HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING - ), - own_address_type=OwnAddressType.RANDOM, + advertising_set = await device.create_advertising_set( advertising_data=advertising_data, ) diff --git a/setup.cfg b/setup.cfg index 09d87228..8ef11b1c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -99,7 +99,7 @@ development = types-protobuf >= 4.21.0 avatar = pandora-avatar == 0.0.5 - rootcanal == 1.6.0 ; python_version>='3.10' + rootcanal == 1.7.0 ; python_version>='3.10' documentation = mkdocs >= 1.4.0 mkdocs-material >= 8.5.6 diff --git a/tests/device_test.py b/tests/device_test.py index 8af1e9cd..5d872826 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -28,7 +28,7 @@ BT_PERIPHERAL_ROLE, ConnectionParameters, ) -from bumble.device import Connection, Device +from bumble.device import AdvertisingParameters, Connection, Device from bumble.host import AclPacketQueue, Host from bumble.hci import ( HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, @@ -50,7 +50,8 @@ GATT_APPEARANCE_CHARACTERISTIC, ) -from .test_utils import TwoDevices +from .test_utils import TwoDevices, async_barrier + # ----------------------------------------------------------------------------- # Logging @@ -254,12 +255,12 @@ async def test_legacy_advertising(): device = Device(host=mock.AsyncMock(Host)) # Start advertising - advertiser = await device.start_legacy_advertising() - assert device.legacy_advertiser + await device.start_advertising() + assert device.is_advertising # Stop advertising - await advertiser.stop() - assert not device.legacy_advertiser + await device.stop_advertising() + assert not device.is_advertising # ----------------------------------------------------------------------------- @@ -273,7 +274,7 @@ async def test_legacy_advertising_connection(own_address_type): peer_address = Address('F0:F1:F2:F3:F4:F5') # Start advertising - advertiser = await device.start_legacy_advertising() + await device.start_advertising() device.on_connection( 0x0001, BT_LE_TRANSPORT, @@ -301,7 +302,7 @@ async def test_legacy_advertising_connection(own_address_type): async def test_legacy_advertising_disconnection(auto_restart): device = Device(host=mock.AsyncMock(spec=Host)) peer_address = Address('F0:F1:F2:F3:F4:F5') - advertiser = await device.start_legacy_advertising(auto_restart=auto_restart) + await device.start_advertising(auto_restart=auto_restart) device.on_connection( 0x0001, BT_LE_TRANSPORT, @@ -310,20 +311,18 @@ async def test_legacy_advertising_disconnection(auto_restart): ConnectionParameters(0, 0, 0), ) - device.start_legacy_advertising = mock.AsyncMock() + device.on_advertising_set_termination( + HCI_SUCCESS, device.legacy_advertising_set.advertising_handle, 0x0001, 0 + ) device.on_disconnection(0x0001, 0) + await async_barrier() + await async_barrier() if auto_restart: - device.start_legacy_advertising.assert_called_with( - advertising_type=advertiser.advertising_type, - own_address_type=advertiser.own_address_type, - auto_restart=advertiser.auto_restart, - advertising_data=advertiser.advertising_data, - scan_response_data=advertiser.scan_response_data, - ) + assert device.is_advertising else: - device.start_legacy_advertising.assert_not_called() + assert not device.is_advertising # ----------------------------------------------------------------------------- @@ -332,12 +331,13 @@ async def test_extended_advertising(): device = Device(host=mock.AsyncMock(Host)) # Start advertising - advertiser = await device.start_extended_advertising() - assert device.extended_advertisers + advertising_set = await device.create_advertising_set() + assert device.extended_advertising_sets + assert advertising_set.enabled # Stop advertising - await advertiser.stop() - assert not device.extended_advertisers + await advertising_set.stop() + assert not advertising_set.enabled # ----------------------------------------------------------------------------- @@ -349,8 +349,8 @@ async def test_extended_advertising(): async def test_extended_advertising_connection(own_address_type): device = Device(host=mock.AsyncMock(spec=Host)) peer_address = Address('F0:F1:F2:F3:F4:F5') - advertiser = await device.start_extended_advertising( - own_address_type=own_address_type + advertising_set = await device.create_advertising_set( + advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) ) device.on_connection( 0x0001, @@ -361,8 +361,9 @@ async def test_extended_advertising_connection(own_address_type): ) device.on_advertising_set_termination( HCI_SUCCESS, - advertiser.handle, + advertising_set.advertising_handle, 0x0001, + 0, ) if own_address_type == OwnAddressType.PUBLIC: @@ -375,45 +376,6 @@ async def test_extended_advertising_connection(own_address_type): await asyncio.sleep(0.0001) -# ----------------------------------------------------------------------------- -@pytest.mark.parametrize( - 'auto_restart,', - (True, False), -) -@pytest.mark.asyncio -async def test_extended_advertising_disconnection(auto_restart): - device = Device(host=mock.AsyncMock(spec=Host)) - peer_address = Address('F0:F1:F2:F3:F4:F5') - advertiser = await device.start_extended_advertising(auto_restart=auto_restart) - device.on_connection( - 0x0001, - BT_LE_TRANSPORT, - peer_address, - BT_PERIPHERAL_ROLE, - ConnectionParameters(0, 0, 0), - ) - device.on_advertising_set_termination( - HCI_SUCCESS, - advertiser.handle, - 0x0001, - ) - - device.start_extended_advertising = mock.AsyncMock() - - device.on_disconnection(0x0001, 0) - - if auto_restart: - device.start_extended_advertising.assert_called_with( - advertising_properties=advertiser.advertising_properties, - own_address_type=advertiser.own_address_type, - auto_restart=advertiser.auto_restart, - advertising_data=advertiser.advertising_data, - scan_response_data=advertiser.scan_response_data, - ) - else: - device.start_extended_advertising.assert_not_called() - - # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_get_remote_le_features(): diff --git a/tests/gatt_test.py b/tests/gatt_test.py index 19dff2f2..e3c92097 100644 --- a/tests/gatt_test.py +++ b/tests/gatt_test.py @@ -50,6 +50,7 @@ ATT_Error_Response, ATT_Read_By_Group_Type_Request, ) +from .test_utils import async_barrier # ----------------------------------------------------------------------------- @@ -456,13 +457,6 @@ def __init__(self): self.paired = [None, None, None] -# ----------------------------------------------------------------------------- -async def async_barrier(): - ready = asyncio.get_running_loop().create_future() - asyncio.get_running_loop().call_soon(ready.set_result, None) - await ready - - # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_read_write(): diff --git a/tests/hci_test.py b/tests/hci_test.py index 1504f200..72f4022d 100644 --- a/tests/hci_test.py +++ b/tests/hci_test.py @@ -23,6 +23,8 @@ HCI_LE_READ_BUFFER_SIZE_COMMAND, HCI_RESET_COMMAND, HCI_SUCCESS, + HCI_LE_CONNECTION_COMPLETE_EVENT, + HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT, Address, CodingFormat, CodecID, @@ -274,8 +276,14 @@ def test_HCI_Set_Event_Mask_Command(): # ----------------------------------------------------------------------------- def test_HCI_LE_Set_Event_Mask_Command(): command = HCI_LE_Set_Event_Mask_Command( - le_event_mask=bytes.fromhex('0011223344556677') + le_event_mask=HCI_LE_Set_Event_Mask_Command.mask( + [ + HCI_LE_CONNECTION_COMPLETE_EVENT, + HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT, + ] + ) ) + assert command.le_event_mask == bytes.fromhex('0100000000010000') basic_check(command) diff --git a/tests/test_utils.py b/tests/test_utils.py index 331b1860..d193d6e5 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -12,6 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio from typing import List, Optional from bumble.controller import Controller @@ -22,6 +26,7 @@ from bumble.hci import Address +# ----------------------------------------------------------------------------- class TwoDevices: connections: List[Optional[Connection]] @@ -75,3 +80,10 @@ async def setup_connection(self) -> None: def __getitem__(self, index: int) -> Device: return self.devices[index] + + +# ----------------------------------------------------------------------------- +async def async_barrier(): + ready = asyncio.get_running_loop().create_future() + asyncio.get_running_loop().call_soon(ready.set_result, None) + await ready