diff --git a/.gitignore b/.gitignore index 90f7cf1..bf380f2 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ columnar_format_qat_wrapper/target/ javah/ jni-header/ target/ +commons/ diff --git a/es_qat_wrapper/7.6.1/README.md b/es_qat_wrapper/7.6.1/README.md new file mode 100644 index 0000000..f34ceb1 --- /dev/null +++ b/es_qat_wrapper/7.6.1/README.md @@ -0,0 +1,134 @@ +#How to deploy the Elasticsearch with QAT + +##I. Install the QATzip and QAT Drivers + +1. Update + + $ sudo yum update + +2. Create new file intel-qat.repo in /etc/yum.repos.d, the content is as follows: + + [intel-qat] + name=Intel QAT + baseurl=https://download.01.org/QAT/repo + gpgcheck=0 +3. Install + + $ sudo yum install QAT + $ sudo yum install QATzip + +4. Configure huge page related setting + + $ sudo cat /etc/sysctl.conf + ensure the line vm.nr_hugepages=512 exists in the file + + $ sudo sysctl -p + $ sudo cat /proc/meminfo | grep -i hugepages_total + HugePages_Total: 512 + +5. Config QAT Codec + + $ chmod 777 $ICP_ROOT/build + $ chmod 777 $ICP_ROOT/build/libusdm_drv_s.so + $ chmod 777 /dev/qat_adf_ctl + $ chmod 777 /dev/qat_dev_processes + $ chmod 777 /dev/uio* + $ chmod 777 /dev/usdm_drv + $ chmod -R 777 /dev/hugepages + + The $ICP_ROOT/ is in /opt/intel/QAT by default, and the $DZ_ROOT is in +/opt/intel/QATzip by default +6. Start the qat service + + $ sudo service qat_service start +7. Run the following command to check if the QATzip is set up correctly for compressing or decompressing files. + + $ qzip -k $your_input_file +##II. Copy the .so files to the /lib64 + $ sudo cp libQatCodecEs.so /lib64 + $ sudo cp libqatcodec.so /lib64 + +##III. Unzip the Elasticsearch binary file + $ tar -zcvf elasticsearch-7.6.1-SNAPSHOT-linux-x86_64.tar.gz +##IV. Start the Elaticsearch service +1. Config the elasticsearch settings + + $ vim config/elasticsearch.yml +2. Start the Elasticsearch service + + $ bin/elasticsearch + +#How to build +## I. Set the environments +1. Install JDK + +we need jdk13 for Elasticsearch 7.6.1 and jdk8 for IntelQatCodec + + 1. $ sudo yum install -y curl + 2. $ curl -O https://download.java.net/java/GA/jdk13/5b8a42f3905b406298b72d750b6919f6/33/GPL/openjdk-13_linux-x64_bin.tar.gz + 3. $ tar xvf openjdk-13_linux-x64_bin.tar.gz + 4. $ sudo mv jdk-13 /opt/ + 5. Configure the java environment + export JAVA_HOME=/root/jdk-13 + export PATH=$JAVA_HOME/bin:$PATH + 6. Confirm the java version + $ java -version + +2.Install Gradle + + 1. Download Gradle + $ wget https://services.gradle.org/distributions/gradle-5.2.1-bin.zip -P /tmp + 2. Install Gradle + $ sudo unzip -d /opt/gradle /tmp/gradle-*.zip + 3. Setup environment variables + export GRADLE_HOME=/opt/gradle/gradle-5.2.1 + export PATH=${GRADLE_HOME}/bin:${PATH} + 4. Check if the Gradle install was successful. + $gradle -v +3.Install maven + + 1. $ sudo yum install maven + 2. $ mvn -version + +## II. Build IntelQATCodec + + $ cd $IntelQATCodecSrcDri + $ mvn clean install -Dqatzip.libs=/opt/intel/QATzip -Dqatzip.src=/opt/intel/QATzip -DskipTests + Then we can get the following files that we need. + + $IntelQATCodecSrcDri/lucene_qat_wrapper/target/lucene_qat_wrapper.jar + $IntelQATCodecSrcDri/lucene_qat_wrapper/target/libqatcodec.so + $IntelQATCodecSrcDri/es_qat_wrapper/target/es_qat_wrapper.jar + $IntelQATCodecSrcDri/es_qat_wrapper/target/classes/com/intel/qat/native/lib/Linux/amd64/libQatCodecEs.so + + We need to copy these files to other places: + + $ sudo cp libqatcodec.so libQatCodecEs.so /lib64 + $ cp lucene_qat_wrapper.jar es_qat_wrapper/7.6.1/lucene-8.4.0/lucene/lib/ + $ cp es_qat_wrapper.jar es_qat_wrapper/7.6.1/elasticsearch/buildSrc/libs/ + +## III. Apply the lucene patch + + $ cd $IntelQATCodecSrcDri/elasticsearch_qat_wrapper/7.6.1 + $ ./apply_lucene_jars.sh 8.4.0 $IntelQATCodecSrcDri + +##IV. Build the lucene in target folder + + $ cd $IntelQATCodecSrcDri/elasticsearch_qat_wrapper/7.6.1/target/LUCENE + $ ant -autoproxy clean compile +Then we need to copy the jars to the ./elasticsearch_qat_wrapper/7.6.1/elasticsearch/buildSrc/libs/ + + lucene-core-8.4.0-SNAPSHOT.jar + lucene_qat_wrapper.jar +##V. Apply the elastcsearch patch + $ cd $IntelQATCodecSrcDri/elasticsearch_qat_wrapper/7.6.1 + $ ./apply_es_jars.sh 7.6.1 $IntelQATCodecSrcDri + +##VI. Build the elasticsearch in target folder + + $ java -version +To make sure the java version is 11+.(We use jdk13) + + $ cd $IntelQATCodecSrcDri/elasticsearch_qat_wrapper/7.6.1/target/elasticsearch + $ ./gradlew :distribution:archives:linux-tar:assemble --parallel +and then we can get the binary files in the ./distribution/archives/linux-tar/build/distributions/ diff --git a/es_qat_wrapper/7.6.1/apply_es_jars.sh b/es_qat_wrapper/7.6.1/apply_es_jars.sh new file mode 100755 index 0000000..568178c --- /dev/null +++ b/es_qat_wrapper/7.6.1/apply_es_jars.sh @@ -0,0 +1,95 @@ +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ +#!/bin/bash + +declare -a supported_Elasticsearch_versions=("7.6.1") +declare -A es_lucene_version_m=(["7.6.1"]="8.4.0") + +# Repo Address +ES_REPO=https://github.com/elastic/elasticsearch.git +ES_version_base="7.6.1" + +ES_version=$1 +QATCodec_SRC_DIR=$2 + +ES_QAT_DIR=$QATCodec_SRC_DIR/es_qat_wrapper/${ES_version_base}/elasticsearch +TARGET_DIR=$QATCodec_SRC_DIR/es_qat_wrapper/${ES_version_base}/target +ES_SRC_DIR=$TARGET_DIR/elasticsearch + +function clone_repo(){ + echo "Clone Branch $1 from Repo $2" + git clone -b $1 --single-branch $2 $ES_SRC_DIR +} + +function checkout_branch(){ + pushd $ES_SRC_DIR + Branch_name=VERSION-${ES_version_base} + git checkout -b $Branch_name + popd +} + +function usage(){ + printf "Usage: sh build_es_jars.sh es_version [PATH/TO/QAT_Codec_SRC]\n (e.g. sh build_es_jars.sh 7.6.1 /home/user/workspace/QATCodec)\n" + exit 1 +} + +function check_ES_version(){ + valid_version=false + for v in $supported_Elasticsearch_versions + do + if [ "$v" = "$1" ]; then + valid_version=true + break; + fi + done + if ! $valid_version ; then + printf "Unsupported elasticsearch version $1, current supported versions include: ${supported_Elasticsearch_versions[@]} \n" + exit 1 + fi +} + +apply_patch_to_es(){ + pushd $TARGET_DIR + ES_TAG="v${ES_version_base}" + clone_repo $ES_TAG $ES_REPO + checkout_branch + echo yes | cp -rf $ES_QAT_DIR/buildSrc/libs $ES_SRC_DIR/buildSrc + popd +} + +apply_diff_to_es(){ + pushd $ES_SRC_DIR + git apply --reject --whitespace=fix $ES_QAT_DIR/elasticsearch_7_6_1.diff + popd +} + +if [ "$#" -ne 2 ]; then + usage +fi + + +check_ES_version $ES_version + +if [ -d $TARGET_DIR ]; then + echo "$TARGET_DIR is not clean" +else + mkdir -p $TARGET_DIR +fi + +apply_patch_to_es +apply_diff_to_es \ No newline at end of file diff --git a/es_qat_wrapper/7.6.1/apply_lucene_jars.sh b/es_qat_wrapper/7.6.1/apply_lucene_jars.sh new file mode 100755 index 0000000..a4f90ec --- /dev/null +++ b/es_qat_wrapper/7.6.1/apply_lucene_jars.sh @@ -0,0 +1,97 @@ +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ +#!/bin/bash + +declare -a supported_Lucene_versions=("8.4.0") + +# Repo Address +LUCENE_REPO=https://github.com/apache/lucene-solr.git +ES_version_base="7.6.1" +Lucene_version_base="8.4.0" + +LUCENE_version=$1 +QATCodec_SRC_DIR=$2 + +LUCENE_QAT_DIR=$QATCodec_SRC_DIR/es_qat_wrapper/${ES_version_base}/lucene-${Lucene_version_base} +TARGET_DIR=$QATCodec_SRC_DIR/es_qat_wrapper/${ES_version_base}/target +LUCENE_SRC_DIR=$TARGET_DIR/LUCENE + +function clone_repo(){ + + echo "Clone Branch $1 from Repo $2" + git clone -b $1 --single-branch $2 $LUCENE_SRC_DIR + +} + +function checkout_branch(){ + pushd $LUCENE_SRC_DIR + Branch_name=VERSION-${Lucene_version_base} + git checkout -b $Branch_name + popd +} + +function usage(){ + printf "Usage: sh build_lucene_jars.sh lucene_version [PATH/TO/QAT_Codec_SRC]\n (e.g. sh build_lucene_jars.sh 8.4.0 /home/user/workspace/QATCodec)\n" + exit 1 +} + +function check_LUCENE_version(){ + valid_version=false + for v in $supported_Lucene_versions + do + if [ "$v" = "$1" ]; then + valid_version=true + break; + fi + done + if ! $valid_version ; then + printf "Unsupported Lucene version $1, current supported versions include: ${supported_Lucene_versions[@]} \n" + exit 1 + fi +} + +apply_patch_to_lucene(){ + pushd $TARGET_DIR + LUCENE_TAG="releases/lucene-solr/${Lucene_version_base}" + clone_repo $LUCENE_TAG $LUCENE_REPO + checkout_branch + echo yes | cp -rf $LUCENE_QAT_DIR/lucene/lib $LUCENE_SRC_DIR/lucene + popd +} + +apply_diff_to_lucene(){ + pushd $LUCENE_SRC_DIR + git apply --reject --whitespace=fix $LUCENE_QAT_DIR/lucene_8_4_0.diff + popd +} + +if [ "$#" -ne 2 ]; then + usage +fi + + +check_LUCENE_version $LUCENE_version + +if [ -d $TARGET_DIR ]; then + echo "$TARGET_DIR is not clean" +else + mkdir -p $TARGET_DIR +fi + +apply_patch_to_lucene +apply_diff_to_lucene diff --git a/es_qat_wrapper/7.6.1/elasticsearch/elasticsearch_7_6_1.diff b/es_qat_wrapper/7.6.1/elasticsearch/elasticsearch_7_6_1.diff new file mode 100644 index 0000000..8347d56 --- /dev/null +++ b/es_qat_wrapper/7.6.1/elasticsearch/elasticsearch_7_6_1.diff @@ -0,0 +1,821 @@ +diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle +index 7a02f7c2cf3..7ad505a6a35 100644 +--- a/buildSrc/build.gradle ++++ b/buildSrc/build.gradle +@@ -110,6 +110,10 @@ dependencies { + compile 'commons-codec:commons-codec:1.12' + compile 'org.apache.commons:commons-compress:1.19' + ++ //compile 'com.intel.qat:es_qat_wrapper:1.0.0' ++ compile fileTree(dir: 'libs', includes: ['*jar']) ++ testCompile fileTree(dir: 'libs', includes: ['*jar']) ++ + compile 'com.netflix.nebula:gradle-extra-configurations-plugin:3.0.3' + compile 'com.netflix.nebula:nebula-publishing-plugin:4.4.4' + compile 'com.netflix.nebula:gradle-info-plugin:5.1.0' +diff --git a/server/build.gradle b/server/build.gradle +index 6e9dcdbd34e..8234454f056 100644 +--- a/server/build.gradle ++++ b/server/build.gradle +@@ -85,7 +85,7 @@ dependencies { + testRuntime project(':libs:elasticsearch-plugin-classloader') + + // lucene +- compile "org.apache.lucene:lucene-core:${versions.lucene}" ++ //compile "org.apache.lucene:lucene-core:${versions.lucene}" + compile "org.apache.lucene:lucene-analyzers-common:${versions.lucene}" + compile "org.apache.lucene:lucene-backward-codecs:${versions.lucene}" + compile "org.apache.lucene:lucene-grouping:${versions.lucene}" +@@ -105,6 +105,10 @@ dependencies { + compile project(":libs:elasticsearch-cli") + compile 'com.carrotsearch:hppc:0.8.1' + ++ //compile 'com.intel.qat:es_qat_wrapper:1.0.0' ++ compile fileTree(dir: '../buildSrc/libs', includes: ['*jar']) ++ testCompile fileTree(dir: '../buildSrc/libs', includes: ['*jar']) ++ + // time handling, remove with java 8 time + compile "joda-time:joda-time:${versions.joda}" + +diff --git a/server/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java b/server/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java +index 2ff2f4e95df..16629396a76 100644 +--- a/server/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java ++++ b/server/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java +@@ -30,7 +30,8 @@ import java.util.Objects; + + public class CompressorFactory { + +- public static final Compressor COMPRESSOR = new DeflateCompressor(); ++ //public static final Compressor COMPRESSOR = new DeflateCompressor(); ++ public static final Compressor COMPRESSOR = new QatCompressor(); + + public static boolean isCompressed(BytesReference bytes) { + return compressor(bytes) != null; +diff --git a/server/src/main/java/org/elasticsearch/common/compress/QatCompressor.java b/server/src/main/java/org/elasticsearch/common/compress/QatCompressor.java +new file mode 100644 +index 00000000000..5aef96c70c9 +--- /dev/null ++++ b/server/src/main/java/org/elasticsearch/common/compress/QatCompressor.java +@@ -0,0 +1,120 @@ ++/* ++ * Licensed to Elasticsearch under one or more contributor ++ * license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ++ * ownership. Elasticsearch licenses this file to you under ++ * the Apache License, Version 2.0 (the "License"); you may ++ * not use this file except in compliance with the License. ++ * You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package org.elasticsearch.common.compress; ++ ++import com.intel.qat.es.QatCompressionInputStream; ++import com.intel.qat.es.QatCompressionOutputStream; ++import org.apache.logging.log4j.LogManager; ++import org.apache.logging.log4j.Logger; ++import org.elasticsearch.common.bytes.BytesReference; ++import org.elasticsearch.common.io.stream.InputStreamStreamInput; ++import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; ++import org.elasticsearch.common.io.stream.StreamInput; ++import org.elasticsearch.common.io.stream.StreamOutput; ++ ++import java.io.*; ++import java.util.Arrays; ++import java.util.concurrent.atomic.AtomicBoolean; ++ ++ ++public class QatCompressor implements Compressor { ++ private static final byte[] HEADER = new byte[]{'Q', 'A', 'T', '\0'}; ++ private static final int LEVEL = 1; ++ // limit the number of JNI calls ++ private static final int BUFFER_SIZE = 4096; ++ //add log to identify whether using qat ++ private static final Logger logger = LogManager.getLogger(QatCompressor.class); ++ ++ @Override ++ public boolean isCompressed(BytesReference bytes) { ++ logger.debug("--> go into the isCompressed function"); ++ if (bytes.length() < HEADER.length) { ++ return false; ++ } ++ for (int i = 0; i < HEADER.length; ++i) { ++ if (bytes.get(i) != HEADER[i]) { ++ return false; ++ } ++ } ++ return true; ++ } ++ ++ @Override ++ public StreamInput streamInput(StreamInput in) throws IOException { ++ logger.debug("--> go into the streamInput function"); ++ final byte[] headerBytes = new byte[HEADER.length]; ++ int len = 0; ++ while (len < headerBytes.length) { ++ final int read = in.read(headerBytes, len, headerBytes.length - len); ++ if (read == -1) { ++ break; ++ } ++ len += read; ++ } ++ if (len != HEADER.length || Arrays.equals(headerBytes, HEADER) == false) { ++ throw new IllegalArgumentException("Input stream is not compressed with QAT!"); ++ } ++ ++ final boolean useNativeBuffer = false; ++ ++ QatCompressionInputStream qatInputStream = new QatCompressionInputStream(in, BUFFER_SIZE, useNativeBuffer); ++ InputStream decompressedIn = new BufferedInputStream(qatInputStream, BUFFER_SIZE); ++ ++ return new InputStreamStreamInput(decompressedIn) { ++ final AtomicBoolean closed = new AtomicBoolean(false); ++ ++ public void close() throws IOException { ++ try { ++ super.close(); ++ } finally { ++ if (closed.compareAndSet(false, true)) { ++ // important to release memory ++ qatInputStream.close(); ++ } ++ } ++ } ++ }; ++ } ++ ++ @Override ++ public StreamOutput streamOutput(StreamOutput out) throws IOException { ++ logger.debug("--> go into the streamOutput function"); ++ out.writeBytes(HEADER); ++ ++ final boolean useNativeBuffer = false; ++ QatCompressionOutputStream qatOutputStream = new QatCompressionOutputStream(out, LEVEL, BUFFER_SIZE, useNativeBuffer); ++ OutputStream compressedOut = new BufferedOutputStream(qatOutputStream, BUFFER_SIZE); ++ ++ return new OutputStreamStreamOutput(compressedOut) { ++ final AtomicBoolean closed = new AtomicBoolean(false); ++ ++ public void close() throws IOException { ++ try { ++ super.close(); ++ } finally { ++ if (closed.compareAndSet(false, true)) { ++ // important to release memory ++ qatOutputStream.close(); ++ } ++ } ++ } ++ }; ++ } ++} +diff --git a/server/src/main/java/org/elasticsearch/index/codec/CodecService.java b/server/src/main/java/org/elasticsearch/index/codec/CodecService.java +index 0909465da4f..63cdca262de 100644 +--- a/server/src/main/java/org/elasticsearch/index/codec/CodecService.java ++++ b/server/src/main/java/org/elasticsearch/index/codec/CodecService.java +@@ -41,6 +41,7 @@ public class CodecService { + + public static final String DEFAULT_CODEC = "default"; + public static final String BEST_COMPRESSION_CODEC = "best_compression"; ++ public static final String QAT_CODEC = "qat"; + /** the raw unfiltered lucene default. useful for testing */ + public static final String LUCENE_DEFAULT_CODEC = "lucene_default"; + +@@ -49,11 +50,14 @@ public class CodecService { + if (mapperService == null) { + codecs.put(DEFAULT_CODEC, new Lucene84Codec()); + codecs.put(BEST_COMPRESSION_CODEC, new Lucene84Codec(Mode.BEST_COMPRESSION)); ++ codecs.put(QAT_CODEC,new Lucene84Codec(Mode.QAT)); + } else { + codecs.put(DEFAULT_CODEC, + new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger)); + codecs.put(BEST_COMPRESSION_CODEC, + new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger)); ++ codecs.put(QAT_CODEC, ++ new PerFieldMappingPostingFormatCodec(Mode.QAT,mapperService, logger)); + } + codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault()); + for (String codec : Codec.availableCodecs()) { +diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +index 7716cf93ffd..23d89913a4d 100644 +--- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java ++++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +@@ -105,12 +105,13 @@ public final class EngineConfig { + switch (s) { + case "default": + case "best_compression": ++ case "qat": + case "lucene_default": + return s; + default: + if (Codec.availableCodecs().contains(s) == false) { // we don't error message the not officially supported ones + throw new IllegalArgumentException( +- "unknown value for [index.codec] must be one of [default, best_compression] but was: " + s); ++ "unknown value for [index.codec] must be one of [default, best_compression,qat] but was: " + s); + } + return s; + } +diff --git a/server/src/main/resources/org/elasticsearch/bootstrap/security.policy b/server/src/main/resources/org/elasticsearch/bootstrap/security.policy +index 4e3bc562770..395b6c9bd5e 100644 +--- a/server/src/main/resources/org/elasticsearch/bootstrap/security.policy ++++ b/server/src/main/resources/org/elasticsearch/bootstrap/security.policy +@@ -24,6 +24,12 @@ + //// SecurityManager impl: + //// Must have all permissions to properly perform access checks + ++grant { ++ permission java.lang.RuntimePermission "loadLibrary.*"; ++ permission java.io.FilePermission "/*", "read"; ++ permission java.io.FilePermission "/*", "write"; ++}; ++ + grant codeBase "${codebase.elasticsearch-secure-sm}" { + permission java.security.AllPermission; + }; +@@ -157,4 +163,10 @@ grant { + permission java.io.FilePermission "/sys/fs/cgroup/memory", "read"; + permission java.io.FilePermission "/sys/fs/cgroup/memory/-", "read"; + ++ permission java.io.FilePermission "/lib64/*", "read"; ++ permission java.io.FilePermission "/lib64/*", "write"; ++ permission java.lang.RuntimePermission "loadLibrary.* "; ++ permission java.io.FilePermission "/* ", "write "; ++ permission java.io.FilePermission "/* ", "delete "; ++ permission java.io.FilePermission "/*", "read"; + }; +diff --git a/server/src/test/java/org/elasticsearch/common/compress/QatCompressTests.java b/server/src/test/java/org/elasticsearch/common/compress/QatCompressTests.java +new file mode 100644 +index 00000000000..dec2c87b5dc +--- /dev/null ++++ b/server/src/test/java/org/elasticsearch/common/compress/QatCompressTests.java +@@ -0,0 +1,429 @@ ++/* ++ * Licensed to Elasticsearch under one or more contributor ++ * license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ++ * ownership. Elasticsearch licenses this file to you under ++ * the Apache License, Version 2.0 (the "License"); you may ++ * not use this file except in compliance with the License. ++ * You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++package org.elasticsearch.common.compress; ++ ++import org.apache.lucene.util.LineFileDocs; ++import org.apache.lucene.util.TestUtil; ++import org.elasticsearch.common.io.stream.ByteBufferStreamInput; ++import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; ++import org.elasticsearch.common.io.stream.StreamInput; ++import org.elasticsearch.common.io.stream.StreamOutput; ++import org.elasticsearch.test.ESTestCase; ++ ++import java.io.ByteArrayOutputStream; ++import java.io.IOException; ++import java.nio.ByteBuffer; ++import java.nio.charset.StandardCharsets; ++import java.util.Random; ++import java.util.concurrent.CountDownLatch; ++ ++/** ++ * Test streaming compression (e.g. used for recovery) ++ */ ++public class QatCompressTests extends ESTestCase { ++ ++ private final Compressor compressor = new QatCompressor(); ++ ++ public void testRandom() throws IOException { ++ Random r = random(); ++ for (int i = 0; i < 10; i++) { ++ byte bytes[] = new byte[TestUtil.nextInt(r, 1, 100000)]; ++ r.nextBytes(bytes); ++ doTest(bytes); ++ } ++ } ++ ++ public void testRandomThreads() throws Exception { ++ final Random r = random(); ++ int threadCount = TestUtil.nextInt(r, 2, 6); ++ Thread[] threads = new Thread[threadCount]; ++ final CountDownLatch startingGun = new CountDownLatch(1); ++ for (int tid=0; tid < threadCount; tid++) { ++ final long seed = r.nextLong(); ++ threads[tid] = new Thread() { ++ @Override ++ public void run() { ++ try { ++ Random r = new Random(seed); ++ startingGun.await(); ++ for (int i = 0; i < 10; i++) { ++ byte bytes[] = new byte[TestUtil.nextInt(r, 1, 100000)]; ++ r.nextBytes(bytes); ++ doTest(bytes); ++ } ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ } ++ }; ++ threads[tid].start(); ++ } ++ startingGun.countDown(); ++ for (Thread t : threads) { ++ t.join(); ++ } ++ } ++ ++ public void testLineDocs() throws IOException { ++ Random r = random(); ++ LineFileDocs lineFileDocs = new LineFileDocs(r); ++ for (int i = 0; i < 10; i++) { ++ int numDocs = TestUtil.nextInt(r, 1, 200); ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ for (int j = 0; j < numDocs; j++) { ++ String s = lineFileDocs.nextDoc().get("body"); ++ bos.write(s.getBytes(StandardCharsets.UTF_8)); ++ } ++ doTest(bos.toByteArray()); ++ } ++ lineFileDocs.close(); ++ } ++ ++ public void testLineDocsThreads() throws Exception { ++ final Random r = random(); ++ int threadCount = TestUtil.nextInt(r, 2, 6); ++ Thread[] threads = new Thread[threadCount]; ++ final CountDownLatch startingGun = new CountDownLatch(1); ++ for (int tid=0; tid < threadCount; tid++) { ++ final long seed = r.nextLong(); ++ threads[tid] = new Thread() { ++ @Override ++ public void run() { ++ try { ++ Random r = new Random(seed); ++ startingGun.await(); ++ LineFileDocs lineFileDocs = new LineFileDocs(r); ++ for (int i = 0; i < 10; i++) { ++ int numDocs = TestUtil.nextInt(r, 1, 200); ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ for (int j = 0; j < numDocs; j++) { ++ String s = lineFileDocs.nextDoc().get("body"); ++ bos.write(s.getBytes(StandardCharsets.UTF_8)); ++ } ++ doTest(bos.toByteArray()); ++ } ++ lineFileDocs.close(); ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ } ++ }; ++ threads[tid].start(); ++ } ++ startingGun.countDown(); ++ for (Thread t : threads) { ++ t.join(); ++ } ++ } ++ ++ public void testRepetitionsL() throws IOException { ++ Random r = random(); ++ for (int i = 0; i < 10; i++) { ++ int numLongs = TestUtil.nextInt(r, 1, 10000); ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ long theValue = r.nextLong(); ++ for (int j = 0; j < numLongs; j++) { ++ if (r.nextInt(10) == 0) { ++ theValue = r.nextLong(); ++ } ++ bos.write((byte) (theValue >>> 56)); ++ bos.write((byte) (theValue >>> 48)); ++ bos.write((byte) (theValue >>> 40)); ++ bos.write((byte) (theValue >>> 32)); ++ bos.write((byte) (theValue >>> 24)); ++ bos.write((byte) (theValue >>> 16)); ++ bos.write((byte) (theValue >>> 8)); ++ bos.write((byte) theValue); ++ } ++ doTest(bos.toByteArray()); ++ } ++ } ++ ++ public void testRepetitionsLThreads() throws Exception { ++ final Random r = random(); ++ int threadCount = TestUtil.nextInt(r, 2, 6); ++ Thread[] threads = new Thread[threadCount]; ++ final CountDownLatch startingGun = new CountDownLatch(1); ++ for (int tid=0; tid < threadCount; tid++) { ++ final long seed = r.nextLong(); ++ threads[tid] = new Thread() { ++ @Override ++ public void run() { ++ try { ++ Random r = new Random(seed); ++ startingGun.await(); ++ for (int i = 0; i < 10; i++) { ++ int numLongs = TestUtil.nextInt(r, 1, 10000); ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ long theValue = r.nextLong(); ++ for (int j = 0; j < numLongs; j++) { ++ if (r.nextInt(10) == 0) { ++ theValue = r.nextLong(); ++ } ++ bos.write((byte) (theValue >>> 56)); ++ bos.write((byte) (theValue >>> 48)); ++ bos.write((byte) (theValue >>> 40)); ++ bos.write((byte) (theValue >>> 32)); ++ bos.write((byte) (theValue >>> 24)); ++ bos.write((byte) (theValue >>> 16)); ++ bos.write((byte) (theValue >>> 8)); ++ bos.write((byte) theValue); ++ } ++ doTest(bos.toByteArray()); ++ } ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ } ++ }; ++ threads[tid].start(); ++ } ++ startingGun.countDown(); ++ for (Thread t : threads) { ++ t.join(); ++ } ++ } ++ ++ public void testRepetitionsI() throws IOException { ++ Random r = random(); ++ for (int i = 0; i < 10; i++) { ++ int numInts = TestUtil.nextInt(r, 1, 20000); ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ int theValue = r.nextInt(); ++ for (int j = 0; j < numInts; j++) { ++ if (r.nextInt(10) == 0) { ++ theValue = r.nextInt(); ++ } ++ bos.write((byte) (theValue >>> 24)); ++ bos.write((byte) (theValue >>> 16)); ++ bos.write((byte) (theValue >>> 8)); ++ bos.write((byte) theValue); ++ } ++ doTest(bos.toByteArray()); ++ } ++ } ++ ++ public void testRepetitionsIThreads() throws Exception { ++ final Random r = random(); ++ int threadCount = TestUtil.nextInt(r, 2, 6); ++ Thread[] threads = new Thread[threadCount]; ++ final CountDownLatch startingGun = new CountDownLatch(1); ++ for (int tid=0; tid < threadCount; tid++) { ++ final long seed = r.nextLong(); ++ threads[tid] = new Thread() { ++ @Override ++ public void run() { ++ try { ++ Random r = new Random(seed); ++ startingGun.await(); ++ for (int i = 0; i < 10; i++) { ++ int numInts = TestUtil.nextInt(r, 1, 20000); ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ int theValue = r.nextInt(); ++ for (int j = 0; j < numInts; j++) { ++ if (r.nextInt(10) == 0) { ++ theValue = r.nextInt(); ++ } ++ bos.write((byte) (theValue >>> 24)); ++ bos.write((byte) (theValue >>> 16)); ++ bos.write((byte) (theValue >>> 8)); ++ bos.write((byte) theValue); ++ } ++ doTest(bos.toByteArray()); ++ } ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ } ++ }; ++ threads[tid].start(); ++ } ++ startingGun.countDown(); ++ for (Thread t : threads) { ++ t.join(); ++ } ++ } ++ ++ public void testRepetitionsS() throws IOException { ++ Random r = random(); ++ for (int i = 0; i < 10; i++) { ++ int numShorts = TestUtil.nextInt(r, 1, 40000); ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ short theValue = (short) r.nextInt(65535); ++ for (int j = 0; j < numShorts; j++) { ++ if (r.nextInt(10) == 0) { ++ theValue = (short) r.nextInt(65535); ++ } ++ bos.write((byte) (theValue >>> 8)); ++ bos.write((byte) theValue); ++ } ++ doTest(bos.toByteArray()); ++ } ++ } ++ ++ public void testMixed() throws IOException { ++ Random r = random(); ++ LineFileDocs lineFileDocs = new LineFileDocs(r); ++ for (int i = 0; i < 2; ++i) { ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ int prevInt = r.nextInt(); ++ long prevLong = r.nextLong(); ++ while (bos.size() < 400000) { ++ switch (r.nextInt(4)) { ++ case 0: ++ addInt(r, prevInt, bos); ++ break; ++ case 1: ++ addLong(r, prevLong, bos); ++ break; ++ case 2: ++ addString(lineFileDocs, bos); ++ break; ++ case 3: ++ addBytes(r, bos); ++ break; ++ default: ++ throw new IllegalStateException("Random is broken"); ++ } ++ } ++ doTest(bos.toByteArray()); ++ } ++ } ++ ++ private void addLong(Random r, long prev, ByteArrayOutputStream bos) { ++ long theValue = prev; ++ if (r.nextInt(10) != 0) { ++ theValue = r.nextLong(); ++ } ++ bos.write((byte) (theValue >>> 56)); ++ bos.write((byte) (theValue >>> 48)); ++ bos.write((byte) (theValue >>> 40)); ++ bos.write((byte) (theValue >>> 32)); ++ bos.write((byte) (theValue >>> 24)); ++ bos.write((byte) (theValue >>> 16)); ++ bos.write((byte) (theValue >>> 8)); ++ bos.write((byte) theValue); ++ } ++ ++ private void addInt(Random r, int prev, ByteArrayOutputStream bos) { ++ int theValue = prev; ++ if (r.nextInt(10) != 0) { ++ theValue = r.nextInt(); ++ } ++ bos.write((byte) (theValue >>> 24)); ++ bos.write((byte) (theValue >>> 16)); ++ bos.write((byte) (theValue >>> 8)); ++ bos.write((byte) theValue); ++ } ++ ++ private void addString(LineFileDocs lineFileDocs, ByteArrayOutputStream bos) throws IOException { ++ String s = lineFileDocs.nextDoc().get("body"); ++ bos.write(s.getBytes(StandardCharsets.UTF_8)); ++ } ++ ++ private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException { ++ byte bytes[] = new byte[TestUtil.nextInt(r, 1, 10000)]; ++ r.nextBytes(bytes); ++ bos.write(bytes); ++ } ++ ++ public void testRepetitionsSThreads() throws Exception { ++ final Random r = random(); ++ int threadCount = TestUtil.nextInt(r, 2, 6); ++ Thread[] threads = new Thread[threadCount]; ++ final CountDownLatch startingGun = new CountDownLatch(1); ++ for (int tid=0; tid < threadCount; tid++) { ++ final long seed = r.nextLong(); ++ threads[tid] = new Thread() { ++ @Override ++ public void run() { ++ try { ++ Random r = new Random(seed); ++ startingGun.await(); ++ for (int i = 0; i < 10; i++) { ++ int numShorts = TestUtil.nextInt(r, 1, 40000); ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ short theValue = (short) r.nextInt(65535); ++ for (int j = 0; j < numShorts; j++) { ++ if (r.nextInt(10) == 0) { ++ theValue = (short) r.nextInt(65535); ++ } ++ bos.write((byte) (theValue >>> 8)); ++ bos.write((byte) theValue); ++ } ++ doTest(bos.toByteArray()); ++ } ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ } ++ }; ++ threads[tid].start(); ++ } ++ startingGun.countDown(); ++ for (Thread t : threads) { ++ t.join(); ++ } ++ } ++ ++ private void doTest(byte bytes[]) throws IOException { ++ ByteBuffer bb = ByteBuffer.wrap(bytes); ++ StreamInput rawIn = new ByteBufferStreamInput(bb); ++ Compressor c = compressor; ++ ++ ByteArrayOutputStream bos = new ByteArrayOutputStream(); ++ OutputStreamStreamOutput rawOs = new OutputStreamStreamOutput(bos); ++ StreamOutput os = c.streamOutput(rawOs); ++ ++ Random r = random(); ++ int bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000); ++ int prepadding = r.nextInt(70000); ++ int postpadding = r.nextInt(70000); ++ byte buffer[] = new byte[prepadding + bufferSize + postpadding]; ++ r.nextBytes(buffer); // fill block completely with junk ++ int len; ++ while ((len = rawIn.read(buffer, prepadding, bufferSize)) != -1) { ++ os.write(buffer, prepadding, len); ++ } ++ os.close(); ++ rawIn.close(); ++ ++ // now we have compressed byte array ++ ++ byte compressed[] = bos.toByteArray(); ++ ByteBuffer bb2 = ByteBuffer.wrap(compressed); ++ StreamInput compressedIn = new ByteBufferStreamInput(bb2); ++ StreamInput in = c.streamInput(compressedIn); ++ ++ // randomize constants again ++ bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000); ++ prepadding = r.nextInt(70000); ++ postpadding = r.nextInt(70000); ++ buffer = new byte[prepadding + bufferSize + postpadding]; ++ r.nextBytes(buffer); // fill block completely with junk ++ ++ ByteArrayOutputStream uncompressedOut = new ByteArrayOutputStream(); ++ while ((len = in.read(buffer, prepadding, bufferSize)) != -1) { ++ uncompressedOut.write(buffer, prepadding, len); ++ } ++ uncompressedOut.close(); ++ ++ assertArrayEquals(bytes, uncompressedOut.toByteArray()); ++ } ++} +diff --git a/server/src/test/java/org/elasticsearch/common/compress/QatCompressedXContentTests.java b/server/src/test/java/org/elasticsearch/common/compress/QatCompressedXContentTests.java +new file mode 100644 +index 00000000000..b1ee13ff528 +--- /dev/null ++++ b/server/src/test/java/org/elasticsearch/common/compress/QatCompressedXContentTests.java +@@ -0,0 +1,100 @@ ++/* ++ * Licensed to Elasticsearch under one or more contributor ++ * license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ++ * ownership. Elasticsearch licenses this file to you under ++ * the Apache License, Version 2.0 (the "License"); you may ++ * not use this file except in compliance with the License. ++ * You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++package org.elasticsearch.common.compress; ++ ++import org.apache.lucene.util.TestUtil; ++import org.elasticsearch.common.bytes.BytesReference; ++import org.elasticsearch.common.io.stream.BytesStreamOutput; ++import org.elasticsearch.common.io.stream.StreamOutput; ++import org.elasticsearch.test.ESTestCase; ++import org.junit.Assert; ++ ++import java.io.IOException; ++import java.util.Random; ++ ++import static org.hamcrest.Matchers.equalTo; ++import static org.hamcrest.Matchers.not; ++ ++public class QatCompressedXContentTests extends ESTestCase { ++ ++ private final Compressor compressor = new QatCompressor(); ++ ++ private void assertEquals(CompressedXContent s1, CompressedXContent s2) { ++ Assert.assertEquals(s1, s2); ++ assertArrayEquals(s1.uncompressed(), s2.uncompressed()); ++ assertEquals(s1.hashCode(), s2.hashCode()); ++ } ++ ++ public void simpleTests() throws IOException { ++ String str = "---\nf:this is a simple string"; ++ CompressedXContent cstr = new CompressedXContent(str); ++ assertThat(cstr.string(), equalTo(str)); ++ assertThat(new CompressedXContent(str), equalTo(cstr)); ++ ++ String str2 = "---\nf:this is a simple string 2"; ++ CompressedXContent cstr2 = new CompressedXContent(str2); ++ assertThat(cstr2.string(), not(equalTo(str))); ++ assertThat(new CompressedXContent(str2), not(equalTo(cstr))); ++ assertEquals(new CompressedXContent(str2), cstr2); ++ } ++ ++ public void testRandom() throws IOException { ++ Random r = random(); ++ for (int i = 0; i < 1000; i++) { ++ String string = TestUtil.randomUnicodeString(r, 10000); ++ // hack to make it detected as YAML ++ string = "---\n" + string; ++ CompressedXContent compressedXContent = new CompressedXContent(string); ++ assertThat(compressedXContent.string(), equalTo(string)); ++ } ++ } ++ ++ public void testDifferentCompressedRepresentation() throws Exception { ++ byte[] b = "---\nf:abcdefghijabcdefghij".getBytes("UTF-8"); ++ BytesStreamOutput bout = new BytesStreamOutput(); ++ StreamOutput out = compressor.streamOutput(bout); ++ out.writeBytes(b); ++ out.flush(); ++ out.writeBytes(b); ++ out.close(); ++ final BytesReference b1 = bout.bytes(); ++ ++ bout = new BytesStreamOutput(); ++ out = compressor.streamOutput(bout); ++ out.writeBytes(b); ++ out.writeBytes(b); ++ out.close(); ++ final BytesReference b2 = bout.bytes(); ++ ++ // because of the intermediate flush, the two compressed representations ++ // are different. It can also happen for other reasons like if hash tables ++ // of different size are being used ++ assertFalse(b1.equals(b2)); ++ // we used the compressed representation directly and did not recompress ++ assertArrayEquals(BytesReference.toBytes(b1), new CompressedXContent(b1).compressed()); ++ assertArrayEquals(BytesReference.toBytes(b2), new CompressedXContent(b2).compressed()); ++ // but compressedstring instances are still equal ++ assertEquals(new CompressedXContent(b1), new CompressedXContent(b2)); ++ } ++ ++ public void testHashCode() throws IOException { ++ assertFalse(new CompressedXContent("{\"a\":\"b\"}").hashCode() == new CompressedXContent("{\"a\":\"c\"}").hashCode()); ++ } ++ ++} +diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +index 062c2f970b6..51de7137772 100644 +--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java ++++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +@@ -413,7 +413,7 @@ public abstract class ESIntegTestCase extends ESTestCase { + // otherwise, use it, it has assertions and so on that can find bugs. + SuppressCodecs annotation = getClass().getAnnotation(SuppressCodecs.class); + if (annotation != null && annotation.value().length == 1 && "*".equals(annotation.value()[0])) { +- randomSettingsBuilder.put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC)); ++ randomSettingsBuilder.put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC,CodecService.QAT_CODEC)); + } else { + randomSettingsBuilder.put("index.codec", CodecService.LUCENE_DEFAULT_CODEC); + } +diff --git a/x-pack/plugin/sql/sql-action/build.gradle b/x-pack/plugin/sql/sql-action/build.gradle +index 4e74d9be353..1c60ab0e8b0 100644 +--- a/x-pack/plugin/sql/sql-action/build.gradle ++++ b/x-pack/plugin/sql/sql-action/build.gradle +@@ -19,7 +19,8 @@ dependencies { + transitive = false + } + compile xpackProject('plugin:sql:sql-proto') +- compile "org.apache.lucene:lucene-core:${versions.lucene}" ++ compile fileTree(dir: '../buildSrc/libs', includes: ['*jar']) ++ //compile "org.apache.lucene:lucene-core:${versions.lucene}" + compile "joda-time:joda-time:${versions.joda}" + runtime "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" + runtime "org.apache.logging.log4j:log4j-api:${versions.log4j}" diff --git a/es_qat_wrapper/7.6.1/lucene-8.4.0/lucene_8_4_0.diff b/es_qat_wrapper/7.6.1/lucene-8.4.0/lucene_8_4_0.diff new file mode 100644 index 0000000..cecfee8 --- /dev/null +++ b/es_qat_wrapper/7.6.1/lucene-8.4.0/lucene_8_4_0.diff @@ -0,0 +1,437 @@ +diff --git a/lucene/build.xml b/lucene/build.xml +index e3cf905c97..01c6fa942d 100644 +--- a/lucene/build.xml ++++ b/lucene/build.xml +@@ -75,6 +75,9 @@ + + ++ ++ ++ + + + +diff --git a/lucene/core/build.xml b/lucene/core/build.xml +index a6302728c1..495657a7da 100644 +--- a/lucene/core/build.xml ++++ b/lucene/core/build.xml +@@ -29,7 +29,10 @@ + + + +- ++ ++ ++ ++ + + + +@@ -43,6 +46,7 @@ + + + ++ + + + +diff --git a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressionMode.java b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressionMode.java +index 53a84cbdd5..7ca534d1d2 100644 +--- a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressionMode.java ++++ b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressionMode.java +@@ -28,6 +28,9 @@ import org.apache.lucene.store.DataOutput; + import org.apache.lucene.util.ArrayUtil; + import org.apache.lucene.util.BytesRef; + ++import com.intel.qat.jni.QatCompressorJNI; ++import com.intel.qat.jni.QatDecompressorJNI; ++ + /** + * A compression mode. Tells how much effort should be spent on compression and + * decompression of stored fields. +@@ -113,6 +116,27 @@ public abstract class CompressionMode { + + }; + ++ /** ++ * This compression mode is using the QAT ++ */ ++ public static final CompressionMode QAT = new CompressionMode() { ++ ++ @Override ++ public Compressor newCompressor() { ++ return new QatCompressor(); ++ } ++ ++ @Override ++ public Decompressor newDecompressor() { ++ return new QatDecompressor(); ++ } ++ ++ @Override ++ public String toString() { ++ return "QAT"; ++ } ++ }; ++ + /** Sole constructor. */ + protected CompressionMode() {} + +@@ -296,4 +320,95 @@ public abstract class CompressionMode { + + } + ++ private static final class QatDecompressor extends Decompressor { ++ ++ byte[] compressed; ++ ++ QatDecompressor() { ++ compressed = new byte[0]; ++ } ++ ++ @Override ++ public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { ++ assert offset + length <= originalLength; ++ if (length == 0) { ++ bytes.length = 0; ++ return; ++ } ++ final int compressedLength = in.readVInt(); ++ compressed = new byte[compressedLength]; ++ in.readBytes(compressed, 0, compressedLength); ++ ++ final QatDecompressorJNI decompressor = new QatDecompressorJNI(); ++ ++ try { ++ decompressor.setInput(compressed, 0, compressedLength); ++ bytes.offset = bytes.length = 0; ++ bytes.bytes = ArrayUtil.grow(bytes.bytes, originalLength); ++ try { ++ bytes.length = decompressor.decompress(bytes.bytes, bytes.length, originalLength); ++ } catch (Error e) { ++ throw new Error(e); ++ } ++ ++ if (!decompressor.finished()) { ++ throw new CorruptIndexException("Invalid decoder state in QAT decompressor: needsInput=" + decompressor.needsInput() ++ + ", needsDict=" + decompressor.needsDictionary(), in); ++ } ++ } finally { ++ decompressor.end(); ++ } ++ if (bytes.length != originalLength) { ++ throw new CorruptIndexException("Lengths mismatch in QAT decompressor: " + bytes.length + " != " + originalLength, in); ++ } ++ bytes.offset = offset; ++ bytes.length = length; ++ } ++ ++ @Override ++ public Decompressor clone() { ++ return new QatDecompressor(); ++ } ++ } ++ ++ private static class QatCompressor extends Compressor { ++ ++ final QatCompressorJNI compressor; ++ byte[] compressed; ++ boolean closed; ++ ++ QatCompressor() { ++ compressor = new QatCompressorJNI(); ++ compressed = new byte[64]; ++ } ++ ++ @Override ++ public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException { ++ compressor.reset(); ++ compressor.setInput(bytes, off, len); ++ compressor.finish(); ++ ++ int totalCount = 0; ++ while (!compressor.finished() ) { ++ final int count = compressor.compress(compressed, totalCount, compressed.length - totalCount); ++ totalCount += count; ++ assert totalCount <= compressed.length; ++ if (compressor.finished()) { ++ break; ++ } else { ++ compressed = ArrayUtil.grow(compressed); ++ } ++ } ++ out.writeVInt(totalCount); ++ out.writeBytes(compressed, totalCount); ++ } ++ ++ @Override ++ public void close() throws IOException { ++ if (closed == false) { ++ compressor.end(); ++ closed = true; ++ } ++ } ++ } + } +diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50StoredFieldsFormat.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50StoredFieldsFormat.java +index fdfba5b767..9232951d58 100644 +--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50StoredFieldsFormat.java ++++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50StoredFieldsFormat.java +@@ -145,7 +145,9 @@ public final class Lucene50StoredFieldsFormat extends StoredFieldsFormat { + /** Trade compression ratio for retrieval speed. */ + BEST_SPEED, + /** Trade retrieval speed for compression ratio. */ +- BEST_COMPRESSION ++ BEST_COMPRESSION, ++ /** QAT */ ++ QAT + } + + /** Attribute key for compression mode. */ +@@ -189,6 +191,8 @@ public final class Lucene50StoredFieldsFormat extends StoredFieldsFormat { + return new CompressingStoredFieldsFormat("Lucene50StoredFieldsFast", CompressionMode.FAST, 1 << 14, 128, 1024); + case BEST_COMPRESSION: + return new CompressingStoredFieldsFormat("Lucene50StoredFieldsHigh", CompressionMode.HIGH_COMPRESSION, 61440, 512, 1024); ++ case QAT: ++ return new CompressingStoredFieldsFormat("Lucene50StoredFieldsQAT", CompressionMode.QAT, 61440, 512, 1024); + default: throw new AssertionError(); + } + } +diff --git a/lucene/core/src/test/org/apache/lucene/codecs/compressing/TestQatCompressionDecompressionMode.java b/lucene/core/src/test/org/apache/lucene/codecs/compressing/TestQatCompressionDecompressionMode.java +new file mode 100644 +index 0000000000..9bacf20caa +--- /dev/null ++++ b/lucene/core/src/test/org/apache/lucene/codecs/compressing/TestQatCompressionDecompressionMode.java +@@ -0,0 +1,26 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.lucene.codecs.compressing; ++ ++public class TestQatCompressionDecompressionMode extends AbstractTestCompressionMode { ++ @Override ++ public void setUp() throws Exception { ++ super.setUp(); ++ mode = CompressionMode.QAT; ++ } ++} +diff --git a/lucene/core/src/test/org/apache/lucene/codecs/lucene50/TestLucene50StoredFieldsFormatQatCompression.java b/lucene/core/src/test/org/apache/lucene/codecs/lucene50/TestLucene50StoredFieldsFormatQatCompression.java +new file mode 100644 +index 0000000000..d18d93cc20 +--- /dev/null ++++ b/lucene/core/src/test/org/apache/lucene/codecs/lucene50/TestLucene50StoredFieldsFormatQatCompression.java +@@ -0,0 +1,81 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.lucene.codecs.lucene50; ++ ++ ++import org.apache.lucene.codecs.Codec; ++import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat.Mode; ++import org.apache.lucene.codecs.lucene84.Lucene84Codec; ++import org.apache.lucene.document.Document; ++import org.apache.lucene.document.StoredField; ++import org.apache.lucene.index.BaseStoredFieldsFormatTestCase; ++import org.apache.lucene.index.DirectoryReader; ++import org.apache.lucene.index.IndexWriter; ++import org.apache.lucene.index.IndexWriterConfig; ++import org.apache.lucene.store.Directory; ++ ++import com.carrotsearch.randomizedtesting.generators.RandomPicks; ++ ++public class TestLucene50StoredFieldsFormatQatCompression extends BaseStoredFieldsFormatTestCase { ++ @Override ++ protected Codec getCodec() { ++ return new Lucene84Codec(Mode.QAT); ++ } ++ ++ /** ++ * Change compression params (leaving it the same for old segments) ++ * and tests that nothing breaks. ++ */ ++ public void testMixedCompressions() throws Exception { ++ Directory dir = newDirectory(); ++ for (int i = 0; i < 10; i++) { ++ IndexWriterConfig iwc = newIndexWriterConfig(); ++ iwc.setCodec(new Lucene84Codec(RandomPicks.randomFrom(random(), Mode.values()))); ++ IndexWriter iw = new IndexWriter(dir, newIndexWriterConfig()); ++ Document doc = new Document(); ++ doc.add(new StoredField("field1", "value1")); ++ doc.add(new StoredField("field2", "value2")); ++ iw.addDocument(doc); ++ if (random().nextInt(4) == 0) { ++ iw.forceMerge(1); ++ } ++ iw.commit(); ++ iw.close(); ++ } ++ ++ DirectoryReader ir = DirectoryReader.open(dir); ++ assertEquals(10, ir.numDocs()); ++ for (int i = 0; i < 10; i++) { ++ Document doc = ir.document(i); ++ assertEquals("value1", doc.get("field1")); ++ assertEquals("value2", doc.get("field2")); ++ } ++ ir.close(); ++ // checkindex ++ dir.close(); ++ } ++ ++ public void testInvalidOptions() { ++ expectThrows(NullPointerException.class, () -> { ++ new Lucene84Codec(null); ++ }); ++ ++ expectThrows(NullPointerException.class, () -> { ++ new Lucene50StoredFieldsFormat(null); ++ }); ++ } ++} +\ No newline at end of file +diff --git a/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/CompressingCodec.java b/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/CompressingCodec.java +index 4fd5e16cef..3c79716c1f 100644 +--- a/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/CompressingCodec.java ++++ b/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/CompressingCodec.java +@@ -36,7 +36,7 @@ public abstract class CompressingCodec extends FilterCodec { + * Create a random instance. + */ + public static CompressingCodec randomInstance(Random random, int chunkSize, int maxDocsPerChunk, boolean withSegmentSuffix, int blockSize) { +- switch (random.nextInt(4)) { ++ switch (random.nextInt(6)) { + case 0: + return new FastCompressingCodec(chunkSize, maxDocsPerChunk, withSegmentSuffix, blockSize); + case 1: +@@ -45,6 +45,10 @@ public abstract class CompressingCodec extends FilterCodec { + return new HighCompressionCompressingCodec(chunkSize, maxDocsPerChunk, withSegmentSuffix, blockSize); + case 3: + return new DummyCompressingCodec(chunkSize, maxDocsPerChunk, withSegmentSuffix, blockSize); ++ case 4: ++ return new QatCompressionCompressingCodec(chunkSize, maxDocsPerChunk, withSegmentSuffix, blockSize); ++ case 5: ++ return new QatCompressionCompressingCodec(); + default: + throw new AssertionError(); + } +diff --git a/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/QatCompressionCompressingCodec.java b/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/QatCompressionCompressingCodec.java +new file mode 100644 +index 0000000000..4ff32e682b +--- /dev/null ++++ b/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/QatCompressionCompressingCodec.java +@@ -0,0 +1,35 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.lucene.codecs.compressing; ++ ++/** CompressionCodec that uses {@link CompressionMode#QAT} */ ++public class QatCompressionCompressingCodec extends CompressingCodec { ++ ++ /** Constructor that allows to configure the chunk size. */ ++ public QatCompressionCompressingCodec(int chunkSize, int maxDocsPerChunk, boolean withSegmentSuffix, int blockSize) { ++ super("QatCompressionCompressingStoredFields", ++ withSegmentSuffix ? "QatCompressionCompressingStoredFields" : "", ++ CompressionMode.QAT, chunkSize, maxDocsPerChunk, blockSize); ++ } ++ ++ /** Default constructor. */ ++ public QatCompressionCompressingCodec() { ++ // we don't worry about zlib block overhead as it's ++ // not bad and try to save space instead: ++ this(60*1024, 512, false, 1024); ++ } ++} +diff --git a/lucene/test-framework/src/resources/META-INF/services/org.apache.lucene.codecs.Codec b/lucene/test-framework/src/resources/META-INF/services/org.apache.lucene.codecs.Codec +index 282f5dd20c..292c393196 100644 +--- a/lucene/test-framework/src/resources/META-INF/services/org.apache.lucene.codecs.Codec ++++ b/lucene/test-framework/src/resources/META-INF/services/org.apache.lucene.codecs.Codec +@@ -19,3 +19,4 @@ org.apache.lucene.codecs.compressing.FastCompressingCodec + org.apache.lucene.codecs.compressing.FastDecompressionCompressingCodec + org.apache.lucene.codecs.compressing.HighCompressionCompressingCodec + org.apache.lucene.codecs.compressing.dummy.DummyCompressingCodec ++org.apache.lucene.codecs.compressing.QatCompressionCompressingCodec +diff --git a/lucene/tools/build.xml b/lucene/tools/build.xml +index b245dce2e9..330fdeaaf8 100644 +--- a/lucene/tools/build.xml ++++ b/lucene/tools/build.xml +@@ -29,9 +29,12 @@ + + + ++ + + +- ++ ++ ++ + + + +diff --git a/lucene/tools/junit4/tests.policy b/lucene/tools/junit4/tests.policy +index 74949813b7..c31be9867c 100644 +--- a/lucene/tools/junit4/tests.policy ++++ b/lucene/tools/junit4/tests.policy +@@ -28,6 +28,12 @@ grant { + // should be enclosed within common.dir, but just in case: + permission java.io.FilePermission "${junit4.childvm.cwd}", "read"; + ++ // the system files ++ permission java.io.FilePermission "/lib64", "read,execute"; ++ permission java.io.FilePermission "/lib", "read,execute"; ++ permission java.io.FilePermission "/*", "read,write,execute"; ++ permission java.lang.RuntimePermission "loadLibrary.*"; ++ + // write only to sandbox + permission java.io.FilePermission "${junit4.childvm.cwd}${/}temp", "read,write,delete"; + permission java.io.FilePermission "${junit4.childvm.cwd}${/}temp${/}-", "read,write,delete"; +@@ -70,6 +76,7 @@ grant { + permission java.lang.RuntimePermission "getenv.*"; + permission java.lang.RuntimePermission "getClassLoader"; + permission java.lang.RuntimePermission "setContextClassLoader"; ++ permission java.lang.RuntimePermission "*"; + + // read access to all system properties: + permission java.util.PropertyPermission "*", "read"; diff --git a/es_qat_wrapper/MANIFEST.MF b/es_qat_wrapper/MANIFEST.MF new file mode 100644 index 0000000..37d43b7 --- /dev/null +++ b/es_qat_wrapper/MANIFEST.MF @@ -0,0 +1,12 @@ +Manifest-Version: 1.0 +Built-By: root +Created-By: Apache Maven 3.3.9 +Build-Jdk: 1.8.0_131 +Application-Name: One Applet +Codebase: * +Permissions: all-permissions +Application-Library-Allowable-Codebase: * +Caller-Allowable-Codebase: * +Trusted-Only: false +Trusted-Library: false + diff --git a/es_qat_wrapper/pom.xml b/es_qat_wrapper/pom.xml new file mode 100755 index 0000000..0283e04 --- /dev/null +++ b/es_qat_wrapper/pom.xml @@ -0,0 +1,224 @@ + + + + + qat-parent + com.intel.qat + 1.0.0 + ../pom.xml + + 4.0.0 + + es_qat_wrapper + + + + UTF-8 + UTF-8 + false + ./ + + 1.8 + + 3.3.9 + 3.6.1 + 1.4.1 + 2.19.1 + 3.0.2 + 1.8 + 3.0.1 + 3.0.0 + 2.10.4 + 3.0.0 + 3.0.0 + 2.5.2 + 2.8.2 + 3.0.0 + 2.17 + 4.12 + + + ${project.build.directory}/jars + + + prepare-package + package + + 512m + + + + + mavencentral + maven central repository + http://repo1.maven.org/maven2 + default + + true + warn + + + false + + + + datanucleus + datanucleus maven repository + http://www.datanucleus.org/downloads/maven2 + default + + true + warn + + + false + + + + glassfish-repository + http://maven.glassfish.org/content/groups/glassfish + + false + + + false + + + + sonatype-snapshot + https://oss.sonatype.org/content/repositories/snapshots + + false + + + false + + + + + + + central + https://repo1.maven.org/maven2 + + true + + + false + + + + + + + + org.apache.logging.log4j + log4j-api + 2.11.2 + + + org.apache.logging.log4j + log4j-core + 2.11.2 + + + + junit + junit + ${junit.version} + test + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven-compiler-plugin.version} + + ${java.version} + ${java.version} + UTF-8 + 1024m + true + + -Xlint:all,-serial,-path + + + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + + prepare-test-jar + ${build.testJarPhase} + + test-jar + + + + log4j.properties + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + ${maven-antrun-plugin.version} + + + build-native + compile + + run + + + + + + + + + + + + clean-native + clean + + run + + + + + + + + + + + + + + + diff --git a/es_qat_wrapper/src/main/java/com/intel/qat/es/QatCompressionInputStream.java b/es_qat_wrapper/src/main/java/com/intel/qat/es/QatCompressionInputStream.java new file mode 100755 index 0000000..f183c0c --- /dev/null +++ b/es_qat_wrapper/src/main/java/com/intel/qat/es/QatCompressionInputStream.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.qat.es; + + +import com.intel.qat.jni.QatCodecJNI; +import com.intel.qat.util.buffer.BufferAllocator; +import com.intel.qat.util.buffer.CachedBufferAllocator; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.EOFException; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * This class implements a stream filter for uncompressing data in the + * "qat" compression format. + */ +public class QatCompressionInputStream extends FilterInputStream { + private static final Logger LOG = LogManager.getLogger(QatCompressionInputStream.class); + private final BufferAllocator compressedBufferAllocator; + private final BufferAllocator uncompressedBufferAllocator; + private final ByteBuffer compressedBuffer; + private final ByteBuffer uncompressedBuffer; + private final BufferAllocator tempBufferAllocator; + + //Input buffer for decompression. + protected byte[] buf; + //Length of input buffer. + protected int len; + boolean useDefaultQatDecompressor = false; + private boolean closed; + // this flag is set to true after EOF has reached + private boolean reachEOF; + private long context; + private int compressedSize; + private int uncompressedSize; + private int originalLen; + private int uncompressedBufferPosition; + private byte[] tempBuffer; + private byte[] singleByteBuf = new byte[1]; + + /** + * Creates a new input stream with the specified decompressor and + * buffer size. + * + * @param in the input stream + * @param size the input buffer size + * @param useNativeBuffer identify the buffer type + * @throws IllegalArgumentException if {@code size <= 0} + */ + public QatCompressionInputStream(InputStream in, int size, boolean useNativeBuffer) { + super(in); + if (in == null) { + throw new NullPointerException(); + } else if (size <= 0) { + throw new IllegalArgumentException("buffer size <= 0"); + } + + this.compressedSize = size * 3 / 2; + this.uncompressedSize = size; + + // allocate the buffer + this.uncompressedBufferAllocator = CachedBufferAllocator + .getBufferAllocatorFactory().getBufferAllocator(uncompressedSize); + this.compressedBufferAllocator = CachedBufferAllocator + .getBufferAllocatorFactory().getBufferAllocator(compressedSize); + this.uncompressedBuffer = uncompressedBufferAllocator + .allocateDirectByteBuffer(useNativeBuffer, uncompressedSize, 64); + this.compressedBuffer = compressedBufferAllocator + .allocateDirectByteBuffer(useNativeBuffer, compressedSize, 64); + + if (null != uncompressedBuffer) { + uncompressedBuffer.clear(); + } + if (null != compressedBuffer) { + compressedBuffer.clear(); + } + + this.uncompressedBufferPosition = 0; + this.originalLen = 0; + + // cache compressed stream + tempBufferAllocator = CachedBufferAllocator + .getBufferAllocatorFactory().getBufferAllocator(compressedSize); + tempBuffer = tempBufferAllocator + .allocateByteArray(compressedSize); + + this.context = QatCodecJNI.createDecompressContext(); + buf = new byte[size]; + closed = false; + reachEOF = false; + + } + + + /** + * Creates a new input stream with the specified buffer and a + * default buffer size. + * + * @param in the input stream + * @param useNativeBuffer true if the buffer is native + */ + public QatCompressionInputStream(InputStream in, boolean useNativeBuffer) { + this(in, 512, useNativeBuffer); + } + + /*** + * Creates a new input stream with a default buffer and buffer size. + * @param in the input stream + */ + public QatCompressionInputStream(InputStream in) { + this(in, 512, true); + useDefaultQatDecompressor = true; + } + + /** + * Check to make sure that this stream has not been closed + */ + private void checkStream() throws IOException { + if (context == 0) { + throw new NullPointerException("This output stream's context is not initialized"); + } + if (closed) { + throw new IOException("stream closed"); + } + } + + /** + * @return 0 after EOF has been reached ,otherwise always return 1 (before EOF) + * @throws IOException + */ + @Override + public int available() throws IOException { + checkStream(); + return originalLen - uncompressedBufferPosition; + } + + /** + * Reads a byte of uncompressed data. This method will block until + * enough input is available for decompression. + * + * @return the byte read, or -1 if end of compressed input is reached + * @throws IOException if an I/O error has occurred + */ + public int read() throws IOException { + checkStream(); + return read(singleByteBuf, 0, 1) == -1 ? -1 : Byte.toUnsignedInt(singleByteBuf[0]); + } + + + /** + * Reads uncompressed data into an array of bytes. If len is not + * zero, the method will block until some input can be decompressed; otherwise, + * no bytes are read and 0 is returned. + * + * @param b the buffer into which the data is read + * @param off the start offset in the destination array b + * @param len the maximum number of bytes read + * @return the actual number of bytes read, or -1 if the end of the + * compressed input is reached + */ + public int read(byte[] b, int off, int len) throws IOException { + checkStream(); + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new ArrayIndexOutOfBoundsException("BlockInputStream read requested lenght " + len + + " from offset " + off + " in buffer of size " + b.length); + } else if (len == 0) { + return 0; + } + + if (uncompressedBufferPosition == originalLen) { + fill(); + LOG.debug("--> Reads uncompressed(QAT) data into bytes"); + } + if (reachEOF) { + return -1; + } + + len = Math.min(len, originalLen - uncompressedBufferPosition); + + uncompressedBuffer.get(b, off, len); + uncompressedBufferPosition += len; + return len; + + } + + private void fill() throws IOException { + checkStream(); + int compressedLen = 0; + try { + compressedLen = readCompressedBlockLength(); + } catch (IOException e) { + reachEOF = true; + return; + } + + if (compressedBuffer.capacity() < compressedLen) { + throw new IOException("Input Stream is corrupted, compressed length large than " + compressedSize); + } + + readCompressedData(compressedBuffer, compressedLen); + + try { + final int uncompressed_size = QatCodecJNI.decompress(context, compressedBuffer, 0, compressedLen, uncompressedBuffer, 0, uncompressedSize); + originalLen = uncompressed_size; + } catch (RuntimeException e) { // need change + throw new IOException("Input Stream is corrupted, can't decompress", e); + } + + uncompressedBuffer.position(0); + uncompressedBuffer.limit(originalLen); + uncompressedBufferPosition = 0; + + } + + private int readCompressedBlockLength() throws IOException { + int b1 = in.read(); + int b2 = in.read(); + int b3 = in.read(); + int b4 = in.read(); + if ((b1 | b2 | b3 | b4) < 0) + throw new EOFException(); + return ((b4 << 24) + (b3 << 16) + (b2 << 8) + (b1 << 0)); + } + + private void readCompressedData(ByteBuffer bf, int len) throws IOException { + int read = 0; + assert bf.capacity() >= len; + bf.clear(); + + while (read < len) { + final int bytesToRead = Math.min(len - read, tempBuffer.length); + final int r = in.read(tempBuffer, 0, bytesToRead); + if (r < 0) { + throw new EOFException("Unexpected end of input stream"); + } + read += r; + bf.put(tempBuffer, 0, r); + } + + bf.flip(); + } + + + /** + * Skips specified number of bytes of uncompressed data. + * + * @param n the number of bytes to skip + * @return the actual number of bytes skipped. + * @throws IOException if an I/O error has occurred + * @throws IllegalArgumentException if {@code n < 0} + */ + public long skip(long n) throws IOException { + if (n < 0) { + throw new IllegalArgumentException("negative skip length"); + } + + checkStream(); + + if (uncompressedBufferPosition == originalLen) { + fill(); + LOG.debug("--> Decompress the data with QAT in skip <--"); + } + if (reachEOF) { + return -1; + } + + final int skipped = (int) Math.min(n, originalLen - uncompressedBufferPosition); + uncompressedBufferPosition += skipped; + uncompressedBuffer.position(uncompressedBufferPosition); + + return skipped; + } + + /** + * Closes this input stream and releases any system resources associated + * with the stream. + * + * @throws IOException if an I/O error has occurred + */ + public void close() throws IOException { + if (closed) { + return; + } + try { + in.close(); + } finally { + uncompressedBufferAllocator.releaseDirectByteBuffer(uncompressedBuffer); + compressedBufferAllocator.releaseDirectByteBuffer(compressedBuffer); + tempBufferAllocator.releaseByteArray(tempBuffer); + tempBuffer = null; + in = null; + QatCodecJNI.destroyContext(context); + context = 0; + closed = true; + } + } + + /** + * Tests if this input stream supports the mark and + * reset methods. The markSupported + * method returns false. + * + * @return a boolean indicating if this stream type supports + * the mark and reset methods. + * @see java.io.InputStream#mark(int) + * @see java.io.InputStream#reset() + */ + + public boolean markSupported() { + return false; + } + + @SuppressWarnings("sync-override") + @Override + public synchronized void mark(int readlimit) { + + } + + /** + * Repositions this stream to the position at the time the + * mark method was last called on this input stream. + * + *

The method reset does nothing except throw an + * IOException. + * + * @throws IOException if this method is invoked. + * @see java.io.InputStream#mark(int) + * @see java.io.IOException + */ + public synchronized void reset() throws IOException { + throw new IOException("mark/reset not supported"); + } +} diff --git a/es_qat_wrapper/src/main/java/com/intel/qat/es/QatCompressionOutputStream.java b/es_qat_wrapper/src/main/java/com/intel/qat/es/QatCompressionOutputStream.java new file mode 100755 index 0000000..cf61b34 --- /dev/null +++ b/es_qat_wrapper/src/main/java/com/intel/qat/es/QatCompressionOutputStream.java @@ -0,0 +1,290 @@ +package com.intel.qat.es; + +import com.intel.qat.jni.QatCodecJNI; +import com.intel.qat.util.buffer.BufferAllocator; +import com.intel.qat.util.buffer.CachedBufferAllocator; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + + +public class QatCompressionOutputStream extends FilterOutputStream { + private static final Logger LOG = LogManager.getLogger(QatCompressionOutputStream.class); + private final boolean syncFlush; + private final BufferAllocator tempBufferAllocator; + protected byte[] buf; + private long context; + private int level; + private int compressedSize; + private int uncompressedSize; + private BufferAllocator compressedBufferAllocator; + private BufferAllocator uncompressedBufferAllocator; + private ByteBuffer compressedBuffer; + private ByteBuffer uncompressedBuffer; + private boolean closed; + private int uncompressedBufferPosition; + private byte[] tempBuffer; + + /** + * Create a new {@link OutputStream} with configurable codec level and block size. Large + * blocks require more memory at compression and decompression time but + * should improve the compression ratio. + * + * @param out the {@link OutputStream} to feed + * @param level the compression codec level + * @param size the maximum number of bytes to try to compress at once, + * must be >= 32 K + * @param useNativeBuffer to identify if use nativeBuffer or not + * @throws IllegalArgumentException if {@code size <= 0} + */ + public QatCompressionOutputStream(OutputStream out, + int level, int size, boolean useNativeBuffer) { + super(out); + + if (out == null) { + throw new NullPointerException(); + } else if (size <= 0) { + throw new IllegalArgumentException("buffer size <= 0"); + } + + this.level = level; + this.uncompressedSize = size; + this.compressedSize = size * 3 / 2; + this.uncompressedBufferAllocator = CachedBufferAllocator. + getBufferAllocatorFactory().getBufferAllocator(uncompressedSize); + this.compressedBufferAllocator = CachedBufferAllocator. + getBufferAllocatorFactory().getBufferAllocator(compressedSize); + this.uncompressedBuffer = uncompressedBufferAllocator. + allocateDirectByteBuffer(useNativeBuffer, uncompressedSize, 64); + this.compressedBuffer = compressedBufferAllocator. + allocateDirectByteBuffer(useNativeBuffer, compressedSize, 64); + if (uncompressedBuffer != null) { + uncompressedBuffer.clear(); + } + + if (compressedBuffer != null) { + compressedBuffer.clear(); + } + + uncompressedBufferPosition = 0; + closed = false; + + tempBufferAllocator = CachedBufferAllocator.getBufferAllocatorFactory(). + getBufferAllocator(compressedSize); + tempBuffer = tempBufferAllocator.allocateByteArray(compressedSize); + + context = QatCodecJNI.createCompressContext(level); + LOG.debug("Create Qat OutputStream with level " + level); + + this.buf = new byte[size]; + this.syncFlush = false; + } + + /** + * Creates a new output stream with the default buffer. + * + * @param out the output stream + * @param level the compression codec level + * @param size the maximum number of bytes to try to compress at once, + * must be >= 32 K + * @throws IllegalArgumentException if {@code size <= 0} + */ + public QatCompressionOutputStream(OutputStream out, int level, int size) { + this(out, level, size, false); + } + + /** + * Creates a new output stream with the default size. + * + * @param out the output stream + * @param level the compression codec level + * @param useNativeBuffer to identify if use nativeBuffer or not + * @throws IllegalArgumentException if {@code size <= 0} + */ + + public QatCompressionOutputStream(OutputStream out, int level, boolean useNativeBuffer) { + this(out, level, 512, useNativeBuffer); + } + + /** + * Creates a new output stream with the default buffer size and level. + *

The new output stream instance is created as if by invoking + * the 3-argument constructor QatOutputStream(out,level,useNativeBuffer). + * + * @param out the output stream + * @param useNativeBuffer to identify the buffer + */ + + public QatCompressionOutputStream(OutputStream out, boolean useNativeBuffer) { + this(out, 3, useNativeBuffer); + } + + + /** + * Creates a new output stream with the default buffer and level. + *

The new output stream instance is created as if by invoking + * the 3-argument constructor QatOutputStream(out, level, size). + * + * @param out the output stream + * @param size the maximum number of bytes to try to compress at once, + * must be >= 32 K + */ + public QatCompressionOutputStream(OutputStream out, int size) { + this(out, 3, size); + } + + /** + * Creates a new output stream with the default buffer. + * + *

The new output stream instance is created as if by invoking + * the 2-argument constructor QatOutputStream(out, false). + * + * @param out the output stream + */ + public QatCompressionOutputStream(OutputStream out) { + this(out, false); + } + + private static void WriteIntLE(int i, byte[] buf, int off) { + buf[off] = (byte) i; + buf[off + 1] = (byte) (i >>> 8); + buf[off + 2] = (byte) (i >>> 16); + buf[off + 3] = (byte) (i >>> 24); + + } + + private void checkStream() { + if (context == 0) { + throw new NullPointerException(); + } + if (closed) { + throw new IllegalStateException("The output stream has been closed"); + } + } + + /** + * Writes a byte to the compressed output stream. This method will + * block until the byte can be written. + * + * @param b the byte to be written + * @throws IOException if an I/O error has occurred + */ + public void write(int b) throws IOException { + byte[] buf = new byte[1]; + buf[0] = (byte) (b & 0xff); + write(buf, 0, 1); + } + + /** + * Writes an array of bytes to the compressed output stream. This + * method will block until all the bytes are written. + * + * @param b the data to be written + * @param off the start offset of the data + * @param len the length of the data + * @throws IOException if an I/O error has occurred + */ + public void write(byte[] b, int off, int len) throws IOException { + checkStream(); + + if (b == null) { + throw new NullPointerException(); + } + if (len < 0 || off < 0 || len > b.length - off) { + throw new ArrayIndexOutOfBoundsException("The output stream need length " + len + " from offset " + off + " in buffer of size " + b.length); + } + + while (uncompressedBufferPosition + len > uncompressedSize) { + int left = uncompressedSize - uncompressedBufferPosition; + uncompressedBuffer.put(b, off, left); + uncompressedBufferPosition = uncompressedSize; + + compressedBufferData(); + + off += left; + len -= left; + + } + + uncompressedBuffer.put(b, off, len); + uncompressedBufferPosition += len; + LOG.debug("--> Writes bytes to the compressed(QAT) output stream"); + } + + private void compressedBufferData() throws IOException { + if (uncompressedBufferPosition == 0) { + return; + } + int compressedLen = QatCodecJNI.compress(context, uncompressedBuffer, 0, uncompressedBufferPosition, compressedBuffer, 0, compressedSize); + + WriteIntLE(compressedLen, tempBuffer, 0); + + compressedBuffer.position(0); + compressedBuffer.limit(compressedLen); + + int totalWrite = 0; + int off = 4; + + while (totalWrite < compressedLen) { + int byteToWrite = Math.min((compressedLen - totalWrite), (tempBuffer.length - off)); + compressedBuffer.get(tempBuffer, off, byteToWrite); + out.write(tempBuffer, 0, byteToWrite + off); + totalWrite += byteToWrite; + off = 0; + } + uncompressedBuffer.clear(); + compressedBuffer.clear(); + uncompressedBufferPosition = 0; + } + + /** + * Finishes writing compressed data to the output stream without closing + * the underlying stream. Use this method when applying multiple filters + * in succession to the same output stream. + * + * @throws IOException if an I/O error has occurred + */ + public void finish() throws IOException { + checkStream(); + compressedBufferData(); + LOG.debug("--> Finishes writing compressed(QAT) data to the output stream"); + out.flush(); + } + + /** + * Writes remaining compressed data to the output stream and closes the + * underlying stream. + * + * @throws IOException if an I/O error has occurred + */ + public void close() throws IOException { + if (closed) { + return; + } + try { + finish(); + out.close(); + } finally { + closed = true; + uncompressedBufferAllocator.releaseDirectByteBuffer(uncompressedBuffer); + compressedBufferAllocator.releaseDirectByteBuffer(compressedBuffer); + tempBufferAllocator.releaseByteArray(tempBuffer); + tempBuffer = null; + out = null; + QatCodecJNI.destroyContext(context); + context = 0; + LOG.debug("Close Qat OutputStream with level " + level); + } + } + + public void flush() throws IOException { + checkStream(); + compressedBufferData(); + LOG.debug("--> Flush the data(QAT Compress)"); + out.flush(); + } +} diff --git a/es_qat_wrapper/src/main/java/com/intel/qat/jni/QatCodecJNI.java b/es_qat_wrapper/src/main/java/com/intel/qat/jni/QatCodecJNI.java new file mode 100755 index 0000000..24876ea --- /dev/null +++ b/es_qat_wrapper/src/main/java/com/intel/qat/jni/QatCodecJNI.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.qat.jni; + +import com.intel.qat.util.NativeCodeLoader; + +import java.nio.ByteBuffer; + +/** + * JNI bindings to the original C implementation of QatCodec. + */ +public class QatCodecJNI { + static { + NativeCodeLoader.load(); + init(); + } + + static native void init(); + + public static native Object allocNativeBuffer(int capacity, int align); + + public static native long createCompressContext(int level); + + public static native long createDecompressContext(); + + public static native void destroyContext(long context); + + public static native int compress(long context, ByteBuffer srcBuffer, int srcOff, int srcLen, + ByteBuffer destBuffer, int destOff, int maxDestLen); + + public static native int decompress(long context, ByteBuffer srcBuffer, int srcOff, int srcLen, + ByteBuffer destBuffer, int destOff, int destLen); + + public static native String getLibraryName(int codec); +} + diff --git a/es_qat_wrapper/src/main/java/com/intel/qat/util/NativeCodeLoader.java b/es_qat_wrapper/src/main/java/com/intel/qat/util/NativeCodeLoader.java new file mode 100755 index 0000000..e0ebe3a --- /dev/null +++ b/es_qat_wrapper/src/main/java/com/intel/qat/util/NativeCodeLoader.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.qat.util; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; + +public class NativeCodeLoader { + + private static final Logger LOG = LogManager.getLogger(NativeCodeLoader.class); + + private static final String LIBRARY_NAME = "QatCodecEs"; + private static boolean nativeCodeLoaded = false; + + static { + // Try to load native library + LOG.info("Trying to load the native library..."); + load(); + } + + private static String arch() { + String archName = System.getProperty("os.arch"); + if (archName.contains("amd64")) { + return archName; + } else { + throw new UnsupportedOperationException("Unsupported arch: " + + archName); + } + } + + private static OS os() { + String osName = System.getProperty("os.name"); + if (osName.contains("Linux")) { + return OS.LINUX; + } else if (osName.contains("Mac")) { + return OS.MAC; + } else if (osName.contains("Windows")) { + return OS.WINDOWS; + } else { + throw new UnsupportedOperationException("Unsupported operating system: " + osName); + } + } + + private static String resourceName() { + OS os = os(); + + return "/com/intel/qat/native/lib/" + os.name + "/" + arch() + + "/lib" + LIBRARY_NAME + "." + os.libExtension; + } + + public static synchronized boolean isNativeCodeLoaded() { + return nativeCodeLoaded; + } + + public static synchronized void load() { + + if (nativeCodeLoaded) { + return; + } + + // Try to load library from the java.library. + try { + System.loadLibrary(LIBRARY_NAME); + nativeCodeLoaded = true; + LOG.info("Loaded native lib" + LIBRARY_NAME + "." + os().libExtension + + " from the system library path"); + return; + } catch (UnsatisfiedLinkError ex) { + // Doesn't exist, so proceed to loading bundled library. + } + + String resourceName = resourceName(); + + InputStream is = NativeCodeLoader.class.getResourceAsStream(resourceName); + if (is == null) { + throw new UnsupportedOperationException("Unsupported OS/arch, cannot find " + + resourceName + ". Please try building from source."); + } + File tempLib; + try { + tempLib = File.createTempFile("lib" + LIBRARY_NAME, "." + os().libExtension); + // copy to tempLib + FileOutputStream out = new FileOutputStream(tempLib); + try { + byte[] buf = new byte[4096]; + while (true) { + int read = is.read(buf); + if (read == -1) { + break; + } + out.write(buf, 0, read); + } + try { + out.close(); + out = null; + } catch (IOException e) { + // ignore + } + try { + System.load(tempLib.getAbsolutePath()); + } catch (UnsatisfiedLinkError e) { + LOG.info("Failed to load native lib" + LIBRARY_NAME + "." + os().libExtension + + " from the embedded jar package"); + throw e; + } + nativeCodeLoaded = true; + LOG.info("Loaded native lib" + LIBRARY_NAME + "." + os().libExtension + + " from the embedded jar package"); + } finally { + try { + if (out != null) { + out.close(); + } + } catch (IOException e) { + // ignore + } + if (tempLib != null && tempLib.exists()) { + if (!nativeCodeLoaded) { + tempLib.delete(); + } else { + // try to delete on exit, does it work on Windows? + tempLib.deleteOnExit(); + } + } + } + } catch (IOException e) { + LOG.error("Failed to load native lib" + LIBRARY_NAME + "." + os().libExtension); + throw new ExceptionInInitializerError(new Throwable("Cannot unpack " + LIBRARY_NAME, e)); + } + } + + private enum OS { + WINDOWS("Windows", "dll"), LINUX("Linux", "so"), MAC("Mac", "dylib"); + public final String name, libExtension; + + OS(String name, String libExtension) { + this.name = name; + this.libExtension = libExtension; + } + } + +} diff --git a/es_qat_wrapper/src/main/java/com/intel/qat/util/buffer/BufferAllocator.java b/es_qat_wrapper/src/main/java/com/intel/qat/util/buffer/BufferAllocator.java new file mode 100755 index 0000000..67717ae --- /dev/null +++ b/es_qat_wrapper/src/main/java/com/intel/qat/util/buffer/BufferAllocator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.qat.util.buffer; + +import java.nio.ByteBuffer; + +/** + * BufferAllocator interface. The implementation of this interface must be thread-safe + */ +public interface BufferAllocator { + + ByteBuffer allocateDirectByteBuffer(boolean useNativeBuffer, int size, int align); + + void releaseDirectByteBuffer(ByteBuffer buffer); + + byte[] allocateByteArray(int size); + + void releaseByteArray(byte[] buffer); +} diff --git a/es_qat_wrapper/src/main/java/com/intel/qat/util/buffer/BufferAllocatorFactory.java b/es_qat_wrapper/src/main/java/com/intel/qat/util/buffer/BufferAllocatorFactory.java new file mode 100755 index 0000000..3eebb85 --- /dev/null +++ b/es_qat_wrapper/src/main/java/com/intel/qat/util/buffer/BufferAllocatorFactory.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.qat.util.buffer; + +public interface BufferAllocatorFactory { + + BufferAllocator getBufferAllocator(int minSize); +} + diff --git a/es_qat_wrapper/src/main/java/com/intel/qat/util/buffer/CachedBufferAllocator.java b/es_qat_wrapper/src/main/java/com/intel/qat/util/buffer/CachedBufferAllocator.java new file mode 100755 index 0000000..1208840 --- /dev/null +++ b/es_qat_wrapper/src/main/java/com/intel/qat/util/buffer/CachedBufferAllocator.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.qat.util.buffer; + +import com.intel.qat.jni.QatCodecJNI; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + +/** + * Cached buffer + */ +public class CachedBufferAllocator implements BufferAllocator { + private static final Logger LOG = LogManager.getLogger(CachedBufferAllocator.class); + //Use SoftReference so that having this queueTable does not prevent the GC of CachedBufferAllocator instances + private static final Map> queueTable = new HashMap>(); + private static BufferAllocatorFactory factory = new BufferAllocatorFactory() { + @Override + public BufferAllocator getBufferAllocator(int bufferSize) { + return CachedBufferAllocator.getAllocator(bufferSize); + } + }; + private final int bufferSize; + private final Deque directByteBufferQueue; + private final Deque byteArrayQueue; + + public CachedBufferAllocator(int bufferSize) { + this.bufferSize = bufferSize; + this.byteArrayQueue = new ArrayDeque(); + this.directByteBufferQueue = new ArrayDeque(); + } + + public static BufferAllocatorFactory getBufferAllocatorFactory() { + return factory; + } + + public static void setBufferAllocatorFactory(BufferAllocatorFactory factory) { + assert (factory != null); + CachedBufferAllocator.factory = factory; + } + + public static synchronized CachedBufferAllocator getAllocator(int bufferSize) { + CachedBufferAllocator result = null; + + if (queueTable.containsKey(bufferSize)) { + result = queueTable.get(bufferSize).get(); + } + if (result == null) { + result = new CachedBufferAllocator(bufferSize); + queueTable.put(bufferSize, new SoftReference(result)); + } + return result; + } + + @Override + public ByteBuffer allocateDirectByteBuffer(boolean useNativeBuffer, int size, int align) { + synchronized (this) { + if (directByteBufferQueue.isEmpty()) { + if (useNativeBuffer) { + try { + return (ByteBuffer) QatCodecJNI.allocNativeBuffer(size, align); + } catch (Throwable t) { + LOG.error("Native buffer allocation is failed and fall back to direct allocation."); + return ByteBuffer.allocateDirect(size); + } + } + return ByteBuffer.allocateDirect(size); + } else { + return directByteBufferQueue.pollFirst(); + } + } + } + + @Override + public void releaseDirectByteBuffer(ByteBuffer buffer) { + synchronized (this) { + directByteBufferQueue.addLast(buffer); + } + } + + @Override + public byte[] allocateByteArray(int size) { + synchronized (this) { + if (byteArrayQueue.isEmpty()) { + return new byte[size]; + } else { + return byteArrayQueue.pollFirst(); + } + } + } + + @Override + public void releaseByteArray(byte[] buffer) { + synchronized (this) { + byteArrayQueue.addLast(buffer); + } + } +} diff --git a/es_qat_wrapper/src/main/native/Makefile b/es_qat_wrapper/src/main/native/Makefile new file mode 100755 index 0000000..3aaa76f --- /dev/null +++ b/es_qat_wrapper/src/main/native/Makefile @@ -0,0 +1,83 @@ +#/* +# * Licensed to the Apache Software Foundation (ASF) under one or more +# * contributor license agreements. See the NOTICE file distributed with +# * this work for additional information regarding copyright ownership. +# * The ASF licenses this file to You under the Apache License, Version 2.0 +# * (the "License"); you may not use this file except in compliance with +# * the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ + +ifeq ($(VERBOSE),1) +ECHO := +else +ECHO := @ +endif + +# A function to check whether every environment variable in a list is defined. +check = $(foreach var,$(1),$(if $($(var)),,$(error Please set $(var)))) + +TARGET := libQatCodecEs.so + +MODULES := + +# Source files. +SRCS := $(foreach D,$(MODULES),$(wildcard $D/*.c)) QatCodecJNI.c + +# Include files. +INCLUDES := $(addprefix -I,$(MODULES)) \ + -I$(JAVA_HOME)/include \ + -I$(JAVA_HOME)/include/linux \ + -I $(QATZIPSRC)/include \ + -I. + +LIB_DIRS := + +# Libraries. +LIBS := dl + +CPPFLAGS += $(INCLUDES) +CFLAGS ?= -O3 +CFLAGS += -Wall -fstack-protector-all -D_FORTIFY_SOURCE=2 +CFLAGS += -Wformat -Wformat-security -Werror=format-security +CFLAGS += -fPIC + +ifeq ($(DEBUG),1) +CFLAGS += -g +endif + +SONAME_FLAGS = -Wl,-soname=$(TARGET) +LDFLAGS += -shared + +all: jni $(TARGET) + @echo "Build native library done" + +$(TARGET) : $(SRCS) + @echo Compiling dynamic library + $(ECHO)$(CC) $(CPPFLAGS) $(CFLAGS) \ + $(SRCS) \ + $(LDFLAGS) $(SONAME_FLAGS) \ + $(foreach D,$(LIB_DIRS),-L$D) \ + $(foreach L,$(LIBS),-l$L) \ + -o $@ + $(ECHO)mkdir -p ../../../target/classes/com/intel/qat/native/lib/Linux/amd64/ + $(ECHO)cp $(TARGET) ../../../target/classes/com/intel/qat/native/lib/Linux/amd64/ + +jni: + @echo Generating jni header + $(call check,JAVA_HOME) + $(ECHO)$(JAVA_HOME)/bin/javah -d jni-header -cp ../../../target/classes \ + com.intel.qat.jni.QatCodecJNI + +clean: + $(ECHO)rm -rf *.so *.o jni-header + @echo Cleaning completed + +.PHONY : all clean diff --git a/es_qat_wrapper/src/main/native/QatCodecJNI.c b/es_qat_wrapper/src/main/native/QatCodecJNI.c new file mode 100755 index 0000000..79b6fb5 --- /dev/null +++ b/es_qat_wrapper/src/main/native/QatCodecJNI.c @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include + +#include +#include + +#include + +#include "qatzip.h" + +#include + +/* A helper macro to 'throw' a java exception. */ +#define THROW(env, exception_name, message) \ +{ \ + jclass ecls = (*env)->FindClass(env, exception_name); \ + if (ecls) { \ + (*env)->ThrowNew(env, ecls, message); \ + (*env)->DeleteLocalRef(env, ecls); \ + } \ +} + +#define QAT_ZIP_LIBRARY_NAME "libqatzip.so" + +typedef int (*dlsym_qzCompress)(QzSession_T *sess, const unsigned char* src, + unsigned int* src_len, unsigned char* dest, unsigned int* dest_len, + unsigned int last); +typedef int (*dlsym_qzDecompress)(QzSession_T *sess, const unsigned char* src, + unsigned int* compressed_buf_len, unsigned char* dest, + unsigned int* uncompressed_buffer_len); +typedef int (*dlsym_qzGetDefaults)(QzSessionParams_T *defaults); +typedef int (*dlsym_qzSetDefaults)(QzSessionParams_T *defaults); + + +typedef struct qat_wrapper_context { + int magic; + dlsym_qzCompress compress; + dlsym_qzDecompress decompress; + dlsym_qzGetDefaults getDefaults; + dlsym_qzSetDefaults setDefaults; +} qat_wrapper_context_t; + +qat_wrapper_context_t g_qat_wrapper_context; + +__thread QzSession_T g_qzSession = { + .internal = NULL, +}; + +/* + * Class: com_intel_qat_jni_QatCodecJNI + * Method: init + * Signature: ()V + */ +JNIEXPORT void JNICALL +Java_com_intel_qat_jni_QatCodecJNI_init( + JNIEnv *env, jclass cls) +{ + qat_wrapper_context_t *qat_wrapper_context = &g_qat_wrapper_context; + void *lib = dlopen(QAT_ZIP_LIBRARY_NAME, RTLD_LAZY | RTLD_GLOBAL); + if (!lib) + { + char msg[128]; + snprintf(msg, 128, "Can't load %s due to %s", QAT_ZIP_LIBRARY_NAME, dlerror()); + THROW(env, "java/lang/UnsatisfiedLinkError", msg); + } + + dlerror(); // Clear any existing error + + qat_wrapper_context->compress = dlsym(lib, "qzCompress"); + if (qat_wrapper_context->compress == NULL) + { + THROW(env, "java/lang/UnsatisfiedLinkError", "Failed to load qzCompress"); + } + + qat_wrapper_context->decompress = dlsym(lib, "qzDecompress"); + if (qat_wrapper_context->compress == NULL) + { + THROW(env, "java/lang/UnsatisfiedLinkError", "Failed to load qzDecompress"); + } + + qat_wrapper_context->getDefaults = dlsym(lib, "qzGetDefaults"); + if (qat_wrapper_context->getDefaults == NULL) + { + THROW(env, "java/lang/UnsatisfiedLinkError", "Failed to load qzGetDefaults"); + } + + qat_wrapper_context->setDefaults = dlsym(lib, "qzSetDefaults"); + if (qat_wrapper_context->setDefaults == NULL) + { + THROW(env, "java/lang/UnsatisfiedLinkError", "Failed to load qzSetDefaults"); + } +} + +/* + * Class: com_intel_qat_jni_QatCodecJNI + * Method: allocNativeBuffer + * Signature: (II)Ljava/lang/Object; + */ +JNIEXPORT jobject JNICALL +Java_com_intel_qat_jni_QatCodecJNI_allocNativeBuffer( + JNIEnv *env, jclass cls, jint capacity, jint align) +{ + void *buffer = NULL; + posix_memalign (&buffer, align, capacity); + if (buffer != NULL) + { + return (*env)->NewDirectByteBuffer(env, buffer, capacity); + } + else + { + THROW(env, "java/lang/OutOfMemoryError", "Error alloc the native buffer"); + return NULL; + } +} + +/* + * Class: com_intel_qat_jni_QatCodecJNI + * Method: createCompressContext + * Signature: (I)J + */ +JNIEXPORT jlong JNICALL +Java_com_intel_qat_jni_QatCodecJNI_createCompressContext( + JNIEnv *env, jclass cls, jint level) +{ + qat_wrapper_context_t *qat_wrapper_context = &g_qat_wrapper_context; + QzSessionParams_T params; + qat_wrapper_context->getDefaults(¶ms); + params.comp_lvl = level; + //fprintf(stderr, "compression level is %d, tid is %ld\n", level, syscall(__NR_gettid)); + qat_wrapper_context->setDefaults(¶ms); + return (jlong)1; +} + +/* + * Class: com_intel_qat_jni_QatCodecJNI + * Method: createDecompressContext + * Signature: ()J + */ +JNIEXPORT jlong JNICALL +Java_com_intel_qat_jni_QatCodecJNI_createDecompressContext( + JNIEnv *env, jclass cls) +{ + return (jlong)1; +} + +/* + * Class: com_intel_qat_jni_QatCodecJNI + * Method: destroyContext + * Signature: (J)V + */ +JNIEXPORT void JNICALL +Java_com_intel_qat_jni_QatCodecJNI_destroyContext( + JNIEnv *env, jclass cls, jlong contextFromJava) +{ + +} + +/* + * Class: com_intel_qat_jni_QatCodecJNI + * Method: compress + * Signature: (JLjava/nio/ByteBuffer;IILjava/nio/ByteBuffer;II)I + */ +JNIEXPORT jint JNICALL +Java_com_intel_qat_jni_QatCodecJNI_compress( + JNIEnv *env, jclass cls, jlong contextFromJava, + jobject srcBuffer, jint srcOff, jint srcLen, + jobject destBuffer, jint destOff, jint destLen) +{ + + uint8_t* in; + uint8_t* out; + uint32_t uncompressed_size = 0; + uint32_t compressed_size = 0; + qat_wrapper_context_t *qat_wrapper_context = &g_qat_wrapper_context; + + in = (uint8_t*)(*env)->GetDirectBufferAddress(env, srcBuffer); + if (in == NULL) + { + THROW(env, "java/lang/OutOfMemoryError", "Can't get compressor input buffer"); + } + + out = (uint8_t*)(*env)->GetDirectBufferAddress(env, destBuffer); + + if (out == NULL) + { + THROW(env, "java/lang/OutOfMemoryError", "Can't get compressor output buffer"); + } + + in += srcOff; + out += destOff; + + uncompressed_size = srcLen; + compressed_size = destLen; + int ret = qat_wrapper_context->compress(&g_qzSession, in, &uncompressed_size, out, &compressed_size, 1); + if (ret == QZ_OK) + { + } + else if (ret == QZ_PARAMS) + { + THROW(env, "java/lang/InternalError", "Could not compress data. *sess is NULL or member of params is invalid."); + } + else if (ret == QZ_FAIL) + { + THROW(env, "java/lang/InternalError", "Could not compress data. Function did not succeed."); + } + else + { + char temp[256]; + sprintf(temp, "Could not compress data. Return error code %d", ret); + THROW(env, "java/lang/InternalError", temp); + } + + return compressed_size; +} + +/* + * Class: com_intel_qat_jni_QatCodecJNI + * Method: decompress + * Signature: (JLjava/nio/ByteBuffer;IILjava/nio/ByteBuffer;II)I + */ +JNIEXPORT jint JNICALL +Java_com_intel_qat_jni_QatCodecJNI_decompress( + JNIEnv *env, jclass cls, jlong contextFromJava, + jobject srcBuffer, jint srcOff, jint srcLen, + jobject destBuffer, jint destOff, jint destLen) +{ + + uint8_t* in; + uint8_t* out; + uint32_t uncompressed_size = 0; + uint32_t compressed_size = 0; + qat_wrapper_context_t *qat_wrapper_context = &g_qat_wrapper_context; + + in = (uint8_t*)(*env)->GetDirectBufferAddress(env, srcBuffer); + if (in == NULL) + { + THROW(env, "java/lang/OutOfMemoryError", "Can't get decompressor input buffer"); + } + + out = (uint8_t*)(*env)->GetDirectBufferAddress(env, destBuffer); + if (out == NULL) + { + THROW(env, "java/lang/OutOfMemoryError", "Can't get decompressor output buffer"); + } + + in += srcOff; + out += destOff; + + compressed_size = srcLen; + uncompressed_size = destLen; + int ret = qat_wrapper_context->decompress(&g_qzSession, in, &compressed_size, out, &uncompressed_size); + if (ret == QZ_OK) + { + } + else if (ret == QZ_PARAMS) + { + THROW(env, "java/lang/InternalError", "Could not decompress data. *sess is NULL or member of params is invalid"); + } + else if (ret == QZ_FAIL) + { + THROW(env, "java/lang/InternalError", "Could not decompress data. Function did not succeed."); + } + else + { + char temp[256]; + sprintf(temp, "Could not decompress data. Return error code %d", ret); + THROW(env, "java/lang/InternalError", temp); + } + + return uncompressed_size; +} diff --git a/pom.xml b/pom.xml index 1264f60..f8ef290 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,8 @@ hadoop_qat_wrapper spark_qat_wrapper carbondata_qat_wrapper + common + es_qat_wrapper