diff --git a/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/filesystem/AvroConfigurationUtil.java b/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/filesystem/AvroConfigurationUtil.java new file mode 100644 index 0000000000..4632d51682 --- /dev/null +++ b/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/filesystem/AvroConfigurationUtil.java @@ -0,0 +1,55 @@ +/** + * Copyright 2014 Cloudera Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kitesdk.data.spi.filesystem; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.hadoop.io.AvroSerialization; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.avro.AvroReadSupport; +import org.kitesdk.compat.DynMethods; +import org.kitesdk.data.Format; +import org.kitesdk.data.Formats; +import org.kitesdk.data.spi.DataModelUtil; + +public class AvroConfigurationUtil { + + // Constant from AvroJob copied here so we can set it on the Configuration + // given to this class. + private static final String AVRO_SCHEMA_INPUT_KEY = "avro.schema.input.key"; + + // this is required for 1.7.4 because setDataModelClass is not available + private static final DynMethods.StaticMethod setModel = + new DynMethods.Builder("setDataModelClass") + .impl(AvroSerialization.class, Configuration.class, Class.class) + .defaultNoop() + .buildStatic(); + + public static void configure(Configuration conf, Format format, Schema schema, Class type) { + GenericData model = DataModelUtil.getDataModelForType(type); + if (Formats.AVRO.equals(format)) { + setModel.invoke(conf, model.getClass()); + conf.set(AVRO_SCHEMA_INPUT_KEY, schema.toString()); + + } else if (Formats.PARQUET.equals(format)) { + // TODO: update to a version of Parquet with setAvroDataSupplier + //AvroReadSupport.setAvroDataSupplier(conf, + // DataModelUtil.supplierClassFor(model)); + AvroReadSupport.setAvroReadSchema(conf, schema); + } + } + +} diff --git a/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/filesystem/FileSystemViewKeyInputFormat.java b/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/filesystem/FileSystemViewKeyInputFormat.java index 2ca7ba7dad..abd16dba68 100644 --- a/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/filesystem/FileSystemViewKeyInputFormat.java +++ b/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/filesystem/FileSystemViewKeyInputFormat.java @@ -17,67 +17,45 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.hadoop.io.AvroSerialization; import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; 
-import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.kitesdk.compat.DynMethods; +import org.apache.parquet.avro.AvroParquetInputFormat; import org.kitesdk.compat.Hadoop; import org.kitesdk.data.Format; import org.kitesdk.data.Formats; import org.kitesdk.data.spi.AbstractKeyRecordReaderWrapper; import org.kitesdk.data.spi.AbstractRefinableView; -import org.kitesdk.data.spi.DataModelUtil; import org.kitesdk.data.spi.FilteredRecordReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.parquet.avro.AvroParquetInputFormat; -import org.apache.parquet.avro.AvroReadSupport; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; class FileSystemViewKeyInputFormat extends InputFormat { private static final Logger LOG = LoggerFactory.getLogger(FileSystemViewKeyInputFormat.class); - // Constant from AvroJob copied here so we can set it on the Configuration - // given to this class. - private static final String AVRO_SCHEMA_INPUT_KEY = "avro.schema.input.key"; - - // this is required for 1.7.4 because setDataModelClass is not available - private static final DynMethods.StaticMethod setModel = - new DynMethods.Builder("setDataModelClass") - .impl(AvroSerialization.class, Configuration.class, Class.class) - .defaultNoop() - .buildStatic(); - private FileSystemDataset dataset; private FileSystemView view; public FileSystemViewKeyInputFormat(FileSystemDataset dataset, - Configuration conf) { + Configuration conf) { this.dataset = dataset; this.view = null; LOG.debug("Dataset: {}", dataset); Format format = dataset.getDescriptor().getFormat(); - setConfigProperties(conf, format, dataset.getSchema(), dataset.getType()); + AvroConfigurationUtil.configure(conf, format, dataset.getSchema(), dataset.getType()); } public FileSystemViewKeyInputFormat(FileSystemView view, Configuration conf) { @@ -87,22 +65,7 @@ public FileSystemViewKeyInputFormat(FileSystemView view, Configuration conf) Format format = dataset.getDescriptor().getFormat(); - setConfigProperties(conf, format, view.getSchema(), view.getType()); - } - - private static void setConfigProperties(Configuration conf, Format format, - Schema schema, Class type) { - GenericData model = DataModelUtil.getDataModelForType(type); - if (Formats.AVRO.equals(format)) { - setModel.invoke(conf, model.getClass()); - conf.set(AVRO_SCHEMA_INPUT_KEY, schema.toString()); - - } else if (Formats.PARQUET.equals(format)) { - // TODO: update to a version of Parquet with setAvroDataSupplier - //AvroReadSupport.setAvroDataSupplier(conf, - // DataModelUtil.supplierClassFor(model)); - AvroReadSupport.setAvroReadSchema(conf, schema); - } + AvroConfigurationUtil.configure(conf, format, view.getSchema(), view.getType()); } @Override @@ -114,7 +77,6 @@ public List getSplits(JobContext jobContext) throws IOException { if (setInputPaths(jobContext, job)) { if (Formats.AVRO.equals(format)) { - AvroJob.setInputKeySchema(job, dataset.getDescriptor().getSchema()); AvroCombineInputFormat delegate = new AvroCombineInputFormat(); return delegate.getSplits(jobContext); } else if (Formats.PARQUET.equals(format)) { diff --git 
a/kite-data/kite-data-crunch/src/main/java/org/kitesdk/data/crunch/DatasetSourceTarget.java b/kite-data/kite-data-crunch/src/main/java/org/kitesdk/data/crunch/DatasetSourceTarget.java index 1540debf9f..205724198a 100644 --- a/kite-data/kite-data-crunch/src/main/java/org/kitesdk/data/crunch/DatasetSourceTarget.java +++ b/kite-data/kite-data-crunch/src/main/java/org/kitesdk/data/crunch/DatasetSourceTarget.java @@ -15,10 +15,6 @@ */ package org.kitesdk.data.crunch; -import java.io.IOException; -import java.net.URI; -import java.util.Map; -import java.util.Set; import com.google.common.collect.ImmutableSet; import org.apache.avro.generic.GenericData; import org.apache.crunch.ReadableData; @@ -41,9 +37,15 @@ import org.kitesdk.data.mapreduce.DatasetKeyInputFormat; import org.kitesdk.data.spi.LastModifiedAccessor; import org.kitesdk.data.spi.SizeAccessor; +import org.kitesdk.data.spi.filesystem.AvroConfigurationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.Set; + class DatasetSourceTarget extends DatasetTarget implements ReadableSourceTarget { private static final Logger LOG = LoggerFactory @@ -72,8 +74,10 @@ public DatasetSourceTarget(View view, AvroType avroType) { this.view = view; this.avroType = avroType; - Configuration temp = new Configuration(false /* use an empty conf */ ); + Configuration temp = new Configuration(false /* use an empty conf */); DatasetKeyInputFormat.configure(temp).readFrom(view); + AvroConfigurationUtil.configure(temp, view.getDataset().getDescriptor().getFormat(), + view.getSchema(), view.getDataset().getType()); this.formatBundle = inputBundle(temp); } @@ -85,7 +89,7 @@ public DatasetSourceTarget(URI uri, AvroType avroType) { private static AvroType toAvroType(View view, Class type) { if (type.isAssignableFrom(GenericData.Record.class)) { return (AvroType) Avros.generics( - view.getDataset().getDescriptor().getSchema()); + view.getDataset().getDescriptor().getSchema()); } else { return Avros.records(type); } diff --git a/kite-data/kite-data-crunch/src/test/java/org/kitesdk/data/crunch/TestCrunchDatasets.java b/kite-data/kite-data-crunch/src/test/java/org/kitesdk/data/crunch/TestCrunchDatasets.java index 2ebe0c0456..14ef906fea 100644 --- a/kite-data/kite-data-crunch/src/test/java/org/kitesdk/data/crunch/TestCrunchDatasets.java +++ b/kite-data/kite-data-crunch/src/test/java/org/kitesdk/data/crunch/TestCrunchDatasets.java @@ -16,54 +16,32 @@ package org.kitesdk.data.crunch; import com.google.common.io.Files; -import java.io.IOException; -import java.net.URI; -import java.util.Arrays; -import java.util.Collection; - import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; -import org.apache.avro.generic.GenericRecord; -import org.apache.crunch.CrunchRuntimeException; -import org.apache.crunch.MapFn; -import org.apache.crunch.PCollection; -import org.apache.crunch.Pipeline; -import org.apache.crunch.Target; +import org.apache.crunch.*; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.kitesdk.compat.Hadoop; -import 
org.kitesdk.data.Dataset; -import org.kitesdk.data.DatasetDescriptor; -import org.kitesdk.data.DatasetReader; -import org.kitesdk.data.DatasetWriter; -import org.kitesdk.data.Datasets; -import org.kitesdk.data.Formats; -import org.kitesdk.data.MiniDFSTest; -import org.kitesdk.data.Signalable; -import org.kitesdk.data.spi.PartitionKey; -import org.kitesdk.data.PartitionStrategy; +import org.kitesdk.data.*; import org.kitesdk.data.spi.DatasetRepository; -import org.kitesdk.data.spi.PartitionedDataset; -import org.kitesdk.data.View; import org.kitesdk.data.spi.LastModifiedAccessor; -import org.kitesdk.data.URIBuilder; +import org.kitesdk.data.spi.PartitionKey; +import org.kitesdk.data.spi.PartitionedDataset; import org.kitesdk.data.user.NewUserRecord; -import static org.kitesdk.data.spi.filesystem.DatasetTestUtilities.USER_SCHEMA; -import static org.kitesdk.data.spi.filesystem.DatasetTestUtilities.checkTestUsers; -import static org.kitesdk.data.spi.filesystem.DatasetTestUtilities.datasetSize; -import static org.kitesdk.data.spi.filesystem.DatasetTestUtilities.writeTestUsers; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; + +import static org.kitesdk.data.spi.filesystem.DatasetTestUtilities.*; @RunWith(Parameterized.class) public abstract class TestCrunchDatasets extends MiniDFSTest { @@ -654,4 +632,92 @@ public void testMultipleFileReadingFromCrunch() throws IOException { checkTestUsers(outputDataset, 10); } + + @Test + public void testMultipleFileReadingFromCrunchWithDifferentReaderWriterSchemas() { + Schema userNameOnlySchema = SchemaBuilder.record("userNameOnlyRecord") + .fields() + .requiredString("username") + .endRecord(); + + Schema emailOnlySchema = SchemaBuilder.record("emailOnlyRecord") + .fields() + .requiredString("email") + .endRecord(); + + // write two files, each of 5 records, using the original schema (username and email) + Dataset writeDatasetA = repo.create("ns", "inA", new DatasetDescriptor.Builder() + .schema(USER_SCHEMA).build()); + Dataset writeDatasetB = repo.create("ns", "inB", new DatasetDescriptor.Builder() + .schema(USER_SCHEMA).build()); + writeTestUsers(writeDatasetA, 5, 0); + writeTestUsers(writeDatasetB, 5, 5); + + // update the schema of each dataset (to a schema with only the username or email field, respectively) + repo.update("ns", "inA", new DatasetDescriptor.Builder(repo.load("ns", "inA").getDescriptor()) + .schema(userNameOnlySchema).build()); + repo.update("ns", "inB", new DatasetDescriptor.Builder(repo.load("ns", "inB").getDescriptor()) + .schema(emailOnlySchema).build()); + + // run a crunch pipeline that reads and writes the records using the reduced schemas + Dataset inputA = repo.load("ns", "inA"); + Dataset inputB = repo.load("ns", "inB"); + + Dataset outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder() + .schema(userNameOnlySchema).build()); + + Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class); + PCollection dataA = pipeline.read(CrunchDatasets.asSource(inputA)) + .filter("remove records that don't have the correct schema", + new FilterRecordsWithExpectedSchemaFn(userNameOnlySchema.toString())); + PCollection dataB = pipeline.read(CrunchDatasets.asSource(inputB)) + .filter("remove records that don't have the correct schema", + new FilterRecordsWithExpectedSchemaFn(emailOnlySchema.toString())); + pipeline.write(dataA.union(dataB), CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND); + pipeline.run(); + + // If the records did not have the correct
schema, they would have been filtered. So this checks that they all had the + // expected schema indeed. + checkReaderIteration(outputDataset.newReader(), 10, new NopRecordValidator()); + + // Repeat the same test with only a single input, to ensure that the simple case also works + Dataset singleInputOutputDataset = repo.create("ns", "out2", new DatasetDescriptor.Builder() + .schema(userNameOnlySchema).build()); + + Pipeline singleInputPipeline = new MRPipeline(TestCrunchDatasets.class); + PCollection singleInputFiltered = singleInputPipeline.read(CrunchDatasets.asSource(inputA)) + .filter("remove records that don't have the correct schema", + new FilterRecordsWithExpectedSchemaFn(userNameOnlySchema.toString())); + singleInputPipeline.write(singleInputFiltered, CrunchDatasets.asTarget(singleInputOutputDataset), Target.WriteMode.APPEND); + singleInputPipeline.run(); + + checkReaderIteration(singleInputOutputDataset.newReader(), 5, new NopRecordValidator()); + } + + private static final class FilterRecordsWithExpectedSchemaFn extends FilterFn { + + private final String expectedSchemaString; + private transient Schema expectedSchema; + + private FilterRecordsWithExpectedSchemaFn(String expectedSchemaString) { + this.expectedSchemaString = expectedSchemaString; + } + + @Override + public void initialize() { + this.expectedSchema = new Schema.Parser().parse(expectedSchemaString); + } + + @Override + public boolean accept(GenericData.Record record) { + return expectedSchema.equals(record.getSchema()); + } + } + + private static class NopRecordValidator implements RecordValidator { + @Override + public void validate(Record record, int recordNum) { + // nop + } + } } diff --git a/kite-data/kite-data-mapreduce/src/test/java/org/kitesdk/data/mapreduce/TestMapReduce.java b/kite-data/kite-data-mapreduce/src/test/java/org/kitesdk/data/mapreduce/TestMapReduce.java index be3bbdbbcf..fe42d34397 100644 --- a/kite-data/kite-data-mapreduce/src/test/java/org/kitesdk/data/mapreduce/TestMapReduce.java +++ b/kite-data/kite-data-mapreduce/src/test/java/org/kitesdk/data/mapreduce/TestMapReduce.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.util.Utf8; @@ -26,6 +28,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.junit.Assert; import org.junit.Assume; import org.junit.Before; @@ -62,6 +65,24 @@ protected void map(GenericData.Record record, Void value, } } + private static class SchemaVerifyingMapper + extends Mapper { + + private transient Schema expectedSchema; + + @Override + protected void setup(Context context) { + this.expectedSchema = new Schema.Parser().parse(context.getConfiguration().get("expectedSchema")); + } + + @Override + protected void map(GenericData.Record record, Void value, Context context) { + if (!expectedSchema.equals(record.getSchema())) { + throw new IllegalStateException("Record schema is not as expected: " + record.getSchema()); + } + } + } + private static class GenericStatsReducer extends Reducer { @@ -121,6 +142,34 @@ public void testJobFailsWithExisting() throws Exception { job.waitForCompletion(true); } + @Test + public void testSchemaEvolution() throws Exception { + populateInputDataset(); + 
+ Schema updatedSchema = SchemaBuilder.record("mystring").fields() + .requiredString("text") + .optionalString("newText") + .endRecord(); + + // update the schema of the dataset (one additional field) + DatasetDescriptor updatedDescriptor = new DatasetDescriptor.Builder(inputDataset.getDescriptor()) + .schema(updatedSchema) + .build(); + inputDataset = repo.update("ns", "in", updatedDescriptor, Record.class); + + @SuppressWarnings("deprecation") + Job job = new Job(); + + DatasetKeyInputFormat.configure(job).readFrom(inputDataset).withType(GenericData.Record.class); + + job.getConfiguration().set("expectedSchema", updatedSchema.toString()); + + job.setMapperClass(SchemaVerifyingMapper.class); + job.setOutputFormatClass(NullOutputFormat.class); + + Assert.assertTrue(job.waitForCompletion(true)); + } + @Test(expected = DatasetException.class) public void testJobFailsWithEmptyButReadyOutput() throws Exception { Assume.assumeTrue(!Hadoop.isHadoop1());
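What the extracted helper actually does to a job Configuration is easiest to see in isolation. The sketch below is not part of the patch; the class name AvroConfigurationUtilSketch and the standalone main method are made up for illustration. It exercises the Avro branch of AvroConfigurationUtil.configure: for Avro-format datasets the reader schema is stored under the "avro.schema.input.key" property, so records written with an older writer schema are resolved against the dataset's current schema at read time; for Parquet-format datasets the helper calls AvroReadSupport.setAvroReadSchema(conf, schema) instead.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.kitesdk.data.Formats;
import org.kitesdk.data.spi.filesystem.AvroConfigurationUtil;

public class AvroConfigurationUtilSketch {
  public static void main(String[] args) {
    // Reader schema with only the fields the job cares about (mirrors the
    // userNameOnlySchema used in TestCrunchDatasets above).
    Schema readerSchema = SchemaBuilder.record("userNameOnlyRecord")
        .fields()
        .requiredString("username")
        .endRecord();

    Configuration conf = new Configuration(false /* start from an empty conf */);

    // Avro branch: stores the reader schema under "avro.schema.input.key"
    // (the AvroJob constant copied into the helper) and, where the Avro
    // version supports it, registers the data model class for serialization.
    AvroConfigurationUtil.configure(conf, Formats.AVRO, readerSchema, GenericData.Record.class);

    // Prints the reader schema JSON that AvroKeyInputFormat will use.
    System.out.println(conf.get("avro.schema.input.key"));
  }
}

This is the same call that DatasetSourceTarget now applies to the Configuration backing its FormatBundle, which is why the new TestCrunchDatasets case sees the reduced reader schemas when the Crunch pipeline reads the datasets.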