From 40151abbfdd3b1deb6a7b3de395562de91b818a6 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Tue, 7 Jan 2025 16:31:36 -0500 Subject: [PATCH] Add Iceberg support for name-based mapping schema (#33315) * Add Iceberg support for name-based mapping schema * Add nullable annotation * Add nested field * iceberg-gcp already as a runtimeOnly * Trigger IT tests --- .../IO_Iceberg_Integration_Tests.json | 2 +- sdks/java/io/iceberg/build.gradle | 2 + .../beam/sdk/io/iceberg/ScanTaskReader.java | 37 +++- .../sdk/io/iceberg/IcebergIOReadTest.java | 202 ++++++++++++++++++ .../sdk/io/iceberg/TestDataWarehouse.java | 4 + .../beam/sdk/io/iceberg/TestFixtures.java | 7 + 6 files changed, 244 insertions(+), 10 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index bbdc3a3910ef..62ae7886c573 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 3 + "modification": 4 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index f7a9e5c8533d..19fcbb7d1ea0 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -65,6 +65,8 @@ dependencies { testImplementation library.java.bigdataoss_gcsio testImplementation library.java.bigdataoss_gcs_connector testImplementation library.java.bigdataoss_util_hadoop + testImplementation "org.apache.parquet:parquet-avro:$parquet_version" + testImplementation "org.apache.parquet:parquet-common:$parquet_version" testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation project(":sdks:java:extensions:google-cloud-platform-core") diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java index 5784dfd79744..c9906618a64d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java @@ -35,6 +35,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.Record; @@ -47,6 +48,7 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.types.Type; @@ -98,6 +100,8 @@ public boolean advance() throws IOException { // which are not null-safe. @SuppressWarnings("nullness") org.apache.iceberg.@NonNull Schema project = this.project; + @Nullable + String nameMapping = source.getTable().properties().get(TableProperties.DEFAULT_NAME_MAPPING); do { // If our current iterator is working... do that. @@ -129,37 +133,52 @@ public boolean advance() throws IOException { switch (file.format()) { case ORC: LOG.info("Preparing ORC input"); - iterable = + ORC.ReadBuilder orcReader = ORC.read(input) .split(fileTask.start(), fileTask.length()) .project(project) .createReaderFunc( fileSchema -> GenericOrcReader.buildReader(project, fileSchema, idToConstants)) - .filter(fileTask.residual()) - .build(); + .filter(fileTask.residual()); + + if (nameMapping != null) { + orcReader.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + iterable = orcReader.build(); break; case PARQUET: LOG.info("Preparing Parquet input."); - iterable = + Parquet.ReadBuilder parquetReader = Parquet.read(input) .split(fileTask.start(), fileTask.length()) .project(project) .createReaderFunc( fileSchema -> GenericParquetReaders.buildReader(project, fileSchema, idToConstants)) - .filter(fileTask.residual()) - .build(); + .filter(fileTask.residual()); + + if (nameMapping != null) { + parquetReader.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + iterable = parquetReader.build(); break; case AVRO: LOG.info("Preparing Avro input."); - iterable = + Avro.ReadBuilder avroReader = Avro.read(input) .split(fileTask.start(), fileTask.length()) .project(project) .createReaderFunc( - fileSchema -> DataReader.create(project, fileSchema, idToConstants)) - .build(); + fileSchema -> DataReader.create(project, fileSchema, idToConstants)); + + if (nameMapping != null) { + avroReader.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + iterable = avroReader.build(); break; default: throw new UnsupportedOperationException("Cannot read format: " + file.format()); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index 6ff3bdf6a4ff..1c3f9b53f31a 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -21,6 +21,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import java.io.File; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -28,6 +30,8 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; @@ -36,13 +40,31 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.MappedFields; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.types.Types; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -210,4 +232,184 @@ public void testIdentityColumnScan() throws Exception { testPipeline.run(); } + + @Test + public void testNameMappingScan() throws Exception { + org.apache.avro.Schema metadataSchema = + org.apache.avro.Schema.createRecord( + "metadata", + null, + null, + false, + ImmutableList.of( + new org.apache.avro.Schema.Field( + "source", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), + null, + null))); + + org.apache.avro.Schema avroSchema = + org.apache.avro.Schema.createRecord( + "test", + null, + null, + false, + ImmutableList.of( + new org.apache.avro.Schema.Field( + "data", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), + null, + null), + new org.apache.avro.Schema.Field( + "id", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), + null, + null), + new org.apache.avro.Schema.Field("metadata", metadataSchema, null, null))); + + List> recordData = + ImmutableList.>builder() + .add( + ImmutableMap.of( + "id", + 0L, + "data", + "clarification", + "metadata", + ImmutableMap.of("source", "systemA"))) + .add( + ImmutableMap.of( + "id", 1L, "data", "risky", "metadata", ImmutableMap.of("source", "systemB"))) + .add( + ImmutableMap.of( + "id", 2L, "data", "falafel", "metadata", ImmutableMap.of("source", "systemC"))) + .build(); + + List avroRecords = + recordData.stream() + .map(data -> avroGenericRecord(avroSchema, data)) + .collect(Collectors.toList()); + + Configuration hadoopConf = new Configuration(); + String path = createParquetFile(avroSchema, avroRecords); + HadoopInputFile inputFile = HadoopInputFile.fromLocation(path, hadoopConf); + + NameMapping defaultMapping = + NameMapping.of( + MappedField.of(1, "id"), + MappedField.of(2, "data"), + MappedField.of(3, "metadata", MappedFields.of(MappedField.of(4, "source")))); + ImmutableMap tableProperties = + ImmutableMap.builder() + .put(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(defaultMapping)) + .build(); + + TableIdentifier tableId = + TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Table simpleTable = + warehouse + .buildTable(tableId, TestFixtures.NESTED_SCHEMA) + .withProperties(tableProperties) + .withPartitionSpec(PartitionSpec.unpartitioned()) + .create(); + + MetricsConfig metricsConfig = MetricsConfig.forTable(simpleTable); + Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig); + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withFormat(FileFormat.PARQUET) + .withInputFile(inputFile) + .withMetrics(metrics) + .build(); + + final Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.NESTED_SCHEMA); + + simpleTable.newFastAppend().appendFile(dataFile).commit(); + + Map catalogProps = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouse.location) + .build(); + + IcebergCatalogConfig catalogConfig = + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setCatalogProperties(catalogProps) + .build(); + + PCollection output = + testPipeline + .apply(IcebergIO.readRows(catalogConfig).from(tableId)) + .apply(ParDo.of(new PrintRow())) + .setCoder(RowCoder.of(beamSchema)); + + final Row[] expectedRows = + recordData.stream() + .map(data -> icebergGenericRecord(TestFixtures.NESTED_SCHEMA.asStruct(), data)) + .map(record -> IcebergUtils.icebergRecordToBeamRow(beamSchema, record)) + .toArray(Row[]::new); + + PAssert.that(output) + .satisfies( + (Iterable rows) -> { + assertThat(rows, containsInAnyOrder(expectedRows)); + return null; + }); + + testPipeline.run(); + } + + @SuppressWarnings("unchecked") + public static GenericRecord avroGenericRecord( + org.apache.avro.Schema schema, Map values) { + GenericRecord record = new GenericData.Record(schema); + for (org.apache.avro.Schema.Field field : schema.getFields()) { + Object rawValue = values.get(field.name()); + Object avroValue = + rawValue instanceof Map + ? avroGenericRecord(field.schema(), (Map) rawValue) + : rawValue; + record.put(field.name(), avroValue); + } + return record; + } + + @SuppressWarnings("unchecked") + public static Record icebergGenericRecord(Types.StructType type, Map values) { + org.apache.iceberg.data.GenericRecord record = + org.apache.iceberg.data.GenericRecord.create(type); + for (Types.NestedField field : type.fields()) { + Object rawValue = values.get(field.name()); + Object value = + rawValue instanceof Map + ? icebergGenericRecord(field.type().asStructType(), (Map) rawValue) + : rawValue; + record.setField(field.name(), value); + } + return record; + } + + public static String createParquetFile(org.apache.avro.Schema schema, List records) + throws IOException { + + File tempFile = createTempFile(); + Path file = new Path(tempFile.getPath()); + + AvroParquetWriter.Builder builder = AvroParquetWriter.builder(file); + ParquetWriter parquetWriter = builder.withSchema(schema).build(); + for (GenericRecord record : records) { + parquetWriter.write(record); + } + parquetWriter.close(); + + return tempFile.getPath(); + } + + private static File createTempFile() throws IOException { + File tempFile = File.createTempFile(ScanSourceTest.class.getSimpleName(), ".tmp"); + tempFile.deleteOnExit(); + boolean unused = tempFile.delete(); + return tempFile; + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java index 9352123b5c77..d7a6cd34838c 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java @@ -163,6 +163,10 @@ public Table createTable( return catalog.createTable(tableId, schema, partitionSpec); } + public Catalog.TableBuilder buildTable(TableIdentifier tableId, Schema schema) { + return catalog.buildTable(tableId, schema); + } + public Table loadTable(TableIdentifier tableId) { return catalog.loadTable(tableId); } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java index a2ca86d1b5a2..a3ab3c8b50d4 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java @@ -36,6 +36,13 @@ public class TestFixtures { new Schema( required(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get())); + public static final Schema NESTED_SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), + optional(2, "data", Types.StringType.get()), + optional( + 3, "metadata", Types.StructType.of(optional(4, "source", Types.StringType.get())))); + public static final List> FILE1SNAPSHOT1_DATA = ImmutableList.of( ImmutableMap.of("id", 0L, "data", "clarification"),