Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a class-based GATT adapter #598

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions bumble/att.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,13 +759,13 @@ class AttributeValue:
def __init__(
self,
read: Union[
Callable[[Optional[Connection]], bytes],
Callable[[Optional[Connection]], Awaitable[bytes]],
Callable[[Optional[Connection]], Any],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope this can be a TypeVar, but it might always require a type without PEP 696 which is annoying.

Callable[[Optional[Connection]], Awaitable[Any]],
None,
] = None,
write: Union[
Callable[[Optional[Connection], bytes], None],
Callable[[Optional[Connection], bytes], Awaitable[None]],
Callable[[Optional[Connection], Any], None],
Callable[[Optional[Connection], Any], Awaitable[None]],
None,
] = None,
):
Expand Down Expand Up @@ -824,13 +824,13 @@ def from_string(cls, permissions_str: str) -> Attribute.Permissions:
READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION

value: Union[bytes, AttributeValue]
value: Any

def __init__(
self,
attribute_type: Union[str, bytes, UUID],
permissions: Union[str, Attribute.Permissions],
value: Union[str, bytes, AttributeValue] = b'',
value: Any = b'',
) -> None:
EventEmitter.__init__(self)
self.handle = 0
Expand All @@ -848,11 +848,7 @@ def __init__(
else:
self.type = attribute_type

# Convert the value to a byte array
if isinstance(value, str):
self.value = bytes(value, 'utf-8')
else:
self.value = value
self.value = value

def encode_value(self, value: Any) -> bytes:
return value
Expand Down Expand Up @@ -895,6 +891,8 @@ async def read_value(self, connection: Optional[Connection]) -> bytes:
else:
value = self.value

self.emit('read', connection, value)

return self.encode_value(value)

async def write_value(self, connection: Connection, value_bytes: bytes) -> None:
Expand Down
38 changes: 32 additions & 6 deletions bumble/gatt.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,23 @@
import logging
import struct
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Sequence,
SupportsBytes,
Type,
Union,
TYPE_CHECKING,
)

from bumble.colors import color
from bumble.core import BaseBumbleError, UUID
from bumble.att import Attribute, AttributeValue
from bumble.utils import ByteSerializable

if TYPE_CHECKING:
from bumble.gatt_client import AttributeProxy
Expand Down Expand Up @@ -343,7 +347,7 @@ class Service(Attribute):
def __init__(
self,
uuid: Union[str, UUID],
characteristics: List[Characteristic],
characteristics: Iterable[Characteristic],
primary=True,
included_services: Iterable[Service] = (),
) -> None:
Expand All @@ -362,7 +366,7 @@ def __init__(
)
self.uuid = uuid
self.included_services = list(included_services)
self.characteristics = characteristics[:]
self.characteristics = list(characteristics)
self.primary = primary

def get_advertising_data(self) -> Optional[bytes]:
Expand Down Expand Up @@ -393,7 +397,7 @@ class TemplateService(Service):

def __init__(
self,
characteristics: List[Characteristic],
characteristics: Iterable[Characteristic],
primary: bool = True,
included_services: Iterable[Service] = (),
) -> None:
Expand Down Expand Up @@ -490,7 +494,7 @@ def __init__(
uuid: Union[str, bytes, UUID],
properties: Characteristic.Properties,
permissions: Union[str, Attribute.Permissions],
value: Union[str, bytes, CharacteristicValue] = b'',
value: Any = b'',
descriptors: Sequence[Descriptor] = (),
):
super().__init__(uuid, permissions, value)
Expand Down Expand Up @@ -525,7 +529,11 @@ class CharacteristicDeclaration(Attribute):

characteristic: Characteristic

def __init__(self, characteristic: Characteristic, value_handle: int) -> None:
def __init__(
self,
characteristic: Characteristic,
value_handle: int,
) -> None:
declaration_bytes = (
struct.pack('<BH', characteristic.properties, value_handle)
+ characteristic.uuid.to_pdu_bytes()
Expand Down Expand Up @@ -705,7 +713,7 @@ class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
The adapted `read_value` and `write_value` methods return/accept aa dictionary which
The adapted `read_value` and `write_value` methods return/accept a dictionary which
is packed/unpacked according to format, with the arguments extracted from the
dictionary by key, in the same order as they occur in the `keys` parameter.
'''
Expand Down Expand Up @@ -735,6 +743,24 @@ def decode_value(self, value: bytes) -> str:
return value.decode('utf-8')


# -----------------------------------------------------------------------------
class SerializableCharacteristicAdapter(CharacteristicAdapter):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to solve the problem that CharacteristicAdapter is not considered as a Characteristic?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it's possible to do that, and it would be nice to improve type checking. Currently, the adapter classes are bit hard to deal with when it comes to types, because they are like dynamic decorators, they are polymorphic at runtime (that's convenient to reuse the code, but not convenient for typing). I have an idea about how to fix this, by splitting the adapter classes into separate classes for attributes and attribute proxies. That should solve the problem. But that's a larger change, so I'd like to do that in a separate PR.

'''
Adapter that converts any class to/from bytes using the class'
`to_bytes` and `__bytes__` methods, respectively.
'''

def __init__(self, characteristic, cls: Type[ByteSerializable]):
super().__init__(characteristic)
self.cls = cls

def encode_value(self, value: SupportsBytes) -> bytes:
return bytes(value)

def decode_value(self, value: bytes) -> Any:
return self.cls.from_bytes(value)


# -----------------------------------------------------------------------------
class Descriptor(Attribute):
'''
Expand Down
13 changes: 12 additions & 1 deletion bumble/gatt_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,17 @@
import logging
from collections import defaultdict
import struct
from typing import List, Tuple, Optional, TypeVar, Type, Dict, Iterable, TYPE_CHECKING
from typing import (
Dict,
Iterable,
List,
Optional,
Tuple,
TypeVar,
Type,
Union,
TYPE_CHECKING,
)
from pyee import EventEmitter

from bumble.colors import color
Expand Down Expand Up @@ -68,6 +78,7 @@
GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
Characteristic,
CharacteristicAdapter,
CharacteristicDeclaration,
CharacteristicValue,
IncludedServiceDeclaration,
Expand Down
84 changes: 34 additions & 50 deletions bumble/profiles/aics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import logging
import struct

Expand All @@ -28,10 +29,11 @@
from bumble.att import ATT_Error
from bumble.gatt import (
Characteristic,
DelegatedCharacteristicAdapter,
SerializableCharacteristicAdapter,
PackedCharacteristicAdapter,
TemplateService,
CharacteristicValue,
PackedCharacteristicAdapter,
UTF8CharacteristicAdapter,
GATT_AUDIO_INPUT_CONTROL_SERVICE,
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
Expand Down Expand Up @@ -154,9 +156,6 @@ async def notify_subscribers_via_connection(self, connection: Connection) -> Non
attribute=self.attribute_value, value=bytes(self)
)

def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)


@dataclass
class GainSettingsProperties:
Expand All @@ -173,7 +172,7 @@ def from_bytes(cls, data: bytes):
(gain_settings_unit, gain_settings_minimum, gain_settings_maximum) = (
struct.unpack('BBB', data)
)
GainSettingsProperties(
return GainSettingsProperties(
gain_settings_unit, gain_settings_minimum, gain_settings_maximum
)

Expand All @@ -186,9 +185,6 @@ def __bytes__(self) -> bytes:
]
)

def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)


@dataclass
class AudioInputControlPoint:
Expand Down Expand Up @@ -321,21 +317,14 @@ class AudioInputDescription:
audio_input_description: str = "Bluetooth"
attribute_value: Optional[CharacteristicValue] = None

@classmethod
def from_bytes(cls, data: bytes):
return cls(audio_input_description=data.decode('utf-8'))

def __bytes__(self) -> bytes:
return self.audio_input_description.encode('utf-8')

def on_read(self, _connection: Optional[Connection]) -> bytes:
return self.audio_input_description.encode('utf-8')
def on_read(self, _connection: Optional[Connection]) -> str:
return self.audio_input_description

async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
async def on_write(self, connection: Optional[Connection], value: str) -> None:
assert connection
assert self.attribute_value

self.audio_input_description = value.decode('utf-8')
self.audio_input_description = value
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=value
)
Expand Down Expand Up @@ -375,34 +364,37 @@ def __init__(
self.audio_input_state, self.gain_settings_properties
)

self.audio_input_state_characteristic = DelegatedCharacteristicAdapter(
self.audio_input_state_characteristic = SerializableCharacteristicAdapter(
Characteristic(
uuid=GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
properties=Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY,
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=CharacteristicValue(read=self.audio_input_state.on_read),
value=self.audio_input_state,
),
encode=lambda value: bytes(value),
AudioInputState,
)
self.audio_input_state.attribute_value = (
self.audio_input_state_characteristic.value
)

self.gain_settings_properties_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
uuid=GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=CharacteristicValue(read=self.gain_settings_properties.on_read),
self.gain_settings_properties_characteristic = (
SerializableCharacteristicAdapter(
Characteristic(
uuid=GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=self.gain_settings_properties,
),
GainSettingsProperties,
)
)

self.audio_input_type_characteristic = Characteristic(
uuid=GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=audio_input_type,
value=bytes(audio_input_type, 'utf-8'),
)

self.audio_input_status_characteristic = Characteristic(
Expand All @@ -412,18 +404,14 @@ def __init__(
value=bytes([self.audio_input_status]),
)

self.audio_input_control_point_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
uuid=GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
properties=Characteristic.Properties.WRITE,
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=CharacteristicValue(
write=self.audio_input_control_point.on_write
),
)
self.audio_input_control_point_characteristic = Characteristic(
uuid=GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
properties=Characteristic.Properties.WRITE,
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=CharacteristicValue(write=self.audio_input_control_point.on_write),
)

self.audio_input_description_characteristic = DelegatedCharacteristicAdapter(
self.audio_input_description_characteristic = UTF8CharacteristicAdapter(
Characteristic(
uuid=GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC,
properties=Characteristic.Properties.READ
Expand Down Expand Up @@ -469,8 +457,8 @@ def __init__(self, service_proxy: ServiceProxy) -> None:
)
):
raise gatt.InvalidServiceError("Audio Input State Characteristic not found")
self.audio_input_state = DelegatedCharacteristicAdapter(
characteristic=characteristics[0], decode=AudioInputState.from_bytes
self.audio_input_state = SerializableCharacteristicAdapter(
characteristics[0], AudioInputState
)

if not (
Expand All @@ -481,9 +469,8 @@ def __init__(self, service_proxy: ServiceProxy) -> None:
raise gatt.InvalidServiceError(
"Gain Settings Attribute Characteristic not found"
)
self.gain_settings_properties = PackedCharacteristicAdapter(
characteristics[0],
'BBB',
self.gain_settings_properties = SerializableCharacteristicAdapter(
characteristics[0], GainSettingsProperties
)

if not (
Expand All @@ -494,10 +481,7 @@ def __init__(self, service_proxy: ServiceProxy) -> None:
raise gatt.InvalidServiceError(
"Audio Input Status Characteristic not found"
)
self.audio_input_status = PackedCharacteristicAdapter(
characteristics[0],
'B',
)
self.audio_input_status = PackedCharacteristicAdapter(characteristics[0], 'B')

if not (
characteristics := service_proxy.get_characteristics_by_uuid(
Expand All @@ -517,4 +501,4 @@ def __init__(self, service_proxy: ServiceProxy) -> None:
raise gatt.InvalidServiceError(
"Audio Input Description Characteristic not found"
)
self.audio_input_description = characteristics[0]
self.audio_input_description = UTF8CharacteristicAdapter(characteristics[0])
Loading
Loading