Skip to content

Commit

Permalink
Add advertiser classes and handle adv set terminated events
Browse files Browse the repository at this point in the history
* Convert hci.OwnAddressType to enum
* Add LegacyAdvertiser and ExtendedAdvertiser classes
* Rename start/stop_advertising() => start/stop_legacy_advertising()
* Handle HCI_Advertising_Set_Terminated
* Properly restart advertisement on disconnection
  • Loading branch information
zxzxwu committed Dec 6, 2023
1 parent 88b4cbd commit c244d2d
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 63 deletions.
193 changes: 149 additions & 44 deletions bumble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,31 @@ def is_directed(self):
)


# -----------------------------------------------------------------------------
@dataclass
class LegacyAdvertiser:
advertising_type: AdvertisingType
own_address_type: OwnAddressType
auto_restart: bool
advertising_data: Optional[bytes]
scan_response_data: Optional[bytes]


# -----------------------------------------------------------------------------
@dataclass
class ExtendedAdvertiser:
device: Device
handle: int
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
own_address_type: OwnAddressType
auto_restart: bool
advertising_data: Optional[bytes]
scan_response_data: Optional[bytes]

async def stop(self) -> None:
self.device.stop_extended_advertising(self.handle)


# -----------------------------------------------------------------------------
class LePhyOptions:
# Coded PHY preference
Expand Down Expand Up @@ -658,6 +683,9 @@ class Connection(CompositeEventEmitter):
gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int]
advertiser_after_disconnection: Union[
LegacyAdvertiser, ExtendedAdvertiser, None
] = None

@composite_listener
class Listener:
Expand Down Expand Up @@ -1063,7 +1091,8 @@ class Device(CompositeEventEmitter):
]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration
extended_advertising_handles: Set[int]
legacy_advertiser: Optional[LegacyAdvertiser]
extended_advertisers: Dict[int, ExtendedAdvertiser]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
_pending_cis: Dict[int, Tuple[int, int]]
Expand Down Expand Up @@ -1141,10 +1170,7 @@ def __init__(

self._host = None
self.powered_on = False
self.advertising = False
self.advertising_type = None
self.auto_restart_inquiry = True
self.auto_restart_advertising = False
self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self)
Expand All @@ -1168,10 +1194,10 @@ def __init__(
self.classic_pending_accepts = {
Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY
self.extended_advertising_handles = set()
self.legacy_advertiser = None
self.extended_advertisers = {}

# Own address type cache
self.advertising_own_address_type = None
self.connect_own_address_type = None

# Use the initial config or a default
Expand Down Expand Up @@ -1586,15 +1612,36 @@ async def start_advertising(
own_address_type: int = OwnAddressType.RANDOM,
auto_restart: bool = False,
) -> None:
await self.start_legacy_advertising(
advertising_type=advertising_type,
target=target,
own_address_type=OwnAddressType(own_address_type),
auto_restart=auto_restart,
)

async def start_legacy_advertising(
self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target: Optional[Address] = None,
own_address_type: OwnAddressType = OwnAddressType.RANDOM,
auto_restart: bool = False,
advertising_data: Optional[bytes] = None,
scan_response_data: Optional[bytes] = None,
) -> LegacyAdvertiser:
if self.extended_advertisers:
logger.warning(
'Trying to start Legacy and Extended Advertising at the same time!'
)

# If we're advertising, stop first
if self.advertising:
if self.legacy_advertiser:
await self.stop_advertising()

# Set/update the advertising data if the advertising type allows it
if advertising_type.has_data:
await self.send_command(
HCI_LE_Set_Advertising_Data_Command(
advertising_data=self.advertising_data
advertising_data=advertising_data or self.advertising_data or b''
),
check_result=True,
)
Expand All @@ -1603,7 +1650,9 @@ async def start_advertising(
if advertising_type.is_scannable:
await self.send_command(
HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data=self.scan_response_data
scan_response_data=scan_response_data
or self.scan_response_data
or b''
),
check_result=True,
)
Expand Down Expand Up @@ -1640,33 +1689,38 @@ async def start_advertising(
check_result=True,
)

self.advertising_type = advertising_type
self.advertising_own_address_type = own_address_type
self.advertising = True
self.auto_restart_advertising = auto_restart
self.legacy_advertiser = LegacyAdvertiser(
advertising_type=advertising_type,
own_address_type=own_address_type,
auto_restart=auto_restart,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
return self.legacy_advertiser

async def stop_advertising(self) -> None:
await self.stop_legacy_advertising()

async def stop_legacy_advertising(self) -> None:
# Disable advertising
if self.advertising:
if self.legacy_advertiser:
await self.send_command(
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0),
check_result=True,
)

self.advertising_type = None
self.advertising_own_address_type = None
self.advertising = False
self.auto_restart_advertising = False
self.legacy_advertiser = None

@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def start_extended_advertising(
self,
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
target: Address = Address.ANY,
own_address_type: int = OwnAddressType.RANDOM,
scan_response: Optional[bytes] = None,
own_address_type: OwnAddressType = OwnAddressType.RANDOM,
scan_response_data: Optional[bytes] = None,
advertising_data: Optional[bytes] = None,
) -> int:
auto_restart: bool = True,
) -> ExtendedAdvertiser:
"""Starts an extended advertising set.
Args:
Expand All @@ -1679,14 +1733,18 @@ async def start_extended_advertising(
Returns:
Handle of the new advertising set.
"""
if self.legacy_advertiser:
logger.warning(
'Trying to start Legacy and Extended Advertising at the same time!'
)

adv_handle = -1
# Find a free handle
for i in range(
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
):
if i not in self.extended_advertising_handles:
if i not in self.extended_advertisers.keys():
adv_handle = i
break

Expand Down Expand Up @@ -1733,13 +1791,13 @@ async def start_extended_advertising(
)

# Set the scan response if present
if scan_response is not None:
if scan_response_data is not None:
await self.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response,
scan_response_data=scan_response_data,
),
check_result=True,
)
Expand Down Expand Up @@ -1774,8 +1832,16 @@ async def start_extended_advertising(
)
raise error

self.extended_advertising_handles.add(adv_handle)
return adv_handle
advertiser = self.extended_advertisers[adv_handle] = ExtendedAdvertiser(
device=self,
handle=adv_handle,
advertising_properties=advertising_properties,
own_address_type=own_address_type,
auto_restart=auto_restart,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
return advertiser

@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def stop_extended_advertising(self, adv_handle: int) -> None:
Expand All @@ -1799,11 +1865,11 @@ async def stop_extended_advertising(self, adv_handle: int) -> None:
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle),
check_result=True,
)
self.extended_advertising_handles.remove(adv_handle)
del self.extended_advertisers[adv_handle]

@property
def is_advertising(self):
return self.advertising
return self.legacy_advertiser or self.extended_advertisers

async def start_scanning(
self,
Expand Down Expand Up @@ -3144,13 +3210,17 @@ def on_connection(
# Guess which own address type is used for this connection.
# This logic is somewhat correct but may need to be improved
# when multiple advertising are run simultaneously.
advertiser = None
if self.connect_own_address_type is not None:
own_address_type = self.connect_own_address_type
elif self.legacy_advertiser:
own_address_type = self.legacy_advertiser.own_address_type
# Store advertiser for restarting - it's only required for legacy, since
# extended advertisement produces HCI_Advertising_Set_Terminated.
advertiser = self.legacy_advertiser
else:
own_address_type = self.advertising_own_address_type

# We are no longer advertising
self.advertising = False
# For extended advertisement, determining own address type later.
own_address_type = OwnAddressType.RANDOM

if own_address_type in (
OwnAddressType.PUBLIC,
Expand All @@ -3172,6 +3242,7 @@ def on_connection(
connection_parameters,
ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
)
connection.advertiser_after_disconnection = advertiser
self.connections[connection_handle] = connection

# If supported, read which PHY we're connected with before
Expand Down Expand Up @@ -3203,10 +3274,10 @@ def on_connection_failure(self, transport, peer_address, error_code):
# For directed advertising, this means a timeout
if (
transport == BT_LE_TRANSPORT
and self.advertising
and self.advertising_type.is_directed
and self.legacy_advertiser
and self.legacy_advertiser.advertising_type.is_directed
):
self.advertising = False
self.legacy_advertiser = None

# Notify listeners
error = core.ConnectionError(
Expand Down Expand Up @@ -3268,16 +3339,28 @@ def on_disconnection(self, connection_handle: int, reason: int) -> None:
self.gatt_server.on_disconnection(connection)

# Restart advertising if auto-restart is enabled
if self.auto_restart_advertising:
if advertiser := connection.advertiser_after_disconnection:
logger.debug('restarting advertising')
self.abort_on(
'flush',
self.start_advertising(
advertising_type=self.advertising_type, # type: ignore[arg-type]
own_address_type=self.advertising_own_address_type, # type: ignore[arg-type]
auto_restart=True,
),
)
if isinstance(advertiser, LegacyAdvertiser):
self.abort_on(
'flush',
self.start_legacy_advertising(
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
auto_restart=True,
),
)
elif isinstance(advertiser, ExtendedAdvertiser):
self.abort_on(
'flush',
self.start_extended_advertising(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
auto_restart=True,
),
)
elif sco_link := self.sco_links.pop(connection_handle, None):
sco_link.emit('disconnection', reason)
elif cis_link := self.cis_links.pop(connection_handle, None):
Expand Down Expand Up @@ -3600,6 +3683,28 @@ def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> N
if sco_link := self.sco_links.get(sco_handle, None):
sco_link.emit('pdu', packet)

# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_advertising_set_terminated(
self,
status: int,
advertising_handle: int,
connection_handle: int,
) -> None:
if status == HCI_SUCCESS:
connection = self.lookup_connection(connection_handle)
if advertiser := self.extended_advertisers.pop(advertising_handle, None):
if connection:
connection.advertiser_after_disconnection = advertiser
if advertiser.own_address_type in (
OwnAddressType.PUBLIC,
OwnAddressType.RESOLVABLE_OR_PUBLIC,
):
connection.self_address = self.public_address
else:
connection.self_address = self.random_address

# [LE only]
@host_event_handler
@with_connection_from_handle
Expand Down
Loading

0 comments on commit c244d2d

Please sign in to comment.