Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write a specialized threaded compressor and special case writing with one thread #162

Merged
merged 17 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ Changelog
.. This document is user facing. Please word the changes in such a way
.. that users understand how the changes affect the new version.

version 1.5.0-dev
-----------------
+ Make a special case for threads==1 in ``igzip_threaded.open`` for writing
files. This now combines the writing and compression thread for less
overhead.
+ Write a specialized function for compressing blocks in a threaded fashion.
This function maximizes time spent outside the GIL.

version 1.4.1
-----------------
+ Fix several errors related to unclosed files and buffers.
Expand Down
131 changes: 94 additions & 37 deletions src/isal/igzip_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@


def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
encoding=None, errors=None, newline=None, *, threads=1):
encoding=None, errors=None, newline=None, *, threads=1,
block_size=1024 * 1024):
"""
Utilize threads to read and write gzip objects and escape the GIL.
Comparable to gzip.open. This method is only usable for streamed reading
Expand All @@ -39,6 +40,8 @@
:param threads: If 0 will defer to igzip.open, if < 0 will use all threads
available to the system. Reading gzip can only
use one thread.
:param block_size: Determines how large the blocks in the read/write
queues are for threaded reading and writing.
:return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper,
depending on the mode.
"""
Expand All @@ -61,21 +64,31 @@
else:
raise TypeError("filename must be a str or bytes object, or a file")
if "r" in mode:
gzip_file = io.BufferedReader(_ThreadedGzipReader(binary_file))
gzip_file = io.BufferedReader(
_ThreadedGzipReader(binary_file, block_size=block_size))
else:
# Deflating random data results in an output a little larger than the
# input. Making the output buffer 10% larger is sufficient overkill.
compress_buffer_size = block_size + max(
block_size // 10, 500)
gzip_file = io.BufferedWriter(
_ThreadedGzipWriter(binary_file, compresslevel, threads),
buffer_size=1024 * 1024
_ThreadedGzipWriter(
fp=binary_file,
buffer_size=compress_buffer_size,
level=compresslevel,
threads=threads
),
buffer_size=block_size
)
if "t" in mode:
return io.TextIOWrapper(gzip_file, encoding, errors, newline)
return gzip_file


class _ThreadedGzipReader(io.RawIOBase):
def __init__(self, fp, queue_size=4, block_size=8 * 1024 * 1024):
def __init__(self, fp, queue_size=2, block_size=1024 * 1024):
self.raw = fp
self.fileobj = igzip._IGzipReader(fp, buffersize=8 * 1024 * 1024)
self.fileobj = igzip._IGzipReader(fp, buffersize=8 * block_size)
self.pos = 0
self.read_file = False
self.queue = queue.Queue(queue_size)
Expand Down Expand Up @@ -179,35 +192,49 @@

The writer thread reads from output queues and uses the crc32_combine
function to calculate the total crc. It also writes the compressed block.

When only one thread is requested, only the input queue is used and
compressing and output is handled in one thread.
"""
def __init__(self,
fp: BinaryIO,
level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION,
threads: int = 1,
queue_size: int = 2):
if level < 0 or level > 3:
raise ValueError(
f"Invalid compression level, "
f"level should be between 0 and 3: {level}")
queue_size: int = 1,
buffer_size: int = 1024 * 1024,
):
self.lock = threading.Lock()
self.exception: Optional[Exception] = None
self.raw = fp
self.level = level
self.previous_block = b""
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.index = 0
self.compressors: List[isal_zlib._ParallelCompress] = [
isal_zlib._ParallelCompress(buffersize=buffer_size,
level=level) for _ in range(threads)
]
if threads > 1:
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.output_worker = threading.Thread(target=self._write)
self.compression_workers = [
threading.Thread(target=self._compress, args=(i,))
for i in range(threads)
]
elif threads == 1:
self.input_queues = [queue.Queue(queue_size)]
self.output_queues = []
self.compression_workers = []
self.output_worker = threading.Thread(
target=self._compress_and_write)
else:
raise ValueError(f"threads should be at least 1, got {threads}")
self.threads = threads
self.index = 0
self._crc = 0
self.running = False
self._size = 0
self.output_worker = threading.Thread(target=self._write)
self.compression_workers = [
threading.Thread(target=self._compress, args=(i,))
for i in range(threads)
]
self._closed = False
self._write_gzip_header()
self.start()
Expand Down Expand Up @@ -289,6 +316,7 @@
def _compress(self, index: int):
in_queue = self.input_queues[index]
out_queue = self.output_queues[index]
compressor: isal_zlib._ParallelCompress = self.compressors[index]
while True:
try:
data, zdict = in_queue.get(timeout=0.05)
Expand All @@ -297,23 +325,11 @@
return
continue
try:
compressor = isal_zlib.compressobj(
self.level, wbits=-15, zdict=zdict)
compressed = compressor.compress(data) + compressor.flush(
isal_zlib.Z_SYNC_FLUSH)
crc = isal_zlib.crc32(data)
compressed, crc = compressor.compress_and_crc(data, zdict)
except Exception as e:
with self.lock:
self.exception = e
# Abort everything and empty the queue
in_queue.task_done()
self.running = False
while True:
try:
_ = in_queue.get(timeout=0.05)
in_queue.task_done()
except queue.Empty:
return
in_queue.task_done()
self._set_error_and_empty_queue(e, in_queue)
return
data_length = len(data)
out_queue.put((compressed, crc, data_length))
in_queue.task_done()
Expand Down Expand Up @@ -341,5 +357,46 @@
output_queue.task_done()
index += 1

def _compress_and_write(self):
if not self.threads == 1:
raise SystemError("Compress_and_write is for one thread only")

Check warning on line 362 in src/isal/igzip_threaded.py

View check run for this annotation

Codecov / codecov/patch

src/isal/igzip_threaded.py#L362

Added line #L362 was not covered by tests
fp = self.raw
total_crc = 0
size = 0
in_queue = self.input_queues[0]
compressor = self.compressors[0]
while True:
try:
data, zdict = in_queue.get(timeout=0.05)
except queue.Empty:
if not self.running:
self._crc = total_crc
self._size = size
return
continue
try:
compressed, crc = compressor.compress_and_crc(data, zdict)
except Exception as e:
in_queue.task_done()
self._set_error_and_empty_queue(e, in_queue)
return
data_length = len(data)
total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
size += data_length
fp.write(compressed)
in_queue.task_done()

def _set_error_and_empty_queue(self, error, q):
with self.lock:
self.exception = error
# Abort everything and empty the queue
self.running = False
while True:
try:
_ = q.get(timeout=0.05)
q.task_done()

Check warning on line 397 in src/isal/igzip_threaded.py

View check run for this annotation

Codecov / codecov/patch

src/isal/igzip_threaded.py#L397

Added line #L397 was not covered by tests
except queue.Empty:
return

def writable(self) -> bool:
return True
4 changes: 4 additions & 0 deletions src/isal/isal_zlib.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def adler32(__data, __value: int = 1) -> int: ...
def crc32(__data, __value: int = 0) -> int: ...
def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ...

class _ParallelCompress:
def __init__(self, buffersize: int, level: int): ...
def compress_and_crc(self, __data, __zdict) -> typing.Tuple[bytes, int]: ...

def compress(__data,
level: int = ISAL_DEFAULT_COMPRESSION,
wbits: int = MAX_WBITS) -> bytes: ...
Expand Down
Loading
Loading