diff --git a/bumble/hid.py b/bumble/hid.py index 87126584..c0fb0535 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -19,16 +19,17 @@ from dataclasses import dataclass import logging import enum +import struct +from abc import ABC, abstractmethod from pyee import EventEmitter -from typing import Optional, TYPE_CHECKING +from typing import Optional, Callable, TYPE_CHECKING +from typing_extensions import override -from bumble import l2cap +from bumble import l2cap, device from bumble.colors import color from bumble.core import InvalidStateError, ProtocolError - -if TYPE_CHECKING: - from bumble.device import Device, Connection +from .hci import Address # ----------------------------------------------------------------------------- @@ -60,6 +61,7 @@ class Handshake(enum.IntEnum): NOT_READY = 0x01 ERR_INVALID_REPORT_ID = 0x02 ERR_UNSUPPORTED_REQUEST = 0x03 + ERR_INVALID_PARAMETER = 0x04 ERR_UNKNOWN = 0x0E ERR_FATAL = 0x0F @@ -101,13 +103,14 @@ class GetReportMessage(Message): def __bytes__(self) -> bytes: packet_bytes = bytearray() packet_bytes.append(self.report_id) - packet_bytes.extend( - [(self.buffer_size & 0xFF), ((self.buffer_size >> 8) & 0xFF)] - ) - if self.report_type == Message.ReportType.OTHER_REPORT: + if self.buffer_size == 0: return self.header(self.report_type) + packet_bytes else: - return self.header(0x08 | self.report_type) + packet_bytes + return ( + self.header(0x08 | self.report_type) + + packet_bytes + + struct.pack(" bytes: return self.header(self.report_type) + self.data +@dataclass +class SendControlData(Message): + report_type: int + data: bytes + message_type = Message.MessageType.DATA + + def __bytes__(self) -> bytes: + return self.header(self.report_type) + self.data + + @dataclass class GetProtocolMessage(Message): message_type = Message.MessageType.GET_PROTOCOL @@ -161,31 +174,47 @@ def __bytes__(self) -> bytes: return self.header(Message.ControlCommand.VIRTUAL_CABLE_UNPLUG) +# Device sends input report, host sends output report. @dataclass class SendData(Message): data: bytes + report_type: int message_type = Message.MessageType.DATA def __bytes__(self) -> bytes: - return self.header(Message.ReportType.OUTPUT_REPORT) + self.data + return self.header(self.report_type) + self.data + + +@dataclass +class SendHandshakeMessage(Message): + result_code: int + message_type = Message.MessageType.HANDSHAKE + + def __bytes__(self) -> bytes: + return self.header(self.result_code) # ----------------------------------------------------------------------------- -class Host(EventEmitter): - l2cap_ctrl_channel: Optional[l2cap.ClassicChannel] - l2cap_intr_channel: Optional[l2cap.ClassicChannel] +class HID(ABC, EventEmitter): + l2cap_ctrl_channel: Optional[l2cap.ClassicChannel] = None + l2cap_intr_channel: Optional[l2cap.ClassicChannel] = None + connection: Optional[device.Connection] = None + + class Role(enum.IntEnum): + HOST = 0x00 + DEVICE = 0x01 - def __init__(self, device: Device, connection: Connection) -> None: + def __init__(self, device: device.Device, role: Role) -> None: super().__init__() + self.remote_device_bd_address: Optional[Address] = None self.device = device - self.connection = connection - - self.l2cap_ctrl_channel = None - self.l2cap_intr_channel = None + self.role = role # Register ourselves with the L2CAP channel manager - device.register_l2cap_server(HID_CONTROL_PSM, self.on_connection) - device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_connection) + device.register_l2cap_server(HID_CONTROL_PSM, self.on_l2cap_connection) + device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_l2cap_connection) + + device.on('connection', self.on_device_connection) async def connect_control_channel(self) -> None: # Create a new L2CAP connection - control channel @@ -229,9 +258,18 @@ async def disconnect_control_channel(self) -> None: self.l2cap_ctrl_channel = None await channel.disconnect() - def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: + def on_device_connection(self, connection: device.Connection) -> None: + self.connection = connection + self.remote_device_bd_address = connection.peer_address + connection.on('disconnection', self.on_device_disconnection) + + def on_device_disconnection(self, reason: int) -> None: + self.connection = None + + def on_l2cap_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: logger.debug(f'+++ New L2CAP connection: {l2cap_channel}') l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel)) + l2cap_channel.on('close', lambda: self.on_l2cap_channel_close(l2cap_channel)) def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None: if l2cap_channel.psm == HID_CONTROL_PSM: @@ -242,37 +280,220 @@ def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None: self.l2cap_intr_channel.sink = self.on_intr_pdu logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}') + def on_l2cap_channel_close(self, l2cap_channel: l2cap.ClassicChannel) -> None: + if l2cap_channel.psm == HID_CONTROL_PSM: + self.l2cap_ctrl_channel = None + else: + self.l2cap_intr_channel = None + logger.debug(f'$$$ L2CAP channel close: {l2cap_channel}') + + @abstractmethod + def on_ctrl_pdu(self, pdu: bytes) -> None: + pass + + def on_intr_pdu(self, pdu: bytes) -> None: + logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}') + self.emit("interrupt_data", pdu) + + def send_pdu_on_ctrl(self, msg: bytes) -> None: + assert self.l2cap_ctrl_channel + self.l2cap_ctrl_channel.send_pdu(msg) + + def send_pdu_on_intr(self, msg: bytes) -> None: + assert self.l2cap_intr_channel + self.l2cap_intr_channel.send_pdu(msg) + + def send_data(self, data: bytes) -> None: + if self.role == HID.Role.HOST: + report_type = Message.ReportType.OUTPUT_REPORT + else: + report_type = Message.ReportType.INPUT_REPORT + msg = SendData(data, report_type) + hid_message = bytes(msg) + if self.l2cap_intr_channel is not None: + logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {hid_message.hex()}') + self.send_pdu_on_intr(hid_message) + + def virtual_cable_unplug(self) -> None: + msg = VirtualCableUnplug() + hid_message = bytes(msg) + logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {hid_message.hex()}') + self.send_pdu_on_ctrl(hid_message) + + +# ----------------------------------------------------------------------------- + + +class Device(HID): + class GetSetReturn(enum.IntEnum): + FAILURE = 0x00 + REPORT_ID_NOT_FOUND = 0x01 + ERR_UNSUPPORTED_REQUEST = 0x02 + ERR_UNKNOWN = 0x03 + ERR_INVALID_PARAMETER = 0x04 + SUCCESS = 0xFF + + class GetSetStatus: + def __init__(self) -> None: + self.data = bytearray() + self.status = 0 + + def __init__(self, device: device.Device) -> None: + super().__init__(device, HID.Role.DEVICE) + get_report_cb: Optional[Callable[[int, int, int], None]] = None + set_report_cb: Optional[Callable[[int, int, int, bytes], None]] = None + get_protocol_cb: Optional[Callable[[], None]] = None + set_protocol_cb: Optional[Callable[[int], None]] = None + + @override def on_ctrl_pdu(self, pdu: bytes) -> None: logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}') - # Here we will receive all kinds of packets, parse and then call respective callbacks - message_type = pdu[0] >> 4 param = pdu[0] & 0x0F + message_type = pdu[0] >> 4 - if message_type == Message.MessageType.HANDSHAKE: - logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}') - self.emit('handshake', Message.Handshake(param)) + if message_type == Message.MessageType.GET_REPORT: + logger.debug('<<< HID GET REPORT') + self.handle_get_report(pdu) + elif message_type == Message.MessageType.SET_REPORT: + logger.debug('<<< HID SET REPORT') + self.handle_set_report(pdu) + elif message_type == Message.MessageType.GET_PROTOCOL: + logger.debug('<<< HID GET PROTOCOL') + self.handle_get_protocol(pdu) + elif message_type == Message.MessageType.SET_PROTOCOL: + logger.debug('<<< HID SET PROTOCOL') + self.handle_set_protocol(pdu) elif message_type == Message.MessageType.DATA: logger.debug('<<< HID CONTROL DATA') - self.emit('data', pdu) + self.emit('control_data', pdu) elif message_type == Message.MessageType.CONTROL: if param == Message.ControlCommand.SUSPEND: logger.debug('<<< HID SUSPEND') - self.emit('suspend', pdu) + self.emit('suspend') elif param == Message.ControlCommand.EXIT_SUSPEND: logger.debug('<<< HID EXIT SUSPEND') - self.emit('exit_suspend', pdu) + self.emit('exit_suspend') elif param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG: logger.debug('<<< HID VIRTUAL CABLE UNPLUG') self.emit('virtual_cable_unplug') else: logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') else: - logger.debug('<<< HID CONTROL DATA') - self.emit('data', pdu) + logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED') + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) - def on_intr_pdu(self, pdu: bytes) -> None: - logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}') - self.emit("data", pdu) + def send_handshake_message(self, result_code: int) -> None: + msg = SendHandshakeMessage(result_code) + hid_message = bytes(msg) + logger.debug(f'>>> HID HANDSHAKE MESSAGE, PDU: {hid_message.hex()}') + self.send_pdu_on_ctrl(hid_message) + + def send_control_data(self, report_type: int, data: bytes): + msg = SendControlData(report_type=report_type, data=data) + hid_message = bytes(msg) + logger.debug(f'>>> HID CONTROL DATA: {hid_message.hex()}') + self.send_pdu_on_ctrl(hid_message) + + def handle_get_report(self, pdu: bytes): + if self.get_report_cb is None: + logger.debug("GetReport callback not registered !!") + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + return + report_type = pdu[0] & 0x03 + buffer_flag = (pdu[0] & 0x08) >> 3 + report_id = pdu[1] + logger.debug("buffer_flag: " + str(buffer_flag)) + if buffer_flag == 1: + buffer_size = (pdu[3] << 8) | pdu[2] + else: + buffer_size = 0 + + ret = self.get_report_cb(report_id, report_type, buffer_size) + assert ret is not None + if ret.status == self.GetSetReturn.FAILURE: + self.send_handshake_message(Message.Handshake.ERR_UNKNOWN) + elif ret.status == self.GetSetReturn.SUCCESS: + data = bytearray() + data.append(report_id) + data.extend(ret.data) + if len(data) < self.l2cap_ctrl_channel.mtu: # type: ignore[union-attr] + self.send_control_data(report_type=report_type, data=data) + else: + self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER) + elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND: + self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID) + elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER: + self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER) + elif ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST: + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + + def register_get_report_cb(self, cb: Callable[[int, int, int], None]) -> None: + self.get_report_cb = cb + logger.debug("GetReport callback registered successfully") + + def handle_set_report(self, pdu: bytes): + if self.set_report_cb is None: + logger.debug("SetReport callback not registered !!") + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + return + report_type = pdu[0] & 0x03 + report_id = pdu[1] + report_data = pdu[2:] + report_size = len(report_data) + 1 + ret = self.set_report_cb(report_id, report_type, report_size, report_data) + assert ret is not None + if ret.status == self.GetSetReturn.SUCCESS: + self.send_handshake_message(Message.Handshake.SUCCESSFUL) + elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER: + self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER) + elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND: + self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID) + else: + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + + def register_set_report_cb( + self, cb: Callable[[int, int, int, bytes], None] + ) -> None: + self.set_report_cb = cb + logger.debug("SetReport callback registered successfully") + + def handle_get_protocol(self, pdu: bytes): + if self.get_protocol_cb is None: + logger.debug("GetProtocol callback not registered !!") + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + return + ret = self.get_protocol_cb() + assert ret is not None + if ret.status == self.GetSetReturn.SUCCESS: + self.send_control_data(Message.ReportType.OTHER_REPORT, ret.data) + else: + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + + def register_get_protocol_cb(self, cb: Callable[[], None]) -> None: + self.get_protocol_cb = cb + logger.debug("GetProtocol callback registered successfully") + + def handle_set_protocol(self, pdu: bytes): + if self.set_protocol_cb is None: + logger.debug("SetProtocol callback not registered !!") + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + return + ret = self.set_protocol_cb(pdu[0] & 0x01) + assert ret is not None + if ret.status == self.GetSetReturn.SUCCESS: + self.send_handshake_message(Message.Handshake.SUCCESSFUL) + else: + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + + def register_set_protocol_cb(self, cb: Callable[[int], None]) -> None: + self.set_protocol_cb = cb + logger.debug("SetProtocol callback registered successfully") + + +# ----------------------------------------------------------------------------- +class Host(HID): + def __init__(self, device: device.Device) -> None: + super().__init__(device, HID.Role.HOST) def get_report(self, report_type: int, report_id: int, buffer_size: int) -> None: msg = GetReportMessage( @@ -282,52 +503,52 @@ def get_report(self, report_type: int, report_id: int, buffer_size: int) -> None logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) - def set_report(self, report_type: int, data: bytes): + def set_report(self, report_type: int, data: bytes) -> None: msg = SetReportMessage(report_type=report_type, data=data) hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) - def get_protocol(self): + def get_protocol(self) -> None: msg = GetProtocolMessage() hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) - def set_protocol(self, protocol_mode: int): + def set_protocol(self, protocol_mode: int) -> None: msg = SetProtocolMessage(protocol_mode=protocol_mode) hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) - def send_pdu_on_ctrl(self, msg: bytes) -> None: - assert self.l2cap_ctrl_channel - self.l2cap_ctrl_channel.send_pdu(msg) - - def send_pdu_on_intr(self, msg: bytes) -> None: - assert self.l2cap_intr_channel - self.l2cap_intr_channel.send_pdu(msg) - - def send_data(self, data): - msg = SendData(data) - hid_message = bytes(msg) - logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {hid_message.hex()}') - self.send_pdu_on_intr(hid_message) - - def suspend(self): + def suspend(self) -> None: msg = Suspend() hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL SUSPEND, PDU:{hid_message.hex()}') - self.send_pdu_on_ctrl(msg) + self.send_pdu_on_ctrl(hid_message) - def exit_suspend(self): + def exit_suspend(self) -> None: msg = ExitSuspend() hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL EXIT SUSPEND, PDU:{hid_message.hex()}') - self.send_pdu_on_ctrl(msg) + self.send_pdu_on_ctrl(hid_message) - def virtual_cable_unplug(self): - msg = VirtualCableUnplug() - hid_message = bytes(msg) - logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {hid_message.hex()}') - self.send_pdu_on_ctrl(msg) + @override + def on_ctrl_pdu(self, pdu: bytes) -> None: + logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}') + param = pdu[0] & 0x0F + message_type = pdu[0] >> 4 + if message_type == Message.MessageType.HANDSHAKE: + logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}') + self.emit('handshake', Message.Handshake(param)) + elif message_type == Message.MessageType.DATA: + logger.debug('<<< HID CONTROL DATA') + self.emit('control_data', pdu) + elif message_type == Message.MessageType.CONTROL: + if param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG: + logger.debug('<<< HID VIRTUAL CABLE UNPLUG') + self.emit('virtual_cable_unplug') + else: + logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') + else: + logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED') diff --git a/examples/hid_keyboard.json b/examples/hid_keyboard.json new file mode 100644 index 00000000..b7b14096 --- /dev/null +++ b/examples/hid_keyboard.json @@ -0,0 +1,5 @@ +{ + "name": "Bumble HID Keyboard", + "class_of_device": 9664, + "keystore": "JsonKeyStore" +} diff --git a/examples/keyboard.html b/examples/keyboard.html index 6ad83a7f..7d44a031 100644 --- a/examples/keyboard.html +++ b/examples/keyboard.html @@ -40,9 +40,9 @@ } } function onMouseMove(event) { - //console.log(event.clientX, event.clientY) - mouseInfo.innerText = `MOUSE: x=${event.clientX}, y=${event.clientY}` - send({ type:'mousemove', x: event.clientX, y: event.clientY }) + //console.log(event.movementX, event.movementY) + mouseInfo.innerText = `MOUSE: x=${event.movementX}, y=${event.movementY}` + send({ type:'mousemove', x: event.movementX, y: event.movementY }) } function onKeyDown(event) { diff --git a/examples/run_hid_device.py b/examples/run_hid_device.py new file mode 100644 index 00000000..b71e38be --- /dev/null +++ b/examples/run_hid_device.py @@ -0,0 +1,748 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import sys +import os +import logging +import json +import websockets +from bumble.colors import color + +from bumble.device import Device +from bumble.transport import open_transport_or_link +from bumble.core import ( + BT_BR_EDR_TRANSPORT, + BT_L2CAP_PROTOCOL_ID, + BT_HUMAN_INTERFACE_DEVICE_SERVICE, + BT_HIDP_PROTOCOL_ID, + UUID, +) +from bumble.hci import Address +from bumble.hid import ( + Device as HID_Device, + HID_CONTROL_PSM, + HID_INTERRUPT_PSM, + Message, +) +from bumble.sdp import ( + Client as SDP_Client, + DataElement, + ServiceAttribute, + SDP_PUBLIC_BROWSE_ROOT, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_ALL_ATTRIBUTES_RANGE, + SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID, + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, +) +from bumble.utils import AsyncRunner + +# ----------------------------------------------------------------------------- +# SDP attributes for Bluetooth HID devices +SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100 +SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101 +SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102 +SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200 # [DEPRECATED] +SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201 +SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202 +SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203 +SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204 +SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205 +SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206 +SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207 +SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208 # [DEPRECATED] +SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209 +SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A +SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B # DEPRECATED] +SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C +SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D +SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E +SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F +SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210 + +# Refer to HID profile specification v1.1.1, "5.3 Service Discovery Protocol (SDP)" for details +# HID SDP attribute values +LANGUAGE = 0x656E # 0x656E uint16 “en” (English) +ENCODING = 0x6A # 0x006A uint16 UTF-8 encoding +PRIMARY_LANGUAGE_BASE_ID = 0x100 # 0x0100 uint16 PrimaryLanguageBaseID +VERSION_NUMBER = 0x0101 # 0x0101 uint16 version number (v1.1) +SERVICE_NAME = b'Bumble HID' +SERVICE_DESCRIPTION = b'Bumble' +PROVIDER_NAME = b'Bumble' +HID_PARSER_VERSION = 0x0111 # uint16 0x0111 (v1.1.1) +HID_DEVICE_SUBCLASS = 0xC0 # Combo keyboard/pointing device +HID_COUNTRY_CODE = 0x21 # 0x21 Uint8, USA +HID_VIRTUAL_CABLE = True # Virtual cable enabled +HID_RECONNECT_INITIATE = True # Reconnect initiate enabled +REPORT_DESCRIPTOR_TYPE = 0x22 # 0x22 Type = Report Descriptor +HID_LANGID_BASE_LANGUAGE = 0x0409 # 0x0409 Language = English (United States) +HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET = 0x100 # 0x0100 Default +HID_BATTERY_POWER = True # Battery power enabled +HID_REMOTE_WAKE = True # Remote wake enabled +HID_SUPERVISION_TIMEOUT = 0xC80 # uint16 0xC80 (2s) +HID_NORMALLY_CONNECTABLE = True # Normally connectable enabled +HID_BOOT_DEVICE = True # Boot device support enabled +HID_SSR_HOST_MAX_LATENCY = 0x640 # uint16 0x640 (1s) +HID_SSR_HOST_MIN_TIMEOUT = 0xC80 # uint16 0xC80 (2s) +HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor + # pylint: disable=line-too-long + [ + 0x05, + 0x01, # Usage Page (Generic Desktop Ctrls) + 0x09, + 0x06, # Usage (Keyboard) + 0xA1, + 0x01, # Collection (Application) + 0x85, + 0x01, # . Report ID (1) + 0x05, + 0x07, # . Usage Page (Kbrd/Keypad) + 0x19, + 0xE0, # . Usage Minimum (0xE0) + 0x29, + 0xE7, # . Usage Maximum (0xE7) + 0x15, + 0x00, # . Logical Minimum (0) + 0x25, + 0x01, # . Logical Maximum (1) + 0x75, + 0x01, # . Report Size (1) + 0x95, + 0x08, # . Report Count (8) + 0x81, + 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x95, + 0x01, # . Report Count (1) + 0x75, + 0x08, # . Report Size (8) + 0x81, + 0x03, # . Input (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x95, + 0x05, # . Report Count (5) + 0x75, + 0x01, # . Report Size (1) + 0x05, + 0x08, # . Usage Page (LEDs) + 0x19, + 0x01, # . Usage Minimum (Num Lock) + 0x29, + 0x05, # . Usage Maximum (Kana) + 0x91, + 0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) + 0x95, + 0x01, # . Report Count (1) + 0x75, + 0x03, # . Report Size (3) + 0x91, + 0x03, # . Output (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) + 0x95, + 0x06, # . Report Count (6) + 0x75, + 0x08, # . Report Size (8) + 0x15, + 0x00, # . Logical Minimum (0) + 0x25, + 0x65, # . Logical Maximum (101) + 0x05, + 0x07, # . Usage Page (Kbrd/Keypad) + 0x19, + 0x00, # . Usage Minimum (0x00) + 0x29, + 0x65, # . Usage Maximum (0x65) + 0x81, + 0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0xC0, # End Collection + 0x05, + 0x01, # Usage Page (Generic Desktop Ctrls) + 0x09, + 0x02, # Usage (Mouse) + 0xA1, + 0x01, # Collection (Application) + 0x85, + 0x02, # . Report ID (2) + 0x09, + 0x01, # . Usage (Pointer) + 0xA1, + 0x00, # . Collection (Physical) + 0x05, + 0x09, # . Usage Page (Button) + 0x19, + 0x01, # . Usage Minimum (0x01) + 0x29, + 0x03, # . Usage Maximum (0x03) + 0x15, + 0x00, # . Logical Minimum (0) + 0x25, + 0x01, # . Logical Maximum (1) + 0x95, + 0x03, # . Report Count (3) + 0x75, + 0x01, # . Report Size (1) + 0x81, + 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x95, + 0x01, # . Report Count (1) + 0x75, + 0x05, # . Report Size (5) + 0x81, + 0x03, # . Input (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x05, + 0x01, # . Usage Page (Generic Desktop Ctrls) + 0x09, + 0x30, # . Usage (X) + 0x09, + 0x31, # . Usage (Y) + 0x15, + 0x81, # . Logical Minimum (-127) + 0x25, + 0x7F, # . Logical Maximum (127) + 0x75, + 0x08, # . Report Size (8) + 0x95, + 0x02, # . Report Count (2) + 0x81, + 0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position) + 0xC0, # . End Collection + 0xC0, # End Collection + ] +) + + +# Default protocol mode set to report protocol +protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL + +# ----------------------------------------------------------------------------- +def sdp_records(): + service_record_handle = 0x00010002 + return { + service_record_handle: [ + ServiceAttribute( + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(service_record_handle), + ), + ServiceAttribute( + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, + DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), + ), + ServiceAttribute( + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [DataElement.uuid(BT_HUMAN_INTERFACE_DEVICE_SERVICE)] + ), + ), + ServiceAttribute( + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(BT_L2CAP_PROTOCOL_ID), + DataElement.unsigned_integer_16(HID_CONTROL_PSM), + ] + ), + DataElement.sequence( + [ + DataElement.uuid(BT_HIDP_PROTOCOL_ID), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.unsigned_integer_16(LANGUAGE), + DataElement.unsigned_integer_16(ENCODING), + DataElement.unsigned_integer_16(PRIMARY_LANGUAGE_BASE_ID), + ] + ), + ), + ServiceAttribute( + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(BT_HUMAN_INTERFACE_DEVICE_SERVICE), + DataElement.unsigned_integer_16(VERSION_NUMBER), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(BT_L2CAP_PROTOCOL_ID), + DataElement.unsigned_integer_16( + HID_INTERRUPT_PSM + ), + ] + ), + DataElement.sequence( + [ + DataElement.uuid(BT_HIDP_PROTOCOL_ID), + ] + ), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_HID_SERVICE_NAME_ATTRIBUTE_ID, + DataElement(DataElement.TEXT_STRING, SERVICE_NAME), + ), + ServiceAttribute( + SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID, + DataElement(DataElement.TEXT_STRING, SERVICE_DESCRIPTION), + ), + ServiceAttribute( + SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID, + DataElement(DataElement.TEXT_STRING, PROVIDER_NAME), + ), + ServiceAttribute( + SDP_HID_PARSER_VERSION_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(HID_PARSER_VERSION), + ), + ServiceAttribute( + SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(HID_DEVICE_SUBCLASS), + ), + ServiceAttribute( + SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(HID_COUNTRY_CODE), + ), + ServiceAttribute( + SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID, + DataElement.boolean(HID_VIRTUAL_CABLE), + ), + ServiceAttribute( + SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID, + DataElement.boolean(HID_RECONNECT_INITIATE), + ), + ServiceAttribute( + SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.unsigned_integer_16(REPORT_DESCRIPTOR_TYPE), + DataElement(DataElement.TEXT_STRING, HID_REPORT_MAP), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.unsigned_integer_16( + HID_LANGID_BASE_LANGUAGE + ), + DataElement.unsigned_integer_16( + HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET + ), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_HID_BATTERY_POWER_ATTRIBUTE_ID, + DataElement.boolean(HID_BATTERY_POWER), + ), + ServiceAttribute( + SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID, + DataElement.boolean(HID_REMOTE_WAKE), + ), + ServiceAttribute( + SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(HID_SUPERVISION_TIMEOUT), + ), + ServiceAttribute( + SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID, + DataElement.boolean(HID_NORMALLY_CONNECTABLE), + ), + ServiceAttribute( + SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID, + DataElement.boolean(HID_BOOT_DEVICE), + ), + ServiceAttribute( + SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(HID_SSR_HOST_MAX_LATENCY), + ), + ServiceAttribute( + SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(HID_SSR_HOST_MIN_TIMEOUT), + ), + ] + } + + +# ----------------------------------------------------------------------------- +async def get_stream_reader(pipe) -> asyncio.StreamReader: + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader(loop=loop) + protocol = asyncio.StreamReaderProtocol(reader) + await loop.connect_read_pipe(lambda: protocol, pipe) + return reader + + +class DeviceData: + def __init__(self) -> None: + self.keyboardData = bytearray( + [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + ) + self.mouseData = bytearray([0x02, 0x00, 0x00, 0x00]) + + +# Device's live data - Mouse and Keyboard will be stored in this +deviceData = DeviceData() + +# ----------------------------------------------------------------------------- +async def keyboard_device(hid_device): + + # Start a Websocket server to receive events from a web page + async def serve(websocket, _path): + global deviceData + while True: + try: + message = await websocket.recv() + print('Received: ', str(message)) + parsed = json.loads(message) + message_type = parsed['type'] + if message_type == 'keydown': + # Only deal with keys a to z for now + key = parsed['key'] + if len(key) == 1: + code = ord(key) + if ord('a') <= code <= ord('z'): + hid_code = 0x04 + code - ord('a') + deviceData.keyboardData = bytearray( + [ + 0x01, + 0x00, + 0x00, + hid_code, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + ] + ) + hid_device.send_data(deviceData.keyboardData) + elif message_type == 'keyup': + deviceData.keyboardData = bytearray( + [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + ) + hid_device.send_data(deviceData.keyboardData) + elif message_type == "mousemove": + # logical min and max values + log_min = -127 + log_max = 127 + x = parsed['x'] + y = parsed['y'] + # limiting x and y values within logical max and min range + x = max(log_min, min(log_max, x)) + y = max(log_min, min(log_max, y)) + x_cord = x.to_bytes(signed=True) + y_cord = y.to_bytes(signed=True) + deviceData.mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord + hid_device.send_data(deviceData.mouseData) + except websockets.exceptions.ConnectionClosedOK: + pass + + # pylint: disable-next=no-member + await websockets.serve(serve, 'localhost', 8989) + await asyncio.get_event_loop().create_future() + + +# ----------------------------------------------------------------------------- +async def main(): + if len(sys.argv) < 3: + print( + 'Usage: python run_hid_device.py ' + ' where is one of:\n' + ' test-mode (run with menu enabled for testing)\n' + ' web (run a keyboard with keypress input from a web page, ' + 'see keyboard.html' + ) + print('example: python run_hid_device.py hid_keyboard.json usb:0 web') + print('example: python run_hid_device.py hid_keyboard.json usb:0 test-mode') + + return + + async def handle_virtual_cable_unplug(): + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + await hid_device.disconnect_interrupt_channel() + await hid_device.disconnect_control_channel() + await device.keystore.delete(hid_host_bd_addr) # type: ignore + connection = hid_device.connection + if connection is not None: + await connection.disconnect() + + def on_hid_data_cb(pdu: bytes): + print(f'Received Data, PDU: {pdu.hex()}') + + def on_get_report_cb(report_id: int, report_type: int, buffer_size: int): + retValue = hid_device.GetSetStatus() + print( + "GET_REPORT report_id: " + + str(report_id) + + "report_type: " + + str(report_type) + + "buffer_size:" + + str(buffer_size) + ) + if report_type == Message.ReportType.INPUT_REPORT: + if report_id == 1: + retValue.data = deviceData.keyboardData[1:] + retValue.status = hid_device.GetSetReturn.SUCCESS + elif report_id == 2: + retValue.data = deviceData.mouseData[1:] + retValue.status = hid_device.GetSetReturn.SUCCESS + else: + retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND + + if buffer_size: + data_len = buffer_size - 1 + retValue.data = retValue.data[:data_len] + elif report_type == Message.ReportType.OUTPUT_REPORT: + # This sample app has nothing to do with the report received, to enable PTS + # testing, we will return single byte random data. + retValue.data = bytearray([0x11]) + retValue.status = hid_device.GetSetReturn.SUCCESS + elif report_type == Message.ReportType.FEATURE_REPORT: + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_type == Message.ReportType.OTHER_REPORT: + if report_id == 3: + retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND + else: + retValue.status = hid_device.GetSetReturn.FAILURE + + return retValue + + def on_set_report_cb( + report_id: int, report_type: int, report_size: int, data: bytes + ): + retValue = hid_device.GetSetStatus() + print( + "SET_REPORT report_id: " + + str(report_id) + + "report_type: " + + str(report_type) + + "report_size " + + str(report_size) + + "data:" + + str(data) + ) + if report_type == Message.ReportType.FEATURE_REPORT: + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_type == Message.ReportType.INPUT_REPORT: + if report_id == 1 and report_size != len(deviceData.keyboardData): + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_id == 2 and report_size != len(deviceData.mouseData): + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_id == 3: + retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND + else: + retValue.status = hid_device.GetSetReturn.SUCCESS + else: + retValue.status = hid_device.GetSetReturn.SUCCESS + + return retValue + + def on_get_protocol_cb(): + retValue = hid_device.GetSetStatus() + retValue.data = protocol_mode.to_bytes() + retValue.status = hid_device.GetSetReturn.SUCCESS + return retValue + + def on_set_protocol_cb(protocol: int): + retValue = hid_device.GetSetStatus() + # We do not support SET_PROTOCOL. + print("SET_PROTOCOL report_id: " + str(protocol)) + retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST + return retValue + + def on_virtual_cable_unplug_cb(): + print(f'Received Virtual Cable Unplug') + asyncio.create_task(handle_virtual_cable_unplug()) + + print('<<< connecting to HCI...') + async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + print('<<< connected') + + # Create a device + device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device.classic_enabled = True + + # Create and register HID device + hid_device = HID_Device(device) + + # Register for call backs + hid_device.on('interrupt_data', on_hid_data_cb) + + hid_device.register_get_report_cb(on_get_report_cb) + hid_device.register_set_report_cb(on_set_report_cb) + hid_device.register_get_protocol_cb(on_get_protocol_cb) + hid_device.register_set_protocol_cb(on_set_protocol_cb) + + # Register for virtual cable unplug call back + hid_device.on('virtual_cable_unplug', on_virtual_cable_unplug_cb) + + # Setup the SDP to advertise HID Device service + device.sdp_service_records = sdp_records() + + # Start the controller + await device.power_on() + + # Start being discoverable and connectable + await device.set_discoverable(True) + await device.set_connectable(True) + + async def menu(): + reader = await get_stream_reader(sys.stdin) + while True: + print( + "\n************************ HID Device Menu *****************************\n" + ) + print(" 1. Connect Control Channel") + print(" 2. Connect Interrupt Channel") + print(" 3. Disconnect Control Channel") + print(" 4. Disconnect Interrupt Channel") + print(" 5. Send Report on Interrupt Channel") + print(" 6. Virtual Cable Unplug") + print(" 7. Disconnect device") + print(" 8. Delete Bonding") + print(" 9. Re-connect to device") + print("10. Exit ") + print("\nEnter your choice : \n") + + choice = await reader.readline() + choice = choice.decode('utf-8').strip() + + if choice == '1': + await hid_device.connect_control_channel() + + elif choice == '2': + await hid_device.connect_interrupt_channel() + + elif choice == '3': + await hid_device.disconnect_control_channel() + + elif choice == '4': + await hid_device.disconnect_interrupt_channel() + + elif choice == '5': + print(" 1. Report ID 0x01") + print(" 2. Report ID 0x02") + print(" 3. Invalid Report ID") + + choice1 = await reader.readline() + choice1 = choice1.decode('utf-8').strip() + + if choice1 == '1': + data = bytearray( + [0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00] + ) + hid_device.send_data(data) + data = bytearray( + [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + ) + hid_device.send_data(data) + + elif choice1 == '2': + data = bytearray([0x02, 0x00, 0x00, 0xF6]) + hid_device.send_data(data) + data = bytearray([0x02, 0x00, 0x00, 0x00]) + hid_device.send_data(data) + + elif choice1 == '3': + data = bytearray([0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(data) + data = bytearray([0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(data) + + else: + print('Incorrect option selected') + + elif choice == '6': + hid_device.virtual_cable_unplug() + try: + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + await device.keystore.delete(hid_host_bd_addr) + except KeyError: + print('Device not found or Device already unpaired.') + + elif choice == '7': + connection = hid_device.connection + if connection is not None: + await connection.disconnect() + else: + print("Already disconnected from device") + + elif choice == '8': + try: + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + await device.keystore.delete(hid_host_bd_addr) + except KeyError: + print('Device NOT found or Device already unpaired.') + + elif choice == '9': + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + connection = await device.connect( + hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT + ) + await connection.authenticate() + await connection.encrypt() + + elif choice == '10': + sys.exit("Exit successful") + + else: + print("Invalid option selected.") + + if (len(sys.argv) > 3) and (sys.argv[3] == 'test-mode'): + # Test mode for PTS/Unit testing + await menu() + else: + # default option is using keyboard.html (web) + print("Executing in Web mode") + await keyboard_device(hid_device) + + await hci_source.wait_for_termination() + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main()) diff --git a/examples/run_hid_host.py b/examples/run_hid_host.py index cbaaac5a..7519b4e1 100644 --- a/examples/run_hid_host.py +++ b/examples/run_hid_host.py @@ -285,7 +285,10 @@ async def main(): print('example: run_hid_host.py classic1.json usb:0 E1:CA:72:48:C4:E8/P') return - def on_hid_data_cb(pdu): + def on_hid_control_data_cb(pdu: bytes): + print(f'Received Control Data, PDU: {pdu.hex()}') + + def on_hid_interrupt_data_cb(pdu: bytes): report_type = pdu[0] & 0x0F if len(pdu) == 1: print(color(f'Warning: No report received', 'yellow')) @@ -305,7 +308,7 @@ def on_hid_data_cb(pdu): if (report_length <= 1) or (report_id == 0): return - + # Parse report over interrupt channel if report_type == Message.ReportType.INPUT_REPORT: ReportParser.parse_input_report(pdu[1:]) # type: ignore @@ -313,7 +316,9 @@ async def handle_virtual_cable_unplug(): await hid_host.disconnect_interrupt_channel() await hid_host.disconnect_control_channel() await device.keystore.delete(target_address) # type: ignore - await connection.disconnect() + connection = hid_host.connection + if connection is not None: + await connection.disconnect() def on_hid_virtual_cable_unplug_cb(): asyncio.create_task(handle_virtual_cable_unplug()) @@ -325,6 +330,18 @@ def on_hid_virtual_cable_unplug_cb(): # Create a device device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device.classic_enabled = True + + # Create HID host and start it + print('@@@ Starting HID Host...') + hid_host = Host(device) + + # Register for HID data call back + hid_host.on('interrupt_data', on_hid_interrupt_data_cb) + hid_host.on('control_data', on_hid_control_data_cb) + + # Register for virtual cable unplug call back + hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb) + await device.power_on() # Connect to a peer @@ -345,16 +362,6 @@ def on_hid_virtual_cable_unplug_cb(): await get_hid_device_sdp_record(connection) - # Create HID host and start it - print('@@@ Starting HID Host...') - hid_host = Host(device, connection) - - # Register for HID data call back - hid_host.on('data', on_hid_data_cb) - - # Register for virtual cable unplug call back - hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb) - async def menu(): reader = await get_stream_reader(sys.stdin) while True: @@ -369,13 +376,14 @@ async def menu(): print(" 6. Set Report") print(" 7. Set Protocol Mode") print(" 8. Get Protocol Mode") - print(" 9. Send Report") + print(" 9. Send Report on Interrupt Channel") print("10. Suspend") print("11. Exit Suspend") print("12. Virtual Cable Unplug") print("13. Disconnect device") print("14. Delete Bonding") print("15. Re-connect to device") + print("16. Exit") print("\nEnter your choice : \n") choice = await reader.readline() @@ -394,21 +402,40 @@ async def menu(): await hid_host.disconnect_interrupt_channel() elif choice == '5': - print(" 1. Report ID 0x02") - print(" 2. Report ID 0x03") - print(" 3. Report ID 0x05") + print(" 1. Input Report with ID 0x01") + print(" 2. Input Report with ID 0x02") + print(" 3. Input Report with ID 0x0F - Invalid ReportId") + print(" 4. Output Report with ID 0x02") + print(" 5. Feature Report with ID 0x05 - Unsupported Request") + print(" 6. Input Report with ID 0x02, BufferSize 3") + print(" 7. Output Report with ID 0x03, BufferSize 2") + print(" 8. Feature Report with ID 0x05, BufferSize 3") choice1 = await reader.readline() choice1 = choice1.decode('utf-8').strip() if choice1 == '1': - hid_host.get_report(1, 2, 3) + hid_host.get_report(1, 1, 0) elif choice1 == '2': - hid_host.get_report(2, 3, 2) + hid_host.get_report(1, 2, 0) elif choice1 == '3': - hid_host.get_report(3, 5, 3) + hid_host.get_report(1, 5, 0) + + elif choice1 == '4': + hid_host.get_report(2, 2, 0) + elif choice1 == '5': + hid_host.get_report(3, 15, 0) + + elif choice1 == '6': + hid_host.get_report(1, 2, 3) + + elif choice1 == '7': + hid_host.get_report(2, 3, 2) + + elif choice1 == '8': + hid_host.get_report(3, 5, 3) else: print('Incorrect option selected') @@ -484,6 +511,7 @@ async def menu(): hid_host.virtual_cable_unplug() try: await device.keystore.delete(target_address) + print("Unpair successful") except KeyError: print('Device not found or Device already unpaired.') @@ -513,6 +541,9 @@ async def menu(): await connection.authenticate() await connection.encrypt() + elif choice == '16': + sys.exit("Exit successful") + else: print("Invalid option selected.")