Skip to content

Commit

Permalink
Use CDL to block threads to avoid flaky tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
aoli-al committed Jan 8, 2025
1 parent d4f0a32 commit ec8cafb
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge)
dir.close();
}

@SuppressForbidden(reason = "Thread sleep")
public void testIntraMergeThreadPoolIsLimitedByMaxThreads() throws IOException {
ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
MergeScheduler.MergeSource mergeSource =
Expand Down Expand Up @@ -475,11 +474,12 @@ public void merge(MergePolicy.OneMerge merge) throws IOException {
Executor executor = mergeScheduler.intraMergeExecutor;
AtomicInteger threadsExecutedOnPool = new AtomicInteger();
AtomicInteger threadsExecutedOnSelf = new AtomicInteger();
for (int i = 0; i < 4; i++) {
CountDownLatch latch = new CountDownLatch(1);
final int totalThreads = 4;
for (int i = 0; i < totalThreads; i++) {
mergeScheduler.mergeThreads.add(
mergeScheduler.new MergeThread(mergeSource, merge) {
@Override
@SuppressForbidden(reason = "Thread sleep")
public void run() {
executor.execute(
() -> {
Expand All @@ -489,7 +489,7 @@ public void run() {
threadsExecutedOnPool.incrementAndGet();
}
try {
Thread.sleep(100);
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -500,6 +500,10 @@ public void run() {
for (ConcurrentMergeScheduler.MergeThread thread : mergeScheduler.mergeThreads) {
thread.start();
}
while (threadsExecutedOnSelf.get() + threadsExecutedOnPool.get() < totalThreads) {
Thread.yield();
}
latch.countDown();
mergeScheduler.sync();
assertEquals(3, threadsExecutedOnSelf.get());
assertEquals(1, threadsExecutedOnPool.get());
Expand Down

0 comments on commit ec8cafb

Please sign in to comment.