Skip to content

Commit

Permalink
ASCS: Add Source ASE operations
Browse files Browse the repository at this point in the history
  • Loading branch information
zxzxwu committed Jan 15, 2024
1 parent 46ceea7 commit 76ccc80
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 81 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"ansired",
"ansiyellow",
"appendleft",
"ascs",
"ASHA",
"asyncio",
"ATRAC",
Expand Down
46 changes: 31 additions & 15 deletions bumble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ def __init__(
} # Futures, by BD address OR [Futures] for Address.ANY
self.legacy_advertiser = None
self.extended_advertisers = {}
self.cis_lock = asyncio.Lock()

# Own address type cache
self.connect_own_address_type = None
Expand Down Expand Up @@ -3110,26 +3111,41 @@ def on_cis_establishment(cis_link: CisLink) -> None:
# [LE only]
@experimental('Only for testing.')
async def accept_cis_request(self, handle: int) -> CisLink:
result = await self.send_command(
HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Accept_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
"""[LE Only] Accepts an incoming CIS request.
When the specified CIS handle is already created, this method returns the
existed CIS link object immediately.
Args:
handle: CIS handle to accept.
Returns:
CIS link object on the given handle.
"""
async with self.cis_lock:
if cis_link := self.cis_links.get(handle):
return cis_link

result = await self.send_command(
HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
)
raise HCI_StatusError(result)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Accept_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)

pending_cis_establishment = asyncio.get_running_loop().create_future()
pending_cis_establishment = asyncio.get_running_loop().create_future()

with closing(EventWatcher()) as watcher:
with closing(EventWatcher()) as watcher:

@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if cis_link.handle == handle:
pending_cis_establishment.set_result(cis_link)
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if cis_link.handle == handle:
pending_cis_establishment.set_result(cis_link)

return await pending_cis_establishment
return await pending_cis_establishment

# [LE only]
@experimental('Only for testing.')
Expand Down
37 changes: 32 additions & 5 deletions bumble/profiles/bap.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ class FrameDuration(enum.IntEnum):
DURATION_7500_US = 0x00
DURATION_10000_US = 0x01

@property
def us(self) -> int:
return {
FrameDuration.DURATION_7500_US: 7500,
FrameDuration.DURATION_10000_US: 10000,
}[self]


class SupportedFrameDuration(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration'''
Expand Down Expand Up @@ -833,15 +840,23 @@ def on_cis_request(
cig_id: int,
cis_id: int,
) -> None:
if cis_id == self.cis_id and self.state == self.State.ENABLING:
if (
cig_id == self.cig_id
and cis_id == self.cis_id
and self.state == self.State.ENABLING
):
acl_connection.abort_on(
'flush', self.service.device.accept_cis_request(cis_handle)
)

def on_cis_establishment(self, cis_link: device.CisLink) -> None:
if cis_link.cis_id == self.cis_id and self.state == self.State.ENABLING:
self.state = self.State.STREAMING
if (
cis_link.cig_id == self.cig_id
and cis_link.cis_id == self.cis_id
and self.state == self.State.ENABLING
):
self.cis_link = cis_link
self.cis_link.on('disconnection', self.on_cis_disconnection)

async def post_cis_established():
await self.service.device.send_command(
Expand All @@ -854,10 +869,15 @@ async def post_cis_established():
codec_configuration=b'',
)
)
if self.role == AudioRole.SINK:
self.state = self.State.STREAMING
await self.service.device.notify_subscribers(self, self.value)

cis_link.acl_connection.abort_on('flush', post_cis_established())

def on_cis_disconnection(self, _reason) -> None:
self.cis_link = None

def on_config_codec(
self,
target_latency: int,
Expand Down Expand Up @@ -954,11 +974,17 @@ def on_disable(self) -> Tuple[AseResponseCode, AseReasonCode]:
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
self.state = self.State.DISABLING
if self.role == AudioRole.SINK:
self.state = self.State.QOS_CONFIGURED
else:
self.state = self.State.DISABLING
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

def on_receiver_stop_ready(self) -> Tuple[AseResponseCode, AseReasonCode]:
if self.state != AseStateMachine.State.DISABLING:
if (
self.role != AudioRole.SOURCE
or self.state != AseStateMachine.State.DISABLING
):
return (
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
Expand Down Expand Up @@ -1009,6 +1035,7 @@ def state(self) -> State:
def state(self, new_state: State) -> None:
logger.debug(f'{self} state change -> {colors.color(new_state.name, "cyan")}')
self._state = new_state
self.emit('state_change', new_state)

@property
def value(self):
Expand Down
166 changes: 105 additions & 61 deletions examples/run_unicast_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import os
import struct
import secrets
import functools
from bumble.core import AdvertisingData
from bumble.device import Device, CisLink
from bumble.device import Device
from bumble.hci import (
CodecID,
CodingFormat,
Expand All @@ -39,6 +40,8 @@
PacRecord,
PublishedAudioCapabilitiesService,
AudioStreamControlService,
AudioRole,
AseStateMachine,
)
from bumble.profiles.cap import CommonAudioServiceService
from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
Expand Down Expand Up @@ -70,10 +73,10 @@ async def main() -> None:
device.add_service(CommonAudioServiceService(csis))
device.add_service(
PublishedAudioCapabilitiesService(
supported_source_context=ContextType.PROHIBITED,
available_source_context=ContextType.PROHIBITED,
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
supported_source_context=ContextType.CONVERSATIONAL,
available_source_context=ContextType.CONVERSATIONAL,
supported_sink_context=ContextType.MEDIA | ContextType.CONVERSATIONAL,
available_sink_context=ContextType.MEDIA | ContextType.CONVERSATIONAL,
sink_audio_locations=(
AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT
),
Expand Down Expand Up @@ -111,73 +114,114 @@ async def main() -> None:
),
),
],
source_audio_locations=(AudioLocation.FRONT_LEFT),
source_pac=[
# Codec Capability Setting 16_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_16000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
),
),
],
)
)

device.add_service(AudioStreamControlService(device, sink_ase_id=[1, 2]))

advertising_data = (
bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble LE Audio', 'utf-8'),
ascs = AudioStreamControlService(
device,
sink_ase_id=[1, 2],
source_ase_id=[3],
)
device.add_service(ascs)

advertising_data = bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble LE Audio', 'utf-8'),
),
(
AdvertisingData.FLAGS,
bytes(
[
AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
| AdvertisingData.BR_EDR_HOST_FLAG
| AdvertisingData.BR_EDR_CONTROLLER_FLAG
]
),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(PublishedAudioCapabilitiesService.UUID),
),
]
)
)

def on_streaming(ase: AseStateMachine):
print('on_streaming')

async def on_cis_async():
print('on_cis_async')
subprocess = await asyncio.create_subprocess_shell(
f'dlc3 | ffplay pipe:0',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

stdin = subprocess.stdin
assert stdin

# Write a fake LC3 header to dlc3.
stdin.write(
bytes([0x1C, 0xCC]) # Header.
+ struct.pack(
'<HHHHHHI',
18, # Header length.
(
AdvertisingData.FLAGS,
bytes(
[
AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
| AdvertisingData.BR_EDR_HOST_FLAG
| AdvertisingData.BR_EDR_CONTROLLER_FLAG
]
),
),
ase.codec_specific_configuration.sampling_frequency.hz
// 100
), # Sampling Rate(/100Hz).
0, # Bitrate(unused).
ase.codec_specific_configuration.audio_channel_allocation, # Channels.
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(PublishedAudioCapabilitiesService.UUID),
),
]
ase.codec_specific_configuration.frame_duration.us // 10
), # Frame duration(/10us).
0, # RFU.
0x0FFFFFFF, # Frame counts.
)
)
)
+ csis.get_advertising_data()
)
subprocess = await asyncio.create_subprocess_shell(
f'dlc3 | ffplay pipe:0',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

stdin = subprocess.stdin
assert stdin

# Write a fake LC3 header to dlc3.
stdin.write(
bytes([0x1C, 0xCC]) # Header.
+ struct.pack(
'<HHHHHHI',
18, # Header length.
24000 // 100, # Sampling Rate(/100Hz).
0, # Bitrate(unused).
1, # Channels.
10000 // 10, # Frame duration(/10us).
0, # RFU.
0x0FFFFFFF, # Frame counts.
)
)
def on_pdu(pdu: HCI_IsoDataPacket):
# LC3 format: |frame_length(2)| + |frame(length)|.
if pdu.iso_sdu_length:
stdin.write(struct.pack('<H', pdu.iso_sdu_length))
stdin.write(pdu.iso_sdu_fragment)

ase.cis_link.on('pdu', on_pdu)

device.abort_on('flush', on_cis_async())

def on_pdu(pdu: HCI_IsoDataPacket):
# LC3 format: |frame_length(2)| + |frame(length)|.
if pdu.iso_sdu_length:
stdin.write(struct.pack('<H', pdu.iso_sdu_length))
stdin.write(pdu.iso_sdu_fragment)
for ase in ascs.ase_state_machines.values():
if ase.role == AudioRole.SINK:

def on_cis(cis_link: CisLink):
cis_link.on('pdu', on_pdu)
def on_state_change(*_, ase: AseStateMachine):
print(ase)
if ase.state == AseStateMachine.State.STREAMING:
on_streaming(ase)

device.once('cis_establishment', on_cis)
ase.on('state_change', functools.partial(on_state_change, ase=ase))

await device.start_extended_advertising(
advertising_properties=(
Expand Down

0 comments on commit 76ccc80

Please sign in to comment.