diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DataRecoveryTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DataRecoveryTest.java index 2e99bdec95..7798a43551 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DataRecoveryTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DataRecoveryTest.java @@ -56,6 +56,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.samza.config.MapConfig; @@ -188,7 +189,40 @@ public void testStartDataRecoveryAPIs() { } } - @Test(timeOut = TEST_TIMEOUT) + private void performDataRecoveryTest( + String storeName, + ControllerClient parentControllerClient, + ControllerClient destColoClient, + String src, + String dest, + int times, + int expectedStartVersionOnDest) throws ExecutionException, InterruptedException { + for (int version = expectedStartVersionOnDest; version < times + expectedStartVersionOnDest; version++) { + // Prepare dest fabric for data recovery. + Assert.assertFalse( + parentControllerClient.prepareDataRecovery(src, dest, storeName, VERSION_ID_UNSET, Optional.empty()) + .isError()); + // Initiate data recovery, a new version will be created in dest fabric. 
+      Assert.assertFalse(
+          parentControllerClient.dataRecovery(src, dest, storeName, VERSION_ID_UNSET, false, true, Optional.empty())
+              .isError());
+      int finalVersion = version;
+      TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {
+        Assert.assertEquals(destColoClient.getStore(storeName).getStore().getCurrentVersion(), finalVersion);
+      });
+      try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient(
+          ClientConfig.defaultGenericClientConfig(storeName)
+              .setVeniceURL(childDatacenters.get("dc-0".equals(dest) ? 0 : 1).getClusters().get(clusterName).getRandomRouterURL()))) {
+        for (int i = 0; i < 10; i++) {
+          Object v = client.get(String.valueOf(i)).get();
+          Assert.assertNotNull(v, "Batch data should be consumed already in data center " + dest);
+          Assert.assertEquals(v.toString(), String.valueOf(i));
+        }
+      }
+    }
+  }
+
+  @Test(timeOut = 2 * TEST_TIMEOUT)
   public void testBatchOnlyDataRecovery() throws Exception {
     String storeName = Utils.getUniqueString("dataRecovery-store-batch");
     String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
@@ -234,27 +268,26 @@ public void testBatchOnlyDataRecovery() throws Exception {
         parentControllerClient,
         60,
         TimeUnit.SECONDS);
-    // Prepare dc-1 for data recovery
-    Assert.assertFalse(
-        parentControllerClient.prepareDataRecovery("dc-0", "dc-1", storeName, VERSION_ID_UNSET, Optional.empty())
+
+    // Verify that if the same data center is given as both src and dest to the data recovery API, the client
+    // response should have errors.
+ String sameFabricName = "dc-0"; + Assert.assertTrue( + parentControllerClient + .prepareDataRecovery(sameFabricName, sameFabricName, storeName, VERSION_ID_UNSET, Optional.empty()) .isError()); - // Initiate data recovery, a new version will be created in dest fabric - Assert.assertFalse( + Assert.assertTrue( parentControllerClient - .dataRecovery("dc-0", "dc-1", storeName, VERSION_ID_UNSET, false, true, Optional.empty()) + .dataRecovery(sameFabricName, sameFabricName, storeName, VERSION_ID_UNSET, false, true, Optional.empty()) .isError()); - TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { - Assert.assertEquals(dc1Client.getStore(storeName).getStore().getCurrentVersion(), 2); - }); - try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient( - ClientConfig.defaultGenericClientConfig(storeName) - .setVeniceURL(childDatacenters.get(1).getClusters().get(clusterName).getRandomRouterURL()))) { - for (int i = 0; i < 10; i++) { - Object v = client.get(String.valueOf(i)).get(); - Assert.assertNotNull(v, "Batch data should be consumed already in data center dc-1"); - Assert.assertEquals(v.toString(), String.valueOf(i)); - } - } + + /* + * Before data recovery, current version in dc-0 and dc-1 is 1. + * With two rounds of dc-0 -> dc-1 data recovery, current version in dc-1 changes to 2 and then 3. + * Then with two rounds of dc-1 -> dc-0 data recovery, current version in dc-0 becomes 3 and then 4. 
+     */
+    performDataRecoveryTest(storeName, parentControllerClient, dc1Client, "dc-0", "dc-1", 2, 2);
+    performDataRecoveryTest(storeName, parentControllerClient, dc0Client, "dc-1", "dc-0", 2, 3);
   }
 }
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
index 9ad32cb7fc..f475bfde05 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
@@ -3622,10 +3622,16 @@ private boolean whetherToCreateNewDataRecoveryVersion(
       String clusterName,
       StoreInfo destStore,
       int versionNumber) {
-    // Currently new version data recovery is only supported for batch-only store.
-    // For existing data centers, current store version might be serving read requests. Need to create a new version.
-    // For new data centers or non-current version, it's ok to delete and recreate it. No need to create a new version.
-    return destStore.getHybridStoreConfig() == null && versionNumber == destStore.getCurrentVersion()
+    /**
+     * Create a new data recovery version on the destination colo when all of the following are satisfied:
+     * 1. New version data recovery is only supported for batch-only stores.
+     * 2. For the existing destination data center, a new version is needed if
+     * 2.1. srcVersionNumber equals the current version in the dest colo, as the current version is serving read requests.
+     * 2.2. srcVersionNumber is less than the current version in the dest colo, because Venice normally assumes that a
+     * new version always has a larger version number than previous ones,
+     * e.g. {@link StoreBackupVersionCleanupService#cleanupBackupVersion(Store, String)}.
+     */
+    return destStore.getHybridStoreConfig() == null && versionNumber <= destStore.getCurrentVersion()
         && multiClusterConfigs.getControllerConfig(clusterName).getChildDataCenterAllowlist().contains(destFabric);
   }
@@ -3641,6 +3647,13 @@ public void initiateDataRecovery(
       String destinationFabric,
       boolean copyAllVersionConfigs,
       Optional ignored) {
+    if (Objects.equals(sourceFabric, destinationFabric)) {
+      throw new VeniceException(
+          String.format(
+              "Source (%s) and destination (%s) cannot be the same data center",
+              sourceFabric,
+              destinationFabric));
+    }
     StoreInfo srcStore = getStoreInChildRegion(sourceFabric, clusterName, storeName);
     if (version == VERSION_ID_UNSET) {
       version = srcStore.getCurrentVersion();
     }
@@ -3661,9 +3674,9 @@ public void initiateDataRecovery(
       parentStore.setLargestUsedVersionNumber(newVersion);
       repository.updateStore(parentStore);
       LOGGER.info(
-          "Current version {}_v{} in {} might be serving read requests. Copying data to a new version {}.",
-          storeName,
+          "Version {} is less than or equal to the current version of {} in {}. Copying data to a new version {}.",
           version,
+          storeName,
           destinationFabric,
           newVersion);
       version = newVersion;
@@ -3690,17 +3703,20 @@ public void prepareDataRecovery(
       String sourceFabric,
       String destinationFabric,
       Optional ignored) {
+    if (Objects.equals(sourceFabric, destinationFabric)) {
+      throw new VeniceException(
+          String.format(
+              "Source (%s) and destination (%s) cannot be the same data center",
+              sourceFabric,
+              destinationFabric));
+    }
     StoreInfo srcStore = getStoreInChildRegion(sourceFabric, clusterName, storeName);
     if (version == VERSION_ID_UNSET) {
       version = srcStore.getCurrentVersion();
     }
     StoreInfo destStore = getStoreInChildRegion(destinationFabric, clusterName, storeName);
     if (whetherToCreateNewDataRecoveryVersion(destinationFabric, clusterName, destStore, version)) {
-      LOGGER.info(
-          "Skip current version {}_v{} cleanup in {} as it might be serving read requests.",
-          storeName,
-          version,
-          destinationFabric);
+      LOGGER.info("Skip cleanup for store: {}, version:{} in {}", storeName, version, destinationFabric);
       return;
     }
     int amplificationFactor = srcStore.getPartitionerConfig().getAmplificationFactor();
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/datarecovery/DataRecoveryManager.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/datarecovery/DataRecoveryManager.java
index de7c1c3c97..6b44f4030d 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/datarecovery/DataRecoveryManager.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/datarecovery/DataRecoveryManager.java
@@ -22,6 +22,8 @@
 import com.linkedin.venice.service.ICProvider;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
 import java.io.Closeable;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.Map;
 import java.util.Optional;
@@ -68,6 +70,15 @@ private void ensureClientConfigIsAvailable(String feature) {
     }
   }
+  
private String getRecoveryPushJobId(String srcPushJobId) {
+    final String prefix = "data-recovery";
+    if (!srcPushJobId.startsWith(prefix)) {
+      return String.format("%s(%s)_%s", prefix, LocalDateTime.now(ZoneOffset.UTC), srcPushJobId);
+    }
+    return srcPushJobId
+        .replaceFirst("data-recovery\\([^)]*\\)", String.format("%s(%s)", prefix, LocalDateTime.now(ZoneOffset.UTC)));
+  }
+
   /**
    * Initiate data recovery process by recreating the version, kafka topic, and Helix resources accordingly.
    */
@@ -89,7 +100,7 @@ public void initiateDataRecovery(
      * Update the push job id as a version with same id cannot be added twice.
      * @see VeniceHelixAdmin#addSpecificVersion(String, String, Version)
      */
-    sourceFabricVersion.setPushJobId("data_recovery_" + sourceFabricVersion.getPushJobId());
+    sourceFabricVersion.setPushJobId(getRecoveryPushJobId(sourceFabricVersion.getPushJobId()));
   }
   Version dataRecoveryVersion = sourceFabricVersion.cloneVersion();
   dataRecoveryVersion.setStatus(VersionStatus.STARTED);
diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/datarecovery/TestDataRecoveryManager.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/datarecovery/TestDataRecoveryManager.java
index 88159d0e8b..0853c34677 100644
--- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/datarecovery/TestDataRecoveryManager.java
+++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/datarecovery/TestDataRecoveryManager.java
@@ -8,6 +8,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import com.linkedin.d2.balancer.D2Client;
 import com.linkedin.venice.controller.VeniceHelixAdmin;
@@ -68,6 +69,6 @@ public void testInitiateDataRecovery() {
     verify(veniceAdmin).addSpecificVersion(eq(clusterName), eq(storeName), captor.capture());
assertEquals(captor.getValue().getDataRecoveryVersionConfig().getDataRecoverySourceVersionNumber(), version); assertEquals(captor.getValue().getNumber(), 2); - assertEquals(captor.getValue().getPushJobId(), "data_recovery_pushJob1"); + assertTrue(captor.getValue().getPushJobId().startsWith("data-recovery")); } }