diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index a7ed6c2e87db..1bc7b7e4263f 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -90,4 +90,6 @@ dependencies { shadowTest library.java.avro_tests shadowTest library.java.zstd_jni testRuntimeOnly library.java.slf4j_jdk14 + compile group: 'io.airlift', name: 'aircompressor', version: '0.16' + compile group: 'com.facebook.presto.hadoop', name: 'hadoop-apache2', version: '3.2.0-1' } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index ae878e7623c0..ddfe26b66e1e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -91,6 +91,12 @@ public enum CompressionMode implements DecompressingChannelFactory { /** @see Compression#ZSTD */ ZSTD(Compression.ZSTD), + /** @see Compression#LZO */ + LZO(Compression.LZO), + + /** @see Compression#LZOP */ + LZOP(Compression.LZOP), + /** @see Compression#DEFLATE */ DEFLATE(Compression.DEFLATE); @@ -139,6 +145,12 @@ static DecompressingChannelFactory fromCanonical(Compression compression) { case ZSTD: return ZSTD; + case LZO: + return LZO; + + case LZOP: + return LZOP; + case DEFLATE: return DEFLATE; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java index e4f3624e3655..e3b72e330c34 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java @@ -27,6 +27,10 @@ import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; +import org.apache.beam.sdk.util.LzoCompressorInputStream; +import org.apache.beam.sdk.util.LzoCompressorOutputStream; +import org.apache.beam.sdk.util.LzopCompressorInputStream; +import org.apache.beam.sdk.util.LzopCompressorOutputStream; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints; @@ -151,6 +155,40 @@ public WritableByteChannel writeCompressed(WritableByteChannel channel) throws I return Channels.newChannel(new ZstdCompressorOutputStream(Channels.newOutputStream(channel))); } }, + LZO(".lzo", ".lzo") { + @Override + public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException { + try { + return Channels.newChannel(new LzoCompressorInputStream(Channels.newInputStream(channel))); + } catch (IOException e) { + // TODO Auto-generated catch block + e = new IOException("try using lzop."); + throw e; + } + } + + @Override + public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException { + return Channels.newChannel(new LzoCompressorOutputStream(Channels.newOutputStream(channel))); + } + }, + LZOP(".lzo", ".lzo") { + @Override + public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException { + try { + return Channels.newChannel(new LzopCompressorInputStream(Channels.newInputStream(channel))); + } catch (IOException e) { + // TODO Auto-generated catch block + e = new IOException("try using lzo."); + throw e; + } + } + + @Override + public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException { + return Channels.newChannel(new LzopCompressorOutputStream(Channels.newOutputStream(channel))); + } + }, /** Deflate compression. */ DEFLATE(".deflate", ".deflate", ".zlib") { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 21a2cf4fe78a..dce9b30088bb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -140,6 +140,12 @@ public enum CompressionType implements WritableByteChannelFactory { /** @see Compression#ZSTD */ ZSTD(Compression.ZSTD), + /** @see Compression#LZO */ + LZO(Compression.LZO), + + /** @see Compression#LZOP */ + LZOP(Compression.LZOP), + /** @see Compression#DEFLATE */ DEFLATE(Compression.DEFLATE); @@ -185,6 +191,12 @@ public static CompressionType fromCanonical(Compression canonical) { case ZSTD: return ZSTD; + case LZO: + return LZO; + + case LZOP: + return LZOP; + case DEFLATE: return DEFLATE; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzoCompressorInputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzoCompressorInputStream.java new file mode 100644 index 000000000000..2698722cc937 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzoCompressorInputStream.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import io.airlift.compress.lzo.LzoCodec; +import java.io.IOException; +import java.io.InputStream; +import org.apache.commons.compress.compressors.CompressorInputStream; +import org.apache.commons.compress.utils.CountingInputStream; +import org.apache.commons.compress.utils.IOUtils; +import org.apache.commons.compress.utils.InputStreamStatistics; + +public class LzoCompressorInputStream extends CompressorInputStream + implements InputStreamStatistics { + + private final CountingInputStream countingStream; + private final InputStream lzoIS; + + public LzoCompressorInputStream(final InputStream in) throws IOException { + this.lzoIS = new LzoCodec().createInputStream(countingStream = new CountingInputStream(in)); + } + + @Override + public int available() throws IOException { + return lzoIS.available(); + } + + @Override + public void close() throws IOException { + lzoIS.close(); + } + + @Override + public int read(final byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public long skip(final long n) throws IOException { + return IOUtils.skip(lzoIS, n); + } + + @Override + public void mark(final int readlimit) { + lzoIS.mark(readlimit); + } + + @Override + public boolean markSupported() { + return lzoIS.markSupported(); + } + + @Override + public int read() throws IOException { + final int ret = lzoIS.read(); + count(ret == -1 ? 0 : 1); + return ret; + } + + @Override + public int read(final byte[] buf, final int off, final int len) throws IOException { + if (len == 0) { + return 0; + } + final int ret = lzoIS.read(buf, off, len); + count(ret); + return ret; + } + + @Override + public String toString() { + return lzoIS.toString(); + } + + @Override + public void reset() throws IOException { + lzoIS.reset(); + } + + @Override + public long getCompressedCount() { + return countingStream.getBytesRead(); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzoCompressorOutputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzoCompressorOutputStream.java new file mode 100644 index 000000000000..e82b718a79c2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzoCompressorOutputStream.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import io.airlift.compress.lzo.LzoCodec; +import java.io.IOException; +import java.io.OutputStream; +import org.apache.commons.compress.compressors.CompressorOutputStream; + +public class LzoCompressorOutputStream extends CompressorOutputStream { + + private final OutputStream lzoOS; + + public LzoCompressorOutputStream(final OutputStream outStream) throws IOException { + this.lzoOS = new LzoCodec().createOutputStream(outStream); + } + + @Override + public void close() throws IOException { + lzoOS.close(); + } + + @Override + public void write(final int b) throws IOException { + lzoOS.write(b); + } + + @Override + public void write(final byte[] b) throws IOException { + lzoOS.write(b); + } + + @Override + public void write(final byte[] buf, final int off, final int len) throws IOException { + lzoOS.write(buf, off, len); + } + + @Override + public String toString() { + return lzoOS.toString(); + } + + @Override + public void flush() throws IOException { + lzoOS.flush(); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzopCompressorInputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzopCompressorInputStream.java new file mode 100644 index 000000000000..9c38f0a631a8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzopCompressorInputStream.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import io.airlift.compress.lzo.LzopCodec; +import java.io.IOException; +import java.io.InputStream; +import org.apache.commons.compress.compressors.CompressorInputStream; +import org.apache.commons.compress.utils.CountingInputStream; +import org.apache.commons.compress.utils.IOUtils; +import org.apache.commons.compress.utils.InputStreamStatistics; + +public class LzopCompressorInputStream extends CompressorInputStream + implements InputStreamStatistics { + + private final CountingInputStream countingStream; + private final InputStream lzoIS; + + public LzopCompressorInputStream(final InputStream in) throws IOException { + this.lzoIS = new LzopCodec().createInputStream(countingStream = new CountingInputStream(in)); + } + + @Override + public int available() throws IOException { + return lzoIS.available(); + } + + @Override + public void close() throws IOException { + lzoIS.close(); + } + + @Override + public int read(final byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public long skip(final long n) throws IOException { + return IOUtils.skip(lzoIS, n); + } + + @Override + public void mark(final int readlimit) { + lzoIS.mark(readlimit); + } + + @Override + public boolean markSupported() { + return lzoIS.markSupported(); + } + + @Override + public int read() throws IOException { + final int ret = lzoIS.read(); + count(ret == -1 ? 0 : 1); + return ret; + } + + @Override + public int read(final byte[] buf, final int off, final int len) throws IOException { + if (len == 0) { + return 0; + } + final int ret = lzoIS.read(buf, off, len); + count(ret); + return ret; + } + + @Override + public String toString() { + return lzoIS.toString(); + } + + @Override + public void reset() throws IOException { + lzoIS.reset(); + } + + @Override + public long getCompressedCount() { + return countingStream.getBytesRead(); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzopCompressorOutputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzopCompressorOutputStream.java new file mode 100644 index 000000000000..565272c63db2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/LzopCompressorOutputStream.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import io.airlift.compress.lzo.LzopCodec; +import java.io.IOException; +import java.io.OutputStream; +import org.apache.commons.compress.compressors.CompressorOutputStream; + +public class LzopCompressorOutputStream extends CompressorOutputStream { + + private final OutputStream lzoOS; + + public LzopCompressorOutputStream(final OutputStream outStream) throws IOException { + this.lzoOS = new LzopCodec().createOutputStream(outStream); + } + + @Override + public void close() throws IOException { + lzoOS.close(); + } + + @Override + public void write(final int b) throws IOException { + lzoOS.write(b); + } + + @Override + public void write(final byte[] b) throws IOException { + lzoOS.write(b); + } + + @Override + public void write(final byte[] buf, final int off, final int len) throws IOException { + lzoOS.write(buf, off, len); + } + + @Override + public String toString() { + return lzoOS.toString(); + } + + @Override + public void flush() throws IOException { + lzoOS.flush(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index 7afde418b541..a83d77db50e8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -28,6 +28,8 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import io.airlift.compress.lzo.LzoCodec; +import io.airlift.compress.lzo.LzopCodec; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -58,6 +60,8 @@ import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.LzoCompressorOutputStream; +import org.apache.beam.sdk.util.LzopCompressorOutputStream; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultiset; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; @@ -68,6 +72,7 @@ import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -91,6 +96,20 @@ public void testReadGzip() throws Exception { runReadTest(input, CompressionMode.GZIP); } + /** Test reading nonempty input with lzo. */ + @Test + public void testReadLzo() throws Exception { + byte[] input = generateInput(5000); + runReadTest(input, CompressionMode.LZO); + } + + /** Test reading nonempty input with lzop. */ + @Test + public void testReadLzop() throws Exception { + byte[] input = generateInput(5000); + runReadTest(input, CompressionMode.LZOP); + } + /** Test splittability of files in AUTO mode. */ @Test public void testAutoSplittable() throws Exception { @@ -122,6 +141,12 @@ public void testAutoSplittable() throws Exception { source = CompressedSource.from(new ByteSource("input.zstd", 1)); assertFalse(source.isSplittable()); + // LZO files are not splittable + source = CompressedSource.from(new ByteSource("input.lzo", 1)); + assertFalse(source.isSplittable()); + source = CompressedSource.from(new ByteSource("input.LZO", 1)); + assertFalse(source.isSplittable()); + // DEFLATE files are not splittable source = CompressedSource.from(new ByteSource("input.deflate", 1)); assertFalse(source.isSplittable()); @@ -144,6 +169,7 @@ public void testGzipSplittable() throws Exception { source = CompressedSource.from(new ByteSource("input.gz", 1)) .withDecompression(CompressionMode.GZIP); + assertFalse(source.isSplittable()); source = CompressedSource.from(new ByteSource("input.GZ", 1)) @@ -161,6 +187,28 @@ public void testGzipSplittable() throws Exception { assertFalse(source.isSplittable()); } + /** Test splittability of files in LZO mode -- none should be splittable. */ + @Test + public void testLzoSplittable() throws Exception { + CompressedSource source; + + // LZO files are not splittable + source = + CompressedSource.from(new ByteSource("input.lzo", 1)) + .withDecompression(CompressionMode.LZO); + assertFalse(source.isSplittable()); + + // Other extensions are also not splittable. + source = + CompressedSource.from(new ByteSource("input.txt", 1)) + .withDecompression(CompressionMode.LZO); + assertFalse(source.isSplittable()); + source = + CompressedSource.from(new ByteSource("input.csv", 1)) + .withDecompression(CompressionMode.LZO); + assertFalse(source.isSplittable()); + } + /** Test reading nonempty input with bzip2. */ @Test public void testReadBzip2() throws Exception { @@ -196,6 +244,20 @@ public void testEmptyReadZstd() throws Exception { runReadTest(input, CompressionMode.ZSTD); } + /** Test reading empty input with lzo. */ + @Test + public void testEmptyReadLzo() throws Exception { + byte[] input = generateInput(0); + runReadTest(input, CompressionMode.LZO); + } + + /** Test reading empty input with lzop. */ + @Test + public void testEmptyReadLzop() throws Exception { + byte[] input = generateInput(0); + runReadTest(input, CompressionMode.LZOP); + } + private static byte[] compressGzip(byte[] input) throws IOException { ByteArrayOutputStream res = new ByteArrayOutputStream(); try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) { @@ -204,6 +266,22 @@ private static byte[] compressGzip(byte[] input) throws IOException { return res.toByteArray(); } + private static byte[] compressLzo(byte[] input) throws IOException { + ByteArrayOutputStream res = new ByteArrayOutputStream(); + try (CompressionOutputStream lzoStream = new LzoCodec().createOutputStream(res)) { + lzoStream.write(input); + } + return res.toByteArray(); + } + + private static byte[] compressLzop(byte[] input) throws IOException { + ByteArrayOutputStream res = new ByteArrayOutputStream(); + try (CompressionOutputStream lzopStream = new LzopCodec().createOutputStream(res)) { + lzopStream.write(input); + } + return res.toByteArray(); + } + private static byte[] concat(byte[] first, byte[] second) { byte[] res = new byte[first.length + second.length]; System.arraycopy(first, 0, res, 0, first.length); @@ -235,6 +313,58 @@ public void testReadConcatenatedGzip() throws IOException { assertEquals(Bytes.asList(expected), actual); } + /** + * Using Lzo Codec Test a concatenation of lzo files is correctly decompressed. + * + *

A concatenation of lzo files as one file is a valid lzo file and should decompress to be the + * concatenation of those individual files. + */ + @Test + public void testReadConcatenatedLzo() throws IOException { + byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8); + byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8); + byte[] expected = concat(header, body); + byte[] totalLzo = concat(compressLzo(header), compressLzo(body)); + File tmpFile = tmpFolder.newFile(); + try (FileOutputStream os = new FileOutputStream(tmpFile)) { + os.write(totalLzo); + } + + CompressedSource source = + CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1)) + .withDecompression(CompressionMode.LZO); + List actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); + assertEquals(Bytes.asList(expected), actual); + } + + /** + * Using Lzop Codec. Concatenation of lzo files using Lzop codec.The current behavior is it only + * reads the contents of the first file. + * + *

A concatenation of lzo files as one file is a valid lzo file and should decompress to be the + * concatenation of those individual files. + */ + @Test + public void testFailReadConcatenatedLzop() throws IOException { + byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8); + byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8); + byte[] expected = concat(header, body); + byte[] totalLzop = concat(compressLzop(header), compressLzop(body)); + File tmpFile = tmpFolder.newFile(); + try (FileOutputStream os = new FileOutputStream(tmpFile)) { + os.write(totalLzop); + } + + CompressedSource source = + CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1)) + .withDecompression(CompressionMode.LZOP); + List actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); + // The current behavior is Lzop Codec only reads the contents of the first file. + // Expected and Actual value will not match + thrown.expectMessage("expected:"); + assertEquals(Bytes.asList(expected), actual); + } + /** * Test a bzip2 file containing multiple streams is correctly decompressed. * @@ -267,6 +397,69 @@ public void testReadMultiStreamBzip2() throws IOException { verifyReadContents(output, tmpFile, mode); } + /** + * Test a lzo file containing multiple streams is correctly decompressed. + * + *

A lzo file may contain multiple streams and should decompress as the concatenation of those + * streams. + */ + @Test + public void testReadMultiStreamLzo() throws IOException { + CompressionMode mode = CompressionMode.LZO; + byte[] input1 = generateInput(5, 587973); + byte[] input2 = generateInput(5, 387374); + + ByteArrayOutputStream stream1 = new ByteArrayOutputStream(); + try (OutputStream os = getOutputStreamForMode(mode, stream1)) { + os.write(input1); + } + + ByteArrayOutputStream stream2 = new ByteArrayOutputStream(); + try (OutputStream os = getOutputStreamForMode(mode, stream2)) { + os.write(input2); + } + + File tmpFile = tmpFolder.newFile(); + try (OutputStream os = new FileOutputStream(tmpFile)) { + os.write(stream1.toByteArray()); + os.write(stream2.toByteArray()); + } + + byte[] output = Bytes.concat(input1, input2); + verifyReadContents(output, tmpFile, mode); + } + + /** + * Test a lzop file containing multiple streams.The current behavior is it only reads the contents + * of the first file. + */ + @Test + public void testReadMultiStreamLzop() throws IOException { + CompressionMode mode = CompressionMode.LZOP; + byte[] input1 = generateInput(5, 587973); + byte[] input2 = generateInput(5, 387374); + + ByteArrayOutputStream stream1 = new ByteArrayOutputStream(); + try (OutputStream os = getOutputStreamForMode(mode, stream1)) { + os.write(input1); + } + + ByteArrayOutputStream stream2 = new ByteArrayOutputStream(); + try (OutputStream os = getOutputStreamForMode(mode, stream2)) { + os.write(input2); + } + + File tmpFile = tmpFolder.newFile(); + try (OutputStream os = new FileOutputStream(tmpFile)) { + os.write(stream1.toByteArray()); + os.write(stream2.toByteArray()); + } + + byte[] output = Bytes.concat(input1, input2); + thrown.expectMessage("expected"); + verifyReadContents(output, tmpFile, mode); + } + /** Test reading empty input with bzip2. */ @Test public void testCompressedReadBzip2() throws Exception { @@ -308,6 +501,26 @@ public void testCompressedAccordingToFilepatternZstd() throws Exception { verifyReadContents(input, tmpFile, null /* default auto decompression factory */); } + /** Test reading according to filepattern when the file is lzo compressed using LZO Codec. */ + @Test + public void testCompressedAccordingToFilepatternLzo() throws Exception { + byte[] input = generateInput(100); + File tmpFile = tmpFolder.newFile("test.lzo"); + writeFile(tmpFile, input, CompressionMode.LZO); + verifyReadContents(input, tmpFile, null /* default auto decompression factory */); + } + + /** Test reading according to filepattern when the file is lzo compressed using LZOP Codec. */ + @Test + public void testFailCompressedAccordingToFilepatternLzop() throws Exception { + byte[] input = generateInput(100); + File tmpFile = tmpFolder.newFile("test.lzo"); + writeFile(tmpFile, input, CompressionMode.LZOP); + // By default LZO Codec is called + thrown.expectMessage("encountered EOF while reading block data"); + verifyReadContents(input, tmpFile, null /* default auto decompression factory */); + } + /** Test reading multiple files with different compression. */ @Test public void testHeterogeneousCompression() throws Exception { @@ -338,6 +551,11 @@ public void testHeterogeneousCompression() throws Exception { writeFile(zstdFile, generated, CompressionMode.ZSTD); expected.addAll(Bytes.asList(generated)); + File lzoFile = tmpFolder.newFile(baseName + ".lzo"); + generated = generateInput(1000, 4); + writeFile(lzoFile, generated, CompressionMode.LZO); + expected.addAll(Bytes.asList(generated)); + String filePattern = new File(tmpFolder.getRoot().toString(), baseName + ".*").toString(); CompressedSource source = CompressedSource.from(new ByteSource(filePattern, 1)); @@ -408,6 +626,30 @@ public void testZstdFileIsNotSplittable() throws Exception { assertFalse(source.isSplittable()); } + @Test + public void testLzoFileIsNotSplittable() throws Exception { + String baseName = "test-input"; + + File compressedFile = tmpFolder.newFile(baseName + ".lzo"); + writeFile(compressedFile, generateInput(10), CompressionMode.LZO); + + CompressedSource source = + CompressedSource.from(new ByteSource(compressedFile.getPath(), 1)); + assertFalse(source.isSplittable()); + } + + @Test + public void testLzopFileIsNotSplittable() throws Exception { + String baseName = "test-input"; + + File compressedFile = tmpFolder.newFile(baseName + ".lzo"); + writeFile(compressedFile, generateInput(10), CompressionMode.LZOP); + + CompressedSource source = + CompressedSource.from(new ByteSource(compressedFile.getPath(), 1)); + assertFalse(source.isSplittable()); + } + /** * Test reading an uncompressed file with {@link CompressionMode#GZIP}, since we must support this * due to properties of services that we read from. @@ -442,6 +684,26 @@ public void testFalseZstdStream() throws Exception { verifyReadContents(input, tmpFile, CompressionMode.ZSTD); } + /** Test reading an uncompressed file with {@link Compression#LZO}, and show that we fail. */ + @Test + public void testFalseLzoStream() throws Exception { + byte[] input = generateInput(1000); + File tmpFile = tmpFolder.newFile("test.lzo"); + Files.write(input, tmpFile); + thrown.expectMessage("expected:"); + verifyReadContents(input, tmpFile, CompressionMode.LZO); + } + + /** Test reading an uncompressed file with {@link Compression#LZOP}, and show that we fail. */ + @Test + public void testFalseLzopStream() throws Exception { + byte[] input = generateInput(1000); + File tmpFile = tmpFolder.newFile("test.lzo"); + Files.write(input, tmpFile); + thrown.expectMessage("try using lzo."); + verifyReadContents(input, tmpFile, CompressionMode.LZOP); + } + /** * Test reading an empty input file with gzip; it must be interpreted as uncompressed because the * gzip header is two bytes. @@ -488,6 +750,50 @@ public void testCompressedReadMultipleFiles() throws Exception { assertEquals(HashMultiset.create(expected), HashMultiset.create(actual)); } + /** Test reading multiple files. LZO Codec */ + @Test + public void testCompressedReadMultipleLzoFiles() throws Exception { + int numFiles = 3; + String baseName = "test_input-"; + String filePattern = new File(tmpFolder.getRoot().toString(), baseName + "*").toString(); + List expected = new ArrayList<>(); + + for (int i = 0; i < numFiles; i++) { + byte[] generated = generateInput(100); + File tmpFile = tmpFolder.newFile(baseName + i); + writeFile(tmpFile, generated, CompressionMode.LZO); + expected.addAll(Bytes.asList(generated)); + } + + CompressedSource source = + CompressedSource.from(new ByteSource(filePattern, 1)) + .withDecompression(CompressionMode.LZO); + List actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); + assertEquals(HashMultiset.create(expected), HashMultiset.create(actual)); + } + + /** Test reading multiple files. LZOP Codec */ + @Test + public void testCompressedReadMultipleLzopFiles() throws Exception { + int numFiles = 3; + String baseName = "test_input-"; + String filePattern = new File(tmpFolder.getRoot().toString(), baseName + "*").toString(); + List expected = new ArrayList<>(); + + for (int i = 0; i < numFiles; i++) { + byte[] generated = generateInput(100); + File tmpFile = tmpFolder.newFile(baseName + i); + writeFile(tmpFile, generated, CompressionMode.LZOP); + expected.addAll(Bytes.asList(generated)); + } + + CompressedSource source = + CompressedSource.from(new ByteSource(filePattern, 1)) + .withDecompression(CompressionMode.LZOP); + List actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); + assertEquals(HashMultiset.create(expected), HashMultiset.create(actual)); + } + @Test public void testDisplayData() { ByteSource inputSource = @@ -539,6 +845,10 @@ private OutputStream getOutputStreamForMode(CompressionMode mode, OutputStream s return new ZstdCompressorOutputStream(stream); case DEFLATE: return new DeflateCompressorOutputStream(stream); + case LZO: + return new LzoCompressorOutputStream(stream); + case LZOP: + return new LzopCompressorOutputStream(stream); default: throw new RuntimeException("Unexpected compression mode"); } @@ -761,6 +1071,132 @@ public void testGzipProgress() throws IOException { } } + @Test + public void testEmptyLzoProgress() throws IOException { + File tmpFile = tmpFolder.newFile("empty.lzo"); + String filename = tmpFile.toPath().toString(); + writeFile(tmpFile, new byte[0], CompressionMode.LZO); + + PipelineOptions options = PipelineOptionsFactory.create(); + CompressedSource source = + CompressedSource.from(new ByteSource(filename, 1)).withDecompression(CompressionMode.LZO); + try (BoundedReader readerOrig = source.createReader(options)) { + assertThat(readerOrig, instanceOf(CompressedReader.class)); + CompressedReader reader = (CompressedReader) readerOrig; + // before starting + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + // confirm empty + assertFalse(reader.start()); + // after reading empty source + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + } + } + + @Test + public void testLzoProgress() throws IOException { + int numRecords = 3; + File tmpFile = tmpFolder.newFile("nonempty.lzo"); + String filename = tmpFile.toPath().toString(); + writeFile(tmpFile, new byte[numRecords], CompressionMode.LZO); + + PipelineOptions options = PipelineOptionsFactory.create(); + CompressedSource source = + CompressedSource.from(new ByteSource(filename, 1)).withDecompression(CompressionMode.LZO); + try (BoundedReader readerOrig = source.createReader(options)) { + assertThat(readerOrig, instanceOf(CompressedReader.class)); + CompressedReader reader = (CompressedReader) readerOrig; + // before starting + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + + // confirm has three records + for (int i = 0; i < numRecords; ++i) { + if (i == 0) { + assertTrue(reader.start()); + } else { + assertTrue(reader.advance()); + } + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + } + assertFalse(reader.advance()); + + // after reading empty source + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + assertEquals(1, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + } + } + + @Test + public void testEmptyLzopProgress() throws IOException { + File tmpFile = tmpFolder.newFile("empty.lzo"); + String filename = tmpFile.toPath().toString(); + writeFile(tmpFile, new byte[0], CompressionMode.LZOP); + + PipelineOptions options = PipelineOptionsFactory.create(); + CompressedSource source = + CompressedSource.from(new ByteSource(filename, 1)).withDecompression(CompressionMode.LZOP); + try (BoundedReader readerOrig = source.createReader(options)) { + assertThat(readerOrig, instanceOf(CompressedReader.class)); + CompressedReader reader = (CompressedReader) readerOrig; + // before starting + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + + // confirm empty + assertFalse(reader.start()); + + // after reading empty source + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + } + } + + @Test + public void testLzopProgress() throws IOException { + int numRecords = 3; + File tmpFile = tmpFolder.newFile("nonempty.lzo"); + String filename = tmpFile.toPath().toString(); + writeFile(tmpFile, new byte[numRecords], CompressionMode.LZOP); + + PipelineOptions options = PipelineOptionsFactory.create(); + CompressedSource source = + CompressedSource.from(new ByteSource(filename, 1)).withDecompression(CompressionMode.LZOP); + try (BoundedReader readerOrig = source.createReader(options)) { + assertThat(readerOrig, instanceOf(CompressedReader.class)); + CompressedReader reader = (CompressedReader) readerOrig; + // before starting + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + + // confirm has three records + for (int i = 0; i < numRecords; ++i) { + if (i == 0) { + assertTrue(reader.start()); + } else { + assertTrue(reader.advance()); + } + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + } + assertFalse(reader.advance()); + + // after reading empty source + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + assertEquals(1, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + } + } + @Test public void testUnsplittable() throws IOException { String baseName = "test-input";