Skip to content

Commit

Permalink
[controller] Superset schema generation should reflect default value …
Browse files Browse the repository at this point in the history
…updates in new value schema (#1184)

When a user updates a field's default value, the change should be reflected in the superset schema, which is used by read-compute / partial-update stores. This should apply to all value types, including union schemas.
  • Loading branch information
sixpluszero authored Sep 26, 2024
1 parent c772dd3 commit 04aeaeb
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,9 @@ public static SchemaEntry generateSupersetSchemaFromAllValueSchemas(Collection<S
validateTwoSchemasAreFullyCompatible(tmpSupersetSchema, valueSchema);
largestSchemaID = Math.max(largestSchemaID, valueSchemaEntry.getId());
/**
* Note that superset schema should be the second parameter. For the reason, please refer to the Javadoc of the
* {@link AvroSchemaUtils#generateSuperSetSchema} method.
* Current superset schema should be the first parameter, and the incoming value schema is the 2nd parameter.
*/
tmpSupersetSchema = AvroSupersetSchemaUtils.generateSuperSetSchema(valueSchema, tmpSupersetSchema);
tmpSupersetSchema = AvroSupersetSchemaUtils.generateSupersetSchema(tmpSupersetSchema, valueSchema);
}
final Schema supersetSchema = tmpSupersetSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ private AvroSupersetSchemaUtils() {
* @return True if {@param s1} is {@param s2}'s superset schema and false otherwise.
*/
public static boolean isSupersetSchema(Schema s1, Schema s2) {
final Schema supersetSchema = generateSuperSetSchema(s1, s2);
final Schema supersetSchema = generateSupersetSchema(s1, s2);
return AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, supersetSchema);
}

/**
* Generate super-set schema of two Schemas. If we have {A,B,C} and {A,B,D} it will generate {A,B,C,D}, where
* C/D could be nested record change as well eg, array/map of records, or record of records.
* Prerequisite: The top-level schemas are of type RECORD only and each field has a default value, i.e. they are compatible
* schemas and the generated schema will pick the default value from s1.
* schemas and the generated schema will pick the default value from new value schema.
* @param existingSchema schema existing in the repo
* @param newSchema schema to be added.
* @return super-set schema of existingSchema and newSchema
*/
public static Schema generateSuperSetSchema(Schema existingSchema, Schema newSchema) {
public static Schema generateSupersetSchema(Schema existingSchema, Schema newSchema) {
if (existingSchema.getType() != newSchema.getType()) {
throw new VeniceException("Incompatible schema");
}
Expand Down Expand Up @@ -79,31 +79,36 @@ public static Schema generateSuperSetSchema(Schema existingSchema, Schema newSch
superSetSchema.setFields(mergeFieldSchemas(existingSchema, newSchema));
return superSetSchema;
case ARRAY:
return Schema.createArray(generateSuperSetSchema(existingSchema.getElementType(), newSchema.getElementType()));
return Schema.createArray(generateSupersetSchema(existingSchema.getElementType(), newSchema.getElementType()));
case MAP:
return Schema.createMap(generateSuperSetSchema(existingSchema.getValueType(), newSchema.getValueType()));
return Schema.createMap(generateSupersetSchema(existingSchema.getValueType(), newSchema.getValueType()));
case UNION:
return unionSchema(existingSchema, newSchema);
default:
throw new VeniceException("Super set schema not supported");
}
}

private static Schema unionSchema(Schema s1, Schema s2) {
/**
* Merge the union schemas of two schema objects. The rule is: if a member type with the same name exists in both the
* new schema and the existing schema, we generate the superset schema of the two versions of that type, with the new
* schema's information taking higher priority. Member types that exist in only one of the two schemas are kept as-is.
*/
private static Schema unionSchema(Schema existingSchema, Schema newSchema) {
List<Schema> combinedSchema = new ArrayList<>();
Map<String, Schema> s2Schema = s2.getTypes().stream().collect(Collectors.toMap(s -> s.getName(), s -> s));
for (Schema subSchemaInS1: s1.getTypes()) {
final String fieldName = subSchemaInS1.getName();
final Schema subSchemaWithSameNameInS2 = s2Schema.get(fieldName);
if (subSchemaWithSameNameInS2 == null) {
combinedSchema.add(subSchemaInS1);
Map<String, Schema> existingSchemaTypeMap =
existingSchema.getTypes().stream().collect(Collectors.toMap(Schema::getName, s -> s));
for (Schema subSchemaInNewSchema: newSchema.getTypes()) {
final String fieldName = subSchemaInNewSchema.getName();
final Schema subSchemaInExistingSchema = existingSchemaTypeMap.get(fieldName);
if (subSchemaInExistingSchema == null) {
combinedSchema.add(subSchemaInNewSchema);
} else {
combinedSchema.add(generateSuperSetSchema(subSchemaInS1, subSchemaWithSameNameInS2));
s2Schema.remove(fieldName);
combinedSchema.add(generateSupersetSchema(subSchemaInExistingSchema, subSchemaInNewSchema));
existingSchemaTypeMap.remove(fieldName);
}
}
s2Schema.forEach((k, v) -> combinedSchema.add(v));

existingSchemaTypeMap.forEach((k, v) -> combinedSchema.add(v));
return Schema.createUnion(combinedSchema);
}

Expand Down Expand Up @@ -135,25 +140,31 @@ private static FieldBuilder deepCopySchemaField(Schema.Field field) {
return fieldBuilder;
}

private static List<Schema.Field> mergeFieldSchemas(Schema s1, Schema s2) {
/**
* Merge the fields of two schema objects. The rule is: if a field exists in both the new schema and the existing
* schema, we generate the superset schema of the two versions of that field, with the new schema's information taking
* higher priority.
* @param existingSchema old schema
* @param newSchema new schema
* @return merged list of schema fields
*/
private static List<Schema.Field> mergeFieldSchemas(Schema existingSchema, Schema newSchema) {
List<Schema.Field> fields = new ArrayList<>();

for (Schema.Field f1: s1.getFields()) {
Schema.Field f2 = s2.getField(f1.name());
for (Schema.Field fieldInNewSchema: newSchema.getFields()) {
Schema.Field fieldInExistingSchema = existingSchema.getField(fieldInNewSchema.name());

FieldBuilder fieldBuilder = deepCopySchemaField(f1);
if (f2 != null) {
fieldBuilder.setSchema(generateSuperSetSchema(f1.schema(), f2.schema()))
.setDoc(f1.doc() != null ? f1.doc() : f2.doc());
// merge props from f2
copyFieldProperties(fieldBuilder, f2);
FieldBuilder fieldBuilder = deepCopySchemaField(fieldInNewSchema);
if (fieldInExistingSchema != null) {
fieldBuilder.setSchema(generateSupersetSchema(fieldInExistingSchema.schema(), fieldInNewSchema.schema()))
.setDoc(fieldInNewSchema.doc() != null ? fieldInNewSchema.doc() : fieldInExistingSchema.doc());
}
fields.add(fieldBuilder.build());
}

for (Schema.Field f2: s2.getFields()) {
if (s1.getField(f2.name()) == null) {
fields.add(deepCopySchemaField(f2).build());
for (Schema.Field fieldInExistingSchema: existingSchema.getFields()) {
if (newSchema.getField(fieldInExistingSchema.name()) == null) {
fields.add(deepCopySchemaField(fieldInExistingSchema).build());
}
}
return fields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V4_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V5_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V6_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.loadFileAsString;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
Expand Down Expand Up @@ -36,7 +37,7 @@ public void testGenerateSupersetSchemaFromValueSchemasWithTwoSchemas() {
AvroSchemaUtils.generateSupersetSchemaFromAllValueSchemas(Arrays.asList(schemaEntry1, schemaEntry2));

final Schema expectedSupersetSchema =
AvroSupersetSchemaUtils.generateSuperSetSchema(schemaEntry1.getSchema(), schemaEntry2.getSchema());
AvroSupersetSchemaUtils.generateSupersetSchema(schemaEntry1.getSchema(), schemaEntry2.getSchema());
Assert.assertTrue(
AvroSchemaUtils.compareSchemaIgnoreFieldOrder(expectedSupersetSchema, supersetSchemaEntry.getSchema()));
Assert.assertEquals(supersetSchemaEntry.getId(), 2);
Expand Down Expand Up @@ -142,7 +143,7 @@ public void testSupersetSchemaDefaultCompatibility() {

Schema newValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr1);
Schema existingValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr2);
Schema newSuperSetSchema = AvroSupersetSchemaUtils.generateSuperSetSchema(existingValueSchema, newValueSchema);
Schema newSuperSetSchema = AvroSupersetSchemaUtils.generateSupersetSchema(existingValueSchema, newValueSchema);
Assert.assertTrue(
new SchemaEntry(1, valueSchemaStr2)
.isNewSchemaCompatible(new SchemaEntry(2, newSuperSetSchema), DirectionalSchemaCompatibilityType.FULL));
Expand All @@ -161,7 +162,7 @@ public void testStringVsAvroString() {
Assert.assertNotEquals(s1, s2);
Assert.assertTrue(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2));

Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s2, s1);
Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s2, s1);
Assert.assertNotNull(s3);
Assert.assertNotNull(
AvroCompatibilityHelper.getSchemaPropAsJsonString(s3.getField("name").schema(), "avro.java.string"));
Expand All @@ -177,7 +178,7 @@ public void testWithDifferentDocField() {
Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2);
Assert.assertTrue(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2));

Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2);
Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2);
Assert.assertNotNull(s3);
}

Expand All @@ -191,7 +192,7 @@ public void testSchemaMerge() {
Schema s1 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr1);
Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2);
Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2));
Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2);
Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2);
Assert.assertNotNull(s3);
}

Expand All @@ -206,7 +207,7 @@ public void testSchemaMergeFields() {
Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2);
Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2));

Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2);
Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2);
Assert.assertNotNull(s3.getField("id1"));
Assert.assertNotNull(s3.getField("id2"));
}
Expand All @@ -222,7 +223,7 @@ public void testSchemaMergeFieldsBadDefaults() {
Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2);
Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2));

Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2);
Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2);
Assert.assertNotNull(s3.getField("id1"));
Assert.assertNotNull(s3.getField("id2"));
}
Expand All @@ -237,7 +238,7 @@ public void testWithIncompatibleSchema() {
Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2);

Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2));
AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2);
AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2);
}

@Test
Expand All @@ -251,11 +252,26 @@ public void testSchemaMergeUnion() {
Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2);
Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2));

Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2);
Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2);
Assert.assertNotNull(s3.getField("company"));
Assert.assertNotNull(s3.getField("organization"));
}

/**
 * Verifies superset generation when a record field is a union that contains a nested record
 * ({@code UnionV1.avsc} as the existing schema, {@code UnionV2.avsc} as the new schema): the superset must keep
 * both top-level fields, and the record member of the union must equal the one declared in the new schema.
 */
@Test
public void testSchemaMergeUnionWithComplexItemType() {
  Schema existingSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(loadFileAsString("UnionV1.avsc"));
  Schema newSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(loadFileAsString("UnionV2.avsc"));
  // The two versions differ (V1 additionally declares "age"), so they must not compare as equal.
  Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(existingSchema, newSchema));

  Schema supersetSchema = AvroSupersetSchemaUtils.generateSupersetSchema(existingSchema, newSchema);
  // "age" is declared only in V1, "field" is declared in both versions.
  Assert.assertNotNull(supersetSchema.getField("age"));
  Assert.assertNotNull(supersetSchema.getField("field"));

  // Index 1 is the record member of the ["int", record] union declared for "field".
  Schema recordMemberInNewSchema = newSchema.getField("field").schema().getTypes().get(1);
  Schema recordMemberInSuperset = supersetSchema.getField("field").schema().getTypes().get(1);
  Assert.assertEquals(recordMemberInSuperset, recordMemberInNewSchema);
}

@Test
public void testWithNewFieldArrayRecord() {
String recordSchemaStr1 = "{\n" + " \"type\" : \"record\",\n" + " \"name\" : \"testRecord\",\n"
Expand All @@ -280,7 +296,7 @@ public void testWithNewFieldArrayRecord() {
Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(recordSchemaStr2);
Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2));

Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2);
Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2);
Assert.assertNotNull(s3);
}

Expand Down Expand Up @@ -309,7 +325,7 @@ public void tesMergeWithDefaultValueUpdate() {
Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2);
Assert.assertTrue(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2));

Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2);
Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2);
Assert.assertNotNull(AvroSchemaUtils.getFieldDefault(s3.getField("salary")));
}

Expand All @@ -333,7 +349,7 @@ public void testWithEnumEvolution() {
Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2);

Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2));
AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2);
AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2);
}

@Test
Expand Down Expand Up @@ -425,8 +441,7 @@ public void testSupersetSchemaContainsMergeFieldProps() {

Schema schema1 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr1);
Schema schema2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr2);

Schema supersetSchema = AvroSupersetSchemaUtils.generateSuperSetSchema(schema1, schema2);
Schema supersetSchema = AvroSupersetSchemaUtils.generateSupersetSchema(schema1, schema2);

Schema.Field intField = supersetSchema.getField("int_field");
Schema.Field stringField = supersetSchema.getField("string_field");
Expand Down Expand Up @@ -496,7 +511,7 @@ public void testValidateSubsetSchema() {
Assert.assertNotEquals(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V6_SCHEMA);
// Test validation skip comparing props when checking for subset schema.
Schema supersetSchemaForV5AndV4 =
AvroSupersetSchemaUtils.generateSuperSetSchema(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V4_SCHEMA);
AvroSupersetSchemaUtils.generateSupersetSchema(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V4_SCHEMA);
Assert.assertTrue(
AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V5_SCHEMA, supersetSchemaForV5AndV4.toString()));
Assert.assertTrue(
Expand Down
31 changes: 31 additions & 0 deletions internal/venice-client-common/src/test/resources/UnionV1.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"type" : "record",
"name" : "User",
"namespace" : "example.avro",
"fields" : [
{
"name": "field",
"type": [
"int",
{
"type": "record",
"name": "subField",
"fields": [
{
"name": "name",
"type": "string",
"doc": "doc v1",
"default": "v1"
}
]
}
],
"default": 10
},
{
"name": "age",
"type": "int",
"default": 10
}
]
}
26 changes: 26 additions & 0 deletions internal/venice-client-common/src/test/resources/UnionV2.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"type" : "record",
"name" : "User",
"namespace" : "example.avro",
"fields" : [
{
"name": "field",
"type": [
"int",
{
"type": "record",
"name": "subField",
"fields": [
{
"name": "name",
"type": "string",
"doc": "doc v2",
"default": "v2"
}
]
}
],
"default": 20
}
]
}
Loading

0 comments on commit 04aeaeb

Please sign in to comment.