From 51bdbbd9fda34fab39751529488e26b0f067305f Mon Sep 17 00:00:00 2001 From: Jeremy Rimpo Date: Sun, 18 Feb 2024 00:19:53 -0600 Subject: [PATCH] Overlay: Error checking and display efficiency --- src/bio_scan/overlay.py | 92 ++++++++++++++++++++++++----------------- src/load.py | 10 +++-- 2 files changed, 59 insertions(+), 43 deletions(-) diff --git a/src/bio_scan/overlay.py b/src/bio_scan/overlay.py index 7a05412..0f43521 100644 --- a/src/bio_scan/overlay.py +++ b/src/bio_scan/overlay.py @@ -18,6 +18,7 @@ def setInterval(interval: float) -> Callable: :param interval: Interval in seconds """ + def decorator(function: Callable) -> Callable: def wrapper(*args, **kwargs) -> threading.Event: stopped = threading.Event() @@ -30,7 +31,9 @@ def loop() -> None: # executed in another thread t.daemon = True # stop if the program exits t.start() return stopped + return wrapper + return decorator @@ -89,28 +92,31 @@ def display(self, message_id: str, text: str, x: int = 0, y: int = 0, color: str :param size: Accepts "normal" and "large" :param scrolled: Toggle for scrolled text panel :param limit: Line limit for display; 0 is no limit + :param delay: Time between up/down scroll of scrolled text """ - self.clear(message_id) formatted_text = text.replace('🗸', '√').replace('\N{memo}', '♦').split("\n") + if not scrolled: + self.clear(message_id, False) + elif message_id in self._text_blocks and len(formatted_text) < len(self._text_blocks[message_id].text): + self.clear(message_id, len(formatted_text), False) self._text_blocks[message_id] = TextBlock( text=formatted_text, x=x, y=y, size=size, color=color, scrolled=scrolled, limit=limit, delay=delay ) - try: + if not scrolled: self.draw(message_id) - except Exception as err: - logger.debug(err) - def clear(self, message_id: str, remove: bool = True) -> None: + def clear(self, message_id: str, from_line: int = 0, remove: bool = True) -> None: """ Clears a given text block identified by a unique message ID. :param message_id: Unique ID of text to clear. + :param from_line: Start point of clear :param remove: Remove the message from the cache. """ try: if message_id in self._text_blocks: - count = 0 + count = from_line while (count < len(self._text_blocks[message_id].text) or (self._text_blocks[message_id].limit > 0 and count < self._text_blocks[message_id].limit)): self._overlay.send_message("{}_{}".format(message_id, count), @@ -118,8 +124,8 @@ def clear(self, message_id: str, remove: bool = True) -> None: count += 1 if remove: self._text_blocks.pop(message_id, None) - except Exception as err: - logger.debug(err) + except Exception as ex: + logger.debug("Exception during overlay clear", exc_info=ex) @setInterval(30) def redraw(self): @@ -129,7 +135,7 @@ def redraw(self): """ if self.available(): - for message_id, message_info in self._text_blocks.items(): + for message_id, message_info in self._text_blocks.copy().items(): if message_info.scrolled: continue self.draw(message_id) @@ -142,26 +148,31 @@ def scroll(self): """ if self.available(): - for message_id, message_info in self._text_blocks.items(): - if not message_info.scrolled: - continue - - if not message_info.timer.is_alive(): - self.draw(message_id) - if message_info.limit != 0 and message_info.limit < len(message_info.text): - offset = message_info.offset + 1 if message_info.direction == "down" else \ - len(message_info.text) - message_info.offset - display = offset + message_info.limit if message_info.direction == "down" else offset - if display >= len(message_info.text): - self._text_blocks[message_id].direction = "up" if message_info.direction == "down" \ - else "down" - self._text_blocks[message_id].timer = threading.Timer(message_info.delay, - lambda *args: None) - self._text_blocks[message_id].timer.start() - if self._text_blocks[message_id].direction == "down": - self._text_blocks[message_id].offset += 1 - else: - self._text_blocks[message_id].offset -= 1 + try: + for message_id, message_info in self._text_blocks.copy().items(): + if not message_info.scrolled: + continue + + if not message_info.timer.is_alive(): + self.draw(message_id) + if message_info.limit != 0 and message_info.limit < len(message_info.text): + offset = message_info.offset + 1 if message_info.direction == "down" else \ + len(message_info.text) - message_info.offset + display = offset + message_info.limit if message_info.direction == "down" else offset + if display >= len(message_info.text): + self._text_blocks[message_id].direction = "up" if message_info.direction == "down" \ + else "down" + self._text_blocks[message_id].timer = threading.Timer(message_info.delay, + lambda *args: None) + self._text_blocks[message_id].timer.start() + if self._text_blocks[message_id].direction == "down": + self._text_blocks[message_id].offset += 1 + else: + self._text_blocks[message_id].offset -= 1 + elif message_info.limit != 0 and message_info.limit > len(message_info.text): + self.draw(message_id) + except Exception as ex: + logger.debug("Exception during scroll repaint", exc_info=ex) def draw(self, message_id: str): if message_id in self._text_blocks: @@ -170,9 +181,15 @@ def draw(self, message_id: str): line_count = 0 spacer = 14 if block.size == "normal" else 24 while (block.limit == 0 or count - block.offset < block.limit) and count < len(block.text): - self._overlay.send_message("{}_{}".format(message_id, line_count), block.text[count], block.color, - block.x, block.y + (spacer * (count - block.offset)), - ttl=60, size=block.size) + try: + self._overlay.send_message("{}_{}".format(message_id, line_count), block.text[count], block.color, + block.x, block.y + (spacer * (count - block.offset)), + ttl=60, size=block.size) + except AttributeError: + count -= 1 + self._overlay.connect() + except Exception as ex: + logger.debug("Exception during draw", exc_info=ex) count += 1 line_count += 1 @@ -182,14 +199,11 @@ def available(self) -> bool: :return: Availability of EDMCOverlay """ - if self._overlay: + if self._overlay is not None: if hasattr(self._overlay, 'connection'): - if self._overlay.connection: - return True - else: - return False - else: - return True + if self._overlay.connection is None: + self._overlay = edmcoverlay.Overlay() + return True else: if edmcoverlay: self._overlay = edmcoverlay.Overlay() diff --git a/src/load.py b/src/load.py index 1d8251d..e405edf 100644 --- a/src/load.py +++ b/src/load.py @@ -1887,6 +1887,7 @@ def update_display() -> None: this.total_label['text'] = '' this.label['text'] = title + signal_summary + text + redraw_overlay = True if this.values_label['text'] != detail_text.strip() else False this.values_label['text'] = detail_text.strip() if this.use_overlay.get() and this.overlay.available(): @@ -1894,10 +1895,11 @@ def update_display() -> None: this.overlay.display("bioscan_title", "BioScan Details", x=this.overlay_anchor_x.get(), y=this.overlay_anchor_y.get(), color=this.overlay_color.get()) - this.overlay.display("bioscan_details", detail_text, - x=this.overlay_anchor_x.get(), y=this.overlay_anchor_y.get() + 20, - color=this.overlay_color.get(), scrolled=this.overlay_detail_scroll.get(), - limit=this.overlay_detail_length.get(), delay=this.overlay_detail_delay.get()) + if redraw_overlay: + this.overlay.display("bioscan_details", detail_text.strip(), + x=this.overlay_anchor_x.get(), y=this.overlay_anchor_y.get() + 20, + color=this.overlay_color.get(), scrolled=this.overlay_detail_scroll.get(), + limit=this.overlay_detail_length.get(), delay=this.overlay_detail_delay.get()) this.overlay.display("bioscan_summary", text, x=this.overlay_summary_x.get(), y=this.overlay_summary_y.get(), size="large", color=this.overlay_color.get())