[Iceberg] Add BQMS test that validates using a BQ query (#33625)
* add test

* spotless

* another query test
ahmedabu98 authored Jan 24, 2025
1 parent b1d6b19 commit 4d627b4
Showing 5 changed files with 91 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 3
"modification": 1
}
2 changes: 2 additions & 0 deletions sdks/java/io/iceberg/build.gradle
@@ -84,6 +84,8 @@ dependencies {

// BigQueryMetastore catalog dep
testImplementation project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")
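// BigQuery client and API model classes (BigqueryClient, BigQueryUtils, TableRow) used by the new BQ-query validation test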
testImplementation project(":sdks:java:io:google-cloud-platform")
testImplementation library.java.google_api_services_bigquery

testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
@@ -17,19 +17,49 @@
*/
package org.apache.beam.sdk.io.iceberg.catalog;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.RowFilter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryMetastoreCatalogIT");
static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
static final String DATASET = "managed_iceberg_bqms_tests_no_delete";
static final String DATASET = "managed_iceberg_bqms_tests_" + System.nanoTime();
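// Nanosecond timestamps keep the dataset and table names unique across test runs.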
static final long SALT = System.nanoTime();

@BeforeClass
public static void createDataset() throws IOException, InterruptedException {
BQ_CLIENT.createNewDataset(OPTIONS.getProject(), DATASET);
}

@AfterClass
public static void deleteDataset() {
BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET);
}

@Override
public String tableId() {
return DATASET + "." + testName.getMethodName() + "_" + SALT;
@@ -41,7 +71,7 @@ public Catalog createCatalog() {
BQMS_CATALOG,
"bqms_" + catalogName,
ImmutableMap.<String, String>builder()
.put("gcp_project", options.getProject())
.put("gcp_project", OPTIONS.getProject())
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.build(),
@@ -65,12 +95,53 @@ public Map<String, Object> managedIcebergConfig(String tableId) {
.put(
"catalog_properties",
ImmutableMap.<String, String>builder()
.put("gcp_project", options.getProject())
.put("gcp_project", OPTIONS.getProject())
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.put("catalog-impl", BQMS_CATALOG)
.put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
.build())
.build();
}

@Test
public void testWriteToPartitionedAndValidateWithBQQuery()
throws IOException, InterruptedException {
// For an example row where bool=true, datetime=2025-01-24T12:00:00, str=value_303,
// this partition spec will create a partition like:
// /bool=true/datetime_hour=2025-01-24-12/str_trunc=value_3/
PartitionSpec partitionSpec =
PartitionSpec.builderFor(ICEBERG_SCHEMA)
.identity("bool")
.hour("datetime")
.truncate("str", "value_x".length())
.build();
catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec);

// Write with Beam
Map<String, Object> config = managedIcebergConfig(tableId());
PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();

// Fetch records using a BigQuery query and validate
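// The BigQuery Metastore catalog registers the Iceberg table with BigQuery,
// so it can be read with standard SQL as `<project>.<dataset>.<table>`.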
BigqueryClient bqClient = new BigqueryClient(getClass().getSimpleName());
String query = String.format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), tableId());
List<TableRow> rows = bqClient.queryUnflattened(query, OPTIONS.getProject(), true, true);
List<Row> beamRows =
rows.stream()
.map(tr -> BigQueryUtils.toBeamRow(BEAM_SCHEMA, tr))
.collect(Collectors.toList());

assertThat(beamRows, containsInAnyOrder(inputRows.toArray()));

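// Second query: select only a subset of columns and compare against the same
// projection of the input rows, built with RowFilter.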
String queryByPartition =
String.format("SELECT bool, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId());
rows = bqClient.queryUnflattened(queryByPartition, OPTIONS.getProject(), true, true);
RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool", "datetime"));
beamRows =
rows.stream()
.map(tr -> BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tr))
.collect(Collectors.toList());
assertThat(beamRows, containsInAnyOrder(inputRows.stream().map(rowFilter::filter).toArray()));
}
}
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
@@ -37,7 +37,7 @@ public Integer numRecords() {
@Override
public Catalog createCatalog() {
Configuration catalogHadoopConf = new Configuration();
catalogHadoopConf.set("fs.gs.project.id", options.getProject());
catalogHadoopConf.set("fs.gs.project.id", OPTIONS.getProject());
catalogHadoopConf.set("fs.gs.auth.type", "APPLICATION_DEFAULT");

HadoopCatalog catalog = new HadoopCatalog();
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -147,7 +147,12 @@ public static String warehouse(Class<? extends IcebergCatalogBaseIT> testClass)

@Before
public void setUp() throws Exception {
options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
warehouse =
String.format(
"%s/%s/%s",
TestPipeline.testingPipelineOptions().getTempLocation(),
getClass().getSimpleName(),
RANDOM);
warehouse = warehouse(getClass());
catalogSetup();
catalog = createCatalog();
@@ -162,7 +167,7 @@ public void cleanUp() throws Exception {
}

try {
GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
GcsUtil gcsUtil = OPTIONS.as(GcsOptions.class).getGcsUtil();
GcsPath path = GcsPath.fromUri(warehouse);

@Nullable
@@ -190,7 +195,8 @@ public void cleanUp() throws Exception {

protected static String warehouse;
public Catalog catalog;
protected static GcpOptions options;
protected static final GcpOptions OPTIONS =
TestPipeline.testingPipelineOptions().as(GcpOptions.class);
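// Static (rather than set per test) so that static per-class hooks, such as the
// @BeforeClass dataset setup in BigQueryMetastoreCatalogIT, can read the GCP options.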
private static final String RANDOM = UUID.randomUUID().toString();
@Rule public TestPipeline pipeline = TestPipeline.create();
@Rule public TestName testName = new TestName();
@@ -210,7 +216,7 @@ public void cleanUp() throws Exception {
.addInt32Field("nested_int")
.addFloatField("nested_float")
.build();
private static final Schema BEAM_SCHEMA =
protected static final Schema BEAM_SCHEMA =
Schema.builder()
.addStringField("str")
.addStringField("char")
@@ -262,16 +268,16 @@ public Row apply(Long num) {
}
};

private static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
protected static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
private static final SimpleFunction<Row, Record> RECORD_FUNC =
protected static final SimpleFunction<Row, Record> RECORD_FUNC =
new SimpleFunction<Row, Record>() {
@Override
public Record apply(Row input) {
return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
}
};
private final List<Row> inputRows =
protected final List<Row> inputRows =
LongStream.range(0, numRecords()).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());

/** Populates the Iceberg table and Returns a {@link List<Row>} of expected elements. */
