From 9d948b0fe8759fdcb87df0d09d40186ed985d83b Mon Sep 17 00:00:00 2001 From: Gaojie Liu Date: Thu, 14 Nov 2024 16:42:12 -0800 Subject: [PATCH 1/6] [client-common] Added safeguard for compressor Today, `compress`/`decompress` can still be invoked even after the compressor has been closed, and for the zstd-based compressor this would crash. This PR adds a safeguard that fails fast if the compressor is already closed. --- .../venice/compression/GzipCompressor.java | 14 ++-- .../venice/compression/NoopCompressor.java | 17 +++-- .../venice/compression/VeniceCompressor.java | 73 +++++++++++++++++-- .../compression/ZstdWithDictCompressor.java | 14 ++-- 4 files changed, 91 insertions(+), 27 deletions(-) diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/GzipCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/GzipCompressor.java index c1e68428e6..4ace2a69ce 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/GzipCompressor.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/GzipCompressor.java @@ -24,7 +24,7 @@ public GzipCompressor() { } @Override - public byte[] compress(byte[] data) throws IOException { + protected byte[] compressInternal(byte[] data) throws IOException { ReusableGzipOutputStream out = gzipPool.getReusableGzipOutputStream(); try { out.writeHeader(); @@ -37,7 +37,7 @@ public byte[] compress(byte[] data) throws IOException { } @Override - public void close() throws IOException { + protected void closeInternal() throws IOException { try { gzipPool.close(); } catch (Exception e) { @@ -47,7 +47,7 @@ public void close() throws IOException { } @Override - public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IOException { + protected ByteBuffer compressInternal(ByteBuffer data, int startPositionOfOutput) throws IOException { /** * N.B.: We initialize the size of buffer in this output stream at 
the size of the deflated payload, which is not * ideal, but not necessarily bad either. The assumption is that GZIP usually doesn't compress our payloads that @@ -74,7 +74,7 @@ public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IO } @Override - public ByteBuffer decompress(ByteBuffer data) throws IOException { + protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException { if (data.hasRemaining()) { if (data.hasArray()) { return decompress(data.array(), data.position(), data.remaining()); @@ -89,14 +89,14 @@ public ByteBuffer decompress(ByteBuffer data) throws IOException { } @Override - public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException { + protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException { try (InputStream gis = decompress(new ByteArrayInputStream(data, offset, length))) { return ByteBuffer.wrap(IOUtils.toByteArray(gis)); } } @Override - public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader) + protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader) throws IOException { byte[] decompressedByteArray; try (InputStream gis = decompress(new ByteArrayInputStream(data, offset, length))) { @@ -111,7 +111,7 @@ public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int } @Override - public InputStream decompress(InputStream inputStream) throws IOException { + protected InputStream decompressInternal(InputStream inputStream) throws IOException { return new GZIPInputStream(inputStream); } diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java index baa3e80bd8..8e59059696 100644 --- 
a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java @@ -12,12 +12,12 @@ public NoopCompressor() { } @Override - public byte[] compress(byte[] data) throws IOException { + protected byte[] compressInternal(byte[] data) throws IOException { return data; } @Override - public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IOException { + protected ByteBuffer compressInternal(ByteBuffer data, int startPositionOfOutput) throws IOException { if (startPositionOfOutput != 0) { throw new UnsupportedOperationException("Compression with front padding is not supported for NO_OP."); } @@ -30,17 +30,17 @@ public int hashCode() { } @Override - public ByteBuffer decompress(ByteBuffer data) throws IOException { + protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException { return data; } @Override - public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException { + protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException { return ByteBuffer.wrap(data, offset, length); } @Override - public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader) + protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader) throws IOException { if (offset < SCHEMA_HEADER_LENGTH) { throw new VeniceException("Start offset does not have enough room for schema header."); @@ -51,10 +51,15 @@ public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int } @Override - public InputStream decompress(InputStream inputStream) throws IOException { + protected InputStream decompressInternal(InputStream inputStream) throws IOException { return inputStream; } + @Override + protected void closeInternal() throws IOException { + // do nothing + } + @Override 
public boolean equals(Object o) { if (o == this) { diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java index bf7b30639d..d2eee47119 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java @@ -1,27 +1,65 @@ package com.linkedin.venice.compression; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.utils.ByteUtils; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.concurrent.locks.ReentrantReadWriteLock; public abstract class VeniceCompressor implements Closeable { protected static final int SCHEMA_HEADER_LENGTH = ByteUtils.SIZE_OF_INT; private final CompressionStrategy compressionStrategy; + private boolean isClosed = false; + private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); protected VeniceCompressor(CompressionStrategy compressionStrategy) { this.compressionStrategy = compressionStrategy; } - public abstract byte[] compress(byte[] data) throws IOException; + interface CompressionRunnable { + R run() throws IOException; + } + + private R executeWithSafeGuard(CompressionRunnable runnable) throws IOException { + readWriteLock.readLock().lock(); + try { + if (isClosed) { + throw new VeniceException(getCompressionStrategy() + " has been closed"); + } + return runnable.run(); + } catch (IOException e) { + throw e; + } finally { + readWriteLock.readLock().unlock(); + } + } + + public byte[] compress(byte[] data) throws IOException { + return executeWithSafeGuard(() -> compressInternal(data)); + } - public abstract ByteBuffer compress(ByteBuffer src, int startPositionOfOutput) throws IOException; + protected abstract 
byte[] compressInternal(byte[] data) throws IOException; - public abstract ByteBuffer decompress(ByteBuffer data) throws IOException; + public ByteBuffer compress(ByteBuffer src, int startPositionOfOutput) throws IOException { + return executeWithSafeGuard(() -> compressInternal(src, startPositionOfOutput)); + } + + protected abstract ByteBuffer compressInternal(ByteBuffer src, int startPositionOfOutput) throws IOException; + + public ByteBuffer decompress(ByteBuffer data) throws IOException { + return executeWithSafeGuard(() -> decompressInternal(data)); + } - public abstract ByteBuffer decompress(byte[] data, int offset, int length) throws IOException; + protected abstract ByteBuffer decompressInternal(ByteBuffer data) throws IOException; + + public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException { + return executeWithSafeGuard(() -> decompressInternal(data, offset, length)); + } + + protected abstract ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException; /** * This method tries to decompress data and maybe prepend the schema header. @@ -29,15 +67,36 @@ protected VeniceCompressor(CompressionStrategy compressionStrategy) { * decompressed data. The ByteBuffer will be positioned at the beginning of the decompressed data and the remaining of * the ByteBuffer will be the length of the decompressed data. 
*/ - public abstract ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader) - throws IOException; + public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader) + throws IOException { + return executeWithSafeGuard(() -> decompressAndPrependSchemaHeader(data, offset, length, schemaHeader)); + } + + protected abstract ByteBuffer decompressAndPrependSchemaHeaderInternal( + byte[] data, + int offset, + int length, + int schemaHeader) throws IOException; public CompressionStrategy getCompressionStrategy() { return compressionStrategy; } - public abstract InputStream decompress(InputStream inputStream) throws IOException; + public InputStream decompress(InputStream inputStream) throws IOException { + return executeWithSafeGuard(() -> decompressInternal(inputStream)); + } + + protected abstract InputStream decompressInternal(InputStream inputStream) throws IOException; public void close() throws IOException { + readWriteLock.writeLock().lock(); + try { + isClosed = true; + closeInternal(); + } finally { + readWriteLock.writeLock().unlock(); + } } + + protected abstract void closeInternal() throws IOException; } diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/ZstdWithDictCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/ZstdWithDictCompressor.java index 0755320486..3b67a2dfda 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/ZstdWithDictCompressor.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/ZstdWithDictCompressor.java @@ -47,12 +47,12 @@ public ZstdWithDictCompressor(final byte[] dictionary, int level) { } @Override - public byte[] compress(byte[] data) { + protected byte[] compressInternal(byte[] data) { return compressor.get().compress(data); } @Override - public ByteBuffer compress(ByteBuffer data, int 
startPositionOfOutput) throws IOException { + protected ByteBuffer compressInternal(ByteBuffer data, int startPositionOfOutput) throws IOException { long maxDstSize = Zstd.compressBound(data.remaining()); if (maxDstSize + startPositionOfOutput > Integer.MAX_VALUE) { throw new ZstdException(Zstd.errGeneric(), "Max output size is greater than Integer.MAX_VALUE"); @@ -87,7 +87,7 @@ public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IO } @Override - public ByteBuffer decompress(ByteBuffer data) throws IOException { + protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException { if (data.hasRemaining()) { if (data.hasArray()) { return decompress(data.array(), data.position(), data.remaining()); @@ -107,7 +107,7 @@ public ByteBuffer decompress(ByteBuffer data) throws IOException { } @Override - public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException { + protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException { int expectedSize = validateExpectedDecompressedSize(Zstd.decompressedSize(data, offset, length)); ByteBuffer returnedData = ByteBuffer.allocate(expectedSize); int actualSize = decompressor.get() @@ -124,7 +124,7 @@ public ByteBuffer decompress(byte[] data, int offset, int length) throws IOExcep } @Override - public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader) + protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader) throws IOException { int expectedDecompressedDataSize = validateExpectedDecompressedSize(Zstd.decompressedSize(data, offset, length)); @@ -138,12 +138,12 @@ public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int } @Override - public InputStream decompress(InputStream inputStream) throws IOException { + protected InputStream decompressInternal(InputStream inputStream) throws IOException { return new 
ZstdInputStream(inputStream).setDict(this.dictDecompress); } @Override - public void close() throws IOException { + protected void closeInternal() throws IOException { this.compressor.close(); this.decompressor.close(); IOUtils.closeQuietly(this.dictCompress); From 6b860c8e4bab7fe42006b8b7d6905ae4be6c1d63 Mon Sep 17 00:00:00 2001 From: Gaojie Liu Date: Thu, 14 Nov 2024 17:32:06 -0800 Subject: [PATCH 2/6] Fixed integration test failures --- .../java/com/linkedin/venice/compression/VeniceCompressor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java index d2eee47119..d5a6e1733d 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java @@ -69,7 +69,7 @@ public ByteBuffer decompress(byte[] data, int offset, int length) throws IOExcep */ public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader) throws IOException { - return executeWithSafeGuard(() -> decompressAndPrependSchemaHeader(data, offset, length, schemaHeader)); + return executeWithSafeGuard(() -> decompressAndPrependSchemaHeaderInternal(data, offset, length, schemaHeader)); } protected abstract ByteBuffer decompressAndPrependSchemaHeaderInternal( From be94e2574ec71b62d1386c2ec10c4d4598d53b02 Mon Sep 17 00:00:00 2001 From: Gaojie Liu Date: Fri, 15 Nov 2024 10:54:06 -0800 Subject: [PATCH 3/6] Minor tweak --- .../java/com/linkedin/venice/compression/VeniceCompressor.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java 
b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java index d5a6e1733d..b0ad577dc1 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java @@ -30,8 +30,6 @@ private R executeWithSafeGuard(CompressionRunnable runnable) throws IOExc throw new VeniceException(getCompressionStrategy() + " has been closed"); } return runnable.run(); - } catch (IOException e) { - throw e; } finally { readWriteLock.readLock().unlock(); } From 8a0ce3a112859e064a3de49d37fffaa2e03ee34c Mon Sep 17 00:00:00 2001 From: Gaojie Liu Date: Fri, 15 Nov 2024 11:24:04 -0800 Subject: [PATCH 4/6] Added a unit test --- .../compression/TestVeniceCompressor.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/compression/TestVeniceCompressor.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/compression/TestVeniceCompressor.java index 596e3642b3..ba015d4834 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/compression/TestVeniceCompressor.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/compression/TestVeniceCompressor.java @@ -1,6 +1,11 @@ package com.linkedin.venice.compression; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; + import com.github.luben.zstd.Zstd; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.utils.ByteUtils; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; @@ -14,6 +19,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.RandomStringUtils; import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.testng.Assert; @@ -173,7 +179,7 @@ private enum SourceDataType { @Test public void testZSTDThrowsExceptionOnNullDictionary() { - Assert.assertThrows( + assertThrows( () -> new CompressorFactory() .createVersionSpecificCompressorIfNotExist(CompressionStrategy.ZSTD_WITH_DICT, "foo_v1", null)); } @@ -205,4 +211,15 @@ public void testCompressorEqual() { } } } + + @Test + public void testCompressorClose() throws IOException { + VeniceCompressor compressor = new ZstdWithDictCompressor("abc".getBytes(), Zstd.maxCompressionLevel()); + String largePayload = RandomStringUtils.randomAlphabetic(500000); + compressor.compress(largePayload.getBytes()); + compressor.close(); + VeniceException exception = + expectThrows(VeniceException.class, () -> compressor.compress(ByteBuffer.wrap(largePayload.getBytes()), 4)); + assertTrue(exception.getMessage().contains("has been closed")); + } } From d8064d4e303ea0c55c5ab33b602d71d172e5a851 Mon Sep 17 00:00:00 2001 From: Gaojie Liu Date: Fri, 15 Nov 2024 12:04:22 -0800 Subject: [PATCH 5/6] Fixed minor comment --- .../java/com/linkedin/venice/compression/VeniceCompressor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java index b0ad577dc1..1e2680692f 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java @@ -27,7 +27,7 @@ private R executeWithSafeGuard(CompressionRunnable runnable) throws IOExc readWriteLock.readLock().lock(); try { if (isClosed) { - throw new VeniceException(getCompressionStrategy() + " has been closed"); + throw new VeniceException("Compressor for " + 
getCompressionStrategy() + " has been closed"); } return runnable.run(); } finally { From 8e8ec26418d93cab81b0d9b8c36c37ae20c594cb Mon Sep 17 00:00:00 2001 From: Gaojie Liu Date: Fri, 15 Nov 2024 12:58:57 -0800 Subject: [PATCH 6/6] Skipped locking for NoopCompressor --- .../venice/compression/NoopCompressor.java | 53 ++++++++++++++++--- .../venice/compression/VeniceCompressor.java | 3 ++ 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java index 8e59059696..3fde4e7d00 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java @@ -6,41 +6,64 @@ import java.nio.ByteBuffer; +/** + * Locking is not necessary for {@link NoopCompressor}, so this class overrides all the public APIs to avoid locking. 
+ */ public class NoopCompressor extends VeniceCompressor { public NoopCompressor() { super(CompressionStrategy.NO_OP); } @Override - protected byte[] compressInternal(byte[] data) throws IOException { + public byte[] compress(byte[] data) throws IOException { return data; } @Override - protected ByteBuffer compressInternal(ByteBuffer data, int startPositionOfOutput) throws IOException { + protected byte[] compressInternal(byte[] data) throws IOException { + throw new UnsupportedOperationException("compressInternal"); + } + + @Override + public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IOException { if (startPositionOfOutput != 0) { throw new UnsupportedOperationException("Compression with front padding is not supported for NO_OP."); } return data; } + @Override + protected ByteBuffer compressInternal(ByteBuffer src, int startPositionOfOutput) throws IOException { + throw new UnsupportedOperationException("compressInternal"); + } + @Override public int hashCode() { return super.hashCode(); } @Override - protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException { + public ByteBuffer decompress(ByteBuffer data) throws IOException { return data; } @Override - protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException { + protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException { + throw new UnsupportedOperationException("decompressInternal"); + } + + @Override + public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException { return ByteBuffer.wrap(data, offset, length); } @Override - protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader) + protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException { + throw new UnsupportedOperationException("decompressInternal"); + } + + @Override + public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, 
int offset, int length, int schemaHeader) throws IOException { if (offset < SCHEMA_HEADER_LENGTH) { throw new VeniceException("Start offset does not have enough room for schema header."); @@ -51,15 +74,31 @@ protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int o } @Override - protected InputStream decompressInternal(InputStream inputStream) throws IOException { + protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader) + throws IOException { + throw new UnsupportedOperationException("decompressAndPrependSchemaHeaderInternal"); + } + + @Override + public InputStream decompress(InputStream inputStream) throws IOException { return inputStream; } @Override - protected void closeInternal() throws IOException { + protected InputStream decompressInternal(InputStream inputStream) throws IOException { + throw new UnsupportedOperationException("decompressInternal"); + } + + @Override + public void close() throws IOException { // do nothing } + @Override + protected void closeInternal() throws IOException { + throw new UnsupportedOperationException("closeInternal"); + } + @Override public boolean equals(Object o) { if (o == this) { diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java index 1e2680692f..6b18eb94de 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java @@ -13,6 +13,9 @@ public abstract class VeniceCompressor implements Closeable { protected static final int SCHEMA_HEADER_LENGTH = ByteUtils.SIZE_OF_INT; private final CompressionStrategy compressionStrategy; private boolean isClosed = false; + /** + * To avoid the race condition between 'compress'/'decompress' 
operation and 'close'. + */ private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); protected VeniceCompressor(CompressionStrategy compressionStrategy) {