Skip to content

Commit

Permalink
Adding dataflow timestamp field in datastream-common json flow (#2042)
Browse files Browse the repository at this point in the history
* Adding dataflow timestamp field in datastream-common json flow

* updated UT
  • Loading branch information
shreyakhajanchi authored Dec 5, 2024
1 parent 09f68d2 commit ff14f97
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ record = new ObjectMapper().readTree(c.element());
outputObject.put("_metadata_stream", getStreamName(record));
outputObject.put("_metadata_timestamp", getSourceTimestamp(record));
outputObject.put("_metadata_read_timestamp", getMetadataTimestamp(record));
outputObject.put("_metadata_dataflow_timestamp", getCurrentTimestamp());
outputObject.put("_metadata_read_method", record.get("read_method").textValue());
outputObject.put("_metadata_source_type", sourceType);

Expand Down Expand Up @@ -234,4 +235,8 @@ private Boolean getMetadataIsDeleted(JsonNode record) {

return value.booleanValue();
}

private long getCurrentTimestamp() {
return System.currentTimeMillis() / 1000L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package com.google.cloud.teleport.v2.datastream.transforms;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.collect.ImmutableMap;
Expand All @@ -23,6 +27,7 @@
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
Expand Down Expand Up @@ -59,6 +64,9 @@ public void testProcessElement_validJson() {
FailsafeElement<String, String> expectedElement =
FailsafeElement.of(EXAMPLE_DATASTREAM_RECORD, EXAMPLE_DATASTREAM_RECORD);

FailsafeElementCoder<String, String> failsafeElementCoder =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

PCollection<FailsafeElement<String, String>> pCollection =
pipeline
.apply("CreateInput", Create.of(EXAMPLE_DATASTREAM_JSON))
Expand All @@ -70,7 +78,9 @@ public void testProcessElement_validJson() {
.withStreamName("my-stream")
.withRenameColumnValues(renameColumns)
.withLowercaseSourceColumns(false)))
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
.setCoder(failsafeElementCoder)
.apply("RemoveTimestampProperty", ParDo.of(new RemoveTimestampPropertyFn()))
.setCoder(failsafeElementCoder);

PAssert.that(pCollection).containsInAnyOrder(expectedElement);

Expand All @@ -85,6 +95,9 @@ public void testProcessElement_hashRowId() {
FailsafeElement.of(
EXAMPLE_DATASTREAM_RECORD_WITH_HASH_ROWID, EXAMPLE_DATASTREAM_RECORD_WITH_HASH_ROWID);

FailsafeElementCoder<String, String> failsafeElementCoder =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

PCollection<FailsafeElement<String, String>> pCollection =
pipeline
.apply("CreateInput", Create.of(EXAMPLE_DATASTREAM_JSON))
Expand All @@ -97,10 +110,30 @@ public void testProcessElement_hashRowId() {
.withRenameColumnValues(renameColumns)
.withHashRowId(true)
.withLowercaseSourceColumns(false)))
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
.setCoder(failsafeElementCoder)
.apply("RemoveDataflowTimestampProperty", ParDo.of(new RemoveTimestampPropertyFn()))
.setCoder(failsafeElementCoder);

PAssert.that(pCollection).containsInAnyOrder(expectedElement);

pipeline.run();
}

// Static nested DoFn class to remove timestamp property
static class RemoveTimestampPropertyFn
extends DoFn<FailsafeElement<String, String>, FailsafeElement<String, String>> {

@ProcessElement
public void processElement(
@Element FailsafeElement<String, String> element,
OutputReceiver<FailsafeElement<String, String>> out)
throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
JsonNode changeEvent = mapper.readTree(element.getPayload());
if (changeEvent instanceof ObjectNode) {
((ObjectNode) changeEvent).remove("_metadata_dataflow_timestamp");
}
out.output(FailsafeElement.of(changeEvent.toString(), changeEvent.toString()));
}
}
}

0 comments on commit ff14f97

Please sign in to comment.