diff --git a/.vscode/settings.json b/.vscode/settings.json index 93e9ece33..dec6c267c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -72,6 +72,7 @@ "substates", "tobytes", "tsep", + "UNMUTE", "usbmodem", "vhci", "websockets", diff --git a/bumble/profiles/vcp.py b/bumble/profiles/vcp.py new file mode 100644 index 000000000..7d234993e --- /dev/null +++ b/bumble/profiles/vcp.py @@ -0,0 +1,211 @@ +# Copyright 2021-2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +import enum + +from bumble import att +from bumble import device +from bumble import gatt +from bumble import gatt_client + +from typing import Optional + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- + +MIN_VOLUME = 0 +MAX_VOLUME = 255 + + +class ErrorCode(enum.IntEnum): + ''' + See Volume Control Service 1.6. Application error codes. + ''' + + INVALID_CHANGE_COUNTER = 0x80 + OPCODE_NOT_SUPPORTED = 0x81 + + +class VolumeFlags(enum.IntFlag): + ''' + See Volume Control Service 3.3. Volume Flags. + ''' + + VOLUME_SETTING_PERSISTED = 0x01 + # RFU + + +class VolumeControlPointOpcode(enum.IntEnum): + ''' + See Volume Control Service Table 3.3: Volume Control Point procedure requirements. + ''' + + # fmt: off + RELATIVE_VOLUME_DOWN = 0x00 + RELATIVE_VOLUME_UP = 0x01 + UNMUTE_RELATIVE_VOLUME_DOWN = 0x02 + UNMUTE_RELATIVE_VOLUME_UP = 0x03 + SET_ABSOLUTE_VOLUME = 0x04 + UNMUTE = 0x05 + MUTE = 0x06 + + +# ----------------------------------------------------------------------------- +# Server +# ----------------------------------------------------------------------------- +class VolumeControlService(gatt.TemplateService): + UUID = gatt.GATT_VOLUME_CONTROL_SERVICE + + volume_state: gatt.Characteristic + volume_control_point: gatt.Characteristic + volume_flags: gatt.Characteristic + + volume_setting: int + muted: int + change_counter: int + + def __init__( + self, + step_size: int = 16, + volume_setting: int = 0, + muted: int = 0, + change_counter: int = 0, + volume_flags: int = 0, + ) -> None: + self.step_size = step_size + self.volume_setting = volume_setting + self.muted = muted + self.change_counter = change_counter + + self.volume_state = gatt.Characteristic( + uuid=gatt.GATT_VOLUME_STATE_CHARACTERISTIC, + properties=( + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.NOTIFY + ), + permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, + value=gatt.CharacteristicValue( + read=lambda *_: bytes( + [self.volume_setting, self.muted, self.change_counter] + ) + ), + ) + self.volume_control_point = gatt.Characteristic( + uuid=gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.WRITE, + permissions=gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION, + value=gatt.CharacteristicValue(write=self._on_write_volume_control_point), + ) + self.volume_flags = gatt.Characteristic( + uuid=gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.READ, + permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, + value=bytes([volume_flags]), + ) + + super().__init__( + [ + self.volume_state, + self.volume_control_point, + self.volume_flags, + ] + ) + + def _on_write_volume_control_point( + self, connection: Optional[device.Connection], value: bytes + ) -> None: + assert connection + + opcode = VolumeControlPointOpcode(value[0]) + change_counter = value[1] + + if change_counter != self.change_counter: + raise att.ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER) + + handler = getattr(self, '_on_' + opcode.name) + if handler(*value[2:]): + self.change_counter = (self.change_counter + 1) % 256 + connection.abort_on( + 'disconnection', + connection.device.notify_subscriber(connection, self.volume_setting), + ) + + def _on_relative_volume_down(self) -> bool: + old_volume = self.volume_setting + self.volume_setting = min(self.volume_setting - self.step_size, MIN_VOLUME) + return self.volume_setting != old_volume + + def _on_relative_volume_up(self) -> bool: + old_volume = self.volume_setting + self.volume_setting = max(self.volume_setting + self.step_size, MAX_VOLUME) + return self.volume_setting != old_volume + + def _on_unmute_relative_volume_down(self) -> bool: + old_volume, old_muted_state = self.volume_setting, self.muted + self.volume_setting = min(self.volume_setting - self.step_size, MIN_VOLUME) + self.muted = 0 + return (self.volume_setting, self.muted) != (old_volume, old_muted_state) + + def _on_unmute_relative_volume_up(self) -> bool: + old_volume, old_muted_state = self.volume_setting, self.muted + self.volume_setting = max(self.volume_setting + self.step_size, MAX_VOLUME) + self.muted = 0 + return (self.volume_setting, self.muted) != (old_volume, old_muted_state) + + def _on_set_absolute_volume(self, volume_setting) -> bool: + old_volume_setting = self.volume_setting + self.volume_setting = volume_setting + return old_volume_setting != self.volume_setting + + def _on_unmute(self) -> bool: + old_muted_state = self.muted + self.muted = 0 + return self.muted != old_muted_state + + def _on_mute(self) -> bool: + old_muted_state = self.muted + self.muted = 1 + return self.muted != old_muted_state + + +# ----------------------------------------------------------------------------- +# Client +# ----------------------------------------------------------------------------- +class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy): + SERVICE_CLASS = VolumeControlService + + volume_state: gatt_client.CharacteristicProxy + volume_control_point: gatt_client.CharacteristicProxy + volume_flags: gatt_client.CharacteristicProxy + + def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: + self.service_proxy = service_proxy + + self.volume_state = service_proxy.get_characteristics_by_uuid( + gatt.GATT_VOLUME_STATE_CHARACTERISTIC + )[0] + + self.volume_control_point = service_proxy.get_characteristics_by_uuid( + gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC + )[0] + + self.volume_flags = service_proxy.get_characteristics_by_uuid( + gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC + )[0] diff --git a/tests/vcp_test.py b/tests/vcp_test.py new file mode 100644 index 000000000..2aef45048 --- /dev/null +++ b/tests/vcp_test.py @@ -0,0 +1,63 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import os +import pytest +import logging + +from bumble import device +from bumble import gatt +from bumble.profiles import vcp +from .test_utils import TwoDevices + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_init_service(): + devices = TwoDevices() + devices[0].add_service( + vcp.VolumeControlService(volume_setting=32, muted=1, volume_flags=1) + ) + + await devices.setup_connection() + + # Mock encryption. + devices.connections[0].encryption = 1 + devices.connections[1].encryption = 1 + + peer = device.Peer(devices.connections[1]) + vcp_client = await peer.discover_service_and_create_proxy( + vcp.VolumeControlServiceProxy + ) + assert (await vcp_client.volume_flags.read_value()) == bytes([1]) + + +# ----------------------------------------------------------------------------- +async def run(): + pass + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + asyncio.run(run())