diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b042ead1..fac4ed78 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,14 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. +version 1.5.0-dev +----------------- ++ Make a special case for threads==1 in ``igzip_threaded.open`` for writing + files. This now combines the writing and compression thread for less + overhead. ++ Write a specialized function for compressing blocks in a threaded fashion. + This function maximizes time spent outside the GIL. + version 1.4.1 ----------------- + Fix several errors related to unclosed files and buffers. diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index e4cd5fc5..7a3d24af 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -20,7 +20,8 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, - encoding=None, errors=None, newline=None, *, threads=1): + encoding=None, errors=None, newline=None, *, threads=1, + block_size=1024 * 1024): """ Utilize threads to read and write gzip objects and escape the GIL. Comparable to gzip.open. This method is only usable for streamed reading @@ -39,6 +40,8 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, :param threads: If 0 will defer to igzip.open, if < 0 will use all threads available to the system. Reading gzip can only use one thread. + :param block_size: Determines how large the blocks in the read/write + queues are for threaded reading and writing. :return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper, depending on the mode. 
""" @@ -61,11 +64,21 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, else: raise TypeError("filename must be a str or bytes object, or a file") if "r" in mode: - gzip_file = io.BufferedReader(_ThreadedGzipReader(binary_file)) + gzip_file = io.BufferedReader( + _ThreadedGzipReader(binary_file, block_size=block_size)) else: + # Deflating random data results in an output a little larger than the + # input. Making the output buffer 10% larger is sufficient overkill. + compress_buffer_size = block_size + max( + block_size // 10, 500) gzip_file = io.BufferedWriter( - _ThreadedGzipWriter(binary_file, compresslevel, threads), - buffer_size=1024 * 1024 + _ThreadedGzipWriter( + fp=binary_file, + buffer_size=compress_buffer_size, + level=compresslevel, + threads=threads + ), + buffer_size=block_size ) if "t" in mode: return io.TextIOWrapper(gzip_file, encoding, errors, newline) @@ -73,9 +86,9 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, class _ThreadedGzipReader(io.RawIOBase): - def __init__(self, fp, queue_size=4, block_size=8 * 1024 * 1024): + def __init__(self, fp, queue_size=2, block_size=1024 * 1024): self.raw = fp - self.fileobj = igzip._IGzipReader(fp, buffersize=8 * 1024 * 1024) + self.fileobj = igzip._IGzipReader(fp, buffersize=8 * block_size) self.pos = 0 self.read_file = False self.queue = queue.Queue(queue_size) @@ -179,35 +192,49 @@ class _ThreadedGzipWriter(io.RawIOBase): The writer thread reads from output queues and uses the crc32_combine function to calculate the total crc. It also writes the compressed block. + + When only one thread is requested, only the input queue is used and + compressing and output is handled in one thread. 
""" def __init__(self, fp: BinaryIO, level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION, threads: int = 1, - queue_size: int = 2): - if level < 0 or level > 3: - raise ValueError( - f"Invalid compression level, " - f"level should be between 0 and 3: {level}") + queue_size: int = 1, + buffer_size: int = 1024 * 1024, + ): self.lock = threading.Lock() self.exception: Optional[Exception] = None self.raw = fp self.level = level self.previous_block = b"" - self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [ - queue.Queue(queue_size) for _ in range(threads)] - self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ - queue.Queue(queue_size) for _ in range(threads)] - self.index = 0 + self.compressors: List[isal_zlib._ParallelCompress] = [ + isal_zlib._ParallelCompress(buffersize=buffer_size, + level=level) for _ in range(threads) + ] + if threads > 1: + self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [ + queue.Queue(queue_size) for _ in range(threads)] + self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ + queue.Queue(queue_size) for _ in range(threads)] + self.output_worker = threading.Thread(target=self._write) + self.compression_workers = [ + threading.Thread(target=self._compress, args=(i,)) + for i in range(threads) + ] + elif threads == 1: + self.input_queues = [queue.Queue(queue_size)] + self.output_queues = [] + self.compression_workers = [] + self.output_worker = threading.Thread( + target=self._compress_and_write) + else: + raise ValueError(f"threads should be at least 1, got {threads}") self.threads = threads + self.index = 0 self._crc = 0 self.running = False self._size = 0 - self.output_worker = threading.Thread(target=self._write) - self.compression_workers = [ - threading.Thread(target=self._compress, args=(i,)) - for i in range(threads) - ] self._closed = False self._write_gzip_header() self.start() @@ -289,6 +316,7 @@ def closed(self) -> bool: def _compress(self, index: int): in_queue = 
self.input_queues[index] out_queue = self.output_queues[index] + compressor: isal_zlib._ParallelCompress = self.compressors[index] while True: try: data, zdict = in_queue.get(timeout=0.05) @@ -297,23 +325,11 @@ def _compress(self, index: int): return continue try: - compressor = isal_zlib.compressobj( - self.level, wbits=-15, zdict=zdict) - compressed = compressor.compress(data) + compressor.flush( - isal_zlib.Z_SYNC_FLUSH) - crc = isal_zlib.crc32(data) + compressed, crc = compressor.compress_and_crc(data, zdict) except Exception as e: - with self.lock: - self.exception = e - # Abort everything and empty the queue - in_queue.task_done() - self.running = False - while True: - try: - _ = in_queue.get(timeout=0.05) - in_queue.task_done() - except queue.Empty: - return + in_queue.task_done() + self._set_error_and_empty_queue(e, in_queue) + return data_length = len(data) out_queue.put((compressed, crc, data_length)) in_queue.task_done() @@ -341,5 +357,46 @@ def _write(self): output_queue.task_done() index += 1 + def _compress_and_write(self): + if not self.threads == 1: + raise SystemError("Compress_and_write is for one thread only") + fp = self.raw + total_crc = 0 + size = 0 + in_queue = self.input_queues[0] + compressor = self.compressors[0] + while True: + try: + data, zdict = in_queue.get(timeout=0.05) + except queue.Empty: + if not self.running: + self._crc = total_crc + self._size = size + return + continue + try: + compressed, crc = compressor.compress_and_crc(data, zdict) + except Exception as e: + in_queue.task_done() + self._set_error_and_empty_queue(e, in_queue) + return + data_length = len(data) + total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length) + size += data_length + fp.write(compressed) + in_queue.task_done() + + def _set_error_and_empty_queue(self, error, q): + with self.lock: + self.exception = error + # Abort everything and empty the queue + self.running = False + while True: + try: + _ = q.get(timeout=0.05) + q.task_done() + except 
queue.Empty: + return + def writable(self) -> bool: return True diff --git a/src/isal/isal_zlib.pyi b/src/isal/isal_zlib.pyi index 53caf837..9ed725a2 100644 --- a/src/isal/isal_zlib.pyi +++ b/src/isal/isal_zlib.pyi @@ -38,6 +38,10 @@ def adler32(__data, __value: int = 1) -> int: ... def crc32(__data, __value: int = 0) -> int: ... def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ... +class _ParallelCompress: + def __init__(self, buffersize: int, level: int): ... + def compress_and_crc(self, __data, __zdict) -> typing.Tuple[bytes, int]: ... + def compress(__data, level: int = ISAL_DEFAULT_COMPRESSION, wbits: int = MAX_WBITS) -> bytes: ... diff --git a/src/isal/isal_zlibmodule.c b/src/isal/isal_zlibmodule.c index 35e77fe1..33f796bf 100644 --- a/src/isal/isal_zlibmodule.c +++ b/src/isal/isal_zlibmodule.c @@ -288,6 +288,189 @@ isal_zlib_crc32_combine(PyObject *module, PyObject *args) { crc32_comb(crc1, crc2, crc2_length) & 0xFFFFFFFF); } + +typedef struct { + PyObject_HEAD + uint8_t *buffer; + uint32_t buffer_size; + struct isal_zstream zst; +} ParallelCompress; + +static void +ParallelCompress_dealloc(ParallelCompress *self) +{ + PyMem_Free(self->buffer); + PyMem_Free(self->zst.level_buf); + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static PyObject * +ParallelCompress__new__(PyTypeObject *type, PyObject *args, PyObject *kwargs) +{ + Py_ssize_t buffer_size = 0; + int level = ISAL_DEFAULT_COMPRESSION; + static char *format = "n|i:ParallelCompress__new__"; + static char *kwarg_names[] = {"buffersize", "level", NULL}; + if (PyArg_ParseTupleAndKeywords(args, kwargs, format, kwarg_names, + &buffer_size, &level) == 0) { + return NULL; + } + uint32_t level_buf_size; + if (mem_level_to_bufsize(level, MEM_LEVEL_DEFAULT, &level_buf_size) < 0) { + PyErr_Format(PyExc_ValueError, "Invalid compression level %d", level); + return NULL; + } + if (buffer_size > UINT32_MAX) { + PyErr_Format(PyExc_ValueError, + "buffersize must be at most %zd, got %zd", +
(Py_ssize_t)UINT32_MAX, buffer_size); + return NULL; + } + ParallelCompress *self = PyObject_New(ParallelCompress, type); + if (self == NULL) { + return PyErr_NoMemory(); + } + self->buffer = NULL; + self->zst.level_buf = NULL; + isal_deflate_init(&self->zst); + uint8_t *level_buf = PyMem_Malloc(level_buf_size); + if (level_buf == NULL) { + Py_DECREF(self); + return PyErr_NoMemory(); + } + uint8_t *buffer = PyMem_Malloc(buffer_size); + if (buffer == NULL) { + Py_DECREF(self); + PyMem_Free(level_buf); + return PyErr_NoMemory(); + } + self->buffer = buffer; + self->buffer_size = buffer_size; + self->zst.level_buf = level_buf; + self->zst.level_buf_size = level_buf_size; + self->zst.gzip_flag = IGZIP_GZIP_NO_HDR; + self->zst.hist_bits = ISAL_DEF_MAX_HIST_BITS; + self->zst.level = (uint32_t)level; + self->zst.flush = SYNC_FLUSH; + return (PyObject *)self; +} + + +PyDoc_STRVAR(ParallelCompress_compress_and_crc__doc__, +"compress_and_crc($self, data, zdict, /)\n" +"--\n" +"\n" +"Function specifically designed for use in parallel compression. Data is \n" +"compressed using deflate and Z_SYNC_FLUSH is used to ensure the block aligns\n" +"to a byte boundary. Also the CRC is calculated.
This function is designed to \n" +"maximize the time spent outside the GIL\n" +"\n" +" data\n" +" bytes-like object containing the to be compressed data\n" +" zdict\n" +" last 32 bytes of the previous block\n" +); +#define PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF \ + { \ + "compress_and_crc", (PyCFunction)ParallelCompress_compress_and_crc, \ + METH_FASTCALL, ParallelCompress_compress_and_crc__doc__} + +static PyObject * +ParallelCompress_compress_and_crc(ParallelCompress *self, + PyObject *const *args, + Py_ssize_t nargs) +{ + if (nargs != 2) { + PyErr_Format( + PyExc_TypeError, + "compress_and_crc takes exactly 2 arguments, got %zd", + nargs); + return NULL; + } + Py_buffer data; + Py_buffer zdict; + if (PyObject_GetBuffer(args[0], &data, PyBUF_SIMPLE) == -1) { + return NULL; + } + if (PyObject_GetBuffer(args[1], &zdict, PyBUF_SIMPLE) == -1) { + PyBuffer_Release(&data); + return NULL; + } + + if (data.len + zdict.len > UINT32_MAX) { + PyErr_Format(PyExc_OverflowError, + "Can only compress %d bytes of data", UINT32_MAX); + goto error; + } + PyThreadState *_save; + Py_UNBLOCK_THREADS + isal_deflate_reset(&self->zst); + self->zst.avail_in = data.len; + self->zst.next_in = data.buf; + self->zst.next_out = self->buffer; + self->zst.avail_out = self->buffer_size; + int err = isal_deflate_set_dict(&self->zst, zdict.buf, zdict.len); + if (err != 0){ + Py_BLOCK_THREADS; + isal_deflate_error(err); + goto error; + } + err = isal_deflate(&self->zst); + Py_BLOCK_THREADS; + + if (err != COMP_OK) { + isal_deflate_error(err); + goto error; + } + if (self->zst.avail_out == 0) { + PyErr_Format( + PyExc_OverflowError, + "Compressed output exceeds buffer size of %u", self->buffer_size + ); + goto error; + } + if (self->zst.avail_in != 0) { + PyErr_Format( + PyExc_RuntimeError, + "Developer error input bytes are still available: %u. 
" + "Please contact the developers by creating an issue at " + "https://github.com/pycompression/python-isal/issues", + self->zst.avail_in); + goto error; + } + PyObject *out_tup = PyTuple_New(2); + PyObject *crc_obj = PyLong_FromUnsignedLong(self->zst.internal_state.crc); + PyObject *out_bytes = PyBytes_FromStringAndSize( + (char *)self->buffer, self->zst.next_out - self->buffer); + if (out_bytes == NULL || out_tup == NULL || crc_obj == NULL) { + Py_XDECREF(out_bytes); Py_XDECREF(out_tup); Py_XDECREF(crc_obj); + goto error; + } + PyBuffer_Release(&data); + PyBuffer_Release(&zdict); + PyTuple_SET_ITEM(out_tup, 0, out_bytes); + PyTuple_SET_ITEM(out_tup, 1, crc_obj); + return out_tup; +error: + PyBuffer_Release(&data); + PyBuffer_Release(&zdict); + return NULL; +} + +static PyMethodDef ParallelCompress_methods[] = { + PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF, + {NULL}, +}; + +static PyTypeObject ParallelCompress_Type = { + .tp_name = "isal_zlib._ParallelCompress", + .tp_basicsize = sizeof(ParallelCompress), + .tp_doc = PyDoc_STR( + "A reusable zstream and buffer fast parallel compression."), + .tp_dealloc = (destructor)ParallelCompress_dealloc, + .tp_new = ParallelCompress__new__, + .tp_methods = ParallelCompress_methods, +}; + PyDoc_STRVAR(zlib_compress__doc__, "compress($module, data, /, level=ISAL_DEFAULT_COMPRESSION, wbits=MAX_WBITS)\n" "--\n" @@ -2055,6 +2238,15 @@ PyInit_isal_zlib(void) return NULL; } + if (PyType_Ready(&ParallelCompress_Type) != 0) { + return NULL; + } + Py_INCREF(&ParallelCompress_Type); + if (PyModule_AddObject(m, "_ParallelCompress", + (PyObject *)&ParallelCompress_Type) < 0) { + return NULL; + } + PyModule_AddIntConstant(m, "MAX_WBITS", ISAL_DEF_MAX_HIST_BITS); PyModule_AddIntConstant(m, "DEFLATED", Z_DEFLATED); PyModule_AddIntMacro(m, DEF_MEM_LEVEL); diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index a5c9128d..d6430dc8 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -7,6 +7,8 
@@ import gzip import io +import itertools +import os import tempfile from pathlib import Path @@ -25,10 +27,11 @@ def test_threaded_read(): assert thread_data == data -@pytest.mark.parametrize("mode", ["wb", "wt"]) -def test_threaded_write(mode): +@pytest.mark.parametrize(["mode", "threads"], + itertools.product(["wb", "wt"], [1, 3])) +def test_threaded_write(mode, threads): with tempfile.NamedTemporaryFile("wb", delete=False) as tmp: - with igzip_threaded.open(tmp, mode, threads=-1) as out_file: + with igzip_threaded.open(tmp, mode, threads=threads) as out_file: gzip_open_mode = "rb" if "b" in mode else "rt" with gzip.open(TEST_FILE, gzip_open_mode) as in_file: while True: @@ -73,14 +76,15 @@ def test_threaded_read_error(): @pytest.mark.timeout(5) -def test_threaded_write_error(monkeypatch): - tmp = tempfile.mktemp() - # Compressobj method is called in a worker thread. - monkeypatch.delattr(igzip_threaded.isal_zlib, "compressobj") - with pytest.raises(AttributeError) as error: - with igzip_threaded.open(tmp, "wb", compresslevel=3) as writer: - writer.write(b"x") - error.match("no attribute 'compressobj'") +@pytest.mark.parametrize("threads", [1, 3]) +def test_threaded_write_error(threads): + # parallel_deflate_and_crc method is called in a worker thread. 
+ with pytest.raises(OverflowError) as error: + with igzip_threaded.open( + io.BytesIO(), "wb", compresslevel=3, threads=threads + ) as writer: + writer.write(os.urandom(1024 * 1024 * 50)) + error.match("Compressed output exceeds buffer size") def test_close_reader(): @@ -92,8 +96,10 @@ def test_close_reader(): f.close() -def test_close_writer(): - f = igzip_threaded._ThreadedGzipWriter(io.BytesIO()) +@pytest.mark.parametrize("threads", [1, 3]) +def test_close_writer(threads): + f = igzip_threaded._ThreadedGzipWriter( + io.BytesIO(), threads=threads) f.close() assert f.closed # Make sure double closing does not raise errors @@ -117,6 +123,13 @@ def test_writer_wrong_level(): error.match("42") +def test_writer_too_low_threads(): + with pytest.raises(ValueError) as error: + igzip_threaded._ThreadedGzipWriter(io.BytesIO(), threads=0) + error.match("threads") + error.match("at least 1") + + def test_reader_read_after_close(): with open(TEST_FILE, "rb") as test_f: f = igzip_threaded._ThreadedGzipReader(test_f) @@ -126,8 +139,9 @@ def test_reader_read_after_close(): error.match("closed") -def test_writer_write_after_close(): - f = igzip_threaded._ThreadedGzipWriter(io.BytesIO()) +@pytest.mark.parametrize("threads", [1, 3]) +def test_writer_write_after_close(threads): + f = igzip_threaded._ThreadedGzipWriter(io.BytesIO(), threads=threads) f.close() with pytest.raises(ValueError) as error: f.write(b"abc")