Skip to content

Commit

Permalink
Merge pull request #94 from cisco-system-traffic-generator/compress-d…
Browse files Browse the repository at this point in the history
…ecompress-changes

Moved CompressionUtils to trex-java-sdk
  • Loading branch information
EgorBlagov authored Aug 6, 2018
2 parents 85f695f + 022765c commit c3832c2
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 151 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.exalttech.trex.application.guice;

import com.cisco.trex.stateless.util.IDataCompressor;
import com.cisco.trex.stateless.util.TRexDataCompressor;
import com.cisco.trex.stl.gui.storages.StatsStorage;
import com.cisco.trex.stl.gui.util.RunningConfiguration;
import com.exalttech.trex.core.RPCMethods;
Expand All @@ -17,5 +19,6 @@ protected void configure() {
bind(RunningConfiguration.class).in(Singleton.class);
bind(ProtocolDataView.class).toProvider(ProtocolDataViewProvider.class);
bind(StatsStorage.class).in(Singleton.class);
bind(IDataCompressor.class).to(TRexDataCompressor.class);
}
}
146 changes: 51 additions & 95 deletions src/main/java/com/exalttech/trex/core/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.cisco.trex.stateless.TRexClient;
import com.cisco.trex.stateless.exception.TRexConnectionException;
import com.cisco.trex.stateless.util.IDataCompressor;
import com.exalttech.trex.application.TrexApp;
import com.exalttech.trex.remote.exceptions.IncorrectRPCMethodException;
import com.exalttech.trex.remote.exceptions.InvalidRPCResponseException;
Expand All @@ -28,7 +29,6 @@
import com.exalttech.trex.ui.models.Port;
import com.exalttech.trex.ui.views.logs.LogType;
import com.exalttech.trex.ui.views.logs.LogsController;
import com.exalttech.trex.util.CompressionUtils;
import com.exalttech.trex.util.Constants;
import com.exalttech.trex.util.Util;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -46,26 +46,22 @@
import zmq.ZError;

import javax.naming.SizeLimitExceededException;
import javax.xml.bind.DatatypeConverter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;


public class ConnectionManager {

public static final int MAX_REQUEST_SIZE = 999999; //TrexRpcServerReqRes does not handle requests greater this size
public static final int HEADER_SIZE = 8;
private static final int HEADER_SIZE = 8; //4 magic bytes and 4 bytes integer (length of request) (see
private TRexClient trexClient;
private ScapyServerClient scapyServerClient;
private static final Logger LOG = Logger.getLogger(ConnectionManager.class.getName());
Expand All @@ -81,6 +77,7 @@ public class ConnectionManager {

private final static int INTERNAL_TIMEOUT = 1000;
private final static int DEFAULT_TIMEOUT = 3000;
private IDataCompressor dataCompressor = TrexApp.injector.getInstance(IDataCompressor.class);

public static ConnectionManager getInstance() {
if (instance == null) {
Expand Down Expand Up @@ -329,6 +326,8 @@ String sendAddStreamRequest(Profile[] profilesList) throws JsonProcessingExcepti
* @throws SizeLimitExceededException if there is now possibility to split lists more but there is
* requests which are not fit (e.g. one request is greater than MAX_REQUEST_SIZE ({@value MAX_REQUEST_SIZE})
*/
//TODO move logic to transport layer e.g. not send streams group but send commands (TRexTransport has this method
//TODO make transport layer (or TRexClient) to be responsible for splitting and packing huge requests
private List<List<String>> packMultipleRequestsIntoGroups(List<String> requests) throws SizeLimitExceededException {
List<List<String>> sendingGroups = new ArrayList<>();
sendingGroups.add(requests);
Expand Down Expand Up @@ -484,7 +483,7 @@ private String getAsyncResponse() {

String res;
try {
res = getDecompressedString(subscriber.recv());
res = this.dataCompressor.decompressBytesToString(subscriber.recv());
if (res != null) {
handleAsyncResponse(res);
} else {
Expand Down Expand Up @@ -521,7 +520,7 @@ protected Void call() throws Exception {
int failsCount = 0;
while (!isCancelled() && !Thread.currentThread().isInterrupted()) {
try {
final String res = getDecompressedString(subscriber.recv());
final String res = dataCompressor.decompressBytesToString(subscriber.recv());
if (res != null) {
handleAsyncResponse(res);
failsCount = 0;
Expand Down Expand Up @@ -587,36 +586,6 @@ private static void runAndWait(Runnable action) throws InterruptedException {

}

private String getDecompressedString(byte[] data) {
if (data == null) return null;

// if the length is larger than 8 bytes
if (data.length > 8) {

// Take the first 4 bytes
byte[] magicBytes = Arrays.copyOfRange(data, 0, 4);

String magicString = DatatypeConverter.printHexBinary(magicBytes);

/* check MAGIC in the first 4 bytes in case we have it, it is compressed */
String MAGIC_STRING = "ABE85CEA";
if (magicString.equals(MAGIC_STRING)) {

// Skip another 4 bytes containing the uncompressed size of the message
byte[] compressedData = Arrays.copyOfRange(data, 8, data.length);

try {
return new String(CompressionUtils.decompress(compressedData));
} catch (IOException | DataFormatException ex) {
LOG.error("Failed to decompress data ", ex);
}

}

}
return new String(data);
}

private void handleAsyncResponse(String res) {
if (res.contains(Constants.TREX_GLOBAL_TAG)) {
AsyncResponseManager.getInstance().setTrexGlobalResponse(res);
Expand Down Expand Up @@ -668,72 +637,59 @@ public void setConnected(boolean connected) {
}

private byte[] getServerRPCResponse(String request) throws SizeLimitExceededException {
try {
// prepare compression header
ByteBuffer headerByteBuffer = ByteBuffer.allocate(HEADER_SIZE);
headerByteBuffer.put((byte) 0xAB);
headerByteBuffer.put((byte) 0xE8);
headerByteBuffer.put((byte) 0x5C);
headerByteBuffer.put((byte) 0xEA);
headerByteBuffer.putInt(request.length());
byte[] headerBytes = headerByteBuffer.array();
// compress request
byte[] compressedRequest = CompressionUtils.compress(request.getBytes());
byte[] finalRequest = concatByteArrays(headerBytes, compressedRequest);

if (finalRequest.length >= MAX_REQUEST_SIZE) {
throw new SizeLimitExceededException(MessageFormat.format("Size of request is too large (limit is {0} bytes)", MAX_REQUEST_SIZE));
}

byte[] serverResponse;
boolean success;

synchronized (sendRequestMonitor) {
if (connectionTimeout.get()) {
return null;
}
try {
success = requester.send(finalRequest);
} catch (ZMQException e) {
if (e.getErrorCode() == ZError.EFSM) {
success = resend(finalRequest);
} else {
throw e;
}
}
if (success) {
serverResponse = requester.recv(0);
if (serverResponse == null) {
if (requester.base().errno() == ZError.EAGAIN) {
int retries = timeout / INTERNAL_TIMEOUT;
while (serverResponse == null && retries > 0) {
if (connectionTimeout.get()) {
return null;
}
byte[] finalRequest = this.dataCompressor.compressStringToBytes(request);

retries--;
serverResponse = requester.recv(0);
}
if (retries == 0 && resend(finalRequest)) {
serverResponse = requester.recv(0);
if (finalRequest.length >= MAX_REQUEST_SIZE) {
throw new SizeLimitExceededException(MessageFormat.format("Size of request is too large (limit is {0} bytes)", MAX_REQUEST_SIZE));
}

byte[] serverResponse;
boolean success;

synchronized (sendRequestMonitor) {
if (connectionTimeout.get()) {
return null;
}
try {
success = requester.send(finalRequest);
} catch (ZMQException e) {
if (e.getErrorCode() == ZError.EFSM) {
success = resend(finalRequest);
} else {
throw e;
}
}
if (success) {
serverResponse = requester.recv(0);
if (serverResponse == null) {
if (requester.base().errno() == ZError.EAGAIN) {
int retries = timeout / INTERNAL_TIMEOUT;
while (serverResponse == null && retries > 0) {
if (connectionTimeout.get()) {
return null;
}
} else {
LOG.error("Error sending request");

retries--;
serverResponse = requester.recv(0);
}
if (retries == 0 && resend(finalRequest)) {
serverResponse = requester.recv(0);
}
} else {
LOG.error("Error sending request");
}
} else {
LOG.error("Error sending request");
return null;
}
} else {
LOG.error("Error sending request");
return null;
}

return serverResponse == null
? null
: getDecompressedString(serverResponse).getBytes();
} catch (IOException ex) {
LOG.error("Error sending request", ex);
return null;
}

return serverResponse == null
? null
: dataCompressor.decompressBytesToString(serverResponse).getBytes();
}

private boolean resend(byte[] msg) {
Expand Down
56 changes: 0 additions & 56 deletions src/main/java/com/exalttech/trex/util/CompressionUtils.java

This file was deleted.

0 comments on commit c3832c2

Please sign in to comment.