Skip to content

Commit

Permalink
lzo-addons
Browse files Browse the repository at this point in the history
  • Loading branch information
amoghMTiwari committed Nov 21, 2019
1 parent cd36c75 commit 0bd4451
Show file tree
Hide file tree
Showing 9 changed files with 822 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,6 @@ dependencies {
shadowTest library.java.avro_tests
shadowTest library.java.zstd_jni
testRuntimeOnly library.java.slf4j_jdk14
compile group: 'io.airlift', name: 'aircompressor', version: '0.16'
compile group: 'com.facebook.presto.hadoop', name: 'hadoop-apache2', version: '3.2.0-1'
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public enum CompressionMode implements DecompressingChannelFactory {
/** @see Compression#ZSTD */
ZSTD(Compression.ZSTD),

/** @see Compression#LZO */
LZO(Compression.LZO),

/** @see Compression#LZOP */
LZOP(Compression.LZOP),

/** @see Compression#DEFLATE */
DEFLATE(Compression.DEFLATE);

Expand Down Expand Up @@ -139,6 +145,12 @@ static DecompressingChannelFactory fromCanonical(Compression compression) {
case ZSTD:
return ZSTD;

case LZO:
return LZO;

case LZOP:
return LZOP;

case DEFLATE:
return DEFLATE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.beam.sdk.util.LzoCompressorInputStream;
import org.apache.beam.sdk.util.LzoCompressorOutputStream;
import org.apache.beam.sdk.util.LzopCompressorInputStream;
import org.apache.beam.sdk.util.LzopCompressorOutputStream;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
Expand Down Expand Up @@ -151,6 +155,40 @@ public WritableByteChannel writeCompressed(WritableByteChannel channel) throws I
return Channels.newChannel(new ZstdCompressorOutputStream(Channels.newOutputStream(channel)));
}
},
LZO(".lzo", ".lzo") {
@Override
public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
try {
return Channels.newChannel(new LzoCompressorInputStream(Channels.newInputStream(channel)));
} catch (IOException e) {
// TODO Auto-generated catch block
e = new IOException("try using lzop.");
throw e;
}
}

@Override
public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
return Channels.newChannel(new LzoCompressorOutputStream(Channels.newOutputStream(channel)));
}
},
LZOP(".lzo", ".lzo") {
@Override
public ReadableByteChannel readDecompressed(ReadableByteChannel channel) throws IOException {
try {
return Channels.newChannel(new LzopCompressorInputStream(Channels.newInputStream(channel)));
} catch (IOException e) {
// TODO Auto-generated catch block
e = new IOException("try using lzo.");
throw e;
}
}

@Override
public WritableByteChannel writeCompressed(WritableByteChannel channel) throws IOException {
return Channels.newChannel(new LzopCompressorOutputStream(Channels.newOutputStream(channel)));
}
},

/** Deflate compression. */
DEFLATE(".deflate", ".deflate", ".zlib") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ public enum CompressionType implements WritableByteChannelFactory {
/** @see Compression#ZSTD */
ZSTD(Compression.ZSTD),

/** @see Compression#LZO */
LZO(Compression.LZO),

/** @see Compression#LZOP */
LZOP(Compression.LZOP),

/** @see Compression#DEFLATE */
DEFLATE(Compression.DEFLATE);

Expand Down Expand Up @@ -185,6 +191,12 @@ public static CompressionType fromCanonical(Compression canonical) {
case ZSTD:
return ZSTD;

case LZO:
return LZO;

case LZOP:
return LZOP;

case DEFLATE:
return DEFLATE;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import io.airlift.compress.lzo.LzoCodec;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.utils.CountingInputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.compress.utils.InputStreamStatistics;

public class LzoCompressorInputStream extends CompressorInputStream
implements InputStreamStatistics {

private final CountingInputStream countingStream;
private final InputStream lzoIS;

public LzoCompressorInputStream(final InputStream in) throws IOException {
this.lzoIS = new LzoCodec().createInputStream(countingStream = new CountingInputStream(in));
}

@Override
public int available() throws IOException {
return lzoIS.available();
}

@Override
public void close() throws IOException {
lzoIS.close();
}

@Override
public int read(final byte[] b) throws IOException {
return read(b, 0, b.length);
}

@Override
public long skip(final long n) throws IOException {
return IOUtils.skip(lzoIS, n);
}

@Override
public void mark(final int readlimit) {
lzoIS.mark(readlimit);
}

@Override
public boolean markSupported() {
return lzoIS.markSupported();
}

@Override
public int read() throws IOException {
final int ret = lzoIS.read();
count(ret == -1 ? 0 : 1);
return ret;
}

@Override
public int read(final byte[] buf, final int off, final int len) throws IOException {
if (len == 0) {
return 0;
}
final int ret = lzoIS.read(buf, off, len);
count(ret);
return ret;
}

@Override
public String toString() {
return lzoIS.toString();
}

@Override
public void reset() throws IOException {
lzoIS.reset();
}

@Override
public long getCompressedCount() {
return countingStream.getBytesRead();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import io.airlift.compress.lzo.LzoCodec;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.commons.compress.compressors.CompressorOutputStream;

public class LzoCompressorOutputStream extends CompressorOutputStream {

private final OutputStream lzoOS;

public LzoCompressorOutputStream(final OutputStream outStream) throws IOException {
this.lzoOS = new LzoCodec().createOutputStream(outStream);
}

@Override
public void close() throws IOException {
lzoOS.close();
}

@Override
public void write(final int b) throws IOException {
lzoOS.write(b);
}

@Override
public void write(final byte[] b) throws IOException {
lzoOS.write(b);
}

@Override
public void write(final byte[] buf, final int off, final int len) throws IOException {
lzoOS.write(buf, off, len);
}

@Override
public String toString() {
return lzoOS.toString();
}

@Override
public void flush() throws IOException {
lzoOS.flush();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;

import io.airlift.compress.lzo.LzopCodec;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.utils.CountingInputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.compress.utils.InputStreamStatistics;

public class LzopCompressorInputStream extends CompressorInputStream
implements InputStreamStatistics {

private final CountingInputStream countingStream;
private final InputStream lzoIS;

public LzopCompressorInputStream(final InputStream in) throws IOException {
this.lzoIS = new LzopCodec().createInputStream(countingStream = new CountingInputStream(in));
}

@Override
public int available() throws IOException {
return lzoIS.available();
}

@Override
public void close() throws IOException {
lzoIS.close();
}

@Override
public int read(final byte[] b) throws IOException {
return read(b, 0, b.length);
}

@Override
public long skip(final long n) throws IOException {
return IOUtils.skip(lzoIS, n);
}

@Override
public void mark(final int readlimit) {
lzoIS.mark(readlimit);
}

@Override
public boolean markSupported() {
return lzoIS.markSupported();
}

@Override
public int read() throws IOException {
final int ret = lzoIS.read();
count(ret == -1 ? 0 : 1);
return ret;
}

@Override
public int read(final byte[] buf, final int off, final int len) throws IOException {
if (len == 0) {
return 0;
}
final int ret = lzoIS.read(buf, off, len);
count(ret);
return ret;
}

@Override
public String toString() {
return lzoIS.toString();
}

@Override
public void reset() throws IOException {
lzoIS.reset();
}

@Override
public long getCompressedCount() {
return countingStream.getBytesRead();
}
}
Loading

0 comments on commit 0bd4451

Please sign in to comment.