Add concurrency condition to the soak test using existing blocking api #11658
base: master
Conversation
zbilun commented Oct 30, 2024 (edited)
- PTAL @apolcyn
Histogram latencies = new Histogram(4 /* number of significant value digits */);
long startNs = System.nanoTime();
ManagedChannel soakChannel = createChannel();
TestServiceGrpc.TestServiceBlockingStub soakStub = TestServiceGrpc
    .newBlockingStub(soakChannel)
    .withInterceptors(recordClientCallInterceptor(clientCallCapture));
List<Thread> threads = new ArrayList<>();
// Only allow up to 10 threads to run concurrently
Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < soakIterations; i++) {
Rather than hardcoding 10, let's make parallelism configurable with a flag. We can add a new flag, something like `--soak_num_threads`, where the value of the flag determines the number of threads that independently run the soak test loop. The flag can default to 1.
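A minimal sketch of how the flag could be wired in, assuming the interop client's existing `--flag=value` parsing style (the `soakNumThreads` field and `parseArgs` method names are hypothetical):

```java
// Hypothetical flag wiring; defaults to 1, i.e. the current single-threaded behavior.
private int soakNumThreads = 1;

private void parseArgs(String[] args) {
  for (String arg : args) {
    if (arg.startsWith("--soak_num_threads=")) {
      soakNumThreads = Integer.parseInt(
          arg.substring("--soak_num_threads=".length()));
    }
    // ... existing flags ...
  }
}
```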
- I have updated the code based on your comments. Currently, I am implementing a uniform distribution of the soak iterations across the threads. However, there are other approaches to distribute the iterations dynamically, such as round robin or load balancing, so please let me know if you would prefer a different implementation.
- Also, I tried to run `tools/distrib/sanitize.sh` to format the code, but I could not find any sanitize.sh file in the root directory. Any hint?
- I may also need to modify the related .py files. I will update those files and submit a change list (CL) for review afterward.
  break;
}

case RPC_SOAK_CONCURRENT: {
Rather than introduce new test cases, let's reuse the existing `rpc_soak` and `channel_soak` test cases, but add one more parameter for the number of threads (passing the value in from the aforementioned `--soak_num_threads` flag).
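For instance, a sketch of how the existing cases could thread the value through (the `performSoakTest` signature shown here is illustrative, not the current one):

```java
// Sketch: keep the existing test cases; only the new thread-count
// parameter is added to the call.
case RPC_SOAK: {
  performSoakTest(/* resetChannelPerIteration= */ false, soakNumThreads);
  break;
}

case CHANNEL_SOAK: {
  performSoakTest(/* resetChannelPerIteration= */ true, soakNumThreads);
  break;
}
```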
final TestServiceGrpc.TestServiceBlockingStub currSoakStub = soakStub;
if (concurrent) {
  semaphore.acquire();
  // Create a new thread for each soak iteration
Rather than create a new thread per iteration, I think it would be better to create `--num_threads` threads at the beginning of the soak test, and then have each thread independently run the soak test loop (e.g. run `--soak_iterations` RPCs).
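Roughly like this sketch (the `runSoakWorkers` name and the placeholder loop body are assumptions):

```java
// Sketch: create all worker threads up front; each one runs the full
// soak loop on its own, instead of one thread per iteration.
static void runSoakWorkers(int numThreads, int soakIterationsPerThread)
    throws InterruptedException {
  Thread[] workers = new Thread[numThreads];
  for (int i = 0; i < numThreads; i++) {
    workers[i] = new Thread(() -> {
      for (int iter = 0; iter < soakIterationsPerThread; iter++) {
        // performOneSoakIteration(...) would run here
      }
    });
    workers[i].start();
  }
  for (Thread worker : workers) {
    worker.join(); // wait for every worker before summarizing results
  }
}
```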
…ing soak_num_threads Flag
  TimeUnit.NANOSECONDS.sleep(remainingNs);
}
Thread[] threads = new Thread[numThreads];
int soakIterationsPerThread = (int) Math.ceil((double) soakIterations / numThreads);
Let's check these flags and fail at startup (before running the test) if `--soak_iterations` is not evenly divisible by `--num_threads`. Then, let's just have each thread independently run `soakIterationsPerThread` RPCs (no need to calculate `startIteration` and `endIteration`).
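For example (a sketch; variable names follow the diff above):

```java
// Sketch: validate the flag combination before any RPCs are sent.
if (soakIterations % numThreads != 0) {
  throw new IllegalArgumentException(String.format(
      "soak_iterations (%d) must be evenly divisible by num_threads (%d)",
      soakIterations, numThreads));
}
int soakIterationsPerThread = soakIterations / numThreads;
```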
  result = performOneSoakIteration(currentStub, soakRequestSize, soakResponseSize);
} catch (Exception e) {
  synchronized (this) {
    totalFailures.incrementAndGet();
We shouldn't need to examine `totalFailures` until the thread is done and joined. We can have a list of per-thread counters that each thread tallies up while running. Then, after joining all threads, the main thread can examine all of those counters and create a summary.
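A sketch of that shape; once each thread owns its own slot, plain `int` counters suffice:

```java
// Sketch: one failure counter per thread. Each thread writes only its
// own slot, so no atomics or synchronization are needed.
int[] perThreadFailures = new int[numThreads];
// ... thread k increments perThreadFailures[k] on each failed iteration ...

// After all threads are joined, the main thread merges the counters:
int totalFailures = 0;
for (int failures : perThreadFailures) {
  totalFailures += failures;
}
```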
final int startIteration = threadInd * soakIterationsPerThread;
final int endIteration = Math.min(startIteration + soakIterationsPerThread, soakIterations);
threads[threadInd] = new Thread(() -> {
  ManagedChannel currentChannel = soakChannel;
nit: let's put the thread body in its own function
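For example (a sketch; the method name is hypothetical):

```java
// Sketch: move the inline lambda body into a named method so the
// Thread construction stays readable.
private void runSoakIterations(int startIteration, int endIteration) {
  for (int i = startIteration; i < endIteration; i++) {
    // former inline thread body goes here
  }
}

// Call site:
// threads[threadInd] = new Thread(() -> runSoakIterations(startIteration, endIteration));
```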
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
}
currentChannel = createChannel();
nit: the way we're resetting the channel here might be problematic because, for example, on the first iteration all threads will shut down the same channel.
I think we need to have all threads start out initially with an independent channel.
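Something like this sketch, using `createChannel()` from the existing test:

```java
// Sketch: give every thread its own channel before starting, so no
// thread ever shuts down a channel that another thread is still using.
ManagedChannel[] channels = new ManagedChannel[numThreads];
for (int i = 0; i < numThreads; i++) {
  channels[i] = createChannel();
}
// Thread k then reads and resets only channels[k].
```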
…, channel creation logic, and refactor thread body for performSoakTest
StringBuilder logStr = new StringBuilder(
    String.format(
        Locale.US,
        "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
We should include the thread ID in this log. Let's do: `"thread id: %d soak iteration: %d ..."`. We can get the thread ID with `Thread.currentThread().getId()`.
Side note: because we're adding flags and changing the logging format of this test, we'll probably need to send a separate pull request to update the interop test description document as well (https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#rpc_soak and https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#channel_soak should mention the new logging format as well as the new flags).
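The resulting format string might look like this sketch (the other format arguments, such as `i`, `elapsedMs`, `peer`, and `serverUri`, are assumed from the diff above):

```java
// Sketch: prepend the thread ID to the existing log line.
StringBuilder logStr = new StringBuilder(
    String.format(
        Locale.US,
        "thread id: %d soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
        Thread.currentThread().getId(), i, elapsedMs, peer, serverUri));
```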
 * Runs large unary RPCs in a loop with configurable failure thresholds
 * and channel creation behavior.
 */
public void performSoakTest(
Rather than take a `resetChannelPerIteration` parameter, I think we can clean things up by taking a function parameter here instead.
The signature of the function param can be like this: `ManagedChannel maybeCreateNewChannel(ManagedChannel channel)`.
In the rpc soak test, we can provide a callback that always returns the same channel.
In the channel soak test, we can provide a callback that closes the passed-in channel and returns a new one.
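A sketch of the two callbacks (using `java.util.function.Function` is an assumption; a dedicated single-method interface would work just as well):

```java
// Sketch: rpc_soak keeps the same channel; channel_soak recreates it.
Function<ManagedChannel, ManagedChannel> rpcSoakCallback =
    channel -> channel;

Function<ManagedChannel, ManagedChannel> channelSoakCallback =
    channel -> {
      channel.shutdownNow();
      return createChannel(); // createChannel() as in the existing test
    };
```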
    startNs,
    resetChannelPerIteration,
    minTimeMsBetweenRpcs,
    threadFailures[currentThreadInd],
There are several pieces of data that are output from each of these threads:
- `threadFailures`
- `iterationsDone`
- `latencies`

Rather than pass each of these as separate atomic parameters, I think it would be cleaner to pass a single parameter to each of the threads. We can create a simple wrapper object to wrap `threadFailures`, `iterationsDone`, and `latencies`.
In the main thread here, we can create a list of such wrapper objects.
That way, each thread has a separate copy of mutable output data, and there is no need to synchronize anything.
After all threads are joined, the main thread can merge results across the list.
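A minimal sketch of such a wrapper (the class name is hypothetical; `Histogram` is the HdrHistogram type already used in the diff above):

```java
// Sketch: per-thread result holder. Each worker mutates only its own
// instance, so no field needs to be atomic or synchronized.
class ThreadResults {
  int threadFailures;
  int iterationsDone;
  Histogram latencies = new Histogram(4 /* number of significant value digits */);
}
```

The main thread would then hold one `ThreadResults` per worker and, after joining, sum the counters and merge the per-thread histograms (HdrHistogram's `Histogram.add(...)` can combine them).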