From 8b639b97c8b6e23427effddb66e77c24bc3d27d2 Mon Sep 17 00:00:00 2001 From: Stephen Finucane Date: Mon, 19 Aug 2024 11:14:49 +0100 Subject: [PATCH 1/2] Handle absent lzma library The lzma library may be missing in some environments as it is an optional dependency [1]. Fall back to bz2 in this instance. [1] https://devguide.python.org/getting-started/setup-building/ Signed-off-by: Stephen Finucane --- cmd2/cmd2.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index c5c0db78..83fe9818 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -4813,7 +4813,12 @@ def _initialize_history(self, hist_file: str) -> None: to this file when the application exits. """ import json - import lzma + try: + import lzma + bz2 = None + except ModuleNotFoundError: # pragma: no cover + lzma = None + import bz2 self.history = History() # with no persistent history, nothing else in this method is relevant @@ -4841,7 +4846,10 @@ def _initialize_history(self, hist_file: str) -> None: try: with open(hist_file, 'rb') as fobj: compressed_bytes = fobj.read() - history_json = lzma.decompress(compressed_bytes).decode(encoding='utf-8') + if lzma is not None: + history_json = lzma.decompress(compressed_bytes).decode(encoding='utf-8') + else: + history_json = bz2.decompress(compressed_bytes).decode(encoding='utf-8') self.history = History.from_json(history_json) except FileNotFoundError: # Just use an empty history @@ -4879,7 +4887,12 @@ def _initialize_history(self, hist_file: str) -> None: def _persist_history(self) -> None: """Write history out to the persistent history file as compressed JSON""" - import lzma + try: + import lzma + bz2 = None + except ModuleNotFoundError: # pragma: no cover + lzma = None + import bz2 if not self.persistent_history_file: return @@ -4887,7 +4900,10 @@ def _persist_history(self) -> None: self.history.truncate(self._persistent_history_length) try: history_json = self.history.to_json() - compressed_bytes = lzma.compress(history_json.encode(encoding='utf-8')) + if lzma is not None: + compressed_bytes = lzma.compress(history_json.encode(encoding='utf-8')) + else: + compressed_bytes = bz2.compress(history_json.encode(encoding='utf-8')) with open(self.persistent_history_file, 'wb') as fobj: fobj.write(compressed_bytes) From 63848b9416cc0cd929dbdb8d451c1b7ca76a32c2 Mon Sep 17 00:00:00 2001 From: Kevin Van Brunt Date: Fri, 13 Sep 2024 15:09:09 -0400 Subject: [PATCH 2/2] Refactored Cmd._initialize_history() to handle different exception types between lzma and bz2 libraries. --- cmd2/cmd2.py | 94 +++++++++++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 42 deletions(-) diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 088879ef..280f5382 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -4627,7 +4627,7 @@ def do_ipy(self, _: argparse.Namespace) -> Optional[bool]: # pragma: no cover ) # Start IPython - start_ipython(config=config, argv=[], user_ns=local_vars) + start_ipython(config=config, argv=[], user_ns=local_vars) # type: ignore[no-untyped-call] self.poutput("Now exiting IPython shell...") # The IPython application is a singleton and won't be recreated next time @@ -4814,16 +4814,9 @@ def _initialize_history(self, hist_file: str) -> None: previous sessions will be included. Additionally, all history will be written to this file when the application exits. """ - import json - try: - import lzma - bz2 = None - except ModuleNotFoundError: # pragma: no cover - lzma = None - import bz2 - self.history = History() - # with no persistent history, nothing else in this method is relevant + + # With no persistent history, nothing else in this method is relevant if not hist_file: self.persistent_history_file = hist_file return @@ -4844,31 +4837,60 @@ def _initialize_history(self, hist_file: str) -> None: self.perror(f"Error creating persistent history file directory '{hist_file_dir}': {ex}") return - # Read and process history file + # Read history file try: with open(hist_file, 'rb') as fobj: compressed_bytes = fobj.read() - if lzma is not None: - history_json = lzma.decompress(compressed_bytes).decode(encoding='utf-8') - else: - history_json = bz2.decompress(compressed_bytes).decode(encoding='utf-8') - self.history = History.from_json(history_json) except FileNotFoundError: - # Just use an empty history - pass + compressed_bytes = bytes() except OSError as ex: self.perror(f"Cannot read persistent history file '{hist_file}': {ex}") return - except (json.JSONDecodeError, lzma.LZMAError, KeyError, UnicodeDecodeError, ValueError) as ex: + + # Register a function to write history at save + import atexit + + self.persistent_history_file = hist_file + atexit.register(self._persist_history) + + # Empty or nonexistent history file. Nothing more to do. + if not compressed_bytes: + return + + # Decompress history data + try: + import lzma as decompress_lib + + decompress_exceptions: Tuple[type[Exception]] = (decompress_lib.LZMAError,) + except ModuleNotFoundError: # pragma: no cover + import bz2 as decompress_lib # type: ignore[no-redef] + + decompress_exceptions: Tuple[type[Exception]] = (OSError, ValueError) # type: ignore[no-redef] + + try: + history_json = decompress_lib.decompress(compressed_bytes).decode(encoding='utf-8') + except decompress_exceptions as ex: + self.perror( + f"Error decompressing persistent history data '{hist_file}': {ex}\n" + f"The history file will be recreated when this application exits." + ) + return + + # Decode history json + import json + + try: + self.history = History.from_json(history_json) + except (json.JSONDecodeError, KeyError, ValueError) as ex: self.perror( - f"Error processing persistent history file '{hist_file}': {ex}\n" + f"Error processing persistent history data '{hist_file}': {ex}\n" f"The history file will be recreated when this application exits." ) + return self.history.start_session() - self.persistent_history_file = hist_file - # populate readline history + # Populate readline history if rl_type != RlType.NONE: last = None for item in self.history: @@ -4880,33 +4902,21 @@ def _initialize_history(self, hist_file: str) -> None: readline.add_history(line) last = line - # register a function to write history at save - # if the history file is in plain text format from 0.9.12 or lower - # this will fail, and the history in the plain text file will be lost - import atexit - - atexit.register(self._persist_history) - def _persist_history(self) -> None: """Write history out to the persistent history file as compressed JSON""" - try: - import lzma - bz2 = None - except ModuleNotFoundError: # pragma: no cover - lzma = None - import bz2 - if not self.persistent_history_file: return - self.history.truncate(self._persistent_history_length) try: - history_json = self.history.to_json() - if lzma is not None: - compressed_bytes = lzma.compress(history_json.encode(encoding='utf-8')) - else: - compressed_bytes = bz2.compress(history_json.encode(encoding='utf-8')) + import lzma as compress_lib + except ModuleNotFoundError: # pragma: no cover + import bz2 as compress_lib # type: ignore[no-redef] + self.history.truncate(self._persistent_history_length) + history_json = self.history.to_json() + compressed_bytes = compress_lib.compress(history_json.encode(encoding='utf-8')) + + try: with open(self.persistent_history_file, 'wb') as fobj: fobj.write(compressed_bytes) except OSError as ex: