Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[client-common] Added safeguard for compressor #1307

Merged
merged 6 commits
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public GzipCompressor() {
}

@Override
public byte[] compress(byte[] data) throws IOException {
protected byte[] compressInternal(byte[] data) throws IOException {
ReusableGzipOutputStream out = gzipPool.getReusableGzipOutputStream();
try {
out.writeHeader();
Expand All @@ -37,7 +37,7 @@ public byte[] compress(byte[] data) throws IOException {
}

@Override
public void close() throws IOException {
protected void closeInternal() throws IOException {
try {
gzipPool.close();
} catch (Exception e) {
Expand All @@ -47,7 +47,7 @@ public void close() throws IOException {
}

@Override
public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IOException {
protected ByteBuffer compressInternal(ByteBuffer data, int startPositionOfOutput) throws IOException {
/**
* N.B.: We initialize the size of buffer in this output stream at the size of the deflated payload, which is not
* ideal, but not necessarily bad either. The assumption is that GZIP usually doesn't compress our payloads that
Expand All @@ -74,7 +74,7 @@ public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IO
}

@Override
public ByteBuffer decompress(ByteBuffer data) throws IOException {
protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException {
if (data.hasRemaining()) {
if (data.hasArray()) {
return decompress(data.array(), data.position(), data.remaining());
Expand All @@ -89,14 +89,14 @@ public ByteBuffer decompress(ByteBuffer data) throws IOException {
}

@Override
public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException {
protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException {
try (InputStream gis = decompress(new ByteArrayInputStream(data, offset, length))) {
return ByteBuffer.wrap(IOUtils.toByteArray(gis));
}
}

@Override
public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader)
protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader)
throws IOException {
byte[] decompressedByteArray;
try (InputStream gis = decompress(new ByteArrayInputStream(data, offset, length))) {
Expand All @@ -111,7 +111,7 @@ public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int
}

@Override
public InputStream decompress(InputStream inputStream) throws IOException {
protected InputStream decompressInternal(InputStream inputStream) throws IOException {
return new GZIPInputStream(inputStream);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ public NoopCompressor() {
}

@Override
public byte[] compress(byte[] data) throws IOException {
protected byte[] compressInternal(byte[] data) throws IOException {
return data;
}

@Override
public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IOException {
protected ByteBuffer compressInternal(ByteBuffer data, int startPositionOfOutput) throws IOException {
if (startPositionOfOutput != 0) {
throw new UnsupportedOperationException("Compression with front padding is not supported for NO_OP.");
}
Expand All @@ -30,17 +30,17 @@ public int hashCode() {
}

@Override
public ByteBuffer decompress(ByteBuffer data) throws IOException {
protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException {
return data;
}

@Override
public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException {
protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException {
return ByteBuffer.wrap(data, offset, length);
}

@Override
public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader)
protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader)
throws IOException {
if (offset < SCHEMA_HEADER_LENGTH) {
throw new VeniceException("Start offset does not have enough room for schema header.");
Expand All @@ -51,10 +51,15 @@ public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int
}

@Override
public InputStream decompress(InputStream inputStream) throws IOException {
protected InputStream decompressInternal(InputStream inputStream) throws IOException {
return inputStream;
}

@Override
protected void closeInternal() throws IOException {
  // NoopCompressor holds no pooled or native resources, so closing is a no-op.
}

@Override
public boolean equals(Object o) {
if (o == this) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,43 +1,100 @@
package com.linkedin.venice.compression;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.utils.ByteUtils;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantReadWriteLock;


public abstract class VeniceCompressor implements Closeable {
protected static final int SCHEMA_HEADER_LENGTH = ByteUtils.SIZE_OF_INT;
private final CompressionStrategy compressionStrategy;
private boolean isClosed = false;
gaojieliu marked this conversation as resolved.
Show resolved Hide resolved
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
gaojieliu marked this conversation as resolved.
Show resolved Hide resolved

protected VeniceCompressor(CompressionStrategy compressionStrategy) {
this.compressionStrategy = compressionStrategy;
}

public abstract byte[] compress(byte[] data) throws IOException;
interface CompressionRunnable<R> {
R run() throws IOException;
}

private <R> R executeWithSafeGuard(CompressionRunnable<R> runnable) throws IOException {
readWriteLock.readLock().lock();
try {
if (isClosed) {
throw new VeniceException(getCompressionStrategy() + " has been closed");
gaojieliu marked this conversation as resolved.
Show resolved Hide resolved
}
return runnable.run();
} finally {
readWriteLock.readLock().unlock();
}
}

public byte[] compress(byte[] data) throws IOException {
return executeWithSafeGuard(() -> compressInternal(data));
}

public abstract ByteBuffer compress(ByteBuffer src, int startPositionOfOutput) throws IOException;
protected abstract byte[] compressInternal(byte[] data) throws IOException;

public abstract ByteBuffer decompress(ByteBuffer data) throws IOException;
public ByteBuffer compress(ByteBuffer src, int startPositionOfOutput) throws IOException {
return executeWithSafeGuard(() -> compressInternal(src, startPositionOfOutput));
}

protected abstract ByteBuffer compressInternal(ByteBuffer src, int startPositionOfOutput) throws IOException;

public ByteBuffer decompress(ByteBuffer data) throws IOException {
return executeWithSafeGuard(() -> decompressInternal(data));
}

public abstract ByteBuffer decompress(byte[] data, int offset, int length) throws IOException;
protected abstract ByteBuffer decompressInternal(ByteBuffer data) throws IOException;

public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException {
return executeWithSafeGuard(() -> decompressInternal(data, offset, length));
}

protected abstract ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException;

/**
* This method tries to decompress data and maybe prepend the schema header.
* The returned ByteBuffer will be backed by byte array that starts with schema header, followed by the
* decompressed data. The ByteBuffer will be positioned at the beginning of the decompressed data and the remaining of
* the ByteBuffer will be the length of the decompressed data.
*/
public abstract ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader)
throws IOException;
public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader)
throws IOException {
return executeWithSafeGuard(() -> decompressAndPrependSchemaHeaderInternal(data, offset, length, schemaHeader));
}

protected abstract ByteBuffer decompressAndPrependSchemaHeaderInternal(
byte[] data,
int offset,
int length,
int schemaHeader) throws IOException;

public CompressionStrategy getCompressionStrategy() {
return compressionStrategy;
}

public abstract InputStream decompress(InputStream inputStream) throws IOException;
public InputStream decompress(InputStream inputStream) throws IOException {
return executeWithSafeGuard(() -> decompressInternal(inputStream));
}

protected abstract InputStream decompressInternal(InputStream inputStream) throws IOException;

public void close() throws IOException {
readWriteLock.writeLock().lock();
try {
isClosed = true;
closeInternal();
} finally {
readWriteLock.writeLock().unlock();
}
}

protected abstract void closeInternal() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ public ZstdWithDictCompressor(final byte[] dictionary, int level) {
}

@Override
public byte[] compress(byte[] data) {
protected byte[] compressInternal(byte[] data) {
return compressor.get().compress(data);
}

@Override
public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IOException {
protected ByteBuffer compressInternal(ByteBuffer data, int startPositionOfOutput) throws IOException {
long maxDstSize = Zstd.compressBound(data.remaining());
if (maxDstSize + startPositionOfOutput > Integer.MAX_VALUE) {
throw new ZstdException(Zstd.errGeneric(), "Max output size is greater than Integer.MAX_VALUE");
Expand Down Expand Up @@ -87,7 +87,7 @@ public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IO
}

@Override
public ByteBuffer decompress(ByteBuffer data) throws IOException {
protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException {
if (data.hasRemaining()) {
if (data.hasArray()) {
return decompress(data.array(), data.position(), data.remaining());
Expand All @@ -107,7 +107,7 @@ public ByteBuffer decompress(ByteBuffer data) throws IOException {
}

@Override
public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException {
protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException {
int expectedSize = validateExpectedDecompressedSize(Zstd.decompressedSize(data, offset, length));
ByteBuffer returnedData = ByteBuffer.allocate(expectedSize);
int actualSize = decompressor.get()
Expand All @@ -124,7 +124,7 @@ public ByteBuffer decompress(byte[] data, int offset, int length) throws IOExcep
}

@Override
public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader)
protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader)
throws IOException {
int expectedDecompressedDataSize = validateExpectedDecompressedSize(Zstd.decompressedSize(data, offset, length));

Expand All @@ -138,12 +138,12 @@ public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int
}

@Override
public InputStream decompress(InputStream inputStream) throws IOException {
protected InputStream decompressInternal(InputStream inputStream) throws IOException {
return new ZstdInputStream(inputStream).setDict(this.dictDecompress);
}

@Override
public void close() throws IOException {
protected void closeInternal() throws IOException {
this.compressor.close();
this.decompressor.close();
IOUtils.closeQuietly(this.dictCompress);
Expand Down