Skip to content

Commit

Permalink
Fix some failing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nisargthakkar committed Sep 5, 2024
1 parent 765663f commit 6fb9025
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

public void setUpStore() {
private void setUpStore() {
storeName = Utils.getUniqueString("store");
String owner = "test";
// set up push status store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,6 @@ public void testMultiDataCenterRePushWithIncrementalPush() throws Exception {
.updateStore(
storeName,
new UpdateStoreQueryParams().setIncrementalPushEnabled(true)
.setHybridDataReplicationPolicy(DataReplicationPolicy.NONE)
.setHybridOffsetLagThreshold(1)
.setHybridRewindSeconds(Time.SECONDS_PER_DAY))
.isError());
Expand All @@ -616,11 +615,7 @@ public void testMultiDataCenterRePushWithIncrementalPush() throws Exception {
String incPushToRTVersion = System.currentTimeMillis() + "_test_inc_push_to_rt";
VeniceControllerWrapper parentController =
parentControllers.stream().filter(c -> c.isLeaderController(clusterName)).findAny().get();
incPushToRTWriter = startIncrementalPush(
parentControllerClient,
storeName,
parentController.getVeniceAdmin().getVeniceWriterFactory(),
incPushToRTVersion);
incPushToRTWriter = startIncrementalPush(parentControllerClient, storeName, incPushToRTVersion);
final String newVersionTopic = Version.composeKafkaTopic(
storeName,
parentControllerClient.getStore(storeName).getStore().getLargestUsedVersionNumber());
Expand Down Expand Up @@ -1009,7 +1004,6 @@ private void assertStoreHealth(ControllerClient controllerClient, String systemS
private VeniceWriter<String, String, byte[]> startIncrementalPush(
ControllerClient controllerClient,
String storeName,
VeniceWriterFactory veniceWriterFactory,
String incrementalPushVersion) {
VersionCreationResponse response = controllerClient.requestTopicForWrites(
storeName,
Expand All @@ -1026,8 +1020,9 @@ private VeniceWriter<String, String, byte[]> startIncrementalPush(
-1);
assertFalse(response.isError());
Assert.assertNotNull(response.getKafkaTopic());
VeniceWriterFactory veniceWriterFactory = new VeniceWriterFactory(new Properties(), null, null);
VeniceWriter veniceWriter = veniceWriterFactory.createVeniceWriter(
new VeniceWriterOptions.Builder(response.getKafkaTopic())
new VeniceWriterOptions.Builder(response.getKafkaTopic()).setBrokerAddress(response.getKafkaBootstrapServers())
.setKeySerializer(new VeniceAvroKafkaSerializer(STRING_SCHEMA.toString()))
.setValueSerializer(new VeniceAvroKafkaSerializer(STRING_SCHEMA.toString()))
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controller.util.AdminUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
Expand All @@ -34,7 +35,10 @@
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
Expand Down Expand Up @@ -64,8 +68,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -82,7 +86,6 @@ public class IngestionHeartBeatTest {
private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
private VeniceControllerWrapper parentController;
private List<VeniceMultiClusterWrapper> childDatacenters;
private String storeName;

@BeforeClass(alwaysRun = true)
public void setUp() {
Expand All @@ -109,15 +112,6 @@ public void setUp() {
this.parentController = parentControllers.get(0);
}

@AfterTest(alwaysRun = true)
public void cleanupStore() {
String parentControllerUrl = parentController.getControllerUrl();
try (ControllerClient parentControllerClient =
new ControllerClient(multiRegionMultiClusterWrapper.getClusterNames()[0], parentControllerUrl)) {
parentControllerClient.disableAndDeleteStore(storeName);
}
}

@DataProvider
public static Object[][] AAConfigAndIncPushAndDRPProvider() {
return DataProviderUtils
Expand All @@ -129,7 +123,7 @@ public void testIngestionHeartBeat(
boolean isActiveActiveEnabled,
boolean isIncrementalPushEnabled,
DataReplicationPolicy dataReplicationPolicy) throws IOException, InterruptedException {
storeName = Utils.getUniqueString("ingestionHeartBeatTest");
String storeName = Utils.getUniqueString("ingestionHeartBeatTest");
String parentControllerUrl = parentController.getControllerUrl();
File inputDir = getTempDataDirectory();
Schema recordSchema = writeSimpleAvroFileWithStringToNameRecordV1Schema(inputDir);
Expand All @@ -145,22 +139,47 @@ public void testIngestionHeartBeat(
assertCommand(
parentControllerClient
.createNewStore(storeName, "test_owner", keySchemaStr, NAME_RECORD_V1_SCHEMA.toString()));

assertCommand(
parentControllerClient.updateStore(
storeName,
new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
.setCompressionStrategy(CompressionStrategy.NO_OP)
.setHybridRewindSeconds(500L)
.setHybridOffsetLagThreshold(10L)
.setPartitionCount(2)
.setReplicationFactor(2)
.setNativeReplicationEnabled(true)));

UpdateStoreQueryParams updateStoreParams =
new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
.setCompressionStrategy(CompressionStrategy.NO_OP)
.setIncrementalPushEnabled(isIncrementalPushEnabled)
.setHybridRewindSeconds(500L)
.setHybridOffsetLagThreshold(10L)
.setPartitionCount(2)
.setReplicationFactor(2)
.setNativeReplicationEnabled(true)
new UpdateStoreQueryParams().setIncrementalPushEnabled(isIncrementalPushEnabled)
.setActiveActiveReplicationEnabled(isActiveActiveEnabled)
.setHybridDataReplicationPolicy(dataReplicationPolicy);

ControllerResponse updateStoreResponse =
parentControllerClient.retryableRequest(5, c -> c.updateStore(storeName, updateStoreParams));

assertFalse(updateStoreResponse.isError(), "Update store got error: " + updateStoreResponse.getError());
HybridStoreConfig expectedHybridStoreConfig =
new HybridStoreConfigImpl(500L, 10L, -1, dataReplicationPolicy, BufferReplayPolicy.REWIND_FROM_EOP);

boolean isIncrementalPushAllowed =
AdminUtils.isIncrementalPushSupported(true, isActiveActiveEnabled, expectedHybridStoreConfig);

// ACTIVE_ACTIVE DRP is only supported for stores with AA enabled
boolean isAAConfigSupported =
isActiveActiveEnabled || dataReplicationPolicy != DataReplicationPolicy.ACTIVE_ACTIVE;

// Push should succeed if:
// 1. It is a full push, or
// 2. It is an incremental push, and incremental push is allowed
boolean shouldPushSucceed = !isIncrementalPushEnabled || isIncrementalPushAllowed;

boolean isConfigSupported = isAAConfigSupported && shouldPushSucceed;
if (isConfigSupported) {
assertCommand(updateStoreResponse);
} else {
assertTrue(updateStoreResponse.isError());
}

VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test_push_id", 1000);
assertEquals(response.getVersion(), 1);
Expand All @@ -176,26 +195,43 @@ public void testIngestionHeartBeat(
String childControllerUrl = childDatacenters.get(0).getRandomController().getControllerUrl();
try (ControllerClient childControllerClient = new ControllerClient(CLUSTER_NAME, childControllerUrl)) {
runVPJ(vpjProperties, expectedVersionNumber, childControllerClient);
if (!shouldPushSucceed) {
Assert.fail("Push should have failed");
}
} catch (Exception e) {
if (shouldPushSucceed) {
Assert.fail("Push should not fail", e);
}
}
VeniceClusterWrapper veniceClusterWrapper = childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
veniceClusterWrapper.waitVersion(storeName, expectedVersionNumber);

// Verify data pushed via full push/inc push using client
try (AvroGenericStoreClient<Object, Object> storeReader = ClientFactory.getAndStartGenericAvroClient(
ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()))) {
TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> {
try {
for (int i = 1; i < 100; i++) {
String key = String.valueOf(i);
GenericRecord value = readValue(storeReader, key);
assertNotNull(value, "Key " + key + " should not be missing!");
assertEquals(value.get("firstName").toString(), "first_name_" + key);
assertEquals(value.get("lastName").toString(), "last_name_" + key);
if (shouldPushSucceed) {
// Verify data pushed via full push/inc push using client
try (AvroGenericStoreClient<Object, Object> storeReader = ClientFactory.getAndStartGenericAvroClient(
ClientConfig.defaultGenericClientConfig(storeName)
.setVeniceURL(veniceClusterWrapper.getRandomRouterURL()))) {
TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> {
try {
for (int i = 1; i < 100; i++) {
String key = String.valueOf(i);
GenericRecord value = readValue(storeReader, key);
assertNotNull(value, "Key " + key + " should not be missing!");
assertEquals(value.get("firstName").toString(), "first_name_" + key);
assertEquals(value.get("lastName").toString(), "last_name_" + key);
}
} catch (Exception e) {
throw new VeniceException(e);
}
} catch (Exception e) {
throw new VeniceException(e);
}
});
});
}
}

// Since the config combination is not supported, we can either validate the heartbeats using the default values,
// or skip the validation. Here, we choose to skip it since the default validation case will be one of the
// permutations where the heartbeats will get validated.
if (!isConfigSupported) {
return;
}

// create consumer to consume from RT/VT to verify HB and Leader completed header
Expand Down Expand Up @@ -278,6 +314,7 @@ private void verifyHBinKafkaTopic(
break;
}
}

if ((!isIncrementalPushEnabled || isActiveActiveEnabled)
&& (isActiveActiveEnabled || dataReplicationPolicy != DataReplicationPolicy.AGGREGATE)) {
assertTrue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,44 @@ public static int getRmdVersionID(Admin admin, String storeName, String clusterN
LOGGER.info("Use RMD version ID {} for cluster {}", rmdVersionID, clusterName);
return rmdVersionID;
}

/**
* Check if a store can support incremental pushes based on other configs. The following rules define when incremental
* push is allowed:
* <ol>
* <li>If the system is running in single-region mode, the store must by hybrid</li>
* <li>If the system is running in multi-region mode,</li>
* <ol type="i">
* <li>Hybrid + Active-Active</li>
* <li>Hybrid + !Active-Active + {@link DataReplicationPolicy} is {@link DataReplicationPolicy#AGGREGATE}</li>
* <li>Hybrid + !Active-Active + {@link DataReplicationPolicy} is {@link DataReplicationPolicy#NONE}</li>
* </ol>
* <ol/>
* @param multiRegion whether the system is running in multi-region mode
* @param hybridStoreConfig The hybrid store config after applying all updates
* @return {@code true} if incremental push is allowed, {@code false} otherwise
*/
public static boolean isIncrementalPushSupported(
boolean multiRegion,
boolean activeActiveReplicationEnabled,
HybridStoreConfig hybridStoreConfig) {
// Only hybrid stores can support incremental push
if (!AdminUtils.isHybrid(hybridStoreConfig)) {
return false;
}

// If the system is running in multi-region mode, we need to validate the data replication policies
if (!multiRegion) {
return true;
}

// A/A can always support incremental push
if (activeActiveReplicationEnabled) {
return true;
}

DataReplicationPolicy dataReplicationPolicy = hybridStoreConfig.getDataReplicationPolicy();
return dataReplicationPolicy == DataReplicationPolicy.AGGREGATE
|| dataReplicationPolicy == DataReplicationPolicy.NONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ public static UpdateStoreWrapper getStoreUpdate(
INCREMENTAL_PUSH_ENABLED,
updatedConfigs,
() -> updatedStore.setIncrementalPushEnabled(
isIncrementalPushEnabled(
AdminUtils.isIncrementalPushSupported(
clusterConfig.isMultiRegion(),
updatedStore.isActiveActiveReplicationEnabled(),
newHybridStoreConfig)));
Expand Down Expand Up @@ -722,46 +722,6 @@ static void updateInferredConfigsForBatchToHybrid(
}
}

/**
* Check if a store can support incremental pushes based on other configs. The following rules define when incremental
* push is allowed:
* <ol>
* <li>If the system is running in single-region mode, the store must by hybrid</li>
* <li>If the system is running in multi-region mode,</li>
* <ol type="i">
* <li>Hybrid + Active-Active</li>
* <li>Hybrid + !Active-Active + {@link DataReplicationPolicy} is {@link DataReplicationPolicy#AGGREGATE}</li>
* <li>Hybrid + !Active-Active + {@link DataReplicationPolicy} is {@link DataReplicationPolicy#NONE}</li>
* </ol>
* <ol/>
* @param multiRegion whether the system is running in multi-region mode
* @param hybridStoreConfig The hybrid store config after applying all updates
* @return {@code true} if incremental push is allowed, {@code false} otherwise
*/
static boolean isIncrementalPushEnabled(
boolean multiRegion,
boolean activeActiveReplicationEnabled,
HybridStoreConfig hybridStoreConfig) {
// Only hybrid stores can support incremental push
if (!AdminUtils.isHybrid(hybridStoreConfig)) {
return false;
}

// If the system is running in multi-region mode, we need to validate the data replication policies
if (!multiRegion) {
return true;
}

// A/A can always support incremental push
if (activeActiveReplicationEnabled) {
return true;
}

DataReplicationPolicy dataReplicationPolicy = hybridStoreConfig.getDataReplicationPolicy();
return dataReplicationPolicy == DataReplicationPolicy.AGGREGATE
|| dataReplicationPolicy == DataReplicationPolicy.NONE;
}

/**
* Validate if the specified store is in a valid state or not
* Examples of such checks are:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.ConfigConstants;
Expand Down Expand Up @@ -131,4 +132,36 @@ public void testGetRmdVersionID() {
verify(multiClusterConfig, never()).getControllerConfig(any());
verify(controllerConfig, never()).getReplicationMetadataVersion();
}

@Test
public void testIsIncrementalPushSupported() {
HybridStoreConfig nonHybridConfig =
new HybridStoreConfigImpl(-1, -1, -1, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP);
HybridStoreConfig hybridConfigWithNonAggregateDRP = new HybridStoreConfigImpl(
100,
1000,
-1,
DataReplicationPolicy.NON_AGGREGATE,
BufferReplayPolicy.REWIND_FROM_EOP);
HybridStoreConfig hybridConfigWithAggregateDRP =
new HybridStoreConfigImpl(100, 1000, -1, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP);
HybridStoreConfig hybridConfigWithNoneDRP =
new HybridStoreConfigImpl(100, 1000, -1, DataReplicationPolicy.NONE, BufferReplayPolicy.REWIND_FROM_EOP);

// In single-region mode, any hybrid store should have incremental push enabled.
assertFalse(AdminUtils.isIncrementalPushSupported(false, false, null));
assertFalse(AdminUtils.isIncrementalPushSupported(false, false, nonHybridConfig));
assertTrue(AdminUtils.isIncrementalPushSupported(false, false, hybridConfigWithNonAggregateDRP));
assertTrue(AdminUtils.isIncrementalPushSupported(false, false, hybridConfigWithAggregateDRP));
assertTrue(AdminUtils.isIncrementalPushSupported(false, false, hybridConfigWithNoneDRP));

// In multi-region mode, hybrid stores with NON_AGGREGATE DataReplicationPolicy should not have incremental push
// enabled.
assertFalse(AdminUtils.isIncrementalPushSupported(true, false, null));
assertFalse(AdminUtils.isIncrementalPushSupported(true, false, nonHybridConfig));
assertFalse(AdminUtils.isIncrementalPushSupported(true, false, hybridConfigWithNonAggregateDRP));
assertTrue(AdminUtils.isIncrementalPushSupported(true, false, hybridConfigWithAggregateDRP));
assertTrue(AdminUtils.isIncrementalPushSupported(true, false, hybridConfigWithNoneDRP));
assertTrue(AdminUtils.isIncrementalPushSupported(true, true, hybridConfigWithNonAggregateDRP));
}
}
Loading

0 comments on commit 6fb9025

Please sign in to comment.