From 04aeaebb1fa56b5f392336074120ac9eb72acff1 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Thu, 26 Sep 2024 13:59:43 -0700 Subject: [PATCH] [controller] Superset schema generation should reflect default value updates in new value schema (#1184) When a user updates a field's default value, the change should be reflected in the superset schema, which is used by read-compute / partial-update stores. This applies to all value types, including union schemas. --- .../venice/utils/AvroSchemaUtils.java | 5 +- .../venice/utils/AvroSupersetSchemaUtils.java | 67 +++++++++++-------- .../schema/TestAvroSupersetSchemaUtils.java | 45 ++++++++----- .../src/test/resources/UnionV1.avsc | 31 +++++++++ .../src/test/resources/UnionV2.avsc | 26 +++++++ .../VeniceParentHelixAdminTest.java | 43 ++++++++++++ .../venice/endToEnd/TestEmptyPush.java | 3 +- .../linkedin/venice/utils/TestWriteUtils.java | 7 ++ .../main/resources/valueSchema/UnionV1.avsc | 15 +++++ .../main/resources/valueSchema/UnionV2.avsc | 15 +++++ .../main/resources/valueSchema/UnionV3.avsc | 20 ++++++ .../controller/VeniceParentHelixAdmin.java | 2 - .../DefaultSupersetSchemaGenerator.java | 2 +- ...SupersetSchemaGeneratorWithCustomProp.java | 2 +- .../systemstore/SystemStoreRepairTask.java | 1 - 15 files changed, 232 insertions(+), 52 deletions(-) create mode 100644 internal/venice-client-common/src/test/resources/UnionV1.avsc create mode 100644 internal/venice-client-common/src/test/resources/UnionV2.avsc create mode 100644 internal/venice-test-common/src/main/resources/valueSchema/UnionV1.avsc create mode 100644 internal/venice-test-common/src/main/resources/valueSchema/UnionV2.avsc create mode 100644 internal/venice-test-common/src/main/resources/valueSchema/UnionV3.avsc diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/AvroSchemaUtils.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/AvroSchemaUtils.java index 7d3340ed27..fe96a6cb5b 100644 --- 
a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/AvroSchemaUtils.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/AvroSchemaUtils.java @@ -262,10 +262,9 @@ public static SchemaEntry generateSupersetSchemaFromAllValueSchemas(Collection combinedSchema = new ArrayList<>(); - Map s2Schema = s2.getTypes().stream().collect(Collectors.toMap(s -> s.getName(), s -> s)); - for (Schema subSchemaInS1: s1.getTypes()) { - final String fieldName = subSchemaInS1.getName(); - final Schema subSchemaWithSameNameInS2 = s2Schema.get(fieldName); - if (subSchemaWithSameNameInS2 == null) { - combinedSchema.add(subSchemaInS1); + Map existingSchemaTypeMap = + existingSchema.getTypes().stream().collect(Collectors.toMap(Schema::getName, s -> s)); + for (Schema subSchemaInNewSchema: newSchema.getTypes()) { + final String fieldName = subSchemaInNewSchema.getName(); + final Schema subSchemaInExistingSchema = existingSchemaTypeMap.get(fieldName); + if (subSchemaInExistingSchema == null) { + combinedSchema.add(subSchemaInNewSchema); } else { - combinedSchema.add(generateSuperSetSchema(subSchemaInS1, subSchemaWithSameNameInS2)); - s2Schema.remove(fieldName); + combinedSchema.add(generateSupersetSchema(subSchemaInExistingSchema, subSchemaInNewSchema)); + existingSchemaTypeMap.remove(fieldName); } } - s2Schema.forEach((k, v) -> combinedSchema.add(v)); - + existingSchemaTypeMap.forEach((k, v) -> combinedSchema.add(v)); return Schema.createUnion(combinedSchema); } @@ -135,25 +140,31 @@ private static FieldBuilder deepCopySchemaField(Schema.Field field) { return fieldBuilder; } - private static List mergeFieldSchemas(Schema s1, Schema s2) { + /** + * Merge field schemas from two schema objects. The rule is: if a field exists in both the new schema and the old schema, + * we generate the superset schema of these two versions of the same field, with the new schema's information taking + * higher priority. 
+ * @param newSchema new schema + * @param existingSchema old schema + * @return merged schema field + */ + private static List mergeFieldSchemas(Schema existingSchema, Schema newSchema) { List fields = new ArrayList<>(); - for (Schema.Field f1: s1.getFields()) { - Schema.Field f2 = s2.getField(f1.name()); + for (Schema.Field fieldInNewSchema: newSchema.getFields()) { + Schema.Field fieldInExistingSchema = existingSchema.getField(fieldInNewSchema.name()); - FieldBuilder fieldBuilder = deepCopySchemaField(f1); - if (f2 != null) { - fieldBuilder.setSchema(generateSuperSetSchema(f1.schema(), f2.schema())) - .setDoc(f1.doc() != null ? f1.doc() : f2.doc()); - // merge props from f2 - copyFieldProperties(fieldBuilder, f2); + FieldBuilder fieldBuilder = deepCopySchemaField(fieldInNewSchema); + if (fieldInExistingSchema != null) { + fieldBuilder.setSchema(generateSupersetSchema(fieldInExistingSchema.schema(), fieldInNewSchema.schema())) + .setDoc(fieldInNewSchema.doc() != null ? fieldInNewSchema.doc() : fieldInExistingSchema.doc()); } fields.add(fieldBuilder.build()); } - for (Schema.Field f2: s2.getFields()) { - if (s1.getField(f2.name()) == null) { - fields.add(deepCopySchemaField(f2).build()); + for (Schema.Field fieldInExistingSchema: existingSchema.getFields()) { + if (newSchema.getField(fieldInExistingSchema.name()) == null) { + fields.add(deepCopySchemaField(fieldInExistingSchema).build()); } } return fields; diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSupersetSchemaUtils.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSupersetSchemaUtils.java index ed8766d851..cdcc39d553 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSupersetSchemaUtils.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/TestAvroSupersetSchemaUtils.java @@ -6,6 +6,7 @@ import static 
com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V4_SCHEMA; import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V5_SCHEMA; import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V6_SCHEMA; +import static com.linkedin.venice.utils.TestWriteUtils.loadFileAsString; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.venice.controllerapi.MultiSchemaResponse; @@ -36,7 +37,7 @@ public void testGenerateSupersetSchemaFromValueSchemasWithTwoSchemas() { AvroSchemaUtils.generateSupersetSchemaFromAllValueSchemas(Arrays.asList(schemaEntry1, schemaEntry2)); final Schema expectedSupersetSchema = - AvroSupersetSchemaUtils.generateSuperSetSchema(schemaEntry1.getSchema(), schemaEntry2.getSchema()); + AvroSupersetSchemaUtils.generateSupersetSchema(schemaEntry1.getSchema(), schemaEntry2.getSchema()); Assert.assertTrue( AvroSchemaUtils.compareSchemaIgnoreFieldOrder(expectedSupersetSchema, supersetSchemaEntry.getSchema())); Assert.assertEquals(supersetSchemaEntry.getId(), 2); @@ -142,7 +143,7 @@ public void testSupersetSchemaDefaultCompatibility() { Schema newValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr1); Schema existingValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr2); - Schema newSuperSetSchema = AvroSupersetSchemaUtils.generateSuperSetSchema(existingValueSchema, newValueSchema); + Schema newSuperSetSchema = AvroSupersetSchemaUtils.generateSupersetSchema(existingValueSchema, newValueSchema); Assert.assertTrue( new SchemaEntry(1, valueSchemaStr2) .isNewSchemaCompatible(new SchemaEntry(2, newSuperSetSchema), DirectionalSchemaCompatibilityType.FULL)); @@ -161,7 +162,7 @@ public void testStringVsAvroString() { Assert.assertNotEquals(s1, s2); Assert.assertTrue(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s2, s1); + Schema s3 = 
AvroSupersetSchemaUtils.generateSupersetSchema(s2, s1); Assert.assertNotNull(s3); Assert.assertNotNull( AvroCompatibilityHelper.getSchemaPropAsJsonString(s3.getField("name").schema(), "avro.java.string")); @@ -177,7 +178,7 @@ public void testWithDifferentDocField() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertTrue(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3); } @@ -191,7 +192,7 @@ public void testSchemaMerge() { Schema s1 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr1); Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3); } @@ -206,7 +207,7 @@ public void testSchemaMergeFields() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3.getField("id1")); Assert.assertNotNull(s3.getField("id2")); } @@ -222,7 +223,7 @@ public void testSchemaMergeFieldsBadDefaults() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3.getField("id1")); Assert.assertNotNull(s3.getField("id2")); } @@ -237,7 +238,7 @@ public void 
testWithIncompatibleSchema() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); } @Test @@ -251,11 +252,26 @@ public void testSchemaMergeUnion() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3.getField("company")); Assert.assertNotNull(s3.getField("organization")); } + @Test + public void testSchemaMergeUnionWithComplexItemType() { + Schema s1 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(loadFileAsString("UnionV1.avsc")); + Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(loadFileAsString("UnionV2.avsc")); + Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); + Assert.assertNotNull(s3.getField("age")); + Assert.assertNotNull(s3.getField("field")); + Schema.Field subFieldInS2 = s2.getField("field"); + Schema.Field subFieldInS3 = s3.getField("field"); + Schema unionSubFieldInS2 = subFieldInS2.schema().getTypes().get(1); + Schema unionSubFieldInS3 = subFieldInS3.schema().getTypes().get(1); + Assert.assertEquals(unionSubFieldInS3, unionSubFieldInS2); + } + @Test public void testWithNewFieldArrayRecord() { String recordSchemaStr1 = "{\n" + " \"type\" : \"record\",\n" + " \"name\" : \"testRecord\",\n" @@ -280,7 +296,7 @@ public void testWithNewFieldArrayRecord() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(recordSchemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = 
AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(s3); } @@ -309,7 +325,7 @@ public void tesMergeWithDefaultValueUpdate() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertTrue(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - Schema s3 = AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + Schema s3 = AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); Assert.assertNotNull(AvroSchemaUtils.getFieldDefault(s3.getField("salary"))); } @@ -333,7 +349,7 @@ public void testWithEnumEvolution() { Schema s2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr2); Assert.assertFalse(AvroSchemaUtils.compareSchemaIgnoreFieldOrder(s1, s2)); - AvroSupersetSchemaUtils.generateSuperSetSchema(s1, s2); + AvroSupersetSchemaUtils.generateSupersetSchema(s1, s2); } @Test @@ -425,8 +441,7 @@ public void testSupersetSchemaContainsMergeFieldProps() { Schema schema1 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr1); Schema schema2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr2); - - Schema supersetSchema = AvroSupersetSchemaUtils.generateSuperSetSchema(schema1, schema2); + Schema supersetSchema = AvroSupersetSchemaUtils.generateSupersetSchema(schema1, schema2); Schema.Field intField = supersetSchema.getField("int_field"); Schema.Field stringField = supersetSchema.getField("string_field"); @@ -496,7 +511,7 @@ public void testValidateSubsetSchema() { Assert.assertNotEquals(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V6_SCHEMA); // Test validation skip comparing props when checking for subset schema. 
Schema supersetSchemaForV5AndV4 = - AvroSupersetSchemaUtils.generateSuperSetSchema(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V4_SCHEMA); + AvroSupersetSchemaUtils.generateSupersetSchema(NAME_RECORD_V5_SCHEMA, NAME_RECORD_V4_SCHEMA); Assert.assertTrue( AvroSupersetSchemaUtils.validateSubsetValueSchema(NAME_RECORD_V5_SCHEMA, supersetSchemaForV5AndV4.toString())); Assert.assertTrue( diff --git a/internal/venice-client-common/src/test/resources/UnionV1.avsc b/internal/venice-client-common/src/test/resources/UnionV1.avsc new file mode 100644 index 0000000000..81cd2b7006 --- /dev/null +++ b/internal/venice-client-common/src/test/resources/UnionV1.avsc @@ -0,0 +1,31 @@ +{ + "type" : "record", + "name" : "User", + "namespace" : "example.avro", + "fields" : [ + { + "name": "field", + "type": [ + "int", + { + "type": "record", + "name": "subField", + "fields": [ + { + "name": "name", + "type": "string", + "doc": "doc v1", + "default": "v1" + } + ] + } + ], + "default": 10 + }, + { + "name": "age", + "type": "int", + "default": 10 + } + ] +} diff --git a/internal/venice-client-common/src/test/resources/UnionV2.avsc b/internal/venice-client-common/src/test/resources/UnionV2.avsc new file mode 100644 index 0000000000..05288cf0d4 --- /dev/null +++ b/internal/venice-client-common/src/test/resources/UnionV2.avsc @@ -0,0 +1,26 @@ +{ + "type" : "record", + "name" : "User", + "namespace" : "example.avro", + "fields" : [ + { + "name": "field", + "type": [ + "int", + { + "type": "record", + "name": "subField", + "fields": [ + { + "name": "name", + "type": "string", + "doc": "doc v2", + "default": "v2" + } + ] + } + ], + "default": 20 + } + ] +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java index 7ca1f0a7d6..af14b0eb21 100644 --- 
a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/VeniceParentHelixAdminTest.java @@ -598,6 +598,7 @@ public void testStoreMetaDataUpdateFromParentToChildController( testWriteComputeSchemaAutoGeneration(parentControllerClient); testWriteComputeSchemaEnable(parentControllerClient); testWriteComputeSchemaAutoGenerationFailure(parentControllerClient); + testSupersetSchemaGenerationWithUpdateDefaultValue(parentControllerClient); testUpdateConfigs(parentControllerClient, childControllerClient); } } @@ -1132,6 +1133,48 @@ private void testWriteComputeSchemaEnable(ControllerClient parentControllerClien Assert.assertEquals(registeredWriteComputeSchema.size(), 1); } + private void testSupersetSchemaGenerationWithUpdateDefaultValue(ControllerClient parentControllerClient) { + String storeName = Utils.getUniqueString("test_store"); + String owner = "test_owner"; + String keySchemaStr = "\"long\""; + + // Step 1. Create a store with missing default fields schema + parentControllerClient + .createNewStore(storeName, owner, keySchemaStr, TestWriteUtils.UNION_RECORD_V1_SCHEMA.toString()); + MultiSchemaResponse valueAndWriteComputeSchemaResponse = + parentControllerClient.getAllValueAndDerivedSchema(storeName); + MultiSchemaResponse.Schema[] registeredSchemas = valueAndWriteComputeSchemaResponse.getSchemas(); + Assert.assertEquals(registeredSchemas.length, 1); + MultiSchemaResponse.Schema registeredSchema = registeredSchemas[0]; + Assert.assertFalse(registeredSchema.isDerivedSchema()); // No write compute schema yet. + + // Step 2. Update this store to enable write compute. 
+ UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams(); + updateStoreQueryParams.setWriteComputationEnabled(true); + parentControllerClient.updateStore(storeName, updateStoreQueryParams); + + // Could not enable write compute bad schema did not have defaults + StoreInfo store = parentControllerClient.getStore(storeName).getStore(); + Assert.assertTrue(store.isWriteComputationEnabled()); + Assert.assertEquals(store.getLatestSuperSetValueSchemaId(), 1); + + // Step 3. Add a valid latest value schema for write-compute + parentControllerClient.addValueSchema(storeName, TestWriteUtils.UNION_RECORD_V2_SCHEMA.toString()); + TestUtils.waitForNonDeterministicAssertion( + 30, + TimeUnit.SECONDS, + () -> Assert + .assertEquals(parentControllerClient.getStore(storeName).getStore().getLatestSuperSetValueSchemaId(), 2)); + + parentControllerClient.addValueSchema(storeName, TestWriteUtils.UNION_RECORD_V3_SCHEMA.toString()); + TestUtils.waitForNonDeterministicAssertion( + 30, + TimeUnit.SECONDS, + () -> Assert + .assertEquals(parentControllerClient.getStore(storeName).getStore().getLatestSuperSetValueSchemaId(), 3)); + + } + private List getWriteComputeSchemaStrs(MultiSchemaResponse.Schema[] registeredSchemas) { List writeComputeSchemaStrs = new ArrayList<>(); for (MultiSchemaResponse.Schema schema: registeredSchemas) { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestEmptyPush.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestEmptyPush.java index 3a5db98724..15ae067ee4 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestEmptyPush.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestEmptyPush.java @@ -136,7 +136,8 @@ public void testEmptyPushByChangingCompressionStrategyForHybridStore() throws IO PubSubTopic storeRealTimeTopic = 
venice.getPubSubTopicRepository().getTopic(Version.composeRealTimeTopic(storeName)); assertTrue(topicManager.containsTopicAndAllPartitionsAreOnline(storeRealTimeTopic)); - + // One time refresh of router metadata. + venice.refreshAllRouterMetaData(); // Start writing some real-time records SystemProducer veniceProducer = IntegrationTestPushUtils.getSamzaProducer(venice, storeName, Version.PushType.STREAM); diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java index 16ef0b5a48..62280a1dae 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java @@ -100,6 +100,13 @@ public class TestWriteUtils { public static final Schema NAME_RECORD_V6_SCHEMA = AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV6.avsc")); + public static final Schema UNION_RECORD_V1_SCHEMA = + AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/UnionV1.avsc")); + public static final Schema UNION_RECORD_V2_SCHEMA = + AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/UnionV2.avsc")); + public static final Schema UNION_RECORD_V3_SCHEMA = + AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/UnionV3.avsc")); + // ETL Schema public static final Schema ETL_KEY_SCHEMA = AvroCompatibilityHelper.parse(loadSchemaFileFromResource("etl/Key.avsc")); public static final Schema ETL_VALUE_SCHEMA = diff --git a/internal/venice-test-common/src/main/resources/valueSchema/UnionV1.avsc b/internal/venice-test-common/src/main/resources/valueSchema/UnionV1.avsc new file mode 100644 index 0000000000..1dbc85a391 --- /dev/null +++ b/internal/venice-test-common/src/main/resources/valueSchema/UnionV1.avsc @@ -0,0 +1,15 @@ +{ + "type" : "record", + "name" : "User", + "namespace" : 
"example.avro", + "fields" : [ + { + "name": "count", + "type": [ + "int", + "null" + ], + "default": 0 + } + ] +} diff --git a/internal/venice-test-common/src/main/resources/valueSchema/UnionV2.avsc b/internal/venice-test-common/src/main/resources/valueSchema/UnionV2.avsc new file mode 100644 index 0000000000..e1b349247e --- /dev/null +++ b/internal/venice-test-common/src/main/resources/valueSchema/UnionV2.avsc @@ -0,0 +1,15 @@ +{ + "type" : "record", + "name" : "User", + "namespace" : "example.avro", + "fields" : [ + { + "name": "count", + "type": [ + "null", + "int" + ], + "default": null + } + ] +} diff --git a/internal/venice-test-common/src/main/resources/valueSchema/UnionV3.avsc b/internal/venice-test-common/src/main/resources/valueSchema/UnionV3.avsc new file mode 100644 index 0000000000..0912be4c31 --- /dev/null +++ b/internal/venice-test-common/src/main/resources/valueSchema/UnionV3.avsc @@ -0,0 +1,20 @@ +{ + "type" : "record", + "name" : "User", + "namespace" : "example.avro", + "fields" : [ + { + "name": "count", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "dummyField", + "type" : "string", + "default" : "" + } + ] +} diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index aaf3fa1ed8..32331b5dea 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -3097,7 +3097,6 @@ public SchemaEntry addValueSchema( SupersetSchemaGenerator supersetSchemaGenerator = getSupersetSchemaGenerator(clusterName); Schema newSuperSetSchema = supersetSchemaGenerator.generateSupersetSchema(existingValueSchema, newValueSchema); String newSuperSetSchemaStr = newSuperSetSchema.toString(); - if 
(supersetSchemaGenerator.compareSchema(newSuperSetSchema, newValueSchema)) { doUpdateSupersetSchemaID = true; @@ -3143,7 +3142,6 @@ public SchemaEntry addValueSchema( } else { doUpdateSupersetSchemaID = false; } - SchemaEntry addedSchemaEntry = addValueSchemaEntry(clusterName, storeName, newValueSchemaStr, schemaId, doUpdateSupersetSchemaID); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/DefaultSupersetSchemaGenerator.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/DefaultSupersetSchemaGenerator.java index 0696a37fc9..0a5557aaf3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/DefaultSupersetSchemaGenerator.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/DefaultSupersetSchemaGenerator.java @@ -20,6 +20,6 @@ public boolean compareSchema(Schema s1, Schema s2) { @Override public Schema generateSupersetSchema(Schema existingSchema, Schema newSchema) { - return AvroSupersetSchemaUtils.generateSuperSetSchema(existingSchema, newSchema); + return AvroSupersetSchemaUtils.generateSupersetSchema(existingSchema, newSchema); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/SupersetSchemaGeneratorWithCustomProp.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/SupersetSchemaGeneratorWithCustomProp.java index 92519628b5..f29fcd612c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/SupersetSchemaGeneratorWithCustomProp.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/supersetschema/SupersetSchemaGeneratorWithCustomProp.java @@ -76,7 +76,7 @@ public boolean compareSchema(Schema s1, Schema s2) { @Override public Schema generateSupersetSchema(Schema existingSchema, Schema newSchema) { - Schema 
supersetSchema = AvroSupersetSchemaUtils.generateSuperSetSchema(existingSchema, newSchema); + Schema supersetSchema = AvroSupersetSchemaUtils.generateSupersetSchema(existingSchema, newSchema); String customPropInNewSchema = newSchema.getProp(customProp); if (customPropInNewSchema != null && supersetSchema.getProp(customProp) == null) { Schema newSupersetSchema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(supersetSchema.toString()); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreRepairTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreRepairTask.java index dff7e87124..0524106839 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreRepairTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/systemstore/SystemStoreRepairTask.java @@ -224,7 +224,6 @@ void checkHeartbeatFromSystemStores( } long retrievedHeartbeatTimestamp = getHeartbeatFromSystemStore(clusterName, entry.getKey()); - LOGGER.info("DEBUGGING: {} {} {}", entry.getKey(), entry.getValue(), retrievedHeartbeatTimestamp); if (retrievedHeartbeatTimestamp < entry.getValue()) { newUnhealthySystemStoreSet.add(entry.getKey()); if (retrievedHeartbeatTimestamp == -1) {