From 179f8cc02cd2a854e2deb95b9b98a8d7223a05f4 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Fri, 9 Aug 2024 16:10:12 +0200 Subject: [PATCH 1/8] Add 3.13 to the supported python versions --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 84a92cf7..7491870d 100644 --- a/setup.py +++ b/setup.py @@ -166,6 +166,7 @@ def build_isa_l(): "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", "Programming Language :: C", From 4229fc78390285552e63002cb63f53d0d573de0c Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 25 Sep 2024 08:47:47 +0200 Subject: [PATCH 2/8] Fix documentation dependencies --- requirements-docs.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements-docs.txt b/requirements-docs.txt index 89dc59f1..47081d94 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -1,4 +1,3 @@ sphinx sphinx-rtd-theme -# See https://github.com/sphinx-doc/sphinx-argparse/issues/56 -sphinx-argparse <0.5.0 \ No newline at end of file +sphinx-argparse From a1286cf546e76e5fb0709aa2a2741d15247aba7e Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 25 Sep 2024 08:53:05 +0200 Subject: [PATCH 3/8] Reproduce blocking threads issue --- tests/test_igzip_threaded.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index 336a5806..d211f81f 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -9,6 +9,8 @@ import io import itertools import os +import subprocess +import sys import tempfile from pathlib import Path @@ -218,3 +220,22 @@ def test_threaded_writer_does_not_close_stream(): assert not test_stream.closed test_stream.seek(0) assert gzip.decompress(test_stream.read()) == b"thisisatest" + + +@pytest.mark.timeout(5) +@pytest.mark.parametrize( + ["mode", "threads"], itertools.product(["rb", "wb"], [1, 2])) +def test_threaded_program_can_exit_on_error(tmp_path, mode, threads): + program = tmp_path / "no_context_manager.py" + test_file = tmp_path / "output.gz" + # Write 40 mb input data to saturate read buffer. Because of the repetitive + # nature the resulting gzip file is very small (~40 KiB). + test_file.write_bytes(gzip.compress(b"test" * (10 * 1024 * 1024))) + with open(program, "wt") as f: + f.write("from isal import igzip_threaded\n") + f.write( + f"f = igzip_threaded.open('{test_file}', " + f"mode='{mode}', threads={threads})\n" + ) + f.write("raise Exception('Error')\n") + subprocess.run([sys.executable, str(program)]) From a0639b9e44e38575c4dc177597e0fbe7e8f6d6c0 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 25 Sep 2024 09:00:58 +0200 Subject: [PATCH 4/8] Prevent threads from blocking python exit --- CHANGELOG.rst | 5 +++++ src/isal/igzip_threaded.py | 12 +++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c7f686ac..fe22d86e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,11 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. +version 1.7.1-dev +----------------- ++ Prevent threaded opening from blocking python exit when an error is thrown + in the calling thread. + version 1.7.0 ----------------- + Include a patched ISA-L version 2.31. The applied patches make compilation diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index cd8b4238..da076e9b 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -101,6 +101,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.worker = threading.Thread(target=self._decompress) self._closed = False self.running = True + self._calling_thread = threading.current_thread() self.worker.start() def _check_closed(self, msg=None): @@ -110,7 +111,7 @@ def _check_closed(self, msg=None): def _decompress(self): block_size = self.block_size block_queue = self.queue - while self.running: + while self.running and self._calling_thread.is_alive(): try: data = self.fileobj.read(block_size) except Exception as e: @@ -118,7 +119,7 @@ def _decompress(self): return if not data: return - while self.running: + while self.running and self._calling_thread.is_alive(): try: block_queue.put(data, timeout=0.05) break @@ -215,6 +216,7 @@ def __init__(self, if "b" not in mode: mode += "b" self.lock = threading.Lock() + self._calling_thread = threading.current_thread() self.exception: Optional[Exception] = None self.level = level self.previous_block = b"" @@ -348,7 +350,7 @@ def _compress(self, index: int): try: data, zdict = in_queue.get(timeout=0.05) except queue.Empty: - if not self.running: + if not (self.running and self._calling_thread.is_alive()): return continue try: @@ -373,7 +375,7 @@ def _write(self): try: compressed, crc, data_length = output_queue.get(timeout=0.05) except queue.Empty: - if not self.running: + if not (self.running and self._calling_thread.is_alive()): self._crc = total_crc self._size = size return @@ -396,7 +398,7 @@ def _compress_and_write(self): try: data, zdict = in_queue.get(timeout=0.05) except queue.Empty: - if not self.running: + if not (self.running and self._calling_thread.is_alive()): self._crc = total_crc self._size = size return From ea30d6f93f3dde7c773b1807b2444a845e641bcf Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 25 Sep 2024 09:13:48 +0200 Subject: [PATCH 5/8] Reproduce faulty flushing behavior --- tests/test_igzip_threaded.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index d211f81f..41c61bfe 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -239,3 +239,19 @@ def test_threaded_program_can_exit_on_error(tmp_path, mode, threads): ) f.write("raise Exception('Error')\n") subprocess.run([sys.executable, str(program)]) + + +@pytest.mark.parametrize("threads", [1, 2]) +def test_flush(tmp_path, threads): + test_file = tmp_path / "output.gz" + with igzip_threaded.open(test_file, "wb", threads=threads) as f: + f.write(b"1") + f.flush() + assert gzip.decompress(test_file.read_bytes()) == b"1" + f.write(b"2") + f.flush() + assert gzip.decompress(test_file.read_bytes()) == b"12" + f.write(b"3") + f.flush() + assert gzip.decompress(test_file.read_bytes()) == b"123" + assert gzip.decompress(test_file.read_bytes()) == b"123" From a670f984970c52f47babc8ba656311f81a150ea5 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 25 Sep 2024 09:18:04 +0200 Subject: [PATCH 6/8] Make threaded writer streams flushable --- CHANGELOG.rst | 2 ++ src/isal/igzip_threaded.py | 49 +++++++++++++++++++------------------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index fe22d86e..cdc01206 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -9,6 +9,8 @@ Changelog version 1.7.1-dev ----------------- ++ Fix a bug where flushing files when writing in threaded mode did not work + properly. + Prevent threaded opening from blocking python exit when an error is thrown in the calling thread. diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index da076e9b..7f1c94fc 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -60,7 +60,7 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, gzip_file = io.BufferedReader( _ThreadedGzipReader(filename, block_size=block_size)) else: - gzip_file = io.BufferedWriter( + gzip_file = FlushableBufferedWriter( _ThreadedGzipWriter( filename, mode.replace("t", "b"), @@ -167,6 +167,12 @@ def closed(self) -> bool: return self._closed +class FlushableBufferedWriter(io.BufferedWriter): + def flush(self): + super().flush() + self.raw.flush() + + class _ThreadedGzipWriter(io.RawIOBase): """ Write a gzip file using multiple threads. @@ -310,7 +316,7 @@ def write(self, b) -> int: self.input_queues[worker_index].put((data, zdict)) return len(data) - def flush(self): + def _end_gzip_stream(self): self._check_closed() # Wait for all data to be compressed for in_q in self.input_queues: @@ -318,22 +324,27 @@ def flush(self): # Wait for all data to be written for out_q in self.output_queues: out_q.join() + # Write an empty deflate block with a lost block marker. + self.raw.write(isal_zlib.compress(b"", wbits=-15)) + trailer = struct.pack(" None: if self._closed: return - self.flush() + self._end_gzip_stream() self.stop() if self.exception: self.raw.close() self._closed = True raise self.exception - # Write an empty deflate block with a lost block marker. - self.raw.write(isal_zlib.compress(b"", wbits=-15)) - trailer = struct.pack(" Date: Wed, 25 Sep 2024 09:24:44 +0200 Subject: [PATCH 7/8] Remove manual version update from the release checklist. --- .github/release_checklist.md | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/release_checklist.md b/.github/release_checklist.md index a38d8661..8ae899dc 100644 --- a/.github/release_checklist.md +++ b/.github/release_checklist.md @@ -9,7 +9,6 @@ Release checklist from CHANGELOG.rst. - [ ] Push tag to remote. This triggers the wheel/sdist build on github CI. - [ ] merge `main` branch back into `develop`. -- [ ] Add updated version number to develop. (`setup.py` and `src/isal/__init__.py`) - [ ] Build the new tag on readthedocs. Only build the last patch version of each minor version. So `1.1.1` and `1.2.0` but not `1.1.0`, `1.1.1` and `1.2.0`. - [ ] Create a new release on github. From 63f3bcd97b1d89fab36acb50a55bf76c1b62be8f Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 25 Sep 2024 09:27:50 +0200 Subject: [PATCH 8/8] Prepare release 1.7.1 --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index cdc01206..2ffabcb8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,7 +7,7 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. -version 1.7.1-dev +version 1.7.1 ----------------- + Fix a bug where flushing files when writing in threaded mode did not work properly.