Skip to content

Commit

Permalink
CS commands and events
Browse files Browse the repository at this point in the history
  • Loading branch information
zxzxwu committed Jan 6, 2025
1 parent fe3fa3d commit 79c86d3
Show file tree
Hide file tree
Showing 5 changed files with 756 additions and 3 deletions.
74 changes: 74 additions & 0 deletions bumble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,31 @@ async def terminate(self) -> None:
await terminated.wait()


# -----------------------------------------------------------------------------
@dataclass
class ChannelSoundingCapabilities:
num_config_supported: int
max_consecutive_procedures_supported: int
num_antennas_supported: int
max_antenna_paths_supported: int
roles_supported: int
modes_supported: int
rtt_capability: int
rtt_aa_only_n: int
rtt_sounding_n: int
rtt_random_payload_n: int
nadm_sounding_capability: int
nadm_random_capability: int
cs_sync_phys_supported: int
subfeatures_supported: int
t_ip1_times_supported: int
t_ip2_times_supported: int
t_fcs_times_supported: int
t_pm_times_supported: int
t_sw_time_supported: int
tx_snr_capability: int


# -----------------------------------------------------------------------------
class LePhyOptions:
# Coded PHY preference
Expand Down Expand Up @@ -4481,6 +4506,55 @@ def on_failure(handle: int, status: int):
)
return await read_feature_future

async def get_remote_cs_capabilities(
self, connection: Connection
) -> ChannelSoundingCapabilities:
with closing(EventWatcher()) as watcher:
complete_future: asyncio.Future[ChannelSoundingCapabilities] = (
asyncio.get_running_loop().create_future()
)

def on_event(
event: hci.HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event,
):
if event.connection_handle != connection.handle:
return
if event.status != hci.HCI_SUCCESS:
complete_future.set_exception(hci.HCI_Error(event.status))
complete_future.set_result(
ChannelSoundingCapabilities(
num_config_supported=event.num_config_supported,
max_consecutive_procedures_supported=event.max_consecutive_procedures_supported,
num_antennas_supported=event.num_antennas_supported,
max_antenna_paths_supported=event.max_antenna_paths_supported,
roles_supported=event.roles_supported,
modes_supported=event.modes_supported,
rtt_capability=event.rtt_capability,
rtt_aa_only_n=event.rtt_aa_only_n,
rtt_sounding_n=event.rtt_sounding_n,
rtt_random_payload_n=event.rtt_random_payload_n,
nadm_sounding_capability=event.nadm_sounding_capability,
nadm_random_capability=event.nadm_random_capability,
cs_sync_phys_supported=event.cs_sync_phys_supported,
subfeatures_supported=event.subfeatures_supported,
t_ip1_times_supported=event.t_ip1_times_supported,
t_ip2_times_supported=event.t_ip2_times_supported,
t_fcs_times_supported=event.t_fcs_times_supported,
t_pm_times_supported=event.t_pm_times_supported,
t_sw_time_supported=event.t_sw_time_supported,
tx_snr_capability=event.tx_snr_capability,
)
)

watcher.on(self.host, 'cs_read_remote_supported_capabilities', on_event)
await self.send_command(
hci.HCI_LE_CS_Read_Remote_Supported_Capabilities_Command(
connection_handle=connection.handle
),
check_result=True,
)
return await complete_future

@host_event_handler
def on_flush(self):
self.emit('flush')
Expand Down
Loading

0 comments on commit 79c86d3

Please sign in to comment.