diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 55e8a44..d03739d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -11,3 +11,16 @@ include: project: QubesOS/qubes-continuous-integration - file: /r4.2/gitlab-vm.yml project: QubesOS/qubes-continuous-integration + +mypy: + stage: checks + image: fedora:40 + tags: + - docker + before_script: + - sudo dnf install -y python3-mypy python3-pip + script: + - mypy --install-types --non-interactive --ignore-missing-imports --junit-xml mypy.xml qubesusbproxy + artifacts: + reports: + junit: mypy.xml \ No newline at end of file diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index fc37044..665fcce 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -21,8 +21,9 @@ # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - +import asyncio import collections +import dataclasses import fcntl import grp import os @@ -32,16 +33,19 @@ import sys import tempfile -from typing import List, Optional, Dict, Tuple +from typing import List, Optional, Dict, Tuple, Any + +from qubes.utils import sanitize_stderr_for_log try: from qubes.device_protocol import DeviceInfo from qubes.device_protocol import DeviceInterface + from qubes.device_protocol import Port from qubes.ext import utils - from qubes.devices import UnrecognizedDevice def get_assigned_devices(devices): yield from devices.get_assigned_devices() + except ImportError: # This extension supports both the legacy and new device API. # In the case of the legacy backend, functionality is limited. @@ -49,16 +53,16 @@ def get_assigned_devices(devices): from qubesusbproxy import utils class DescriptionOverrider: + # pylint: disable=too-few-public-methods @property def description(self): return self.name - class DeviceInfo(DescriptionOverrider, LegacyDeviceInfo): - def __init__(self, *args, **kwargs): + class DeviceInfo(DescriptionOverrider, LegacyDeviceInfo): # type: ignore + def __init__(self, port): # not supported options in legacy code - del kwargs['devclass'] - self.safe_chars = self.safe_chars.replace(' ', '') - super().__init__(*args, **kwargs) + self.safe_chars = self.safe_chars.replace(" ", "") + super().__init__(port.backend_domain, port.port_id) # needed but not in legacy DeviceInfo self._vendor = None @@ -73,10 +77,14 @@ def __init__(self, *args, **kwargs): def frontend_domain(self): return self.attachment - class DeviceInterface: - pass + @dataclasses.dataclass + class Port: # type: ignore + backend_domain: Any + port_id: Any + devclass: Any - class UnrecognizedDevice(ValueError): + class DeviceInterface: # type: ignore + # pylint: disable=too-few-public-methods pass def get_assigned_devices(devices): @@ -89,25 +97,32 @@ def get_assigned_devices(devices): usb_device_re = re.compile(r"^[0-9]+-[0-9]+(_[0-9]+)*$") # should match valid VM name -usb_connected_to_re = re.compile(br"^[a-zA-Z][a-zA-Z0-9_.-]*$") -usb_device_hw_ident_re = re.compile(r'^[0-9a-f]{4}:[0-9a-f]{4} ') +usb_connected_to_re = re.compile(rb"^[a-zA-Z][a-zA-Z0-9_.-]*$") +usb_device_hw_ident_re = re.compile(r"^[0-9a-f]{4}:[0-9a-f]{4} ") -HWDATA_PATH = '/usr/share/hwdata' +HWDATA_PATH = "/usr/share/hwdata" class USBDevice(DeviceInfo): # pylint: disable=too-few-public-methods - def __init__(self, backend_domain, ident): + def __init__(self, port: qubes.device_protocol.Port): + if port.devclass != "usb": + raise qubes.exc.QubesValueError( + f"Incompatible device class for input port: {port.devclass}" + ) + # the superclass can restrict the allowed characters - self.safe_chars = (string.ascii_letters + string.digits - + string.punctuation + ' ') - super(USBDevice, self).__init__( - backend_domain=backend_domain, ident=ident, devclass="usb") + self.safe_chars = ( + string.ascii_letters + string.digits + string.punctuation + " " + ) + + # init parent class + super().__init__(port) - self._qdb_ident = ident.replace('.', '_') - self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident - self._vendor_id = None - self._product_id = None + self._qdb_ident = port.port_id.replace(".", "_") + self._qdb_path = "/qubes-usb-devices/" + self._qdb_ident + self._vendor_id: Optional[str] = None + self._product_id: Optional[str] = None @property def vendor(self) -> str: @@ -211,9 +226,8 @@ def _load_interfaces_from_qubesdb(self) -> List[DeviceInterface]: if not self.backend_domain.is_running(): # don't cache this value return result - untrusted_interfaces: bytes = ( - self.backend_domain.untrusted_qdb.read( - self._qdb_path + '/interfaces') + untrusted_interfaces: bytes = self.backend_domain.untrusted_qdb.read( + self._qdb_path + "/interfaces" ) if not untrusted_interfaces: return result @@ -221,35 +235,40 @@ def _load_interfaces_from_qubesdb(self) -> List[DeviceInterface]: DeviceInterface( self._sanitize(ifc, safe_chars=string.hexdigits), devclass="usb" ) - for ifc in untrusted_interfaces.split(b':') + for ifc in untrusted_interfaces.split(b":") if ifc ] return result def _load_desc_from_qubesdb(self) -> Dict[str, str]: unknown = "unknown" - result = {"vendor": unknown, - "vendor ID": "0000", - "product": unknown, - "product ID": "0000", - "manufacturer": unknown, - "name": unknown, - "serial": unknown} + result = { + "vendor": unknown, + "vendor ID": "0000", + "product": unknown, + "product ID": "0000", + "manufacturer": unknown, + "name": unknown, + "serial": unknown, + } if not self.backend_domain.is_running(): # don't cache this value return result - untrusted_device_desc: bytes = ( - self.backend_domain.untrusted_qdb.read( - self._qdb_path + '/desc') + untrusted_device_desc: bytes = self.backend_domain.untrusted_qdb.read( + self._qdb_path + "/desc" ) if not untrusted_device_desc: return result try: - (untrusted_vend_prod_id, untrusted_manufacturer, - untrusted_name, untrusted_serial - ) = untrusted_device_desc.split(b' ') + ( + untrusted_vend_prod_id, + untrusted_manufacturer, + untrusted_name, + untrusted_serial, + ) = untrusted_device_desc.split(b" ") untrusted_vendor_id, untrusted_product_id = ( - untrusted_vend_prod_id.split(b':')) + untrusted_vend_prod_id.split(b":") + ) except ValueError: # desc doesn't contain correctly formatted data, # but it is not empty. We cannot parse it, @@ -257,27 +276,31 @@ def _load_desc_from_qubesdb(self) -> Dict[str, str]: # some information to the user. untrusted_vendor_id, untrusted_product_id = (b"0000", b"0000") (untrusted_manufacturer, untrusted_serial) = ( - unknown.encode() for _ in range(2)) - untrusted_name = untrusted_device_desc.replace(b' ', b'_') + unknown.encode() for _ in range(2) + ) + untrusted_name = untrusted_device_desc.replace(b" ", b"_") # Data successfully loaded, cache these values self._vendor_id = result["vendor ID"] = self._sanitize( - untrusted_vendor_id) + untrusted_vendor_id + ) self._product_id = result["product ID"] = self._sanitize( - untrusted_product_id) + untrusted_product_id + ) vendor, product = self._get_vendor_and_product_names( - self._vendor_id, self._product_id) + self._vendor_id, self._product_id + ) self._vendor = result["vendor"] = self._sanitize(vendor.encode()) self._product = result["product"] = self._sanitize(product.encode()) - self._manufacturer = result["manufacturer"] = ( - self._sanitize(untrusted_manufacturer)) + self._manufacturer = result["manufacturer"] = self._sanitize( + untrusted_manufacturer + ) self._name = result["name"] = self._sanitize(untrusted_name) self._name = result["serial"] = self._sanitize(untrusted_serial) return result def _sanitize( - self, untrusted_device_desc: bytes, - safe_chars: Optional[str] = None + self, untrusted_device_desc: bytes, safe_chars: Optional[str] = None ) -> str: # rb'USB\x202.0\x20Camera' -> 'USB 2.0 Camera' if safe_chars is None: @@ -288,29 +311,29 @@ def _sanitize( i = 0 while i < len(untrusted_device_desc): c = chr(untrusted_device_desc[i]) - if c == '\\': + if c == "\\": i += 1 if i >= len(untrusted_device_desc): break c = chr(untrusted_device_desc[i]) - if c == 'x': + if c == "x": i += 2 if i >= len(untrusted_device_desc): break - hex_code = untrusted_device_desc[i - 1: i + 1] + hex_code = untrusted_device_desc[i - 1 : i + 1] try: for j in range(2): - if hex_code[j] not in b'0123456789abcdefABCDEF': + if hex_code[j] not in b"0123456789abcdefABCDEF": raise ValueError() hex_value = int(hex_code, 16) c = chr(hex_value) except ValueError: - c = '_' + c = "_" if c in safe_chars_set: result += c else: - result += '_' + result += "_" i += 1 return result @@ -319,29 +342,33 @@ def attachment(self): if not self.backend_domain.is_running(): return None untrusted_connected_to = self.backend_domain.untrusted_qdb.read( - self._qdb_path + '/connected-to' + self._qdb_path + "/connected-to" ) if not untrusted_connected_to: return None if not usb_connected_to_re.match(untrusted_connected_to): self.backend_domain.log.warning( - 'Device {} has invalid chars in connected-to ' - 'property'.format(self.ident)) + f"Device {self.port_id} has invalid chars in connected-to " + "property" + ) return None untrusted_connected_to = untrusted_connected_to.decode( - 'ascii', errors='strict') + "ascii", errors="strict" + ) try: connected_to = self.backend_domain.app.domains[ - untrusted_connected_to] + untrusted_connected_to + ] except KeyError: self.backend_domain.log.warning( - f'Device {self.ident} has invalid VM name in connected-to ' - f'property: {untrusted_connected_to}') + f"Device {self.port_id} has invalid VM name in connected-to " + f"property: {untrusted_connected_to}" + ) return None return connected_to @property - def self_identity(self) -> str: + def device_id(self) -> str: """ Get identification of a device not related to port. """ @@ -353,24 +380,24 @@ def self_identity(self) -> str: product_id = self._load_desc_from_qubesdb()["product ID"] else: product_id = self._product_id - interfaces = ''.join(repr(ifc) for ifc in self.interfaces) + interfaces = "".join(repr(ifc) for ifc in self.interfaces) serial = self.serial if self.serial != "unknown" else "" - return \ - f'{vendor_id}:{product_id}:{serial}:{interfaces}' + return f"{vendor_id}:{product_id}:{serial}:{interfaces}" @staticmethod def _get_vendor_and_product_names( - vendor_id: str, product_id: str + vendor_id: str, product_id: str ) -> Tuple[str, str]: """ Return tuple of vendor's and product's names for the ids. If the id is not known, return ("unknown", "unknown"). """ - return (USBDevice._load_usb_known_devices() - .get(vendor_id, dict()) - .get(product_id, ("unknown", "unknown")) - ) + return ( + USBDevice._load_usb_known_devices() + .get(vendor_id, {}) + .get(product_id, ("unknown", "unknown")) + ) @staticmethod def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]: @@ -387,30 +414,35 @@ def _load_usb_known_devices() -> Dict[str, Dict[str, Tuple[str, str]]]: # C class class_name # subclass subclass_name <-- single tab # prog-if prog-if_name <-- two tabs - result = {} - with open(HWDATA_PATH + '/usb.ids', - encoding='utf-8', errors='ignore') as usb_ids: + result: Dict[str, Dict] = {} + with open( + HWDATA_PATH + "/usb.ids", encoding="utf-8", errors="ignore" + ) as usb_ids: + vendor_id: Optional[str] = None + vendor_name: Optional[str] = None for line in usb_ids.readlines(): line = line.rstrip() - if line.startswith('#'): + if line.startswith("#"): # skip comments continue - elif not line: + if not line: # skip empty lines continue - elif line.startswith('\t\t'): + if line.startswith("\t\t"): # skip interfaces continue - elif line.startswith('C '): + if line.startswith("C "): # description of classes starts here, we can finish break - elif line.startswith('\t'): + if line.startswith("\t"): # save vendor, device pair - device_id, _, device_name = line[1:].split(' ', 2) + device_id, _, device_name = line[1:].split(" ", 2) + if vendor_id is None or vendor_name is None: + continue result[vendor_id][device_id] = vendor_name, device_name else: # new vendor - vendor_id, _, vendor_name = line[:].split(' ', 2) + vendor_id, _, vendor_name = line[:].split(" ", 2) result[vendor_id] = {} return result @@ -435,9 +467,9 @@ def modify_qrexec_policy(service, line, add): :param add: True if line should be added, otherwise False :return: None """ - path = '/etc/qubes-rpc/policy/{}'.format(service) + path = f"/etc/qubes-rpc/policy/{service}" while True: - with open(path, 'a+') as policy: + with open(path, "a+") as policy: # take the lock here, it's released by closing the file fcntl.lockf(policy.fileno(), fcntl.LOCK_EX) # While we were waiting for lock, someone could have unlink()ed @@ -460,12 +492,14 @@ def modify_qrexec_policy(service, line, add): if policy_rules: with tempfile.NamedTemporaryFile( - prefix=path, delete=False) as policy_new: - policy_new.write(''.join(policy_rules).encode()) + prefix=path, delete=False + ) as policy_new: + policy_new.write("".join(policy_rules).encode()) policy_new.flush() try: - os.chown(policy_new.name, -1, - grp.getgrnam('qubes').gr_gid) + os.chown( + policy_new.name, -1, grp.getgrnam("qubes").gr_gid + ) os.chmod(policy_new.name, 0o660) except KeyError: # group 'qubes' not found # don't change mode if no 'qubes' group in the system @@ -479,118 +513,132 @@ def modify_qrexec_policy(service, line, add): class USBDeviceExtension(qubes.ext.Extension): def __init__(self): - super(USBDeviceExtension, self).__init__() + super().__init__() # include dom0 devices in listing only when usb-proxy is really # installed there self.usb_proxy_installed_in_dom0 = os.path.exists( - '/etc/qubes-rpc/qubes.USB') + "/etc/qubes-rpc/qubes.USB" + ) self.devices_cache = collections.defaultdict(dict) - @qubes.ext.handler('domain-init', 'domain-load') + @qubes.ext.handler("domain-init", "domain-load") def on_domain_init_load(self, vm, event): """Initialize watching for changes""" - # pylint: disable=unused-argument,no-self-use - vm.watch_qdb_path('/qubes-usb-devices') - if event == 'domain-load': + # pylint: disable=unused-argument + vm.watch_qdb_path("/qubes-usb-devices") + if event == "domain-load": # avoid building a cache on domain-init, as it isn't fully set yet, # and definitely isn't running yet current_devices = { - dev.ident: dev.attachment + dev.port_id: dev.attachment for dev in self.on_device_list_usb(vm, None) } self.devices_cache[vm.name] = current_devices else: self.devices_cache[vm.name] = {} - async def attach_and_notify(self, vm, device, options): + async def attach_and_notify(self, vm, assignment): # bypass DeviceCollection logic preventing double attach - try: - await self.on_device_attach_usb( - vm, 'device-pre-attach:usb', device, options) - except UnrecognizedDevice: - return + device = assignment.device + if assignment.mode.value == "ask-to-attach": + allowed = await utils.confirm_device_attachment( + device, {vm: assignment} + ) + allowed = allowed.strip() + if vm.name != allowed: + return + await self.on_device_attach_usb( + vm, "device-pre-attach:usb", device, assignment.options + ) await vm.fire_event_async( - 'device-attach:usb', device=device, options=options) + "device-attach:usb", device=device, options=assignment.options + ) + + def ensure_detach(self, vm, port): + """ + Run this method if device is no longer detected. - @qubes.ext.handler('domain-qdb-change:/qubes-usb-devices') + No additional action required in case of USB devices. + """ + pass + + @qubes.ext.handler("domain-qdb-change:/qubes-usb-devices") def on_qdb_change(self, vm, event, path): """A change in QubesDB means a change in a device list.""" - # pylint: disable=unused-argument,no-self-use - current_devices = dict((dev.ident, dev.attachment) - for dev in self.on_device_list_usb(vm, None)) + # pylint: disable=unused-argument + current_devices = dict( + (dev.port_id, dev.attachment) + for dev in self.on_device_list_usb(vm, None) + ) utils.device_list_change(self, current_devices, vm, path, USBDevice) - @qubes.ext.handler('device-list:usb') + @qubes.ext.handler("device-list:usb") def on_device_list_usb(self, vm, event): - # pylint: disable=unused-argument,no-self-use + # pylint: disable=unused-argument - if not vm.is_running() or not hasattr(vm, 'untrusted_qdb'): + if not vm.is_running() or not hasattr(vm, "untrusted_qdb"): return - if isinstance(vm, qubes.vm.adminvm.AdminVM) and not \ - self.usb_proxy_installed_in_dom0: + if ( + isinstance(vm, qubes.vm.adminvm.AdminVM) + and not self.usb_proxy_installed_in_dom0 + ): return - untrusted_dev_list = vm.untrusted_qdb.list('/qubes-usb-devices/') + untrusted_dev_list = vm.untrusted_qdb.list("/qubes-usb-devices/") if not untrusted_dev_list: return # just get a list of devices, not its every property - untrusted_dev_list = \ - set(path.split('/')[2] for path in untrusted_dev_list) + untrusted_dev_list = set( + path.split("/")[2] for path in untrusted_dev_list + ) for untrusted_qdb_ident in untrusted_dev_list: if not usb_device_re.match(untrusted_qdb_ident): - vm.log.warning('Invalid USB device name detected') + vm.log.warning("Invalid USB device name detected") continue - ident = untrusted_qdb_ident.replace('_', '.') - yield USBDevice(vm, ident) + port_id = untrusted_qdb_ident.replace("_", ".") + yield USBDevice(Port(vm, port_id, "usb")) - @qubes.ext.handler('device-get:usb') - def on_device_get_usb(self, vm, event, ident): - # pylint: disable=unused-argument,no-self-use + @qubes.ext.handler("device-get:usb") + def on_device_get_usb(self, vm, event, port_id): + # pylint: disable=unused-argument if not vm.is_running(): return + if vm.untrusted_qdb.list( - '/qubes-usb-devices/' + ident.replace('.', '_')): - yield USBDevice(vm, ident) + "/qubes-usb-devices/" + port_id.replace(".", "_") + ): + yield USBDevice(Port(vm, port_id, "usb")) @staticmethod def get_all_devices(app): for vm in app.domains: - if not vm.is_running(): + if not vm.is_running() or not hasattr(vm, "devices"): continue - for dev in vm.devices['usb']: + for dev in vm.devices["usb"]: # there may be more than one USB-passthrough implementation if isinstance(dev, USBDevice): yield dev - @qubes.ext.handler('device-list-attached:usb') + @qubes.ext.handler("device-list-attached:usb") def on_device_list_attached(self, vm, event, **kwargs): - # pylint: disable=unused-argument,no-self-use + # pylint: disable=unused-argument if not vm.is_running(): return for dev in self.get_all_devices(vm.app): if dev.attachment == vm: - yield (dev, {'identity': dev.self_identity}) + yield (dev, {}) - @qubes.ext.handler('device-pre-attach:usb') + @qubes.ext.handler("device-pre-attach:usb") async def on_device_attach_usb(self, vm, event, device, options): # pylint: disable=unused-argument if options: - if list(options.keys()) != ['identity']: - raise qubes.exc.QubesException( - 'USB device attach do not support user options') - identity = options['identity'] - if identity != 'any' and device.self_identity != identity: - print(f"Unrecognized identity, skipping attachment of {device}", - file=sys.stderr) - raise UnrecognizedDevice( - "Device presented identity " - f"{device.self_identity} " - f"does not match expected {identity}" - ) + raise qubes.exc.QubesException( + "USB device attach does not support user options" + ) if not vm.is_running() or vm.qid == 0: # print(f"Qube is not running, skipping attachment of {device}", @@ -605,95 +653,130 @@ async def on_device_attach_usb(self, vm, event, device, options): if device.attachment: raise qubes.devices.DeviceAlreadyAttached( - 'Device {!s} already attached to {!s}'.format( - device, device.attachment) + f"Device {device} already attached to {device.attachment}" ) stubdom_qrexec = ( - vm.virt_mode == 'hvm' - and vm.features.check_with_template('stubdom-qrexec', False)) + vm.virt_mode == "hvm" + and vm.features.check_with_template("stubdom-qrexec", False) + ) - name = vm.name + '-dm' if stubdom_qrexec else vm.name + name = vm.name + "-dm" if stubdom_qrexec else vm.name extra_kwargs = {} if stubdom_qrexec: - extra_kwargs['stubdom'] = True + extra_kwargs["stubdom"] = True # update the cache before the call, to avoid sending duplicated events # (one on qubesdb watch and the other by the caller of this method) - self.devices_cache[device.backend_domain.name][device.ident] = vm + self.devices_cache[device.backend_domain.name][device.port_id] = vm # set qrexec policy to allow this device - policy_line = '{} {} allow,user=root\n'.format(name, - device.backend_domain.name) - modify_qrexec_policy('qubes.USB+{}'.format(device.ident), - policy_line, True) + policy_line = f"{name} {device.backend_domain.name} allow,user=root\n" + modify_qrexec_policy(f"qubes.USB+{device.port_id}", policy_line, True) try: # and actual attach try: - await vm.run_service_for_stdio('qubes.USBAttach', - user='root', - input='{} {}\n'.format(device.backend_domain.name, - device.ident).encode(), **extra_kwargs) + await vm.run_service_for_stdio( + "qubes.USBAttach", + user="root", + input=f"{device.backend_domain.name} " + f"{device.port_id}\n".encode(), + **extra_kwargs, + ) except subprocess.CalledProcessError as e: + # pylint: disable=raise-missing-from if e.returncode == 127: raise USBProxyNotInstalled( - "qubes-usb-proxy not installed in the VM") - else: - # TODO: sanitize and include stdout - sanitized_stderr = e.stderr.replace(b"\n", b", ") - sanitized_stderr = ''.join( - [chr(c) for c in sanitized_stderr if 0x20 <= c < 0x80]) - if sanitized_stderr.endswith(", "): - sanitized_stderr = santizied_stderr[:-2] + "." - raise QubesUSBException( - 'Device attach failed: {}'.format(sanitized_stderr)) + "qubes-usb-proxy not installed in the VM" + ) + raise QubesUSBException( + f"Device attach failed: {sanitize_stderr_for_log(e.output)}" + f" {sanitize_stderr_for_log(e.stderr)}" + ) finally: - modify_qrexec_policy('qubes.USB+{}'.format(device.ident), - policy_line, False) + modify_qrexec_policy( + f"qubes.USB+{device.port_id}", policy_line, False + ) - @qubes.ext.handler('device-pre-detach:usb') - async def on_device_detach_usb(self, vm, event, device): - # pylint: disable=unused-argument,no-self-use + @qubes.ext.handler("device-pre-detach:usb") + async def on_device_detach_usb(self, vm, event, port): + # pylint: disable=unused-argument if not vm.is_running() or vm.qid == 0: return - if not isinstance(device, USBDevice): - return - - connected_to = device.attachment - # detect race conditions; there is still race here, but much smaller - if connected_to is None or connected_to.qid != vm.qid: + for attached, _options in self.on_device_list_attached(vm, event): + if attached.port == port: + break + else: raise QubesUSBException( - "Device {!s} not connected to VM {}".format( - device, vm.name)) + f"Device {port} not connected to VM {vm.name}" + ) # update the cache before the call, to avoid sending duplicated events # (one on qubesdb watch and the other by the caller of this method) - self.devices_cache[device.backend_domain.name][device.ident] = None + backend = attached.backend_domain + self.devices_cache[backend.name][attached.port_id] = None try: - await device.backend_domain.run_service_for_stdio( - 'qubes.USBDetach', - user='root', - input='{}\n'.format(device.ident).encode()) + await backend.run_service_for_stdio( + "qubes.USBDetach", + user="root", + input=f"{attached.port_id}\n".encode(), + ) except subprocess.CalledProcessError as e: - # TODO: sanitize and include stdout - raise QubesUSBException('Device detach failed') + # pylint: disable=raise-missing-from + raise QubesUSBException( + f"Device detach failed: {sanitize_stderr_for_log(e.output)}" + f" {sanitize_stderr_for_log(e.stderr)}" + ) + + @qubes.ext.handler("device-pre-assign:usb") + async def on_device_assign_usb(self, vm, event, device, options): + # pylint: disable=unused-argument - @qubes.ext.handler('domain-start') + if options: + raise qubes.exc.QubesException( + "USB device assignment does not support user options" + ) + + @qubes.ext.handler("domain-start") async def on_domain_start(self, vm, _event, **_kwargs): # pylint: disable=unused-argument - for assignment in get_assigned_devices(vm.devices['usb']): - await self.attach_and_notify( - vm, assignment.device, assignment.options) + to_attach = {} + assignments = get_assigned_devices(vm.devices["usb"]) + # the most specific assignments first + for assignment in reversed(sorted(assignments)): + for device in assignment.devices: + if isinstance(device, qubes.device_protocol.UnknownDevice): + continue + if device.attachment: + continue + if not assignment.matches(device): + print( + "Unrecognized identity, skipping attachment of device " + f"from the port {assignment}", + file=sys.stderr, + ) + continue + # chose first assignment (the most specific) and ignore rest + if device not in to_attach: + # make it unique + to_attach[device] = assignment.clone(device=device) + in_progress = set() + for assignment in to_attach.values(): + in_progress.add( + asyncio.ensure_future(self.attach_and_notify(vm, assignment)) + ) + if in_progress: + await asyncio.wait(in_progress) - @qubes.ext.handler('domain-shutdown') + @qubes.ext.handler("domain-shutdown") async def on_domain_shutdown(self, vm, _event, **_kwargs): # pylint: disable=unused-argument - vm.fire_event('device-list-change:usb') + vm.fire_event("device-list-change:usb") - @qubes.ext.handler('qubes-close', system=True) + @qubes.ext.handler("qubes-close", system=True) def on_qubes_close(self, app, event): # pylint: disable=unused-argument self.devices_cache.clear() diff --git a/qubesusbproxy/tests.py b/qubesusbproxy/tests.py index 413a62e..9e29274 100644 --- a/qubesusbproxy/tests.py +++ b/qubesusbproxy/tests.py @@ -21,19 +21,28 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # +import time import unittest +from unittest import mock +from unittest.mock import Mock, AsyncMock + +import jinja2 import qubes.tests.extra -import time -core2 = False core3 = False -legacy = False +LEGACY = False try: import qubesusbproxy.core3ext import asyncio try: - from qubes.device_protocol import DeviceAssignment + from qubes.device_protocol import DeviceAssignment, VirtualDevice, Port + + def make_assignment(backend, ident, auto_attach=False): + return DeviceAssignment( + VirtualDevice(Port(backend, ident, "usb")), + mode="auto-attach" if auto_attach else "manual", + ) def assign(test, collection, assignment): test.loop.run_until_complete(collection.assign(assignment)) @@ -41,82 +50,84 @@ def assign(test, collection, assignment): def unassign(test, collection, assignment): test.loop.run_until_complete(collection.unassign(assignment)) - AUTO_ATTACH = {"attach_automatically": True, "required": True} except ImportError: # This extension supports both the legacy and new device API. # In the case of the legacy backend, functionality is limited. from qubes.devices import DeviceAssignment + def make_assignment(backend, ident, required=False): + # pylint: disable=unexpected-keyword-arg + return DeviceAssignment(backend, ident, persistent=required) + def assign(test, collection, assignment): test.loop.run_until_complete(collection.attach(assignment)) def unassign(test, collection, assignment): test.loop.run_until_complete(collection.detach(assignment)) - - legacy = True - AUTO_ATTACH = {"persistent": True} - core3 = True -except ImportError: - pass + LEGACY = True -try: - import qubes.qubesutils - - core2 = True + core3 = True except ImportError: pass is_r40 = False try: - with open('/etc/qubes-release') as f: - if 'R4.0' in f.read(): + with open("/etc/qubes-release") as f: + if "R4.0" in f.read(): is_r40 = True except FileNotFoundError: pass -GADGET_PREREQ = '&&'.join([ - "modprobe dummy_hcd", - "modprobe usb_f_mass_storage", - "mount|grep -q configfs", - "test -d /sys/class/udc/dummy_udc.0", -]) - -GADGET_PREPARE = ';'.join([ - "set -e -x", - "cd /sys/kernel/config/usb_gadget", - "mkdir test_g1; cd test_g1", - "echo 0x1234 > idProduct", - "echo 0x1234 > idVendor", - "mkdir strings/0x409", - "echo 0123456789 > strings/0x409/serialnumber", - "echo Qubes > strings/0x409/manufacturer", - "echo Test device > strings/0x409/product", - "mkdir configs/c.1", - "mkdir functions/mass_storage.ms1", - "truncate -s 512M /var/tmp/test-file", - "echo /var/tmp/test-file > functions/mass_storage.ms1/lun.0/file", - "ln -s functions/mass_storage.ms1 configs/c.1", - "echo dummy_udc.0 > UDC", - "sleep 2; udevadm settle", -]) +GADGET_PREREQ = "&&".join( + [ + "modprobe dummy_hcd", + "modprobe usb_f_mass_storage", + "mount|grep -q configfs", + "test -d /sys/class/udc/dummy_udc.0", + ] +) + +GADGET_PREPARE = ";".join( + [ + "set -e -x", + "cd /sys/kernel/config/usb_gadget", + "mkdir test_g1; cd test_g1", + "echo 0x1234 > idProduct", + "echo 0x1234 > idVendor", + "mkdir strings/0x409", + "echo 0123456789 > strings/0x409/serialnumber", + "echo Qubes > strings/0x409/manufacturer", + "echo Test device > strings/0x409/product", + "mkdir configs/c.1", + "mkdir functions/mass_storage.ms1", + "truncate -s 512M /var/tmp/test-file", + "echo /var/tmp/test-file > functions/mass_storage.ms1/lun.0/file", + "ln -s functions/mass_storage.ms1 configs/c.1", + "echo dummy_udc.0 > UDC", + "sleep 2; udevadm settle", + ] +) def create_usb_gadget(vm): vm.start() - p = vm.run(GADGET_PREREQ, user="root", - passio_popen=True, passio_stderr=True) + p = vm.run( + GADGET_PREREQ, user="root", passio_popen=True, passio_stderr=True + ) (_, _stderr) = p.communicate() if p.returncode != 0: raise unittest.SkipTest("missing USB Gadget subsystem") - p = vm.run(GADGET_PREPARE, user="root", - passio_popen=True, passio_stderr=True) + p = vm.run( + GADGET_PREPARE, user="root", passio_popen=True, passio_stderr=True + ) (_, stderr) = p.communicate() if p.returncode != 0: raise RuntimeError("Failed to setup USB gadget: " + stderr.decode()) p = vm.run( - 'ls /sys/bus/platform/devices/dummy_hcd.0/usb*|grep -x .-.', - passio_popen=True) + "ls /sys/bus/platform/devices/dummy_hcd.0/usb*|grep -x .-.", + passio_popen=True, + ) (stdout, _) = p.communicate() stdout = stdout.strip() if not stdout: @@ -127,26 +138,30 @@ def create_usb_gadget(vm): def remove_usb_gadget(vm): assert vm.is_running() - retcode = vm.run("echo > /sys/kernel/config/usb_gadget/test_g1/UDC", - user="root", wait=True) + retcode = vm.run( + "echo > /sys/kernel/config/usb_gadget/test_g1/UDC", + user="root", + wait=True, + ) if retcode != 0: raise RuntimeError("Failed to disable USB gadget") def recreate_usb_gadget(vm): - '''Re-create the gadget previously created with *create_usb_gadget*, + """Re-create the gadget previously created with *create_usb_gadget*, then removed with *remove_usb_gadget*. - ''' - - reconnect = ";".join([ - "cd /sys/kernel/config/usb_gadget", - "mkdir test_g1; cd test_g1", - "echo dummy_udc.0 > UDC", - "sleep 2; udevadm settle", - ]) - - p = vm.run(reconnect, user="root", - passio_popen=True, passio_stderr=True) + """ + + reconnect = ";".join( + [ + "cd /sys/kernel/config/usb_gadget", + "mkdir test_g1; cd test_g1", + "echo dummy_udc.0 > UDC", + "sleep 2; udevadm settle", + ] + ) + + p = vm.run(reconnect, user="root", passio_popen=True, passio_stderr=True) (_, stderr) = p.communicate() if p.returncode != 0: raise RuntimeError("Failed to re-create USB gadget: " + stderr.decode()) @@ -154,348 +169,257 @@ def recreate_usb_gadget(vm): class TC_00_USBProxy(qubes.tests.extra.ExtraTestCase): def setUp(self): - if 'whonix-gw' in self.template: - self.skipTest('whonix-gw does not have qubes-usb-proxy') - super(TC_00_USBProxy, self).setUp() + if "whonix-gw" in self.template: + self.skipTest("whonix-gw does not have qubes-usb-proxy") + super().setUp() vms = self.create_vms(["backend", "frontend"]) (self.backend, self.frontend) = vms - self.qrexec_policy('qubes.USB', self.frontend.name, self.backend.name) + self.qrexec_policy("qubes.USB", self.frontend.name, self.backend.name) self.dummy_usb_dev = create_usb_gadget(self.backend).decode() def test_000_attach_detach(self): self.frontend.start() # TODO: check qubesdb entries - self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format( - self.backend.name, - self.dummy_usb_dev)), 0, - "qubes.USBAttach call failed") - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run_service( + "qubes.USBAttach", + user="root", + input=f"{self.backend.name} {self.dummy_usb_dev}\n", + ), + 0, + "qubes.USBAttach call failed", + ) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) # TODO: check qubesdb entries - self.assertEqual(self.frontend.run_service('qubes.USBDetach', - user='root', - input="{} {}\n".format( - self.backend.name, - self.dummy_usb_dev)), 0, - "qubes.USBDetach call failed") - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device disconnection failed") + self.assertEqual( + self.frontend.run_service( + "qubes.USBDetach", + user="root", + input=f"{self.backend.name} {self.dummy_usb_dev}\n", + ), + 0, + "qubes.USBDetach call failed", + ) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 1, + "Device disconnection failed", + ) def test_010_attach_detach_vid_pid(self): self.frontend.start() # TODO: check qubesdb entries - self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format( - self.backend.name, - "0x1234.0x1234")), 0, - "qubes.USBAttach call failed") - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run_service( + "qubes.USBAttach", + user="root", + input=f"{self.backend.name} 0x1234.0x1234\n", + ), + 0, + "qubes.USBAttach call failed", + ) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) # TODO: check qubesdb entries - self.assertEqual(self.frontend.run_service('qubes.USBDetach', - user='root', - input="{} {}\n".format( - self.backend.name, - "0x1234.0x1234")), 0, - "qubes.USBDetach call failed") - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device disconnection failed") + self.assertEqual( + self.frontend.run_service( + "qubes.USBDetach", + user="root", + input=f"{self.backend.name} 0x1234.0x1234\n", + ), + 0, + "qubes.USBDetach call failed", + ) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 1, + "Device disconnection failed", + ) def test_020_detach_on_remove(self): self.frontend.start() - self.assertEqual(self.frontend.run_service('qubes.USBAttach', - user='root', - input="{} {}\n".format( - self.backend.name, - self.dummy_usb_dev)), 0, - "qubes.USBAttach call failed") - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run_service( + "qubes.USBAttach", + user="root", + input=f"{self.backend.name} {self.dummy_usb_dev}\n", + ), + 0, + "qubes.USBAttach call failed", + ) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) remove_usb_gadget(self.backend) # FIXME: usb-export script may update qubesdb/disconnect with 1sec delay time.sleep(2) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', wait=True), 1, - "Device not cleaned up") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 1, + "Device not cleaned up", + ) # TODO: check for kernel errors? -class TC_10_USBProxy_core2(qubes.tests.extra.ExtraTestCase): - def setUp(self): - super(TC_10_USBProxy_core2, self).setUp() - vms = self.create_vms(["backend", "frontend"]) - (self.backend, self.frontend) = vms - self.qrexec_policy('qubes.USB', self.frontend.name, self.backend.name) - self.dummy_usb_dev = create_usb_gadget(self.backend) - self.usbdev_name = '{}:{}'.format(self.backend.name, self.dummy_usb_dev) - - def test_000_list_all(self): - usb_list = qubes.qubesutils.usb_list(self.qc) - self.assertIn(self.usbdev_name, usb_list) - - def test_010_list_vm(self): - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - self.assertIn(self.usbdev_name, usb_list) - - def test_020_attach(self): - self.frontend.start() - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - try: - qubes.qubesutils.usb_attach(self.qc, - self.frontend, - usb_list[self.usbdev_name]) - except qubes.qubesutils.USBProxyNotInstalled as e: - self.skipTest(str(e)) - - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") - - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - self.assertEqual(usb_list[self.usbdev_name]['connected-to'], - self.frontend) - - def test_030_detach(self): - self.frontend.start() - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - try: - qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) - except qubes.qubesutils.USBProxyNotInstalled as e: - self.skipTest(str(e)) - - qubes.qubesutils.usb_detach(self.qc, self.frontend, - usb_list[self.usbdev_name]) - # FIXME: usb-export script may update qubesdb with 1sec delay - time.sleep(2) - - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - self.assertIsNone(usb_list[self.usbdev_name]['connected-to']) - - self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") - - def test_040_detach_all(self): - self.frontend.start() - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - try: - qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) - except qubes.qubesutils.USBProxyNotInstalled as e: - self.skipTest(str(e)) - - qubes.qubesutils.usb_detach_all(self.qc, self.frontend) - # FIXME: usb-export script may update qubesdb with 1sec delay - time.sleep(2) - - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - self.assertIsNone(usb_list[self.usbdev_name]['connected-to']) - - self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") - - def test_050_list_attached(self): - """ Attached device should not be listed as further attachable """ - self.frontend.start() - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - - usb_list_front_pre = qubes.qubesutils.usb_list(self.qc, - vm=self.frontend) - - try: - qubes.qubesutils.usb_attach(self.qc, - self.frontend, - usb_list[self.usbdev_name]) - except qubes.qubesutils.USBProxyNotInstalled as e: - self.skipTest(str(e)) - - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") - - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - self.assertEqual(usb_list[self.usbdev_name]['connected-to'], - self.frontend) - - usb_list_front_post = qubes.qubesutils.usb_list(self.qc, - vm=self.frontend) - - self.assertEqual(usb_list_front_pre, usb_list_front_post) - - def test_060_auto_detach_on_remove(self): - self.frontend.start() - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - try: - qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) - except qubes.qubesutils.USBProxyNotInstalled as e: - self.skipTest(str(e)) - - remove_usb_gadget(self.backend) - # FIXME: usb-export script may update qubesdb with 1sec delay - time.sleep(1) - - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - self.assertNotIn(self.usbdev_name, usb_list) - self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") - - def test_070_attach_not_installed_front(self): - self.frontend.start() - # simulate package not installed - retcode = self.frontend.run("rm -f /etc/qubes-rpc/qubes.USBAttach", - user="root", wait=True) - if retcode != 0: - raise RuntimeError("Failed to simulate not installed package") - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - with self.assertRaises(qubes.qubesutils.USBProxyNotInstalled): - qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) - - @unittest.expectedFailure - def test_075_attach_not_installed_back(self): - self.frontend.start() - # simulate package not installed - retcode = self.backend.run("rm -f /etc/qubes-rpc/qubes.USB", - user="root", wait=True) - if retcode != 0: - raise RuntimeError("Failed to simulate not installed package") - usb_list = qubes.qubesutils.usb_list(self.qc, vm=self.backend) - try: - qubes.qubesutils.usb_attach(self.qc, self.frontend, - usb_list[self.usbdev_name]) - except qubes.qubesutils.USBProxyNotInstalled: - pass - except Exception as e: - self.fail( - 'Wrong exception raised (expected USBProxyNotInstalled): ' - '{!r}'.format(e)) - else: - self.fail('USBProxyNotInstalled not raised') - - class TC_20_USBProxy_core3(qubes.tests.extra.ExtraTestCase): # noinspection PyAttributeOutsideInit def setUp(self): - super(TC_20_USBProxy_core3, self).setUp() - vms = self.create_vms(["backend", "frontend"]) - (self.backend, self.frontend) = vms - self.qrexec_policy('qubes.USB', self.frontend.name, self.backend.name) + super().setUp() + self.backend, self.frontend = self.create_vms(["backend", "frontend"]) + self.qrexec_policy("qubes.USB", self.frontend.name, self.backend.name) self.usbdev_ident = create_usb_gadget(self.backend).decode() - self.usbdev_name = '{}:{}'.format(self.backend.name, self.usbdev_ident) + self.usbdev_name = ( + f"{self.backend.name}:{self.usbdev_ident}" + ":1234:1234:0123456789:u080650" + ) def tearDown(self): # remove vms in this specific order, otherwise there may remain stray # dependency between them (so, objects leaks) self.remove_vms((self.frontend, self.backend)) - super(TC_20_USBProxy_core3, self).tearDown() + super().tearDown() def test_000_list(self): - usb_list = self.backend.devices['usb'] + usb_list = self.backend.devices["usb"] self.assertIn(self.usbdev_name, [str(dev) for dev in usb_list]) def test_010_assign(self): - usb_dev = self.backend.devices['usb'][self.usbdev_ident] + usb_dev = self.backend.devices["usb"][self.usbdev_ident] + ass = make_assignment(self.backend, self.usbdev_ident, auto_attach=True) + assign(self, self.frontend.devices["usb"], ass) + self.assertIsNone(usb_dev.attachment) + try: + self.frontend.start() + except qubesusbproxy.core3ext.USBProxyNotInstalled as e: + self.skipTest(str(e)) + + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) + + self.assertEqual(usb_dev.attachment, self.frontend) + + @unittest.mock.patch("qubes.ext.utils.confirm_device_attachment") + @unittest.skipIf(LEGACY, "new feature") + def test_011_assign_ask(self, confirm): + confirm.return_value = self.frontend.name + usb_dev = self.backend.devices["usb"][self.usbdev_ident] ass = DeviceAssignment( - self.backend, self.usbdev_ident, **AUTO_ATTACH) - assign(self, self.frontend.devices['usb'], ass) + VirtualDevice(Port(self.backend, self.usbdev_ident, "usb")), + mode="ask-to-attach", + ) + assign(self, self.frontend.devices["usb"], ass) self.assertIsNone(usb_dev.attachment) try: self.frontend.start() except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) self.assertEqual(usb_dev.attachment, self.frontend) def test_020_attach(self): self.frontend.start() - usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = DeviceAssignment(self.backend, self.usbdev_ident) + usb_dev = self.backend.devices["usb"][self.usbdev_ident] + ass = make_assignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) self.assertEqual(usb_dev.attachment, self.frontend) def test_030_detach(self): self.frontend.start() - usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = DeviceAssignment(self.backend, self.usbdev_ident) + usb_dev = self.backend.devices["usb"][self.usbdev_ident] + ass = make_assignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.loop.run_until_complete( - self.frontend.devices['usb'].detach(ass)) + self.loop.run_until_complete(self.frontend.devices["usb"].detach(ass)) # FIXME: usb-export script may update qubesdb with 1sec delay self.loop.run_until_complete(asyncio.sleep(2)) self.assertIsNone(usb_dev.attachment) - self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + self.assertNotEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device disconnection failed", + ) def test_040_unassign(self): - usb_dev = self.backend.devices['usb'][self.usbdev_ident] - ass = DeviceAssignment( - self.backend, self.usbdev_ident, **AUTO_ATTACH) - assign(self, self.frontend.devices['usb'], ass) + usb_dev = self.backend.devices["usb"][self.usbdev_ident] + ass = make_assignment(self.backend, self.usbdev_ident, auto_attach=True) + assign(self, self.frontend.devices["usb"], ass) self.assertIsNone(usb_dev.attachment) - unassign(self, self.frontend.devices['usb'], ass) + unassign(self, self.frontend.devices["usb"], ass) self.assertIsNone(usb_dev.attachment) def test_050_list_attached(self): - """ Attached device should not be listed as further attachable """ + """Attached device should not be listed as further attachable""" self.frontend.start() - usb_list = self.backend.devices['usb'] + usb_list = self.backend.devices["usb"] - usb_list_front_pre = list(self.frontend.devices['usb']) - ass = DeviceAssignment(self.backend, self.usbdev_ident) + usb_list_front_pre = list(self.frontend.devices["usb"]) + ass = make_assignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) self.assertEqual(usb_list[self.usbdev_ident].attachment, self.frontend) - usb_list_front_post = list(self.frontend.devices['usb']) + usb_list_front_post = list(self.frontend.devices["usb"]) self.assertEqual(usb_list_front_pre, usb_list_front_post) def test_060_auto_detach_on_remove(self): self.frontend.start() - usb_list = self.backend.devices['usb'] - ass = DeviceAssignment(self.backend, self.usbdev_ident) + usb_list = self.backend.devices["usb"] + ass = make_assignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -504,17 +428,48 @@ def test_060_auto_detach_on_remove(self): self.loop.run_until_complete(asyncio.sleep(2)) self.assertNotIn(self.usbdev_name, [str(dev) for dev in usb_list]) - self.assertNotEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device disconnection failed") + self.assertNotEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device disconnection failed", + ) def test_061_auto_attach_on_reconnect(self): self.frontend.start() - usb_list = self.backend.devices['usb'] - ass = DeviceAssignment( - self.backend, self.usbdev_ident, **AUTO_ATTACH) + usb_list = self.backend.devices["usb"] + ass = make_assignment(self.backend, self.usbdev_ident, auto_attach=True) + try: + assign(self, self.frontend.devices["usb"], ass) + except qubesusbproxy.core3ext.USBProxyNotInstalled as e: + self.skipTest(str(e)) + + remove_usb_gadget(self.backend) + # FIXME: usb-export script may update qubesdb with 1sec delay + timeout = 5 + while self.usbdev_name in (str(dev) for dev in usb_list): + self.loop.run_until_complete(asyncio.sleep(1)) + timeout -= 1 + self.assertGreater(timeout, 0, "timeout on device remove") + + recreate_usb_gadget(self.backend) + timeout = 5 + while self.usbdev_name not in (str(dev) for dev in usb_list): + self.loop.run_until_complete(asyncio.sleep(1)) + timeout -= 1 + self.assertGreater(timeout, 0, "timeout on device create") + self.loop.run_until_complete(asyncio.sleep(5)) + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device reconnection failed", + ) + + def test_062_ask_to_attach_on_start(self): + self.frontend.start() + usb_list = self.backend.devices["usb"] + ass = make_assignment(self.backend, self.usbdev_ident, auto_attach=True) try: - assign(self, self.frontend.devices['usb'], ass) + assign(self, self.frontend.devices["usb"], ass) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) @@ -524,82 +479,525 @@ def test_061_auto_attach_on_reconnect(self): while self.usbdev_name in (str(dev) for dev in usb_list): self.loop.run_until_complete(asyncio.sleep(1)) timeout -= 1 - self.assertGreater(timeout, 0, 'timeout on device remove') + self.assertGreater(timeout, 0, "timeout on device remove") recreate_usb_gadget(self.backend) timeout = 5 while self.usbdev_name not in (str(dev) for dev in usb_list): self.loop.run_until_complete(asyncio.sleep(1)) timeout -= 1 - self.assertGreater(timeout, 0, 'timeout on device create') + self.assertGreater(timeout, 0, "timeout on device create") self.loop.run_until_complete(asyncio.sleep(5)) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device reconnection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device reconnection failed", + ) def test_070_attach_not_installed_front(self): self.frontend.start() # simulate package not installed - retcode = self.frontend.run("rm -f /etc/qubes-rpc/qubes.USBAttach", - user="root", wait=True) + retcode = self.frontend.run( + "rm -f /etc/qubes-rpc/qubes.USBAttach", user="root", wait=True + ) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") - ass = DeviceAssignment(self.backend, self.usbdev_ident) + ass = make_assignment(self.backend, self.usbdev_ident) with self.assertRaises(qubesusbproxy.core3ext.USBProxyNotInstalled): self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) @unittest.expectedFailure def test_075_attach_not_installed_back(self): self.frontend.start() # simulate package not installed - retcode = self.backend.run("rm -f /etc/qubes-rpc/qubes.USB", - user="root", wait=True) + retcode = self.backend.run( + "rm -f /etc/qubes-rpc/qubes.USB", user="root", wait=True + ) if retcode != 0: raise RuntimeError("Failed to simulate not installed package") - ass = DeviceAssignment(self.backend, self.usbdev_ident) + ass = make_assignment(self.backend, self.usbdev_ident) try: with self.assertRaises(qubesusbproxy.core3ext.USBProxyNotInstalled): self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.QubesUSBException as e: - self.fail('Generic exception raise instead of specific ' - 'USBProxyNotInstalled: ' + str(e)) + self.fail( + "Generic exception raise instead of specific " + "USBProxyNotInstalled: " + str(e) + ) def test_080_attach_existing_policy(self): self.frontend.start() # this override policy file, but during normal execution it shouldn't # exist, so should be ok, especially on a testing system with open( - '/etc/qubes-rpc/policy/qubes.USB+{}'.format(self.usbdev_ident), - 'w+') as policy_file: - policy_file.write('# empty policy\n') - ass = DeviceAssignment(self.backend, self.usbdev_ident) - self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + f"/etc/qubes-rpc/policy/qubes.USB+{self.usbdev_ident}", "w+" + ) as policy_file: + policy_file.write("# empty policy\n") + ass = make_assignment(self.backend, self.usbdev_ident) + self.loop.run_until_complete(self.frontend.devices["usb"].attach(ass)) @unittest.skipIf(is_r40, "Not supported on R4.0") def test_090_attach_stubdom(self): - self.frontend.virt_mode = 'hvm' - self.frontend.features['stubdom-qrexec'] = True + self.frontend.virt_mode = "hvm" + self.frontend.features["stubdom-qrexec"] = True self.frontend.start() - ass = DeviceAssignment(self.backend, self.usbdev_ident) + ass = make_assignment(self.backend, self.usbdev_ident) try: self.loop.run_until_complete( - self.frontend.devices['usb'].attach(ass)) + self.frontend.devices["usb"].attach(ass) + ) except qubesusbproxy.core3ext.USBProxyNotInstalled as e: self.skipTest(str(e)) time.sleep(5) - self.assertEqual(self.frontend.run('lsusb -d 1234:1234', - wait=True), 0, - "Device connection failed") + self.assertEqual( + self.frontend.run("lsusb -d 1234:1234", wait=True), + 0, + "Device connection failed", + ) + + +class TestQubesDB: + def __init__(self, data): + self._data = data + + def read(self, key): + return self._data.get(key, None) + + def list(self, prefix): + return [key for key in self._data if key.startswith(prefix)] + + +class TestApp: + class Domains(dict): + def __iter__(self): + return iter(self.values()) + + def __init__(self): + #: jinja2 environment for libvirt XML templates + self.env = jinja2.Environment( + loader=jinja2.FileSystemLoader( + [ + "templates", + "/etc/qubes/templates", + "/usr/share/qubes/templates", + ] + ), + undefined=jinja2.StrictUndefined, + autoescape=True, + ) + self.domains = TestApp.Domains() + self.vmm = mock.Mock() + + +class TestDeviceCollection: + def __init__(self, backend_vm, devclass): + self._exposed = [] + self._assigned = [] + self.backend_vm = backend_vm + self.devclass = devclass + + def get_assigned_devices(self): + return self._assigned + + def get_exposed_devices(self): + yield from self._exposed + + __iter__ = get_exposed_devices + + def __getitem__(self, port_id): + for dev in self._exposed: + if dev.port_id == port_id: + return dev + raise KeyError() + + +class TestVM(qubes.tests.TestEmitter): + def __init__(self, qdb, running=True, name="test-vm", **kwargs): + super().__init__(**kwargs) + self.name = name + self.klass = "AdminVM" if name == "dom0" else "AppVM" + self.icon = "red" + self.untrusted_qdb = TestQubesDB(qdb) + self.libvirt_domain = mock.Mock() + self.features = mock.Mock() + self.features.check_with_template.side_effect = lambda name, default: ( + "4.2" if name == "qubes-agent-version" else None + ) + self.is_running = lambda: running + self.log = mock.Mock() + self.app = TestApp() + self.devices = {"testclass": TestDeviceCollection(self, "testclass")} + + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + if isinstance(other, TestVM): + return self.name == other.name + return False + + def __str__(self): + return self.name + + +def get_qdb(attachment=None): + result = { + "/qubes-usb-devices/1-1/desc": b"1a0a:badd USB-IF Test\x20Device", + "/qubes-usb-devices/1-1/interfaces": b":ffff00:020600:0a0000:", + "/qubes-usb-devices/1-1/usb-ver": b"2", + "/qubes-usb-devices/1-2/desc": b"1a0a:badd USB-IF Test\x20Device\x202", + "/qubes-usb-devices/1-2/interfaces": b":0acafe:", + "/qubes-usb-devices/1-2/usb-ver": b"3", + } + if attachment: + result["/qubes-usb-devices/1-1/connected-to"] = attachment.encode() + return result + + +class TC_30_USBProxy_core3(qubes.tests.QubesTestCase): + # noinspection PyAttributeOutsideInit + def setUp(self): + super().setUp() + self.ext = qubesusbproxy.core3ext.USBDeviceExtension() + + @staticmethod + def added_assign_setup(attachment=None): + back_vm = TestVM(qdb=get_qdb(attachment), name="sys-usb") + front = TestVM({}, name="front-vm") + dom0 = TestVM({}, name="dom0") + back_vm.app.domains["sys-usb"] = back_vm + back_vm.app.domains["front-vm"] = front + back_vm.app.domains[0] = dom0 + back_vm.app.domains["dom0"] = dom0 + front.app = back_vm.app + dom0.app = back_vm.app + + back_vm.app.vmm.configure_mock(**{"offline_mode": False}) + fire_event_async = mock.Mock() + front.fire_event_async = fire_event_async + + back_vm.devices["usb"] = TestDeviceCollection( + backend_vm=back_vm, devclass="usb" + ) + front.devices["usb"] = TestDeviceCollection( + backend_vm=front, devclass="usb" + ) + dom0.devices["usb"] = TestDeviceCollection( + backend_vm=dom0, devclass="usb" + ) + + return back_vm, front + + def test_010_on_qdb_change_multiple_assignments_including_full(self): + back, front = self.added_assign_setup() + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + full_assig = DeviceAssignment( + VirtualDevice(exp_dev.port, exp_dev.device_id), + mode="auto-attach", + options={"pid": "did"}, + ) + port_assign = DeviceAssignment( + VirtualDevice(exp_dev.port, "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + front.devices["usb"]._assigned.append(full_assig) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + ) + + self.ext.attach_and_notify = AsyncMock() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.ext.on_domain_start(front, None)) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"pid": "did"} + ) + + def test_011_on_qdb_change_multiple_assignments_port_vs_dev(self): + back, front = self.added_assign_setup() + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + port_assign = DeviceAssignment( + VirtualDevice(exp_dev.port, "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + ) + + self.ext.attach_and_notify = AsyncMock() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.ext.on_domain_start(front, None)) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"pid": "any"} + ) + + def test_012_on_qdb_change_multiple_assignments_dev(self): + back, front = self.added_assign_setup() + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + port_assign = DeviceAssignment( + VirtualDevice(Port(exp_dev.backend_domain, "1-2", "usb"), "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + ) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(Port(back, "1-2", "usb")) + ) + + self.ext.attach_and_notify = AsyncMock() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.ext.on_domain_start(front, None)) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"any": "did"} + ) + + def test_013_on_qdb_change_two_fronts(self): + back, front = self.added_assign_setup() + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + assmnt = DeviceAssignment(exp_dev, mode="auto-attach") + + front.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._exposed.append(exp_dev) + + resolver_path = "qubes.ext.utils.resolve_conflicts_and_attach" + with mock.patch(resolver_path, new_callable=Mock) as resolver: + with mock.patch("asyncio.ensure_future"): + self.ext.on_qdb_change(back, None, None) + resolver.assert_called_once_with( + self.ext, {"1-1": {front: assmnt, back: assmnt}} + ) + + # call_socket_service returns coroutine + @unittest.mock.patch( + 'qubes.ext.utils.call_socket_service', new_callable=AsyncMock) + def test_014_failed_confirmation(self, socket): + back, front = self.added_assign_setup() + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + assmnt = DeviceAssignment(exp_dev, mode="auto-attach") + + front.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._exposed.append(exp_dev) + + socket.return_value = "allow:nonsense" + + loop = asyncio.get_event_loop() + self.ext.attach_and_notify = AsyncMock() + loop.run_until_complete( + qubes.ext.utils.resolve_conflicts_and_attach( + self.ext, {"1-1": {front: assmnt, back: assmnt}} + ) + ) + self.ext.attach_and_notify.assert_not_called() + + # call_socket_service returns coroutine + @unittest.mock.patch( + 'qubes.ext.utils.call_socket_service', new_callable=AsyncMock) + def test_015_successful_confirmation(self, socket): + back, front = self.added_assign_setup() + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + assmnt = DeviceAssignment(exp_dev, mode="auto-attach") + + front.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._exposed.append(exp_dev) + + socket.return_value = "allow:front-vm" + + loop = asyncio.get_event_loop() + self.ext.attach_and_notify = AsyncMock() + loop.run_until_complete( + qubes.ext.utils.resolve_conflicts_and_attach( + self.ext, {"1-1": {front: assmnt, back: assmnt}} + ) + ) + self.ext.attach_and_notify.assert_called_once_with(front, assmnt) + + def test_016_on_qdb_change_ask(self): + back, front = self.added_assign_setup() + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + assmnt = DeviceAssignment(exp_dev, mode="ask-to-attach") + + front.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._exposed.append(exp_dev) + + resolver_path = "qubes.ext.utils.resolve_conflicts_and_attach" + with mock.patch(resolver_path, new_callable=Mock) as resolver: + with mock.patch("asyncio.ensure_future"): + self.ext.on_qdb_change(back, None, None) + resolver.assert_called_once_with(self.ext, {"1-1": {front: assmnt}}) + + def test_020_on_startup_multiple_assignments_including_full(self): + back, front = self.added_assign_setup() + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + full_assig = DeviceAssignment( + VirtualDevice(exp_dev.port, exp_dev.device_id), + mode="auto-attach", + options={"pid": "did"}, + ) + port_assign = DeviceAssignment( + VirtualDevice(exp_dev.port, "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + front.devices["usb"]._assigned.append(full_assig) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + ) + + self.ext.attach_and_notify = AsyncMock() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.ext.on_domain_start(front, None)) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"pid": "did"} + ) + + def test_021_on_startup_multiple_assignments_port_vs_dev(self): + back, front = self.added_assign_setup() + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + port_assign = DeviceAssignment( + VirtualDevice(exp_dev.port, "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + ) + + loop = asyncio.get_event_loop() + self.ext.attach_and_notify = AsyncMock() + loop.run_until_complete(self.ext.on_domain_start(front, None)) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"pid": "any"} + ) + + def test_022_on_startup_multiple_assignments_dev(self): + back, front = self.added_assign_setup() + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + port_assign = DeviceAssignment( + VirtualDevice(Port(exp_dev.backend_domain, "1-2", "usb"), "*"), + mode="auto-attach", + options={"pid": "any"}, + ) + dev_assign = DeviceAssignment( + VirtualDevice( + Port(exp_dev.backend_domain, "*", "usb"), exp_dev.device_id + ), + mode="auto-attach", + options={"any": "did"}, + ) + + front.devices["usb"]._assigned.append(dev_assign) + front.devices["usb"]._assigned.append(port_assign) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + ) + back.devices["usb"]._exposed.append( + qubesusbproxy.core3ext.USBDevice(Port(back, "1-2", "usb")) + ) + + self.ext.attach_and_notify = AsyncMock() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.ext.on_domain_start(front, None)) + self.assertEqual( + self.ext.attach_and_notify.call_args[0][1].options, {"any": "did"} + ) + + def test_023_on_startup_already_attached(self): + back, front = self.added_assign_setup(attachment="sys-usb") + + exp_dev = qubesusbproxy.core3ext.USBDevice(Port(back, "1-1", "usb")) + assmnt = DeviceAssignment( + VirtualDevice(exp_dev.port, exp_dev.device_id), mode="auto-attach" + ) + + front.devices["usb"]._assigned.append(assmnt) + back.devices["usb"]._exposed.append(exp_dev) + + self.ext.attach_and_notify = AsyncMock() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.ext.on_domain_start(front, None)) + self.ext.attach_and_notify.assert_not_called() def list_tests(): tests = [TC_00_USBProxy] - if core2: - tests += [TC_10_USBProxy_core2] if core3: tests += [TC_20_USBProxy_core3] return tests + + +def list_unit_tests(): + tests = [] + if core3: + tests += [TC_30_USBProxy_core3] + return tests diff --git a/qubesusbproxy/utils.py b/qubesusbproxy/utils.py index da49b4f..03a701c 100644 --- a/qubesusbproxy/utils.py +++ b/qubesusbproxy/utils.py @@ -2,7 +2,7 @@ # # The Qubes OS Project, https://www.qubes-os.org # -# Copyright (C) 2023 Piotr Bartman-Szwarc +# Copyright (C) 2024 Piotr Bartman-Szwarc # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -19,60 +19,118 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, # USA. import asyncio +import sys import qubes +from typing import Type, Dict, Any + +from qubes import device_protocol +from qubes.device_protocol import VirtualDevice + +from qrexec.server import call_socket_service + +SOCKET_PATH = "/var/run/qubes" + def device_list_change( - ext: qubes.ext.Extension, current_devices, - vm, path, device_class + ext: qubes.ext.Extension, + current_devices, + vm, + path, + device_class: Type[qubes.device_protocol.DeviceInfo], ): - devclass = device_class.__name__[:-len('Device')].lower() + devclass = device_class.__name__[: -len("Device")].lower() if path is not None: - vm.fire_event(f'device-list-change:{devclass}') + vm.fire_event(f"device-list-change:{devclass}") - added, attached, detached, removed = ( - compare_device_cache(vm, ext.devices_cache, current_devices)) + added, attached, detached, removed = compare_device_cache( + vm, ext.devices_cache, current_devices + ) # send events about devices detached/attached outside by themselves - for dev_id, front_vm in detached.items(): - dev = device_class(vm, dev_id) - asyncio.ensure_future(front_vm.fire_event_async( - f'device-detach:{devclass}', device=dev)) - for dev_id in removed: - device = device_class(vm, dev_id) - vm.fire_event(f'device-removed:{devclass}', device=device) - for dev_id in added: - device = device_class(vm, dev_id) - vm.fire_event(f'device-added:{devclass}', device=device) - for dev_ident, front_vm in attached.items(): - dev = device_class(vm, dev_ident) + for port_id, front_vm in detached.items(): + dev = device_class(vm, port_id) + ext.ensure_detach(front_vm, dev.port) + asyncio.ensure_future( + front_vm.fire_event_async( + f"device-detach:{devclass}", port=dev.port + ) + ) + for port_id in removed: + device = device_class(vm, port_id) + vm.fire_event(f"device-removed:{devclass}", port=device.port) + for port_id in added: + device = device_class(vm, port_id) + vm.fire_event(f"device-added:{devclass}", device=device) + for port_id, front_vm in attached.items(): + dev = device_class(vm, port_id) # options are unknown, device already attached - asyncio.ensure_future(front_vm.fire_event_async( - f'device-attach:{devclass}', device=dev, options={})) + asyncio.ensure_future( + front_vm.fire_event_async( + f"device-attach:{devclass}", device=dev, options={} + ) + ) ext.devices_cache[vm.name] = current_devices + to_attach: Dict[str, Dict] = {} for front_vm in vm.app.domains: if not front_vm.is_running(): continue - for assignment in front_vm.devices[devclass].assignments( - persistent=True): - if (assignment.backend_domain == vm - and assignment.ident in added - and assignment.ident not in attached - ): - asyncio.ensure_future(ext.attach_and_notify( - front_vm, assignment.device, assignment.options)) + for assignment in reversed( + sorted(front_vm.devices[devclass].get_assigned_devices()) + ): + for device in assignment.devices: + if ( + assignment.matches(device) + and device.port_id in added + and device.port_id not in attached + ): + frontends = to_attach.get(device.port_id, {}) + # make it unique + ass = assignment.clone( + device=VirtualDevice(device.port, device.device_id) + ) + curr = frontends.get(front_vm, None) + if curr is None or curr < ass: + # chose the most specific assignment + frontends[front_vm] = ass + to_attach[device.port_id] = frontends + + asyncio.ensure_future(resolve_conflicts_and_attach(ext, to_attach)) + + +async def resolve_conflicts_and_attach(ext, to_attach): + for _, frontends in to_attach.items(): + if len(frontends) > 1: + # unique + device = tuple(frontends.values())[0].device + target_name = await confirm_device_attachment(device, frontends) + for front in frontends: + if front.name == target_name: + target = front + assignment = frontends[front] + # already asked + if assignment.mode.value == "ask-to-attach": + assignment.mode = device_protocol.AssignmentMode.AUTO + break + else: + return + else: + target = tuple(frontends.keys())[0] + assignment = frontends[target] + + await ext.attach_and_notify(target, assignment) def compare_device_cache(vm, devices_cache, current_devices): # compare cached devices and current devices, collect: - # - newly appeared devices (ident) - # - devices attached from a vm to frontend vm (ident: frontend_vm) - # - devices detached from frontend vm (ident: frontend_vm) - # - disappeared devices, e.g., plugged out (ident) + # - newly appeared devices (port_id) + # - devices attached from a vm to frontend vm (port_id: frontend_vm) + # - devices detached from frontend vm (port_id: frontend_vm) + # - disappeared devices, e.g., plugged out (port_id) added = set() attached = {} detached = {} @@ -101,3 +159,56 @@ def compare_device_cache(vm, devices_cache, current_devices): if cached_front is not None: detached[dev_id] = cached_front return added, attached, detached, removed + + +async def confirm_device_attachment(device, frontends) -> str: + try: + return await _do_confirm_device_attachment(device, frontends) + except Exception as exc: + print(str(exc.__class__.__name__) + ":", str(exc), file=sys.stderr) + return "" + + +async def _do_confirm_device_attachment(device, frontends): + socket = "device-agent.GUI" + + app = tuple(frontends.keys())[0].app + doms = app.domains + + front_names = [f.name for f in frontends.keys()] + + try: + guivm = doms["dom0"].guivm.name + except AttributeError: + guivm = "dom0" + + number_of_targets = len(front_names) + + params = { + "source": device.backend_domain.name, + "device_name": device.description, + "argument": device.port_id, + "targets": front_names, + "default_target": front_names[0] if number_of_targets == 1 else "", + "icons": { + ( + dom.name if dom.klass != "DispVM" else f"@dispvm:{dom.name}" + ): dom.icon + for dom in doms.values() + }, + } + + socked_call = asyncio.create_task( + call_socket_service(guivm, socket, "dom0", params, SOCKET_PATH) + ) + + while not socked_call.done(): + await asyncio.sleep(0.1) + + ask_response = await socked_call + + if ask_response.startswith("allow:"): + chosen = ask_response[len("allow:") :] + if chosen in front_names: + return chosen + return "" diff --git a/setup.py b/setup.py index e6dff41..9dd5710 100644 --- a/setup.py +++ b/setup.py @@ -45,6 +45,8 @@ entry_points={ 'qubes.tests.extra.for_template': 'usbproxy = qubesusbproxy.tests:list_tests', + 'qubes.tests.extra': + 'usbproxy = qubesusbproxy.tests:list_unit_tests', 'qubes.ext': 'usbproxy = qubesusbproxy.core3ext:USBDeviceExtension', 'qubes.devices':