Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nisargthakkar committed Nov 16, 2024
1 parent 8cac7d4 commit 9f3e225
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,11 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.WRITE_COMPUTATION_ENABLED;
import static com.linkedin.venice.meta.Version.VERSION_SEPARATOR;
import static com.linkedin.venice.meta.VersionStatus.ONLINE;
import static com.linkedin.venice.meta.VersionStatus.PUSHED;
import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.BATCH_JOB_HEARTBEAT;
import static com.linkedin.venice.serialization.avro.AvroProtocolDefinition.PUSH_JOB_DETAILS;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
Expand Down Expand Up @@ -225,7 +223,6 @@
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.views.MaterializedView;
import com.linkedin.venice.views.VeniceView;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_HYBRID_TIME_LAG_THRESHOLD;
import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REWIND_TIME_IN_SECONDS;
import static com.linkedin.venice.meta.Version.VERSION_SEPARATOR;
import static com.linkedin.venice.utils.RegionUtils.parseRegionsFilterList;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
Expand Down Expand Up @@ -101,6 +100,7 @@
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.views.MaterializedView;
import com.linkedin.venice.views.VeniceView;
Expand Down Expand Up @@ -133,29 +133,24 @@ public static UpdateStoreWrapper getStoreUpdate(
boolean checkRegionFilter) {
VeniceControllerMultiClusterConfig multiClusterConfigs = admin.getMultiClusterConfigs();

// Check whether the command affects this region.
Set<String> regionsFilter =
params.getRegionsFilter().map(RegionUtils::parseRegionsFilterList).orElse(Collections.emptySet());
if (checkRegionFilter && !regionsFilter.isEmpty() && !regionsFilter.contains(multiClusterConfigs.getRegionName())) {
LOGGER.info(
"UpdateStore command will be skipped for store: {} in cluster: {}, because the region filter is {}"
+ " which doesn't include the current region: {}",
storeName,
clusterName,
regionsFilter,
multiClusterConfigs.getRegionName());
return null;
}

// There are certain configs that are only allowed to be updated in child regions. We might still want the ability
// to update such configs in the parent region via the Admin tool for operational reasons. So, we allow such updates
// if the regions filter only specifies one region, which is the parent region.
boolean onlyParentRegionFilter = false;

// Check whether the command affects this region.
if (params.getRegionsFilter().isPresent()) {
Set<String> regionsFilter = parseRegionsFilterList(params.getRegionsFilter().get());
if (checkRegionFilter && !regionsFilter.contains(multiClusterConfigs.getRegionName())) {
LOGGER.info(
"UpdateStore command will be skipped for store: {} in cluster: {}, because the region filter is {}"
+ " which doesn't include the current region: {}",
storeName,
clusterName,
regionsFilter,
multiClusterConfigs.getRegionName());
return null;
}

if (admin.isParent() && regionsFilter.size() == 1) {
onlyParentRegionFilter = true;
}
}
boolean onlyParentRegionFilter = admin.isParent() && regionsFilter.size() == 1;

Store originalStore = admin.getStore(clusterName, storeName);
if (originalStore == null) {
Expand Down Expand Up @@ -823,18 +818,19 @@ static void validateStoreConfigs(Admin admin, String clusterName, Store store) {
ErrorType.INVALID_CONFIG);
}

DataReplicationPolicy dataReplicationPolicy = hybridStoreConfig.getDataReplicationPolicy();
// Incremental push + !AA + NON_AGGREGATE DRP is not supported in multi-region mode
if (controllerConfig.isMultiRegion() && store.isIncrementalPushEnabled()
&& !store.isActiveActiveReplicationEnabled()
&& dataReplicationPolicy == DataReplicationPolicy.NON_AGGREGATE) {
boolean isIncrementalPushSupported = AdminUtils.isIncrementalPushSupported(
controllerConfig.isMultiRegion(),
store.isActiveActiveReplicationEnabled(),
hybridStoreConfig);
if (store.isIncrementalPushEnabled() && !isIncrementalPushSupported) {
throw new VeniceHttpException(
HttpStatus.SC_BAD_REQUEST,
errorMessagePrefix
+ "Incremental push is not supported for non active-active hybrid stores with NON_AGGREGATE data replication policy",
ErrorType.INVALID_CONFIG);
}

DataReplicationPolicy dataReplicationPolicy = hybridStoreConfig.getDataReplicationPolicy();
// ACTIVE_ACTIVE DRP is only supported when activeActiveReplicationEnabled = true
if (dataReplicationPolicy == DataReplicationPolicy.ACTIVE_ACTIVE && !store.isActiveActiveReplicationEnabled()) {
throw new VeniceHttpException(
Expand Down Expand Up @@ -988,7 +984,9 @@ private static void validateStoreUpdate(
}
}

private static Map<String, ViewConfig> validateAndDecorateStoreViewConfigs(Map<String, String> stringMap, Store store) {
private static Map<String, ViewConfig> validateAndDecorateStoreViewConfigs(
Map<String, String> stringMap,
Store store) {
Map<String, ViewConfig> configs = StoreViewUtils.convertStringMapViewToViewConfigMap(stringMap);
Map<String, ViewConfig> validatedConfigs = new HashMap<>();
for (Map.Entry<String, ViewConfig> viewConfigEntry: configs.entrySet()) {
Expand Down Expand Up @@ -1124,11 +1122,10 @@ static void validateStorePartitionCountUpdate(

int minPartitionNum = clusterConfig.getMinNumberOfPartitions();
if (newPartitionCount < minPartitionNum && newPartitionCount != 0) {
throw new VeniceHttpException(
HttpStatus.SC_BAD_REQUEST,
"Partition count must be at least " + minPartitionNum + " for store: " + storeName
+ ". If a specific partition count is not required, set it to 0.",
ErrorType.INVALID_CONFIG);
String errorMessage = errorMessagePrefix + "Partition count must be at least " + minPartitionNum
+ ". If a specific partition count is not required, set it to 0.";
LOGGER.error(errorMessage);
throw new VeniceHttpException(HttpStatus.SC_BAD_REQUEST, errorMessage, ErrorType.INVALID_CONFIG);
}

int maxPartitionNum = clusterConfig.getMaxNumberOfPartitions();
Expand Down Expand Up @@ -1219,12 +1216,11 @@ static PartitionerConfig mergeNewSettingsIntoOldPartitionerConfig(
return oldStore.getPartitionerConfig();
}

PartitionerConfig originalPartitionerConfig;
if (oldStore.getPartitionerConfig() == null) {
PartitionerConfig originalPartitionerConfig = oldStore.getPartitionerConfig();
if (originalPartitionerConfig == null) {
originalPartitionerConfig = new PartitionerConfigImpl();
} else {
originalPartitionerConfig = oldStore.getPartitionerConfig();
}

return new PartitionerConfigImpl(
partitionerClass.orElse(originalPartitionerConfig.getPartitionerClass()),
partitionerParams.orElse(originalPartitionerConfig.getPartitionerParams()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1655,8 +1655,7 @@ public void testUpdateStore() {
when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null)
.thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1));

UpdateStoreQueryParams storeQueryParams1 =
new UpdateStoreQueryParams().setIncrementalPushEnabled(true).setBlobTransferEnabled(true);
UpdateStoreQueryParams storeQueryParams1 = new UpdateStoreQueryParams().setBlobTransferEnabled(true);
parentAdmin.initStorageCluster(clusterName);
parentAdmin.updateStore(clusterName, storeName, storeQueryParams1);

Expand All @@ -1676,7 +1675,6 @@ public void testUpdateStore() {
assertEquals(adminMessage.operationType, AdminMessageType.UPDATE_STORE.getValue());

UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion;
assertTrue(updateStore.incrementalPushEnabled);
assertTrue(updateStore.blobTransferEnabled);

long readQuota = 100L;
Expand Down

0 comments on commit 9f3e225

Please sign in to comment.