diff --git a/.vscode/settings.json b/.vscode/settings.json index 93e9ece33..dec6c267c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -72,6 +72,7 @@ "substates", "tobytes", "tsep", + "UNMUTE", "usbmodem", "vhci", "websockets", diff --git a/bumble/profiles/vcp.py b/bumble/profiles/vcp.py new file mode 100644 index 000000000..eacdab01e --- /dev/null +++ b/bumble/profiles/vcp.py @@ -0,0 +1,206 @@ +# Copyright 2021-2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +import enum + +from bumble import att +from bumble import device +from bumble import gatt +from bumble import gatt_client + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- + +MIN_VOLUME = 0 +MAX_VOLUME = 255 + + +class ErrorCode(enum.IntEnum): + ''' + See Volume Control Service 1.6. Application error codes. + ''' + + INVALID_CHANGE_COUNTER = 0x80 + OPCODE_NOT_SUPPORTED = 0x81 + + +class VolumeFlags(enum.IntFlag): + ''' + See Volume Control Service 3.3. Volume Flags. + ''' + + VOLUME_SETTING_PERSISTED = 0x01 + # RFU + + +class VolumeControlPointOpcode(enum.IntEnum): + ''' + See Volume Control Service Table 3.3: Volume Control Point procedure requirements. + ''' + + # fmt: off + RELATIVE_VOLUME_DOWN = 0x00 + RELATIVE_VOLUME_UP = 0x01 + UNMUTE_RELATIVE_VOLUME_DOWN = 0x02 + UNMUTE_RELATIVE_VOLUME_UP = 0x03 + SET_ABSOLUTE_VOLUME = 0x04 + UNMUTE = 0x05 + MUTE = 0x06 + + +# ----------------------------------------------------------------------------- +# Server +# ----------------------------------------------------------------------------- +class VolumeControlService(gatt.TemplateService): + UUID = gatt.GATT_VOLUME_CONTROL_SERVICE + + volume_state: gatt.Characteristic + volume_control_point: gatt.Characteristic + volume_flags: gatt.Characteristic + + volume_setting: int + muted: int + change_counter: int + + def __init__( + self, + step_size: int = 16, + volume_setting: int = 0, + muted: int = 0, + change_counter: int = 0, + volume_flags: int = 0, + ) -> None: + self.step_size = step_size + self.volume_setting = volume_setting + self.muted = muted + self.change_counter = change_counter + + self.volume_state = gatt.Characteristic( + uuid=gatt.GATT_VOLUME_STATE_CHARACTERISTIC, + properties=( + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.NOTIFY + ), + permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, + value=gatt.CharacteristicValue( + read=lambda *_: ([self.volume_setting, self.muted, self.change_counter]) + ), + ) + self.volume_control_point = gatt.Characteristic( + uuid=gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.WRITE, + permissions=gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION, + value=gatt.CharacteristicValue(write=self._on_volume_control_point_write), + ) + self.volume_flags = gatt.Characteristic( + uuid=gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.READ, + permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, + value=volume_flags, + ) + + super().__init__( + [ + self.volume_control_point, + self.volume_control_point, + self.volume_flags, + ] + ) + + def _on_write_volume_control_point( + self, connection: device.Connection, value: bytes + ) -> None: + opcode = VolumeControlPointOpcode(value[0]) + change_counter = value[1] + + if change_counter != self.change_counter: + raise att.ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER) + + handler = getattr(self, '_on_' + opcode.name) + if handler(*value[2:]): + self.change_counter = (self.change_counter + 1) % 256 + connection.abort_on( + 'disconnection', + connection.device.notify_subscriber(connection, self.volume_setting), + ) + + def _on_relative_volume_down(self) -> bool: + old_volume = self.volume_setting + self.volume_setting = min(self.volume_setting - self.step_size, MIN_VOLUME) + return self.volume_setting != old_volume + + def _on_relative_volume_up(self) -> bool: + old_volume = self.volume_setting + self.volume_setting = max(self.volume_setting + self.step_size, MAX_VOLUME) + return self.volume_setting != old_volume + + def _on_unmute_relative_volume_down(self) -> bool: + old_volume, old_muted_state = self.volume_setting, self.muted + self.volume_setting = min(self.volume_setting - self.step_size, MIN_VOLUME) + self.muted = 0 + return (self.volume_setting, self.muted) != (old_volume, old_muted_state) + + def _on_unmute_relative_volume_up(self) -> bool: + old_volume, old_muted_state = self.volume_setting, self.muted + self.volume_setting = max(self.volume_setting + self.step_size, MAX_VOLUME) + self.muted = 0 + return (self.volume_setting, self.muted) != (old_volume, old_muted_state) + + def _on_set_absolute_volume(self, volume_setting) -> bool: + old_volume_setting = self.volume_setting + self.volume_setting = volume_setting + return old_volume_setting != self.volume_setting + + def _on_unmute(self) -> bool: + old_muted_state = self.muted + self.muted = 0 + return self.muted != old_muted_state + + def _on_mute(self) -> bool: + old_muted_state = self.muted + self.muted = 1 + return self.muted != old_muted_state + + +# ----------------------------------------------------------------------------- +# Client +# ----------------------------------------------------------------------------- +class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy): + SERVICE_CLASS = VolumeControlService + + volume_state: gatt_client.CharacteristicProxy + volume_control_point: gatt_client.CharacteristicProxy + volume_flags: gatt_client.CharacteristicProxy + + def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: + self.service_proxy = service_proxy + + self.volume_state = service_proxy.get_characteristics_by_uuid( + gatt.GATT_VOLUME_STATE_CHARACTERISTIC + )[0] + + self.volume_control_point = service_proxy.get_characteristics_by_uuid( + gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC + )[0] + + self.volume_flags = service_proxy.get_characteristics_by_uuid( + gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC + )[0]