From 7eb1390c2dd0f9af192b8eab47996536863f56f6 Mon Sep 17 00:00:00 2001 From: Ganden Schaffner Date: Wed, 13 Mar 2024 00:00:00 -0700 Subject: [PATCH] Avoid busy-waiting in SimVisaLibrary.read --- pyproject.toml | 2 +- pyvisa_sim/devices.py | 69 ++++++++++++++++++++++++++-------- pyvisa_sim/sessions/session.py | 8 +--- 3 files changed, 56 insertions(+), 23 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 515312c..215a6f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "PyVISA-sim" description = "Simulated backend for PyVISA implementing TCPIP, GPIB, RS232, and USB resources" readme = "README.rst" -requires-python = ">=3.7" +requires-python = ">=3.8" license = {file = "LICENSE.txt"} authors = [ {name = "Hernan E. Grecco", email = "hernan.grecco@gmail.com"}, diff --git a/pyvisa_sim/devices.py b/pyvisa_sim/devices.py index 196b3bb..6d69140 100644 --- a/pyvisa_sim/devices.py +++ b/pyvisa_sim/devices.py @@ -6,7 +6,8 @@ """ -from typing import Deque, Dict, List, Optional, Tuple, Union +from threading import Lock +from typing import Deque, Dict, Final, List, Optional, Tuple, Union from pyvisa import constants, rname @@ -56,6 +57,51 @@ def clear(self) -> None: _value: int +class OutputQueue: + """Store output in a FIFO queue.""" + + # N.B.: We don't use a `deque[tuple[bytes, bool]]` or `SimpleQueue[tuple[bytes, + # bool]]` for this as + # * It would require splitting each bytearray message and pushing (length-1 bytes + # object, bool) to the deque for each byte in the message. + # * We would still need a separate lock, as they do not have thread-safe `.extend` + # methods. + + def __init__(self) -> None: + self._message_buffers: Final[Deque[bytearray]] = Deque() + self._lock: Final = Lock() + self._readable = False + + def append_message(self, message: bytearray) -> None: + """Append a message to the output queue.""" + with self._lock: + self._message_buffers.append(message) + self._readable = True + + def read(self, timeout: float) -> Tuple[bytes, bool]: + """ + Return a single byte from the output queue and whether it is accompanied by an + END indicator. + """ + if not self._lock.acquire(timeout=timeout): + return b"", False + try: + if not self._readable: + return b"", False + + current_message_buffer = self._message_buffers[0] + b = int_to_byte(current_message_buffer.pop(0)) + if current_message_buffer: + return b, False + else: + self._message_buffers.popleft() + if not self._message_buffers: + self._readable = False + return b, True + finally: + self._lock.release() + + class ErrorQueue: """Store error messages in a FIFO queue. @@ -134,7 +180,7 @@ def __init__(self, name: str, delimiter: bytes) -> None: self._status_registers = {} self._error_map = {} self._eoms = {} - self._output_buffers = Deque() + self._output_queue = OutputQueue() self._input_buffer = bytearray() self._error_queues = {} @@ -251,26 +297,17 @@ def write(self, data: bytes) -> None: assert response is not None if response is not NoResponse: - self._output_buffers.append(bytearray(response) + eom) + self._output_queue.append_message(bytearray(response) + eom) finally: self._input_buffer = bytearray() - def read(self) -> Tuple[bytes, bool]: + def read(self, timeout: float) -> Tuple[bytes, bool]: """ Return a single byte from the output buffer and whether it is accompanied by an END indicator. """ - if not self._output_buffers: - return b"", False - - output_buffer = self._output_buffers[0] - b = int_to_byte(output_buffer.pop(0)) - if output_buffer: - return b, False - else: - self._output_buffers.popleft() - return b, True + return self._output_queue.read(timeout=timeout) # --- Private API @@ -300,8 +337,8 @@ def read(self) -> Tuple[bytes, bool]: #: TYPE CLASS -> (query termination, response termination) _eoms: Dict[Tuple[constants.InterfaceType, str], Tuple[bytes, bytes]] - #: Deque of buffers in which the user can read - _output_buffers: Deque[bytearray] + #: Queue in which the user can read + _output_buffer: OutputQueue #: Buffer in which the user can write _input_buffer: bytearray diff --git a/pyvisa_sim/sessions/session.py b/pyvisa_sim/sessions/session.py index 4d9960d..3bb3894 100644 --- a/pyvisa_sim/sessions/session.py +++ b/pyvisa_sim/sessions/session.py @@ -236,8 +236,8 @@ def read(self, count: int) -> Tuple[bytes, constants.StatusCode]: out = bytearray() - while time.monotonic() - start <= timeout: - last, end_indicator = self.device.read() + while (timeout_remaining := (timeout - (time.monotonic() - start))) >= 0: + last, end_indicator = self.device.read(timeout=timeout_remaining) out += last @@ -270,10 +270,6 @@ def read(self, count: int) -> Tuple[bytes, constants.StatusCode]: elif len(out) == count: # Rule 6.1.3. return out, constants.StatusCode.success_max_count_read - - # Busy-wait only if the device's output buffer was empty. - if not last: - time.sleep(0.01) else: return out, constants.StatusCode.error_timeout