diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java index 356f04d7c..8adf35447 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java @@ -187,9 +187,12 @@ class IndexAllocation implements NativeMemoryAllocation { protected void closeInternal() { Runnable onClose = () -> { - writeLock(); - cleanup(); - writeUnlock(); + try { + writeLock(); + cleanup(); + } finally { + writeUnlock(); + } }; // The close operation needs to be blocking to prevent overflow @@ -325,9 +328,12 @@ public TrainingDataAllocation(ExecutorService executor, long memoryAddress, int @Override public void close() { executor.execute(() -> { - writeLock(); - cleanup(); - writeUnlock(); + try { + writeLock(); + cleanup(); + } finally { + writeUnlock(); + } }); } diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java index cb5fbaeba..906ff4cb7 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java @@ -37,6 +37,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Mockito.doNothing; @@ -259,11 +260,12 @@ public void testIndexAllocation_closeDefault() { } public void testIndexAllocation_closeBlocking() throws InterruptedException, ExecutionException { + // Prepare mocking and a thread pool. WatcherHandle watcherHandle = (WatcherHandle) mock(WatcherHandle.class); - ExecutorService executorService = Executors.newFixedThreadPool(2); - AtomicReference expectedException = new AtomicReference<>(); + ExecutorService executorService = Executors.newSingleThreadExecutor(); - // Blocking close + // Enable `KNN_FORCE_EVICT_CACHE_ENABLED_SETTING` to force it to block other threads. + // Having it false will make `IndexAllocation` to run close logic in a different thread. when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(true); NativeMemoryAllocation.IndexAllocation blockingIndexAllocation = new NativeMemoryAllocation.IndexAllocation( mock(ExecutorService.class), @@ -275,19 +277,21 @@ public void testIndexAllocation_closeBlocking() throws InterruptedException, Exe watcherHandle ); - executorService.submit(blockingIndexAllocation::readLock); + // Acquire a read lock + blockingIndexAllocation.readLock(); + + // This should be blocked as a read lock is still being held. Future closingThread = executorService.submit(blockingIndexAllocation::close); // Check if thread is currently blocked try { closingThread.get(5, TimeUnit.SECONDS); - } catch (Exception e) { - expectedException.set(e); - } - - assertNotNull(expectedException.get()); + fail("Closing should be blocked. We are still holding a read lock."); + } catch (TimeoutException ignored) {} - executorService.submit(blockingIndexAllocation::readUnlock); + // Now, we unlock a read lock. + blockingIndexAllocation.readUnlock(); + // As we don't hold any locking, the closing thread can now good to acquire a write lock. closingThread.get(); // Waits until close