Skip to content

Commit

Permalink
[FLINK-37136] save
Browse files Browse the repository at this point in the history
  • Loading branch information
codenohup committed Jan 17, 2025
1 parent 8134e33 commit 0328fb3
Show file tree
Hide file tree
Showing 40 changed files with 6,504 additions and 242 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.extension.join.JoinFunction;
import org.apache.flink.datastream.api.extension.join.JoinType;
import org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction;
import org.apache.flink.datastream.api.extension.window.function.TwoInputNonBroadcastWindowStreamProcessFunction;
import org.apache.flink.datastream.api.extension.window.function.TwoOutputWindowStreamProcessFunction;
import org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;

Expand Down Expand Up @@ -121,4 +127,89 @@ public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
joinFunction,
joinType);
}

// =================== Window ===========================

static final Class<?> WINDOW_FUNCS_INSTANCE;

static {
try {
WINDOW_FUNCS_INSTANCE =
Class.forName("org.apache.flink.datastream.impl.builtin.BuiltinWindowFuncs");
} catch (ClassNotFoundException e) {
throw new RuntimeException("Please ensure that flink-datastream in your class path");
}
}

/**
* Wrap the WindowStrategy and OneInputWindowStreamProcessFunction within a
* OneInputStreamProcessFunction to perform the window operation.
*
* @param windowStrategy the window strategy
* @param windowProcessFunction the window process function
* @return the wrapped process function
*/
public static <IN, OUT> OneInputStreamProcessFunction<IN, OUT> window(
WindowStrategy windowStrategy,
OneInputWindowStreamProcessFunction<IN, OUT> windowProcessFunction) {
try {
return (OneInputStreamProcessFunction<IN, OUT>)
WINDOW_FUNCS_INSTANCE
.getMethod(
"window",
WindowStrategy.class,
OneInputWindowStreamProcessFunction.class)
.invoke(null, windowStrategy, windowProcessFunction);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Wrap the WindowStrategy and TwoInputNonBroadcastWindowStreamProcessFunction within a
* TwoInputNonBroadcastStreamProcessFunction to perform the window operation.
*
* @param windowStrategy the window strategy
* @param windowProcessFunction the window process function
* @return the wrapped process function
*/
public static <IN1, IN2, OUT> TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> window(
WindowStrategy windowStrategy,
TwoInputNonBroadcastWindowStreamProcessFunction<IN1, IN2, OUT> windowProcessFunction) {
try {
return (TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT>)
WINDOW_FUNCS_INSTANCE
.getMethod(
"window",
WindowStrategy.class,
TwoInputNonBroadcastWindowStreamProcessFunction.class)
.invoke(null, windowStrategy, windowProcessFunction);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Wrap the WindowStrategy and TwoOutputWindowStreamProcessFunction within a
* TwoOutputStreamProcessFunction to perform the window operation.
*
* @param windowStrategy the window strategy
* @param windowProcessFunction the window process function
* @return the wrapped process function
*/
public static <IN, OUT1, OUT2> TwoOutputStreamProcessFunction<IN, OUT1, OUT2> window(
WindowStrategy windowStrategy,
TwoOutputWindowStreamProcessFunction<IN, OUT1, OUT2> windowProcessFunction) {
try {
return (TwoOutputStreamProcessFunction<IN, OUT1, OUT2>)
WINDOW_FUNCS_INSTANCE
.getMethod(
"window",
WindowStrategy.class,
TwoOutputWindowStreamProcessFunction.class)
.invoke(null, windowStrategy, windowProcessFunction);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.extension.window.context;

import org.apache.flink.annotation.Experimental;

/**
* This interface extends {@link WindowContext} and provides additional functionality for writing
* and reading window data of one input window.
*
* @param <IN> Type of the input elements
*/
@Experimental
public interface OneInputWindowContext<IN> extends WindowContext {

/**
* Write records into the window's state.
*
* @param record The record to be written into the window's state.
*/
void putRecord(IN record);

/**
* Read records from the window's state.
*
* @return Iterable of records, which could be null if the window is empty.
*/
Iterable<IN> getAllRecords();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.extension.window.context;

import org.apache.flink.annotation.Experimental;

/**
* This interface extends {@link WindowContext} and provides additional functionality for writing
* and reading window data of two input window.
*/
@Experimental
public interface TwoInputWindowContext<IN1, IN2> extends WindowContext {

/**
* Write records from input1 into the window's state.
*
* @param record The record from input1 to be written into the window's state.
*/
void putRecord1(IN1 record);

/**
* Read input1's records from the window's state.
*
* @return Iterable of input1's records, which could be null if input1 has no data.
*/
Iterable<IN1> getAllRecords1();

/**
* Write records from input2 into the window's state.
*
* @param record The record from input2 to be written into the window's state.
*/
void putRecord2(IN2 record);

/**
* Read input2's records from the window's state.
*
* @return Iterable of input2's records, which could be null if input2 has no data.
*/
Iterable<IN2> getAllRecords2();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.extension.window.context;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.state.AggregatingStateDeclaration;
import org.apache.flink.api.common.state.ListStateDeclaration;
import org.apache.flink.api.common.state.MapStateDeclaration;
import org.apache.flink.api.common.state.ReducingStateDeclaration;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.state.v2.AggregatingState;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.ReducingState;
import org.apache.flink.api.common.state.v2.ValueState;

import java.util.Optional;

/**
* This interface represents a context for window operations and provides methods to interact with
* state that is scoped to the window.
*/
@Experimental
public interface WindowContext {

/**
* Gets the starting timestamp of the window. This is the first timestamp that belongs to this
* window.
*
* @return The starting timestamp of this window, or -1 if the window is not a time window or a
* session window.
*/
long getStartTime();

/**
* Gets the end timestamp of this window. The end timestamp is exclusive, meaning it is the
* first timestamp that does not belong to this window anymore.
*
* @return The exclusive end timestamp of this window, or -1 if the window is a session window
* or not a time window.
*/
long getEndTime();

/**
* Retrieves a {@link ListState} object that can be used to interact with fault-tolerant state
* that is scoped to the window.
*/
<T> Optional<ListState<T>> getWindowState(ListStateDeclaration<T> stateDeclaration)
throws Exception;

/**
* Retrieves a {@link MapState} object that can be used to interact with fault-tolerant state
* that is scoped to the window.
*/
<KEY, V> Optional<MapState<KEY, V>> getWindowState(MapStateDeclaration<KEY, V> stateDeclaration)
throws Exception;

/**
* Retrieves a {@link ValueState} object that can be used to interact with fault-tolerant state
* that is scoped to the window.
*/
<T> Optional<ValueState<T>> getWindowState(ValueStateDeclaration<T> stateDeclaration)
throws Exception;

/**
* Retrieves a {@link ReducingState} object that can be used to interact with fault-tolerant
* state that is scoped to the window.
*/
<T> Optional<ReducingState<T>> getWindowState(ReducingStateDeclaration<T> stateDeclaration)
throws Exception;

/**
* Retrieves a {@link AggregatingState} object that can be used to interact with fault-tolerant
* state that is scoped to the window.
*/
<T, ACC, OUT> Optional<AggregatingState<T, OUT>> getWindowState(
AggregatingStateDeclaration<T, ACC, OUT> stateDeclaration) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.extension.window.function;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext;

/**
* A type of {@link WindowProcessFunction} for one-input window processing.
*
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
*/
@Experimental
public interface OneInputWindowStreamProcessFunction<IN, OUT> extends WindowProcessFunction {

/**
* This method will be invoked when a record is received. Its default behavior is to store data
* in built-in window state by {@code WindowContext#putRecord}. If the user overrides this
* method, they will need to update the window state as necessary.
*/
default void onRecord(
IN record,
Collector<OUT> output,
PartitionedContext<OUT> ctx,
OneInputWindowContext<IN> windowContext)
throws Exception {
windowContext.putRecord(record);
}

/**
* This method will be invoked when the Window is triggered, you can obtain all the input
* records in the Window by {@link OneInputWindowContext#getAllRecords()}.
*/
void onTrigger(
Collector<OUT> output,
PartitionedContext<OUT> ctx,
OneInputWindowContext<IN> windowContext)
throws Exception;

/**
* Callback when a window is about to be cleaned up. It is the time to deletes any state in the
* {@code windowContext} when the Window expires (the event time or processing time passes its
* {@code maxTimestamp} + {@code allowedLateness}).
*/
default void onClear(
Collector<OUT> output,
PartitionedContext<OUT> ctx,
OneInputWindowContext<IN> windowContext)
throws Exception {}

/** This method will be invoked when a record is received after the window has been cleaned. */
default void onLateRecord(IN record, Collector<OUT> output, PartitionedContext<OUT> ctx)
throws Exception {}
}
Loading

0 comments on commit 0328fb3

Please sign in to comment.