Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Harden update-store workflow #1091

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ private StoreInfo getStoreInfo(Consumer<StoreInfo> info, boolean applyFirst) {
storeInfo.setChunkingEnabled(false);
storeInfo.setCompressionStrategy(CompressionStrategy.NO_OP);
storeInfo.setWriteComputationEnabled(false);
storeInfo.setIncrementalPushEnabled(false);
storeInfo.setNativeReplicationSourceFabric("dc-0");
Map<String, Integer> coloMaps = new HashMap<String, Integer>() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,16 @@ private ConfigKeys() {
"controller.store.graveyard.cleanup.sleep.interval.between.list.fetch.minutes";

/**
* Whether the superset schema generation in Parent Controller should be done via passed callback or not.
* Whether the superset schema generation in Primary Controller should be done via passed callback or not.
*/
public static final String CONTROLLER_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.external.superset.schema.generation.enabled";
nisargthakkar marked this conversation as resolved.
Show resolved Hide resolved

/**
* Whether the superset schema generation in Primary Controller should be done via passed callback or not.
* @deprecated Use {@link #CONTROLLER_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED}
*/
@Deprecated
nisargthakkar marked this conversation as resolved.
Show resolved Hide resolved
public static final String CONTROLLER_PARENT_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.parent.external.superset.schema.generation.enabled";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ private void deleteStores(List<String> storeNames) {

public StoragePersona getPersonaContainingStore(String storeName) {
String personaName = storeNamePersonaMap.get(storeName);
if (personaName == null)
if (personaName == null) {
return null;
}
return getPersona(personaName);
}

private boolean isStoreSetValid(StoragePersona persona, Optional<Store> additionalStore) {
Set<String> setToValidate = new HashSet<>();
if (additionalStore.isPresent())
setToValidate.add(additionalStore.get().getName());
additionalStore.ifPresent(store -> setToValidate.add(store.getName()));
nisargthakkar marked this conversation as resolved.
Show resolved Hide resolved
setToValidate.addAll(persona.getStoresToEnforce());
return setToValidate.stream()
.allMatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public enum BackupStrategy {
// KEEP_IN_KAFKA_ONLY,
/** Keep in user-specified store eg HDD, other DB */
// KEEP_IN_USER_STORE;
private int value;
private final int value;

BackupStrategy(int v) {
this.value = v;
Expand All @@ -35,6 +35,10 @@ public enum BackupStrategy {
Arrays.stream(values()).forEach(s -> idMapping.put(s.value, s));
}

public int getValue() {
return value;
}

public static BackupStrategy fromInt(int i) {
BackupStrategy strategy = idMapping.get(i);
if (strategy == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ public interface HybridStoreConfig extends DataModelBackedStructure<StoreHybridC
BufferReplayPolicy getBufferReplayPolicy();

HybridStoreConfig clone();

default boolean isHybrid() {
return getRewindTimeInSeconds() >= 0
&& (getOffsetLagThresholdToGoOnline() >= 0 || getProducerTimestampLagThresholdToGoOnlineInSeconds() >= 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public int hashCode() {

@JsonIgnore
public PartitionerConfig clone() {
return new PartitionerConfigImpl(getPartitionerClass(), getPartitionerParams(), getAmplificationFactor());
return new PartitionerConfigImpl(
getPartitionerClass(),
new HashMap<>(getPartitionerParams()),
nisargthakkar marked this conversation as resolved.
Show resolved Hide resolved
getAmplificationFactor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public ZKStore(Store store) {
setSchemaAutoRegisterFromPushJobEnabled(store.isSchemaAutoRegisterFromPushJobEnabled());
setLatestSuperSetValueSchemaId(store.getLatestSuperSetValueSchemaId());
setHybridStoreDiskQuotaEnabled(store.isHybridStoreDiskQuotaEnabled());
setEtlStoreConfig(store.getEtlStoreConfig());
setEtlStoreConfig(store.getEtlStoreConfig().clone());
setStoreMetadataSystemStoreEnabled(store.isStoreMetadataSystemStoreEnabled());
setLatestVersionPromoteToCurrentTimestamp(store.getLatestVersionPromoteToCurrentTimestamp());
setBackupVersionRetentionMs(store.getBackupVersionRetentionMs());
Expand All @@ -220,7 +220,7 @@ public ZKStore(Store store) {
setStoreMetaSystemStoreEnabled(store.isStoreMetaSystemStoreEnabled());
setActiveActiveReplicationEnabled(store.isActiveActiveReplicationEnabled());
setRmdVersion(store.getRmdVersion());
setViewConfigs(store.getViewConfigs());
setViewConfigs(new HashMap<>(store.getViewConfigs()));
setStorageNodeReadQuotaEnabled(store.isStorageNodeReadQuotaEnabled());
setUnusedSchemaDeletionEnabled(store.isUnusedSchemaDeletionEnabled());
setMinCompactionLagSeconds(store.getMinCompactionLagSeconds());
Expand Down Expand Up @@ -365,11 +365,7 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@Override
public long getStorageQuotaInByte() {
// This is a safeguard in case that some old stores do not have storage quota field
return (this.storeProperties.storageQuotaInByte <= 0
&& this.storeProperties.storageQuotaInByte != UNLIMITED_STORAGE_QUOTA)
? DEFAULT_STORAGE_QUOTA
: this.storeProperties.storageQuotaInByte;
return this.storeProperties.storageQuotaInByte;
nisargthakkar marked this conversation as resolved.
Show resolved Hide resolved
}

@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,16 @@ public static int calculatePartitionCount(
} else if (partitionCount < minPartitionCount) {
partitionCount = minPartitionCount;
}

int returnPartitionCount = partitionCount <= 0 ? 1 : (int) partitionCount;

LOGGER.info(
"Assign partition count: {} calculated by storage quota: {} to the new version of store: {}",
partitionCount,
returnPartitionCount,
storageQuota,
storeName);
return (int) partitionCount;

return returnPartitionCount;
}

public static VenicePartitioner getVenicePartitioner(PartitionerConfig config) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.venice.meta;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;

import com.linkedin.venice.exceptions.VeniceException;
import org.testng.annotations.Test;


public class BackupStrategyTest {
@Test
public void testFromInt() {
assertEquals(BackupStrategy.fromInt(0), BackupStrategy.KEEP_MIN_VERSIONS);
assertEquals(BackupStrategy.fromInt(1), BackupStrategy.DELETE_ON_NEW_PUSH_START);
assertThrows(VeniceException.class, () -> BackupStrategy.fromInt(2));
}

@Test
public void testGetValue() {
assertEquals(BackupStrategy.KEEP_MIN_VERSIONS.getValue(), 0);
assertEquals(BackupStrategy.DELETE_ON_NEW_PUSH_START.getValue(), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,26 @@ public void deserializes() throws IOException {
Assert.assertEquals(fasterXml.getRewindTimeInSeconds(), 123L);
Assert.assertEquals(fasterXml.getDataReplicationPolicy(), DataReplicationPolicy.NON_AGGREGATE);
}

@Test
public void testIsHybrid() {
HybridStoreConfig hybridStoreConfig;
hybridStoreConfig = new HybridStoreConfigImpl(-1, -1, -1, null, null);
Assert.assertFalse(hybridStoreConfig.isHybrid());

hybridStoreConfig = new HybridStoreConfigImpl(100, -1, -1, null, null);
Assert.assertFalse(hybridStoreConfig.isHybrid());

hybridStoreConfig = new HybridStoreConfigImpl(100, 100, -1, null, null);
Assert.assertTrue(hybridStoreConfig.isHybrid());

hybridStoreConfig = new HybridStoreConfigImpl(100, 100, 100, null, null);
Assert.assertTrue(hybridStoreConfig.isHybrid());

hybridStoreConfig = new HybridStoreConfigImpl(100, -1, 100, null, null);
Assert.assertTrue(hybridStoreConfig.isHybrid());

hybridStoreConfig = new HybridStoreConfigImpl(-1, -1, 100, null, null);
Assert.assertFalse(hybridStoreConfig.isHybrid());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,40 @@ public void testClusterLevelActiveActiveReplicationConfigForNewHybridStores() th
assertFalse(parentControllerClient.getStore(storeName).getStore().isActiveActiveReplicationEnabled());
});
}

@Test(timeOut = TEST_TIMEOUT)
public void testClusterLevelActiveActiveReplicationConfigForNewIncrementalPushStores() throws IOException {
String storeName = Utils.getUniqueString("test-store-incremental");
String pushJobId1 = "test-push-job-id-1";
parentControllerClient.createNewStore(storeName, "test-owner", "\"string\"", "\"string\"");
parentControllerClient.emptyPush(storeName, pushJobId1, 1);

// Version 1 should exist.
StoreInfo store = assertCommand(parentControllerClient.getStore(storeName)).getStore();
assertEquals(store.getVersions().size(), 1);

// Check store level Active/Active is enabled or not
assertFalse(store.isActiveActiveReplicationEnabled());
assertFalse(store.isIncrementalPushEnabled());
assertFalse(store.isActiveActiveReplicationEnabled());

// Convert to incremental push store
assertCommand(
parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(true)));
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
StoreInfo storeToTest = parentControllerClient.getStore(storeName).getStore();
assertTrue(storeToTest.isIncrementalPushEnabled());
assertTrue(storeToTest.isActiveActiveReplicationEnabled());
});

// After inc push is disabled, even default A/A config for pure hybrid store is false,
// original store A/A config is enabled.
assertCommand(
parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(false)));
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
StoreInfo storeToTest = parentControllerClient.getStore(storeName).getStore();
assertFalse(storeToTest.isIncrementalPushEnabled());
assertTrue(storeToTest.isActiveActiveReplicationEnabled());
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.TestUtils;
Expand Down Expand Up @@ -97,6 +98,7 @@ public void testClusterLevelNativeReplicationConfigForNewStores() {
parentControllerClient.updateStore(
storeName,
new UpdateStoreQueryParams().setIncrementalPushEnabled(true)
.setHybridDataReplicationPolicy(DataReplicationPolicy.NONE)
.setHybridRewindSeconds(1L)
.setHybridOffsetLagThreshold(10)));
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
Expand All @@ -107,7 +109,7 @@ public void testClusterLevelNativeReplicationConfigForNewStores() {
}

@Test(timeOut = TEST_TIMEOUT)
public void testConvertHybridDuringPushjob() {
public void testConvertHybridDuringPushJob() {
String storeName = Utils.getUniqueString("test-store");
parentControllerClient.createNewStore(storeName, "test-owner", "\"string\"", "\"string\"");
parentControllerClient.requestTopicForWrites(
Expand All @@ -128,7 +130,7 @@ public void testConvertHybridDuringPushjob() {
storeName,
new UpdateStoreQueryParams().setHybridRewindSeconds(1L).setHybridOffsetLagThreshold(1L));
Assert.assertTrue(response.isError());
Assert.assertTrue(response.getError().contains("Cannot convert to hybrid as there is already a pushjob running"));
Assert.assertTrue(response.getError().contains("Cannot convert to hybrid as there is already a push job running"));
parentControllerClient.killOfflinePushJob(Version.composeKafkaTopic(storeName, 1));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,10 @@ public void testEnableActiveActiveReplicationSchema() {
Assert.assertFalse(schemaResponse2.isError(), "addValeSchema returned error: " + schemaResponse2.getError());

// Enable AA on store
UpdateStoreQueryParams updateStoreToEnableAARepl =
new UpdateStoreQueryParams().setNativeReplicationEnabled(true).setActiveActiveReplicationEnabled(true);
UpdateStoreQueryParams updateStoreToEnableAARepl = new UpdateStoreQueryParams().setNativeReplicationEnabled(true)
.setActiveActiveReplicationEnabled(true)
.setHybridOffsetLagThreshold(1000)
.setHybridRewindSeconds(1000);
TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreToEnableAARepl);
/**
* Test Active/Active replication config enablement generates the active active metadata schema.
Expand Down
Loading
Loading