From ff14f97efbc47f2a13a9bd91f76e04b43967a1e9 Mon Sep 17 00:00:00 2001 From: shreyakhajanchi <92910380+shreyakhajanchi@users.noreply.github.com> Date: Thu, 5 Dec 2024 17:33:06 +0530 Subject: [PATCH] Adding dataflow timestamp field in datastream-common json flow (#2042) * Adding dataflow timestamp field in datastream-common json flow * updated UT --- .../FormatDatastreamJsonToJson.java | 5 +++ .../FormatDatastreamJsonToJsonTest.java | 37 ++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJson.java b/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJson.java index bd15edcba4..15de08e0bf 100644 --- a/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJson.java +++ b/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJson.java @@ -76,6 +76,7 @@ record = new ObjectMapper().readTree(c.element()); outputObject.put("_metadata_stream", getStreamName(record)); outputObject.put("_metadata_timestamp", getSourceTimestamp(record)); outputObject.put("_metadata_read_timestamp", getMetadataTimestamp(record)); + outputObject.put("_metadata_dataflow_timestamp", getCurrentTimestamp()); outputObject.put("_metadata_read_method", record.get("read_method").textValue()); outputObject.put("_metadata_source_type", sourceType); @@ -234,4 +235,8 @@ private Boolean getMetadataIsDeleted(JsonNode record) { return value.booleanValue(); } + + private long getCurrentTimestamp() { + return System.currentTimeMillis() / 1000L; + } } diff --git a/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJsonTest.java b/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJsonTest.java index 3794820df9..5faaeb55ca 100644 --- a/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJsonTest.java +++ b/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJsonTest.java @@ -15,6 +15,10 @@ */ package com.google.cloud.teleport.v2.datastream.transforms; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.teleport.v2.coders.FailsafeElementCoder; import com.google.cloud.teleport.v2.values.FailsafeElement; import com.google.common.collect.ImmutableMap; @@ -23,6 +27,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; @@ -59,6 +64,9 @@ public void testProcessElement_validJson() { FailsafeElement expectedElement = FailsafeElement.of(EXAMPLE_DATASTREAM_RECORD, EXAMPLE_DATASTREAM_RECORD); + FailsafeElementCoder failsafeElementCoder = + FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); + PCollection> pCollection = pipeline .apply("CreateInput", Create.of(EXAMPLE_DATASTREAM_JSON)) @@ -70,7 +78,9 @@ public void testProcessElement_validJson() { .withStreamName("my-stream") .withRenameColumnValues(renameColumns) .withLowercaseSourceColumns(false))) - .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + .setCoder(failsafeElementCoder) + .apply("RemoveTimestampProperty", ParDo.of(new RemoveTimestampPropertyFn())) + .setCoder(failsafeElementCoder); PAssert.that(pCollection).containsInAnyOrder(expectedElement); @@ -85,6 +95,9 @@ public void testProcessElement_hashRowId() { FailsafeElement.of( EXAMPLE_DATASTREAM_RECORD_WITH_HASH_ROWID, EXAMPLE_DATASTREAM_RECORD_WITH_HASH_ROWID); + FailsafeElementCoder failsafeElementCoder = + FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); + PCollection> pCollection = pipeline .apply("CreateInput", Create.of(EXAMPLE_DATASTREAM_JSON)) @@ -97,10 +110,30 @@ public void testProcessElement_hashRowId() { .withRenameColumnValues(renameColumns) .withHashRowId(true) .withLowercaseSourceColumns(false))) - .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + .setCoder(failsafeElementCoder) + .apply("RemoveDataflowTimestampProperty", ParDo.of(new RemoveTimestampPropertyFn())) + .setCoder(failsafeElementCoder); PAssert.that(pCollection).containsInAnyOrder(expectedElement); pipeline.run(); } + + // Static nested DoFn class to remove timestamp property + static class RemoveTimestampPropertyFn + extends DoFn, FailsafeElement> { + + @ProcessElement + public void processElement( + @Element FailsafeElement element, + OutputReceiver> out) + throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode changeEvent = mapper.readTree(element.getPayload()); + if (changeEvent instanceof ObjectNode) { + ((ObjectNode) changeEvent).remove("_metadata_dataflow_timestamp"); + } + out.output(FailsafeElement.of(changeEvent.toString(), changeEvent.toString())); + } + } }