From 989c374718c52874893a49dddde02c21f18f8581 Mon Sep 17 00:00:00 2001 From: Xu Huang Date: Tue, 14 Jan 2025 14:16:22 +0800 Subject: [PATCH] [FLINK-37112][runtime] Process event time extension related watermarks in operator for DataStream V2 --- .../impl/operators/ProcessOperator.java | 16 +- .../TwoInputBroadcastProcessOperator.java | 24 +- .../TwoInputNonBroadcastProcessOperator.java | 24 +- .../operators/TwoOutputProcessOperator.java | 16 +- .../io/AbstractStreamTaskNetworkInput.java | 7 + .../eventtime/EventTimeWatermarkCombiner.java | 117 +++++++ .../eventtime/EventTimeWatermarkHandler.java | 220 ++++++++++++ .../util/watermark/WatermarkUtils.java | 26 ++ .../EventTimeWatermarkCombinerTest.java | 276 +++++++++++++++ .../EventTimeWatermarkHandlerTest.java | 317 ++++++++++++++++++ 10 files changed, 1037 insertions(+), 6 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkCombiner.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkHandler.java create mode 100644 flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkCombinerTest.java create mode 100644 flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkHandlerTest.java diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java index cde50f26a744d3..1782e043414097 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.datastream.impl.context.DefaultPartitionedContext; import 
org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; import org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; @@ -38,6 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; import java.util.Map; import java.util.function.BiConsumer; @@ -60,6 +62,9 @@ public class ProcessOperator protected transient Map> watermarkDeclarationMap; + // {@link EventTimeWatermarkHandler} will be used to process event time related watermarks + protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + public ProcessOperator(OneInputStreamProcessFunction userFunction) { super(userFunction); } @@ -99,6 +104,8 @@ public void open() throws Exception { outputCollector = getOutputCollector(); nonPartitionedContext = getNonPartitionedContext(); partitionedContext.setNonPartitionedContext(nonPartitionedContext); + this.eventTimeWatermarkHandler = + new EventTimeWatermarkHandler(1, output, timeServiceManager); // Initialize event time extension related ProcessFunction if (userFunction instanceof ExtractEventTimeProcessFunction) { @@ -128,7 +135,14 @@ public void processWatermarkInternal(WatermarkEvent watermark) throws Exception .get(watermark.getWatermark().getIdentifier()) .getDefaultHandlingStrategy() == WatermarkHandlingStrategy.FORWARD) { - output.emitWatermark(watermark); + + if 
(EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) { + // if the watermark is event time related watermark, process them to advance event + // time + eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0); + } else { + output.emitWatermark(watermark); + } } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java index 8f3e38d14a2a06..926c133f0fe714 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.datastream.impl.context.DefaultPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; import org.apache.flink.streaming.api.operators.BoundedMultiInput; @@ -37,6 +38,7 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; import java.util.Map; import java.util.function.BiConsumer; @@ -62,6 +64,9 @@ public class TwoInputBroadcastProcessOperator protected transient Map> watermarkDeclarationMap; + // {@link EventTimeWatermarkHandler} will be used to process event time related watermarks + 
protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + public TwoInputBroadcastProcessOperator( TwoInputBroadcastStreamProcessFunction userFunction) { super(userFunction); @@ -100,6 +105,9 @@ public void open() throws Exception { getOperatorStateBackend()); this.nonPartitionedContext = getNonPartitionedContext(); this.partitionedContext.setNonPartitionedContext(this.nonPartitionedContext); + this.eventTimeWatermarkHandler = + new EventTimeWatermarkHandler(2, output, timeServiceManager); + this.userFunction.open(this.nonPartitionedContext); } @@ -126,7 +134,13 @@ public void processWatermark1Internal(WatermarkEvent watermark) throws Exception .get(watermark.getWatermark().getIdentifier()) .getDefaultHandlingStrategy() == WatermarkHandlingStrategy.FORWARD) { - output.emitWatermark(watermark); + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) { + // if the watermark is event time related watermark, process them to advance event + // time + eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0); + } else { + output.emitWatermark(watermark); + } } } @@ -140,7 +154,13 @@ public void processWatermark2Internal(WatermarkEvent watermark) throws Exception .get(watermark.getWatermark().getIdentifier()) .getDefaultHandlingStrategy() == WatermarkHandlingStrategy.FORWARD) { - output.emitWatermark(watermark); + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) { + // if the watermark is event time related watermark, process them to advance event + // time + eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 1); + } else { + output.emitWatermark(watermark); + } } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java index fcf6d8698beb15..3d699dbbf2d9b1 100644 --- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.datastream.impl.context.DefaultPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; import org.apache.flink.runtime.state.OperatorStateBackend; @@ -38,6 +39,7 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; import java.util.Map; import java.util.function.BiConsumer; @@ -63,6 +65,9 @@ public class TwoInputNonBroadcastProcessOperator protected transient Map> watermarkDeclarationMap; + // {@link EventTimeWatermarkHandler} will be used to process event time related watermarks + protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + public TwoInputNonBroadcastProcessOperator( TwoInputNonBroadcastStreamProcessFunction userFunction) { super(userFunction); @@ -103,6 +108,9 @@ public void open() throws Exception { operatorStateBackend); this.nonPartitionedContext = getNonPartitionedContext(); this.partitionedContext.setNonPartitionedContext(this.nonPartitionedContext); + this.eventTimeWatermarkHandler = + new EventTimeWatermarkHandler(2, output, timeServiceManager); + this.userFunction.open(this.nonPartitionedContext); } @@ -129,7 +137,13 @@ public 
void processWatermark1Internal(WatermarkEvent watermark) throws Exception .get(watermark.getWatermark().getIdentifier()) .getDefaultHandlingStrategy() == WatermarkHandlingStrategy.FORWARD) { - output.emitWatermark(watermark); + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) { + // if the watermark is event time related watermark, process them to advance event + // time + eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0); + } else { + output.emitWatermark(watermark); + } } } @@ -143,7 +157,13 @@ public void processWatermark2Internal(WatermarkEvent watermark) throws Exception .get(watermark.getWatermark().getIdentifier()) .getDefaultHandlingStrategy() == WatermarkHandlingStrategy.FORWARD) { - output.emitWatermark(watermark); + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) { + // if the watermark is event time related watermark, process them to advance event + // time + eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 1); + } else { + output.emitWatermark(watermark); + } } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java index aefbc55a1f6730..91d0c0f074f25b 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java @@ -31,6 +31,7 @@ import org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultTwoOutputPartitionedContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; import org.apache.flink.streaming.api.operators.BoundedOneInput; @@ -39,6 +40,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; import org.apache.flink.util.OutputTag; import java.util.Map; @@ -70,6 +72,9 @@ public class TwoOutputProcessOperator protected transient Map> watermarkDeclarationMap; + // {@link EventTimeWatermarkHandler} will be used to process event time related watermarks + protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + public TwoOutputProcessOperator( TwoOutputStreamProcessFunction userFunction, OutputTag outputTag) { @@ -112,6 +117,9 @@ public void open() throws Exception { operatorStateStore); this.nonPartitionedContext = getNonPartitionedContext(); this.partitionedContext.setNonPartitionedContext(nonPartitionedContext); + this.eventTimeWatermarkHandler = + new EventTimeWatermarkHandler(1, output, timeServiceManager); + this.userFunction.open(this.nonPartitionedContext); } @@ -136,7 +144,13 @@ public void processWatermarkInternal(WatermarkEvent watermark) throws Exception .get(watermark.getWatermark().getIdentifier()) .getDefaultHandlingStrategy() == WatermarkHandlingStrategy.FORWARD) { - output.emitWatermark(watermark); + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) { + // if the watermark is event time related watermark, process them to advance event + // time + eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0); + } else { + output.emitWatermark(watermark); + } } } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java index cb6675d81badc2..c076f5bf433950 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java @@ -36,6 +36,7 @@ import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner; import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve; +import org.apache.flink.streaming.util.watermark.WatermarkUtils; import org.apache.flink.util.ExceptionUtils; import java.io.IOException; @@ -118,8 +119,14 @@ public AbstractStreamTaskNetworkInput( this.recordAttributesCombiner = new RecordAttributesCombiner(checkpointedInputGate.getNumberOfInputChannels()); + WatermarkUtils.addEventTimeWatermarkCombinerIfNeeded( + watermarkDeclarationSet, watermarkCombiners, flattenedChannelIndices.size()); for (AbstractInternalWatermarkDeclaration watermarkDeclaration : watermarkDeclarationSet) { + if (watermarkCombiners.containsKey(watermarkDeclaration.getIdentifier())) { + continue; + } + watermarkCombiners.put( watermarkDeclaration.getIdentifier(), watermarkDeclaration.createWatermarkCombiner( diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkCombiner.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkCombiner.java new file mode 100644 index 00000000000000..017317072932a4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkCombiner.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.watermark.extension.eventtime; + +import org.apache.flink.api.common.watermark.BoolWatermark; +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner; +import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; + +import java.util.function.Consumer; + +/** + * A {@link WatermarkCombiner} used to combine {@link EventTimeExtension} related watermarks in + * input channels. 
+ */ +public class EventTimeWatermarkCombiner extends StatusWatermarkValve implements WatermarkCombiner { + + private WrappedDataOutput output; + + public EventTimeWatermarkCombiner(int numInputChannels) { + super(numInputChannels); + this.output = new WrappedDataOutput<>(); + } + + @Override + public void combineWatermark( + Watermark watermark, int channelIndex, Consumer watermarkEmitter) + throws Exception { + output.setWatermarkEmitter(watermarkEmitter); + + if (EventTimeExtension.isEventTimeWatermark(watermark)) { + inputWatermark( + new org.apache.flink.streaming.api.watermark.Watermark( + ((LongWatermark) watermark).getValue()), + channelIndex, + output); + } else if (EventTimeExtension.isIdleStatusWatermark(watermark.getIdentifier())) { + inputWatermarkStatus( + new WatermarkStatus( + ((BoolWatermark) watermark).getValue() + ? WatermarkStatus.IDLE_STATUS + : WatermarkStatus.ACTIVE_STATUS), + channelIndex, + output); + } + } + + /** Wrap {@link DataOutput} to emit watermarks using {@code watermarkEmitter}. 
*/ + static class WrappedDataOutput implements DataOutput { + + private Consumer watermarkEmitter; + + public WrappedDataOutput() {} + + public void setWatermarkEmitter(Consumer watermarkEmitter) { + this.watermarkEmitter = watermarkEmitter; + } + + @Override + public void emitRecord(StreamRecord streamRecord) throws Exception { + throw new RuntimeException("Should not emit records with this output."); + } + + @Override + public void emitWatermark(org.apache.flink.streaming.api.watermark.Watermark watermark) + throws Exception { + watermarkEmitter.accept( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark( + watermark.getTimestamp())); + } + + @Override + public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { + watermarkEmitter.accept( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark( + watermarkStatus.isIdle())); + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception { + throw new RuntimeException("Should not emit LatencyMarker with this output."); + } + + @Override + public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception { + throw new RuntimeException("Should not emit RecordAttributes with this output."); + } + + @Override + public void emitWatermark(WatermarkEvent watermark) throws Exception { + throw new RuntimeException("Should not emit WatermarkEvent with this output."); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkHandler.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkHandler.java new file mode 100644 index 00000000000000..4a3a4b5c5c05f1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkHandler.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.watermark.extension.eventtime; + +import org.apache.flink.api.common.watermark.BoolWatermark; +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class is used to handle {@link EventTimeExtension} related watermarks in operator, such as + * {@link EventTimeExtension#EVENT_TIME_WATERMARK_DECLARATION} and {@link + * EventTimeExtension#IDLE_STATUS_WATERMARK_DECLARATION}. It will emit event time watermark and idle + * status to downstream operators according to received watermarks. 
+ */ +public class EventTimeWatermarkHandler { + + /** number of input of operator, it should between 1 and 2 in current design. */ + private final int numOfInput; + + private final Output output; + + private final List eventTimePerInput; + + /** + * time service manager is used to advance event time in operator, and it may be null if the + * operator is not keyed. + */ + @Nullable private final InternalTimeServiceManager timeServiceManager; + + private long lastEmitWatermark = Long.MIN_VALUE; + + private boolean lastEmitIdleStatus = false; + + /** A bitset to record whether the watermark has been received from each input. */ + private final BitSet hasReceiveWatermarks; + + public EventTimeWatermarkHandler( + int numOfInput, + Output output, + @Nullable InternalTimeServiceManager timeServiceManager) { + checkArgument(numOfInput >= 1 && numOfInput <= 2, "numOfInput should between 1 and 2"); + this.numOfInput = numOfInput; + this.output = output; + this.eventTimePerInput = new ArrayList<>(numOfInput); + for (int i = 0; i < numOfInput; i++) { + eventTimePerInput.add(new EventTimeWithIdleStatus()); + } + this.timeServiceManager = timeServiceManager; + this.hasReceiveWatermarks = new BitSet(numOfInput); + } + + private EventTimeUpdateStatus processEventTime(long timestamp, int inputIndex) + throws Exception { + checkState(inputIndex < numOfInput); + hasReceiveWatermarks.set(inputIndex); + eventTimePerInput.get(inputIndex).setEventTime(timestamp); + eventTimePerInput.get(inputIndex).setIdleStatus(false); + + return tryAdvanceEventTimeAndEmitWatermark(); + } + + private EventTimeUpdateStatus tryAdvanceEventTimeAndEmitWatermark() throws Exception { + // if current event time is larger than last emit watermark, emit it + long currentEventTime = getCurrentEventTime(); + if (currentEventTime > lastEmitWatermark + && hasReceiveWatermarks.cardinality() == numOfInput) { + output.emitWatermark( + new WatermarkEvent( + 
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark( + currentEventTime), + false)); + lastEmitWatermark = currentEventTime; + if (timeServiceManager != null) { + timeServiceManager.advanceWatermark(new Watermark(currentEventTime)); + } + return EventTimeUpdateStatus.ofUpdatedWatermark(lastEmitWatermark); + } + return EventTimeUpdateStatus.NO_UPDATE; + } + + private void processEventTimeIdleStatus(boolean isIdle, int inputIndex) { + checkState(inputIndex < numOfInput); + hasReceiveWatermarks.set(inputIndex); + eventTimePerInput.get(inputIndex).setIdleStatus(isIdle); + tryEmitEventTimeIdleStatus(); + } + + private void tryEmitEventTimeIdleStatus() { + // emit idle status if current idle status is different from last emit + boolean inputIdle = isAllInputIdle(); + if (inputIdle != lastEmitIdleStatus) { + output.emitWatermark( + new WatermarkEvent( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark( + inputIdle), + false)); + lastEmitIdleStatus = inputIdle; + } + } + + private long getCurrentEventTime() { + long currentEventTime = Long.MAX_VALUE; + for (EventTimeWithIdleStatus eventTimeWithIdleStatus : eventTimePerInput) { + if (!eventTimeWithIdleStatus.isIdle()) { + currentEventTime = + Math.min(currentEventTime, eventTimeWithIdleStatus.getEventTime()); + } + } + return currentEventTime; + } + + private boolean isAllInputIdle() { + boolean allInputIsIdle = true; + for (EventTimeWithIdleStatus eventTimeWithIdleStatus : eventTimePerInput) { + allInputIsIdle &= eventTimeWithIdleStatus.isIdle(); + } + return allInputIsIdle; + } + + public long getLastEmitWatermark() { + return lastEmitWatermark; + } + + /** + * Process EventTimeWatermark/IdleStatusWatermark. + * + *

It's caller's responsibility to check whether the watermark is + * EventTimeWatermark/IdleStatusWatermark. + * + * @return the status of event time watermark update. + */ + public EventTimeUpdateStatus processWatermark( + org.apache.flink.api.common.watermark.Watermark watermark, int inputIndex) + throws Exception { + if (EventTimeExtension.isEventTimeWatermark(watermark.getIdentifier())) { + long timestamp = ((LongWatermark) watermark).getValue(); + return this.processEventTime(timestamp, inputIndex); + } else if (EventTimeExtension.isIdleStatusWatermark(watermark.getIdentifier())) { + boolean isIdle = ((BoolWatermark) watermark).getValue(); + this.processEventTimeIdleStatus(isIdle, inputIndex); + } + return EventTimeUpdateStatus.NO_UPDATE; + } + + /** This class represents event-time updated status. */ + public static class EventTimeUpdateStatus { + + public static final EventTimeUpdateStatus NO_UPDATE = new EventTimeUpdateStatus(false, -1L); + + private final boolean isEventTimeUpdated; + + private final long newEventTime; + + private EventTimeUpdateStatus(boolean isEventTimeUpdated, long newEventTime) { + this.isEventTimeUpdated = isEventTimeUpdated; + this.newEventTime = newEventTime; + } + + public boolean isEventTimeUpdated() { + return isEventTimeUpdated; + } + + public long getNewEventTime() { + return newEventTime; + } + + public static EventTimeUpdateStatus ofUpdatedWatermark(long newEventTime) { + return new EventTimeUpdateStatus(true, newEventTime); + } + } + + static class EventTimeWithIdleStatus { + private long eventTime = Long.MIN_VALUE; + private boolean isIdle = false; + + public long getEventTime() { + return eventTime; + } + + public void setEventTime(long eventTime) { + this.eventTime = Math.max(this.eventTime, eventTime); + } + + public boolean isIdle() { + return isIdle; + } + + public void setIdleStatus(boolean idle) { + isIdle = idle; + } + } +} diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java b/flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java index 38f8602d8ba9f8..8954578ff4f14a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.watermark.Watermark; import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; import org.apache.flink.datastream.api.function.ProcessFunction; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -29,9 +30,12 @@ import org.apache.flink.streaming.api.operators.SourceOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; +import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkCombiner; import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -95,4 +99,26 @@ private static Collection getWatermarkDeclaratio .map(AbstractInternalWatermarkDeclaration::from) .collect(Collectors.toSet()); } + + /** Create watermark combiners if there are event time watermark declarations. 
*/ + public static void addEventTimeWatermarkCombinerIfNeeded( + Set> watermarkDeclarationSet, + Map watermarkCombiners, + int numberOfInputChannels) { + if (watermarkDeclarationSet.stream() + .anyMatch( + declaration -> + EventTimeExtension.isEventTimeWatermark( + declaration.getIdentifier()))) { + // create event time watermark combiner + EventTimeWatermarkCombiner eventTimeWatermarkCombiner = + new EventTimeWatermarkCombiner(numberOfInputChannels); + watermarkCombiners.put( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.getIdentifier(), + eventTimeWatermarkCombiner); + watermarkCombiners.put( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.getIdentifier(), + eventTimeWatermarkCombiner); + } + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkCombinerTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkCombinerTest.java new file mode 100644 index 00000000000000..32f634dd8a3a52 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkCombinerTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api.datastream.extension.eventtime; + +import org.apache.flink.api.common.watermark.BoolWatermark; +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkCombiner; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link EventTimeWatermarkCombiner}, driven through two input channels (0 and 1). */ +class EventTimeWatermarkCombinerTest { + + private final List outputWatermarks = new ArrayList<>(); + private EventTimeWatermarkCombiner combiner; + + @BeforeEach + void before() { + combiner = new EventTimeWatermarkCombiner(2); + } + + @AfterEach + void after() { + outputWatermarks.clear(); + combiner = null; + } + + @Test + void testCombinedResultIsMin() throws Exception { + // The test scenario is as follows: + // ----------------------------------------------------------------------------- + // test scenario | expected result + // ----------------------------------------------------------------------------- + // Step | Channel 0 | Channel 1 | output event time | output idle status + // ----------------------------------------------------------------------------- + // 1 | 1 | 2 | [1] | [] + // 2 | 3 | | [1, 2] | [] + // ----------------------------------------------------------------------------- + // e.g. The step 1 means that Channel 0 will receive the event time watermark with value 1, + // and the Channel 1 will receive the event time watermark with value 2. 
+ // After step 1 has been executed, the combiner should output an event time watermark with + // value 1. Step 2 means that Channel 0 will receive the event time watermark with value + // 3. After step 2 has been executed, the combiner should output an event time watermark + // with value 2. + + // Step 1 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1), + 0, + outputWatermarks::add); + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2), + 1, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L); + + // Step 2 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(3), + 0, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L, 2L); + + checkOutputIdleStatusWatermarkValues(); + } + + @Test + void testCombineWhenPartialChannelsIdle() throws Exception { + // The test scenario is as follows: + // ----------------------------------------------------------------------------- + // test scenario | expected result + // ----------------------------------------------------------------------------- + // Step | Channel 0 | Channel 1 | output event time | output idle status + // ----------------------------------------------------------------------------- + // 1 | 1 | | [] | [] + // 2 | | true | [1] | [] + // 3 | 2 | | [1,2] | [] + // 4 | | false | [1,2] | [] + // 5 | | 3 | [1,2] | [] + // 6 | 4 | | [1,2,3] | [] + // ----------------------------------------------------------------------------- + // e.g. The step 1 means that Channel 0 will receive the event time watermark with value 1. + // After step 1 has been executed, the combiner should not output any event time watermark + // as the combiner has not received event time watermark from all input channels. + // The step 2 means that Channel 1 will receive the idle status watermark with value true. 
+ // After step 2 has been executed, the combiner should output an event time watermark with + // value 1. + + // Step 1 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1), + 0, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(); + + // Step 2 + combiner.combineWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true), + 1, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L); + + // Step 3 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2), + 0, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L, 2L); + + // Step 4 + combiner.combineWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false), + 1, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L, 2L); + + // Step 5 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(3), + 1, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L, 2L); + + // Step 6 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(4), + 0, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L, 2L, 3L); + + checkOutputIdleStatusWatermarkValues(); + } + + @Test + void testCombineWhenAllChannelsIdle() throws Exception { + // The test scenario is as follows: + // ----------------------------------------------------------------------------- + // test scenario | expected result + // ----------------------------------------------------------------------------- + // Step | Channel 0 | Channel 1 | output event time | output idle status + // ----------------------------------------------------------------------------- + // 1 | 1 | 2 | [1] | [] + // 2 | true | | [1,2] | [] + // 3 | | true | [1,2] | [true] + // 4 | false | | [1,2] | [true, false] + // 5 | 3 | | [1,2,3] | [true, false] + // 6 | | false | [1,2,3] | [true, false] + // 
----------------------------------------------------------------------------- + + // Step 1 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1), + 0, + outputWatermarks::add); + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2), + 1, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L); + + // Step 2 + combiner.combineWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true), + 0, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L, 2L); + checkOutputIdleStatusWatermarkValues(); + + // Step 3 + combiner.combineWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true), + 1, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L, 2L); + checkOutputIdleStatusWatermarkValues(true); + + // Step 4 + combiner.combineWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false), + 0, + outputWatermarks::add); + checkOutputIdleStatusWatermarkValues(true, false); + + // Step 5 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(3), + 0, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(1L, 2L, 3L); + checkOutputIdleStatusWatermarkValues(true, false); + + // Step 6 + combiner.combineWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false), + 1, + outputWatermarks::add); + checkOutputIdleStatusWatermarkValues(true, false); + } + + @Test + void testCombineWaitForAllChannels() throws Exception { + // The test scenario is as follows: + // ----------------------------------------------------------------------------- + // test scenario | expected result + // ----------------------------------------------------------------------------- + // Step | Channel 0 | Channel 1 | output event time | output idle status + // 
----------------------------------------------------------------------------- + // 1 | 1 | | [] | [] + // 2 | 3 | | [] | [] + // 3 | | 2 | [2] | [] + // ----------------------------------------------------------------------------- + + // Step 1 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1), + 0, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(); + + // Step 2 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(3), + 0, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(); + + // Step 3 + combiner.combineWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2), + 1, + outputWatermarks::add); + checkOutputEventTimeWatermarkValues(2L); + + checkOutputIdleStatusWatermarkValues(); + } + + private void checkOutputEventTimeWatermarkValues(Long... expectedReceivedWatermarkValues) { + assertThat( + outputWatermarks.stream() + .filter(w -> w instanceof LongWatermark) + .map(w -> ((LongWatermark) w).getValue())) + .containsExactly(expectedReceivedWatermarkValues); + } + + private void checkOutputIdleStatusWatermarkValues(Boolean... 
expectedReceivedWatermarkValues) { + assertThat( + outputWatermarks.stream() + .filter(w -> w instanceof BoolWatermark) + .map(w -> ((BoolWatermark) w).getValue())) + .containsExactly(expectedReceivedWatermarkValues); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkHandlerTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkHandlerTest.java new file mode 100644 index 00000000000000..2ab186cbd2eab9 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkHandlerTest.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.streaming.api.datastream.extension.eventtime; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.watermark.BoolWatermark; +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.OutputTag; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link EventTimeWatermarkHandler} with one input and with two inputs. 
*/ +class EventTimeWatermarkHandlerTest { + + private static final List outputWatermarks = new ArrayList<>(); + private static final List advancedEventTimes = new ArrayList<>(); + + @AfterEach + void after() { + outputWatermarks.clear(); + advancedEventTimes.clear(); + } + + @Test + void testOneInputWatermarkHandler() throws Exception { + // The test scenario is as follows: + // ----------------------------------------------------------------------------- + // test scenario| expected result + // ----------------------------------------------------------------------------- + // Step|Input 0|updateStatus|eventTimes| idleStatus |advancedEventTimes + // ----------------------------------------------------------------------------- + // 1 | 1 | true,1 | [1] | [] | [1] + // 2 | 2 | true,2 | [1,2] | [] | [1,2] + // 3 | 1 | false,-1 | [1,2] | [] | [1,2] + // 4 | true | false,-1 | [1,2] | [true] | [1,2] + // 5 | false | false,-1 | [1,2] |[true,false]| [1,2] + // ----------------------------------------------------------------------------- + // For example, Step 1 indicates that Input 0 will receive an event time watermark with a + // value of 1. + // After Step 1 is executed, the `updateStatus.isEventTimeUpdated` returned by the handler + // should be true, + // and `updateStatus.getNewEventTime` should be equal to 1. + // Additionally, the handler should output an event time watermark with a value of 1 and + // advance the current event time to 1. 
+ + EventTimeWatermarkHandler watermarkHandler = + new EventTimeWatermarkHandler( + 1, new TestOutput(), new TestInternalTimeServiceManager()); + EventTimeWatermarkHandler.EventTimeUpdateStatus updateStatus; + + // Step 1 + updateStatus = + watermarkHandler.processWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1L), 0); + assertThat(updateStatus.isEventTimeUpdated()).isTrue(); + assertThat(updateStatus.getNewEventTime()).isEqualTo(1L); + checkOutputEventTimeWatermarkValues(1L); + checkOutputIdleStatusWatermarkValues(); + checkAdvancedEventTimes(1L); + + // Step 2 + updateStatus = + watermarkHandler.processWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2L), 0); + assertThat(updateStatus.isEventTimeUpdated()).isTrue(); + assertThat(updateStatus.getNewEventTime()).isEqualTo(2L); + checkOutputEventTimeWatermarkValues(1L, 2L); + checkOutputIdleStatusWatermarkValues(); + checkAdvancedEventTimes(1L, 2L); + + // Step 3 + updateStatus = + watermarkHandler.processWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1L), 0); + assertThat(updateStatus.isEventTimeUpdated()).isFalse(); + assertThat(updateStatus.getNewEventTime()).isEqualTo(-1L); + checkOutputEventTimeWatermarkValues(1L, 2L); + checkOutputIdleStatusWatermarkValues(); + checkAdvancedEventTimes(1L, 2L); + + // Step 4 + updateStatus = + watermarkHandler.processWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true), 0); + assertThat(updateStatus.isEventTimeUpdated()).isFalse(); + checkOutputEventTimeWatermarkValues(1L, 2L); + checkOutputIdleStatusWatermarkValues(true); + checkAdvancedEventTimes(1L, 2L); + + // Step 5 + updateStatus = + watermarkHandler.processWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false), + 0); + assertThat(updateStatus.isEventTimeUpdated()).isFalse(); + checkOutputEventTimeWatermarkValues(1L, 2L); + checkOutputIdleStatusWatermarkValues(true, 
false); + checkAdvancedEventTimes(1L, 2L); + } + + @Test + void testTwoInputWatermarkHandler() throws Exception { + // The test scenario is as follows: + // --------------------------------------------------------------------------------- + // test scenario | expected result + // --------------------------------------------------------------------------------- + // Step| Input 0 | Input 1 |updateStatus|eventTimes| idleStatus |advancedEventTimes + // --------------------------------------------------------------------------------- + // 1 | 1 | | false,-1 | [] | [] | [] + // 2 | | 2 | true,1 | [1] | [] | [1] + // 3 | true | | false,-1 | [1] | [] | [1] + // 4 | | true | false,-1 | [1] | [true] | [1] + // 5 | | false | false,-1 | [1] |[true,false]| [1] + // --------------------------------------------------------------------------------- + // For example, Step 1 indicates that Input 0 will receive an event time watermark with a + // value of 1. + // After Step 1 is executed, the `updateStatus.isEventTimeUpdated` returned by the handler + // should be false, + // and `updateStatus.getNewEventTime` should be equal to -1. + // Additionally, the handler should not output any event time watermark or idle status + // watermark. 
+ + EventTimeWatermarkHandler watermarkHandler = + new EventTimeWatermarkHandler( + 2, new TestOutput(), new TestInternalTimeServiceManager()); + EventTimeWatermarkHandler.EventTimeUpdateStatus updateStatus; + + // Step 1 + updateStatus = + watermarkHandler.processWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1L), 0); + assertThat(updateStatus.isEventTimeUpdated()).isFalse(); + checkOutputEventTimeWatermarkValues(); + checkOutputIdleStatusWatermarkValues(); + checkAdvancedEventTimes(); + + // Step 2 + updateStatus = + watermarkHandler.processWatermark( + EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2L), 1); + assertThat(updateStatus.isEventTimeUpdated()).isTrue(); + assertThat(updateStatus.getNewEventTime()).isEqualTo(1L); + checkOutputEventTimeWatermarkValues(1L); + checkOutputIdleStatusWatermarkValues(); + checkAdvancedEventTimes(1L); + + // Step 3 + updateStatus = + watermarkHandler.processWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true), 0); + assertThat(updateStatus.isEventTimeUpdated()).isFalse(); + assertThat(updateStatus.getNewEventTime()).isEqualTo(-1L); + checkOutputEventTimeWatermarkValues(1L); + checkOutputIdleStatusWatermarkValues(); + checkAdvancedEventTimes(1L); + + // Step 4 + updateStatus = + watermarkHandler.processWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true), 1); + assertThat(updateStatus.isEventTimeUpdated()).isFalse(); + assertThat(updateStatus.getNewEventTime()).isEqualTo(-1L); + checkOutputEventTimeWatermarkValues(1L); + checkOutputIdleStatusWatermarkValues(true); + checkAdvancedEventTimes(1L); + + // Step 5 + updateStatus = + watermarkHandler.processWatermark( + EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false), + 1); + assertThat(updateStatus.isEventTimeUpdated()).isFalse(); + assertThat(updateStatus.getNewEventTime()).isEqualTo(-1L); + checkOutputEventTimeWatermarkValues(1L); + 
checkOutputIdleStatusWatermarkValues(true, false); + checkAdvancedEventTimes(1L); + } + + private static class TestOutput implements Output { + @Override + public void emitWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) { + throw new UnsupportedOperationException(); + } + + @Override + public void emitWatermarkStatus(WatermarkStatus watermarkStatus) { + throw new UnsupportedOperationException(); + } + + @Override + public void collect(OutputTag outputTag, StreamRecord record) { + throw new UnsupportedOperationException(); + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + throw new UnsupportedOperationException(); + } + + @Override + public void emitRecordAttributes(RecordAttributes recordAttributes) { + throw new UnsupportedOperationException(); + } + + @Override + public void emitWatermark(WatermarkEvent watermark) { + outputWatermarks.add(watermark.getWatermark()); + } + + @Override + public void collect(Long record) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() {} + } + + private static class TestInternalTimeServiceManager + implements InternalTimeServiceManager { + + @Override + public InternalTimerService getInternalTimerService( + String name, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + Triggerable triggerable) { + throw new UnsupportedOperationException(); + } + + @Override + public InternalTimerService getAsyncInternalTimerService( + String name, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + Triggerable triggerable, + AsyncExecutionController asyncExecutionController) { + throw new UnsupportedOperationException(); + } + + @Override + public void advanceWatermark(org.apache.flink.streaming.api.watermark.Watermark watermark) + throws Exception { + advancedEventTimes.add(watermark.getTimestamp()); + } + + @Override + public boolean tryAdvanceWatermark( + org.apache.flink.streaming.api.watermark.Watermark watermark, 
+ ShouldStopAdvancingFn shouldStopAdvancingFn) + throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public void snapshotToRawKeyedState( + KeyedStateCheckpointOutputStream stateCheckpointOutputStream, String operatorName) + throws Exception { + throw new UnsupportedOperationException(); + } + } + + private void checkOutputEventTimeWatermarkValues(Long... expectedReceivedWatermarkValues) { + assertThat( + outputWatermarks.stream() + .filter(w -> w instanceof LongWatermark) + .map(w -> ((LongWatermark) w).getValue())) + .containsExactly(expectedReceivedWatermarkValues); + } + + private void checkOutputIdleStatusWatermarkValues(Boolean... expectedReceivedWatermarkValues) { + assertThat( + outputWatermarks.stream() + .filter(w -> w instanceof BoolWatermark) + .map(w -> ((BoolWatermark) w).getValue())) + .containsExactly(expectedReceivedWatermarkValues); + } + + private void checkAdvancedEventTimes(Long... expectedAdvancedEventTimes) { + assertThat(advancedEventTimes).containsExactly(expectedAdvancedEventTimes); + } +}