Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add advertiser classes and handle adv set terminated events #366

Merged
merged 2 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 180 additions & 46 deletions bumble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,38 @@ def is_directed(self):
)


# -----------------------------------------------------------------------------
@dataclass
class LegacyAdvertiser:
device: Device
advertising_type: AdvertisingType
own_address_type: OwnAddressType
auto_restart: bool
advertising_data: Optional[bytes]
scan_response_data: Optional[bytes]

async def stop(self) -> None:
await self.device.stop_legacy_advertising()


# -----------------------------------------------------------------------------
@dataclass
class ExtendedAdvertiser(CompositeEventEmitter):
device: Device
handle: int
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
own_address_type: OwnAddressType
auto_restart: bool
advertising_data: Optional[bytes]
scan_response_data: Optional[bytes]

def __post_init__(self) -> None:
super().__init__()

async def stop(self) -> None:
await self.device.stop_extended_advertising(self.handle)


# -----------------------------------------------------------------------------
class LePhyOptions:
# Coded PHY preference
Expand Down Expand Up @@ -658,6 +690,9 @@ class Connection(CompositeEventEmitter):
gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int]
advertiser_after_disconnection: Union[
LegacyAdvertiser, ExtendedAdvertiser, None
] = None

@composite_listener
class Listener:
Expand Down Expand Up @@ -1063,7 +1098,8 @@ class Device(CompositeEventEmitter):
]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration
extended_advertising_handles: Set[int]
legacy_advertiser: Optional[LegacyAdvertiser]
extended_advertisers: Dict[int, ExtendedAdvertiser]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
_pending_cis: Dict[int, Tuple[int, int]]
Expand Down Expand Up @@ -1141,10 +1177,7 @@ def __init__(

self._host = None
self.powered_on = False
self.advertising = False
self.advertising_type = None
self.auto_restart_inquiry = True
self.auto_restart_advertising = False
self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self)
Expand All @@ -1168,10 +1201,10 @@ def __init__(
self.classic_pending_accepts = {
Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY
self.extended_advertising_handles = set()
self.legacy_advertiser = None
self.extended_advertisers = {}

# Own address type cache
self.advertising_own_address_type = None
self.connect_own_address_type = None

# Use the initial config or a default
Expand Down Expand Up @@ -1579,22 +1612,57 @@ def supports_le_phy(self, phy):

return self.host.supports_le_feature(feature_map[phy])

@deprecated("Please use start_legacy_advertising.")
async def start_advertising(
self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target: Optional[Address] = None,
own_address_type: int = OwnAddressType.RANDOM,
auto_restart: bool = False,
) -> None:
await self.start_legacy_advertising(
advertising_type=advertising_type,
target=target,
own_address_type=OwnAddressType(own_address_type),
auto_restart=auto_restart,
)

async def start_legacy_advertising(
self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target: Optional[Address] = None,
own_address_type: OwnAddressType = OwnAddressType.RANDOM,
auto_restart: bool = False,
advertising_data: Optional[bytes] = None,
scan_response_data: Optional[bytes] = None,
) -> LegacyAdvertiser:
"""Starts an legacy advertisement.

Args:
advertising_type: Advertising type passed to HCI_LE_Set_Advertising_Parameters_Command.
target: Directed advertising target. Directed type should be set in advertising_type arg.
own_address_type: own address type to use in the advertising.
auto_restart: whether the advertisement will be restarted after disconnection.
scan_response_data: raw scan response.
advertising_data: raw advertising data.

Returns:
LegacyAdvertiser object containing the metadata of advertisement.
"""
if self.extended_advertisers:
logger.warning(
'Trying to start Legacy and Extended Advertising at the same time!'
)

# If we're advertising, stop first
if self.advertising:
if self.legacy_advertiser:
await self.stop_advertising()

# Set/update the advertising data if the advertising type allows it
if advertising_type.has_data:
await self.send_command(
HCI_LE_Set_Advertising_Data_Command(
advertising_data=self.advertising_data
advertising_data=advertising_data or self.advertising_data or b''
),
check_result=True,
)
Expand All @@ -1603,7 +1671,9 @@ async def start_advertising(
if advertising_type.is_scannable:
await self.send_command(
HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data=self.scan_response_data
scan_response_data=scan_response_data
or self.scan_response_data
or b''
),
check_result=True,
)
Expand Down Expand Up @@ -1640,53 +1710,65 @@ async def start_advertising(
check_result=True,
)

self.advertising_type = advertising_type
self.advertising_own_address_type = own_address_type
self.advertising = True
self.auto_restart_advertising = auto_restart
self.legacy_advertiser = LegacyAdvertiser(
device=self,
advertising_type=advertising_type,
own_address_type=own_address_type,
auto_restart=auto_restart,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
return self.legacy_advertiser

@deprecated("Please use stop_legacy_advertising.")
async def stop_advertising(self) -> None:
await self.stop_legacy_advertising()

async def stop_legacy_advertising(self) -> None:
# Disable advertising
if self.advertising:
if self.legacy_advertiser:
await self.send_command(
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0),
check_result=True,
)

self.advertising_type = None
self.advertising_own_address_type = None
self.advertising = False
self.auto_restart_advertising = False
self.legacy_advertiser = None

@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def start_extended_advertising(
self,
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
target: Address = Address.ANY,
own_address_type: int = OwnAddressType.RANDOM,
scan_response: Optional[bytes] = None,
own_address_type: OwnAddressType = OwnAddressType.RANDOM,
auto_restart: bool = True,
advertising_data: Optional[bytes] = None,
) -> int:
scan_response_data: Optional[bytes] = None,
) -> ExtendedAdvertiser:
"""Starts an extended advertising set.

Args:
advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command
target: Directed advertising target. Directed property should be set in advertising_properties arg.
own_address_type: own address type to use in the advertising.
scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
auto_restart: whether the advertisement will be restarted after disconnection.
advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent.
scan_response_data: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.

Returns:
Handle of the new advertising set.
ExtendedAdvertiser object containing the metadata of advertisement.
"""
if self.legacy_advertiser:
logger.warning(
'Trying to start Legacy and Extended Advertising at the same time!'
)

adv_handle = -1
# Find a free handle
for i in range(
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
):
if i not in self.extended_advertising_handles:
if i not in self.extended_advertisers:
adv_handle = i
break

Expand Down Expand Up @@ -1733,13 +1815,13 @@ async def start_extended_advertising(
)

# Set the scan response if present
if scan_response is not None:
if scan_response_data is not None:
await self.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response,
scan_response_data=scan_response_data,
),
check_result=True,
)
Expand Down Expand Up @@ -1774,8 +1856,16 @@ async def start_extended_advertising(
)
raise error

self.extended_advertising_handles.add(adv_handle)
return adv_handle
advertiser = self.extended_advertisers[adv_handle] = ExtendedAdvertiser(
device=self,
handle=adv_handle,
advertising_properties=advertising_properties,
own_address_type=own_address_type,
auto_restart=auto_restart,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
return advertiser

@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def stop_extended_advertising(self, adv_handle: int) -> None:
Expand All @@ -1799,11 +1889,11 @@ async def stop_extended_advertising(self, adv_handle: int) -> None:
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle),
check_result=True,
)
self.extended_advertising_handles.remove(adv_handle)
del self.extended_advertisers[adv_handle]

@property
def is_advertising(self):
return self.advertising
return self.legacy_advertiser or self.extended_advertisers

async def start_scanning(
self,
Expand Down Expand Up @@ -3144,13 +3234,18 @@ def on_connection(
# Guess which own address type is used for this connection.
# This logic is somewhat correct but may need to be improved
# when multiple advertising are run simultaneously.
advertiser = None
if self.connect_own_address_type is not None:
own_address_type = self.connect_own_address_type
elif self.legacy_advertiser:
own_address_type = self.legacy_advertiser.own_address_type
# Store advertiser for restarting - it's only required for legacy, since
# extended advertisement produces HCI_Advertising_Set_Terminated.
if self.legacy_advertiser.auto_restart:
advertiser = self.legacy_advertiser
else:
own_address_type = self.advertising_own_address_type

# We are no longer advertising
self.advertising = False
# For extended advertisement, determining own address type later.
own_address_type = OwnAddressType.RANDOM

if own_address_type in (
OwnAddressType.PUBLIC,
Expand All @@ -3172,6 +3267,7 @@ def on_connection(
connection_parameters,
ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
)
connection.advertiser_after_disconnection = advertiser
self.connections[connection_handle] = connection

# If supported, read which PHY we're connected with before
Expand Down Expand Up @@ -3203,10 +3299,10 @@ def on_connection_failure(self, transport, peer_address, error_code):
# For directed advertising, this means a timeout
if (
transport == BT_LE_TRANSPORT
and self.advertising
and self.advertising_type.is_directed
and self.legacy_advertiser
and self.legacy_advertiser.advertising_type.is_directed
):
self.advertising = False
self.legacy_advertiser = None

# Notify listeners
error = core.ConnectionError(
Expand Down Expand Up @@ -3268,16 +3364,30 @@ def on_disconnection(self, connection_handle: int, reason: int) -> None:
self.gatt_server.on_disconnection(connection)

# Restart advertising if auto-restart is enabled
if self.auto_restart_advertising:
if advertiser := connection.advertiser_after_disconnection:
logger.debug('restarting advertising')
self.abort_on(
'flush',
self.start_advertising(
advertising_type=self.advertising_type, # type: ignore[arg-type]
own_address_type=self.advertising_own_address_type, # type: ignore[arg-type]
auto_restart=True,
),
)
if isinstance(advertiser, LegacyAdvertiser):
self.abort_on(
'flush',
self.start_legacy_advertising(
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
auto_restart=True,
),
)
elif isinstance(advertiser, ExtendedAdvertiser):
self.abort_on(
'flush',
self.start_extended_advertising(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
auto_restart=True,
),
)
elif sco_link := self.sco_links.pop(connection_handle, None):
sco_link.emit('disconnection', reason)
elif cis_link := self.cis_links.pop(connection_handle, None):
Expand Down Expand Up @@ -3600,6 +3710,30 @@ def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> N
if sco_link := self.sco_links.get(sco_handle, None):
sco_link.emit('pdu', packet)

# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_advertising_set_termination(
self,
status: int,
advertising_handle: int,
connection_handle: int,
) -> None:
if status == HCI_SUCCESS:
connection = self.lookup_connection(connection_handle)
if advertiser := self.extended_advertisers.pop(advertising_handle, None):
if connection:
if advertiser.auto_restart:
connection.advertiser_after_disconnection = advertiser
if advertiser.own_address_type in (
OwnAddressType.PUBLIC,
OwnAddressType.RESOLVABLE_OR_PUBLIC,
):
connection.self_address = self.public_address
else:
connection.self_address = self.random_address
advertiser.emit('termination', status)

# [LE only]
@host_event_handler
@with_connection_from_handle
Expand Down
Loading