From ebc4796038f0293310154f9866a17a352b9b5751 Mon Sep 17 00:00:00 2001 From: Xu Huang Date: Thu, 16 Jan 2025 13:46:42 +0800 Subject: [PATCH] [FLINK-37112][API] Process event time extension related watermarks in EventTimeProcessFunction for DataStream V2 --- .../eventtime/EventTimeExtension.java | 134 +++ .../function/EventTimeProcessFunction.java | 40 + ...neInputEventTimeStreamProcessFunction.java | 47 + ...oadcastEventTimeStreamProcessFunction.java | 47 + ...oadcastEventTimeStreamProcessFunction.java | 47 + ...oOutputEventTimeStreamProcessFunction.java | 54 ++ .../eventtime/timer/EventTimeManager.java | 53 ++ .../eventtime/EventTimeExtensionImpl.java | 38 + ...eWrappedOneInputStreamProcessFunction.java | 143 +++ ...woInputBroadcastStreamProcessFunction.java | 180 ++++ ...nputNonBroadcastStreamProcessFunction.java | 179 ++++ ...WrappedTwoOutputStreamProcessFunction.java | 156 ++++ .../timer/DefaultEventTimeManager.java | 72 ++ .../impl/operators/KeyedProcessOperator.java | 17 +- ...KeyedTwoInputBroadcastProcessOperator.java | 17 +- ...edTwoInputNonBroadcastProcessOperator.java | 18 +- .../KeyedTwoOutputProcessOperator.java | 21 +- .../impl/operators/ProcessOperator.java | 18 + .../TwoInputBroadcastProcessOperator.java | 20 + .../TwoInputNonBroadcastProcessOperator.java | 25 + .../operators/TwoOutputProcessOperator.java | 19 + .../impl/stream/BroadcastStreamImpl.java | 15 + .../datastream/impl/utils/StreamUtils.java | 31 + .../eventtime/EventTimeExtensionITCase.java | 848 ++++++++++++++++++ 24 files changed, 2235 insertions(+), 4 deletions(-) create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputBroadcastEventTimeStreamProcessFunction.java create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputNonBroadcastEventTimeStreamProcessFunction.java create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoOutputEventTimeStreamProcessFunction.java create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java create mode 100644 flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedOneInputStreamProcessFunction.java create mode 100644 flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputBroadcastStreamProcessFunction.java create mode 100644 flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction.java create mode 100644 flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoOutputStreamProcessFunction.java create mode 100644 flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/timer/DefaultEventTimeManager.java create mode 100644 flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeExtensionITCase.java diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java index 9417baef1d5932..a0ac5f8c499979 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java @@ -23,9 +23,20 @@ import org.apache.flink.api.common.watermark.LongWatermarkDeclaration; import org.apache.flink.api.common.watermark.Watermark; import org.apache.flink.api.common.watermark.WatermarkDeclarations; +import org.apache.flink.datastream.api.extension.eventtime.function.EventTimeProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction; import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor; import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkGeneratorBuilder; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; import org.apache.flink.datastream.api.function.ProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; /** * The entry point for the event-time extension, which provides the following functionality: @@ -47,6 +58,16 @@ * source.process(watermarkGeneratorProcessFunction) * .process(...) * } + *
  • provides a tool to encapsulate a user-defined {@link EventTimeProcessFunction} to provide + * the relevant components of the event-time extension. + *
    {@code
    + * stream.process(
    + *          EventTimeExtension.wrapProcessFunction(
    + *              new CustomEventTimeProcessFunction()
    + *          )
    + *       )
    + *       .process(...)
    + * }
    * */ @Experimental @@ -136,4 +157,117 @@ public static EventTimeWatermarkGeneratorBuilder newWatermarkGeneratorBui EventTimeExtractor eventTimeExtractor) { return new EventTimeWatermarkGeneratorBuilder<>(eventTimeExtractor); } + + // ======== Wrap user-defined {@link EventTimeProcessFunction} ========= + + /** + * Wrap the user-defined {@link OneInputEventTimeStreamProcessFunction}, which will provide + * related components such as {@link EventTimeManager} and declare the necessary built-in state + * required for the Timer, etc. Note that registering event timers of {@link + * EventTimeProcessFunction} can only be used with {@link KeyedPartitionStream}. + * + * @param processFunction The user-defined {@link OneInputEventTimeStreamProcessFunction} that + * needs to be wrapped. + * @return The wrapped {@link OneInputStreamProcessFunction}. + */ + public static OneInputStreamProcessFunction wrapProcessFunction( + OneInputEventTimeStreamProcessFunction processFunction) { + try { + return (OneInputStreamProcessFunction) + getEventTimeExtensionImplClass() + .getMethod( + "wrapProcessFunction", + OneInputEventTimeStreamProcessFunction.class) + .invoke(null, processFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Wrap the user-defined {@link TwoOutputStreamProcessFunction}, which will provide related + * components such as {@link EventTimeManager} and declare the necessary built-in state required + * for the Timer, etc. Note that registering event timers of {@link EventTimeProcessFunction} + * can only be used with {@link KeyedPartitionStream}. + * + * @param processFunction The user-defined {@link TwoOutputEventTimeStreamProcessFunction} that + * needs to be wrapped. + * @return The wrapped {@link TwoOutputStreamProcessFunction}. + */ + public static + TwoOutputStreamProcessFunction wrapProcessFunction( + TwoOutputEventTimeStreamProcessFunction processFunction) { + try { + return (TwoOutputStreamProcessFunction) + getEventTimeExtensionImplClass() + .getMethod( + "wrapProcessFunction", + TwoOutputEventTimeStreamProcessFunction.class) + .invoke(null, processFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Wrap the user-defined {@link TwoInputNonBroadcastEventTimeStreamProcessFunction}, which will + * provide related components such as {@link EventTimeManager} and declare the necessary + * built-in state required for the Timer, etc. Note that registering event timers of {@link + * EventTimeProcessFunction} can only be used with {@link KeyedPartitionStream}. + * + * @param processFunction The user-defined {@link + * TwoInputNonBroadcastEventTimeStreamProcessFunction} that needs to be wrapped. + * @return The wrapped {@link TwoInputNonBroadcastStreamProcessFunction}. + */ + public static + TwoInputNonBroadcastStreamProcessFunction wrapProcessFunction( + TwoInputNonBroadcastEventTimeStreamProcessFunction + processFunction) { + try { + return (TwoInputNonBroadcastStreamProcessFunction) + getEventTimeExtensionImplClass() + .getMethod( + "wrapProcessFunction", + TwoInputNonBroadcastEventTimeStreamProcessFunction.class) + .invoke(null, processFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Wrap the user-defined {@link TwoInputBroadcastEventTimeStreamProcessFunction}, which will + * provide related components such as {@link EventTimeManager} and declare the necessary + * built-in state required for the Timer, etc. Note that registering event timers of {@link + * EventTimeProcessFunction} can only be used with {@link KeyedPartitionStream}. + * + * @param processFunction The user-defined {@link + * TwoInputBroadcastEventTimeStreamProcessFunction} that needs to be wrapped. + * @return The wrapped {@link TwoInputBroadcastStreamProcessFunction}. + */ + public static + TwoInputBroadcastStreamProcessFunction wrapProcessFunction( + TwoInputBroadcastEventTimeStreamProcessFunction + processFunction) { + try { + return (TwoInputBroadcastStreamProcessFunction) + getEventTimeExtensionImplClass() + .getMethod( + "wrapProcessFunction", + TwoInputBroadcastEventTimeStreamProcessFunction.class) + .invoke(null, processFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** Get the implementation class of EventTimeExtension. */ + private static Class getEventTimeExtensionImplClass() { + try { + return Class.forName( + "org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Please ensure that flink-datastream in your class path"); + } + } } diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java new file mode 100644 index 00000000000000..705cedcf321558 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.ProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** + * The base interface for event time processing, indicating that the {@link ProcessFunction} will be + * enriched with event time processing functions, such as registering event timers and handle event + * time watermarks. + * + *

    Note that registering event timers can only be used with {@link KeyedPartitionStream}. + */ +@Experimental +public interface EventTimeProcessFunction extends ProcessFunction { + /** + * Initialize the {@link EventTimeProcessFunction} with an instance of {@link EventTimeManager}. + * Note that this method should be invoked before the open method. + */ + void initEventTimeProcessFunction(EventTimeManager eventTimeManager); +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java new file mode 100644 index 00000000000000..86da2a867d0d43 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** The {@link OneInputStreamProcessFunction} that extends with event time support. */ +@Experimental +public interface OneInputEventTimeStreamProcessFunction + extends EventTimeProcessFunction, OneInputStreamProcessFunction { + + /** + * The {@code #onEventTimeWatermark} method signifies that the EventTimeProcessFunction has + * received an EventTimeWatermark. Other types of watermarks will be processed by the {@code + * ProcessFunction#onWatermark} method. + */ + default void onEventTimeWatermark( + long watermarkTimestamp, Collector output, NonPartitionedContext ctx) + throws Exception {} + + /** + * Invoked when an event-time timer fires. Note that it is only used in {@link + * KeyedPartitionStream}. + */ + default void onEventTimer(long timestamp, Collector output, PartitionedContext ctx) {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputBroadcastEventTimeStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputBroadcastEventTimeStreamProcessFunction.java new file mode 100644 index 00000000000000..efa62f8a56fa33 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputBroadcastEventTimeStreamProcessFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** The {@link TwoInputBroadcastStreamProcessFunction} that extends with event time support. */ +@Experimental +public interface TwoInputBroadcastEventTimeStreamProcessFunction + extends EventTimeProcessFunction, TwoInputBroadcastStreamProcessFunction { + + /** + * The {@code #onEventTimeWatermark} method signifies that the EventTimeProcessFunction has + * received an EventTimeWatermark. Other types of watermarks will be processed by the {@code + * ProcessFunction#onWatermark} method. + */ + default void onEventTimeWatermark( + long watermarkTimestamp, Collector output, NonPartitionedContext ctx) + throws Exception {} + + /** + * Invoked when an event-time timer fires. Note that it is only used in {@link + * KeyedPartitionStream}. + */ + default void onEventTimer(long timestamp, Collector output, PartitionedContext ctx) {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputNonBroadcastEventTimeStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputNonBroadcastEventTimeStreamProcessFunction.java new file mode 100644 index 00000000000000..1502825f781bf6 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputNonBroadcastEventTimeStreamProcessFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** The {@link TwoInputNonBroadcastStreamProcessFunction} that extends with event time support. */ +@Experimental +public interface TwoInputNonBroadcastEventTimeStreamProcessFunction + extends EventTimeProcessFunction, TwoInputNonBroadcastStreamProcessFunction { + + /** + * The {@code #onEventTimeWatermark} method signifies that the EventTimeProcessFunction has + * received an EventTimeWatermark. Other types of watermarks will be processed by the {@code + * ProcessFunction#onWatermark} method. + */ + default void onEventTimeWatermark( + long watermarkTimestamp, Collector output, NonPartitionedContext ctx) + throws Exception {} + + /** + * Invoked when an event-time timer fires. Note that it is only used in {@link + * KeyedPartitionStream}. + */ + default void onEventTimer(long timestamp, Collector output, PartitionedContext ctx) {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoOutputEventTimeStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoOutputEventTimeStreamProcessFunction.java new file mode 100644 index 00000000000000..56e33db636f08d --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoOutputEventTimeStreamProcessFunction.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; +import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** The {@link TwoOutputStreamProcessFunction} that extends with event time support. */ +@Experimental +public interface TwoOutputEventTimeStreamProcessFunction + extends EventTimeProcessFunction, TwoOutputStreamProcessFunction { + + /** + * The {@code #onEventTimeWatermark} method signifies that the EventTimeProcessFunction has + * received an EventTimeWatermark. Other types of watermarks will be processed by the {@code + * ProcessFunction#onWatermark} method. + */ + default void onEventTimeWatermark( + long watermarkTimestamp, + Collector output1, + Collector output2, + TwoOutputNonPartitionedContext ctx) + throws Exception {} + + /** + * Invoked when an event-time timer fires. Note that it is only used in {@link + * KeyedPartitionStream}. + */ + default void onEventTimer( + long timestamp, + Collector output1, + Collector output2, + TwoOutputPartitionedContext ctx) {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java new file mode 100644 index 00000000000000..c5df8e26dea466 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.timer; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** + * This class is responsible for managing stuff related to event-time/timer. For example, register + * and delete event timers, as well as retrieve event time. Note that methods for timer can only be + * used in {@link KeyedPartitionStream}. + */ +@Experimental +public interface EventTimeManager { + /** + * Register an event timer for this process function. The {@code onEventTimer} method will be + * invoked when the event time is reached. + * + * @param timestamp to trigger timer callback. + */ + void registerTimer(long timestamp); + + /** + * Deletes the event-time timer with the given trigger timestamp. This method has only an effect + * if such a timer was previously registered and did not already expire. + * + * @param timestamp indicates the timestamp of the timer to delete. + */ + void deleteTimer(long timestamp); + + /** + * Get the current event time. + * + * @return current event time. + */ + long currentTime(); +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java index a4a26bc949031e..f01972559d65a8 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java @@ -19,8 +19,19 @@ import org.apache.flink.api.common.watermark.Watermark; import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction; import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy; import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction; import org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction; /** The implementation of {@link EventTimeExtension}. */ @@ -37,6 +48,33 @@ public static OneInputStreamProcessFunction buildAsProcessFunction( return new ExtractEventTimeProcessFunction<>(strategy); } + // ============= Wrap Event Time Process Function ============= + + public static OneInputStreamProcessFunction wrapProcessFunction( + OneInputEventTimeStreamProcessFunction processFunction) { + return new EventTimeWrappedOneInputStreamProcessFunction<>(processFunction); + } + + public static + TwoOutputStreamProcessFunction wrapProcessFunction( + TwoOutputEventTimeStreamProcessFunction processFunction) { + return new EventTimeWrappedTwoOutputStreamProcessFunction<>(processFunction); + } + + public static + TwoInputNonBroadcastStreamProcessFunction wrapProcessFunction( + TwoInputNonBroadcastEventTimeStreamProcessFunction + processFunction) { + return new EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction<>(processFunction); + } + + public static + TwoInputBroadcastStreamProcessFunction wrapProcessFunction( + TwoInputBroadcastEventTimeStreamProcessFunction + processFunction) { + return new EventTimeWrappedTwoInputBroadcastStreamProcessFunction<>(processFunction); + } + // ============= Other Utils ============= public static boolean isEventTimeExtensionWatermark(Watermark watermark) { return EventTimeExtension.isEventTimeWatermark(watermark) diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedOneInputStreamProcessFunction.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedOneInputStreamProcessFunction.java new file mode 100644 index 00000000000000..b1867af7a39b70 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedOneInputStreamProcessFunction.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.eventtime.functions; + +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Set; +import java.util.function.Supplier; + +/** + * The wrapped {@link OneInputEventTimeStreamProcessFunction} that take care of event-time alignment + * with idleness. + */ +public class EventTimeWrappedOneInputStreamProcessFunction + implements OneInputStreamProcessFunction { + + private final OneInputEventTimeStreamProcessFunction wrappedUserFunction; + + private transient EventTimeManager eventTimeManager; + + private transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + + public EventTimeWrappedOneInputStreamProcessFunction( + OneInputEventTimeStreamProcessFunction wrappedUserFunction) { + this.wrappedUserFunction = Preconditions.checkNotNull(wrappedUserFunction); + } + + @Override + public void open(NonPartitionedContext ctx) throws Exception { + wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager); + wrappedUserFunction.open(ctx); + } + + /** + * Initialize the event time extension, note that this method should be invoked before open + * method. + */ + public void initEventTimeExtension( + @Nullable InternalTimerService timerService, + Supplier eventTimeSupplier, + EventTimeWatermarkHandler eventTimeWatermarkHandler) { + this.eventTimeManager = new DefaultEventTimeManager(timerService, eventTimeSupplier); + this.eventTimeWatermarkHandler = eventTimeWatermarkHandler; + } + + @Override + public void processRecord(IN record, Collector output, PartitionedContext ctx) + throws Exception { + wrappedUserFunction.processRecord(record, output, ctx); + } + + @Override + public void endInput(NonPartitionedContext ctx) { + wrappedUserFunction.endInput(ctx); + } + + @Override + public void onProcessingTimer( + long timestamp, Collector output, PartitionedContext ctx) { + wrappedUserFunction.onProcessingTimer(timestamp, output, ctx); + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, Collector output, NonPartitionedContext ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, forward + // the watermark. + try { + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 0); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermark(watermark, output, ctx); + } + } + + public void onEventTime(long timestamp, Collector output, PartitionedContext ctx) { + wrappedUserFunction.onEventTimer(timestamp, output, ctx); + } + + @Override + public Set usesStates() { + return wrappedUserFunction.usesStates(); + } + + @Override + public Set declareWatermarks() { + return wrappedUserFunction.declareWatermarks(); + } + + @Override + public void close() throws Exception { + wrappedUserFunction.close(); + } + + public OneInputStreamProcessFunction getWrappedUserFunction() { + return wrappedUserFunction; + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputBroadcastStreamProcessFunction.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputBroadcastStreamProcessFunction.java new file mode 100644 index 00000000000000..db3c256482b31c --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputBroadcastStreamProcessFunction.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.eventtime.functions; + +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Set; +import java.util.function.Supplier; + +/** + * The wrapped {@link TwoInputBroadcastEventTimeStreamProcessFunction} that take care of event-time + * alignment with idleness. + */ +public class EventTimeWrappedTwoInputBroadcastStreamProcessFunction + implements TwoInputBroadcastStreamProcessFunction { + + private final TwoInputBroadcastEventTimeStreamProcessFunction + wrappedUserFunction; + + private transient EventTimeManager eventTimeManager; + + private transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + + public EventTimeWrappedTwoInputBroadcastStreamProcessFunction( + TwoInputBroadcastEventTimeStreamProcessFunction wrappedUserFunction) { + this.wrappedUserFunction = Preconditions.checkNotNull(wrappedUserFunction); + } + + @Override + public void open(NonPartitionedContext ctx) throws Exception { + wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager); + wrappedUserFunction.open(ctx); + } + + /** + * Initialize the event time extension, note that this method should be invoked before open + * method. + */ + public void initEventTimeExtension( + @Nullable InternalTimerService timerService, + Supplier eventTimeSupplier, + EventTimeWatermarkHandler eventTimeWatermarkHandler) { + this.eventTimeManager = new DefaultEventTimeManager(timerService, eventTimeSupplier); + this.eventTimeWatermarkHandler = eventTimeWatermarkHandler; + } + + @Override + public void processRecordFromNonBroadcastInput( + IN1 record, Collector output, PartitionedContext ctx) throws Exception { + wrappedUserFunction.processRecordFromNonBroadcastInput(record, output, ctx); + } + + @Override + public void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext ctx) + throws Exception { + wrappedUserFunction.processRecordFromBroadcastInput(record, ctx); + } + + @Override + public void endBroadcastInput(NonPartitionedContext ctx) { + wrappedUserFunction.endBroadcastInput(ctx); + } + + @Override + public void endNonBroadcastInput(NonPartitionedContext ctx) { + wrappedUserFunction.endNonBroadcastInput(ctx); + } + + @Override + public void onProcessingTimer( + long timestamp, Collector output, PartitionedContext ctx) { + wrappedUserFunction.onProcessingTimer(timestamp, output, ctx); + } + + @Override + public WatermarkHandlingResult onWatermarkFromBroadcastInput( + Watermark watermark, Collector output, NonPartitionedContext ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, forward + // the watermark. + try { + + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 0); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermarkFromBroadcastInput(watermark, output, ctx); + } + } + + @Override + public WatermarkHandlingResult onWatermarkFromNonBroadcastInput( + Watermark watermark, Collector output, NonPartitionedContext ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, forward + // the watermark. + try { + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 1); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermarkFromBroadcastInput(watermark, output, ctx); + } + } + + public void onEventTime(long timestamp, Collector output, PartitionedContext ctx) { + wrappedUserFunction.onEventTimer(timestamp, output, ctx); + } + + @Override + public void close() throws Exception { + wrappedUserFunction.close(); + } + + @Override + public Set usesStates() { + return wrappedUserFunction.usesStates(); + } + + @Override + public Set declareWatermarks() { + return wrappedUserFunction.declareWatermarks(); + } + + public TwoInputBroadcastStreamProcessFunction getWrappedUserFunction() { + return wrappedUserFunction; + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction.java new file mode 100644 index 00000000000000..a8dc809a90e61e --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.eventtime.functions; + +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Set; +import java.util.function.Supplier; + +/** + * The wrapped {@link TwoInputNonBroadcastEventTimeStreamProcessFunction} that take care of + * event-time alignment with idleness. + */ +public class EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction + implements TwoInputNonBroadcastStreamProcessFunction { + + private final TwoInputNonBroadcastEventTimeStreamProcessFunction + wrappedUserFunction; + + private transient EventTimeManager eventTimeManager; + + private transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + + public EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction( + TwoInputNonBroadcastEventTimeStreamProcessFunction wrappedUserFunction) { + this.wrappedUserFunction = Preconditions.checkNotNull(wrappedUserFunction); + } + + @Override + public void open(NonPartitionedContext ctx) throws Exception { + wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager); + wrappedUserFunction.open(ctx); + } + + /** + * Initialize the event time extension, note that this method should be invoked before open + * method. + */ + public void initEventTimeExtension( + @Nullable InternalTimerService timerService, + Supplier eventTimeSupplier, + EventTimeWatermarkHandler eventTimeWatermarkHandler) { + this.eventTimeManager = new DefaultEventTimeManager(timerService, eventTimeSupplier); + this.eventTimeWatermarkHandler = eventTimeWatermarkHandler; + } + + @Override + public void processRecordFromFirstInput( + IN1 record, Collector output, PartitionedContext ctx) throws Exception { + wrappedUserFunction.processRecordFromFirstInput(record, output, ctx); + } + + @Override + public void processRecordFromSecondInput( + IN2 record, Collector output, PartitionedContext ctx) throws Exception { + wrappedUserFunction.processRecordFromSecondInput(record, output, ctx); + } + + @Override + public void endFirstInput(NonPartitionedContext ctx) { + wrappedUserFunction.endFirstInput(ctx); + } + + @Override + public void endSecondInput(NonPartitionedContext ctx) { + wrappedUserFunction.endSecondInput(ctx); + } + + @Override + public void onProcessingTimer( + long timestamp, Collector output, PartitionedContext ctx) { + wrappedUserFunction.onProcessingTimer(timestamp, output, ctx); + } + + @Override + public WatermarkHandlingResult onWatermarkFromFirstInput( + Watermark watermark, Collector output, NonPartitionedContext ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, forward + // the watermark. + try { + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 0); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermarkFromFirstInput(watermark, output, ctx); + } + } + + @Override + public WatermarkHandlingResult onWatermarkFromSecondInput( + Watermark watermark, Collector output, NonPartitionedContext ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, forward + // the watermark. + try { + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 1); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermarkFromSecondInput(watermark, output, ctx); + } + } + + public void onEventTime(long timestamp, Collector output, PartitionedContext ctx) { + wrappedUserFunction.onEventTimer(timestamp, output, ctx); + } + + @Override + public void close() throws Exception { + wrappedUserFunction.close(); + } + + @Override + public Set usesStates() { + return wrappedUserFunction.usesStates(); + } + + @Override + public Set declareWatermarks() { + return wrappedUserFunction.declareWatermarks(); + } + + public TwoInputNonBroadcastStreamProcessFunction getWrappedUserFunction() { + return wrappedUserFunction; + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoOutputStreamProcessFunction.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoOutputStreamProcessFunction.java new file mode 100644 index 00000000000000..5a272f31588813 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoOutputStreamProcessFunction.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.eventtime.functions; + +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; +import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Set; +import java.util.function.Supplier; + +/** + * The wrapped {@link TwoOutputEventTimeStreamProcessFunction} that take care of event-time + * alignment with idleness. + */ +public class EventTimeWrappedTwoOutputStreamProcessFunction + implements TwoOutputStreamProcessFunction { + + private final TwoOutputEventTimeStreamProcessFunction wrappedUserFunction; + + private transient EventTimeManager eventTimeManager; + + private transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + + public EventTimeWrappedTwoOutputStreamProcessFunction( + TwoOutputEventTimeStreamProcessFunction wrappedUserFunction) { + this.wrappedUserFunction = Preconditions.checkNotNull(wrappedUserFunction); + } + + @Override + public void open(TwoOutputNonPartitionedContext ctx) throws Exception { + wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager); + wrappedUserFunction.open(ctx); + } + + /** + * Initialize the event time extension, note that this method should be invoked before open + * method. + */ + public void initEventTimeExtension( + @Nullable InternalTimerService timerService, + Supplier eventTimeSupplier, + EventTimeWatermarkHandler eventTimeWatermarkHandler) { + this.eventTimeManager = new DefaultEventTimeManager(timerService, eventTimeSupplier); + this.eventTimeWatermarkHandler = eventTimeWatermarkHandler; + } + + @Override + public void processRecord( + IN record, + Collector output1, + Collector output2, + TwoOutputPartitionedContext ctx) + throws Exception { + wrappedUserFunction.processRecord(record, output1, output2, ctx); + } + + @Override + public void endInput(TwoOutputNonPartitionedContext ctx) { + wrappedUserFunction.endInput(ctx); + } + + @Override + public void onProcessingTimer( + long timestamp, + Collector output1, + Collector output2, + TwoOutputPartitionedContext ctx) { + wrappedUserFunction.onProcessingTimer(timestamp, output1, output2, ctx); + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector output1, + Collector output2, + TwoOutputNonPartitionedContext ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, forward + // the watermark. + try { + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 0); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output1, output2, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermark(watermark, output1, output2, ctx); + } + } + + public void onEventTime( + long timestamp, + Collector output1, + Collector output2, + TwoOutputPartitionedContext ctx) { + wrappedUserFunction.onEventTimer(timestamp, output1, output2, ctx); + } + + @Override + public Set usesStates() { + return wrappedUserFunction.usesStates(); + } + + @Override + public Set declareWatermarks() { + return wrappedUserFunction.declareWatermarks(); + } + + @Override + public void close() throws Exception { + wrappedUserFunction.close(); + } + + public TwoOutputStreamProcessFunction getWrappedUserFunction() { + return wrappedUserFunction; + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/timer/DefaultEventTimeManager.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/timer/DefaultEventTimeManager.java new file mode 100644 index 00000000000000..8beb78a7cc93cb --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/timer/DefaultEventTimeManager.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.eventtime.timer; + +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; + +import javax.annotation.Nullable; + +import java.util.function.Supplier; + +/** The implementation of {@link EventTimeManager}. */ +public class DefaultEventTimeManager implements EventTimeManager { + + /** + * The timer service of operator, used in register event timer. Note that it cloud be null if + * the operator is not a keyed operator. + */ + @Nullable private final InternalTimerService timerService; + + /** The supplier of the current event time. */ + private final Supplier eventTimeSupplier; + + public DefaultEventTimeManager( + @Nullable InternalTimerService timerService, + Supplier eventTimeSupplier) { + this.timerService = timerService; + this.eventTimeSupplier = eventTimeSupplier; + } + + @Override + public void registerTimer(long timestamp) { + if (timerService == null) { + throw new UnsupportedOperationException( + "Registering event timer is not allowed in NonKeyed Stream."); + } + + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp); + } + + @Override + public void deleteTimer(long timestamp) { + if (timerService == null) { + throw new UnsupportedOperationException( + "Deleting event timer is not allowed in NonKeyed Stream."); + } + + timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, timestamp); + } + + @Override + public long currentTime() { + return eventTimeSupplier.get(); + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java index 069b4991f444cc..23bae4b9921f27 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -39,6 +40,7 @@ import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; /** Operator for {@link OneInputStreamProcessFunction} in {@link KeyedPartitionStream}. */ public class KeyedProcessOperator extends ProcessOperator @@ -84,7 +86,10 @@ protected Object currentKey() { @Override public void onEventTime(InternalTimer timer) throws Exception { - // do nothing at the moment. + if (userFunction instanceof EventTimeWrappedOneInputStreamProcessFunction) { + ((EventTimeWrappedOneInputStreamProcessFunction) userFunction) + .onEventTime(timer.getTimestamp(), getOutputCollector(), partitionedContext); + } } @Override @@ -121,4 +126,14 @@ public void setKeyContextElement1(StreamRecord record) throws Exception { public boolean isAsyncStateProcessingEnabled() { return true; } + + @Override + protected InternalTimerService getTimerService() { + return timerService; + } + + @Override + protected Supplier getEventTimeSupplier() { + return () -> timerService.currentWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java index 53cb2dea7d6f47..bb985643e76df3 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -39,6 +40,7 @@ import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; /** Operator for {@link TwoInputBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}. */ public class KeyedTwoInputBroadcastProcessOperator @@ -90,7 +92,10 @@ protected ProcessingTimeManager getProcessingTimeManager() { @Override public void onEventTime(InternalTimer timer) throws Exception { - // do nothing at the moment. + if (userFunction instanceof EventTimeWrappedTwoInputBroadcastStreamProcessFunction) { + ((EventTimeWrappedTwoInputBroadcastStreamProcessFunction) userFunction) + .onEventTime(timer.getTimestamp(), getOutputCollector(), partitionedContext); + } } @Override @@ -123,4 +128,14 @@ public void setKeyContextElement1(StreamRecord record) throws Exception { public boolean isAsyncStateProcessingEnabled() { return true; } + + @Override + protected InternalTimerService getTimerService() { + return timerService; + } + + @Override + protected Supplier getEventTimeSupplier() { + return () -> timerService.currentWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java index 2644dc11b6aae3..f6e0f8b71ecf27 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -39,6 +40,7 @@ import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; /** * Operator for {@link TwoInputNonBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}. @@ -92,7 +94,11 @@ protected ProcessingTimeManager getProcessingTimeManager() { @Override public void onEventTime(InternalTimer timer) throws Exception { - // do nothing at the moment. + if (userFunction instanceof EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) { + ((EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) + userFunction) + .onEventTime(timer.getTimestamp(), getOutputCollector(), partitionedContext); + } } @Override @@ -131,4 +137,14 @@ public void setKeyContextElement2(StreamRecord record) throws Exception { public boolean isAsyncStateProcessingEnabled() { return true; } + + @Override + protected InternalTimerService getTimerService() { + return timerService; + } + + @Override + protected Supplier getEventTimeSupplier() { + return () -> timerService.currentWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java index aefc1b1b60df8a..db9dfc2cb42b28 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java @@ -27,6 +27,7 @@ import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager; import org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -40,6 +41,7 @@ import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; /** */ public class KeyedTwoOutputProcessOperator @@ -113,7 +115,14 @@ protected ProcessingTimeManager getProcessingTimeManager() { @Override public void onEventTime(InternalTimer timer) throws Exception { - // do nothing at the moment. + if (userFunction instanceof EventTimeWrappedTwoOutputStreamProcessFunction) { + ((EventTimeWrappedTwoOutputStreamProcessFunction) userFunction) + .onEventTime( + timer.getTimestamp(), + getMainCollector(), + getSideCollector(), + partitionedContext); + } } @Override @@ -146,4 +155,14 @@ public void setKeyContextElement1(StreamRecord record) throws Exception { public boolean isAsyncStateProcessingEnabled() { return true; } + + @Override + protected InternalTimerService getTimerService() { + return timerService; + } + + @Override + protected Supplier getEventTimeSupplier() { + return () -> timerService.currentWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java index 1782e043414097..efb95e1ebcc81e 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java @@ -31,10 +31,13 @@ import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction; import org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -44,6 +47,7 @@ import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; /** Operator for {@link OneInputStreamProcessFunction}. */ @@ -114,6 +118,12 @@ public void open() throws Exception { getExecutionConfig(), partitionedContext.getNonPartitionedContext().getWatermarkManager(), getProcessingTimeService()); + } else if (userFunction instanceof EventTimeWrappedOneInputStreamProcessFunction) { + // note that the {@code initEventTimeExtension} in EventTimeWrappedProcessFunction + // should be invoked before the {@code open}. + ((EventTimeWrappedOneInputStreamProcessFunction) userFunction) + .initEventTimeExtension( + getTimerService(), getEventTimeSupplier(), eventTimeWatermarkHandler); } userFunction.open(nonPartitionedContext); @@ -201,4 +211,12 @@ public boolean isAsyncStateProcessingEnabled() { // For non-keyed operators, we disable async state processing. return false; } + + protected InternalTimerService getTimerService() { + return null; + } + + protected Supplier getEventTimeSupplier() { + return () -> eventTimeWatermarkHandler.getLastEmitWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java index 926c133f0fe714..0a661fdf1e2c90 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java @@ -31,9 +31,12 @@ import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -43,6 +46,7 @@ import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkState; @@ -108,6 +112,14 @@ public void open() throws Exception { this.eventTimeWatermarkHandler = new EventTimeWatermarkHandler(2, output, timeServiceManager); + if (userFunction instanceof EventTimeWrappedTwoInputBroadcastStreamProcessFunction) { + // note that the {@code initEventTimeExtension} in EventTimeWrappedProcessFunction + // should be invoked before the {@code open}. + ((EventTimeWrappedTwoInputBroadcastStreamProcessFunction) userFunction) + .initEventTimeExtension( + getTimerService(), getEventTimeSupplier(), eventTimeWatermarkHandler); + } + this.userFunction.open(this.nonPartitionedContext); } @@ -224,4 +236,12 @@ public boolean isAsyncStateProcessingEnabled() { // For non-keyed operators, we disable async state processing. return false; } + + protected InternalTimerService getTimerService() { + return null; + } + + protected Supplier getEventTimeSupplier() { + return () -> eventTimeWatermarkHandler.getLastEmitWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java index 3d699dbbf2d9b1..05c207a082cb68 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy; import org.apache.flink.datastream.api.context.NonPartitionedContext; import org.apache.flink.datastream.api.context.ProcessingTimeManager; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; @@ -31,10 +32,14 @@ import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -44,6 +49,7 @@ import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkState; @@ -111,6 +117,17 @@ public void open() throws Exception { this.eventTimeWatermarkHandler = new EventTimeWatermarkHandler(2, output, timeServiceManager); + if (userFunction instanceof EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) { + // note that the {@code initEventTimeExtension} in EventTimeWrappedProcessFunction + // should be invoked before the {@code open}. + EventTimeManager eventTimeManager = + new DefaultEventTimeManager(getTimerService(), getEventTimeSupplier()); + ((EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) + userFunction) + .initEventTimeExtension( + getTimerService(), getEventTimeSupplier(), eventTimeWatermarkHandler); + } + this.userFunction.open(this.nonPartitionedContext); } @@ -227,4 +244,12 @@ public boolean isAsyncStateProcessingEnabled() { // For non-keyed operators, we disable async state processing. return false; } + + protected InternalTimerService getTimerService() { + return null; + } + + protected Supplier getEventTimeSupplier() { + return () -> eventTimeWatermarkHandler.getLastEmitWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java index 91d0c0f074f25b..cdf36e55ef6d44 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java @@ -32,9 +32,12 @@ import org.apache.flink.datastream.impl.context.DefaultTwoOutputPartitionedContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -46,6 +49,7 @@ import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -119,6 +123,13 @@ public void open() throws Exception { this.partitionedContext.setNonPartitionedContext(nonPartitionedContext); this.eventTimeWatermarkHandler = new EventTimeWatermarkHandler(1, output, timeServiceManager); + if (userFunction instanceof EventTimeWrappedTwoOutputStreamProcessFunction) { + // note that the {@code initEventTimeExtension} in EventTimeWrappedProcessFunction + // should be invoked before the {@code open}. + ((EventTimeWrappedTwoOutputStreamProcessFunction) userFunction) + .initEventTimeExtension( + getTimerService(), getEventTimeSupplier(), eventTimeWatermarkHandler); + } this.userFunction.open(this.nonPartitionedContext); } @@ -236,4 +247,12 @@ public boolean isAsyncStateProcessingEnabled() { // For non-keyed operators, we disable async state processing. return false; } + + protected InternalTimerService getTimerService() { + return null; + } + + protected Supplier getEventTimeSupplier() { + return () -> eventTimeWatermarkHandler.getLastEmitWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java index 8b02eabdaf36e1..724dd7b1a645fa 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java @@ -63,6 +63,11 @@ public ProcessConfigurableAndNonKeyedPartitionStream conn // no state redistribution mode check is required here, since all redistribution modes are // acceptable + other = + other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl + ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) other) + .getKeyedPartitionStream() + : other; TypeInformation outTypeInfo = StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction( processFunction, @@ -92,6 +97,11 @@ public ProcessConfigurableAndNonKeyedPartitionStream connect processFunction.usesStates(), new HashSet<>(Collections.singletonList(StateDeclaration.RedistributionMode.NONE))); + other = + other instanceof ProcessConfigurableAndNonKeyedPartitionStreamImpl + ? ((ProcessConfigurableAndNonKeyedPartitionStreamImpl) other) + .getNonKeyedPartitionStream() + : other; TypeInformation outTypeInfo = StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction( processFunction, @@ -118,6 +128,11 @@ public ProcessConfigurableAndKeyedPartitionStream conn KeyedPartitionStream other, TwoInputBroadcastStreamProcessFunction processFunction, KeySelector newKeySelector) { + other = + other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl + ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) other) + .getKeyedPartitionStream() + : other; TypeInformation outTypeInfo = StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction( processFunction, diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java index 6ec420a8135b8b..3f73608dcf9dea 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java @@ -34,6 +34,10 @@ import org.apache.flink.datastream.api.stream.GlobalStream.ProcessConfigurableAndGlobalStream; import org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction; import org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessFunction; import org.apache.flink.datastream.impl.stream.AbstractDataStream; import org.apache.flink.datastream.impl.stream.GlobalStreamImpl; @@ -66,6 +70,12 @@ public final class StreamUtils { public static TypeInformation getOutputTypeForOneInputProcessFunction( OneInputStreamProcessFunction processFunction, TypeInformation inTypeInformation) { + if (processFunction instanceof EventTimeWrappedOneInputStreamProcessFunction) { + processFunction = + ((EventTimeWrappedOneInputStreamProcessFunction) processFunction) + .getWrappedUserFunction(); + } + return TypeExtractor.getUnaryOperatorReturnType( processFunction, OneInputStreamProcessFunction.class, @@ -101,6 +111,12 @@ TypeInformation getOutputTypeForTwoInputNonBroadcastProcessFunction( true); } + if (processFunction instanceof EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) { + processFunction = + ((EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) processFunction) + .getWrappedUserFunction(); + } + return TypeExtractor.getBinaryOperatorReturnType( processFunction, TwoInputNonBroadcastStreamProcessFunction.class, @@ -123,6 +139,12 @@ TypeInformation getOutputTypeForTwoInputBroadcastProcessFunction( TwoInputBroadcastStreamProcessFunction processFunction, TypeInformation in1TypeInformation, TypeInformation in2TypeInformation) { + if (processFunction instanceof EventTimeWrappedTwoInputBroadcastStreamProcessFunction) { + processFunction = + ((EventTimeWrappedTwoInputBroadcastStreamProcessFunction) processFunction) + .getWrappedUserFunction(); + } + return TypeExtractor.getBinaryOperatorReturnType( processFunction, TwoInputBroadcastStreamProcessFunction.class, @@ -146,6 +168,15 @@ TypeInformation getOutputTypeForTwoInputBroadcastProcessFunction( TwoOutputStreamProcessFunction twoOutputStreamProcessFunction, TypeInformation inTypeInformation) { + + if (twoOutputStreamProcessFunction + instanceof EventTimeWrappedTwoOutputStreamProcessFunction) { + twoOutputStreamProcessFunction = + ((EventTimeWrappedTwoOutputStreamProcessFunction) + twoOutputStreamProcessFunction) + .getWrappedUserFunction(); + } + TypeInformation firstOutputType = TypeExtractor.getUnaryOperatorReturnType( twoOutputStreamProcessFunction, diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeExtensionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeExtensionITCase.java new file mode 100644 index 00000000000000..a2bb6a78c0f0a7 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeExtensionITCase.java @@ -0,0 +1,848 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api.datastream.extension.eventtime; + +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; +import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.Serializable; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** This ITCase class tests the behavior of {@link EventTimeExtension}. */ +class EventTimeExtensionITCase implements Serializable { + private ExecutionEnvironment env; + private List> inputRecords = + List.of(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c")); + private List inputEventTimes = + inputRecords.stream().map(x -> x.f0).collect(Collectors.toList()); + + @BeforeEach + void before() throws Exception { + env = ExecutionEnvironment.getInstance(); + } + + @AfterEach + void after() { + // one input + TestOneInputStreamProcessFunction.receivedRecords.clear(); + TestOneInputStreamProcessFunction.receivedEventTimes.clear(); + TestOneInputEventTimeStreamProcessFunction.receivedRecords.clear(); + TestOneInputEventTimeStreamProcessFunction.receivedEventTimes.clear(); + TestOneInputEventTimeStreamProcessFunction.invokedTimerTimes.clear(); + + // two output + TestTwoOutputStreamProcessFunction.receivedRecords.clear(); + TestTwoOutputStreamProcessFunction.receivedEventTimes.clear(); + TestTwoOutputEventTimeStreamProcessFunction.receivedRecords.clear(); + TestTwoOutputEventTimeStreamProcessFunction.receivedEventTimes.clear(); + TestTwoOutputEventTimeStreamProcessFunction.invokedTimerTimes.clear(); + + // two input broadcast + TestTwoInputBroadcastStreamProcessFunction.receivedRecords.clear(); + TestTwoInputBroadcastStreamProcessFunction.receivedEventTimes.clear(); + TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedRecords.clear(); + TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedEventTimes.clear(); + + // two input non-broadcast + TestTwoInputNonBroadcastStreamProcessFunction.receivedRecords.clear(); + TestTwoInputNonBroadcastStreamProcessFunction.receivedEventTimes.clear(); + TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedRecords.clear(); + TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedEventTimes.clear(); + TestTwoInputNonBroadcastEventTimeStreamProcessFunction.invokedTimerTimes.clear(); + } + + @Test + void testWatermarkGeneratorGenerateEventTimeWatermark() throws Exception { + NonKeyedPartitionStream> source = getSourceWithWatermarkGenerator(); + source.process(new TestOneInputStreamProcessFunction(true)); + env.execute("testWatermarkGeneratorGenerateEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + // ============== test one input ================= + + @Test + void testOneInputProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream> source = getSourceWithWatermarkGenerator(); + source.process(new TestOneInputStreamProcessFunction(false)) + .process(new TestOneInputStreamProcessFunction(true)); + env.execute("testOneInputProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + @Test + void testOneInputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer() + throws Exception { + NonKeyedPartitionStream> source = getSourceWithWatermarkGenerator(); + source.keyBy(x -> x.f0) + .process( + EventTimeExtension.wrapProcessFunction( + new TestOneInputEventTimeStreamProcessFunction(true))); + env.execute( + "testOneInputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer"); + + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestOneInputEventTimeStreamProcessFunction.invokedTimerTimes) + .containsExactlyElementsOf( + inputEventTimes.stream().map(x -> x + 1).collect(Collectors.toList())); + } + + @Test + void testOneInputEventTimeProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream> source = getSourceWithWatermarkGenerator(); + source.process( + EventTimeExtension.wrapProcessFunction( + new TestOneInputEventTimeStreamProcessFunction(false))) + .keyBy(x -> x.f0) + .process( + EventTimeExtension.wrapProcessFunction( + new TestOneInputEventTimeStreamProcessFunction(true))); + env.execute("testOneInputEventTimeProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestOneInputEventTimeStreamProcessFunction.invokedTimerTimes) + .containsExactlyElementsOf( + inputEventTimes.stream().map(x -> x + 1).collect(Collectors.toList())); + } + + // ============== test two output ================= + + @Test + void testTwoOutputProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream> source = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream< + Tuple2, Tuple2> + twoOutputStream = source.process(new TestTwoOutputStreamProcessFunction(false)); + twoOutputStream.getFirst().process(new TestOneInputStreamProcessFunction(true)); + twoOutputStream + .getFirst() + .keyBy(x -> x.f0) + .process( + EventTimeExtension.wrapProcessFunction( + new TestOneInputEventTimeStreamProcessFunction(true))); + env.execute("testTwoOutputProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + @Test + void testTwoOutputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer() + throws Exception { + NonKeyedPartitionStream> source = getSourceWithWatermarkGenerator(); + source.keyBy(x -> x.f0) + .process( + EventTimeExtension.wrapProcessFunction( + new TestTwoOutputEventTimeStreamProcessFunction(true))); + env.execute( + "testTwoOutputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer"); + + assertThat(TestTwoOutputEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestTwoOutputEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestTwoOutputEventTimeStreamProcessFunction.invokedTimerTimes) + .containsExactlyElementsOf( + inputEventTimes.stream().map(x -> x + 1).collect(Collectors.toList())); + } + + @Test + void testTwoOutputEventTimeProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream> source = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream< + Tuple2, Tuple2> + twoOutputStream = + source.process( + EventTimeExtension.wrapProcessFunction( + new TestTwoOutputEventTimeStreamProcessFunction(false))); + twoOutputStream.getFirst().process(new TestOneInputStreamProcessFunction(true)); + twoOutputStream + .getFirst() + .keyBy(x -> x.f0) + .process( + EventTimeExtension.wrapProcessFunction( + new TestOneInputEventTimeStreamProcessFunction(true))); + env.execute("testTwoOutputEventTimeProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + // ============== test two input broadcast ================= + + @Test + void testTwoInputBroadcastProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream> source2 = getSourceWithWatermarkGenerator(); + source1.broadcast() + .connectAndProcess(source2, new TestTwoInputBroadcastStreamProcessFunction(false)) + .process(new TestOneInputStreamProcessFunction(true)); + env.execute("testTwoInputBroadcastProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + @Test + void testTwoInputBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer() + throws Exception { + NonKeyedPartitionStream> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream> source2 = getSourceWithWatermarkGenerator(); + source1.broadcast() + .connectAndProcess( + source2, + EventTimeExtension.wrapProcessFunction( + new TestTwoInputBroadcastEventTimeStreamProcessFunction(true))); + env.execute( + "testTwoInputBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer"); + + assertThat(TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + @Test + void testTwoInputBroadcastEventTimeProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream> source2 = getSourceWithWatermarkGenerator(); + source1.broadcast() + .connectAndProcess( + source2, + EventTimeExtension.wrapProcessFunction( + new TestTwoInputBroadcastEventTimeStreamProcessFunction(false))) + .process(new TestOneInputStreamProcessFunction(true)); + env.execute("testTwoInputBroadcastEventTimeProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + // ============== test two input non-broadcast ================= + + @Test + void testTwoInputNonBroadcastProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream> source2 = getSourceWithWatermarkGenerator(); + source1.connectAndProcess(source2, new TestTwoInputNonBroadcastStreamProcessFunction(false)) + .process(new TestOneInputStreamProcessFunction(true)); + env.execute("testTwoInputNonBroadcastProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + @Test + void testTwoInputNonBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer() + throws Exception { + NonKeyedPartitionStream> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream> source2 = getSourceWithWatermarkGenerator(); + source1.keyBy(x -> x.f0) + .connectAndProcess( + source2.keyBy(x -> x.f0), + EventTimeExtension.wrapProcessFunction( + new TestTwoInputNonBroadcastEventTimeStreamProcessFunction(true))); + env.execute( + "testTwoInputNonBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer"); + + assertThat(TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestTwoInputNonBroadcastEventTimeStreamProcessFunction.invokedTimerTimes) + .containsExactlyElementsOf( + inputEventTimes.stream().map(x -> x + 1).collect(Collectors.toList())); + } + + @Test + void testTwoInputNonBroadcastEventTimeProcessFunctionForwardEventTimeWatermark() + throws Exception { + NonKeyedPartitionStream> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream> source2 = getSourceWithWatermarkGenerator(); + source1.keyBy(x -> x.f0) + .connectAndProcess( + source2.keyBy(x -> x.f0), + EventTimeExtension.wrapProcessFunction( + new TestTwoInputNonBroadcastEventTimeStreamProcessFunction(false))) + .process(new TestOneInputStreamProcessFunction(true)); + env.execute("testTwoInputNonBroadcastEventTimeProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + private static class TestOneInputStreamProcessFunction + implements OneInputStreamProcessFunction, Tuple2> { + public static ConcurrentLinkedQueue> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue receivedEventTimes = + new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + + public TestOneInputStreamProcessFunction(boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector> output, + NonPartitionedContext> ctx) { + if (EventTimeExtension.isEventTimeWatermark(watermark) + && needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(((LongWatermark) watermark).getValue()); + } + return WatermarkHandlingResult.PEEK; + } + + @Override + public void processRecord( + Tuple2 record, + Collector> output, + PartitionedContext> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + } + output.collect(record); + } + } + + private static class TestOneInputEventTimeStreamProcessFunction + implements OneInputEventTimeStreamProcessFunction< + Tuple2, Tuple2> { + public static ConcurrentLinkedQueue> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue receivedEventTimes = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue invokedTimerTimes = new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + private EventTimeManager eventTimeManager; + + public TestOneInputEventTimeStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) { + this.eventTimeManager = eventTimeManager; + } + + @Override + public void processRecord( + Tuple2 record, + Collector> output, + PartitionedContext> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + eventTimeManager.registerTimer(record.f0 + 1); + } + output.collect(record); + } + + @Override + public void onEventTimeWatermark( + long watermarkTimestamp, + Collector> output, + NonPartitionedContext> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(watermarkTimestamp); + } + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector> output, + NonPartitionedContext> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + + @Override + public void onEventTimer( + long timestamp, + Collector> output, + PartitionedContext> ctx) { + if (needCollectReceivedWatermarkAndRecord) { + invokedTimerTimes.add(timestamp); + } + } + } + + private static class TestTwoOutputStreamProcessFunction + implements TwoOutputStreamProcessFunction< + Tuple2, Tuple2, Tuple2> { + public static ConcurrentLinkedQueue> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue receivedEventTimes = + new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + + public TestTwoOutputStreamProcessFunction(boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector> output1, + Collector> output2, + TwoOutputNonPartitionedContext, Tuple2> ctx) { + if (EventTimeExtension.isEventTimeWatermark(watermark) + && needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(((LongWatermark) watermark).getValue()); + } + return WatermarkHandlingResult.PEEK; + } + + @Override + public void processRecord( + Tuple2 record, + Collector> output1, + Collector> output2, + TwoOutputPartitionedContext, Tuple2> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + } + output1.collect(record); + output2.collect(record); + } + } + + private static class TestTwoOutputEventTimeStreamProcessFunction + implements TwoOutputEventTimeStreamProcessFunction< + Tuple2, Tuple2, Tuple2> { + public static ConcurrentLinkedQueue> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue receivedEventTimes = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue invokedTimerTimes = new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + private EventTimeManager eventTimeManager; + + public TestTwoOutputEventTimeStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) { + this.eventTimeManager = eventTimeManager; + } + + @Override + public void processRecord( + Tuple2 record, + Collector> output1, + Collector> output2, + TwoOutputPartitionedContext, Tuple2> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + eventTimeManager.registerTimer(record.f0 + 1); + } + output1.collect(record); + output2.collect(record); + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector> output1, + Collector> output2, + TwoOutputNonPartitionedContext, Tuple2> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + + @Override + public void onEventTimeWatermark( + long watermarkTimestamp, + Collector> output1, + Collector> output2, + TwoOutputNonPartitionedContext, Tuple2> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(watermarkTimestamp); + } + } + + @Override + public void onEventTimer( + long timestamp, + Collector> output1, + Collector> output2, + TwoOutputPartitionedContext, Tuple2> ctx) { + if (needCollectReceivedWatermarkAndRecord) { + invokedTimerTimes.add(timestamp); + } + } + } + + private static class TestTwoInputBroadcastStreamProcessFunction + implements TwoInputBroadcastStreamProcessFunction< + Tuple2, Tuple2, Tuple2> { + public static ConcurrentLinkedQueue> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue receivedEventTimes = + new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + + public TestTwoInputBroadcastStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void processRecordFromNonBroadcastInput( + Tuple2 record, + Collector> output, + PartitionedContext> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + } + output.collect(record); + } + + @Override + public void processRecordFromBroadcastInput( + Tuple2 record, NonPartitionedContext> ctx) + throws Exception { + // do nothing + } + + @Override + public WatermarkHandlingResult onWatermarkFromBroadcastInput( + Watermark watermark, + Collector> output, + NonPartitionedContext> ctx) { + if (EventTimeExtension.isEventTimeWatermark(watermark) + && needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(((LongWatermark) watermark).getValue()); + } + return WatermarkHandlingResult.PEEK; + } + + @Override + public WatermarkHandlingResult onWatermarkFromNonBroadcastInput( + Watermark watermark, + Collector> output, + NonPartitionedContext> ctx) { + if (EventTimeExtension.isEventTimeWatermark(watermark) + && needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(((LongWatermark) watermark).getValue()); + } + return WatermarkHandlingResult.PEEK; + } + } + + private static class TestTwoInputBroadcastEventTimeStreamProcessFunction + implements TwoInputBroadcastEventTimeStreamProcessFunction< + Tuple2, Tuple2, Tuple2> { + public static ConcurrentLinkedQueue> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue receivedEventTimes = + new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + private EventTimeManager eventTimeManager; + + public TestTwoInputBroadcastEventTimeStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) { + this.eventTimeManager = eventTimeManager; + } + + @Override + public void processRecordFromNonBroadcastInput( + Tuple2 record, + Collector> output, + PartitionedContext> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + } + output.collect(record); + } + + @Override + public void processRecordFromBroadcastInput( + Tuple2 record, NonPartitionedContext> ctx) + throws Exception { + // do nothing + } + + @Override + public WatermarkHandlingResult onWatermarkFromBroadcastInput( + Watermark watermark, + Collector> output, + NonPartitionedContext> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + + @Override + public WatermarkHandlingResult onWatermarkFromNonBroadcastInput( + Watermark watermark, + Collector> output, + NonPartitionedContext> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + + @Override + public void onEventTimeWatermark( + long watermarkTimestamp, + Collector> output, + NonPartitionedContext> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(watermarkTimestamp); + } + } + + @Override + public void onEventTimer( + long timestamp, + Collector> output, + PartitionedContext> ctx) { + throw new UnsupportedOperationException("This function shouldn't be invoked."); + } + } + + private static class TestTwoInputNonBroadcastStreamProcessFunction + implements TwoInputNonBroadcastStreamProcessFunction< + Tuple2, Tuple2, Tuple2> { + public static ConcurrentLinkedQueue> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue receivedEventTimes = + new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + + public TestTwoInputNonBroadcastStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void processRecordFromFirstInput( + Tuple2 record, + Collector> output, + PartitionedContext> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + } + output.collect(record); + } + + @Override + public void processRecordFromSecondInput( + Tuple2 record, + Collector> output, + PartitionedContext> ctx) + throws Exception {} + + @Override + public WatermarkHandlingResult onWatermarkFromFirstInput( + Watermark watermark, + Collector> output, + NonPartitionedContext> ctx) { + if (EventTimeExtension.isEventTimeWatermark(watermark) + && needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(((LongWatermark) watermark).getValue()); + } + return WatermarkHandlingResult.PEEK; + } + + @Override + public WatermarkHandlingResult onWatermarkFromSecondInput( + Watermark watermark, + Collector> output, + NonPartitionedContext> ctx) { + return WatermarkHandlingResult.PEEK; + } + } + + private static class TestTwoInputNonBroadcastEventTimeStreamProcessFunction + implements TwoInputNonBroadcastEventTimeStreamProcessFunction< + Tuple2, Tuple2, Tuple2> { + public static ConcurrentLinkedQueue> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue receivedEventTimes = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue invokedTimerTimes = new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + private EventTimeManager eventTimeManager; + + public TestTwoInputNonBroadcastEventTimeStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) { + this.eventTimeManager = eventTimeManager; + } + + @Override + public void onEventTimeWatermark( + long watermarkTimestamp, + Collector> output, + NonPartitionedContext> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(watermarkTimestamp); + } + } + + @Override + public void onEventTimer( + long timestamp, + Collector> output, + PartitionedContext> ctx) { + if (needCollectReceivedWatermarkAndRecord) { + invokedTimerTimes.add(timestamp); + } + } + + @Override + public void processRecordFromFirstInput( + Tuple2 record, + Collector> output, + PartitionedContext> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + eventTimeManager.registerTimer(record.f0 + 1); + } + output.collect(record); + } + + @Override + public void processRecordFromSecondInput( + Tuple2 record, + Collector> output, + PartitionedContext> ctx) + throws Exception { + // do nothing + } + + @Override + public WatermarkHandlingResult onWatermarkFromFirstInput( + Watermark watermark, + Collector> output, + NonPartitionedContext> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + + @Override + public WatermarkHandlingResult onWatermarkFromSecondInput( + Watermark watermark, + Collector> output, + NonPartitionedContext> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + } + + private NonKeyedPartitionStream> getSourceWithWatermarkGenerator() { + NonKeyedPartitionStream> source = + env.fromSource(DataStreamV2SourceUtils.fromData(inputRecords), "Source") + .withParallelism(1); + + return source.process( + EventTimeExtension.>newWatermarkGeneratorBuilder( + event -> event.f0) + .perEventWatermark() + .buildAsProcessFunction()); + } +}