diff --git a/src/main/java/com/exalttech/trex/application/guice/StatelessGUIModule.java b/src/main/java/com/exalttech/trex/application/guice/StatelessGUIModule.java index 60bc0107..c1fbc59a 100644 --- a/src/main/java/com/exalttech/trex/application/guice/StatelessGUIModule.java +++ b/src/main/java/com/exalttech/trex/application/guice/StatelessGUIModule.java @@ -1,5 +1,7 @@ package com.exalttech.trex.application.guice; +import com.cisco.trex.stateless.util.IDataCompressor; +import com.cisco.trex.stateless.util.TRexDataCompressor; import com.cisco.trex.stl.gui.storages.StatsStorage; import com.cisco.trex.stl.gui.util.RunningConfiguration; import com.exalttech.trex.core.RPCMethods; @@ -17,5 +19,6 @@ protected void configure() { bind(RunningConfiguration.class).in(Singleton.class); bind(ProtocolDataView.class).toProvider(ProtocolDataViewProvider.class); bind(StatsStorage.class).in(Singleton.class); + bind(IDataCompressor.class).to(TRexDataCompressor.class); } } diff --git a/src/main/java/com/exalttech/trex/core/ConnectionManager.java b/src/main/java/com/exalttech/trex/core/ConnectionManager.java index cfffc5a4..21c49880 100644 --- a/src/main/java/com/exalttech/trex/core/ConnectionManager.java +++ b/src/main/java/com/exalttech/trex/core/ConnectionManager.java @@ -18,6 +18,7 @@ import com.cisco.trex.stateless.TRexClient; import com.cisco.trex.stateless.exception.TRexConnectionException; +import com.cisco.trex.stateless.util.IDataCompressor; import com.exalttech.trex.application.TrexApp; import com.exalttech.trex.remote.exceptions.IncorrectRPCMethodException; import com.exalttech.trex.remote.exceptions.InvalidRPCResponseException; @@ -28,7 +29,6 @@ import com.exalttech.trex.ui.models.Port; import com.exalttech.trex.ui.views.logs.LogType; import com.exalttech.trex.ui.views.logs.LogsController; -import com.exalttech.trex.util.CompressionUtils; import com.exalttech.trex.util.Constants; import com.exalttech.trex.util.Util; import com.fasterxml.jackson.core.JsonProcessingException; @@ -46,26 +46,22 @@ import zmq.ZError; import javax.naming.SizeLimitExceededException; -import javax.xml.bind.DatatypeConverter; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.zip.DataFormatException; public class ConnectionManager { public static final int MAX_REQUEST_SIZE = 999999; //TrexRpcServerReqRes does not handle requests greater this size - public static final int HEADER_SIZE = 8; + private static final int HEADER_SIZE = 8; //4 magic bytes and 4 bytes integer (length of request) (see private TRexClient trexClient; private ScapyServerClient scapyServerClient; private static final Logger LOG = Logger.getLogger(ConnectionManager.class.getName()); @@ -81,6 +77,7 @@ public class ConnectionManager { private final static int INTERNAL_TIMEOUT = 1000; private final static int DEFAULT_TIMEOUT = 3000; + private IDataCompressor dataCompressor = TrexApp.injector.getInstance(IDataCompressor.class); public static ConnectionManager getInstance() { if (instance == null) { @@ -329,6 +326,8 @@ String sendAddStreamRequest(Profile[] profilesList) throws JsonProcessingExcepti * @throws SizeLimitExceededException if there is now possibility to split lists more but there is * requests which are not fit (e.g. one request is greater than MAX_REQUEST_SIZE ({@value MAX_REQUEST_SIZE}) */ + //TODO move logic to transport layer e.g. not send streams group but send commands (TRexTransport has this method + //TODO make transport layer (or TRexClient) to be responsible for splitting and packing huge requests private List> packMultipleRequestsIntoGroups(List requests) throws SizeLimitExceededException { List> sendingGroups = new ArrayList<>(); sendingGroups.add(requests); @@ -484,7 +483,7 @@ private String getAsyncResponse() { String res; try { - res = getDecompressedString(subscriber.recv()); + res = this.dataCompressor.decompressBytesToString(subscriber.recv()); if (res != null) { handleAsyncResponse(res); } else { @@ -521,7 +520,7 @@ protected Void call() throws Exception { int failsCount = 0; while (!isCancelled() && !Thread.currentThread().isInterrupted()) { try { - final String res = getDecompressedString(subscriber.recv()); + final String res = dataCompressor.decompressBytesToString(subscriber.recv()); if (res != null) { handleAsyncResponse(res); failsCount = 0; @@ -587,36 +586,6 @@ private static void runAndWait(Runnable action) throws InterruptedException { } - private String getDecompressedString(byte[] data) { - if (data == null) return null; - - // if the length is larger than 8 bytes - if (data.length > 8) { - - // Take the first 4 bytes - byte[] magicBytes = Arrays.copyOfRange(data, 0, 4); - - String magicString = DatatypeConverter.printHexBinary(magicBytes); - - /* check MAGIC in the first 4 bytes in case we have it, it is compressed */ - String MAGIC_STRING = "ABE85CEA"; - if (magicString.equals(MAGIC_STRING)) { - - // Skip another 4 bytes containing the uncompressed size of the message - byte[] compressedData = Arrays.copyOfRange(data, 8, data.length); - - try { - return new String(CompressionUtils.decompress(compressedData)); - } catch (IOException | DataFormatException ex) { - LOG.error("Failed to decompress data ", ex); - } - - } - - } - return new String(data); - } - private void handleAsyncResponse(String res) { if (res.contains(Constants.TREX_GLOBAL_TAG)) { AsyncResponseManager.getInstance().setTrexGlobalResponse(res); @@ -668,72 +637,59 @@ public void setConnected(boolean connected) { } private byte[] getServerRPCResponse(String request) throws SizeLimitExceededException { - try { - // prepare compression header - ByteBuffer headerByteBuffer = ByteBuffer.allocate(HEADER_SIZE); - headerByteBuffer.put((byte) 0xAB); - headerByteBuffer.put((byte) 0xE8); - headerByteBuffer.put((byte) 0x5C); - headerByteBuffer.put((byte) 0xEA); - headerByteBuffer.putInt(request.length()); - byte[] headerBytes = headerByteBuffer.array(); - // compress request - byte[] compressedRequest = CompressionUtils.compress(request.getBytes()); - byte[] finalRequest = concatByteArrays(headerBytes, compressedRequest); - - if (finalRequest.length >= MAX_REQUEST_SIZE) { - throw new SizeLimitExceededException(MessageFormat.format("Size of request is too large (limit is {0} bytes)", MAX_REQUEST_SIZE)); - } - byte[] serverResponse; - boolean success; - synchronized (sendRequestMonitor) { - if (connectionTimeout.get()) { - return null; - } - try { - success = requester.send(finalRequest); - } catch (ZMQException e) { - if (e.getErrorCode() == ZError.EFSM) { - success = resend(finalRequest); - } else { - throw e; - } - } - if (success) { - serverResponse = requester.recv(0); - if (serverResponse == null) { - if (requester.base().errno() == ZError.EAGAIN) { - int retries = timeout / INTERNAL_TIMEOUT; - while (serverResponse == null && retries > 0) { - if (connectionTimeout.get()) { - return null; - } + byte[] finalRequest = this.dataCompressor.compressStringToBytes(request); - retries--; - serverResponse = requester.recv(0); - } - if (retries == 0 && resend(finalRequest)) { - serverResponse = requester.recv(0); + if (finalRequest.length >= MAX_REQUEST_SIZE) { + throw new SizeLimitExceededException(MessageFormat.format("Size of request is too large (limit is {0} bytes)", MAX_REQUEST_SIZE)); + } + + byte[] serverResponse; + boolean success; + + synchronized (sendRequestMonitor) { + if (connectionTimeout.get()) { + return null; + } + try { + success = requester.send(finalRequest); + } catch (ZMQException e) { + if (e.getErrorCode() == ZError.EFSM) { + success = resend(finalRequest); + } else { + throw e; + } + } + if (success) { + serverResponse = requester.recv(0); + if (serverResponse == null) { + if (requester.base().errno() == ZError.EAGAIN) { + int retries = timeout / INTERNAL_TIMEOUT; + while (serverResponse == null && retries > 0) { + if (connectionTimeout.get()) { + return null; } - } else { - LOG.error("Error sending request"); + + retries--; + serverResponse = requester.recv(0); + } + if (retries == 0 && resend(finalRequest)) { + serverResponse = requester.recv(0); } + } else { + LOG.error("Error sending request"); } - } else { - LOG.error("Error sending request"); - return null; } + } else { + LOG.error("Error sending request"); + return null; } - - return serverResponse == null - ? null - : getDecompressedString(serverResponse).getBytes(); - } catch (IOException ex) { - LOG.error("Error sending request", ex); - return null; } + + return serverResponse == null + ? null + : dataCompressor.decompressBytesToString(serverResponse).getBytes(); } private boolean resend(byte[] msg) { diff --git a/src/main/java/com/exalttech/trex/util/CompressionUtils.java b/src/main/java/com/exalttech/trex/util/CompressionUtils.java deleted file mode 100644 index e295b765..00000000 --- a/src/main/java/com/exalttech/trex/util/CompressionUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * ***************************************************************************** - * Copyright (c) 2016 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ******************************************************************************/ - -package com.exalttech.trex.util; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.zip.DataFormatException; -import java.util.zip.Deflater; -import java.util.zip.Inflater; - -public class CompressionUtils { - - public static byte[] compress(byte[] data) throws IOException { - Deflater deflater = new Deflater(); - deflater.setInput(data); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(data.length); - deflater.finish(); - byte[] buffer = new byte[1024]; - while (!deflater.finished()) { - int count = deflater.deflate(buffer); - outputStream.write(buffer, 0, count); - } - outputStream.close(); - byte[] output = outputStream.toByteArray(); - - return output; - } - - public static byte[] decompress(byte[] data) throws IOException, DataFormatException { - Inflater inflater = new Inflater(); - inflater.setInput(data); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(data.length); - byte[] buffer = new byte[1024]; - while (!inflater.finished()) { - int count = inflater.inflate(buffer); - outputStream.write(buffer, 0, count); - } - outputStream.close(); - byte[] output = outputStream.toByteArray(); - - return output; - } -}