Skip to content

Commit

Permalink
[FLINK-37112][runtime] Process event time extension related watermarks…
Browse files Browse the repository at this point in the history
… in EventTimeProcessFunction for DataStream V2
  • Loading branch information
codenohup committed Jan 16, 2025
1 parent f03a564 commit d9d6644
Show file tree
Hide file tree
Showing 24 changed files with 2,218 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,20 @@
import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
import org.apache.flink.api.common.watermark.Watermark;
import org.apache.flink.api.common.watermark.WatermarkDeclarations;
import org.apache.flink.datastream.api.extension.eventtime.function.EventTimeProcessFunction;
import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction;
import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction;
import org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction;
import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor;
import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkGeneratorBuilder;
import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.ProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;

/**
* The entry point for the event-time extension, which provides the following functionality:
Expand All @@ -47,6 +58,16 @@
* source.process(watermarkGeneratorProcessFunction)
* .process(...)
* }</pre>
* <li>provides a tool to encapsulate a user-defined {@link EventTimeProcessFunction} to provide
* the relevant components of the event-time extension.
* <pre>{@code
* stream.process(
* EventTimeExtension.wrapProcessFunction(
* new CustomEventTimeProcessFunction()
* )
* )
* .process(...)
* }</pre>
* </ul>
*/
@Experimental
Expand Down Expand Up @@ -110,4 +131,100 @@ public static <T> EventTimeWatermarkGeneratorBuilder<T> newWatermarkGeneratorBui
EventTimeExtractor<T> eventTimeExtractor) {
return new EventTimeWatermarkGeneratorBuilder<>(eventTimeExtractor);
}

// ======== Wrap user-defined {@link EventTimeProcessFunction} =========

/**
 * Wraps the user-defined {@link OneInputEventTimeStreamProcessFunction} into a {@link
 * OneInputStreamProcessFunction}. The wrapper will provide related components such as {@link
 * EventTimeManager} and declare the necessary built-in state required for the Timer, etc. Note
 * that registering event timers of {@link EventTimeProcessFunction} can only be used with {@link
 * KeyedPartitionStream}.
 *
 * <p>The actual wrapping is delegated, via reflection, to the static {@code wrapProcessFunction}
 * method of the class returned by {@code getEventTimeExtensionImplClass()}.
 *
 * @param processFunction the user-defined event-time process function to wrap
 * @return the wrapped process function
 * @throws RuntimeException if the implementation class cannot be loaded, or if the reflective
 *     lookup or invocation of the implementation method fails
 */
public static <IN, OUT> OneInputStreamProcessFunction<IN, OUT> wrapProcessFunction(
        OneInputEventTimeStreamProcessFunction<IN, OUT> processFunction) {
    try {
        // Method#invoke returns Object, so the cast is unavoidable; the suppression is scoped
        // to this single local instead of the whole method.
        @SuppressWarnings("unchecked")
        OneInputStreamProcessFunction<IN, OUT> wrapped =
                (OneInputStreamProcessFunction<IN, OUT>)
                        getEventTimeExtensionImplClass()
                                .getMethod(
                                        "wrapProcessFunction",
                                        OneInputEventTimeStreamProcessFunction.class)
                                .invoke(null, processFunction);
        return wrapped;
    } catch (ReflectiveOperationException e) {
        // Only reflection failures are wrapped. The RuntimeException raised when the
        // implementation class is missing propagates as-is, so its class-path hint is not
        // buried inside a second wrapper.
        throw new RuntimeException(
                "Failed to wrap the OneInputEventTimeStreamProcessFunction", e);
    }
}

/**
 * Wraps the user-defined {@link TwoOutputEventTimeStreamProcessFunction} into a {@link
 * TwoOutputStreamProcessFunction}. The wrapper will provide related components such as {@link
 * EventTimeManager} and declare the necessary built-in state required for the Timer, etc. Note
 * that registering event timers of {@link EventTimeProcessFunction} can only be used with {@link
 * KeyedPartitionStream}.
 *
 * <p>The actual wrapping is delegated, via reflection, to the static {@code wrapProcessFunction}
 * method of the class returned by {@code getEventTimeExtensionImplClass()}.
 *
 * @param processFunction the user-defined event-time process function to wrap
 * @return the wrapped process function
 * @throws RuntimeException if the implementation class cannot be loaded, or if the reflective
 *     lookup or invocation of the implementation method fails
 */
public static <IN, OUT1, OUT2>
        TwoOutputStreamProcessFunction<IN, OUT1, OUT2> wrapProcessFunction(
                TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2> processFunction) {
    try {
        // Method#invoke returns Object, so the cast is unavoidable; the suppression is scoped
        // to this single local instead of the whole method.
        @SuppressWarnings("unchecked")
        TwoOutputStreamProcessFunction<IN, OUT1, OUT2> wrapped =
                (TwoOutputStreamProcessFunction<IN, OUT1, OUT2>)
                        getEventTimeExtensionImplClass()
                                .getMethod(
                                        "wrapProcessFunction",
                                        TwoOutputEventTimeStreamProcessFunction.class)
                                .invoke(null, processFunction);
        return wrapped;
    } catch (ReflectiveOperationException e) {
        // Only reflection failures are wrapped. The RuntimeException raised when the
        // implementation class is missing propagates as-is, so its class-path hint is not
        // buried inside a second wrapper.
        throw new RuntimeException(
                "Failed to wrap the TwoOutputEventTimeStreamProcessFunction", e);
    }
}

/**
 * Wraps the user-defined {@link TwoInputNonBroadcastEventTimeStreamProcessFunction} into a
 * {@link TwoInputNonBroadcastStreamProcessFunction}. The wrapper will provide related components
 * such as {@link EventTimeManager} and declare the necessary built-in state required for the
 * Timer, etc. Note that registering event timers of {@link EventTimeProcessFunction} can only be
 * used with {@link KeyedPartitionStream}.
 *
 * <p>The actual wrapping is delegated, via reflection, to the static {@code wrapProcessFunction}
 * method of the class returned by {@code getEventTimeExtensionImplClass()}.
 *
 * @param processFunction the user-defined event-time process function to wrap
 * @return the wrapped process function
 * @throws RuntimeException if the implementation class cannot be loaded, or if the reflective
 *     lookup or invocation of the implementation method fails
 */
public static <IN1, IN2, OUT>
        TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> wrapProcessFunction(
                TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT>
                        processFunction) {
    try {
        // Method#invoke returns Object, so the cast is unavoidable; the suppression is scoped
        // to this single local instead of the whole method.
        @SuppressWarnings("unchecked")
        TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> wrapped =
                (TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT>)
                        getEventTimeExtensionImplClass()
                                .getMethod(
                                        "wrapProcessFunction",
                                        TwoInputNonBroadcastEventTimeStreamProcessFunction.class)
                                .invoke(null, processFunction);
        return wrapped;
    } catch (ReflectiveOperationException e) {
        // Only reflection failures are wrapped. The RuntimeException raised when the
        // implementation class is missing propagates as-is, so its class-path hint is not
        // buried inside a second wrapper.
        throw new RuntimeException(
                "Failed to wrap the TwoInputNonBroadcastEventTimeStreamProcessFunction", e);
    }
}

/**
 * Wraps the user-defined {@link TwoInputBroadcastEventTimeStreamProcessFunction} into a {@link
 * TwoInputBroadcastStreamProcessFunction}. The wrapper will provide related components such as
 * {@link EventTimeManager} and declare the necessary built-in state required for the Timer, etc.
 * Note that registering event timers of {@link EventTimeProcessFunction} can only be used with
 * {@link KeyedPartitionStream}.
 *
 * <p>The actual wrapping is delegated, via reflection, to the static {@code wrapProcessFunction}
 * method of the class returned by {@code getEventTimeExtensionImplClass()}.
 *
 * @param processFunction the user-defined event-time process function to wrap
 * @return the wrapped process function
 * @throws RuntimeException if the implementation class cannot be loaded, or if the reflective
 *     lookup or invocation of the implementation method fails
 */
public static <IN1, IN2, OUT>
        TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> wrapProcessFunction(
                TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT>
                        processFunction) {
    try {
        // Method#invoke returns Object, so the cast is unavoidable; the suppression is scoped
        // to this single local instead of the whole method.
        @SuppressWarnings("unchecked")
        TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> wrapped =
                (TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT>)
                        getEventTimeExtensionImplClass()
                                .getMethod(
                                        "wrapProcessFunction",
                                        TwoInputBroadcastEventTimeStreamProcessFunction.class)
                                .invoke(null, processFunction);
        return wrapped;
    } catch (ReflectiveOperationException e) {
        // Only reflection failures are wrapped. The RuntimeException raised when the
        // implementation class is missing propagates as-is, so its class-path hint is not
        // buried inside a second wrapper.
        throw new RuntimeException(
                "Failed to wrap the TwoInputBroadcastEventTimeStreamProcessFunction", e);
    }
}

/**
 * Loads the implementation class of the event-time extension by its fully qualified name.
 *
 * <p>NOTE(review): the class is resolved by name rather than referenced directly, presumably
 * because this API class cannot depend on the flink-datastream module at compile time; confirm.
 *
 * @return the {@code EventTimeExtensionImpl} class
 * @throws RuntimeException if the class cannot be found on the class path; the underlying
 *     {@link ClassNotFoundException} is attached as the cause
 */
private static Class<?> getEventTimeExtensionImplClass() {
    try {
        return Class.forName(
                "org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl");
    } catch (ClassNotFoundException e) {
        // Keep the original exception as the cause so the name of the missing class stays
        // visible in the stack trace.
        throw new RuntimeException(
                "Please ensure that flink-datastream is in your class path", e);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.extension.eventtime.function;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
import org.apache.flink.datastream.api.function.ProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;

/**
* The base interface for event time processing, indicating that the {@link ProcessFunction} will be
* enriched with event time processing functions, such as registering event timers and handling
* event time watermarks.
*
* <p>Note that registering event timers can only be used with {@link KeyedPartitionStream}.
*/
@Experimental
public interface EventTimeProcessFunction extends ProcessFunction {
/**
* Initialize the {@link EventTimeProcessFunction} with an instance of {@link EventTimeManager}.
* Note that this method should be invoked before the open method.
*
* @param eventTimeManager the {@link EventTimeManager} instance handed to this function
*/
void initEventTimeProcessFunction(EventTimeManager eventTimeManager);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.extension.eventtime.function;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.NonPartitionedContext;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;

/**
 * A {@link OneInputStreamProcessFunction} enriched with event time support.
 *
 * <p>Both event-time callbacks declared here have empty default implementations.
 */
@Experimental
public interface OneInputEventTimeStreamProcessFunction<IN, OUT>
        extends EventTimeProcessFunction, OneInputStreamProcessFunction<IN, OUT> {

    /**
     * Signifies that this EventTimeProcessFunction has received an EventTimeWatermark. Watermarks
     * of other types are processed by the {@code ProcessFunction#onWatermark} method instead.
     *
     * @param watermarkTimestamp the timestamp of the received EventTimeWatermark
     * @param output the output {@link Collector}
     * @param ctx the {@link NonPartitionedContext} of this invocation
     * @throws Exception if an implementation fails while handling the watermark
     */
    default void onEventTimeWatermark(
            long watermarkTimestamp,
            Collector<OUT> output,
            NonPartitionedContext<OUT> ctx)
            throws Exception {
        // Does nothing by default.
    }

    /**
     * Called when an event-time timer fires. Note that it is only used in {@link
     * KeyedPartitionStream}.
     *
     * @param timestamp the timestamp passed for the fired timer
     * @param output the output {@link Collector}
     * @param ctx the {@link PartitionedContext} of this invocation
     */
    default void onEventTimer(
            long timestamp,
            Collector<OUT> output,
            PartitionedContext<OUT> ctx) {
        // Does nothing by default.
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.extension.eventtime.function;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.NonPartitionedContext;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;

/**
 * A {@link TwoInputBroadcastStreamProcessFunction} enriched with event time support.
 *
 * <p>Both event-time callbacks declared here have empty default implementations.
 */
@Experimental
public interface TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT>
        extends EventTimeProcessFunction, TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> {

    /**
     * Signifies that this EventTimeProcessFunction has received an EventTimeWatermark. Watermarks
     * of other types are processed by the {@code ProcessFunction#onWatermark} method instead.
     *
     * @param watermarkTimestamp the timestamp of the received EventTimeWatermark
     * @param output the output {@link Collector}
     * @param ctx the {@link NonPartitionedContext} of this invocation
     * @throws Exception if an implementation fails while handling the watermark
     */
    default void onEventTimeWatermark(
            long watermarkTimestamp,
            Collector<OUT> output,
            NonPartitionedContext<OUT> ctx)
            throws Exception {
        // Does nothing by default.
    }

    /**
     * Called when an event-time timer fires. Note that it is only used in {@link
     * KeyedPartitionStream}.
     *
     * @param timestamp the timestamp passed for the fired timer
     * @param output the output {@link Collector}
     * @param ctx the {@link PartitionedContext} of this invocation
     */
    default void onEventTimer(
            long timestamp,
            Collector<OUT> output,
            PartitionedContext<OUT> ctx) {
        // Does nothing by default.
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.extension.eventtime.function;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.NonPartitionedContext;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;

/**
 * A {@link TwoInputNonBroadcastStreamProcessFunction} enriched with event time support.
 *
 * <p>Both event-time callbacks declared here have empty default implementations.
 */
@Experimental
public interface TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT>
        extends EventTimeProcessFunction,
                TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> {

    /**
     * Signifies that this EventTimeProcessFunction has received an EventTimeWatermark. Watermarks
     * of other types are processed by the {@code ProcessFunction#onWatermark} method instead.
     *
     * @param watermarkTimestamp the timestamp of the received EventTimeWatermark
     * @param output the output {@link Collector}
     * @param ctx the {@link NonPartitionedContext} of this invocation
     * @throws Exception if an implementation fails while handling the watermark
     */
    default void onEventTimeWatermark(
            long watermarkTimestamp,
            Collector<OUT> output,
            NonPartitionedContext<OUT> ctx)
            throws Exception {
        // Does nothing by default.
    }

    /**
     * Called when an event-time timer fires. Note that it is only used in {@link
     * KeyedPartitionStream}.
     *
     * @param timestamp the timestamp passed for the fired timer
     * @param output the output {@link Collector}
     * @param ctx the {@link PartitionedContext} of this invocation
     */
    default void onEventTimer(
            long timestamp,
            Collector<OUT> output,
            PartitionedContext<OUT> ctx) {
        // Does nothing by default.
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.extension.eventtime.function;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext;
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;

/**
 * A {@link TwoOutputStreamProcessFunction} enriched with event time support.
 *
 * <p>Both event-time callbacks declared here have empty default implementations.
 */
@Experimental
public interface TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2>
        extends EventTimeProcessFunction, TwoOutputStreamProcessFunction<IN, OUT1, OUT2> {

    /**
     * Signifies that this EventTimeProcessFunction has received an EventTimeWatermark. Watermarks
     * of other types are processed by the {@code ProcessFunction#onWatermark} method instead.
     *
     * @param watermarkTimestamp the timestamp of the received EventTimeWatermark
     * @param output1 the {@link Collector} for the first output
     * @param output2 the {@link Collector} for the second output
     * @param ctx the {@link TwoOutputNonPartitionedContext} of this invocation
     * @throws Exception if an implementation fails while handling the watermark
     */
    default void onEventTimeWatermark(
            long watermarkTimestamp,
            Collector<OUT1> output1,
            Collector<OUT2> output2,
            TwoOutputNonPartitionedContext<OUT1, OUT2> ctx)
            throws Exception {
        // Does nothing by default.
    }

    /**
     * Called when an event-time timer fires. Note that it is only used in {@link
     * KeyedPartitionStream}.
     *
     * @param timestamp the timestamp passed for the fired timer
     * @param output1 the {@link Collector} for the first output
     * @param output2 the {@link Collector} for the second output
     * @param ctx the {@link TwoOutputPartitionedContext} of this invocation
     */
    default void onEventTimer(
            long timestamp,
            Collector<OUT1> output1,
            Collector<OUT2> output2,
            TwoOutputPartitionedContext<OUT1, OUT2> ctx) {
        // Does nothing by default.
    }
}
Loading

0 comments on commit d9d6644

Please sign in to comment.