Skip to content

Commit

Permalink
Fix a flaky testing - testIndexAllocation_closeBlocking
Browse files Browse the repository at this point in the history
Signed-off-by: Dooyong Kim <[email protected]>
  • Loading branch information
Dooyong Kim committed Oct 3, 2024
1 parent 43fe67b commit 1e4aae7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,12 @@ class IndexAllocation implements NativeMemoryAllocation {

protected void closeInternal() {
Runnable onClose = () -> {
writeLock();
cleanup();
writeUnlock();
try {
writeLock();
cleanup();
} finally {
writeUnlock();
}
};

// The close operation needs to be blocking to prevent overflow
Expand Down Expand Up @@ -325,9 +328,12 @@ public TrainingDataAllocation(ExecutorService executor, long memoryAddress, int
@Override
public void close() {
executor.execute(() -> {
writeLock();
cleanup();
writeUnlock();
try {
writeLock();
cleanup();
} finally {
writeUnlock();
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -259,11 +260,12 @@ public void testIndexAllocation_closeDefault() {
}

public void testIndexAllocation_closeBlocking() throws InterruptedException, ExecutionException {
// Prepare mocking and a thread pool.
WatcherHandle<FileWatcher> watcherHandle = (WatcherHandle<FileWatcher>) mock(WatcherHandle.class);
ExecutorService executorService = Executors.newFixedThreadPool(2);
AtomicReference<Exception> expectedException = new AtomicReference<>();
ExecutorService executorService = Executors.newSingleThreadExecutor();

// Blocking close
// Enable `KNN_FORCE_EVICT_CACHE_ENABLED_SETTING` to force it to block other threads.
// Having it false will make `IndexAllocation` to run close logic in a different thread.
when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(true);
NativeMemoryAllocation.IndexAllocation blockingIndexAllocation = new NativeMemoryAllocation.IndexAllocation(
mock(ExecutorService.class),
Expand All @@ -275,19 +277,21 @@ public void testIndexAllocation_closeBlocking() throws InterruptedException, Exe
watcherHandle
);

executorService.submit(blockingIndexAllocation::readLock);
// Acquire a read lock
blockingIndexAllocation.readLock();

// This should be blocked as a read lock is still being held.
Future<?> closingThread = executorService.submit(blockingIndexAllocation::close);

// Check if thread is currently blocked
try {
closingThread.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
expectedException.set(e);
}

assertNotNull(expectedException.get());
fail("Closing should be blocked. We are still holding a read lock.");
} catch (TimeoutException ignored) {}

executorService.submit(blockingIndexAllocation::readUnlock);
// Now, we unlock a read lock.
blockingIndexAllocation.readUnlock();
// As we don't hold any locking, the closing thread can now good to acquire a write lock.
closingThread.get();

// Waits until close
Expand Down

0 comments on commit 1e4aae7

Please sign in to comment.