From 6fb902545929e1b088e5ff9cd5db962dc5d8f7e4 Mon Sep 17 00:00:00 2001 From: Nisarg Thakkar Date: Fri, 9 Aug 2024 15:33:35 -0700 Subject: [PATCH] Fix some failing tests --- .../PushStatusStoreMultiColoTest.java | 2 +- .../TestPushJobWithNativeReplication.java | 11 +- .../IngestionHeartBeatTest.java | 109 ++++++++++++------ .../venice/controller/util/AdminUtils.java | 40 +++++++ .../controller/util/UpdateStoreUtils.java | 42 +------ .../controller/util/AdminUtilsTest.java | 33 ++++++ .../controller/util/UpdateStoreUtilsTest.java | 32 ----- 7 files changed, 151 insertions(+), 118 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java index 22b2d801b83..bdcf1f0c45a 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreMultiColoTest.java @@ -111,7 +111,7 @@ public void cleanUp() { Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper); } - public void setUpStore() { + private void setUpStore() { storeName = Utils.getUniqueString("store"); String owner = "test"; // set up push status store. diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java index e84810cdbeb..b78d1e8439d 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java @@ -596,7 +596,6 @@ public void testMultiDataCenterRePushWithIncrementalPush() throws Exception { .updateStore( storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(true) - .setHybridDataReplicationPolicy(DataReplicationPolicy.NONE) .setHybridOffsetLagThreshold(1) .setHybridRewindSeconds(Time.SECONDS_PER_DAY)) .isError()); @@ -616,11 +615,7 @@ public void testMultiDataCenterRePushWithIncrementalPush() throws Exception { String incPushToRTVersion = System.currentTimeMillis() + "_test_inc_push_to_rt"; VeniceControllerWrapper parentController = parentControllers.stream().filter(c -> c.isLeaderController(clusterName)).findAny().get(); - incPushToRTWriter = startIncrementalPush( - parentControllerClient, - storeName, - parentController.getVeniceAdmin().getVeniceWriterFactory(), - incPushToRTVersion); + incPushToRTWriter = startIncrementalPush(parentControllerClient, storeName, incPushToRTVersion); final String newVersionTopic = Version.composeKafkaTopic( storeName, parentControllerClient.getStore(storeName).getStore().getLargestUsedVersionNumber()); @@ -1009,7 +1004,6 @@ private void assertStoreHealth(ControllerClient controllerClient, String systemS private VeniceWriter startIncrementalPush( ControllerClient controllerClient, String storeName, - VeniceWriterFactory veniceWriterFactory, String incrementalPushVersion) { VersionCreationResponse response = controllerClient.requestTopicForWrites( storeName, @@ -1026,8 +1020,9 @@ private VeniceWriter startIncrementalPush( -1); assertFalse(response.isError()); Assert.assertNotNull(response.getKafkaTopic()); + VeniceWriterFactory veniceWriterFactory = new VeniceWriterFactory(new Properties(), null, null); VeniceWriter veniceWriter = veniceWriterFactory.createVeniceWriter( - new VeniceWriterOptions.Builder(response.getKafkaTopic()) + new VeniceWriterOptions.Builder(response.getKafkaTopic()).setBrokerAddress(response.getKafkaBootstrapServers()) .setKeySerializer(new VeniceAvroKafkaSerializer(STRING_SCHEMA.toString())) .setValueSerializer(new VeniceAvroKafkaSerializer(STRING_SCHEMA.toString())) .build()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/ingestionHeartbeat/IngestionHeartBeatTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/ingestionHeartbeat/IngestionHeartBeatTest.java index 5a841d318fa..1b6e8814e4a 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/ingestionHeartbeat/IngestionHeartBeatTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/ingestionHeartbeat/IngestionHeartBeatTest.java @@ -20,6 +20,7 @@ import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.compression.CompressionStrategy; +import com.linkedin.venice.controller.util.AdminUtils; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; @@ -34,7 +35,10 @@ import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.meta.BufferReplayPolicy; import com.linkedin.venice.meta.DataReplicationPolicy; +import com.linkedin.venice.meta.HybridStoreConfig; +import com.linkedin.venice.meta.HybridStoreConfigImpl; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; @@ -64,8 +68,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.testng.Assert; import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -82,7 +86,6 @@ public class IngestionHeartBeatTest { private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; private VeniceControllerWrapper parentController; private List childDatacenters; - private String storeName; @BeforeClass(alwaysRun = true) public void setUp() { @@ -109,15 +112,6 @@ public void setUp() { this.parentController = parentControllers.get(0); } - @AfterTest(alwaysRun = true) - public void cleanupStore() { - String parentControllerUrl = parentController.getControllerUrl(); - try (ControllerClient parentControllerClient = - new ControllerClient(multiRegionMultiClusterWrapper.getClusterNames()[0], parentControllerUrl)) { - parentControllerClient.disableAndDeleteStore(storeName); - } - } - @DataProvider public static Object[][] AAConfigAndIncPushAndDRPProvider() { return DataProviderUtils @@ -129,7 +123,7 @@ public void testIngestionHeartBeat( boolean isActiveActiveEnabled, boolean isIncrementalPushEnabled, DataReplicationPolicy dataReplicationPolicy) throws IOException, InterruptedException { - storeName = Utils.getUniqueString("ingestionHeartBeatTest"); + String storeName = Utils.getUniqueString("ingestionHeartBeatTest"); String parentControllerUrl = parentController.getControllerUrl(); File inputDir = getTempDataDirectory(); Schema recordSchema = writeSimpleAvroFileWithStringToNameRecordV1Schema(inputDir); @@ -145,22 +139,47 @@ public void testIngestionHeartBeat( assertCommand( parentControllerClient .createNewStore(storeName, "test_owner", keySchemaStr, NAME_RECORD_V1_SCHEMA.toString())); + + assertCommand( + parentControllerClient.updateStore( + storeName, + new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA) + .setCompressionStrategy(CompressionStrategy.NO_OP) + .setHybridRewindSeconds(500L) + .setHybridOffsetLagThreshold(10L) + .setPartitionCount(2) + .setReplicationFactor(2) + .setNativeReplicationEnabled(true))); + UpdateStoreQueryParams updateStoreParams = - new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA) - .setCompressionStrategy(CompressionStrategy.NO_OP) - .setIncrementalPushEnabled(isIncrementalPushEnabled) - .setHybridRewindSeconds(500L) - .setHybridOffsetLagThreshold(10L) - .setPartitionCount(2) - .setReplicationFactor(2) - .setNativeReplicationEnabled(true) + new UpdateStoreQueryParams().setIncrementalPushEnabled(isIncrementalPushEnabled) .setActiveActiveReplicationEnabled(isActiveActiveEnabled) .setHybridDataReplicationPolicy(dataReplicationPolicy); ControllerResponse updateStoreResponse = parentControllerClient.retryableRequest(5, c -> c.updateStore(storeName, updateStoreParams)); - assertFalse(updateStoreResponse.isError(), "Update store got error: " + updateStoreResponse.getError()); + HybridStoreConfig expectedHybridStoreConfig = + new HybridStoreConfigImpl(500L, 10L, -1, dataReplicationPolicy, BufferReplayPolicy.REWIND_FROM_EOP); + + boolean isIncrementalPushAllowed = + AdminUtils.isIncrementalPushSupported(true, isActiveActiveEnabled, expectedHybridStoreConfig); + + // ACTIVE_ACTIVE DRP is only supported for stores with AA enabled + boolean isAAConfigSupported = + isActiveActiveEnabled || dataReplicationPolicy != DataReplicationPolicy.ACTIVE_ACTIVE; + + // Push should succeed if: + // 1. It is a full push, or + // 2. It is an incremental push, and incremental push is allowed + boolean shouldPushSucceed = !isIncrementalPushEnabled || isIncrementalPushAllowed; + + boolean isConfigSupported = isAAConfigSupported && shouldPushSucceed; + if (isConfigSupported) { + assertCommand(updateStoreResponse); + } else { + assertTrue(updateStoreResponse.isError()); + } VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test_push_id", 1000); assertEquals(response.getVersion(), 1); @@ -176,26 +195,43 @@ public void testIngestionHeartBeat( String childControllerUrl = childDatacenters.get(0).getRandomController().getControllerUrl(); try (ControllerClient childControllerClient = new ControllerClient(CLUSTER_NAME, childControllerUrl)) { runVPJ(vpjProperties, expectedVersionNumber, childControllerClient); + if (!shouldPushSucceed) { + Assert.fail("Push should have failed"); + } + } catch (Exception e) { + if (shouldPushSucceed) { + Assert.fail("Push should not fail", e); + } } VeniceClusterWrapper veniceClusterWrapper = childDatacenters.get(0).getClusters().get(CLUSTER_NAME); veniceClusterWrapper.waitVersion(storeName, expectedVersionNumber); - // Verify data pushed via full push/inc push using client - try (AvroGenericStoreClient storeReader = ClientFactory.getAndStartGenericAvroClient( - ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()))) { - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { - try { - for (int i = 1; i < 100; i++) { - String key = String.valueOf(i); - GenericRecord value = readValue(storeReader, key); - assertNotNull(value, "Key " + key + " should not be missing!"); - assertEquals(value.get("firstName").toString(), "first_name_" + key); - assertEquals(value.get("lastName").toString(), "last_name_" + key); + if (shouldPushSucceed) { + // Verify data pushed via full push/inc push using client + try (AvroGenericStoreClient storeReader = ClientFactory.getAndStartGenericAvroClient( + ClientConfig.defaultGenericClientConfig(storeName) + .setVeniceURL(veniceClusterWrapper.getRandomRouterURL()))) { + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + try { + for (int i = 1; i < 100; i++) { + String key = String.valueOf(i); + GenericRecord value = readValue(storeReader, key); + assertNotNull(value, "Key " + key + " should not be missing!"); + assertEquals(value.get("firstName").toString(), "first_name_" + key); + assertEquals(value.get("lastName").toString(), "last_name_" + key); + } + } catch (Exception e) { + throw new VeniceException(e); } - } catch (Exception e) { - throw new VeniceException(e); - } - }); + }); + } + } + + // Since the config combination is not supported, we can either validate the heartbeats using the default values, + // or skip the validation. Here, we choose to skip it since the default validation case will be one of the + // permutations where the heartbeats will get validated. + if (!isConfigSupported) { + return; } // create consumer to consume from RT/VT to verify HB and Leader completed header @@ -278,6 +314,7 @@ private void verifyHBinKafkaTopic( break; } } + if ((!isIncrementalPushEnabled || isActiveActiveEnabled) && (isActiveActiveEnabled || dataReplicationPolicy != DataReplicationPolicy.AGGREGATE)) { assertTrue( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/AdminUtils.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/AdminUtils.java index 37f661533f7..5546bd2ddf7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/AdminUtils.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/AdminUtils.java @@ -70,4 +70,44 @@ public static int getRmdVersionID(Admin admin, String storeName, String clusterN LOGGER.info("Use RMD version ID {} for cluster {}", rmdVersionID, clusterName); return rmdVersionID; } + + /** + * Check if a store can support incremental pushes based on other configs. The following rules define when incremental + * push is allowed: + *
    + *
  1. If the system is running in single-region mode, the store must by hybrid
  2. + *
  3. If the system is running in multi-region mode,
  4. + *
      + *
    1. Hybrid + Active-Active
    2. + *
    3. Hybrid + !Active-Active + {@link DataReplicationPolicy} is {@link DataReplicationPolicy#AGGREGATE}
    4. + *
    5. Hybrid + !Active-Active + {@link DataReplicationPolicy} is {@link DataReplicationPolicy#NONE}
    6. + *
    + *
      + * @param multiRegion whether the system is running in multi-region mode + * @param hybridStoreConfig The hybrid store config after applying all updates + * @return {@code true} if incremental push is allowed, {@code false} otherwise + */ + public static boolean isIncrementalPushSupported( + boolean multiRegion, + boolean activeActiveReplicationEnabled, + HybridStoreConfig hybridStoreConfig) { + // Only hybrid stores can support incremental push + if (!AdminUtils.isHybrid(hybridStoreConfig)) { + return false; + } + + // If the system is running in multi-region mode, we need to validate the data replication policies + if (!multiRegion) { + return true; + } + + // A/A can always support incremental push + if (activeActiveReplicationEnabled) { + return true; + } + + DataReplicationPolicy dataReplicationPolicy = hybridStoreConfig.getDataReplicationPolicy(); + return dataReplicationPolicy == DataReplicationPolicy.AGGREGATE + || dataReplicationPolicy == DataReplicationPolicy.NONE; + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/UpdateStoreUtils.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/UpdateStoreUtils.java index bec548ffb4f..a031d4cf7d3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/UpdateStoreUtils.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/util/UpdateStoreUtils.java @@ -470,7 +470,7 @@ public static UpdateStoreWrapper getStoreUpdate( INCREMENTAL_PUSH_ENABLED, updatedConfigs, () -> updatedStore.setIncrementalPushEnabled( - isIncrementalPushEnabled( + AdminUtils.isIncrementalPushSupported( clusterConfig.isMultiRegion(), updatedStore.isActiveActiveReplicationEnabled(), newHybridStoreConfig))); @@ -722,46 +722,6 @@ static void updateInferredConfigsForBatchToHybrid( } } - /** - * Check if a store can support incremental pushes based on other configs. The following rules define when incremental - * push is allowed: - *
        - *
      1. If the system is running in single-region mode, the store must by hybrid
      2. - *
      3. If the system is running in multi-region mode,
      4. - *
          - *
        1. Hybrid + Active-Active
        2. - *
        3. Hybrid + !Active-Active + {@link DataReplicationPolicy} is {@link DataReplicationPolicy#AGGREGATE}
        4. - *
        5. Hybrid + !Active-Active + {@link DataReplicationPolicy} is {@link DataReplicationPolicy#NONE}
        6. - *
        - *
          - * @param multiRegion whether the system is running in multi-region mode - * @param hybridStoreConfig The hybrid store config after applying all updates - * @return {@code true} if incremental push is allowed, {@code false} otherwise - */ - static boolean isIncrementalPushEnabled( - boolean multiRegion, - boolean activeActiveReplicationEnabled, - HybridStoreConfig hybridStoreConfig) { - // Only hybrid stores can support incremental push - if (!AdminUtils.isHybrid(hybridStoreConfig)) { - return false; - } - - // If the system is running in multi-region mode, we need to validate the data replication policies - if (!multiRegion) { - return true; - } - - // A/A can always support incremental push - if (activeActiveReplicationEnabled) { - return true; - } - - DataReplicationPolicy dataReplicationPolicy = hybridStoreConfig.getDataReplicationPolicy(); - return dataReplicationPolicy == DataReplicationPolicy.AGGREGATE - || dataReplicationPolicy == DataReplicationPolicy.NONE; - } - /** * Validate if the specified store is in a valid state or not * Examples of such checks are: diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/util/AdminUtilsTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/util/AdminUtilsTest.java index b636814704c..3857b97d734 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/util/AdminUtilsTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/util/AdminUtilsTest.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import com.linkedin.venice.ConfigConstants; @@ -131,4 +132,36 @@ public void testGetRmdVersionID() { verify(multiClusterConfig, never()).getControllerConfig(any()); verify(controllerConfig, never()).getReplicationMetadataVersion(); } + + @Test + public void testIsIncrementalPushSupported() { + HybridStoreConfig nonHybridConfig = + new HybridStoreConfigImpl(-1, -1, -1, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP); + HybridStoreConfig hybridConfigWithNonAggregateDRP = new HybridStoreConfigImpl( + 100, + 1000, + -1, + DataReplicationPolicy.NON_AGGREGATE, + BufferReplayPolicy.REWIND_FROM_EOP); + HybridStoreConfig hybridConfigWithAggregateDRP = + new HybridStoreConfigImpl(100, 1000, -1, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP); + HybridStoreConfig hybridConfigWithNoneDRP = + new HybridStoreConfigImpl(100, 1000, -1, DataReplicationPolicy.NONE, BufferReplayPolicy.REWIND_FROM_EOP); + + // In single-region mode, any hybrid store should have incremental push enabled. + assertFalse(AdminUtils.isIncrementalPushSupported(false, false, null)); + assertFalse(AdminUtils.isIncrementalPushSupported(false, false, nonHybridConfig)); + assertTrue(AdminUtils.isIncrementalPushSupported(false, false, hybridConfigWithNonAggregateDRP)); + assertTrue(AdminUtils.isIncrementalPushSupported(false, false, hybridConfigWithAggregateDRP)); + assertTrue(AdminUtils.isIncrementalPushSupported(false, false, hybridConfigWithNoneDRP)); + + // In multi-region mode, hybrid stores with NON_AGGREGATE DataReplicationPolicy should not have incremental push + // enabled. + assertFalse(AdminUtils.isIncrementalPushSupported(true, false, null)); + assertFalse(AdminUtils.isIncrementalPushSupported(true, false, nonHybridConfig)); + assertFalse(AdminUtils.isIncrementalPushSupported(true, false, hybridConfigWithNonAggregateDRP)); + assertTrue(AdminUtils.isIncrementalPushSupported(true, false, hybridConfigWithAggregateDRP)); + assertTrue(AdminUtils.isIncrementalPushSupported(true, false, hybridConfigWithNoneDRP)); + assertTrue(AdminUtils.isIncrementalPushSupported(true, true, hybridConfigWithNonAggregateDRP)); + } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/util/UpdateStoreUtilsTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/util/UpdateStoreUtilsTest.java index d166197b9e0..33555036be8 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/util/UpdateStoreUtilsTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/util/UpdateStoreUtilsTest.java @@ -285,38 +285,6 @@ public void testUpdateInferredConfigsForBatchToHybrid() { verify(admin, never()).addDerivedSchema(eq(clusterName), eq(storeName), anyInt(), anyString()); } - @Test - public void testIsIncrementalPushEnabled() { - HybridStoreConfig nonHybridConfig = - new HybridStoreConfigImpl(-1, -1, -1, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP); - HybridStoreConfig hybridConfigWithNonAggregateDRP = new HybridStoreConfigImpl( - 100, - 1000, - -1, - DataReplicationPolicy.NON_AGGREGATE, - BufferReplayPolicy.REWIND_FROM_EOP); - HybridStoreConfig hybridConfigWithAggregateDRP = - new HybridStoreConfigImpl(100, 1000, -1, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP); - HybridStoreConfig hybridConfigWithNoneDRP = - new HybridStoreConfigImpl(100, 1000, -1, DataReplicationPolicy.NONE, BufferReplayPolicy.REWIND_FROM_EOP); - - // In single-region mode, any hybrid store should have incremental push enabled. - assertFalse(UpdateStoreUtils.isIncrementalPushEnabled(false, false, null)); - assertFalse(UpdateStoreUtils.isIncrementalPushEnabled(false, false, nonHybridConfig)); - assertTrue(UpdateStoreUtils.isIncrementalPushEnabled(false, false, hybridConfigWithNonAggregateDRP)); - assertTrue(UpdateStoreUtils.isIncrementalPushEnabled(false, false, hybridConfigWithAggregateDRP)); - assertTrue(UpdateStoreUtils.isIncrementalPushEnabled(false, false, hybridConfigWithNoneDRP)); - - // In multi-region mode, hybrid stores with NON_AGGREGATE DataReplicationPolicy should not have incremental push - // enabled. - assertFalse(UpdateStoreUtils.isIncrementalPushEnabled(true, false, null)); - assertFalse(UpdateStoreUtils.isIncrementalPushEnabled(true, false, nonHybridConfig)); - assertFalse(UpdateStoreUtils.isIncrementalPushEnabled(true, false, hybridConfigWithNonAggregateDRP)); - assertTrue(UpdateStoreUtils.isIncrementalPushEnabled(true, false, hybridConfigWithAggregateDRP)); - assertTrue(UpdateStoreUtils.isIncrementalPushEnabled(true, false, hybridConfigWithNoneDRP)); - assertTrue(UpdateStoreUtils.isIncrementalPushEnabled(true, true, hybridConfigWithNonAggregateDRP)); - } - @Test public void testValidateStoreConfigs() { String clusterName = "clusterName";