diff --git a/apps/controller_info.py b/apps/controller_info.py index 50d7845b..83ac3bbf 100644 --- a/apps/controller_info.py +++ b/apps/controller_info.py @@ -27,8 +27,8 @@ from bumble.core import name_or_number from bumble.hci import ( map_null_terminated_utf8_string, + LeFeatureMask, HCI_SUCCESS, - HCI_LE_SUPPORTED_FEATURES_NAMES, HCI_VERSION_NAMES, LMP_VERSION_NAMES, HCI_Command, @@ -140,7 +140,7 @@ async def get_le_info(host: Host) -> None: print(color('LE Features:', 'yellow')) for feature in host.supported_le_features: - print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature)) + print(LeFeatureMask(feature).name) # ----------------------------------------------------------------------------- diff --git a/bumble/controller.py b/bumble/controller.py index 89021831..db8f0e65 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -1212,6 +1212,18 @@ def on_hci_le_read_remote_features_command(self, command): See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command ''' + handle = command.connection_handle + + if not self.find_connection_by_handle(handle): + self.send_hci_packet( + HCI_Command_Status_Event( + status=HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + return + # First, say that the command is pending self.send_hci_packet( HCI_Command_Status_Event( @@ -1225,7 +1237,7 @@ def on_hci_le_read_remote_features_command(self, command): self.send_hci_packet( HCI_LE_Read_Remote_Features_Complete_Event( status=HCI_SUCCESS, - connection_handle=0, + connection_handle=handle, le_features=bytes.fromhex('dd40000000000000'), ) ) diff --git a/bumble/device.py b/bumble/device.py index 0bdcd892..3686878c 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -49,7 +49,6 @@ HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, HCI_CENTRAL_ROLE, HCI_COMMAND_STATUS_PENDING, - HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE, HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, HCI_DISPLAY_YES_NO_IO_CAPABILITY, HCI_DISPLAY_ONLY_IO_CAPABILITY, @@ -60,11 +59,8 @@ HCI_LE_1M_PHY, HCI_LE_1M_PHY_BIT, HCI_LE_2M_PHY, - HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE, HCI_LE_CODED_PHY, HCI_LE_CODED_PHY_BIT, - HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE, - HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE, HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND, HCI_LE_RAND_COMMAND, HCI_LE_READ_PHY_COMMAND, @@ -106,6 +102,7 @@ HCI_LE_Extended_Create_Connection_Command, HCI_LE_Rand_Command, HCI_LE_Read_PHY_Command, + HCI_LE_Read_Remote_Features_Command, HCI_LE_Reject_CIS_Request_Command, HCI_LE_Remove_Advertising_Set_Command, HCI_LE_Set_Address_Resolution_Enable_Command, @@ -151,6 +148,7 @@ HCI_Write_Secure_Connections_Host_Support_Command, HCI_Write_Simple_Pairing_Mode_Command, OwnAddressType, + LeFeatureMask, phy_list_to_bits, ) from .host import Host @@ -681,6 +679,7 @@ class Connection(CompositeEventEmitter): self_address: Address peer_address: Address peer_resolvable_address: Optional[Address] + peer_le_features: Optional[LeFeatureMask] role: int encryption: int authenticated: bool @@ -757,6 +756,7 @@ def __init__( ) # By default, use the device's shared server self.pairing_peer_io_capability = None self.pairing_peer_authentication_requirements = None + self.peer_le_features = None # [Classic only] @classmethod @@ -905,6 +905,15 @@ async def get_phy(self): async def request_remote_name(self): return await self.device.request_remote_name(self) + async def get_remote_le_features(self) -> LeFeatureMask: + """[LE Only] Reads remote LE supported features. + + Returns: + LE features supported by the remote device. + """ + self.peer_le_features = await self.device.get_remote_le_features(self) + return self.peer_le_features + async def __aenter__(self): return self @@ -1537,9 +1546,7 @@ async def power_on(self) -> None: if self.cis_enabled: await self.send_command( HCI_LE_Set_Host_Feature_Command( - bit_number=( - HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE - ), + bit_number=LeFeatureMask.CONNECTED_ISOCHRONOUS_STREAM, bit_value=1, ) ) @@ -1595,21 +1602,21 @@ async def refresh_resolving_list(self) -> None: ) ) - def supports_le_feature(self, feature): - return self.host.supports_le_feature(feature) + def supports_le_features(self, feature: LeFeatureMask) -> bool: + return self.host.supports_le_features(feature) def supports_le_phy(self, phy): if phy == HCI_LE_1M_PHY: return True feature_map = { - HCI_LE_2M_PHY: HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE, - HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE, + HCI_LE_2M_PHY: LeFeatureMask.LE_2M_PHY, + HCI_LE_CODED_PHY: LeFeatureMask.LE_CODED_PHY, } if phy not in feature_map: raise ValueError('invalid PHY') - return self.host.supports_le_feature(feature_map[phy]) + return self.host.supports_le_features(feature_map[phy]) @deprecated("Please use start_legacy_advertising.") async def start_advertising( @@ -1919,8 +1926,8 @@ async def start_scanning( self.advertisement_accumulators = {} # Enable scanning - if not legacy and self.supports_le_feature( - HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE + if not legacy and self.supports_le_features( + LeFeatureMask.LE_EXTENDED_ADVERTISING ): # Set the scanning parameters scan_type = ( @@ -1938,7 +1945,7 @@ async def start_scanning( scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT scanning_phy_count += 1 if HCI_LE_CODED_PHY in scanning_phys: - if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE): + if self.supports_le_features(LeFeatureMask.LE_CODED_PHY): scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT scanning_phy_count += 1 @@ -1999,7 +2006,7 @@ async def start_scanning( async def stop_scanning(self) -> None: # Disable scanning - if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE): + if self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING): await self.send_command( HCI_LE_Set_Extended_Scan_Enable_Command( enable=0, filter_duplicates=0, duration=0, period=0 @@ -3141,6 +3148,32 @@ async def reject_cis_request( ) raise HCI_StatusError(result) + async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask: + """[LE Only] Reads remote LE supported features. + + Args: + handle: connection handle to read LE features. + + Returns: + LE features supported by the remote device. + """ + with closing(EventWatcher()) as watcher: + read_feature_future: asyncio.Future[ + LeFeatureMask + ] = asyncio.get_running_loop().create_future() + + def on_le_remote_features(handle: int, features: int): + if handle == connection.handle: + read_feature_future.set_result(LeFeatureMask(features)) + + watcher.on(self.host, 'le_remote_features', on_le_remote_features) + await self.send_command( + HCI_LE_Read_Remote_Features_Command( + connection_handle=connection.handle + ), + ) + return await read_feature_future + @host_event_handler def on_flush(self): self.emit('flush') diff --git a/bumble/hci.py b/bumble/hci.py index cedb9486..4e088281 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -1360,55 +1360,51 @@ class PhyBit(enum.IntFlag): # LE Supported Features # See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT -HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0 -HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1 -HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2 -HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3 -HCI_LE_PING_LE_SUPPORTED_FEATURE = 4 -HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5 -HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6 -HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7 -HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8 -HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9 -HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10 -HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11 -HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12 -HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13 -HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14 -HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15 -HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16 -HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17 -HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18 -HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19 -HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20 -HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21 -HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22 -HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23 -HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24 -HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25 -HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26 -HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27 -HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28 -HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29 -HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30 -HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31 -HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32 -HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33 -HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34 -HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35 -HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36 -HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37 -HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38 -HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39 -HCI_ADVERTISING_CODING_SELECTION_LE_SUPPORTED_FEATURE = 40 -HCI_ADVERTISING_CODING_SELECTION_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 41 -HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER_LE_SUPPORTED_FEATURE = 43 -HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER_LE_SUPPORTED_FEATURE = 44 - -HCI_LE_SUPPORTED_FEATURES_NAMES = { - flag: feature_name for (feature_name, flag) in globals().items() - if feature_name.startswith('HCI_') and feature_name.endswith('_LE_SUPPORTED_FEATURE') -} +class LeFeatureMask(enum.IntFlag): + LE_ENCRYPTION = 1 << 0 + CONNECTION_PARAMETERS_REQUEST_PROCEDURE = 1 << 1 + EXTENDED_REJECT_INDICATION = 1 << 2 + PERIPHERAL_INITIATED_FEATURE_EXCHANGE = 1 << 3 + LE_PING = 1 << 4 + LE_DATA_PACKET_LENGTH_EXTENSION = 1 << 5 + LL_PRIVACY = 1 << 6 + EXTENDED_SCANNER_FILTER_POLICIES = 1 << 7 + LE_2M_PHY = 1 << 8 + STABLE_MODULATION_INDEX_TRANSMITTER = 1 << 9 + STABLE_MODULATION_INDEX_RECEIVER = 1 << 10 + LE_CODED_PHY = 1 << 11 + LE_EXTENDED_ADVERTISING = 1 << 12 + LE_PERIODIC_ADVERTISING = 1 << 13 + CHANNEL_SELECTION_ALGORITHM_2 = 1 << 14 + LE_POWER_CLASS_1 = 1 << 15 + MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE = 1 << 16 + CONNECTION_CTE_REQUEST = 1 << 17 + CONNECTION_CTE_RESPONSE = 1 << 18 + CONNECTIONLESS_CTE_TRANSMITTER = 1 << 19 + CONNECTIONLESS_CTR_RECEIVER = 1 << 20 + ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION = 1 << 21 + ANTENNA_SWITCHING_DURING_CTE_RECEPTION = 1 << 22 + RECEIVING_CONSTANT_TONE_EXTENSIONS = 1 << 23 + PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER = 1 << 24 + PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT = 1 << 25 + SLEEP_CLOCK_ACCURACY_UPDATES = 1 << 26 + REMOTE_PUBLIC_KEY_VALIDATION = 1 << 27 + CONNECTED_ISOCHRONOUS_STREAM_CENTRAL = 1 << 28 + CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL = 1 << 29 + ISOCHRONOUS_BROADCASTER = 1 << 30 + SYNCHRONIZED_RECEIVER = 1 << 31 + CONNECTED_ISOCHRONOUS_STREAM = 1 << 32 + LE_POWER_CONTROL_REQUEST = 1 << 33 + LE_POWER_CONTROL_REQUEST_DUP = 1 << 34 + LE_PATH_LOSS_MONITORING = 1 << 35 + PERIODIC_ADVERTISING_ADI_SUPPORT = 1 << 36 + CONNECTION_SUBRATING = 1 << 37 + CONNECTION_SUBRATING_HOST_SUPPORT = 1 << 38 + CHANNEL_CLASSIFICATION = 1 << 39 + ADVERTISING_CODING_SELECTION = 1 << 40 + ADVERTISING_CODING_SELECTION_HOST_SUPPORT = 1 << 41 + PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER = 1 << 43 + PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER = 1 << 44 # fmt: on diff --git a/bumble/host.py b/bumble/host.py index f284d662..f28d6fc6 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -70,6 +70,7 @@ HCI_Reset_Command, HCI_Set_Event_Mask_Command, HCI_SynchronousDataPacket, + LeFeatureMask, ) from .core import ( BT_BR_EDR_TRANSPORT, @@ -487,8 +488,8 @@ def supported_commands(self): return commands - def supports_le_feature(self, feature): - return (self.local_le_features & (1 << feature)) != 0 + def supports_le_features(self, feature: LeFeatureMask) -> bool: + return (self.local_le_features & feature) == feature @property def supported_le_features(self): @@ -1033,3 +1034,15 @@ def on_hci_remote_host_supported_features_notification_event(self, event): event.bd_addr, event.host_supported_features, ) + + def on_hci_le_read_remote_features_complete_event(self, event): + if event.status != HCI_SUCCESS: + self.emit( + 'le_remote_features_failure', event.connection_handle, event.status + ) + else: + self.emit( + 'le_remote_features', + event.connection_handle, + int.from_bytes(event.le_features, 'little'), + ) diff --git a/tests/device_test.py b/tests/device_test.py index 50dd5f80..f6cd2213 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -50,6 +50,8 @@ GATT_APPEARANCE_CHARACTERISTIC, ) +from .test_utils import TwoDevices + # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- @@ -412,6 +414,15 @@ async def test_extended_advertising_disconnection(auto_restart): device.start_extended_advertising.assert_not_called() +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_get_remote_le_features(): + devices = TwoDevices() + await devices.setup_connection() + + assert (await devices.connections[0].get_remote_le_features()) is not None + + # ----------------------------------------------------------------------------- def test_gatt_services_with_gas(): device = Device(host=Host(None, None))