From fedbd6441a310c348400a94113734103f49d4a43 Mon Sep 17 00:00:00 2001 From: zbilun Date: Wed, 30 Oct 2024 09:08:46 -0700 Subject: [PATCH 1/6] Add concurrency condition to the soak test using exisiting blocking api --- .../integration/AbstractInteropTest.java | 98 ++++++++++++++----- 1 file changed, 73 insertions(+), 25 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index a581c750028..b66dd0cba7f 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -111,11 +111,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; @@ -1724,12 +1726,13 @@ private SoakIterationResult performOneSoakIteration( } /** - * Runs large unary RPCs in a loop with configurable failure thresholds - * and channel creation behavior. + * Runs large unary RPCs in a loop with configurable failure thresholds + * and channel creation behavior. */ public void performSoakTest( String serverUri, boolean resetChannelPerIteration, + boolean concurrent, int soakIterations, int maxFailures, int maxAcceptablePerIterationLatencyMs, @@ -1738,14 +1741,19 @@ public void performSoakTest( int soakRequestSize, int soakResponseSize) throws Exception { - int iterationsDone = 0; - int totalFailures = 0; + // int iterationsDone = 0; + // int totalFailures = 0; + AtomicInteger iterationsDone = new AtomicInteger(0); + AtomicInteger totalFailures = new AtomicInteger(0); Histogram latencies = new Histogram(4 /* number of significant value digits */); long startNs = System.nanoTime(); ManagedChannel soakChannel = createChannel(); TestServiceGrpc.TestServiceBlockingStub soakStub = TestServiceGrpc .newBlockingStub(soakChannel) .withInterceptors(recordClientCallInterceptor(clientCallCapture)); + List threads = new ArrayList<>(); + // Only allow up to 10 threads to run concurrently + Semaphore semaphore = new Semaphore(10); for (int i = 0; i < soakIterations; i++) { if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) { break; @@ -1760,33 +1768,38 @@ public void performSoakTest( .newBlockingStub(soakChannel) .withInterceptors(recordClientCallInterceptor(clientCallCapture)); } - SoakIterationResult result = - performOneSoakIteration(soakStub, soakRequestSize, soakResponseSize); - SocketAddress peer = clientCallCapture - .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); - StringBuilder logStr = new StringBuilder( - String.format( - Locale.US, - "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s", - i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri)); - if (!result.getStatus().equals(Status.OK)) { - totalFailures++; - logStr.append(String.format(" failed: %s", result.getStatus())); - } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) { - totalFailures++; - logStr.append( - " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs); + + final TestServiceGrpc.TestServiceBlockingStub currSoakStub = soakStub; + if (concurrent) { + semaphore.acquire(); + // Create a new thread for each soak iteration + Thread thread = new Thread(() -> { + try { + executeSoakIteration(currSoakStub,soakRequestSize, + soakResponseSize, iterationsDone, totalFailures, serverUri, latencies, + maxAcceptablePerIterationLatencyMs); + } finally { + semaphore.release(); + } + }); + threads.add(thread); + thread.start(); } else { - logStr.append(" succeeded"); + executeSoakIteration(currSoakStub, soakRequestSize, soakResponseSize, iterationsDone, + totalFailures, serverUri, latencies, maxAcceptablePerIterationLatencyMs); } - System.err.println(logStr.toString()); - iterationsDone++; - latencies.recordValue(result.getLatencyMs()); long remainingNs = earliestNextStartNs - System.nanoTime(); if (remainingNs > 0) { TimeUnit.NANOSECONDS.sleep(remainingNs); } } + // Wait for all threads to finish if running in concurrent mode + if (concurrent) { + for (Thread thread :threads) { + thread.join(); + }} + + soakChannel.shutdownNow(); soakChannel.awaitTermination(10, TimeUnit.SECONDS); System.err.println( @@ -1819,9 +1832,44 @@ public void performSoakTest( "(server_uri: %s) soak test total failures: %d exceeds max failures " + "threshold: %d.", serverUri, totalFailures, maxFailures); - assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures); + assertTrue(tooManyFailuresErrorMessage, totalFailures.get() <= maxFailures); } + private void executeSoakIteration( + TestServiceGrpc.TestServiceBlockingStub soakStub, int soakRequestSize, + int soakResponseSize, AtomicInteger iterationsDone, AtomicInteger totalFailures, + String serverUri, Histogram latencies, int maxAcceptablePerIterationLatencyMs) { + try { + SoakIterationResult result = + performOneSoakIteration(soakStub, soakRequestSize, soakResponseSize); + SocketAddress peer = clientCallCapture + .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + StringBuilder logStr = new StringBuilder( + String.format( + Locale.US, + "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s", + iterationsDone.get(), result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri)); + if (!result.getStatus().equals(Status.OK)) { + // totalFailures++; + totalFailures.incrementAndGet(); + logStr.append(String.format(" failed: %s", result.getStatus())); + } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) { + // totalFailures++; + totalFailures.incrementAndGet(); + logStr.append( + " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs); + } else { + logStr.append(" succeeded"); + iterationsDone.incrementAndGet(); + } + System.err.println(logStr.toString()); + // iterationsDone++; + latencies.recordValue(result.getLatencyMs()); + } catch (Exception e) { + e.printStackTrace(); + totalFailures.incrementAndGet(); + }} + private static void assertSuccess(StreamRecorder recorder) { if (recorder.getError() != null) { throw new AssertionError(recorder.getError()); From 885e1096873e37c0ad85841e5043cf2470d0d5da Mon Sep 17 00:00:00 2001 From: zbilun Date: Wed, 30 Oct 2024 09:40:53 -0700 Subject: [PATCH 2/6] Modify the influenced files --- .../grpc/testing/integration/TestCases.java | 4 +++ .../integration/TestServiceClient.java | 31 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java index 2d16065254a..16981684f8e 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java @@ -57,7 +57,11 @@ public enum TestCases { VERY_LARGE_REQUEST("very large request"), PICK_FIRST_UNARY("all requests are sent to one server despite multiple servers are resolved"), RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"), + RPC_SOAK_CONCURRENT("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel," + + " and using multiple threads concurrently"), CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"), + CHANNEL_SOAK_CONCURRENT("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel," + + " and using multiple threads concurrently"), ORCA_PER_RPC("report backend metrics per query"), ORCA_OOB("report backend metrics out-of-band"); diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index e6829be11cb..508e58264c7 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -520,6 +520,22 @@ private void runTest(TestCases testCase) throws Exception { tester.performSoakTest( serverHost, false /* resetChannelPerIteration */, + false /* concurrent */, + soakIterations, + soakMaxFailures, + soakPerIterationMaxAcceptableLatencyMs, + soakMinTimeMsBetweenRpcs, + soakOverallTimeoutSeconds, + soakRequestSize, + soakResponseSize); + break; + } + + case RPC_SOAK_CONCURRENT: { + tester.performSoakTest( + serverHost, + false /* resetChannelPerIteration */, + true /* concurrent */, soakIterations, soakMaxFailures, soakPerIterationMaxAcceptableLatencyMs, @@ -534,6 +550,7 @@ private void runTest(TestCases testCase) throws Exception { tester.performSoakTest( serverHost, true /* resetChannelPerIteration */, + false /* concurrent */, soakIterations, soakMaxFailures, soakPerIterationMaxAcceptableLatencyMs, @@ -542,7 +559,21 @@ private void runTest(TestCases testCase) throws Exception { soakRequestSize, soakResponseSize); break; + } + case CHANNEL_SOAK_CONCURRENT: { + tester.performSoakTest( + serverHost, + true /* resetChannelPerIteration */, + true /* concurrent */, + soakIterations, + soakMaxFailures, + soakPerIterationMaxAcceptableLatencyMs, + soakMinTimeMsBetweenRpcs, + soakOverallTimeoutSeconds, + soakRequestSize, + soakResponseSize); + break; } case ORCA_PER_RPC: { From 58e2ebd218cb660c15ca8c93766244d2f26990b2 Mon Sep 17 00:00:00 2001 From: zbilun Date: Thu, 31 Oct 2024 10:25:51 -0700 Subject: [PATCH 3/6] Address code review comments from Alex: improve soak test logic by using soak_num_threads Flag --- .../integration/AbstractInteropTest.java | 163 +++++++++--------- .../grpc/testing/integration/TestCases.java | 4 - .../integration/TestServiceClient.java | 44 ++--- 3 files changed, 90 insertions(+), 121 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index b66dd0cba7f..92a4bb506ad 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -1732,17 +1732,15 @@ private SoakIterationResult performOneSoakIteration( public void performSoakTest( String serverUri, boolean resetChannelPerIteration, - boolean concurrent, int soakIterations, int maxFailures, int maxAcceptablePerIterationLatencyMs, int minTimeMsBetweenRpcs, int overallTimeoutSeconds, int soakRequestSize, - int soakResponseSize) + int soakResponseSize, + int numThreads) throws Exception { - // int iterationsDone = 0; - // int totalFailures = 0; AtomicInteger iterationsDone = new AtomicInteger(0); AtomicInteger totalFailures = new AtomicInteger(0); Histogram latencies = new Histogram(4 /* number of significant value digits */); @@ -1751,54 +1749,88 @@ public void performSoakTest( TestServiceGrpc.TestServiceBlockingStub soakStub = TestServiceGrpc .newBlockingStub(soakChannel) .withInterceptors(recordClientCallInterceptor(clientCallCapture)); - List threads = new ArrayList<>(); - // Only allow up to 10 threads to run concurrently - Semaphore semaphore = new Semaphore(10); - for (int i = 0; i < soakIterations; i++) { - if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) { - break; - } - long earliestNextStartNs = System.nanoTime() - + TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs); - if (resetChannelPerIteration) { - soakChannel.shutdownNow(); - soakChannel.awaitTermination(10, TimeUnit.SECONDS); - soakChannel = createChannel(); - soakStub = TestServiceGrpc - .newBlockingStub(soakChannel) - .withInterceptors(recordClientCallInterceptor(clientCallCapture)); - } - - final TestServiceGrpc.TestServiceBlockingStub currSoakStub = soakStub; - if (concurrent) { - semaphore.acquire(); - // Create a new thread for each soak iteration - Thread thread = new Thread(() -> { + Thread[] threads = new Thread[numThreads]; + int soakIterationsPerThread = (int) Math.ceil((double) soakIterations / numThreads); + for (int threadInd = 0; threadInd < numThreads; threadInd++) { + final int startIteration = threadInd * soakIterationsPerThread; + final int endIteration = Math.min(startIteration + soakIterationsPerThread, soakIterations); + threads[threadInd] = new Thread(() -> { + ManagedChannel currentChannel = soakChannel; + TestServiceGrpc.TestServiceBlockingStub currentStub = soakStub; + for (int i = startIteration; i < endIteration; i++) { + if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) { + break; + } + long earliestNextStartNs = System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs); + if (resetChannelPerIteration) { + currentChannel.shutdownNow(); + try { + currentChannel.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + currentChannel = createChannel(); + currentStub = TestServiceGrpc + .newBlockingStub(soakChannel) + .withInterceptors(recordClientCallInterceptor(clientCallCapture)); + } + SoakIterationResult result; try { - executeSoakIteration(currSoakStub,soakRequestSize, - soakResponseSize, iterationsDone, totalFailures, serverUri, latencies, - maxAcceptablePerIterationLatencyMs); - } finally { - semaphore.release(); + result = performOneSoakIteration(currentStub, soakRequestSize, soakResponseSize); + } catch (Exception e) { + synchronized (this) { + totalFailures.incrementAndGet(); + } + System.err.println("Error during soak iteration: " + e.getMessage()); + continue; // Skip to the next iteration } - }); - threads.add(thread); - thread.start(); - } else { - executeSoakIteration(currSoakStub, soakRequestSize, soakResponseSize, iterationsDone, - totalFailures, serverUri, latencies, maxAcceptablePerIterationLatencyMs); - } - long remainingNs = earliestNextStartNs - System.nanoTime(); - if (remainingNs > 0) { - TimeUnit.NANOSECONDS.sleep(remainingNs); - } + + SocketAddress peer = clientCallCapture + .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + StringBuilder logStr = new StringBuilder( + String.format( + Locale.US, + "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s", + i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri)); + if (!result.getStatus().equals(Status.OK)) { + synchronized (this) { + totalFailures.incrementAndGet(); + } + logStr.append(String.format(" failed: %s", result.getStatus())); + } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) { + synchronized (this) { + totalFailures.incrementAndGet(); + } + logStr.append( + " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs); + } else { + logStr.append(" succeeded"); + } + System.err.println(logStr.toString()); + synchronized (this) { + iterationsDone.incrementAndGet(); + } + latencies.recordValue(result.getLatencyMs()); + long remainingNs = earliestNextStartNs - System.nanoTime(); + if (remainingNs > 0) { + try { + TimeUnit.NANOSECONDS.sleep(remainingNs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + }); + threads[threadInd].start(); } - // Wait for all threads to finish if running in concurrent mode - if (concurrent) { - for (Thread thread :threads) { - thread.join(); - }} + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + }} soakChannel.shutdownNow(); soakChannel.awaitTermination(10, TimeUnit.SECONDS); @@ -1835,41 +1867,6 @@ public void performSoakTest( assertTrue(tooManyFailuresErrorMessage, totalFailures.get() <= maxFailures); } - private void executeSoakIteration( - TestServiceGrpc.TestServiceBlockingStub soakStub, int soakRequestSize, - int soakResponseSize, AtomicInteger iterationsDone, AtomicInteger totalFailures, - String serverUri, Histogram latencies, int maxAcceptablePerIterationLatencyMs) { - try { - SoakIterationResult result = - performOneSoakIteration(soakStub, soakRequestSize, soakResponseSize); - SocketAddress peer = clientCallCapture - .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); - StringBuilder logStr = new StringBuilder( - String.format( - Locale.US, - "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s", - iterationsDone.get(), result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri)); - if (!result.getStatus().equals(Status.OK)) { - // totalFailures++; - totalFailures.incrementAndGet(); - logStr.append(String.format(" failed: %s", result.getStatus())); - } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) { - // totalFailures++; - totalFailures.incrementAndGet(); - logStr.append( - " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs); - } else { - logStr.append(" succeeded"); - iterationsDone.incrementAndGet(); - } - System.err.println(logStr.toString()); - // iterationsDone++; - latencies.recordValue(result.getLatencyMs()); - } catch (Exception e) { - e.printStackTrace(); - totalFailures.incrementAndGet(); - }} - private static void assertSuccess(StreamRecorder recorder) { if (recorder.getError() != null) { throw new AssertionError(recorder.getError()); diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java index 16981684f8e..2d16065254a 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java @@ -57,11 +57,7 @@ public enum TestCases { VERY_LARGE_REQUEST("very large request"), PICK_FIRST_UNARY("all requests are sent to one server despite multiple servers are resolved"), RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"), - RPC_SOAK_CONCURRENT("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel," - + " and using multiple threads concurrently"), CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"), - CHANNEL_SOAK_CONCURRENT("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel," - + " and using multiple threads concurrently"), ORCA_PER_RPC("report backend metrics per query"), ORCA_OOB("report backend metrics out-of-band"); diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 508e58264c7..27f29882c7c 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -134,6 +134,7 @@ public static void main(String[] args) throws Exception { soakIterations * soakPerIterationMaxAcceptableLatencyMs / 1000; private int soakRequestSize = 271828; private int soakResponseSize = 314159; + private int numThreads = 1; private String additionalMetadata = ""; private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider; @@ -214,6 +215,8 @@ void parseArgs(String[] args) throws Exception { soakRequestSize = Integer.parseInt(value); } else if ("soak_response_size".equals(key)) { soakResponseSize = Integer.parseInt(value); + } else if ("soak_num_threads".equals(key)) { + numThreads = Integer.parseInt(value); } else if ("additional_metadata".equals(key)) { additionalMetadata = value; } else { @@ -290,6 +293,9 @@ void parseArgs(String[] args) throws Exception { + "\n --soak_response_size " + "\n The response size in a soak RPC. Default " + c.soakResponseSize + + "\n --soak_num_threads The number of threads for concurrent execution of the " + + "\n soak tests (rpc_soak or channel_soak). Default " + + c.numThreads + "\n --additional_metadata " + "\n Additional metadata to send in each request, as a " + "\n semicolon-separated list of key:value pairs. Default " @@ -520,29 +526,14 @@ private void runTest(TestCases testCase) throws Exception { tester.performSoakTest( serverHost, false /* resetChannelPerIteration */, - false /* concurrent */, soakIterations, soakMaxFailures, soakPerIterationMaxAcceptableLatencyMs, soakMinTimeMsBetweenRpcs, soakOverallTimeoutSeconds, soakRequestSize, - soakResponseSize); - break; - } - - case RPC_SOAK_CONCURRENT: { - tester.performSoakTest( - serverHost, - false /* resetChannelPerIteration */, - true /* concurrent */, - soakIterations, - soakMaxFailures, - soakPerIterationMaxAcceptableLatencyMs, - soakMinTimeMsBetweenRpcs, - soakOverallTimeoutSeconds, - soakRequestSize, - soakResponseSize); + soakResponseSize, + numThreads); break; } @@ -550,29 +541,14 @@ private void runTest(TestCases testCase) throws Exception { tester.performSoakTest( serverHost, true /* resetChannelPerIteration */, - false /* concurrent */, - soakIterations, - soakMaxFailures, - soakPerIterationMaxAcceptableLatencyMs, - soakMinTimeMsBetweenRpcs, - soakOverallTimeoutSeconds, - soakRequestSize, - soakResponseSize); - break; - } - - case CHANNEL_SOAK_CONCURRENT: { - tester.performSoakTest( - serverHost, - true /* resetChannelPerIteration */, - true /* concurrent */, soakIterations, soakMaxFailures, soakPerIterationMaxAcceptableLatencyMs, soakMinTimeMsBetweenRpcs, soakOverallTimeoutSeconds, soakRequestSize, - soakResponseSize); + soakResponseSize, + numThreads); break; } From 56883b5a4dc60c489ae431cab22492d294a90cd9 Mon Sep 17 00:00:00 2001 From: zbilun Date: Fri, 1 Nov 2024 03:35:00 -0700 Subject: [PATCH 4/6] Address code review comments from Alex: modify totalFailures handling, channel creation logic, and refactor thread body for performSoakTest --- .../integration/AbstractInteropTest.java | 195 ++++++++++-------- .../io/grpc/testing/integration/test.java | 0 2 files changed, 114 insertions(+), 81 deletions(-) create mode 100644 interop-testing/src/main/java/io/grpc/testing/integration/test.java diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 92a4bb506ad..f8a2b2602a6 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -1741,99 +1741,46 @@ public void performSoakTest( int soakResponseSize, int numThreads) throws Exception { + if (soakIterations % numThreads != 0) { + throw new IllegalArgumentException("soakIterations must be evenly divisible by numThreads."); + } AtomicInteger iterationsDone = new AtomicInteger(0); - AtomicInteger totalFailures = new AtomicInteger(0); + int totalFailures = 0; Histogram latencies = new Histogram(4 /* number of significant value digits */); long startNs = System.nanoTime(); - ManagedChannel soakChannel = createChannel(); - TestServiceGrpc.TestServiceBlockingStub soakStub = TestServiceGrpc - .newBlockingStub(soakChannel) - .withInterceptors(recordClientCallInterceptor(clientCallCapture)); Thread[] threads = new Thread[numThreads]; - int soakIterationsPerThread = (int) Math.ceil((double) soakIterations / numThreads); + int soakIterationsPerThread = soakIterations / numThreads; + // Hold pre-thread failure counts + AtomicInteger[] threadFailures = new AtomicInteger[numThreads]; + for (int i = 0; i < numThreads; i++) { + threadFailures[i] = new AtomicInteger(0); + } for (int threadInd = 0; threadInd < numThreads; threadInd++) { - final int startIteration = threadInd * soakIterationsPerThread; - final int endIteration = Math.min(startIteration + soakIterationsPerThread, soakIterations); - threads[threadInd] = new Thread(() -> { - ManagedChannel currentChannel = soakChannel; - TestServiceGrpc.TestServiceBlockingStub currentStub = soakStub; - for (int i = startIteration; i < endIteration; i++) { - if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) { - break; - } - long earliestNextStartNs = System.nanoTime() - + TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs); - if (resetChannelPerIteration) { - currentChannel.shutdownNow(); - try { - currentChannel.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - currentChannel = createChannel(); - currentStub = TestServiceGrpc - .newBlockingStub(soakChannel) - .withInterceptors(recordClientCallInterceptor(clientCallCapture)); - } - SoakIterationResult result; - try { - result = performOneSoakIteration(currentStub, soakRequestSize, soakResponseSize); - } catch (Exception e) { - synchronized (this) { - totalFailures.incrementAndGet(); - } - System.err.println("Error during soak iteration: " + e.getMessage()); - continue; // Skip to the next iteration - } - - SocketAddress peer = clientCallCapture - .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); - StringBuilder logStr = new StringBuilder( - String.format( - Locale.US, - "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s", - i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri)); - if (!result.getStatus().equals(Status.OK)) { - synchronized (this) { - totalFailures.incrementAndGet(); - } - logStr.append(String.format(" failed: %s", result.getStatus())); - } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) { - synchronized (this) { - totalFailures.incrementAndGet(); - } - logStr.append( - " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs); - } else { - logStr.append(" succeeded"); - } - System.err.println(logStr.toString()); - synchronized (this) { - iterationsDone.incrementAndGet(); - } - latencies.recordValue(result.getLatencyMs()); - long remainingNs = earliestNextStartNs - System.nanoTime(); - if (remainingNs > 0) { - try { - TimeUnit.NANOSECONDS.sleep(remainingNs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - }); + final int currentThreadInd = threadInd; + threads[threadInd] = new Thread(() -> executeSoakTestInThreads( + soakIterationsPerThread, + startNs, + resetChannelPerIteration, + minTimeMsBetweenRpcs, + threadFailures[currentThreadInd], + iterationsDone, + latencies, + soakRequestSize, + soakResponseSize, + maxAcceptablePerIterationLatencyMs, + overallTimeoutSeconds, + serverUri)); threads[threadInd].start(); } - for (Thread thread : threads) { try { thread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }} - - soakChannel.shutdownNow(); - soakChannel.awaitTermination(10, TimeUnit.SECONDS); + for (AtomicInteger threadFailure :threadFailures) { + totalFailures += threadFailure.get(); + } System.err.println( String.format( Locale.US, @@ -1864,7 +1811,93 @@ public void performSoakTest( "(server_uri: %s) soak test total failures: %d exceeds max failures " + "threshold: %d.", serverUri, totalFailures, maxFailures); - assertTrue(tooManyFailuresErrorMessage, totalFailures.get() <= maxFailures); + assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures); + } + + private void executeSoakTestInThreads( + int soakIterationsPerThread, + long startNs, + boolean resetChannelPerIteration, + int minTimeMsBetweenRpcs, + AtomicInteger threadFailures, + AtomicInteger iterationsDone, + Histogram latencies, + int soakRequestSize, + int soakResponseSize, + int maxAcceptablePerIterationLatencyMs, + int overallTimeoutSeconds, + String serverUri) { + // Each thread creates its own channel + ManagedChannel currentChannel = createChannel(); + TestServiceGrpc.TestServiceBlockingStub currentStub = TestServiceGrpc + .newBlockingStub(currentChannel) + .withInterceptors(recordClientCallInterceptor(clientCallCapture)); + + for (int i = 0; i < soakIterationsPerThread; i++) { + if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) { + break; + } + long earliestNextStartNs = System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs); + if (resetChannelPerIteration) { + currentChannel.shutdownNow(); + try { + currentChannel.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + currentChannel = createChannel(); + currentStub = TestServiceGrpc + .newBlockingStub(currentChannel) + .withInterceptors(recordClientCallInterceptor(clientCallCapture)); + } + SoakIterationResult result; + try { + result = performOneSoakIteration(currentStub, soakRequestSize, soakResponseSize); + } catch (Exception e) { + threadFailures.incrementAndGet(); + System.err.println("Error during soak iteration: " + e.getMessage()); + continue; // Skip to the next iteration + } + + SocketAddress peer = clientCallCapture + .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + StringBuilder logStr = new StringBuilder( + String.format( + Locale.US, + "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s", + i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri)); + if (!result.getStatus().equals(Status.OK)) { + threadFailures.incrementAndGet(); + logStr.append(String.format(" failed: %s", result.getStatus())); + } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) { + threadFailures.incrementAndGet(); + logStr.append( + " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs); + } else { + logStr.append(" succeeded"); + } + System.err.println(logStr.toString()); + synchronized (this) { + iterationsDone.incrementAndGet(); + } + latencies.recordValue(result.getLatencyMs()); + long remainingNs = earliestNextStartNs - System.nanoTime(); + if (remainingNs > 0) { + try { + TimeUnit.NANOSECONDS.sleep(remainingNs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + // Shutdown the channel after completing the current thread soakiterations + currentChannel.shutdownNow(); + try { + currentChannel.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } private static void assertSuccess(StreamRecorder recorder) { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/test.java b/interop-testing/src/main/java/io/grpc/testing/integration/test.java new file mode 100644 index 00000000000..e69de29bb2d From 7a9f3a95be39324e199e94ecc54608926114b395 Mon Sep 17 00:00:00 2001 From: zbilun Date: Fri, 1 Nov 2024 03:37:24 -0700 Subject: [PATCH 5/6] Removed useless file --- .../src/main/java/io/grpc/testing/integration/test.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 interop-testing/src/main/java/io/grpc/testing/integration/test.java diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/test.java b/interop-testing/src/main/java/io/grpc/testing/integration/test.java deleted file mode 100644 index e69de29bb2d..00000000000 From 84dac6218e9580ce2ad705b35bd0ee9c0063fa43 Mon Sep 17 00:00:00 2001 From: zbilun Date: Mon, 4 Nov 2024 15:10:20 -0800 Subject: [PATCH 6/6] Modify the channel implementation for rpc_soak test. --- .../integration/AbstractInteropTest.java | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index f8a2b2602a6..fb6d6de7cae 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -1744,6 +1744,7 @@ public void performSoakTest( if (soakIterations % numThreads != 0) { throw new IllegalArgumentException("soakIterations must be evenly divisible by numThreads."); } + ManagedChannel sharedChannel = createChannel(); AtomicInteger iterationsDone = new AtomicInteger(0); int totalFailures = 0; Histogram latencies = new Histogram(4 /* number of significant value digits */); @@ -1769,7 +1770,8 @@ public void performSoakTest( soakResponseSize, maxAcceptablePerIterationLatencyMs, overallTimeoutSeconds, - serverUri)); + serverUri, + sharedChannel)); threads[threadInd].start(); } for (Thread thread : threads) { @@ -1812,6 +1814,18 @@ public void performSoakTest( + "threshold: %d.", serverUri, totalFailures, maxFailures); assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures); + shutdownChannel(sharedChannel); + } + + private void shutdownChannel(ManagedChannel channel) { + if (channel != null) { + channel.shutdownNow(); + try { + channel.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } private void executeSoakTestInThreads( @@ -1826,9 +1840,10 @@ private void executeSoakTestInThreads( int soakResponseSize, int maxAcceptablePerIterationLatencyMs, int overallTimeoutSeconds, - String serverUri) { - // Each thread creates its own channel - ManagedChannel currentChannel = createChannel(); + String serverUri, + ManagedChannel sharedChannel) { + // Get the correct channel (shared channel or new channel) + ManagedChannel currentChannel = sharedChannel; TestServiceGrpc.TestServiceBlockingStub currentStub = TestServiceGrpc .newBlockingStub(currentChannel) .withInterceptors(recordClientCallInterceptor(clientCallCapture)); @@ -1840,12 +1855,7 @@ private void executeSoakTestInThreads( long earliestNextStartNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs); if (resetChannelPerIteration) { - currentChannel.shutdownNow(); - try { - currentChannel.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + shutdownChannel(currentChannel); currentChannel = createChannel(); currentStub = TestServiceGrpc .newBlockingStub(currentChannel) @@ -1891,13 +1901,6 @@ private void executeSoakTestInThreads( } } } - // Shutdown the channel after completing the current thread soakiterations - currentChannel.shutdownNow(); - try { - currentChannel.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } } private static void assertSuccess(StreamRecorder recorder) {