Skip to content

Commit

Permalink
improve vendor event support
Browse files Browse the repository at this point in the history
  • Loading branch information
barbibulle committed Nov 23, 2024
1 parent 8d90828 commit 48685c8
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 42 deletions.
12 changes: 6 additions & 6 deletions apps/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,18 @@ def print(self, message: str) -> None:
help='Format of the input file',
)
@click.option(
'--vendors',
'--vendor',
type=click.Choice(['android', 'zephyr']),
multiple=True,
help='Support vendor-specific commands (list one or more)',
)
@click.argument('filename')
# pylint: disable=redefined-builtin
def main(format, vendors, filename):
for vendor in vendors:
if vendor == 'android':
def main(format, vendor, filename):
for vendor_name in vendor:
if vendor_name == 'android':
import bumble.vendor.android.hci
elif vendor == 'zephyr':
elif vendor_name == 'zephyr':
import bumble.vendor.zephyr.hci

input = open(filename, 'rb')
Expand All @@ -180,7 +180,7 @@ def read_next_packet():
else:
printer.print(color("[TRUNCATED]", "red"))
except Exception as error:
logger.exception()
logger.exception('')
print(color(f'!!! {error}', 'red'))


Expand Down
15 changes: 10 additions & 5 deletions bumble/drivers/intel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Support for Intel USB controllers.
Loosely based on the Fuchsia OS implementation.
"""

# -----------------------------------------------------------------------------
# Imports
Expand Down Expand Up @@ -405,7 +409,8 @@ def on_packet(self, packet: bytes) -> None:

if self.max_in_flight_firmware_load_commands != event.num_hci_command_packets:
logger.debug(
f"max_in_flight_firmware_load_commands update: {event.num_hci_command_packets}"
"max_in_flight_firmware_load_commands update: "
f"{event.num_hci_command_packets}"
)
self.max_in_flight_firmware_load_commands = event.num_hci_command_packets
logger.debug(f"event: {event}")
Expand Down Expand Up @@ -521,7 +526,7 @@ def on_vendor_event(event: hci.HCI_Vendor_Event):
# because they are formatted as HCI event packets but are received
# on the ACL channel, so the host parser would get confused.
saved_on_packet = self.host.on_packet
self.host.on_packet = self.on_packet
self.host.on_packet = self.on_packet # type: ignore
self.firmware_load_complete.clear()

# Send the CSS header
Expand Down Expand Up @@ -577,7 +582,7 @@ def on_vendor_event(event: hci.HCI_Vendor_Event):
logger.debug("firmware loaded")

# Restore the original packet handler.
self.host.on_packet = saved_on_packet
self.host.on_packet = saved_on_packet # type: ignore

# Reset
self.reset_complete.clear()
Expand Down Expand Up @@ -653,10 +658,10 @@ async def read_device_info(self) -> dict[ValueType, Any]:
if not isinstance(response, hci.HCI_Command_Complete_Event):
raise DriverError("unexpected HCI response")

if response.return_parameters.status != 0:
if response.return_parameters.status != 0: # type: ignore
raise DriverError("HCI_Intel_Read_Version_Command error")

tlvs = _parse_tlv(response.return_parameters.tlv)
tlvs = _parse_tlv(response.return_parameters.tlv) # type: ignore

# Convert the list to a dict. That's Ok here because we only expect each type
# to appear just once.
Expand Down
49 changes: 38 additions & 11 deletions bumble/hci.py
Original file line number Diff line number Diff line change
Expand Up @@ -4996,6 +4996,7 @@ class HCI_Event(HCI_Packet):
hci_packet_type = HCI_EVENT_PACKET
event_names: Dict[int, str] = {}
event_classes: Dict[int, Type[HCI_Event]] = {}
vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None

@staticmethod
def event(fields=()):
Expand Down Expand Up @@ -5053,31 +5054,41 @@ def registered(event_class):

return event_class

@staticmethod
def from_bytes(packet: bytes) -> HCI_Event:
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_Event:
event_code = packet[1]
length = packet[2]
parameters = packet[3:]
if len(parameters) != length:
raise InvalidPacketError('invalid packet length')

cls: Any
subclass: Any
if event_code == HCI_LE_META_EVENT:
# We do this dispatch here and not in the subclass in order to avoid call
# loops
subevent_code = parameters[0]
cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
if cls is None:
subclass = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
if subclass is None:
# No class registered, just use a generic class instance
return HCI_LE_Meta_Event(subevent_code, parameters)
elif event_code == HCI_VENDOR_EVENT:
# Invoke all the registered factories to see if any of them can handle
# the event
if cls.vendor_factory:
if event := cls.vendor_factory(parameters):
return event

# No factory, or the factory could not create an instance,
# return a generic vendor event
return HCI_Event(event_code, parameters)
else:
cls = HCI_Event.event_classes.get(event_code)
if cls is None:
subclass = HCI_Event.event_classes.get(event_code)
if subclass is None:
# No class registered, just use a generic class instance
return HCI_Event(event_code, parameters)

# Invoke the factory to create a new instance
return cls.from_parameters(parameters) # type: ignore
return subclass.from_parameters(parameters) # type: ignore

@classmethod
def from_parameters(cls, parameters):
Expand Down Expand Up @@ -5127,7 +5138,7 @@ class HCI_Extended_Event(HCI_Event):
'''

subevent_names: Dict[int, str] = {}
subevent_classes: Dict[int, Type[HCI_Extended_Event]]
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}

@classmethod
def event(cls, fields=()):
Expand Down Expand Up @@ -5178,7 +5189,22 @@ def register_subevents(cls, symbols: Dict[str, Any]) -> None:
cls.subevent_names.update(cls.subevent_map(symbols))

@classmethod
def from_parameters(cls, parameters):
def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
"""
Factory method that parses the subevent code, finds a registered subclass,
and creates an instance if found.
"""
subevent_code = parameters[0]
if subclass := cls.subevent_classes.get(subevent_code):
return subclass.from_parameters(parameters)

return None

@classmethod
def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event:
"""Factory method for subclasses (the subevent code has already been parsed)"""
self = cls.__new__(cls)
HCI_Extended_Event.__init__(self, self.subevent_code, parameters)
if fields := getattr(self, 'fields', None):
Expand Down Expand Up @@ -6092,8 +6118,9 @@ class HCI_Command_Complete_Event(HCI_Event):
See Bluetooth spec @ 7.7.14 Command Complete Event
'''

return_parameters = b''
num_hci_command_packets: int
command_opcode: int
return_parameters = b''

def map_return_parameters(self, return_parameters):
'''Map simple 'status' return parameters to their named constant form'''
Expand Down
31 changes: 29 additions & 2 deletions bumble/vendor/android/hci.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Dict, Optional, Type

from bumble.hci import (
name_or_number,
Expand All @@ -24,6 +25,9 @@
HCI_Constant,
HCI_Object,
HCI_Command,
HCI_Event,
HCI_Extended_Event,
HCI_VENDOR_EVENT,
STATUS_SPEC,
)

Expand Down Expand Up @@ -277,7 +281,29 @@ def opcode_name(cls, opcode):


# -----------------------------------------------------------------------------
@HCI_Vendor_Event.event(
class HCI_Android_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}

@classmethod
def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
subevent_code = parameters[0]
if subevent_code == HCI_BLUETOOTH_QUALITY_REPORT_EVENT:
quality_report_id = parameters[1]
if quality_report_id in (0x01, 0x02, 0x03, 0x04, 0x07, 0x08, 0x09):
return HCI_Bluetooth_Quality_Report_Event.from_parameters(parameters)

return None


HCI_Android_Vendor_Event.register_subevents(globals())
HCI_Event.vendor_factory = HCI_Android_Vendor_Event.subclass_from_parameters


# -----------------------------------------------------------------------------
@HCI_Extended_Event.event(
fields=[
('quality_report_id', 1),
('packet_types', 1),
Expand Down Expand Up @@ -306,10 +332,11 @@ def opcode_name(cls, opcode):
('tx_last_subevent_packets', 4),
('crc_error_packets', 4),
('rx_duplicate_packets', 4),
('rx_unreceived_packets', 4),
('vendor_specific_parameters', '*'),
]
)
class HCI_Bluetooth_Quality_Report_Event(HCI_Vendor_Event):
class HCI_Bluetooth_Quality_Report_Event(HCI_Android_Vendor_Event):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event
Expand Down
18 changes: 15 additions & 3 deletions docs/mkdocs/src/drivers/intel.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,33 @@ INTEL DRIVER
This driver supports loading firmware images and optional config data to
Intel USB controllers.
A number of USB dongles are supported, but likely not all.
When using a USB dongle, the USB product ID and vendor ID are used
The initial implementation has been tested on BE200 and AX210 controllers.
When using a USB controller, the USB product ID and vendor ID are used
to find whether a matching set of firmware image and config data
is needed for that specific model. If a match exists, the driver will try
load the firmware image and, if needed, config data.
Alternatively, the metadata property ``driver=intel`` may be specified in a transport
name to force that driver to be used (ex: ``usb:[driver=intel]0`` instead of just
``usb:0`` for the first USB device).
The driver will look for those files by name, in order, in:
The driver will look for the firmware and config files by name, in order, in:

* The directory specified by the environment variable `BUMBLE_INTEL_FIRMWARE_DIR`
if set.
* The directory `<package-dir>/drivers/intel_fw` where `<package-dir>` is the directory
where the `bumble` package is installed.
* The current directory.

It is also possible to override or extend the config data with parameters passed via the
transport name. The driver name `intel` may be suffixed with `/<param:value>[+<param:value>]...`
The supported params are:
* `ddc_addon`: configuration data to add to the data loaded from the config data file
* `ddc_override`: configuration data to use instead of the data loaded from the config data file.

With both `dcc_addon` and `dcc_override`, the param value is a hex-encoded byte array containing
the config data (same format as the config file).
Example transport name:
`usb:[driver=intel/dcc_addon:03E40200]0`


Obtaining Firmware Images and Config Data
-----------------------------------------
Expand All @@ -35,7 +47,7 @@ Usage: bumble-intel-fw-download [OPTIONS]
Options:
--output-dir TEXT Output directory where the files will be saved.
Defaults to the OS-specificapp data dir, which the
Defaults to the OS-specific app data dir, which the
driver will check when trying to find firmware
--source [linux-kernel] [default: linux-kernel]
--single TEXT Only download a single image set, by its base name
Expand Down
38 changes: 23 additions & 15 deletions tools/intel_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import logging
import asyncio
import os
from typing import Any
from typing import Any, Optional

import click

Expand All @@ -45,33 +45,42 @@ def print_device_info(device_info: dict[intel.ValueType, Any]) -> None:
print(f" {color(key.name, 'green')}: {value}")


# -----------------------------------------------------------------------------
async def get_driver(host: Host, force: bool) -> Optional[intel.Driver]:
# Create a driver
driver = await intel.Driver.for_host(host, force)
if driver is None:
print("Device does not appear to be an Intel device")
return None

return driver


# -----------------------------------------------------------------------------
async def do_info(usb_transport, force):
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
# Create a host to communicate with the device
host = Host(hci_source, hci_sink)

# Create a driver
driver = await intel.Driver.for_host(host, force)
driver = await get_driver(host, force)
if driver is None:
return

# Get and print the device info
print_device_info(await driver.read_device_info())


# -----------------------------------------------------------------------------
async def do_load(usb_transport, force):
async def do_load(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
# Create a host to communicate with the device
host = Host(hci_source, hci_sink)

# Create a driver
driver = await intel.Driver.for_host(host, force)
driver = await get_driver(host, force)
if driver is None:
return

# Reboot in bootloader mode
await driver.load_firmware()
Expand All @@ -81,16 +90,15 @@ async def do_load(usb_transport, force):


# -----------------------------------------------------------------------------
async def do_bootloader(usb_transport, force):
async def do_bootloader(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
# Create a host to communicate with the device
host = Host(hci_source, hci_sink)

# Create a driver
driver = await intel.Driver.for_host(host, force)
driver = await get_driver(host, force)
if driver is None:
return

# Reboot in bootloader mode
await driver.reboot_bootloader()
Expand Down

0 comments on commit 48685c8

Please sign in to comment.