Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support multiple event factories #618

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions bumble/hci.py
Original file line number Diff line number Diff line change
Expand Up @@ -5070,7 +5070,7 @@ class HCI_Event(HCI_Packet):
hci_packet_type = HCI_EVENT_PACKET
event_names: Dict[int, str] = {}
event_classes: Dict[int, Type[HCI_Event]] = {}
vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None
vendor_factories: list[Callable[[bytes], Optional[HCI_Event]]] = []

@staticmethod
def event(fields=()):
Expand Down Expand Up @@ -5128,6 +5128,19 @@ def registered(event_class):

return event_class

@classmethod
def add_vendor_factory(
cls, factory: Callable[[bytes], Optional[HCI_Event]]
) -> None:
cls.vendor_factories.append(factory)

@classmethod
def remove_vendor_factory(
cls, factory: Callable[[bytes], Optional[HCI_Event]]
) -> None:
if factory in cls.vendor_factories:
cls.vendor_factories.remove(factory)

@classmethod
def from_bytes(cls, packet: bytes) -> HCI_Event:
event_code = packet[1]
Expand All @@ -5148,13 +5161,13 @@ def from_bytes(cls, packet: bytes) -> HCI_Event:
elif event_code == HCI_VENDOR_EVENT:
# Invoke all the registered factories to see if any of them can handle
# the event
if cls.vendor_factory:
if event := cls.vendor_factory(parameters):
for vendor_factory in cls.vendor_factories:
if event := vendor_factory(parameters):
return event

# No factory, or the factory could not create an instance,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should here return HCI_Vendor_Event instead of HCI_Event?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Done. I've also added a test for it.

# return a generic vendor event
return HCI_Event(event_code, parameters)
return HCI_Vendor_Event(data=parameters)
else:
subclass = HCI_Event.event_classes.get(event_code)
if subclass is None:
Expand Down
2 changes: 1 addition & 1 deletion bumble/vendor/android/hci.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def subclass_from_parameters(


HCI_Android_Vendor_Event.register_subevents(globals())
HCI_Event.vendor_factory = HCI_Android_Vendor_Event.subclass_from_parameters
HCI_Event.add_vendor_factory(HCI_Android_Vendor_Event.subclass_from_parameters)


# -----------------------------------------------------------------------------
Expand Down
39 changes: 39 additions & 0 deletions tests/hci_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct

from bumble.hci import (
HCI_DISCONNECT_COMMAND,
HCI_LE_1M_PHY_BIT,
HCI_LE_CODED_PHY_BIT,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_RESET_COMMAND,
HCI_VENDOR_EVENT,
HCI_SUCCESS,
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
Expand Down Expand Up @@ -67,6 +69,7 @@
HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
HCI_Vendor_Event,
)


Expand Down Expand Up @@ -213,6 +216,41 @@ def test_HCI_Number_Of_Completed_Packets_Event():
basic_check(event)


# -----------------------------------------------------------------------------
def test_HCI_Vendor_Event():
data = bytes.fromhex('01020304')
event = HCI_Vendor_Event(data=data)
event_bytes = bytes(event)
parsed = HCI_Packet.from_bytes(event_bytes)
assert isinstance(parsed, HCI_Vendor_Event)
assert parsed.data == data

class HCI_Custom_Event(HCI_Event):
def __init__(self, blabla):
super().__init__(HCI_VENDOR_EVENT, parameters=struct.pack("<I", blabla))
self.name = 'HCI_CUSTOM_EVENT'
self.blabla = blabla

def create_event(payload):
if payload[0] == 1:
return HCI_Custom_Event(blabla=struct.unpack('<I', payload)[0])
return None

HCI_Event.add_vendor_factory(create_event)
parsed = HCI_Packet.from_bytes(event_bytes)
assert isinstance(parsed, HCI_Custom_Event)
assert parsed.blabla == 0x04030201
event_bytes2 = event_bytes[:3] + bytes([7]) + event_bytes[4:]
parsed = HCI_Packet.from_bytes(event_bytes2)
assert not isinstance(parsed, HCI_Custom_Event)
assert isinstance(parsed, HCI_Vendor_Event)
HCI_Event.remove_vendor_factory(create_event)

parsed = HCI_Packet.from_bytes(event_bytes)
assert not isinstance(parsed, HCI_Custom_Event)
assert isinstance(parsed, HCI_Vendor_Event)


# -----------------------------------------------------------------------------
def test_HCI_Command():
command = HCI_Command(0x5566)
Expand Down Expand Up @@ -576,6 +614,7 @@ def run_test_events():
test_HCI_Command_Complete_Event()
test_HCI_Command_Status_Event()
test_HCI_Number_Of_Completed_Packets_Event()
test_HCI_Vendor_Event()


# -----------------------------------------------------------------------------
Expand Down
Loading