From 9c480feafb9c7f557c73a237bca2f678d43529f4 Mon Sep 17 00:00:00 2001
From: Jayakumar Chinnappan
Date: Mon, 30 Dec 2024 04:08:53 +0530
Subject: [PATCH] KafkaToBigQueryFlex template: Add support to persist Kafka
 keys for json format

Fixes #2088
---
 .../v2/templates/KafkaToBigQueryFlex.java    |  1 +
 .../transforms/StringMessageToTableRow.java  | 71 ++++++++++++++++---
 .../templates/KafkaToBigQueryFlexJsonIT.java | 34 +++++++--
 3 files changed, 91 insertions(+), 15 deletions(-)

diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java
index 402699d18b..4f4c55b5a8 100644
--- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java
+++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java
@@ -523,6 +523,7 @@ public static Pipeline runJsonPipeline(
                 .setFunctionName(options.getJavascriptTextTransformFunctionName())
                 .setReloadIntervalMinutes(
                     options.getJavascriptTextTransformReloadIntervalMinutes())
+                .setPersistKafkaKey(options.getPersistKafkaKey())
                 .setSuccessTag(TRANSFORM_OUT)
                 .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                 .build());
diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/StringMessageToTableRow.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/StringMessageToTableRow.java
index ddb82db666..32dd8811cf 100644
--- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/StringMessageToTableRow.java
+++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/StringMessageToTableRow.java
@@ -15,11 +15,13 @@
  */
 package com.google.cloud.teleport.v2.transforms;
 
+import static com.google.cloud.teleport.v2.transforms.BigQueryConverters.convertJsonToTableRow;
+
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.auto.value.AutoValue;
 import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
-import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
 import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
+import com.google.cloud.teleport.v2.utils.BigQueryConstants;
 import com.google.cloud.teleport.v2.values.FailsafeElement;
 import com.google.common.base.Strings;
 import javax.annotation.Nullable;
@@ -32,10 +34,8 @@
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.*;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
 
 /**
  * The {@link StringMessageToTableRow} class is a {@link PTransform} which transforms incoming Kafka
@@ -60,6 +60,8 @@ public abstract class StringMessageToTableRow
 
   public abstract @Nullable String fileSystemPath();
 
+  public abstract @Nullable Boolean persistKafkaKey();
+
   public abstract @Nullable String functionName();
 
   public abstract @Nullable Integer reloadIntervalMinutes();
@@ -76,6 +78,8 @@ public static Builder newBuilder() {
   public abstract static class Builder {
     public abstract Builder setFileSystemPath(@Nullable String fileSystemPath);
 
+    public abstract Builder setPersistKafkaKey(@Nullable Boolean persistKafkaKey);
+
     public abstract Builder setFunctionName(@Nullable String functionName);
 
     public abstract Builder setReloadIntervalMinutes(@Nullable Integer value);
@@ -138,10 +142,7 @@ public PCollectionTuple expand(PCollection<KafkaRecord<String, String>> input) {
     PCollectionTuple tableRowOut =
         failsafeElements.apply(
             "JsonToTableRow",
-            FailsafeJsonToTableRow.<KafkaRecord<String, String>>newBuilder()
-                .setSuccessTag(TABLE_ROW_OUT)
-                .setFailureTag(TABLE_ROW_DEADLETTER_OUT)
-                .build());
+            new JsonStringToTableRow(persistKafkaKey(), TABLE_ROW_OUT, TABLE_ROW_DEADLETTER_OUT));
 
     PCollection<FailsafeElement<KafkaRecord<String, String>, String>> badRecords =
         tableRowOut.get(TABLE_ROW_DEADLETTER_OUT).setCoder(FAILSAFE_CODER);
@@ -176,4 +177,56 @@ public void processElement(ProcessContext context) {
       context.output(FailsafeElement.of(message, message.getKV().getValue()));
     }
   }
+
+  static class JsonStringToTableRow
+      extends PTransform<
+          PCollection<FailsafeElement<KafkaRecord<String, String>, String>>, PCollectionTuple> {
+
+    private final Boolean persistKafkaKey;
+
+    private final TupleTag<TableRow> successTag;
+
+    private final TupleTag<FailsafeElement<KafkaRecord<String, String>, String>> failureTag;
+
+    JsonStringToTableRow(
+        Boolean persistKafkaKey,
+        TupleTag<TableRow> successTag,
+        TupleTag<FailsafeElement<KafkaRecord<String, String>, String>> failureTag) {
+      this.persistKafkaKey = persistKafkaKey;
+      this.successTag = successTag;
+      this.failureTag = failureTag;
+    }
+
+    @Override
+    public PCollectionTuple expand(
+        PCollection<FailsafeElement<KafkaRecord<String, String>, String>> failsafeElements) {
+      return failsafeElements.apply(
+          "JsonStringToTableRow",
+          ParDo.of(
+                  new DoFn<FailsafeElement<KafkaRecord<String, String>, String>, TableRow>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      FailsafeElement<KafkaRecord<String, String>, String> element =
+                          context.element();
+                      String json = element.getPayload();
+
+                      try {
+                        TableRow row = convertJsonToTableRow(json);
+                        if (persistKafkaKey) {
+                          String key = element.getOriginalPayload().getKV().getKey();
+                          row.set(BigQueryConstants.KAFKA_KEY_FIELD, key);
+                        }
+                        context.output(row);
+                      } catch (Exception e) {
+                        context.output(
+                            failureTag,
+                            FailsafeElement.of(element)
+                                .setErrorMessage(e.getMessage())
+                                .setStacktrace(Throwables.getStackTraceAsString(e)));
+                      }
+                    }
+                  })
+              .withOutputTags(successTag, TupleTagList.of(failureTag)));
+    }
+  }
 }
diff --git a/v2/kafka-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlexJsonIT.java b/v2/kafka-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlexJsonIT.java
index 4ff8136c3c..51f612763f 100644
--- a/v2/kafka-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlexJsonIT.java
+++ b/v2/kafka-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlexJsonIT.java
@@ -90,6 +90,17 @@ public void testKafkaToBigQuery() throws IOException {
         .addParameter("kafkaReadAuthenticationMode", "NONE"));
   }
 
+  @Test
+  public void testKafkaToBigQueryWithKey() throws IOException {
+    baseKafkaToBigQuery(
+        b ->
+            b.addParameter("messageFormat", "JSON")
+                .addParameter("writeMode", "SINGLE_TABLE_NAME")
+                .addParameter("useBigQueryDLQ", "false")
+                .addParameter("kafkaReadAuthenticationMode", "NONE")
+                .addParameter("persistKafkaKey", "true"));
+  }
+
   @Test
   public void testKafkaToBigQueryWithUdfFunction() throws RestClientException, IOException {
     String udfFileName = "input/transform.js";
@@ -175,7 +186,8 @@ public void baseKafkaToBigQuery(
     Schema bqSchema =
         Schema.of(
             Field.of("id", StandardSQLTypeName.INT64),
-            Field.of("name", StandardSQLTypeName.STRING));
+            Field.of("name", StandardSQLTypeName.STRING),
+            Field.of("_key", StandardSQLTypeName.STRING));
 
     TableId tableId = bigQueryClient.createTable(bqTable, bqSchema);
     TableId deadletterTableId = TableId.of(tableId.getDataset(), tableId.getTable() + "_dlq");
@@ -226,11 +238,21 @@ public void baseKafkaToBigQuery(
 
     TableResult tableRows = bigQueryClient.readTable(bqTable);
     TableResult dlqRows = bigQueryClient.readTable(deadletterTableId);
 
-    assertThatBigQueryRecords(tableRows)
-        .hasRecordsUnordered(
-            List.of(
-                Map.of("id", 11, "name", namePostProcessor.apply("Dataflow")),
-                Map.of("id", 12, "name", namePostProcessor.apply("Pub/Sub"))));
+    if (options.getParameter("persistKafkaKey") != null
+        && options.getParameter("persistKafkaKey").equals("true")) {
+      assertThatBigQueryRecords(tableRows)
+          .hasRecordsUnordered(
+              List.of(
+                  Map.of("id", 11, "name", namePostProcessor.apply("Dataflow"), "_key", "11"),
+                  Map.of("id", 12, "name", namePostProcessor.apply("Pub/Sub"), "_key", "12")));
+    } else {
+      assertThatBigQueryRecords(tableRows)
+          .hasRecordsUnordered(
+              List.of(
+                  Map.of("id", 11, "name", namePostProcessor.apply("Dataflow"), "_key", "null"),
+                  Map.of("id", 12, "name", namePostProcessor.apply("Pub/Sub"), "_key", "null")));
+    }
+
     assertThatBigQueryRecords(dlqRows).hasRecordsWithStrings(List.of("bad json string"));
   }
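
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the key-persistence step that the new JsonStringToTableRow transform performs, for trying the conversion outside a pipeline. Two assumptions are baked in: the column name "_key" is taken to mirror BigQueryConstants.KAFKA_KEY_FIELD (the diff only references the constant; the test schema suggests its value), and Beam's TableRowJsonCoder stands in for BigQueryConverters.convertJsonToTableRow, whose body is not shown in this diff. The class and method names (PersistKafkaKeySketch, toTableRow) are hypothetical.

// Sketch only: mirrors the key-persistence logic added in JsonStringToTableRow.
// Requires beam-sdks-java-io-google-cloud-platform on the classpath.
import com.google.api.services.bigquery.model.TableRow;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;

public class PersistKafkaKeySketch {

  // Assumed to match the value of BigQueryConstants.KAFKA_KEY_FIELD.
  private static final String KAFKA_KEY_FIELD = "_key";

  // Parses the Kafka message value as a JSON object and, when enabled,
  // copies the record key into the extra "_key" column.
  static TableRow toTableRow(String kafkaKey, String jsonValue, boolean persistKafkaKey)
      throws IOException {
    TableRow row =
        TableRowJsonCoder.of()
            .decode(
                new ByteArrayInputStream(jsonValue.getBytes(StandardCharsets.UTF_8)),
                Context.OUTER);
    if (persistKafkaKey) {
      row.set(KAFKA_KEY_FIELD, kafkaKey); // a null Kafka key surfaces as NULL in BigQuery
    }
    return row;
  }

  public static void main(String[] args) throws IOException {
    // Mirrors the integration test's first record: key "11", JSON value below.
    System.out.println(toTableRow("11", "{\"id\": 11, \"name\": \"Dataflow\"}", true));
    // Prints the row as JSON, e.g. {"_key":"11","id":11,"name":"Dataflow"}
  }
}

With persistKafkaKey disabled the sketch never sets "_key", so the column stays NULL in BigQuery, which is what the else branch of the updated integration test asserts.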