Merge pull request #164 from pycompression/fixbuffercrash
Make sure large write calls cannot overflow the buffer in ThreadedGzipWriter
rhpvorderman authored Oct 12, 2023
2 parents 71b7d61 + dc7f6dd commit 36551e5
Showing 2 changed files with 48 additions and 15 deletions.
src/isal/igzip_threaded.py (27 changes: 19 additions & 8 deletions)
@@ -67,14 +67,10 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
         gzip_file = io.BufferedReader(
             _ThreadedGzipReader(binary_file, block_size=block_size))
     else:
-        # Deflating random data results in an output a little larger than the
-        # input. Making the output buffer 10% larger is sufficient overkill.
-        compress_buffer_size = block_size + max(
-            block_size // 10, 500)
         gzip_file = io.BufferedWriter(
             _ThreadedGzipWriter(
                 fp=binary_file,
-                buffer_size=compress_buffer_size,
+                block_size=block_size,
                 level=compresslevel,
                 threads=threads
             ),
@@ -201,15 +197,19 @@ def __init__(self,
                  level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION,
                  threads: int = 1,
                  queue_size: int = 1,
-                 buffer_size: int = 1024 * 1024,
+                 block_size: int = 1024 * 1024,
                  ):
         self.lock = threading.Lock()
         self.exception: Optional[Exception] = None
         self.raw = fp
         self.level = level
         self.previous_block = b""
+        # Deflating random data results in an output a little larger than the
+        # input. Making the output buffer 10% larger is sufficient overkill.
+        compress_buffer_size = block_size + max(block_size // 10, 500)
+        self.block_size = block_size
         self.compressors: List[isal_zlib._ParallelCompress] = [
-            isal_zlib._ParallelCompress(buffersize=buffer_size,
+            isal_zlib._ParallelCompress(buffersize=compress_buffer_size,
                                         level=level) for _ in range(threads)
         ]
         if threads > 1:
@@ -273,8 +273,19 @@ def write(self, b) -> int:
         with self.lock:
             if self.exception:
                 raise self.exception
-        index = self.index
+        length = b.nbytes if isinstance(b, memoryview) else len(b)
+        if length > self.block_size:
+            # write smaller chunks and return the result
+            memview = memoryview(b)
+            start = 0
+            total_written = 0
+            while start < length:
+                total_written += self.write(
+                    memview[start:start+self.block_size])
+                start += self.block_size
+            return total_written
         data = bytes(b)
+        index = self.index
         zdict = memoryview(self.previous_block)[-DEFLATE_WINDOW_SIZE:]
         self.previous_block = data
         self.index += 1
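
The write hunk above is the heart of the fix: a payload larger than block_size is sliced into block-sized memoryview chunks that are fed back through write() one at a time, so no single block can outgrow its compressor's output buffer of block_size + max(block_size // 10, 500) bytes (about 1.1 MiB for the default 1 MiB block size). A minimal standalone sketch of that slicing pattern follows; chunked_write and submit_block are illustrative names for this sketch, not library API.

    import os

    BLOCK_SIZE = 8 * 1024  # illustrative; the library defaults to 1024 * 1024

    def chunked_write(b, submit_block) -> int:
        # Feed an arbitrarily large buffer to submit_block in pieces of at
        # most BLOCK_SIZE bytes, mirroring _ThreadedGzipWriter.write.
        view = memoryview(b)
        total_written = 0
        for start in range(0, view.nbytes, BLOCK_SIZE):
            chunk = view[start:start + BLOCK_SIZE]
            submit_block(bytes(chunk))  # each piece fits the compress buffer
            total_written += chunk.nbytes
        return total_written

    # Quick check: nothing is lost or reordered by the chunking.
    pieces = []
    data = os.urandom(100 * 1024)  # deliberately not a multiple of BLOCK_SIZE
    assert chunked_write(data, pieces.append) == len(data)
    assert b"".join(pieces) == data

Note that the recursion in the real write() also explains why the chunking happens outside the `with self.lock:` block: threading.Lock is not reentrant, so re-entering write() while holding it would deadlock.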
tests/test_igzip_threaded.py (36 changes: 29 additions & 7 deletions)
@@ -28,10 +28,12 @@ def test_threaded_read():


 @pytest.mark.parametrize(["mode", "threads"],
-                         itertools.product(["wb", "wt"], [1, 3]))
+                         itertools.product(["wb", "wt"], [1, 3, -1]))
 def test_threaded_write(mode, threads):
     with tempfile.NamedTemporaryFile("wb", delete=False) as tmp:
-        with igzip_threaded.open(tmp, mode, threads=threads) as out_file:
+        # Use a small block size to simulate many writes.
+        with igzip_threaded.open(tmp, mode, threads=threads,
+                                 block_size=8*1024) as out_file:
             gzip_open_mode = "rb" if "b" in mode else "rt"
             with gzip.open(TEST_FILE, gzip_open_mode) as in_file:
                 while True:
@@ -77,13 +79,33 @@ def test_threaded_read_error():

 @pytest.mark.timeout(5)
 @pytest.mark.parametrize("threads", [1, 3])
-def test_threaded_write_error(threads):
-    # parallel_deflate_and_crc method is called in a worker thread.
-    with pytest.raises(OverflowError) as error:
+def test_threaded_write_oversized_block_no_error(threads):
+    # Random bytes are incompressible, and therefore are guaranteed to
+    # trigger a buffer overflow when larger than block size unless handled
+    # correctly.
+    data = os.urandom(1024 * 63)  # not a multiple of block_size
+    with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmp:
         with igzip_threaded.open(
-                io.BytesIO(), "wb", compresslevel=3, threads=threads
+                tmp, "wb", compresslevel=3, threads=threads,
+                block_size=8 * 1024
         ) as writer:
-            writer.write(os.urandom(1024 * 1024 * 50))
+            writer.write(data)
+    with gzip.open(tmp.name, "rb") as gzipped:
+        decompressed = gzipped.read()
+    assert data == decompressed
+
+
+@pytest.mark.timeout(5)
+@pytest.mark.parametrize("threads", [1, 3])
+def test_threaded_write_error(threads):
+    f = igzip_threaded._ThreadedGzipWriter(
+        fp=io.BytesIO(), level=3,
+        threads=threads, block_size=8 * 1024)
+    # Bypass the write method which should not allow blocks larger than
+    # block_size.
+    f.input_queues[0].put((os.urandom(1024 * 64), b""))
+    with pytest.raises(OverflowError) as error:
+        f.close()
     error.match("Compressed output exceeds buffer size")
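
Taken together, the two tests pin down the new contract: write() transparently splits oversized payloads, and only a block that bypasses write() (as the second test forces via the input queue) can still trip the OverflowError. As a user-facing sanity check, a single oversized write call now round-trips cleanly; a sketch assuming python-isal is installed, with an illustrative output file name:

    import gzip
    import os

    from isal import igzip_threaded

    data = os.urandom(1024 * 1024 * 50)  # the write size that used to crash
    with igzip_threaded.open("big.gz", "wb", threads=3) as f:
        f.write(data)  # internally split into block_size-sized chunks
    with gzip.open("big.gz", "rb") as f:
        assert f.read() == data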
