[controller] Harden update-store workflow
nisargthakkar committed Oct 1, 2024
1 parent b932806 commit e4ebdbe
Showing 60 changed files with 4,242 additions and 2,862 deletions.
@@ -440,7 +440,6 @@ private StoreInfo getStoreInfo(Consumer<StoreInfo> info, boolean applyFirst) {
storeInfo.setChunkingEnabled(false);
storeInfo.setCompressionStrategy(CompressionStrategy.NO_OP);
storeInfo.setWriteComputationEnabled(false);
storeInfo.setIncrementalPushEnabled(false);
storeInfo.setNativeReplicationSourceFabric("dc-0");
Map<String, Integer> coloMaps = new HashMap<String, Integer>() {
{
@@ -343,8 +343,16 @@ private ConfigKeys() {
"controller.store.graveyard.cleanup.sleep.interval.between.list.fetch.minutes";

/**
* Whether the superset schema generation in Parent Controller should be done via passed callback or not.
* Whether the superset schema generation in Primary Controller should be done via passed callback or not.
*/
public static final String CONTROLLER_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.external.superset.schema.generation.enabled";

/**
* Whether the superset schema generation in Primary Controller should be done via passed callback or not.
* @deprecated Use {@link #CONTROLLER_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED}
*/
@Deprecated
public static final String CONTROLLER_PARENT_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.parent.external.superset.schema.generation.enabled";

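As a rough illustration of the migration path for these keys (a hypothetical sketch using plain java.util.Properties rather than Venice's own config classes), a consumer could prefer the new property and fall back to the deprecated one:

import java.util.Properties;

public final class SupersetSchemaConfigExample {
  // Mirrors the constants above.
  static final String NEW_KEY = "controller.external.superset.schema.generation.enabled";
  static final String DEPRECATED_KEY = "controller.parent.external.superset.schema.generation.enabled";

  // Prefer the new key; fall back to the deprecated one so existing configs keep working.
  static boolean isExternalSupersetSchemaGenerationEnabled(Properties props) {
    String value = props.getProperty(NEW_KEY, props.getProperty(DEPRECATED_KEY, "false"));
    return Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(DEPRECATED_KEY, "true"); // old-style config is still honored
    System.out.println(isExternalSupersetSchemaGenerationEnabled(props)); // prints: true
  }
}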
@@ -179,15 +179,15 @@ private void deleteStores(List<String> storeNames) {

public StoragePersona getPersonaContainingStore(String storeName) {
String personaName = storeNamePersonaMap.get(storeName);
if (personaName == null)
if (personaName == null) {
return null;
}
return getPersona(personaName);
}

private boolean isStoreSetValid(StoragePersona persona, Optional<Store> additionalStore) {
Set<String> setToValidate = new HashSet<>();
if (additionalStore.isPresent())
setToValidate.add(additionalStore.get().getName());
additionalStore.ifPresent(store -> setToValidate.add(store.getName()));
setToValidate.addAll(persona.getStoresToEnforce());
return setToValidate.stream()
.allMatch(
@@ -24,7 +24,7 @@ public enum BackupStrategy {
// KEEP_IN_KAFKA_ONLY,
/** Keep in user-specified store, e.g., HDD or another DB */
// KEEP_IN_USER_STORE;
private int value;
private final int value;

BackupStrategy(int v) {
this.value = v;
@@ -35,6 +35,10 @@ public enum BackupStrategy {
Arrays.stream(values()).forEach(s -> idMapping.put(s.value, s));
}

public int getValue() {
return value;
}

public static BackupStrategy fromInt(int i) {
BackupStrategy strategy = idMapping.get(i);
if (strategy == null) {
@@ -27,4 +27,9 @@ public interface HybridStoreConfig extends DataModelBackedStructure<StoreHybridC
BufferReplayPolicy getBufferReplayPolicy();

HybridStoreConfig clone();

default boolean isHybrid() {
return getRewindTimeInSeconds() >= 0
&& (getOffsetLagThresholdToGoOnline() >= 0 || getProducerTimestampLagThresholdToGoOnlineInSeconds() >= 0);
}
}
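A brief usage sketch of the new default method (the Store accessor below is an assumption, not part of this diff): callers can delegate the hybrid check instead of re-deriving it from the individual thresholds.

// Hypothetical helper; assumes Store#getHybridStoreConfig() returns null when no hybrid config is set.
static boolean isHybridStore(Store store) {
  HybridStoreConfig hybridConfig = store.getHybridStoreConfig();
  return hybridConfig != null && hybridConfig.isHybrid();
}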
@@ -92,6 +92,9 @@ public int hashCode() {

@JsonIgnore
public PartitionerConfig clone() {
return new PartitionerConfigImpl(getPartitionerClass(), getPartitionerParams(), getAmplificationFactor());
return new PartitionerConfigImpl(
getPartitionerClass(),
new HashMap<>(getPartitionerParams()),
getAmplificationFactor());
}
}
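To show why the defensive copy of the partitioner params matters, here is a rough sketch of the aliasing a shallow clone allows (the partitioner class name and parameter key are placeholders; the usual java.util imports and the three-argument PartitionerConfigImpl constructor used above are assumed):

Map<String, String> params = new HashMap<>();
params.put("example.param", "a"); // placeholder key/value

PartitionerConfig original = new PartitionerConfigImpl("com.example.MyPartitioner", params, 1);
PartitionerConfig copy = original.clone();
copy.getPartitionerParams().put("example.param", "b");

// With the previous shallow clone, both configs shared the same map, so the
// mutation above would also have changed the original; copying the map inside
// clone() keeps the original's params at "a".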
@@ -209,7 +209,7 @@ public ZKStore(Store store) {
setSchemaAutoRegisterFromPushJobEnabled(store.isSchemaAutoRegisterFromPushJobEnabled());
setLatestSuperSetValueSchemaId(store.getLatestSuperSetValueSchemaId());
setHybridStoreDiskQuotaEnabled(store.isHybridStoreDiskQuotaEnabled());
setEtlStoreConfig(store.getEtlStoreConfig());
setEtlStoreConfig(store.getEtlStoreConfig().clone());
setStoreMetadataSystemStoreEnabled(store.isStoreMetadataSystemStoreEnabled());
setLatestVersionPromoteToCurrentTimestamp(store.getLatestVersionPromoteToCurrentTimestamp());
setBackupVersionRetentionMs(store.getBackupVersionRetentionMs());
@@ -220,7 +220,7 @@ public ZKStore(Store store) {
setStoreMetaSystemStoreEnabled(store.isStoreMetaSystemStoreEnabled());
setActiveActiveReplicationEnabled(store.isActiveActiveReplicationEnabled());
setRmdVersion(store.getRmdVersion());
setViewConfigs(store.getViewConfigs());
setViewConfigs(new HashMap<>(store.getViewConfigs()));
setStorageNodeReadQuotaEnabled(store.isStorageNodeReadQuotaEnabled());
setUnusedSchemaDeletionEnabled(store.isUnusedSchemaDeletionEnabled());
setMinCompactionLagSeconds(store.getMinCompactionLagSeconds());
@@ -363,11 +363,7 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@Override
public long getStorageQuotaInByte() {
// This is a safeguard in case that some old stores do not have storage quota field
return (this.storeProperties.storageQuotaInByte <= 0
&& this.storeProperties.storageQuotaInByte != UNLIMITED_STORAGE_QUOTA)
? DEFAULT_STORAGE_QUOTA
: this.storeProperties.storageQuotaInByte;
return this.storeProperties.storageQuotaInByte;
}

@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@@ -63,12 +63,16 @@ public static int calculatePartitionCount(
} else if (partitionCount < minPartitionCount) {
partitionCount = minPartitionCount;
}

int returnPartitionCount = partitionCount <= 0 ? 1 : (int) partitionCount;

LOGGER.info(
"Assign partition count: {} calculated by storage quota: {} to the new version of store: {}",
partitionCount,
returnPartitionCount,
storageQuota,
storeName);
return (int) partitionCount;

return returnPartitionCount;
}

public static VenicePartitioner getVenicePartitioner(PartitionerConfig config) {
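Standalone sketch of the clamp introduced above (not the Venice method itself): a non-positive computed count, e.g. when the storage quota is far below the per-partition size, now maps to a single partition rather than zero.

// The clamping rule in isolation.
static int clampPartitionCount(long computedPartitionCount) {
  return computedPartitionCount <= 0 ? 1 : (int) computedPartitionCount;
}
// clampPartitionCount(0)  -> 1
// clampPartitionCount(12) -> 12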
@@ -0,0 +1,23 @@
package com.linkedin.venice.meta;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;

import com.linkedin.venice.exceptions.VeniceException;
import org.testng.annotations.Test;


public class BackupStrategyTest {
@Test
public void testFromInt() {
assertEquals(BackupStrategy.fromInt(0), BackupStrategy.KEEP_MIN_VERSIONS);
assertEquals(BackupStrategy.fromInt(1), BackupStrategy.DELETE_ON_NEW_PUSH_START);
assertThrows(VeniceException.class, () -> BackupStrategy.fromInt(2));
}

@Test
public void testGetValue() {
assertEquals(BackupStrategy.KEEP_MIN_VERSIONS.getValue(), 0);
assertEquals(BackupStrategy.DELETE_ON_NEW_PUSH_START.getValue(), 1);
}
}
@@ -22,4 +22,26 @@ public void deserializes() throws IOException {
Assert.assertEquals(fasterXml.getRewindTimeInSeconds(), 123L);
Assert.assertEquals(fasterXml.getDataReplicationPolicy(), DataReplicationPolicy.NON_AGGREGATE);
}

@Test
public void testIsHybrid() {
HybridStoreConfig hybridStoreConfig;
hybridStoreConfig = new HybridStoreConfigImpl(-1, -1, -1, null, null);
Assert.assertFalse(hybridStoreConfig.isHybrid());

hybridStoreConfig = new HybridStoreConfigImpl(100, -1, -1, null, null);
Assert.assertFalse(hybridStoreConfig.isHybrid());

hybridStoreConfig = new HybridStoreConfigImpl(100, 100, -1, null, null);
Assert.assertTrue(hybridStoreConfig.isHybrid());

hybridStoreConfig = new HybridStoreConfigImpl(100, 100, 100, null, null);
Assert.assertTrue(hybridStoreConfig.isHybrid());

hybridStoreConfig = new HybridStoreConfigImpl(100, -1, 100, null, null);
Assert.assertTrue(hybridStoreConfig.isHybrid());

hybridStoreConfig = new HybridStoreConfigImpl(-1, -1, 100, null, null);
Assert.assertFalse(hybridStoreConfig.isHybrid());
}
}
@@ -98,4 +98,40 @@ public void testClusterLevelActiveActiveReplicationConfigForNewHybridStores() th
assertFalse(parentControllerClient.getStore(storeName).getStore().isActiveActiveReplicationEnabled());
});
}

@Test(timeOut = TEST_TIMEOUT)
public void testClusterLevelActiveActiveReplicationConfigForNewIncrementalPushStores() throws IOException {
String storeName = Utils.getUniqueString("test-store-incremental");
String pushJobId1 = "test-push-job-id-1";
parentControllerClient.createNewStore(storeName, "test-owner", "\"string\"", "\"string\"");
parentControllerClient.emptyPush(storeName, pushJobId1, 1);

// Version 1 should exist.
StoreInfo store = assertCommand(parentControllerClient.getStore(storeName)).getStore();
assertEquals(store.getVersions().size(), 1);

// Store-level Active/Active replication and incremental push should both start out disabled
assertFalse(store.isActiveActiveReplicationEnabled());
assertFalse(store.isIncrementalPushEnabled());

// Convert to incremental push store
assertCommand(
parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(true)));
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
StoreInfo storeToTest = parentControllerClient.getStore(storeName).getStore();
assertTrue(storeToTest.isIncrementalPushEnabled());
assertTrue(storeToTest.isActiveActiveReplicationEnabled());
});

// After incremental push is disabled, the store's original A/A config remains enabled,
// even though the cluster-level default A/A config for pure hybrid stores is false.
assertCommand(
parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(false)));
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
StoreInfo storeToTest = parentControllerClient.getStore(storeName).getStore();
assertFalse(storeToTest.isIncrementalPushEnabled());
assertTrue(storeToTest.isActiveActiveReplicationEnabled());
});
}
}
@@ -13,6 +13,7 @@
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.TestUtils;
@@ -97,6 +98,7 @@ public void testClusterLevelNativeReplicationConfigForNewStores() {
parentControllerClient.updateStore(
storeName,
new UpdateStoreQueryParams().setIncrementalPushEnabled(true)
.setHybridDataReplicationPolicy(DataReplicationPolicy.NONE)
.setHybridRewindSeconds(1L)
.setHybridOffsetLagThreshold(10)));
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
Expand All @@ -107,7 +109,7 @@ public void testClusterLevelNativeReplicationConfigForNewStores() {
}

@Test(timeOut = TEST_TIMEOUT)
public void testConvertHybridDuringPushjob() {
public void testConvertHybridDuringPushJob() {
String storeName = Utils.getUniqueString("test-store");
parentControllerClient.createNewStore(storeName, "test-owner", "\"string\"", "\"string\"");
parentControllerClient.requestTopicForWrites(
@@ -128,7 +130,7 @@ public void testConvertHybridDuringPushjob() {
storeName,
new UpdateStoreQueryParams().setHybridRewindSeconds(1L).setHybridOffsetLagThreshold(1L));
Assert.assertTrue(response.isError());
Assert.assertTrue(response.getError().contains("Cannot convert to hybrid as there is already a pushjob running"));
Assert.assertTrue(response.getError().contains("Cannot convert to hybrid as there is already a push job running"));
parentControllerClient.killOfflinePushJob(Version.composeKafkaTopic(storeName, 1));
}

@@ -348,8 +348,10 @@ public void testEnableActiveActiveReplicationSchema() {
Assert.assertFalse(schemaResponse2.isError(), "addValueSchema returned error: " + schemaResponse2.getError());

// Enable AA on store
UpdateStoreQueryParams updateStoreToEnableAARepl =
new UpdateStoreQueryParams().setNativeReplicationEnabled(true).setActiveActiveReplicationEnabled(true);
UpdateStoreQueryParams updateStoreToEnableAARepl = new UpdateStoreQueryParams().setNativeReplicationEnabled(true)
.setActiveActiveReplicationEnabled(true)
.setHybridOffsetLagThreshold(1000)
.setHybridRewindSeconds(1000);
TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreToEnableAARepl);
/**
* Test Active/Active replication config enablement generates the active active metadata schema.