Skip to content

Commit

Permalink
[controller][test] Message header triggered new KME schema registrati…
Browse files Browse the repository at this point in the history
…on in child controller

As part of the solution to remove deployment order for KafkaMessageEnvelope (KME) value schema,
for controllers, when they find an unknown KME schema from the message header, they need to
(talk to the system cluster leader controller to) register the new schema into local colo
system store. Otherwise, as KME is not embedded in every Kafka message, if servers restart
and resume the consumption from a non-SOS message with new KME (for example, lor1 servers
could consume TS sent by lva1 controller), servers can not find the right schema to
deserialize it and keep failing.

This rb enables the child controllers to register unknown KME schemas when discovering them
from the message's header. It maily leverages the existing functionailities from the
ControllerClientBackedSystemSchemaInitializer class for the implementation. A new config is
introduced to enable this feature and a new integration test is added to verify the
correctness of the new feature.
  • Loading branch information
lluwm committed Sep 11, 2023
1 parent 5b5feaa commit 251235b
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,9 @@ private ConfigKeys() {
public static final String SYSTEM_SCHEMA_INITIALIZATION_AT_START_TIME_ENABLED =
"system.schema.initialization.at.start.time.enabled";

public static final String KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED =
"kme.registration.from.message.header.enabled";

// Server specific configs
public static final String LISTENER_PORT = "listener.port";
public static final String GRPC_READ_SERVER_PORT = "grpc.read.server.port";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,4 +558,10 @@ private void preCheckStoreCondition(String storeName) {
throw new VeniceNoStoreException(storeName);
}
}

// For testing purpose only.
public void removeValueSchema(String storeName, int schemaId) {
preCheckStoreCondition(storeName);
accessor.removeValueSchema(storeName, String.valueOf(schemaId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,9 @@ public void clear() {
readOnlyZKSharedSchemaRepository.clear();
readWriteRegularStoreSchemaRepository.clear();
}

// For testing purpose only.
public ReadWriteSchemaRepository getReadWriteRegularStoreSchemaRepository() {
return this.readWriteRegularStoreSchemaRepository;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,10 @@ String getReplicationMetadataSchemaPath(String storeName, String replicationMeta
.append(replicationMetadataVersionIdPair)
.toString();
}

// For testing purpose only.
public void removeValueSchema(String storeName, String schemaId) {
HelixUtils.remove(schemaAccessor, getValueSchemaPath(storeName, schemaId));
logger.info("Removed value schema: {} for store: {}.", schemaId, storeName);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.pubsub.api;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
Expand Down Expand Up @@ -63,6 +64,15 @@ public PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> deserialize(
Schema providedProtocolSchema = AvroCompatibilityHelper.parse(new String(header.value()));
value =
valueSerializer.deserialize(valueBytes, providedProtocolSchema, getEnvelope(key.getKeyHeaderByte()));
} catch (VeniceException e) {
/*
* If a Venice exception is caught when registering a new discovered schema, we don't need any special
* management here, delegate it to the follower deserialize to handle such case.
*/
LOGGER.warn(
"Venice exception in registering new encountered schema in protocol header: ",
VENICE_TRANSPORT_PROTOCOL_HEADER,
e);
} catch (Exception e) {
// Improper header... will ignore.
LOGGER.warn("Received unparsable schema in protocol header: " + VENICE_TRANSPORT_PROTOCOL_HEADER, e);
Expand Down Expand Up @@ -97,4 +107,9 @@ private KafkaMessageEnvelope getEnvelope(byte keyHeaderByte) {
throw new IllegalStateException("Illegal key header byte: " + keyHeaderByte);
}
}

// For testing only.
public KafkaValueSerializer getValueSerializer() {
return valueSerializer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.SparseConcurrentList;
import com.linkedin.venice.utils.SparseConcurrentListWithOffset;
import com.linkedin.venice.utils.Utils;
import it.unimi.dsi.fastutil.ints.IntLinkedOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -93,7 +93,7 @@ private static class ReusableObjects {
private final SpecificDatumWriter writer;

/** Maintains the mapping between protocol version and the corresponding {@link SpecificDatumReader<SPECIFIC_RECORD>} */
private final List<VeniceSpecificDatumReader<SPECIFIC_RECORD>> protocolVersionToReader;
private final SparseConcurrentList<VeniceSpecificDatumReader<SPECIFIC_RECORD>> protocolVersionToReader;

/** The schema of the {@link SpecificRecord} which is compiled in the current version of the code. */
private final Schema compiledProtocol;
Expand Down Expand Up @@ -355,11 +355,28 @@ public SPECIFIC_RECORD deserialize(byte[] bytes, int protocolVersion, SPECIFIC_R

public SPECIFIC_RECORD deserialize(byte[] bytes, Schema providedProtocolSchema, SPECIFIC_RECORD reuse) {
int protocolVersion = getProtocolVersion(bytes);
VeniceSpecificDatumReader<SPECIFIC_RECORD> specificDatumReader = protocolVersionToReader.get(protocolVersion);
if (specificDatumReader == null) {
specificDatumReader = cacheDatumReader(protocolVersion, providedProtocolSchema);
newSchemaEncountered.accept(protocolVersion, providedProtocolSchema);
}

/**
* When a new schema version is discovered during data ingestion, there will be only one thread registering
* the new schema for all the users of the shared InternalAvroSpecificSerializer object. During this period
* of registration, other concurrent threads that calling deserialize will be blocked until the registration
* of the first thread is finished. (It would help to avoid the concurrent request loads in the case e.g. when
* all shared consumer threads in one venice server discover an unknown schema from header, and all the
* threads decide to register this schema to controller.)
*
* It is important to notice that once the registration part is completed, it still allows the deserialization
* to run in parallel.
*
* Also notice that if exception happens in newSchemaEncountered, local protocolVersionToReader will not
* be updated, and it gives its caller an opportunity to discover the failure and perform any
* remediation steps (e.g. retry etc.).
*/
VeniceSpecificDatumReader<SPECIFIC_RECORD> specificDatumReader =
protocolVersionToReader.computeIfAbsent(protocolVersion, index -> {
newSchemaEncountered.accept(protocolVersion, providedProtocolSchema);
return cacheDatumReader(protocolVersion, providedProtocolSchema);
});

return deserialize(bytes, specificDatumReader, reuse);
}

Expand Down Expand Up @@ -426,4 +443,14 @@ private VeniceSpecificDatumReader<SPECIFIC_RECORD> cacheDatumReader(int protocol
private String getCurrentlyLoadedProtocolVersions() {
return knownProtocols().toString();
}

// For testing only.
public void removeAllSchemas() {
this.protocolVersionToReader.clear();
}

// For testing purpose only.
public int getProtocolVersionSize() {
return protocolVersionToReader.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
Expand Down Expand Up @@ -76,7 +77,7 @@ public ControllerClientBackedSystemSchemaInitializer(
this.enforceSslOnly = enforceSslOnly;
}

public void execute() {
public void execute(Map<Integer, Schema> inputSchemas) {
if (controllerClient == null) {
if (!controllerUrl.isEmpty()) {
controllerClient = ControllerClient.constructClusterControllerClient(clusterName, controllerUrl, sslFactory);
Expand All @@ -100,7 +101,9 @@ public void execute() {
}

String storeName = protocolDefinition.getSystemStoreName();
Map<Integer, Schema> schemasInLocalResources = Utils.getAllSchemasFromResources(protocolDefinition);

Map<Integer, Schema> schemasInLocalResources =
inputSchemas != null ? inputSchemas : Utils.getAllSchemasFromResources(protocolDefinition);
D2ServiceDiscoveryResponse discoveryResponse = controllerClient.retryableRequest(
DEFAULT_RETRY_TIMES,
c -> c.discoverCluster(storeName),
Expand Down Expand Up @@ -142,19 +145,31 @@ public void execute() {
for (int version = 1; version <= protocolDefinition.getCurrentProtocolVersion(); version++) {
Schema schemaInLocalResources = schemasInLocalResources.get(version);
if (schemaInLocalResources == null) {
throw new VeniceException(
"Invalid protocol definition: " + protocolDefinition.name() + " does not have a version " + version
+ " even though it is less than or equal to the current version ("
+ protocolDefinition.getCurrentProtocolVersion() + ").");
}
checkAndMayRegisterValueSchema(storeName, version, schemasInZk.get(version), schemaInLocalResources);
if (inputSchemas == null) {
throw new VeniceException(
"Invalid protocol definition: " + protocolDefinition.name() + " does not have a version " + version
+ " even though it is less than or equal to the current version ("
+ protocolDefinition.getCurrentProtocolVersion() + ").");
}
} else {
checkAndMayRegisterValueSchema(
storeName,
version,
schemasInZk.get(version),
schemaInLocalResources,
inputSchemas != null);

if (autoRegisterPartialUpdateSchema) {
checkAndMayRegisterPartialUpdateSchema(storeName, version, schemaInLocalResources);
if (autoRegisterPartialUpdateSchema) {
checkAndMayRegisterPartialUpdateSchema(storeName, version, schemaInLocalResources);
}
}
}
}

public void execute() {
execute(null);
}

private boolean hasLeaderController() {
try {
RetryUtils.executeWithMaxAttempt(
Expand Down Expand Up @@ -236,11 +251,20 @@ private void checkAndMayRegisterValueSchema(
String storeName,
int valueSchemaId,
Schema schemaInZk,
Schema schemaInLocalResources) {
Schema schemaInLocalResources,
boolean skipCompatibilityCheck) {
if (schemaInZk == null) {
SchemaResponse addValueSchemaResponse = controllerClient.retryableRequest(
DEFAULT_RETRY_TIMES,
c -> c.addValueSchema(storeName, schemaInLocalResources.toString(), valueSchemaId));
SchemaResponse addValueSchemaResponse = controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, c -> {
if (skipCompatibilityCheck) {
return c.addValueSchema(
storeName,
schemaInLocalResources.toString(),
valueSchemaId,
DirectionalSchemaCompatibilityType.NONE);
}
return c.addValueSchema(storeName, schemaInLocalResources.toString(), valueSchemaId);
});

if (addValueSchemaResponse.isError()) {
throw new VeniceException(
"Error when adding value schema " + valueSchemaId + " to system store " + storeName + " in cluster "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS;
import static com.linkedin.venice.ConfigKeys.DEFAULT_NUMBER_OF_PARTITION_FOR_HYBRID;
import static com.linkedin.venice.ConfigKeys.DEFAULT_PARTITION_SIZE;
import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED;

import com.linkedin.venice.helix.HelixReadWriteSchemaRepository;
import com.linkedin.venice.helix.HelixReadWriteSchemaRepositoryAdapter;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TestControllerKMERegistrationFromMessageHeader {
private static final Logger LOGGER = LogManager.getLogger(TestControllerKMERegistrationFromMessageHeader.class);
private static final int TEST_TIMEOUT = 90_000; // ms
private static final int NUMBER_OF_CHILD_DATACENTERS = 1;
private static final int NUMBER_OF_CLUSTERS = 1;

private static final String[] CLUSTER_NAMES =
IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new);

private List<VeniceMultiClusterWrapper> childDatacenters;
private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;

@BeforeClass
public void setUp() {
Properties controllerProps = new Properties();
controllerProps.put(DEFAULT_NUMBER_OF_PARTITION_FOR_HYBRID, 2);
controllerProps.put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, 3);
controllerProps.put(DEFAULT_PARTITION_SIZE, 1024);
controllerProps.put(KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED, true);
multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
NUMBER_OF_CHILD_DATACENTERS,
NUMBER_OF_CLUSTERS,
1,
3,
1,
1,
1,
Optional.of(controllerProps),
Optional.of(controllerProps),
Optional.empty());

childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
}

@AfterClass(alwaysRun = true)
public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

@Test(timeOut = TEST_TIMEOUT)
public void testKMERegistrationThroughAdminTopicChannel() {
String clusterName = CLUSTER_NAMES[0];
String storeName = Utils.getUniqueString("store");

VeniceControllerWrapper pController =
multiRegionMultiClusterWrapper.getLeaderParentControllerWithRetries(clusterName);

VeniceMultiClusterWrapper child = childDatacenters.get(0);

// Remove the latest schema from child controller's local value serializer and remove it from child colo's schema
// repository (ZK).
VeniceControllerWrapper leaderController = child.getLeaderController(clusterName);
KafkaValueSerializer valueSerializer =
leaderController.getController().getVeniceControllerService().getKafkaValueSerializer();
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
Assert.assertTrue(
valueSerializer.getProtocolVersionSize() >= AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE
.getCurrentProtocolVersion());
});
HelixReadWriteSchemaRepositoryAdapter adapter =
(HelixReadWriteSchemaRepositoryAdapter) (leaderController.getVeniceHelixAdmin()
.getHelixVeniceClusterResources(clusterName)
.getSchemaRepository());
HelixReadWriteSchemaRepository repo =
(HelixReadWriteSchemaRepository) adapter.getReadWriteRegularStoreSchemaRepository();
repo.removeValueSchema(
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(),
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion());
valueSerializer.removeAllSchemas();
LOGGER.info("all schemas are removed");

// Verify that the latest version of the protocol is deleted in ZK.
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
Assert.assertTrue(
repo.getValueSchema(
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(),
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()) == null);
});

/*
* Calling parent controller's create store action which will trigger an admin message which contains
* the latest schema in its header, child controller when it encounters the new schema in the message header,
* would register the new schema into the child colo's schema repo as well as add to its local serializer.
*/

pController.getVeniceAdmin().createStore(clusterName, storeName, "", "\"string\"", "\"string\"", false);

// Verify that schema is registered in the child colo's schema repo.
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
Assert.assertTrue(
repo.getValueSchema(
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(),
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()) != null);
});

// Verify that store is created successfully in the child colo from the child controller's view.
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
Assert.assertTrue(leaderController.getVeniceAdmin().getStore(clusterName, storeName).getName().equals(storeName));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,10 @@ public Admin getVeniceAdmin() {
return service.getVeniceControllerService().getVeniceHelixAdmin();
}

public VeniceController getController() {
return service;
}

public VeniceHelixAdmin getVeniceHelixAdmin() {
return (VeniceHelixAdmin) getVeniceAdmin();
}
Expand Down
Loading

0 comments on commit 251235b

Please sign in to comment.