[controller][test] Fix issues in data recovery API (#572)
* [controller][test] Fix issues in data recovery API

This PR targets several issues in the data recovery API:

1. Today, during data recovery, we bump a store's version to LargestUsedVersionNumber+1 in the parent and use the
increased version for data recovery only when the version number from the source colo equals the current version in the dest colo.
So, if the version number from the source is less than the current version in the dest, we simply reuse it in the dest.
As a result, data recovery cannot complete successfully, because the backup version cleanup thread can kick in, consider it
an expired backup version, and delete it.

As a fix, this change extends the current condition and increases the store version for such cases as well.

On the other hand, if the version number from the source colo is larger than the current version in the dest colo, we can use it for
the dest colo without increasing it. This is fine because it is larger than the current version, and resources for this version will
be deleted during prepareDataRecovery. So, even though the version number was used (created and then deleted) in the dest colo
before, it can still be reused.
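
For illustration, the fixed condition can be sketched as below (a minimal sketch mirroring whetherToCreateNewDataRecoveryVersion
in the diff; srcVersionNumber and destStore are hypothetical local names, and the child-datacenter allowlist check is elided):

    // Bump to LargestUsedVersionNumber+1 whenever the source version is less than
    // OR equal to the current version in the dest colo (previously: only on equality).
    boolean createNewDataRecoveryVersion = destStore.getHybridStoreConfig() == null // batch-only stores
        && srcVersionNumber <= destStore.getCurrentVersion();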

2. Users can sometimes pass the same colo name for both src and dest, which is an erroneous operation.
This PR adds a guardrail to catch such cases and fail early.
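
The guardrail is a fail-fast check at the top of both prepareDataRecovery and initiateDataRecovery (a minimal sketch
matching the diff below):

    if (Objects.equals(sourceFabric, destinationFabric)) {
      throw new VeniceException(
          String.format(
              "Source (%s) and destination (%s) cannot be the same data center",
              sourceFabric,
              destinationFabric));
    }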

Integration tests are also improved to cover the above two issues and verify the correctness of these fixes.
lluwm authored Aug 9, 2023
1 parent 380729f commit e3cb5b2
Showing 4 changed files with 93 additions and 32 deletions.
@@ -56,6 +56,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.samza.config.MapConfig;
@@ -188,7 +189,40 @@ public void testStartDataRecoveryAPIs() {
}
}

@Test(timeOut = TEST_TIMEOUT)
private void performDataRecoveryTest(
String storeName,
ControllerClient parentControllerClient,
ControllerClient destColoClient,
String src,
String dest,
int times,
int expectedStartVersionOnDest) throws ExecutionException, InterruptedException {
for (int version = expectedStartVersionOnDest; version < times + expectedStartVersionOnDest; version++) {
// Prepare dest fabric for data recovery.
Assert.assertFalse(
parentControllerClient.prepareDataRecovery(src, dest, storeName, VERSION_ID_UNSET, Optional.empty())
.isError());
// Initiate data recovery, a new version will be created in dest fabric.
Assert.assertFalse(
parentControllerClient.dataRecovery(src, dest, storeName, VERSION_ID_UNSET, false, true, Optional.empty())
.isError());
int finalVersion = version;
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {
Assert.assertEquals(destColoClient.getStore(storeName).getStore().getCurrentVersion(), finalVersion);
});
try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(
    ClientConfig.defaultGenericClientConfig(storeName)
        // Read from the dest colo's router (dc-0 -> index 0, dc-1 -> index 1) to verify the recovered data.
        .setVeniceURL(
            childDatacenters.get("dc-0".equals(dest) ? 0 : 1).getClusters().get(clusterName).getRandomRouterURL()))) {
for (int i = 0; i < 10; i++) {
Object v = client.get(String.valueOf(i)).get();
Assert.assertNotNull(v, "Batch data should be consumed already in data center " + dest);
Assert.assertEquals(v.toString(), String.valueOf(i));
}
}
}
}

@Test(timeOut = 2 * TEST_TIMEOUT)
public void testBatchOnlyDataRecovery() throws Exception {
String storeName = Utils.getUniqueString("dataRecovery-store-batch");
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
@@ -234,27 +268,26 @@ public void testBatchOnlyDataRecovery() throws Exception {
parentControllerClient,
60,
TimeUnit.SECONDS);
// Prepare dc-1 for data recovery
Assert.assertFalse(
parentControllerClient.prepareDataRecovery("dc-0", "dc-1", storeName, VERSION_ID_UNSET, Optional.empty())

// Verify that if the same data center is given as both src and dest to the data recovery API, the client
// response should have errors.
String sameFabricName = "dc-0";
Assert.assertTrue(
parentControllerClient
.prepareDataRecovery(sameFabricName, sameFabricName, storeName, VERSION_ID_UNSET, Optional.empty())
.isError());
// Initiate data recovery, a new version will be created in dest fabric
Assert.assertFalse(
Assert.assertTrue(
parentControllerClient
.dataRecovery("dc-0", "dc-1", storeName, VERSION_ID_UNSET, false, true, Optional.empty())
.dataRecovery(sameFabricName, sameFabricName, storeName, VERSION_ID_UNSET, false, true, Optional.empty())
.isError());
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {
Assert.assertEquals(dc1Client.getStore(storeName).getStore().getCurrentVersion(), 2);
});
try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(
ClientConfig.defaultGenericClientConfig(storeName)
.setVeniceURL(childDatacenters.get(1).getClusters().get(clusterName).getRandomRouterURL()))) {
for (int i = 0; i < 10; i++) {
Object v = client.get(String.valueOf(i)).get();
Assert.assertNotNull(v, "Batch data should be consumed already in data center dc-1");
Assert.assertEquals(v.toString(), String.valueOf(i));
}
}

/*
* Before data recovery, current version in dc-0 and dc-1 is 1.
* With two rounds of dc-0 -> dc-1 data recovery, current version in dc-1 changes to 2 and then 3.
* Then with two rounds of dc-1 -> dc-0 data recovery, current version in dc-0 becomes 3 and then 4.
*/
performDataRecoveryTest(storeName, parentControllerClient, dc1Client, "dc-0", "dc-1", 2, 2);
performDataRecoveryTest(storeName, parentControllerClient, dc0Client, "dc-1", "dc-0", 2, 3);
}
}

@@ -3622,10 +3622,16 @@ private boolean whetherToCreateNewDataRecoveryVersion(
String clusterName,
StoreInfo destStore,
int versionNumber) {
// Currently new version data recovery is only supported for batch-only store.
// For existing data centers, current store version might be serving read requests. Need to create a new version.
// For new data centers or non-current version, it's ok to delete and recreate it. No need to create a new version.
return destStore.getHybridStoreConfig() == null && versionNumber == destStore.getCurrentVersion()
/**
* Create a new data recovery version in the destination colo when the following are satisfied:
* 1. New version data recovery is only supported for batch-only stores.
* 2. For an existing destination data center, a new version is needed if
* 2.1. srcVersionNumber equals the current version in the dest colo, as the current version is serving read requests.
* 2.2. srcVersionNumber is less than the current version in the dest colo, because Venice normally assumes that a
* new version always has a larger version number than previous ones,
* e.g. {@link StoreBackupVersionCleanupService#cleanupBackupVersion(Store, String)}.
*/
return destStore.getHybridStoreConfig() == null && versionNumber <= destStore.getCurrentVersion()
&& multiClusterConfigs.getControllerConfig(clusterName).getChildDataCenterAllowlist().contains(destFabric);
}

@@ -3641,6 +3647,13 @@ public void initiateDataRecovery(
String destinationFabric,
boolean copyAllVersionConfigs,
Optional<Version> ignored) {
if (Objects.equals(sourceFabric, destinationFabric)) {
throw new VeniceException(
String.format(
"Source ({}) and destination ({}) cannot be the same data center",
sourceFabric,
destinationFabric));
}
StoreInfo srcStore = getStoreInChildRegion(sourceFabric, clusterName, storeName);
if (version == VERSION_ID_UNSET) {
version = srcStore.getCurrentVersion();
@@ -3661,9 +3674,9 @@
parentStore.setLargestUsedVersionNumber(newVersion);
repository.updateStore(parentStore);
LOGGER.info(
"Current version {}_v{} in {} might be serving read requests. Copying data to a new version {}.",
storeName,
"version {} is less or equal to in the current version of {} in {}. Copying data to a new version {}.",
version,
storeName,
destinationFabric,
newVersion);
version = newVersion;
@@ -3690,17 +3703,20 @@ public void prepareDataRecovery(
String sourceFabric,
String destinationFabric,
Optional<Integer> ignored) {
if (Objects.equals(sourceFabric, destinationFabric)) {
throw new VeniceException(
String.format(
"Source ({}) and destination ({}) cannot be the same data center",
sourceFabric,
destinationFabric));
}
StoreInfo srcStore = getStoreInChildRegion(sourceFabric, clusterName, storeName);
if (version == VERSION_ID_UNSET) {
version = srcStore.getCurrentVersion();
}
StoreInfo destStore = getStoreInChildRegion(destinationFabric, clusterName, storeName);
if (whetherToCreateNewDataRecoveryVersion(destinationFabric, clusterName, destStore, version)) {
LOGGER.info(
"Skip current version {}_v{} cleanup in {} as it might be serving read requests.",
storeName,
version,
destinationFabric);
LOGGER.info("Skip cleanup for store: {}, version:{} in {}", storeName, version, destinationFabric);
return;
}
int amplificationFactor = srcStore.getPartitionerConfig().getAmplificationFactor();
@@ -22,6 +22,8 @@
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.Closeable;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.Optional;

@@ -68,6 +70,15 @@ private void ensureClientConfigIsAvailable(String feature) {
}
}

private String getRecoveryPushJobId(String srcPushJobId) {
final String prefix = "data-recovery";
if (!srcPushJobId.startsWith(prefix)) {
return String.format("%s(%s)_%s", prefix, LocalDateTime.now(ZoneOffset.UTC), srcPushJobId);
}
return srcPushJobId
.replaceFirst("data-recovery\\(.*\\)", String.format("%s(%s)", prefix, LocalDateTime.now(ZoneOffset.UTC)));
}
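
// For illustration (hypothetical timestamps), getRecoveryPushJobId behaves roughly like:
//   getRecoveryPushJobId("pushJob1")
//     -> "data-recovery(2023-08-09T01:23:45.678)_pushJob1"   (prefix added once)
//   getRecoveryPushJobId("data-recovery(2023-08-01T01:23:45.678)_pushJob1")
//     -> "data-recovery(2023-08-09T01:23:45.678)_pushJob1"   (timestamp refreshed, no double prefix)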

/**
* Initiate data recovery process by recreating the version, kafka topic, and Helix resources accordingly.
*/
@@ -89,7 +100,7 @@ public void initiateDataRecovery(
* Update the push job id as a version with same id cannot be added twice.
* @see VeniceHelixAdmin#addSpecificVersion(String, String, Version)
*/
sourceFabricVersion.setPushJobId("data_recovery_" + sourceFabricVersion.getPushJobId());
sourceFabricVersion.setPushJobId(getRecoveryPushJobId(sourceFabricVersion.getPushJobId()));
}
Version dataRecoveryVersion = sourceFabricVersion.cloneVersion();
dataRecoveryVersion.setStatus(VersionStatus.STARTED);
@@ -8,6 +8,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.venice.controller.VeniceHelixAdmin;
@@ -68,6 +69,6 @@ public void testInitiateDataRecovery() {
verify(veniceAdmin).addSpecificVersion(eq(clusterName), eq(storeName), captor.capture());
assertEquals(captor.getValue().getDataRecoveryVersionConfig().getDataRecoverySourceVersionNumber(), version);
assertEquals(captor.getValue().getNumber(), 2);
assertEquals(captor.getValue().getPushJobId(), "data_recovery_pushJob1");
assertTrue(captor.getValue().getPushJobId().startsWith("data-recovery"));
}
}
