Skip to content

Commit

Permalink
[FLINK-36927][table] Introduce slicing & one stage Window Aggregate w…
Browse files Browse the repository at this point in the history
…ith Async State API

This closes apache#25861
  • Loading branch information
xuyangzhong authored Jan 17, 2025
1 parent 4be6d69 commit 8134e33
Show file tree
Hide file tree
Showing 45 changed files with 2,916 additions and 954 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.table.planner.plan.logical;

import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingSyncStateWindowProcessor;
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -31,7 +31,7 @@
* A windowing strategy that gets windows from input columns as window slice have been assigned and
* attached to the physical columns. The window slice is usually identified by slice end timestamp.
*
* @see SlicingWindowProcessor for more details about which windows can apply slicing.
* @see SlicingSyncStateWindowProcessor for more details about which windows can apply slicing.
*/
@JsonTypeName("SliceAttached")
public class SliceAttachedWindowingStrategy extends WindowingStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
Expand All @@ -39,6 +40,7 @@
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.WindowUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
Expand Down Expand Up @@ -198,7 +200,7 @@ protected Transformation<RowData> translateToPlanInternal(
InternalTypeInfo.of(inputRowType));
final LogicalType[] accTypes = convertToLogicalTypes(aggInfoList.getAccTypes());

final OneInputStreamOperator<RowData, RowData> windowOperator =
final WindowAggOperatorBuilder windowAggOperatorBuilder =
WindowAggOperatorBuilder.builder()
.inputSerializer(new RowDataSerializer(inputRowType))
.shiftTimeZone(shiftTimeZone)
Expand All @@ -207,8 +209,20 @@ protected Transformation<RowData> translateToPlanInternal(
selector.getProducedType().toSerializer())
.assigner(windowAssigner)
.countStarIndex(aggInfoList.getIndexOfCountStar())
.aggregate(generatedAggsHandler, new RowDataSerializer(accTypes))
.build();
.aggregate(generatedAggsHandler, new RowDataSerializer(accTypes));

if (WindowUtil.isAsyncStateEnabled(config, windowAssigner, aggInfoList)) {
windowAggOperatorBuilder
.enableAsyncState()
.generatedKeyEqualiser(
new EqualiserCodeGenerator(
selector.getProducedType().toRowType(),
planner.getFlinkContext().getClassLoader())
.generateRecordEqualiser("WindowKeyEqualiser"));
}

final OneInputStreamOperator<RowData, RowData> windowOperator =
windowAggOperatorBuilder.build();

final OneInputTransformation<RowData, RowData> transform =
ExecNodeUtil.createOneInputTransformation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
*/
package org.apache.flink.table.planner.plan.utils

import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.table.api.{DataTypes, TableConfig, TableException, ValidationException}
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.JBigDecimal
import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall}
import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlWindowTableFunction}
import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
import org.apache.flink.table.planner.plan.logical._
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalMatch, FlinkLogicalOverAggregate, FlinkLogicalRank, FlinkLogicalTableFunctionScan}
import org.apache.flink.table.planner.plan.nodes.logical._
import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.runtime.groupwindow._
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
import org.apache.flink.table.types.logical.TimestampType
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType
Expand Down Expand Up @@ -343,6 +347,21 @@ object WindowUtil {
}
}

def isAsyncStateEnabled(
config: ReadableConfig,
windowAssigner: WindowAssigner,
aggInfoList: AggregateInfoList): Boolean = {
if (!config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) {
return false
}
// currently, unsliced assigner does not support async state
if (!windowAssigner.isInstanceOf[SliceAssigner]) {
return false
}

AggregateUtil.isAsyncStateEnabled(config, aggInfoList)
}

// ------------------------------------------------------------------------------------------
// Private Helpers
// ------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 8134e33

Please sign in to comment.