Skip to content

Commit

Permalink
[router][venice-thin-client] Add safeguard against invalid schema ID (#…
Browse files Browse the repository at this point in the history
…1178)

If schema repo returns invalid schema id, that fails client requests. This PR adds safeguard against such invalid schemas in meta data handler and client side schema reader.
---------
Co-authored-by: Sourav Maji <[email protected]>
  • Loading branch information
majisourav99 authored Sep 17, 2024
1 parent 1fc53dc commit 37ea7b3
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,13 @@ private void updateAllValueSchemas(boolean forceRefresh) {
valueSchemaIdSet = fetchAllValueSchemaIdsFromRouter();
} catch (Exception e) {
LOGGER.warn(
"Caught exception when trying to fetch all value schema IDs from router, will fetch all value schema entries instead.");
"Caught exception when trying to fetch all value schema IDs from router, will fetch all value schema entries instead.",
e);
// Fall back to fetch all value schema.
for (SchemaEntry valueSchemaEntry: fetchAllValueSchemaEntriesFromRouter()) {
if (!isValidSchemaEntry(valueSchemaEntry)) {
continue;
}
valueSchemaEntryMap.put(valueSchemaEntry.getId(), valueSchemaEntry);
cacheValueAndCanonicalSchemas(valueSchemaEntry.getSchema(), valueSchemaEntry.getId());
}
Expand Down Expand Up @@ -441,7 +445,7 @@ private SchemaEntry maybeFetchLatestValueSchemaEntry() {
* one active value schema.
*/
synchronized (this) {
if (latest != null && !shouldRefreshLatestValueSchemaEntry.get()) {
if (latest != null && !shouldRefreshLatestValueSchemaEntry.get() && isValidSchemaEntry(latest)) {
return latest;
}
updateAllValueSchemaEntriesAndLatestValueSchemaEntry(false);
Expand All @@ -450,6 +454,9 @@ private SchemaEntry maybeFetchLatestValueSchemaEntry() {
latest = latestValueSchemaEntry.get();
}
}
if (latest == null || !isValidSchemaEntry(latest)) {
throw new VeniceClientException("Failed to get latest value schema for store: " + storeName);
}
return latest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.venice.compute.protocol.request.DotProduct;
import com.linkedin.venice.compute.protocol.request.HadamardProduct;
import com.linkedin.venice.compute.protocol.request.enums.ComputeOperationType;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
Expand Down Expand Up @@ -80,6 +81,9 @@ public abstract class AbstractAvroComputeRequestBuilder<K> implements ComputeReq

public AbstractAvroComputeRequestBuilder(AvroGenericReadComputeStoreClient storeClient, SchemaReader schemaReader) {
this.latestValueSchemaId = schemaReader.getLatestValueSchemaId();
if (latestValueSchemaId == SchemaData.INVALID_VALUE_SCHEMA_ID) {
throw new VeniceClientException("Invalid value schema ID: " + latestValueSchemaId);
}
this.latestValueSchema = schemaReader.getValueSchema(latestValueSchemaId);
if (latestValueSchema.getType() != Schema.Type.RECORD) {
throw new VeniceClientException("Only value schema with 'RECORD' type is supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ public void testGetLatestValueSchemaWhenNoValueSchema()
0);

try (SchemaReader schemaReader = new RouterBackedSchemaReader(() -> mockClient)) {
Assert.assertNull(schemaReader.getLatestValueSchema());
Assert.assertThrows(VeniceClientException.class, () -> schemaReader.getLatestValueSchema());
Mockito.verify(mockClient, Mockito.timeout(TIMEOUT).times(1)).getRaw(Mockito.anyString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,19 @@ private void handleValueSchemaLookup(ChannelHandlerContext ctx, VenicePathParser
responseObject.setSuperSetSchemaId(superSetSchemaId);
}
Collection<SchemaEntry> valueSchemaEntries = schemaRepo.getValueSchemas(storeName);
int schemaNum = valueSchemaEntries.size();
int schemaNum = (int) valueSchemaEntries.stream().filter(schemaEntry -> schemaEntry.getId() > 0).count();
MultiSchemaResponse.Schema[] schemas = new MultiSchemaResponse.Schema[schemaNum];
int index = 0;
for (SchemaEntry entry: valueSchemaEntries) {
int schemaId = entry.getId();
if (schemaId < 1) {
LOGGER.warn(
"Got an invalid schema id ({}) for store {} in handleValueSchemaLookup; will not include this in the {}.",
entry.getId(),
storeName,
responseObject.getClass().getSimpleName());
continue;
}
schemas[index] = new MultiSchemaResponse.Schema();
schemas[index].setId(schemaId);
schemas[index].setSchemaStr(entry.getSchema().toString());
Expand Down Expand Up @@ -385,6 +393,14 @@ private void handleValueSchemaIdsLookup(ChannelHandlerContext ctx, VenicePathPar
}
Set<Integer> schemaIdSet = new HashSet<>();
for (SchemaEntry entry: schemaRepo.getValueSchemas(storeName)) {
if (entry.getId() < 1) {
LOGGER.warn(
"Got an invalid schema id ({}) for store {} in handleValueSchemaIdsLookup; will not include this in the {}.",
entry.getId(),
storeName,
responseObject.getClass().getSimpleName());
continue;
}
schemaIdSet.add(entry.getId());
}
responseObject.setSchemaIdSet(schemaIdSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,10 @@ public void testAllValueSchemaIdLookup() throws IOException {
ReadOnlySchemaRepository schemaRepo = Mockito.mock(ReadOnlySchemaRepository.class);
SchemaEntry valueSchemaEntry1 = new SchemaEntry(valueSchemaId1, valueSchemaStr1);
SchemaEntry valueSchemaEntry2 = new SchemaEntry(valueSchemaId2, valueSchemaStr2);
Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2)).when(schemaRepo).getValueSchemas(storeName);
SchemaEntry valueSchemaEntry3 = new SchemaEntry(-1, valueSchemaStr2);
Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2, valueSchemaEntry3))
.when(schemaRepo)
.getValueSchemas(storeName);
FullHttpResponse response = passRequestToMetadataHandler(
"http://myRouterHost:4567/all_value_schema_ids/" + storeName,
null,
Expand Down Expand Up @@ -412,7 +415,8 @@ public void testAllValueSchemaLookup() throws IOException {
SchemaEntry valueSchemaEntry1 = new SchemaEntry(valueSchemaId1, valueSchemaStr1);
SchemaEntry valueSchemaEntry2 = new SchemaEntry(valueSchemaId2, valueSchemaStr2);
SchemaEntry valueSchemaEntry3 = new SchemaEntry(valueSchemaId3, valueSchemaStr3);
Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2, valueSchemaEntry3))
SchemaEntry valueSchemaEntry4 = new SchemaEntry(-1, valueSchemaStr3);
Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2, valueSchemaEntry3, valueSchemaEntry4))
.when(schemaRepo)
.getValueSchemas(storeName);

Expand Down

0 comments on commit 37ea7b3

Please sign in to comment.