Skip to content

Commit

Permalink
[FLINK-37112][runtime] Process event time extension related watermark…
Browse files Browse the repository at this point in the history
…s in operator for DataStream V2
  • Loading branch information
codenohup committed Jan 16, 2025
1 parent b62280c commit 989c374
Show file tree
Hide file tree
Showing 10 changed files with 1,037 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
import org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.event.WatermarkEvent;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;

import java.util.Map;
import java.util.function.BiConsumer;
Expand All @@ -60,6 +62,9 @@ public class ProcessOperator<IN, OUT>
protected transient Map<String, AbstractInternalWatermarkDeclaration<?>>
watermarkDeclarationMap;

// {@link EventTimeWatermarkHandler} will be used to process event time related watermarks
protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler;

public ProcessOperator(OneInputStreamProcessFunction<IN, OUT> userFunction) {
super(userFunction);
}
Expand Down Expand Up @@ -99,6 +104,8 @@ public void open() throws Exception {
outputCollector = getOutputCollector();
nonPartitionedContext = getNonPartitionedContext();
partitionedContext.setNonPartitionedContext(nonPartitionedContext);
this.eventTimeWatermarkHandler =
new EventTimeWatermarkHandler(1, output, timeServiceManager);

// Initialize event time extension related ProcessFunction
if (userFunction instanceof ExtractEventTimeProcessFunction) {
Expand Down Expand Up @@ -128,7 +135,14 @@ public void processWatermarkInternal(WatermarkEvent watermark) throws Exception
.get(watermark.getWatermark().getIdentifier())
.getDefaultHandlingStrategy()
== WatermarkHandlingStrategy.FORWARD) {
output.emitWatermark(watermark);

if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) {
// if the watermark is event time related watermark, process them to advance event
// time
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0);
} else {
output.emitWatermark(watermark);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;

import java.util.Map;
import java.util.function.BiConsumer;
Expand All @@ -62,6 +64,9 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
protected transient Map<String, AbstractInternalWatermarkDeclaration<?>>
watermarkDeclarationMap;

// {@link EventTimeWatermarkHandler} will be used to process event time related watermarks
protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler;

public TwoInputBroadcastProcessOperator(
TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> userFunction) {
super(userFunction);
Expand Down Expand Up @@ -100,6 +105,9 @@ public void open() throws Exception {
getOperatorStateBackend());
this.nonPartitionedContext = getNonPartitionedContext();
this.partitionedContext.setNonPartitionedContext(this.nonPartitionedContext);
this.eventTimeWatermarkHandler =
new EventTimeWatermarkHandler(2, output, timeServiceManager);

this.userFunction.open(this.nonPartitionedContext);
}

Expand All @@ -126,7 +134,13 @@ public void processWatermark1Internal(WatermarkEvent watermark) throws Exception
.get(watermark.getWatermark().getIdentifier())
.getDefaultHandlingStrategy()
== WatermarkHandlingStrategy.FORWARD) {
output.emitWatermark(watermark);
if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) {
// if the watermark is event time related watermark, process them to advance event
// time
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0);
} else {
output.emitWatermark(watermark);
}
}
}

Expand All @@ -140,7 +154,13 @@ public void processWatermark2Internal(WatermarkEvent watermark) throws Exception
.get(watermark.getWatermark().getIdentifier())
.getDefaultHandlingStrategy()
== WatermarkHandlingStrategy.FORWARD) {
output.emitWatermark(watermark);
if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) {
// if the watermark is event time related watermark, process them to advance event
// time
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 1);
} else {
output.emitWatermark(watermark);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.state.OperatorStateBackend;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;

import java.util.Map;
import java.util.function.BiConsumer;
Expand All @@ -63,6 +65,9 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT>
protected transient Map<String, AbstractInternalWatermarkDeclaration<?>>
watermarkDeclarationMap;

// {@link EventTimeWatermarkHandler} will be used to process event time related watermarks
protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler;

public TwoInputNonBroadcastProcessOperator(
TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> userFunction) {
super(userFunction);
Expand Down Expand Up @@ -103,6 +108,9 @@ public void open() throws Exception {
operatorStateBackend);
this.nonPartitionedContext = getNonPartitionedContext();
this.partitionedContext.setNonPartitionedContext(this.nonPartitionedContext);
this.eventTimeWatermarkHandler =
new EventTimeWatermarkHandler(2, output, timeServiceManager);

this.userFunction.open(this.nonPartitionedContext);
}

Expand All @@ -129,7 +137,13 @@ public void processWatermark1Internal(WatermarkEvent watermark) throws Exception
.get(watermark.getWatermark().getIdentifier())
.getDefaultHandlingStrategy()
== WatermarkHandlingStrategy.FORWARD) {
output.emitWatermark(watermark);
if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) {
// if the watermark is event time related watermark, process them to advance event
// time
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0);
} else {
output.emitWatermark(watermark);
}
}
}

Expand All @@ -143,7 +157,13 @@ public void processWatermark2Internal(WatermarkEvent watermark) throws Exception
.get(watermark.getWatermark().getIdentifier())
.getDefaultHandlingStrategy()
== WatermarkHandlingStrategy.FORWARD) {
output.emitWatermark(watermark);
if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) {
// if the watermark is event time related watermark, process them to advance event
// time
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 1);
} else {
output.emitWatermark(watermark);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultTwoOutputPartitionedContext;
import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
Expand All @@ -39,6 +40,7 @@
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
import org.apache.flink.util.OutputTag;

import java.util.Map;
Expand Down Expand Up @@ -70,6 +72,9 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE>
protected transient Map<String, AbstractInternalWatermarkDeclaration<?>>
watermarkDeclarationMap;

// {@link EventTimeWatermarkHandler} will be used to process event time related watermarks
protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler;

public TwoOutputProcessOperator(
TwoOutputStreamProcessFunction<IN, OUT_MAIN, OUT_SIDE> userFunction,
OutputTag<OUT_SIDE> outputTag) {
Expand Down Expand Up @@ -112,6 +117,9 @@ public void open() throws Exception {
operatorStateStore);
this.nonPartitionedContext = getNonPartitionedContext();
this.partitionedContext.setNonPartitionedContext(nonPartitionedContext);
this.eventTimeWatermarkHandler =
new EventTimeWatermarkHandler(1, output, timeServiceManager);

this.userFunction.open(this.nonPartitionedContext);
}

Expand All @@ -136,7 +144,13 @@ public void processWatermarkInternal(WatermarkEvent watermark) throws Exception
.get(watermark.getWatermark().getIdentifier())
.getDefaultHandlingStrategy()
== WatermarkHandlingStrategy.FORWARD) {
output.emitWatermark(watermark);
if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark())) {
// if the watermark is event time related watermark, process them to advance event
// time
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0);
} else {
output.emitWatermark(watermark);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.util.watermark.WatermarkUtils;
import org.apache.flink.util.ExceptionUtils;

import java.io.IOException;
Expand Down Expand Up @@ -118,8 +119,14 @@ public AbstractStreamTaskNetworkInput(
this.recordAttributesCombiner =
new RecordAttributesCombiner(checkpointedInputGate.getNumberOfInputChannels());

WatermarkUtils.addEventTimeWatermarkCombinerIfNeeded(
watermarkDeclarationSet, watermarkCombiners, flattenedChannelIndices.size());
for (AbstractInternalWatermarkDeclaration<?> watermarkDeclaration :
watermarkDeclarationSet) {
if (watermarkCombiners.containsKey(watermarkDeclaration.getIdentifier())) {
continue;
}

watermarkCombiners.put(
watermarkDeclaration.getIdentifier(),
watermarkDeclaration.createWatermarkCombiner(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.watermark.extension.eventtime;

import org.apache.flink.api.common.watermark.BoolWatermark;
import org.apache.flink.api.common.watermark.LongWatermark;
import org.apache.flink.api.common.watermark.Watermark;
import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.util.function.Consumer;

/**
* A {@link WatermarkCombiner} used to combine {@link EventTimeExtension} related watermarks in
* input channels.
*/
public class EventTimeWatermarkCombiner extends StatusWatermarkValve implements WatermarkCombiner {

private WrappedDataOutput<?> output;

public EventTimeWatermarkCombiner(int numInputChannels) {
super(numInputChannels);
this.output = new WrappedDataOutput<>();
}

@Override
public void combineWatermark(
Watermark watermark, int channelIndex, Consumer<Watermark> watermarkEmitter)
throws Exception {
output.setWatermarkEmitter(watermarkEmitter);

if (EventTimeExtension.isEventTimeWatermark(watermark)) {
inputWatermark(
new org.apache.flink.streaming.api.watermark.Watermark(
((LongWatermark) watermark).getValue()),
channelIndex,
output);
} else if (EventTimeExtension.isIdleStatusWatermark(watermark.getIdentifier())) {
inputWatermarkStatus(
new WatermarkStatus(
((BoolWatermark) watermark).getValue()
? WatermarkStatus.IDLE_STATUS
: WatermarkStatus.ACTIVE_STATUS),
channelIndex,
output);
}
}

/** Wrap {@link DataOutput} to emit watermarks using {@code watermarkEmitter}. */
static class WrappedDataOutput<T> implements DataOutput<T> {

private Consumer<Watermark> watermarkEmitter;

public WrappedDataOutput() {}

public void setWatermarkEmitter(Consumer<Watermark> watermarkEmitter) {
this.watermarkEmitter = watermarkEmitter;
}

@Override
public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
throw new RuntimeException("Should not emit records with this output.");
}

@Override
public void emitWatermark(org.apache.flink.streaming.api.watermark.Watermark watermark)
throws Exception {
watermarkEmitter.accept(
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(
watermark.getTimestamp()));
}

@Override
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
watermarkEmitter.accept(
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(
watermarkStatus.isIdle()));
}

@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
throw new RuntimeException("Should not emit LatencyMarker with this output.");
}

@Override
public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
throw new RuntimeException("Should not emit RecordAttributes with this output.");
}

@Override
public void emitWatermark(WatermarkEvent watermark) throws Exception {
throw new RuntimeException("Should not emit WatermarkEvent with this output.");
}
}
}
Loading

0 comments on commit 989c374

Please sign in to comment.