Skip to content

Commit

Permalink
WIP: run tests in parallel
Browse files Browse the repository at this point in the history
This begins the process of allowing for parallel test execution. A few issues were noticed that required some changes:

* Some tests were taking up a bunch of time and benefitted from parallelized execution of tests within the fixture. Those have been updated to CONCURRENT execution mode
* A known issue with JUnit (junit-team/junit5#3108) means that if one of the tests involves a future waiting, that can look to the `ForkJoinPool` like the thread is available for work stealing, so too many tests can end up being executed at once. A new test extension was added that has a semaphore, and that appears to be enough to stop extra tests from being executed
* The server was running out of batch GRV transactions, which resulted in tests failing with "batch GRV transactions exhausted". This mainly affected indexing tests, and I was able to resolve this by upping the transaction priority, for better or worse

There are still some issues:

* A number of tests were hitting deadline exceeded exceptions. It looked like some kind of weird concurrency stuff may be going on, because there were things happening like key space path resolution while creating a record store would include stack traces from closing the test key space path manager, which seems like things were sharing objects that shouldn't have been. This affected both the `:fdb-record-layer-core:test` and `:fdb-lucene:test` tasks
* The relational layer tests are not structured to allocate unique key spaces for each test, so they immediatetly hit concurrency problems when run in a parallelized manner
  • Loading branch information
alecgrieser committed Jan 21, 2025
1 parent da2ff33 commit c554503
Show file tree
Hide file tree
Showing 19 changed files with 208 additions and 158 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* LimitConcurrencyExtension.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.test;

import com.google.auto.service.AutoService;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.ExtensionContext;

import java.util.concurrent.Semaphore;

@AutoService(Extension.class)
public class LimitConcurrencyExtension implements BeforeEachCallback, AfterEachCallback {
static Semaphore testConcurrency = new Semaphore(Integer.parseInt(System.getProperty("tests.concurrencyLimit", "10")));

@Override
public void beforeEach(final ExtensionContext context) throws InterruptedException {
testConcurrency.acquire();
}

@Override
public void afterEach(final ExtensionContext context) {
testConcurrency.release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import com.google.protobuf.Message;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import javax.annotation.Nonnull;
import java.util.HashMap;
Expand All @@ -69,6 +71,7 @@
* Tests related to built in functionality for getting the count of records in a store.
*/
@Tag(Tags.RequiresFDB)
@Execution(ExecutionMode.CONCURRENT)
public class FDBRecordStoreCountRecordsTest extends FDBRecordStoreTestBase {

@Test
Expand Down Expand Up @@ -380,7 +383,7 @@ public void addCountIndex() throws Exception {
}

// Build the index
try (OnlineIndexer onlineIndexBuilder = OnlineIndexer.forRecordStoreAndIndex(recordStore, "record_count")) {
try (OnlineIndexer onlineIndexBuilder = newIndexer("record_count")) {
onlineIndexBuilder.buildIndex();
}
try (FDBRecordContext context = openContext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,8 +1239,7 @@ void markReadableTest() throws Exception {
openSimpleRecordStore(context);
Index index = recordStore.getRecordMetaData().getIndex(indexName);
assertThat(recordStore.isIndexReadable(index), is(false));
try (OnlineIndexer indexBuilder = OnlineIndexer.newBuilder().setRecordStore(recordStore).setIndex(index)
.build()) {
try (OnlineIndexer indexBuilder = newIndexer(indexName)) {
indexBuilder.buildIndex(false);
}
commit(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,4 +305,18 @@ protected FDBStoredRecord<Message> saveAndSplitSimpleRecord(long recno, String s
md.removeIndex("MySimpleRecord$str_value_indexed");
md.setStoreRecordVersions(false);
};

@Nonnull
protected OnlineIndexer.Builder newIndexerBuilder() {
return OnlineIndexer.newBuilder()
.setRecordStore(recordStore)
.setPriority(FDBTransactionPriority.DEFAULT);
}

@Nonnull
protected OnlineIndexer newIndexer(@Nonnull String indexName) {
return newIndexerBuilder()
.setIndex(indexName)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import com.apple.foundationdb.record.metadata.Key;
import com.apple.foundationdb.record.provider.foundationdb.indexes.InvalidIndexEntry;
import com.apple.foundationdb.record.provider.foundationdb.indexes.ValueIndexMaintainer;
import com.apple.foundationdb.record.query.QueryToKeyMatcher;
import com.apple.foundationdb.record.provider.foundationdb.keyspace.KeySpacePath;
import com.apple.foundationdb.record.query.QueryToKeyMatcher;
import com.apple.foundationdb.record.query.plan.QueryPlanner;
import com.apple.foundationdb.record.test.TestKeySpace;
import com.apple.foundationdb.record.util.pair.Pair;
Expand All @@ -59,6 +59,8 @@
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -98,6 +100,7 @@
* Tests of uniqueness checks.
*/
@Tag(Tags.RequiresFDB)
@Execution(ExecutionMode.CONCURRENT)
public class FDBRecordStoreUniqueIndexTest extends FDBRecordStoreTestBase {

private static final String NO_UNIQUE_CLEAR_INDEX_TYPE = "no_unique_clear";
Expand Down Expand Up @@ -310,8 +313,7 @@ void multipleStores() throws Exception {
recordStore = createOrOpenRecordStore(context, simpleMetaData(uniqueHook), path).getLeft();
commit(context);

try (OnlineIndexer indexBuilder = OnlineIndexer.newBuilder()
.setRecordStore(recordStore)
try (OnlineIndexer indexBuilder = newIndexerBuilder()
.setTargetIndexes(List.of(uniqueIndex))
.setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder()
.allowUniquePendingState(true))
Expand Down Expand Up @@ -630,8 +632,7 @@ public void addUniqueIndexViaBuild() {
if (allowReadableUniquePending) {
indexingPolicy.allowUniquePendingState();
}
try (OnlineIndexer indexer = OnlineIndexer.newBuilder()
.setRecordStore(recordStore)
try (OnlineIndexer indexer = newIndexerBuilder()
.setTargetIndexesByName(List.of(uniqueIndex.getName()))
.setIndexingPolicy(indexingPolicy
.build()).build()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ void doNotAllowBuildingIndexFromUnnestedIndex() {
try (OnlineIndexer indexer = OnlineIndexer.newBuilder()
.setRecordStoreBuilder(storeBuilder)
.setIndex(targetIndex)
.setPriority(FDBTransactionPriority.DEFAULT)
.setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder()
.setSourceIndex(sourceIndex.getName())
.setIfDisabled(OnlineIndexer.IndexingPolicy.DesiredAction.REBUILD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ void testRepartitionTimeout() {
try (OnlineIndexer indexer = OnlineIndexer.newBuilder()
.setRecordStoreBuilder(storeBuilder)
.setTargetIndexesByName(List.of(INDEX_NAME))
.setPriority(FDBTransactionPriority.DEFAULT)
.setMaxAttempts(9)
.build()) {
Assertions.assertThrows(FDBExceptions.FDBStoreTransactionIsTooOldException.class, indexer::mergeIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ OnlineIndexer.Builder newIndexerBuilder() {
.setMetaData(metaData)
.setSubspaceProvider(new SubspaceProviderByKeySpacePath(path))
.setIndexMaintenanceFilter(getIndexMaintenanceFilter())
.setPriority(FDBTransactionPriority.DEFAULT)
.setFormatVersion(formatVersion);
}

Expand All @@ -181,6 +182,7 @@ OnlineIndexScrubber.Builder newScrubberBuilder() {
.setMetaData(metaData)
.setSubspaceProvider(new SubspaceProviderByKeySpacePath(path))
.setIndexMaintenanceFilter(getIndexMaintenanceFilter())
.setPriority(FDBTransactionPriority.DEFAULT)
.setFormatVersion(formatVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ public void testOnlineIndexBuilder() throws Exception {
timer.reset();

// Build in this transaction.
try (OnlineIndexer indexBuilder = OnlineIndexer.forRecordStoreAndIndex(recordStore, "newIndex")) {
try (OnlineIndexer indexBuilder = newIndexer("newIndex")) {
indexBuilder.rebuildIndex(recordStore);
}
recordStore.markIndexReadable("newIndex").join();
Expand All @@ -629,7 +629,7 @@ public void testOnlineIndexBuilder() throws Exception {
timer.reset();

// Build in multiple transactions.
try (OnlineIndexer indexBuilder = OnlineIndexer.forRecordStoreAndIndex(recordStore, "newIndex")) {
try (OnlineIndexer indexBuilder = newIndexer("newIndex")) {
indexBuilder.buildIndex();
}

Expand Down Expand Up @@ -682,7 +682,7 @@ public void testOnlineIndexMultiTargetBuilder() throws Exception {
}

// Build multiple indexes of typed records
try (OnlineIndexer indexBuilder = OnlineIndexer.newBuilder().setRecordStore(recordStore)
try (OnlineIndexer indexBuilder = newIndexerBuilder()
.addTargetIndex("newIndex")
.addTargetIndex("newSumIndex")
.addTargetIndex("newMaxIndex")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package com.apple.foundationdb.record.provider.foundationdb.indexes;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -35,18 +37,22 @@
/**
* Simple tests for Multidimensional Index.
*/
@Execution(ExecutionMode.CONCURRENT)
class SimpleMultidimensionalIndexTest extends MultidimensionalIndexTestBase {

static Stream<String> storageAdapterArgs() {
return Stream.of(BY_NODE.toString(), BY_SLOT.toString());
}

static Stream<Boolean> booleanArgs() {
return Stream.of(false, true);
}

static Stream<Arguments> argumentsForBasicReads() {
return Stream.of(
Arguments.of(BY_NODE.toString(), false, false),
Arguments.of(BY_NODE.toString(), false, true),
Arguments.of(BY_NODE.toString(), true, false),
Arguments.of(BY_NODE.toString(), true, true),
Arguments.of(BY_SLOT.toString(), false, false),
Arguments.of(BY_SLOT.toString(), false, true),
Arguments.of(BY_SLOT.toString(), true, false),
Arguments.of(BY_SLOT.toString(), true, true));
return storageAdapterArgs().flatMap(storageAdapter ->
booleanArgs().flatMap(storeHilbertValues ->
booleanArgs().map(useNodeSlotIndex ->
Arguments.of(storageAdapter, storeHilbertValues, useNodeSlotIndex))));
}

/**
Expand All @@ -57,62 +63,51 @@ static Stream<Arguments> argumentsForBasicReads() {
*
* @return a stream of arguments
*/
@Nonnull
static Stream<Arguments> argumentsForIndexReads() {
final Random random = new Random(System.currentTimeMillis());
return Stream.of(
Arguments.of(random.nextLong(), 100, BY_SLOT.toString(), false, false),
Arguments.of(random.nextLong(), 100, BY_SLOT.toString(), false, true),
Arguments.of(random.nextLong(), 100, BY_SLOT.toString(), true, false),
Arguments.of(random.nextLong(), 100, BY_SLOT.toString(), true, true),
Arguments.of(random.nextLong(), 100, BY_NODE.toString(), false, false),
Arguments.of(random.nextLong(), 100, BY_NODE.toString(), false, true),
Arguments.of(random.nextLong(), 100, BY_NODE.toString(), true, false),
Arguments.of(random.nextLong(), 100, BY_NODE.toString(), true, true),
Arguments.of(random.nextLong(), 500, BY_SLOT.toString(), false, false),
Arguments.of(random.nextLong(), 500, BY_SLOT.toString(), false, true),
Arguments.of(random.nextLong(), 500, BY_SLOT.toString(), true, false),
Arguments.of(random.nextLong(), 500, BY_SLOT.toString(), true, true),
Arguments.of(random.nextLong(), 500, BY_NODE.toString(), false, false),
Arguments.of(random.nextLong(), 500, BY_NODE.toString(), false, true),
Arguments.of(random.nextLong(), 500, BY_NODE.toString(), true, false),
Arguments.of(random.nextLong(), 500, BY_NODE.toString(), true, true),
// large values only for default config
Arguments.of(random.nextLong(), 1000, BY_NODE.toString(), true, false),
Arguments.of(random.nextLong(), 5000, BY_NODE.toString(), true, false)
);
return storageAdapterArgs().flatMap(storageAdapter ->
booleanArgs().flatMap(storeHilbertValue ->
booleanArgs().flatMap(useNodeSlotIndex ->
argumentsForIndexReads(random, storageAdapter, storeHilbertValue, useNodeSlotIndex))));
}

@Nonnull
static Stream<Arguments> argumentsForIndexReads(@Nonnull Random random, @Nonnull String storageAdapter, boolean storeHilbertValue, boolean useNodeSlotIndex) {
Arguments small = Arguments.of(random.nextLong(), 100, storageAdapter, storeHilbertValue, useNodeSlotIndex);
Arguments medium = Arguments.of(random.nextLong(), 500, storageAdapter, storeHilbertValue, useNodeSlotIndex);
// large values only for default config
if (storeHilbertValue && !useNodeSlotIndex && storageAdapter.equals(BY_NODE.toString())) {
Arguments large = Arguments.of(random.nextLong(), 1000, storageAdapter, storeHilbertValue, useNodeSlotIndex);
Arguments extraLarge = Arguments.of(random.nextLong(), 5000, storageAdapter, storeHilbertValue, useNodeSlotIndex);
return Stream.of(small, medium, large, extraLarge);
} else {
return Stream.of(small, medium);
}
}

@Nonnull
static Stream<Arguments> argumentsForIndexReadsAfterDeletes() {
final Random random = new Random(System.currentTimeMillis());
return Stream.of(
Arguments.of(random.nextLong(), 10, random.nextInt(10) + 1, BY_SLOT.toString(), false, false),
Arguments.of(random.nextLong(), 10, random.nextInt(10) + 1, BY_SLOT.toString(), false, true),
Arguments.of(random.nextLong(), 10, random.nextInt(10) + 1, BY_SLOT.toString(), true, false),
Arguments.of(random.nextLong(), 10, random.nextInt(10) + 1, BY_SLOT.toString(), true, true),
Arguments.of(random.nextLong(), 100, random.nextInt(100) + 1, BY_SLOT.toString(), false, false),
Arguments.of(random.nextLong(), 100, random.nextInt(100) + 1, BY_SLOT.toString(), false, true),
Arguments.of(random.nextLong(), 100, random.nextInt(100) + 1, BY_SLOT.toString(), true, false),
Arguments.of(random.nextLong(), 100, random.nextInt(100) + 1, BY_SLOT.toString(), true, true),
Arguments.of(random.nextLong(), 300, random.nextInt(300) + 1, BY_SLOT.toString(), false, false),
Arguments.of(random.nextLong(), 300, random.nextInt(300) + 1, BY_SLOT.toString(), false, true),
Arguments.of(random.nextLong(), 300, random.nextInt(300) + 1, BY_SLOT.toString(), true, false),
Arguments.of(random.nextLong(), 300, random.nextInt(300) + 1, BY_SLOT.toString(), true, true),
Arguments.of(random.nextLong(), 10, random.nextInt(10) + 1, BY_NODE.toString(), false, false),
Arguments.of(random.nextLong(), 10, random.nextInt(10) + 1, BY_NODE.toString(), false, true),
Arguments.of(random.nextLong(), 10, random.nextInt(10) + 1, BY_NODE.toString(), true, false),
Arguments.of(random.nextLong(), 10, random.nextInt(10) + 1, BY_NODE.toString(), true, true),
Arguments.of(random.nextLong(), 100, random.nextInt(100) + 1, BY_NODE.toString(), false, false),
Arguments.of(random.nextLong(), 100, random.nextInt(100) + 1, BY_NODE.toString(), false, true),
Arguments.of(random.nextLong(), 100, random.nextInt(100) + 1, BY_NODE.toString(), true, false),
Arguments.of(random.nextLong(), 100, random.nextInt(100) + 1, BY_NODE.toString(), true, true),
Arguments.of(random.nextLong(), 300, random.nextInt(300) + 1, BY_NODE.toString(), false, false),
Arguments.of(random.nextLong(), 300, random.nextInt(300) + 1, BY_NODE.toString(), false, true),
Arguments.of(random.nextLong(), 300, random.nextInt(300) + 1, BY_NODE.toString(), true, false),
Arguments.of(random.nextLong(), 300, random.nextInt(300) + 1, BY_NODE.toString(), true, true),
// large values only for default config
Arguments.of(random.nextLong(), 1000, random.nextInt(1000) + 1, BY_NODE.toString(), true, false),
Arguments.of(random.nextLong(), 5000, random.nextInt(1000) + 1, BY_NODE.toString(), true, false)
);
return storageAdapterArgs().flatMap(storageAdapter ->
booleanArgs().flatMap(storeHilbertValue ->
booleanArgs().flatMap(useNodeSlotIndex ->
argumentsForIndexReadsAfterDeletes(random, storageAdapter, storeHilbertValue, useNodeSlotIndex))));
}

@Nonnull
static Stream<Arguments> argumentsForIndexReadsAfterDeletes(@Nonnull Random random, @Nonnull String storageAdapter, boolean storeHilbertValue, boolean useNodeSlotIndex) {
Arguments extraSmall = Arguments.of(random.nextLong(), 10, random.nextInt(10) + 1, storageAdapter, storeHilbertValue, useNodeSlotIndex);
Arguments small = Arguments.of(random.nextLong(), 100, random.nextInt(100) + 1, storageAdapter, storeHilbertValue, useNodeSlotIndex);
Arguments medium = Arguments.of(random.nextLong(), 300, random.nextInt(300) + 1, storageAdapter, storeHilbertValue, useNodeSlotIndex);
// large values only for default config
if (storeHilbertValue && !useNodeSlotIndex && storageAdapter.equals(BY_NODE.toString())) {
Arguments large = Arguments.of(random.nextLong(), 1000, random.nextInt(1000) + 1, storageAdapter, storeHilbertValue, useNodeSlotIndex);
Arguments extraLarge = Arguments.of(random.nextLong(), 5000, random.nextInt(5000) + 1, storageAdapter, storeHilbertValue, useNodeSlotIndex);
return Stream.of(extraSmall, small, medium, large, extraLarge);
} else {
return Stream.of(extraSmall, small, medium);
}
}

/**
Expand Down
Loading

0 comments on commit c554503

Please sign in to comment.