From 48685c8587fab465ddee3aeb5258b38b8d3a8e08 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Sat, 23 Nov 2024 08:55:50 -0800 Subject: [PATCH] improve vendor event support --- apps/show.py | 12 ++++---- bumble/drivers/intel.py | 15 ++++++---- bumble/hci.py | 49 +++++++++++++++++++++++++------- bumble/vendor/android/hci.py | 31 ++++++++++++++++++-- docs/mkdocs/src/drivers/intel.md | 18 ++++++++++-- tools/intel_util.py | 38 +++++++++++++++---------- 6 files changed, 121 insertions(+), 42 deletions(-) diff --git a/apps/show.py b/apps/show.py index 97640a37..8c386817 100644 --- a/apps/show.py +++ b/apps/show.py @@ -144,18 +144,18 @@ def print(self, message: str) -> None: help='Format of the input file', ) @click.option( - '--vendors', + '--vendor', type=click.Choice(['android', 'zephyr']), multiple=True, help='Support vendor-specific commands (list one or more)', ) @click.argument('filename') # pylint: disable=redefined-builtin -def main(format, vendors, filename): - for vendor in vendors: - if vendor == 'android': +def main(format, vendor, filename): + for vendor_name in vendor: + if vendor_name == 'android': import bumble.vendor.android.hci - elif vendor == 'zephyr': + elif vendor_name == 'zephyr': import bumble.vendor.zephyr.hci input = open(filename, 'rb') @@ -180,7 +180,7 @@ def read_next_packet(): else: printer.print(color("[TRUNCATED]", "red")) except Exception as error: - logger.exception() + logger.exception('') print(color(f'!!! {error}', 'red')) diff --git a/bumble/drivers/intel.py b/bumble/drivers/intel.py index 3c8ae784..cdb7e72e 100644 --- a/bumble/drivers/intel.py +++ b/bumble/drivers/intel.py @@ -11,6 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +""" +Support for Intel USB controllers. +Loosely based on the Fuchsia OS implementation. +""" # ----------------------------------------------------------------------------- # Imports @@ -405,7 +409,8 @@ def on_packet(self, packet: bytes) -> None: if self.max_in_flight_firmware_load_commands != event.num_hci_command_packets: logger.debug( - f"max_in_flight_firmware_load_commands update: {event.num_hci_command_packets}" + "max_in_flight_firmware_load_commands update: " + f"{event.num_hci_command_packets}" ) self.max_in_flight_firmware_load_commands = event.num_hci_command_packets logger.debug(f"event: {event}") @@ -521,7 +526,7 @@ def on_vendor_event(event: hci.HCI_Vendor_Event): # because they are formatted as HCI event packets but are received # on the ACL channel, so the host parser would get confused. saved_on_packet = self.host.on_packet - self.host.on_packet = self.on_packet + self.host.on_packet = self.on_packet # type: ignore self.firmware_load_complete.clear() # Send the CSS header @@ -577,7 +582,7 @@ def on_vendor_event(event: hci.HCI_Vendor_Event): logger.debug("firmware loaded") # Restore the original packet handler. - self.host.on_packet = saved_on_packet + self.host.on_packet = saved_on_packet # type: ignore # Reset self.reset_complete.clear() @@ -653,10 +658,10 @@ async def read_device_info(self) -> dict[ValueType, Any]: if not isinstance(response, hci.HCI_Command_Complete_Event): raise DriverError("unexpected HCI response") - if response.return_parameters.status != 0: + if response.return_parameters.status != 0: # type: ignore raise DriverError("HCI_Intel_Read_Version_Command error") - tlvs = _parse_tlv(response.return_parameters.tlv) + tlvs = _parse_tlv(response.return_parameters.tlv) # type: ignore # Convert the list to a dict. That's Ok here because we only expect each type # to appear just once. diff --git a/bumble/hci.py b/bumble/hci.py index 3236dd42..93769344 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -4996,6 +4996,7 @@ class HCI_Event(HCI_Packet): hci_packet_type = HCI_EVENT_PACKET event_names: Dict[int, str] = {} event_classes: Dict[int, Type[HCI_Event]] = {} + vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None @staticmethod def event(fields=()): @@ -5053,31 +5054,41 @@ def registered(event_class): return event_class - @staticmethod - def from_bytes(packet: bytes) -> HCI_Event: + @classmethod + def from_bytes(cls, packet: bytes) -> HCI_Event: event_code = packet[1] length = packet[2] parameters = packet[3:] if len(parameters) != length: raise InvalidPacketError('invalid packet length') - cls: Any + subclass: Any if event_code == HCI_LE_META_EVENT: # We do this dispatch here and not in the subclass in order to avoid call # loops subevent_code = parameters[0] - cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code) - if cls is None: + subclass = HCI_LE_Meta_Event.subevent_classes.get(subevent_code) + if subclass is None: # No class registered, just use a generic class instance return HCI_LE_Meta_Event(subevent_code, parameters) + elif event_code == HCI_VENDOR_EVENT: + # Invoke all the registered factories to see if any of them can handle + # the event + if cls.vendor_factory: + if event := cls.vendor_factory(parameters): + return event + + # No factory, or the factory could not create an instance, + # return a generic vendor event + return HCI_Event(event_code, parameters) else: - cls = HCI_Event.event_classes.get(event_code) - if cls is None: + subclass = HCI_Event.event_classes.get(event_code) + if subclass is None: # No class registered, just use a generic class instance return HCI_Event(event_code, parameters) # Invoke the factory to create a new instance - return cls.from_parameters(parameters) # type: ignore + return subclass.from_parameters(parameters) # type: ignore @classmethod def from_parameters(cls, parameters): @@ -5127,7 +5138,7 @@ class HCI_Extended_Event(HCI_Event): ''' subevent_names: Dict[int, str] = {} - subevent_classes: Dict[int, Type[HCI_Extended_Event]] + subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {} @classmethod def event(cls, fields=()): @@ -5178,7 +5189,22 @@ def register_subevents(cls, symbols: Dict[str, Any]) -> None: cls.subevent_names.update(cls.subevent_map(symbols)) @classmethod - def from_parameters(cls, parameters): + def subclass_from_parameters( + cls, parameters: bytes + ) -> Optional[HCI_Extended_Event]: + """ + Factory method that parses the subevent code, finds a registered subclass, + and creates an instance if found. + """ + subevent_code = parameters[0] + if subclass := cls.subevent_classes.get(subevent_code): + return subclass.from_parameters(parameters) + + return None + + @classmethod + def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event: + """Factory method for subclasses (the subevent code has already been parsed)""" self = cls.__new__(cls) HCI_Extended_Event.__init__(self, self.subevent_code, parameters) if fields := getattr(self, 'fields', None): @@ -6092,8 +6118,9 @@ class HCI_Command_Complete_Event(HCI_Event): See Bluetooth spec @ 7.7.14 Command Complete Event ''' - return_parameters = b'' + num_hci_command_packets: int command_opcode: int + return_parameters = b'' def map_return_parameters(self, return_parameters): '''Map simple 'status' return parameters to their named constant form''' diff --git a/bumble/vendor/android/hci.py b/bumble/vendor/android/hci.py index 126d580f..0aaa23ae 100644 --- a/bumble/vendor/android/hci.py +++ b/bumble/vendor/android/hci.py @@ -16,6 +16,7 @@ # Imports # ----------------------------------------------------------------------------- import struct +from typing import Dict, Optional, Type from bumble.hci import ( name_or_number, @@ -24,6 +25,9 @@ HCI_Constant, HCI_Object, HCI_Command, + HCI_Event, + HCI_Extended_Event, + HCI_VENDOR_EVENT, STATUS_SPEC, ) @@ -277,7 +281,29 @@ def opcode_name(cls, opcode): # ----------------------------------------------------------------------------- -@HCI_Vendor_Event.event( +class HCI_Android_Vendor_Event(HCI_Extended_Event): + event_code: int = HCI_VENDOR_EVENT + subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {} + + @classmethod + def subclass_from_parameters( + cls, parameters: bytes + ) -> Optional[HCI_Extended_Event]: + subevent_code = parameters[0] + if subevent_code == HCI_BLUETOOTH_QUALITY_REPORT_EVENT: + quality_report_id = parameters[1] + if quality_report_id in (0x01, 0x02, 0x03, 0x04, 0x07, 0x08, 0x09): + return HCI_Bluetooth_Quality_Report_Event.from_parameters(parameters) + + return None + + +HCI_Android_Vendor_Event.register_subevents(globals()) +HCI_Event.vendor_factory = HCI_Android_Vendor_Event.subclass_from_parameters + + +# ----------------------------------------------------------------------------- +@HCI_Extended_Event.event( fields=[ ('quality_report_id', 1), ('packet_types', 1), @@ -306,10 +332,11 @@ def opcode_name(cls, opcode): ('tx_last_subevent_packets', 4), ('crc_error_packets', 4), ('rx_duplicate_packets', 4), + ('rx_unreceived_packets', 4), ('vendor_specific_parameters', '*'), ] ) -class HCI_Bluetooth_Quality_Report_Event(HCI_Vendor_Event): +class HCI_Bluetooth_Quality_Report_Event(HCI_Android_Vendor_Event): # pylint: disable=line-too-long ''' See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event diff --git a/docs/mkdocs/src/drivers/intel.md b/docs/mkdocs/src/drivers/intel.md index 89f9acbb..372b6c5e 100644 --- a/docs/mkdocs/src/drivers/intel.md +++ b/docs/mkdocs/src/drivers/intel.md @@ -4,14 +4,15 @@ INTEL DRIVER This driver supports loading firmware images and optional config data to Intel USB controllers. A number of USB dongles are supported, but likely not all. -When using a USB dongle, the USB product ID and vendor ID are used +The initial implementation has been tested on BE200 and AX210 controllers. +When using a USB controller, the USB product ID and vendor ID are used to find whether a matching set of firmware image and config data is needed for that specific model. If a match exists, the driver will try load the firmware image and, if needed, config data. Alternatively, the metadata property ``driver=intel`` may be specified in a transport name to force that driver to be used (ex: ``usb:[driver=intel]0`` instead of just ``usb:0`` for the first USB device). -The driver will look for those files by name, in order, in: +The driver will look for the firmware and config files by name, in order, in: * The directory specified by the environment variable `BUMBLE_INTEL_FIRMWARE_DIR` if set. @@ -19,6 +20,17 @@ The driver will look for those files by name, in order, in: where the `bumble` package is installed. * The current directory. +It is also possible to override or extend the config data with parameters passed via the +transport name. The driver name `intel` may be suffixed with `/[+]...` +The supported params are: + * `ddc_addon`: configuration data to add to the data loaded from the config data file + * `ddc_override`: configuration data to use instead of the data loaded from the config data file. + +With both `dcc_addon` and `dcc_override`, the param value is a hex-encoded byte array containing +the config data (same format as the config file). +Example transport name: +`usb:[driver=intel/dcc_addon:03E40200]0` + Obtaining Firmware Images and Config Data ----------------------------------------- @@ -35,7 +47,7 @@ Usage: bumble-intel-fw-download [OPTIONS] Options: --output-dir TEXT Output directory where the files will be saved. - Defaults to the OS-specificapp data dir, which the + Defaults to the OS-specific app data dir, which the driver will check when trying to find firmware --source [linux-kernel] [default: linux-kernel] --single TEXT Only download a single image set, by its base name diff --git a/tools/intel_util.py b/tools/intel_util.py index 89a68757..0333a010 100644 --- a/tools/intel_util.py +++ b/tools/intel_util.py @@ -18,7 +18,7 @@ import logging import asyncio import os -from typing import Any +from typing import Any, Optional import click @@ -45,33 +45,42 @@ def print_device_info(device_info: dict[intel.ValueType, Any]) -> None: print(f" {color(key.name, 'green')}: {value}") +# ----------------------------------------------------------------------------- +async def get_driver(host: Host, force: bool) -> Optional[intel.Driver]: + # Create a driver + driver = await intel.Driver.for_host(host, force) + if driver is None: + print("Device does not appear to be an Intel device") + return None + + return driver + + # ----------------------------------------------------------------------------- async def do_info(usb_transport, force): async with await transport.open_transport(usb_transport) as ( hci_source, hci_sink, ): - # Create a host to communicate with the device host = Host(hci_source, hci_sink) - - # Create a driver - driver = await intel.Driver.for_host(host, force) + driver = await get_driver(host, force) + if driver is None: + return # Get and print the device info print_device_info(await driver.read_device_info()) # ----------------------------------------------------------------------------- -async def do_load(usb_transport, force): +async def do_load(usb_transport: str, force: bool) -> None: async with await transport.open_transport(usb_transport) as ( hci_source, hci_sink, ): - # Create a host to communicate with the device host = Host(hci_source, hci_sink) - - # Create a driver - driver = await intel.Driver.for_host(host, force) + driver = await get_driver(host, force) + if driver is None: + return # Reboot in bootloader mode await driver.load_firmware() @@ -81,16 +90,15 @@ async def do_load(usb_transport, force): # ----------------------------------------------------------------------------- -async def do_bootloader(usb_transport, force): +async def do_bootloader(usb_transport: str, force: bool) -> None: async with await transport.open_transport(usb_transport) as ( hci_source, hci_sink, ): - # Create a host to communicate with the device host = Host(hci_source, hci_sink) - - # Create a driver - driver = await intel.Driver.for_host(host, force) + driver = await get_driver(host, force) + if driver is None: + return # Reboot in bootloader mode await driver.reboot_bootloader()