diff --git a/.mypy.ini b/.mypy.ini index 1e15c62..04a8a64 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -1,5 +1,5 @@ [mypy] -files = a1314_message_filter.py, adapter.py, agent.py, bluetooth_devices.py, compatibility_device.py, hid_devices.py, hid_message_filter.py, mouse_message_filter.py +files = a1314_message_filter.py, adapter.py, agent.py, bluetooth_devices.py, compatibility_device.py, hid_devices.py, hid_message_filter.py, mouse_g502_message_filter.py, mouse_message_filter.py, mouse_mx510_message_filter.py check_untyped_defs = True follow_imports_for_stubs = True disallow_any_decorated = True diff --git a/adapter.py b/adapter.py index 8c589cd..c077b3e 100644 --- a/adapter.py +++ b/adapter.py @@ -1,6 +1,7 @@ # Copyright (c) 2020 ruundii. All rights reserved. import asyncio +from collections.abc import Container from datetime import datetime, timedelta from typing import Awaitable, Callable, Optional, TypedDict, cast @@ -119,7 +120,7 @@ async def wait_till_adapter_present_then_init(self) -> None: self.initialising_adapter = False - def interfaces_added(self, obj_name: str, interfaces: list[str]) -> None: + def interfaces_added(self, obj_name: str, interfaces: Container[str]) -> None: self.on_interface_changed() if not self.adapter_exists(): return @@ -133,13 +134,13 @@ def interfaces_added(self, obj_name: str, interfaces: list[str]) -> None: elif INPUT_DEVICE_INTERFACE in interfaces: self.bluetooth_devices.add_device(obj_name, False) - def interfaces_removed(self, obj_name: str, interfaces: list[str]) -> None: - if(obj_name==ADAPTER_OBJECT or obj_name==ROOT_OBJECT): + def interfaces_removed(self, obj_name: str, interfaces: Container[str]) -> None: + if (obj_name==ADAPTER_OBJECT or obj_name==ROOT_OBJECT): self.adapter = None self.bluetooth_devices.remove_devices() print("Bluetooth adapter removed. Stopping") asyncio.run_coroutine_threadsafe(self.init(), loop=self.loop) - elif INPUT_HOST_INTERFACE in interfaces or INPUT_DEVICE_INTERFACE in interfaces: + elif INPUT_HOST_INTERFACE in interfaces or INPUT_DEVICE_INTERFACE in interfaces: self.bluetooth_devices.remove_device(obj_name) self.on_interface_changed() diff --git a/agent.py b/agent.py index d478aee..d2a015a 100644 --- a/agent.py +++ b/agent.py @@ -31,14 +31,17 @@ def dev_connect(device_path: str) -> None: device.Connect() -@dbus_interface("org.bluez.Agent1") -class Agent(object): - __dbus_xml__: str - +# Decorator won't work with compiled class as it depends on introspection. +# If you update anything in this class, then uncomment the below line, and add a +# print(self.__dbus_xml__) into the init method. Then copy and paste the updated string +# to replace the hardcoded one below. +#@dbus_interface("org.bluez.Agent1") +class Agent: def __init__(self) -> None: self.on_agent_action_handler: Optional[Callable[[Action], None]] = None self.request_confirmation_device: Optional[dt.ObjPath] = None self.request_confirmation_passkey: Optional[str] = None + self.__dbus_xml__ = '' def Release(self) -> None: self.on_agent_action({'action':'agent_released'}) diff --git a/compatibility_device.py b/compatibility_device.py index 6de893a..7a0606d 100644 --- a/compatibility_device.py +++ b/compatibility_device.py @@ -146,7 +146,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, device_path: str): self.hidraw_device.name = "BT HID Hub Virtual Hid Raw Keyboard" self.hidraw_device.info = (0x06, 0x0001, 0x0001) # 0x06 - BUS_VIRTUAL, vendor id 1 product id 1 self.hidraw_device.phys = "0" - self.hidraw_device.rdesc = bytearray.fromhex( + self.hidraw_device.rdesc = bytearray.fromhex( # type: ignore[assignment] "05010906a1018501050719e029e715002501750195088102950175088103950575010508190129059102950175039103950675081500256d05071900296d8100c0050C0901A1018502050C150025017501950709B509B609B709CD09E209E909EA810295018101C0") self.pressed_keys: list[int] = [] self.pressed_consumer_keys: list[int] = [] @@ -204,10 +204,8 @@ def __eq__(self, other: object) -> bool: def finalise(self) -> None: #close device self.hidraw_device.destroy() - del self.hidraw_device self.ev_device.ungrab() self.ev_device.close() - del self.ev_device print("Compatibility Device ",self.device_path," finalised") def __del__(self) -> None: diff --git a/hid_devices.py b/hid_devices.py index 05ba5dd..63948dd 100644 --- a/hid_devices.py +++ b/hid_devices.py @@ -236,26 +236,25 @@ def _filter(d: evdev.InputDevice) -> bool: try: with open('/sys/bus/hid/devices/'+device+'/uevent', 'r') as uevent: m = re.search('HID_NAME\s*=(.+)', uevent.read()) - if m is None: - continue - name: str = m.group(1) - hidraw = os.listdir('/sys/bus/hid/devices/'+device+'/hidraw')[0] - inputs = os.listdir('/sys/bus/hid/devices/'+device+'/input') - events = [] - compatibility_mode = False - for input in inputs: - input_events = [e for e in os.listdir('/sys/bus/hid/devices/' + device + '/input/'+input) if e.startswith('event')] - for event in input_events: - for input_device in self.input_devices: - if input_device["compatibility_mode"] and input_device["path"].find(event)>=0: - compatibility_mode = True - break - events.extend(input_events) - - id = device.split('.')[0] - devs.append({"id":id, "instance":device, "name":name, "hidraw": hidraw, "events":events, "compatibility_mode":compatibility_mode}) - devs_dict[device] = id - if compatibility_mode: devs_in_compatibility_mode.append(device) + if m is not None: + name: str = m.group(1) + hidraw = os.listdir('/sys/bus/hid/devices/'+device+'/hidraw')[0] + inputs = os.listdir('/sys/bus/hid/devices/'+device+'/input') + events = [] + compatibility_mode = False + for input in inputs: + input_events = [e for e in os.listdir('/sys/bus/hid/devices/' + device + '/input/'+input) if e.startswith('event')] + for event in input_events: + for input_device in self.input_devices: + if input_device["compatibility_mode"] and input_device["path"].find(event)>=0: + compatibility_mode = True + break + events.extend(input_events) + + id = device.split('.')[0] + devs.append({"id":id, "instance":device, "name":name, "hidraw": hidraw, "events":events, "compatibility_mode":compatibility_mode}) + devs_dict[device] = id + if compatibility_mode: devs_in_compatibility_mode.append(device) except Exception as exc: print("Error while loading HID device: ", device, ", Error: ", exc,", Skipping.") devs_to_remove = [] diff --git a/install/on_rpi/on_pi_setup.sh b/install/on_rpi/on_pi_setup.sh index b580d57..50a63d7 100644 --- a/install/on_rpi/on_pi_setup.sh +++ b/install/on_rpi/on_pi_setup.sh @@ -20,7 +20,7 @@ systemctl --user stop obex systemctl --user disable obex systemctl --user mask obex -sudo apt-get install git libdbus-1-dev libglib2.0-dev libudev-dev libical-dev libreadline-dev autoconf automake libtool python3-pip -y +sudo apt-get install libcairo2-dev libdbus-1-dev libgirepository1.0-dev libglib2.0-dev libudev-dev libical-dev libreadline-dev autoconf automake libtool python3-pip -y sudo pip3 install -r /home/pi/bthidhub/requirements.txt cd /home/pi/bthidhub/install/on_rpi diff --git a/requirements.txt b/requirements.txt index c636c8d..fb0bed6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ asyncio_glib bitarray # TODO: remove dasbus evdev -hid-tools +hid-tools!=0.4 mypy==1.5.1 PyGObject pyudev diff --git a/stubs/dasbus/connection.pyi b/stubs/dasbus/connection.pyi index e7a44c8..f482627 100644 --- a/stubs/dasbus/connection.pyi +++ b/stubs/dasbus/connection.pyi @@ -1,10 +1,10 @@ -from typing import Callable, Dict, List, Optional +from typing import Callable, Optional from dasbus.typing import Variant class _InterfaceThing: - def connect(self, callback: Callable[[str, List[str]], None]) -> None: ... + def connect(self, callback: Callable[[str, list[str]], None]) -> None: ... class _Signal: @@ -33,7 +33,7 @@ class InterfaceProxy: def CancelPairing(self) -> None: ... def Connect(self) -> None: ... def Disconnect(self) -> None: ... - def GetManagedObjects(self) -> Dict[str, Dict[str, Dict[str, Variant[object]]]]: ... + def GetManagedObjects(self) -> dict[str, dict[str, dict[str, Variant[object]]]]: ... def Pair(self) -> None: ... def RegisterAgent(self, path: str, action: str) -> None: ... def RemoveDevice(self, path: str) -> None: ... @@ -43,6 +43,5 @@ class InterfaceProxy: class SystemMessageBus: - def get_proxy(self, service_name: str, object_path: str, - interface_name: Optional[str] = ...) -> InterfaceProxy: ... + def get_proxy(self, service_name: str, object_path: str, interface_name: str) -> InterfaceProxy: ... def publish_object(self, object_path: str, obj: object) -> None: ... diff --git a/stubs/dasbus/server/interface.pyi b/stubs/dasbus/server/interface.pyi index 267c09e..5f3407e 100644 --- a/stubs/dasbus/server/interface.pyi +++ b/stubs/dasbus/server/interface.pyi @@ -1,5 +1,5 @@ -from typing import Callable, Tuple, Type, TypeVar +from typing import Callable, TypeVar -_T = TypeVar("_T", bound=Type[object]) +_T = TypeVar("_T", bound=type[object]) -def dbus_interface(interface_name: str, namespace: Tuple[str, ...] = ...) -> Callable[[_T], _T]: ... +def dbus_interface(interface_name: str, namespace: tuple[str, ...] = ...) -> Callable[[_T], _T]: ... diff --git a/stubs/dasbus/typing.pyi b/stubs/dasbus/typing.pyi index e33bfed..9dc1ce5 100644 --- a/stubs/dasbus/typing.pyi +++ b/stubs/dasbus/typing.pyi @@ -15,12 +15,14 @@ UInt32 = NewType("UInt32", int) Int64 = NewType("Int64", int) UInt64 = NewType("UInt64", int) -File = IO - ObjPath = NewType('ObjPath', str) -class Variant(Generic[_T]): ... +class Variant(Generic[_T]): + def get_type_string(self) -> str: ... + def get_child_value(self, i: int) -> object: ... + def get_variant(self) -> _T: ... + def unpack(self) -> _T: ... def unwrap_variant(variant: Variant[_T]) -> _T: ... diff --git a/stubs/evdev/__init__.pyi b/stubs/evdev/__init__.pyi index 4e9930e..09371d8 100644 --- a/stubs/evdev/__init__.pyi +++ b/stubs/evdev/__init__.pyi @@ -1,5 +1,5 @@ import asyncio -from typing import Dict, List, NamedTuple +from typing import NamedTuple from . import ecodes @@ -32,11 +32,11 @@ class InputDevice: def __init__(self, dev: str): ... def async_read_loop(self) -> ReadIterator: ... - def capabilities(self, verbose: bool = ..., absinfo: bool = ...) -> Dict[int, List[int]]: ... + def capabilities(self, verbose: bool = ..., absinfo: bool = ...) -> dict[int, list[int]]: ... def close(self) -> None: ... def grab(self) -> None: ... def ungrab(self) -> None: ... def categorize(event: InputEvent) -> InputEvent: ... -def list_devices(input_device_dir: str = ...) -> List[str]: ... +def list_devices(input_device_dir: str = ...) -> list[str]: ... diff --git a/stubs/hidtools/uhid.pyi b/stubs/hidtools/uhid.pyi index 7fee60b..f213583 100644 --- a/stubs/hidtools/uhid.pyi +++ b/stubs/hidtools/uhid.pyi @@ -1,11 +1,26 @@ -from typing import Iterable, Tuple +from typing import Iterable, Optional, Union class UHIDDevice: - info: Tuple[int, int, int] - name: str - phys: str - rdesc: bytes + @property + def name(self) -> Optional[str]: ... + @name.setter + def name(self, name: str) -> None: ... - def call_input_event(self, data: Iterable[int]) -> None: ... + @property + def info(self) -> Optional[tuple[int, int, int]]: ... + @info.setter + def info(self, info: tuple[int, int, int]) -> None: ... + + @property + def phys(self) -> Optional[str]: ... + @phys.setter + def phys(self, phys: str) -> None: ... + + @property + def rdesc(self) -> Optional[list[int]]: ... + @rdesc.setter + def rdesc(self, rdesc: Union[str, bytes]) -> None: ... + + def call_input_event(self, _data: Union[list[int], bytes]) -> None: ... def create_kernel_device(self) -> None: ... def destroy(self) -> None: ...