diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index dafd50f398d..e2e0130c8ef 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -369,6 +369,9 @@ private ConfigKeys() { public static final String SYSTEM_SCHEMA_INITIALIZATION_AT_START_TIME_ENABLED = "system.schema.initialization.at.start.time.enabled"; + public static final String KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED = + "kme.registration.from.message.header.enabled"; + // Server specific configs public static final String LISTENER_PORT = "listener.port"; public static final String GRPC_READ_SERVER_PORT = "grpc.read.server.port"; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java index 3295e09979f..591b51472a7 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepository.java @@ -558,4 +558,10 @@ private void preCheckStoreCondition(String storeName) { throw new VeniceNoStoreException(storeName); } } + + // For testing purposes only. + public void removeValueSchema(String storeName, int schemaId) { + preCheckStoreCondition(storeName); + accessor.removeValueSchema(storeName, String.valueOf(schemaId)); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepositoryAdapter.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepositoryAdapter.java index aa66d300ea1..4449a3c378e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepositoryAdapter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixReadWriteSchemaRepositoryAdapter.java @@ -299,4 +299,9 @@ public void clear() { readOnlyZKSharedSchemaRepository.clear(); readWriteRegularStoreSchemaRepository.clear(); } + + // For testing purposes only. + public ReadWriteSchemaRepository getReadWriteRegularStoreSchemaRepository() { + return this.readWriteRegularStoreSchemaRepository; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixSchemaAccessor.java b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixSchemaAccessor.java index 635921c1363..1f9f72a771a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixSchemaAccessor.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/helix/HelixSchemaAccessor.java @@ -278,4 +278,10 @@ String getReplicationMetadataSchemaPath(String storeName, String replicationMeta .append(replicationMetadataVersionIdPair) .toString(); } + + // For testing purposes only.
+ public void removeValueSchema(String storeName, String schemaId) { + HelixUtils.remove(schemaAccessor, getValueSchemaPath(storeName, schemaId)); + logger.info("Removed value schema: {} for store: {}.", schemaId, storeName); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java index 9429781faef..643c6839df4 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java @@ -1,6 +1,7 @@ package com.linkedin.venice.pubsub.api; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.enums.MessageType; import com.linkedin.venice.message.KafkaKey; @@ -63,6 +64,15 @@ public PubSubMessage deserialize( Schema providedProtocolSchema = AvroCompatibilityHelper.parse(new String(header.value())); value = valueSerializer.deserialize(valueBytes, providedProtocolSchema, getEnvelope(key.getKeyHeaderByte())); + } catch (VeniceException e) { + /* + * If a Venice exception is caught while registering a newly discovered schema, no special handling is + * needed here; delegate it to the subsequent deserialization below to deal with the failure. + */ + LOGGER.warn( + "Venice exception while registering newly encountered schema in protocol header: {}", + VENICE_TRANSPORT_PROTOCOL_HEADER, + e); + } catch (Exception e) { // Improper header... will ignore. LOGGER.warn("Received unparsable schema in protocol header: " + VENICE_TRANSPORT_PROTOCOL_HEADER, e); @@ -97,4 +107,9 @@ private KafkaMessageEnvelope getEnvelope(byte keyHeaderByte) { throw new IllegalStateException("Illegal key header byte: " + keyHeaderByte); } } + + // For testing only.
+ public KafkaValueSerializer getValueSerializer() { + return valueSerializer; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java index 7e70fedad11..a320d73043c 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java @@ -7,6 +7,7 @@ import com.linkedin.venice.schema.SchemaReader; import com.linkedin.venice.serialization.VeniceKafkaSerializer; import com.linkedin.venice.utils.ByteUtils; +import com.linkedin.venice.utils.SparseConcurrentList; import com.linkedin.venice.utils.SparseConcurrentListWithOffset; import com.linkedin.venice.utils.Utils; import it.unimi.dsi.fastutil.ints.IntLinkedOpenHashSet; @@ -14,7 +15,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.function.BiConsumer; @@ -93,7 +93,7 @@ private static class ReusableObjects { private final SpecificDatumWriter<SPECIFIC_RECORD> writer; /** Maintains the mapping between protocol version and the corresponding {@link SpecificDatumReader} */ - private final List<VeniceSpecificDatumReader<SPECIFIC_RECORD>> protocolVersionToReader; + private final SparseConcurrentList<VeniceSpecificDatumReader<SPECIFIC_RECORD>> protocolVersionToReader; /** The schema of the {@link SpecificRecord} which is compiled in the current version of the code. */ private final Schema compiledProtocol; @@ -355,11 +355,28 @@ public SPECIFIC_RECORD deserialize(byte[] bytes, int protocolVersion, SPECIFIC_R public SPECIFIC_RECORD deserialize(byte[] bytes, Schema providedProtocolSchema, SPECIFIC_RECORD reuse) { int protocolVersion = getProtocolVersion(bytes); - VeniceSpecificDatumReader<SPECIFIC_RECORD> specificDatumReader = protocolVersionToReader.get(protocolVersion); - if (specificDatumReader == null) { - specificDatumReader = cacheDatumReader(protocolVersion, providedProtocolSchema); - newSchemaEncountered.accept(protocolVersion, providedProtocolSchema); - } + + /** + * When a new schema version is discovered during data ingestion, only one thread will register the new schema + * on behalf of all users of the shared InternalAvroSpecificSerializer object. While that registration is in + * progress, other concurrent threads calling deserialize will be blocked until the first thread finishes the + * registration. (This helps avoid a burst of concurrent registration requests, e.g. when all shared consumer + * threads in one Venice server discover an unknown schema in the header and each decides to register it with + * the controller.) + * + * Note that once the registration is completed, deserialization still runs in parallel. + * + * Also note that if an exception is thrown in newSchemaEncountered, the local protocolVersionToReader will not + * be updated, which gives the caller an opportunity to detect the failure and perform any + * remediation steps (e.g. retry).
+ */ + VeniceSpecificDatumReader<SPECIFIC_RECORD> specificDatumReader = + protocolVersionToReader.computeIfAbsent(protocolVersion, index -> { + newSchemaEncountered.accept(protocolVersion, providedProtocolSchema); + return cacheDatumReader(protocolVersion, providedProtocolSchema); + }); + return deserialize(bytes, specificDatumReader, reuse); } @@ -426,4 +443,14 @@ private VeniceSpecificDatumReader cacheDatumReader(int protocol private String getCurrentlyLoadedProtocolVersions() { return knownProtocols().toString(); } + + // For testing only. + public void removeAllSchemas() { + this.protocolVersionToReader.clear(); + } + + // For testing purposes only. + public int getProtocolVersionSize() { + return protocolVersionToReader.size(); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java index f3d67ff0f0c..de1d50d41e1 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java @@ -15,6 +15,7 @@ import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.exceptions.ErrorType; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType; import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; import com.linkedin.venice.security.SSLFactory; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; @@ -76,7 +77,7 @@ public ControllerClientBackedSystemSchemaInitializer( this.enforceSslOnly = enforceSslOnly; } - public void execute() { + public void execute(Map<Integer, Schema> inputSchemas) { if (controllerClient == null) { if (!controllerUrl.isEmpty()) { controllerClient = ControllerClient.constructClusterControllerClient(clusterName, controllerUrl, sslFactory); } @@ -100,7 +101,9 @@ public void execute() { } String storeName = protocolDefinition.getSystemStoreName(); - Map<Integer, Schema> schemasInLocalResources = Utils.getAllSchemasFromResources(protocolDefinition); + + Map<Integer, Schema> schemasInLocalResources = + inputSchemas != null ?
inputSchemas : Utils.getAllSchemasFromResources(protocolDefinition); D2ServiceDiscoveryResponse discoveryResponse = controllerClient.retryableRequest( DEFAULT_RETRY_TIMES, c -> c.discoverCluster(storeName), @@ -142,19 +145,31 @@ public void execute() { for (int version = 1; version <= protocolDefinition.getCurrentProtocolVersion(); version++) { Schema schemaInLocalResources = schemasInLocalResources.get(version); if (schemaInLocalResources == null) { - throw new VeniceException( - "Invalid protocol definition: " + protocolDefinition.name() + " does not have a version " + version - + " even though it is less than or equal to the current version (" - + protocolDefinition.getCurrentProtocolVersion() + ")."); - } - checkAndMayRegisterValueSchema(storeName, version, schemasInZk.get(version), schemaInLocalResources); + if (inputSchemas == null) { + throw new VeniceException( + "Invalid protocol definition: " + protocolDefinition.name() + " does not have a version " + version + + " even though it is less than or equal to the current version (" + + protocolDefinition.getCurrentProtocolVersion() + ")."); + } + } else { + checkAndMayRegisterValueSchema( + storeName, + version, + schemasInZk.get(version), + schemaInLocalResources, + inputSchemas != null); - if (autoRegisterPartialUpdateSchema) { - checkAndMayRegisterPartialUpdateSchema(storeName, version, schemaInLocalResources); + if (autoRegisterPartialUpdateSchema) { + checkAndMayRegisterPartialUpdateSchema(storeName, version, schemaInLocalResources); + } } } } + public void execute() { + execute(null); + } + private boolean hasLeaderController() { try { RetryUtils.executeWithMaxAttempt( @@ -236,11 +251,20 @@ private void checkAndMayRegisterValueSchema( String storeName, int valueSchemaId, Schema schemaInZk, - Schema schemaInLocalResources) { + Schema schemaInLocalResources, + boolean skipCompatibilityCheck) { if (schemaInZk == null) { - SchemaResponse addValueSchemaResponse = controllerClient.retryableRequest( - DEFAULT_RETRY_TIMES, - c -> c.addValueSchema(storeName, schemaInLocalResources.toString(), valueSchemaId)); + SchemaResponse addValueSchemaResponse = controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, c -> { + if (skipCompatibilityCheck) { + return c.addValueSchema( + storeName, + schemaInLocalResources.toString(), + valueSchemaId, + DirectionalSchemaCompatibilityType.NONE); + } + return c.addValueSchema(storeName, schemaInLocalResources.toString(), valueSchemaId); + }); + if (addValueSchemaResponse.isError()) { throw new VeniceException( "Error when adding value schema " + valueSchemaId + " to system store " + storeName + " in cluster " diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestControllerKMERegistrationFromMessageHeader.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestControllerKMERegistrationFromMessageHeader.java new file mode 100644 index 00000000000..3d996dd609c --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestControllerKMERegistrationFromMessageHeader.java @@ -0,0 +1,131 @@ +package com.linkedin.venice.controller; + +import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS; +import static com.linkedin.venice.ConfigKeys.DEFAULT_NUMBER_OF_PARTITION_FOR_HYBRID; +import static com.linkedin.venice.ConfigKeys.DEFAULT_PARTITION_SIZE; +import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED; + +import 
com.linkedin.venice.helix.HelixReadWriteSchemaRepository; +import com.linkedin.venice.helix.HelixReadWriteSchemaRepositoryAdapter; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceControllerWrapper; +import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.serialization.avro.KafkaValueSerializer; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.Utils; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestControllerKMERegistrationFromMessageHeader { + private static final Logger LOGGER = LogManager.getLogger(TestControllerKMERegistrationFromMessageHeader.class); + private static final int TEST_TIMEOUT = 90_000; // ms + private static final int NUMBER_OF_CHILD_DATACENTERS = 1; + private static final int NUMBER_OF_CLUSTERS = 1; + + private static final String[] CLUSTER_NAMES = + IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new); + + private List<VeniceMultiClusterWrapper> childDatacenters; + private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; + + @BeforeClass + public void setUp() { + Properties controllerProps = new Properties(); + controllerProps.put(DEFAULT_NUMBER_OF_PARTITION_FOR_HYBRID, 2); + controllerProps.put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, 3); + controllerProps.put(DEFAULT_PARTITION_SIZE, 1024); + controllerProps.put(KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED, true); + multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + NUMBER_OF_CHILD_DATACENTERS, + NUMBER_OF_CLUSTERS, + 1, + 3, + 1, + 1, + 1, + Optional.of(controllerProps), + Optional.of(controllerProps), + Optional.empty()); + + childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); + } + + @AfterClass(alwaysRun = true) + public void cleanUp() { + Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper); + } + + @Test(timeOut = TEST_TIMEOUT) + public void testKMERegistrationThroughAdminTopicChannel() { + String clusterName = CLUSTER_NAMES[0]; + String storeName = Utils.getUniqueString("store"); + + VeniceControllerWrapper pController = + multiRegionMultiClusterWrapper.getLeaderParentControllerWithRetries(clusterName); + + VeniceMultiClusterWrapper child = childDatacenters.get(0); + + // Remove the latest schema from child controller's local value serializer and remove it from child colo's schema + // repository (ZK).
+ VeniceControllerWrapper leaderController = child.getLeaderController(clusterName); + KafkaValueSerializer valueSerializer = + leaderController.getController().getVeniceControllerService().getKafkaValueSerializer(); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + Assert.assertTrue( + valueSerializer.getProtocolVersionSize() >= AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE + .getCurrentProtocolVersion()); + }); + HelixReadWriteSchemaRepositoryAdapter adapter = + (HelixReadWriteSchemaRepositoryAdapter) (leaderController.getVeniceHelixAdmin() + .getHelixVeniceClusterResources(clusterName) + .getSchemaRepository()); + HelixReadWriteSchemaRepository repo = + (HelixReadWriteSchemaRepository) adapter.getReadWriteRegularStoreSchemaRepository(); + repo.removeValueSchema( + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(), + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()); + valueSerializer.removeAllSchemas(); + LOGGER.info("All schemas are removed"); + + // Verify that the latest version of the protocol is deleted in ZK. + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + Assert.assertTrue( + repo.getValueSchema( + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(), + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()) == null); + }); + + /* + * Call the parent controller's create-store action, which triggers an admin message carrying the latest schema + * in its header. When the child controller encounters the new schema in the message header, it registers the + * schema in the child colo's schema repo and adds it to its local serializer. + */ + + pController.getVeniceAdmin().createStore(clusterName, storeName, "", "\"string\"", "\"string\"", false); + + // Verify that the schema is registered in the child colo's schema repo. + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + Assert.assertTrue( + repo.getValueSchema( + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(), + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()) != null); + }); + + // Verify that the store is created successfully in the child colo from the child controller's view.
+ TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + Assert.assertTrue(leaderController.getVeniceAdmin().getStore(clusterName, storeName).getName().equals(storeName)); + }); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java index d4b93b39f30..bc18586d9b9 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java @@ -461,6 +461,10 @@ public Admin getVeniceAdmin() { return service.getVeniceControllerService().getVeniceHelixAdmin(); } + public VeniceController getController() { + return service; + } + public VeniceHelixAdmin getVeniceHelixAdmin() { return (VeniceHelixAdmin) getVeniceAdmin(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java index 680d65ab474..812c8e7129e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java @@ -66,6 +66,7 @@ import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS; import static com.linkedin.venice.ConfigKeys.KAFKA_READ_ONLY_ADMIN_CLASS; import static com.linkedin.venice.ConfigKeys.KAFKA_WRITE_ONLY_ADMIN_CLASS; +import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED; import static com.linkedin.venice.ConfigKeys.MIN_NUMBER_OF_STORE_VERSIONS_TO_PRESERVE; import static com.linkedin.venice.ConfigKeys.MIN_NUMBER_OF_UNUSED_KAFKA_TOPICS_TO_PRESERVE; import static com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_FABRIC_ALLOWLIST; @@ -293,6 +294,8 @@ public class VeniceControllerConfig extends VeniceControllerClusterConfig { private final boolean systemSchemaInitializationAtStartTimeEnabled; + private final boolean isKMERegistrationFromMessageHeaderEnabled; + private final PubSubClientsFactory pubSubClientsFactory; public VeniceControllerConfig(VeniceProperties props) { @@ -508,6 +511,8 @@ public VeniceControllerConfig(VeniceProperties props) { props.getBoolean(CONTROLLER_PARENT_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED, false); this.systemSchemaInitializationAtStartTimeEnabled = props.getBoolean(SYSTEM_SCHEMA_INITIALIZATION_AT_START_TIME_ENABLED, false); + this.isKMERegistrationFromMessageHeaderEnabled = + props.getBoolean(KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED, false); try { String producerFactoryClassName = @@ -956,6 +961,10 @@ public boolean isSystemSchemaInitializationAtStartTimeEnabled() { return systemSchemaInitializationAtStartTimeEnabled; } + public boolean isKMERegistrationFromMessageHeaderEnabled() { + return isKMERegistrationFromMessageHeaderEnabled; + } + public PubSubClientsFactory getPubSubClientsFactory() { return pubSubClientsFactory; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java index 7225cbf2cb2..5bc7fa58a25 100644 --- 
a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java @@ -262,4 +262,8 @@ public int getGraveyardCleanupSleepIntervalBetweenListFetchMinutes() { public PubSubClientsFactory getPubSubClientsFactory() { return getCommonConfig().getPubSubClientsFactory(); } + + public boolean isKMERegistrationFromMessageHeaderEnabled() { + return getCommonConfig().isKMERegistrationFromMessageHeaderEnabled(); + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java index 8e95c9183a5..f004d340b8c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerService.java @@ -2,6 +2,7 @@ import static com.linkedin.venice.client.store.ClientFactory.getSchemaReader; +import com.google.common.collect.ImmutableMap; import com.linkedin.d2.balancer.D2Client; import com.linkedin.venice.SSLConfig; import com.linkedin.venice.acl.DynamicAccessController; @@ -14,6 +15,7 @@ import com.linkedin.venice.controller.lingeringjob.HeartbeatBasedLingeringStoreVersionChecker; import com.linkedin.venice.controller.lingeringjob.LingeringStoreVersionChecker; import com.linkedin.venice.controller.supersetschema.SupersetSchemaGenerator; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -25,12 +27,15 @@ import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer; import com.linkedin.venice.service.AbstractVeniceService; import com.linkedin.venice.service.ICProvider; +import com.linkedin.venice.system.store.ControllerClientBackedSystemSchemaInitializer; import com.linkedin.venice.utils.pools.LandFillObjectPool; import io.tehuti.metrics.MetricsRepository; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.function.BiConsumer; +import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -120,17 +125,45 @@ public VeniceControllerService( } // The admin consumer needs to use VeniceHelixAdmin to update Zookeeper directly consumerServicesByClusters = new HashMap<>(multiClusterConfigs.getClusters().size()); - /** N.B. The code below is copied from {@link com.linkedin.venice.controller.init.SystemSchemaInitializationRoutine */ - // BiConsumer<Integer, Schema> newSchemaEncountered = (schemaId, schema) -> internalAdmin.addValueSchema( - // "?", // TODO: Figure out a clean way to retrieve the cluster name param - // AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(), - // schema.toString(), - // schemaId, - // DirectionalSchemaCompatibilityType.NONE, - // false); - KafkaValueSerializer kafkaValueSerializer = new OptimizedKafkaValueSerializer( - // newSchemaEncountered // TODO: Wire in this hook once we figure out a clean way to do it - ); + + /** + * Register a callback function to handle the case where a new KME value schema is encountered while the child + * controller consumes the admin topics.
+ */ + BiConsumer<Integer, Schema> newSchemaEncountered = (schemaId, schema) -> { + LOGGER.info("Encountered a new KME value schema (id = {}), proceeding to register it", schemaId); + String systemClusterName = multiClusterConfigs.getSystemSchemaClusterName(); + VeniceControllerConfig systemStoreClusterConfig = multiClusterConfigs.getControllerConfig(systemClusterName); + try { + ControllerClientBackedSystemSchemaInitializer schemaInitializer = + new ControllerClientBackedSystemSchemaInitializer( + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE, + systemClusterName, + null, + null, + false, + ((VeniceHelixAdmin) admin).getSslFactory(), + systemStoreClusterConfig.getChildControllerUrl(systemStoreClusterConfig.getRegionName()), + systemStoreClusterConfig.getChildControllerD2ServiceName(), + systemStoreClusterConfig.getChildControllerD2ZkHost(systemStoreClusterConfig.getRegionName()), + systemStoreClusterConfig.isControllerEnforceSSLOnly()); + + schemaInitializer.execute(ImmutableMap.of(schemaId, schema)); + } catch (VeniceException e) { + LOGGER.error( + "Exception in registering '{}' schema version '{}'", + AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.name(), + schemaId, + e); + throw e; + } + }; + + KafkaValueSerializer kafkaValueSerializer = + (!multiClusterConfigs.isParent() && multiClusterConfigs.isKMERegistrationFromMessageHeaderEnabled()) + ? new OptimizedKafkaValueSerializer(newSchemaEncountered) + : new OptimizedKafkaValueSerializer(); + kafkaMessageEnvelopeSchemaReader.ifPresent(kafkaValueSerializer::setSchemaReader); PubSubMessageDeserializer pubSubMessageDeserializer = new PubSubMessageDeserializer( kafkaValueSerializer, @@ -220,4 +253,12 @@ public Admin getVeniceHelixAdmin() { public AdminConsumerService getAdminConsumerServiceByCluster(String cluster) { return consumerServicesByClusters.get(cluster); } + + // For testing only. + public KafkaValueSerializer getKafkaValueSerializer() { + for (Map.Entry<String, AdminConsumerService> entry: consumerServicesByClusters.entrySet()) { + return entry.getValue().getDeserializer().getValueSerializer(); + } + return null; + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java index 76a0a979040..f64aeb94ad4 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java @@ -213,4 +213,9 @@ private PubSubConsumerAdapter createKafkaConsumer(String clusterName) { return consumerFactory .create(new VeniceProperties(kafkaConsumerProperties), false, pubSubMessageDeserializer, clusterName); } + + // For testing only. + public PubSubMessageDeserializer getDeserializer() { + return this.pubSubMessageDeserializer; + } }
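Note on the register-once behavior: the computeIfAbsent-based path added to InternalAvroSpecificSerializer above means the first thread to see an unknown protocol version runs the newSchemaEncountered callback, concurrent callers for the same version block until it finishes, and a failed callback leaves nothing cached so a later message can retry. The following is a minimal, self-contained sketch of that pattern only; it uses a plain ConcurrentHashMap in place of SparseConcurrentList and String stand-ins for the reader and schema types, and the class and method names are illustrative rather than Venice APIs.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

public class RegisterOnceSketch {
  // Maps protocol version to a cached "reader"; ConcurrentHashMap stands in for SparseConcurrentList.
  private final Map<Integer, String> versionToReader = new ConcurrentHashMap<>();
  private final BiConsumer<Integer, String> newSchemaEncountered;

  public RegisterOnceSketch(BiConsumer<Integer, String> newSchemaEncountered) {
    this.newSchemaEncountered = newSchemaEncountered;
  }

  public String getReader(int protocolVersion, String providedSchema) {
    // Only the first caller for a given version runs the registration callback; concurrent callers
    // for the same version block until it completes, after which lookups proceed without contention.
    // If the callback throws, no mapping is cached, so a later call can retry the registration.
    return versionToReader.computeIfAbsent(protocolVersion, version -> {
      newSchemaEncountered.accept(version, providedSchema);
      return "reader-for-v" + version;
    });
  }

  public static void main(String[] args) {
    RegisterOnceSketch sketch =
        new RegisterOnceSketch((version, schema) -> System.out.println("Registering schema v" + version));
    sketch.getReader(42, "{\"type\": \"record\"}");
    sketch.getReader(42, "{\"type\": \"record\"}"); // Callback is not invoked a second time.
  }
}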
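For reference, the new config key defaults to false and, per the VeniceControllerService wiring above, only takes effect on child (non-parent) controllers. A minimal sketch of enabling it via controller Properties, following the setup in TestControllerKMERegistrationFromMessageHeader (the helper class name below is illustrative):

import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED;

import java.util.Properties;

public class KmeHeaderRegistrationConfigSketch {
  public static Properties childControllerProps() {
    Properties controllerProps = new Properties();
    // With this flag set, child controllers register KME schemas discovered in admin-topic message headers.
    controllerProps.put(KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED, true);
    return controllerProps;
  }
}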