Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add concurrency condition to the soak test using exisiting blocking api #11658

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
Expand Down Expand Up @@ -1724,8 +1726,8 @@ private SoakIterationResult performOneSoakIteration(
}

/**
* Runs large unary RPCs in a loop with configurable failure thresholds
* and channel creation behavior.
* Runs large unary RPCs in a loop with configurable failure thresholds
* and channel creation behavior.
*/
public void performSoakTest(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than take a resetChannelPerIteration parameter, I think we can clean things up by taking a function parameter here instead.

The signature of the function param can be like this: ManagedChannel maybeCreateNewChannel(ManagedChannel channel)

In the rpc soak test, we can provide a callback that always returns the same channel.

In the channel soak test, we can provide a callback that closes the pass-in channel and returns a new one.

String serverUri,
Expand All @@ -1736,59 +1738,51 @@ public void performSoakTest(
int minTimeMsBetweenRpcs,
int overallTimeoutSeconds,
int soakRequestSize,
int soakResponseSize)
int soakResponseSize,
int numThreads)
throws Exception {
int iterationsDone = 0;
if (soakIterations % numThreads != 0) {
throw new IllegalArgumentException("soakIterations must be evenly divisible by numThreads.");
}
ManagedChannel sharedChannel = createChannel();
AtomicInteger iterationsDone = new AtomicInteger(0);
int totalFailures = 0;
Histogram latencies = new Histogram(4 /* number of significant value digits */);
long startNs = System.nanoTime();
ManagedChannel soakChannel = createChannel();
TestServiceGrpc.TestServiceBlockingStub soakStub = TestServiceGrpc
.newBlockingStub(soakChannel)
.withInterceptors(recordClientCallInterceptor(clientCallCapture));
for (int i = 0; i < soakIterations; i++) {
if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) {
break;
}
long earliestNextStartNs = System.nanoTime()
+ TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs);
if (resetChannelPerIteration) {
soakChannel.shutdownNow();
soakChannel.awaitTermination(10, TimeUnit.SECONDS);
soakChannel = createChannel();
soakStub = TestServiceGrpc
.newBlockingStub(soakChannel)
.withInterceptors(recordClientCallInterceptor(clientCallCapture));
}
SoakIterationResult result =
performOneSoakIteration(soakStub, soakRequestSize, soakResponseSize);
SocketAddress peer = clientCallCapture
.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
StringBuilder logStr = new StringBuilder(
String.format(
Locale.US,
"soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri));
if (!result.getStatus().equals(Status.OK)) {
totalFailures++;
logStr.append(String.format(" failed: %s", result.getStatus()));
} else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) {
totalFailures++;
logStr.append(
" exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs);
} else {
logStr.append(" succeeded");
}
System.err.println(logStr.toString());
iterationsDone++;
latencies.recordValue(result.getLatencyMs());
long remainingNs = earliestNextStartNs - System.nanoTime();
if (remainingNs > 0) {
TimeUnit.NANOSECONDS.sleep(remainingNs);
}
Thread[] threads = new Thread[numThreads];
int soakIterationsPerThread = soakIterations / numThreads;
// Hold pre-thread failure counts
AtomicInteger[] threadFailures = new AtomicInteger[numThreads];
for (int i = 0; i < numThreads; i++) {
threadFailures[i] = new AtomicInteger(0);
}
for (int threadInd = 0; threadInd < numThreads; threadInd++) {
final int currentThreadInd = threadInd;
threads[threadInd] = new Thread(() -> executeSoakTestInThreads(
soakIterationsPerThread,
startNs,
resetChannelPerIteration,
minTimeMsBetweenRpcs,
threadFailures[currentThreadInd],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several pieces of data that are output from each of these threads:

  • threadFailures
  • iterationsDone
  • latencies

Rather than pass each of these as separate atomic parameters, I think it would be cleaner to pass a single parameter to each of the threads. We can create a simple wrapper object to wrap each of threadFailures, iterationsDone, and latencies.

In the main thread here, we can create a list of such wrapper objects.

That way, each thread has a separate copy of mutable output data, and there is no need to synchronize anything.

After all threads are joined, the main thread can merge results across the list.

iterationsDone,
latencies,
soakRequestSize,
soakResponseSize,
maxAcceptablePerIterationLatencyMs,
overallTimeoutSeconds,
serverUri,
sharedChannel));
threads[threadInd].start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}}
for (AtomicInteger threadFailure :threadFailures) {
totalFailures += threadFailure.get();
}
soakChannel.shutdownNow();
soakChannel.awaitTermination(10, TimeUnit.SECONDS);
System.err.println(
String.format(
Locale.US,
Expand Down Expand Up @@ -1820,6 +1814,93 @@ public void performSoakTest(
+ "threshold: %d.",
serverUri, totalFailures, maxFailures);
assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures);
shutdownChannel(sharedChannel);
}

private void shutdownChannel(ManagedChannel channel) {
if (channel != null) {
channel.shutdownNow();
try {
channel.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

private void executeSoakTestInThreads(
int soakIterationsPerThread,
long startNs,
boolean resetChannelPerIteration,
int minTimeMsBetweenRpcs,
AtomicInteger threadFailures,
AtomicInteger iterationsDone,
Histogram latencies,
int soakRequestSize,
int soakResponseSize,
int maxAcceptablePerIterationLatencyMs,
int overallTimeoutSeconds,
String serverUri,
ManagedChannel sharedChannel) {
// Get the correct channel (shared channel or new channel)
ManagedChannel currentChannel = sharedChannel;
TestServiceGrpc.TestServiceBlockingStub currentStub = TestServiceGrpc
.newBlockingStub(currentChannel)
.withInterceptors(recordClientCallInterceptor(clientCallCapture));

for (int i = 0; i < soakIterationsPerThread; i++) {
if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) {
break;
}
long earliestNextStartNs = System.nanoTime()
+ TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs);
if (resetChannelPerIteration) {
shutdownChannel(currentChannel);
currentChannel = createChannel();
currentStub = TestServiceGrpc
.newBlockingStub(currentChannel)
.withInterceptors(recordClientCallInterceptor(clientCallCapture));
}
SoakIterationResult result;
try {
result = performOneSoakIteration(currentStub, soakRequestSize, soakResponseSize);
} catch (Exception e) {
threadFailures.incrementAndGet();
System.err.println("Error during soak iteration: " + e.getMessage());
continue; // Skip to the next iteration
}

SocketAddress peer = clientCallCapture
.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
StringBuilder logStr = new StringBuilder(
String.format(
Locale.US,
"soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should include thread ID in this log

Let's do: "thread id: %d soak iteration: %d ...."

We can get the thread ID with Thread.currentThread().getId();

Side note: because we're adding flags and changing the logging format of this test, we'll probably need to send a separate pull request to update the interop test description document as well (https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#rpc_soak and https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#channel_soak should mention the new logging format as well as the new flags)

i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri));
if (!result.getStatus().equals(Status.OK)) {
threadFailures.incrementAndGet();
logStr.append(String.format(" failed: %s", result.getStatus()));
} else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) {
threadFailures.incrementAndGet();
logStr.append(
" exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs);
} else {
logStr.append(" succeeded");
}
System.err.println(logStr.toString());
synchronized (this) {
iterationsDone.incrementAndGet();
}
latencies.recordValue(result.getLatencyMs());
long remainingNs = earliestNextStartNs - System.nanoTime();
if (remainingNs > 0) {
try {
TimeUnit.NANOSECONDS.sleep(remainingNs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

private static void assertSuccess(StreamRecorder<?> recorder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public static void main(String[] args) throws Exception {
soakIterations * soakPerIterationMaxAcceptableLatencyMs / 1000;
private int soakRequestSize = 271828;
private int soakResponseSize = 314159;
private int numThreads = 1;
private String additionalMetadata = "";
private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider;

Expand Down Expand Up @@ -214,6 +215,8 @@ void parseArgs(String[] args) throws Exception {
soakRequestSize = Integer.parseInt(value);
} else if ("soak_response_size".equals(key)) {
soakResponseSize = Integer.parseInt(value);
} else if ("soak_num_threads".equals(key)) {
numThreads = Integer.parseInt(value);
} else if ("additional_metadata".equals(key)) {
additionalMetadata = value;
} else {
Expand Down Expand Up @@ -290,6 +293,9 @@ void parseArgs(String[] args) throws Exception {
+ "\n --soak_response_size "
+ "\n The response size in a soak RPC. Default "
+ c.soakResponseSize
+ "\n --soak_num_threads The number of threads for concurrent execution of the "
+ "\n soak tests (rpc_soak or channel_soak). Default "
+ c.numThreads
+ "\n --additional_metadata "
+ "\n Additional metadata to send in each request, as a "
+ "\n semicolon-separated list of key:value pairs. Default "
Expand Down Expand Up @@ -526,7 +532,8 @@ private void runTest(TestCases testCase) throws Exception {
soakMinTimeMsBetweenRpcs,
soakOverallTimeoutSeconds,
soakRequestSize,
soakResponseSize);
soakResponseSize,
numThreads);
break;
}

Expand All @@ -540,9 +547,9 @@ private void runTest(TestCases testCase) throws Exception {
soakMinTimeMsBetweenRpcs,
soakOverallTimeoutSeconds,
soakRequestSize,
soakResponseSize);
soakResponseSize,
numThreads);
break;

}

case ORCA_PER_RPC: {
Expand Down