Skip to content

Commit

Permalink
[FLINK-35795][API] Introduce the framework of ProcessFunction Attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
codenohup committed Sep 2, 2024
1 parent 2f93b79 commit 46523bb
Show file tree
Hide file tree
Showing 19 changed files with 545 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.attribute;

import org.apache.flink.annotation.Internal;

import java.io.Serializable;

/** {@link Attribute} contains the information about the process logic of a process function. */
@Internal
public class Attribute implements Serializable {

private boolean isNoOutputUntilEndOfInput;

private Attribute(Builder builder) {
this.isNoOutputUntilEndOfInput = builder.isNoOutputUntilEndOfInput;
}

public boolean isNoOutputUntilEndOfInput() {
return isNoOutputUntilEndOfInput;
}

public void setNoOutputUntilEndOfInput(boolean noOutputUntilEndOfInput) {
isNoOutputUntilEndOfInput = noOutputUntilEndOfInput;
}

@Internal
public static class Builder {

private boolean isNoOutputUntilEndOfInput = false;

public Builder setNoOutputUntilEndOfInput(boolean isNoOutputUntilEndOfInput) {
this.isNoOutputUntilEndOfInput = isNoOutputUntilEndOfInput;
return this;
}

public Attribute build() {
return new Attribute(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.attribute.Attribute;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.SlotSharingGroup;
Expand Down Expand Up @@ -188,6 +189,8 @@ public static int getNewNodeId() {

@Nullable private String coLocationGroupKey;

private Attribute attribute = new Attribute.Builder().build();

/**
* Creates a new {@code Transformation} with the given name, output type and parallelism.
*
Expand Down Expand Up @@ -649,4 +652,12 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(id, name, outputType, parallelism, bufferTimeout);
}

public void setAttribute(Attribute attribute) {
this.attribute = attribute;
}

public Attribute getAttribute() {
return attribute;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.attribute;

import org.apache.flink.annotation.Experimental;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* {@link NoOutputUntilEndOfInput} indicates that the process function will only output records
* after all inputs are ended. If this annotation is applied to a process function with an unbounded
* source, a compilation error will occur.
*/
@Experimental
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface NoOutputUntilEndOfInput {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.impl.attribute;

import org.apache.flink.api.common.attribute.Attribute;
import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
import org.apache.flink.datastream.api.function.ProcessFunction;

/** {@link AttributeParser} is used to parse {@link Attribute} from {@link ProcessFunction}. */
public class AttributeParser {

public static Attribute parseAttribute(ProcessFunction function) {
Class<? extends ProcessFunction> functionClass = function.getClass();
Attribute.Builder attributeBuilder = new Attribute.Builder();
if (functionClass.isAnnotationPresent(NoOutputUntilEndOfInput.class)) {
attributeBuilder.setNoOutputUntilEndOfInput(true);
}
return attributeBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
import org.apache.flink.datastream.impl.attribute.AttributeParser;
import org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator;
import org.apache.flink.datastream.impl.operators.TwoInputBroadcastProcessOperator;
import org.apache.flink.datastream.impl.utils.StreamUtils;
Expand Down Expand Up @@ -77,6 +78,7 @@ public <K, T_OTHER, OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> conn
this,
outTypeInfo,
processOperator);
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
environment.addOperator(outTransformation);
return StreamUtils.wrapWithConfigureHandle(
new NonKeyedPartitionStreamImpl<>(environment, outTransformation));
Expand Down Expand Up @@ -105,6 +107,7 @@ public <T_OTHER, OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> connect
this,
outTypeInfo,
processOperator);
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
environment.addOperator(outTransformation);
return StreamUtils.wrapWithConfigureHandle(
new NonKeyedPartitionStreamImpl<>(environment, outTransformation));
Expand Down Expand Up @@ -133,6 +136,7 @@ public <K, T_OTHER, OUT> ProcessConfigurableAndKeyedPartitionStream<K, OUT> conn

NonKeyedPartitionStreamImpl<OUT> outputStream =
new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
environment.addOperator(outTransformation);
// Construct a keyed stream directly without partitionTransformation to avoid shuffle.
return StreamUtils.wrapWithConfigureHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.datastream.impl.stream;

import org.apache.flink.api.common.attribute.Attribute;
import org.apache.flink.api.common.state.StateDeclaration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
Expand All @@ -33,6 +34,7 @@
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.ProcessConfigurable;
import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
import org.apache.flink.datastream.impl.attribute.AttributeParser;
import org.apache.flink.datastream.impl.operators.ProcessOperator;
import org.apache.flink.datastream.impl.operators.TwoInputNonBroadcastProcessOperator;
import org.apache.flink.datastream.impl.operators.TwoOutputProcessOperator;
Expand Down Expand Up @@ -69,7 +71,12 @@ public <OUT> ProcessConfigurableAndGlobalStream<OUT> process(
TypeInformation<OUT> outType =
StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType());
ProcessOperator<T, OUT> operator = new ProcessOperator<>(processFunction);
return StreamUtils.wrapWithConfigureHandle(transform("Global Process", outType, operator));
return StreamUtils.wrapWithConfigureHandle(
transform(
"Global Process",
outType,
operator,
AttributeParser.parseAttribute(processFunction)));
}

@Override
Expand All @@ -89,7 +96,11 @@ public <OUT1, OUT2> TwoGlobalStreams<OUT1, OUT2> process(
TwoOutputProcessOperator<T, OUT1, OUT2> operator =
new TwoOutputProcessOperator<>(processFunction, secondOutputTag);
GlobalStreamImpl<OUT1> firstStream =
transform("Two-Output-Operator", firstOutputType, operator);
transform(
"Two-Output-Operator",
firstOutputType,
operator,
AttributeParser.parseAttribute(processFunction));
GlobalStreamImpl<OUT2> secondStream =
new GlobalStreamImpl<>(
environment, firstStream.getSideOutputTransform(secondOutputTag));
Expand Down Expand Up @@ -122,6 +133,7 @@ public <T_OTHER, OUT> ProcessConfigurableAndGlobalStream<OUT> connectAndProcess(
// Operator parallelism should always be 1 for global stream.
// parallelismConfigured should be true to avoid overwritten by AdaptiveBatchScheduler.
outTransformation.setParallelism(1, true);
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
environment.addOperator(outTransformation);
return StreamUtils.wrapWithConfigureHandle(
new GlobalStreamImpl<>(environment, outTransformation));
Expand Down Expand Up @@ -162,7 +174,8 @@ public BroadcastStream<T> broadcast() {
private <R> GlobalStreamImpl<R> transform(
String operatorName,
TypeInformation<R> outputTypeInfo,
OneInputStreamOperator<T, R> operator) {
OneInputStreamOperator<T, R> operator,
Attribute attribute) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();

Expand All @@ -177,7 +190,7 @@ private <R> GlobalStreamImpl<R> transform(
// parallelismConfigured should be true to avoid overwritten by
// AdaptiveBatchScheduler.
true);

resultTransform.setAttribute(attribute);
GlobalStreamImpl<R> returnStream = new GlobalStreamImpl<>(environment, resultTransform);

environment.addOperator(resultTransform);
Expand All @@ -191,17 +204,17 @@ private static class TwoGlobalStreamsImpl<OUT1, OUT2> implements TwoGlobalStream

private final GlobalStreamImpl<OUT2> secondStream;

public static <OUT1, OUT2> TwoGlobalStreamsImpl<OUT1, OUT2> of(
GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> secondStream) {
return new TwoGlobalStreamsImpl<>(firstStream, secondStream);
}

private TwoGlobalStreamsImpl(
GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> secondStream) {
this.firstStream = firstStream;
this.secondStream = secondStream;
}

public static <OUT1, OUT2> TwoGlobalStreamsImpl<OUT1, OUT2> of(
GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> secondStream) {
return new TwoGlobalStreamsImpl<>(firstStream, secondStream);
}

@Override
public ProcessConfigurableAndGlobalStream<OUT1> getFirst() {
return StreamUtils.wrapWithConfigureHandle(firstStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@
import org.apache.flink.datastream.api.stream.GlobalStream;
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.ProcessConfigurable;
import org.apache.flink.datastream.impl.attribute.AttributeParser;
import org.apache.flink.datastream.impl.operators.KeyedProcessOperator;
import org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator;
import org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator;
Expand Down Expand Up @@ -118,6 +119,7 @@ public <OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> process(
Transformation<OUT> transform =
StreamUtils.getOneInputKeyedTransformation(
"KeyedProcess", this, outType, operator, keySelector, keyType);
transform.setAttribute(AttributeParser.parseAttribute(processFunction));
environment.addOperator(transform);
return StreamUtils.wrapWithConfigureHandle(
new NonKeyedPartitionStreamImpl<>(environment, transform));
Expand All @@ -141,6 +143,7 @@ public <OUT> ProcessConfigurableAndKeyedPartitionStream<K, OUT> process(
"KeyedProcess", this, outType, operator, keySelector, keyType);
NonKeyedPartitionStreamImpl<OUT> outputStream =
new NonKeyedPartitionStreamImpl<>(environment, transform);
transform.setAttribute(AttributeParser.parseAttribute(processFunction));
environment.addOperator(transform);
// Construct a keyed stream directly without partitionTransformation to avoid shuffle.
return StreamUtils.wrapWithConfigureHandle(
Expand Down Expand Up @@ -178,6 +181,7 @@ public <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
operator,
keySelector,
keyType);
mainOutputTransform.setAttribute(AttributeParser.parseAttribute(processFunction));
NonKeyedPartitionStreamImpl<OUT1> nonKeyedMainOutputStream =
new NonKeyedPartitionStreamImpl<>(environment, mainOutputTransform);
Transformation<OUT2> sideOutputTransform =
Expand Down Expand Up @@ -228,6 +232,7 @@ public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<V, OUT1, OU
operator,
keySelector,
keyType);
firstTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
NonKeyedPartitionStreamImpl<OUT1> firstStream =
new NonKeyedPartitionStreamImpl<>(environment, firstTransformation);
NonKeyedPartitionStreamImpl<OUT2> secondStream =
Expand Down Expand Up @@ -266,6 +271,7 @@ public <T_OTHER, OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> connect
(KeyedPartitionStreamImpl<K, T_OTHER>) other,
outTypeInfo,
processOperator);
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
environment.addOperator(outTransformation);
return StreamUtils.wrapWithConfigureHandle(
new NonKeyedPartitionStreamImpl<>(environment, outTransformation));
Expand Down Expand Up @@ -300,6 +306,7 @@ public <T_OTHER, OUT> ProcessConfigurableAndKeyedPartitionStream<K, OUT> connect
(KeyedPartitionStreamImpl<K, T_OTHER>) other,
outTypeInfo,
processOperator);
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
NonKeyedPartitionStreamImpl<OUT> nonKeyedOutputStream =
new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
environment.addOperator(outTransformation);
Expand Down Expand Up @@ -335,6 +342,7 @@ public <T_OTHER, OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> connect
(BroadcastStreamImpl<T_OTHER>) other,
outTypeInfo,
processOperator);
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
environment.addOperator(outTransformation);
return StreamUtils.wrapWithConfigureHandle(
new NonKeyedPartitionStreamImpl<>(environment, outTransformation));
Expand Down Expand Up @@ -367,6 +375,7 @@ public <T_OTHER, OUT> ProcessConfigurableAndKeyedPartitionStream<K, OUT> connect

NonKeyedPartitionStreamImpl<OUT> outputStream =
new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
environment.addOperator(outTransformation);
// Construct a keyed stream directly without partitionTransformation to avoid shuffle.
return StreamUtils.wrapWithConfigureHandle(
Expand Down
Loading

0 comments on commit 46523bb

Please sign in to comment.