diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/attribute/Attribute.java b/flink-core-api/src/main/java/org/apache/flink/api/common/attribute/Attribute.java new file mode 100644 index 00000000000000..c1a463e868ca37 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/attribute/Attribute.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.attribute; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; + +/** {@link Attribute} contains information about the processing logic of a process function. */ +@Internal +public class Attribute implements Serializable { + + private boolean isNoOutputUntilEndOfInput; + + private Attribute(Builder builder) { + this.isNoOutputUntilEndOfInput = builder.isNoOutputUntilEndOfInput; + } + + public boolean isNoOutputUntilEndOfInput() { + return isNoOutputUntilEndOfInput; + } + + public void setNoOutputUntilEndOfInput(boolean noOutputUntilEndOfInput) { + isNoOutputUntilEndOfInput = noOutputUntilEndOfInput; + } + + @Internal + public static class Builder { + + private boolean isNoOutputUntilEndOfInput = false; + + public Builder setNoOutputUntilEndOfInput(boolean isNoOutputUntilEndOfInput) { + this.isNoOutputUntilEndOfInput = isNoOutputUntilEndOfInput; + return this; + } + + public Attribute build() { + return new Attribute(this); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java index b38f52c20df57d..804638c30fd4b4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.SlotSharingGroup; @@ -188,6 +189,8 @@ public static int getNewNodeId() { @Nullable private String coLocationGroupKey; + private Attribute attribute = new Attribute.Builder().build(); + /** * Creates a new {@code Transformation} with the given name, output type and parallelism.
* @@ -649,4 +652,12 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(id, name, outputType, parallelism, bufferTimeout); } + + public void setAttribute(Attribute attribute) { + this.attribute = attribute; + } + + public Attribute getAttribute() { + return attribute; + } } diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/attribute/NoOutputUntilEndOfInput.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/attribute/NoOutputUntilEndOfInput.java new file mode 100644 index 00000000000000..63adbfa4b6f1dd --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/attribute/NoOutputUntilEndOfInput.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.attribute; + +import org.apache.flink.annotation.Experimental; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * {@link NoOutputUntilEndOfInput} indicates that the process function will only output records + * after all inputs have ended. If this annotation is applied to a process function with an unbounded + * source, an error will be raised when the job is compiled into a job graph. + */ +@Experimental +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface NoOutputUntilEndOfInput {} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/attribute/AttributeParser.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/attribute/AttributeParser.java new file mode 100644 index 00000000000000..cfc1d21f10175a --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/attribute/AttributeParser.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.common.attribute.Attribute; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.function.ProcessFunction; + +/** {@link AttributeParser} is used to parse {@link Attribute} from {@link ProcessFunction}. */ +public class AttributeParser { + + public static Attribute parseAttribute(ProcessFunction function) { + Class<? extends ProcessFunction> functionClass = function.getClass(); + Attribute.Builder attributeBuilder = new Attribute.Builder(); + if (functionClass.isAnnotationPresent(NoOutputUntilEndOfInput.class)) { + attributeBuilder.setNoOutputUntilEndOfInput(true); + } + return attributeBuilder.build(); + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java index defd870be68117..8b02eabdaf36e1 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java @@ -30,6 +30,7 @@ import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.datastream.impl.attribute.AttributeParser; import org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.TwoInputBroadcastProcessOperator; import org.apache.flink.datastream.impl.utils.StreamUtils; @@ -77,6 +78,7 @@ public ProcessConfigurableAndNonKeyedPartitionStream conn this, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); @@ -105,6 +107,7 @@ public ProcessConfigurableAndNonKeyedPartitionStream connect this, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); @@ -133,6 +136,7 @@ public ProcessConfigurableAndKeyedPartitionStream conn NonKeyedPartitionStreamImpl outputStream = new NonKeyedPartitionStreamImpl<>(environment, outTransformation); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); // Construct a keyed stream directly without partitionTransformation to avoid shuffle.
return StreamUtils.wrapWithConfigureHandle( diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java index 8dddc935318623..7ec2e08465119f 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java @@ -18,6 +18,7 @@ package org.apache.flink.datastream.impl.stream; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.state.StateDeclaration; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.dsv2.Sink; @@ -33,6 +34,7 @@ import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.ProcessConfigurable; import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.datastream.impl.attribute.AttributeParser; import org.apache.flink.datastream.impl.operators.ProcessOperator; import org.apache.flink.datastream.impl.operators.TwoInputNonBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.TwoOutputProcessOperator; @@ -69,7 +71,12 @@ public ProcessConfigurableAndGlobalStream process( TypeInformation outType = StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType()); ProcessOperator operator = new ProcessOperator<>(processFunction); - return StreamUtils.wrapWithConfigureHandle(transform("Global Process", outType, operator)); + return StreamUtils.wrapWithConfigureHandle( + transform( + "Global Process", + outType, + operator, + AttributeParser.parseAttribute(processFunction))); } @Override @@ -89,7 +96,11 @@ public TwoGlobalStreams process( TwoOutputProcessOperator operator = new TwoOutputProcessOperator<>(processFunction, secondOutputTag); GlobalStreamImpl firstStream = - transform("Two-Output-Operator", firstOutputType, operator); + transform( + "Two-Output-Operator", + firstOutputType, + operator, + AttributeParser.parseAttribute(processFunction)); GlobalStreamImpl secondStream = new GlobalStreamImpl<>( environment, firstStream.getSideOutputTransform(secondOutputTag)); @@ -122,6 +133,7 @@ public ProcessConfigurableAndGlobalStream connectAndProcess( // Operator parallelism should always be 1 for global stream. // parallelismConfigured should be true to avoid overwritten by AdaptiveBatchScheduler. outTransformation.setParallelism(1, true); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new GlobalStreamImpl<>(environment, outTransformation)); @@ -162,7 +174,8 @@ public BroadcastStream broadcast() { private GlobalStreamImpl transform( String operatorName, TypeInformation outputTypeInfo, - OneInputStreamOperator operator) { + OneInputStreamOperator operator, + Attribute attribute) { // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); @@ -177,7 +190,7 @@ private GlobalStreamImpl transform( // parallelismConfigured should be true to avoid overwritten by // AdaptiveBatchScheduler. 
true); - + resultTransform.setAttribute(attribute); GlobalStreamImpl returnStream = new GlobalStreamImpl<>(environment, resultTransform); environment.addOperator(resultTransform); @@ -191,17 +204,17 @@ private static class TwoGlobalStreamsImpl implements TwoGlobalStream private final GlobalStreamImpl secondStream; - public static TwoGlobalStreamsImpl of( - GlobalStreamImpl firstStream, GlobalStreamImpl secondStream) { - return new TwoGlobalStreamsImpl<>(firstStream, secondStream); - } - private TwoGlobalStreamsImpl( GlobalStreamImpl firstStream, GlobalStreamImpl secondStream) { this.firstStream = firstStream; this.secondStream = secondStream; } + public static TwoGlobalStreamsImpl of( + GlobalStreamImpl firstStream, GlobalStreamImpl secondStream) { + return new TwoGlobalStreamsImpl<>(firstStream, secondStream); + } + @Override public ProcessConfigurableAndGlobalStream getFirst() { return StreamUtils.wrapWithConfigureHandle(firstStream); diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java index f0834b4de558ea..529110a9ab6464 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java @@ -33,9 +33,10 @@ import org.apache.flink.datastream.api.stream.GlobalStream; import org.apache.flink.datastream.api.stream.KeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; -import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.ProcessConfigurable; +import org.apache.flink.datastream.impl.attribute.AttributeParser; import org.apache.flink.datastream.impl.operators.KeyedProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator; @@ -118,6 +119,7 @@ public ProcessConfigurableAndNonKeyedPartitionStream process( Transformation transform = StreamUtils.getOneInputKeyedTransformation( "KeyedProcess", this, outType, operator, keySelector, keyType); + transform.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(transform); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, transform)); @@ -141,6 +143,7 @@ public ProcessConfigurableAndKeyedPartitionStream process( "KeyedProcess", this, outType, operator, keySelector, keyType); NonKeyedPartitionStreamImpl outputStream = new NonKeyedPartitionStreamImpl<>(environment, transform); + transform.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(transform); // Construct a keyed stream directly without partitionTransformation to avoid shuffle. 
return StreamUtils.wrapWithConfigureHandle( @@ -178,6 +181,7 @@ public TwoKeyedPartitionStreams process( operator, keySelector, keyType); + mainOutputTransform.setAttribute(AttributeParser.parseAttribute(processFunction)); NonKeyedPartitionStreamImpl nonKeyedMainOutputStream = new NonKeyedPartitionStreamImpl<>(environment, mainOutputTransform); Transformation sideOutputTransform = @@ -228,6 +232,7 @@ public ProcessConfigurableAndTwoNonKeyedPartitionStream firstStream = new NonKeyedPartitionStreamImpl<>(environment, firstTransformation); NonKeyedPartitionStreamImpl secondStream = @@ -266,6 +271,7 @@ public ProcessConfigurableAndNonKeyedPartitionStream connect (KeyedPartitionStreamImpl) other, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); @@ -300,6 +306,7 @@ public ProcessConfigurableAndKeyedPartitionStream connect (KeyedPartitionStreamImpl) other, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); NonKeyedPartitionStreamImpl nonKeyedOutputStream = new NonKeyedPartitionStreamImpl<>(environment, outTransformation); environment.addOperator(outTransformation); @@ -335,6 +342,7 @@ public ProcessConfigurableAndNonKeyedPartitionStream connect (BroadcastStreamImpl) other, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); @@ -367,6 +375,7 @@ public ProcessConfigurableAndKeyedPartitionStream connect NonKeyedPartitionStreamImpl outputStream = new NonKeyedPartitionStreamImpl<>(environment, outTransformation); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); // Construct a keyed stream directly without partitionTransformation to avoid shuffle. 
return StreamUtils.wrapWithConfigureHandle( diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java index 10d4af6e1bab5c..20f32cfaf5f8b7 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java @@ -34,6 +34,7 @@ import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.ProcessConfigurable; import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.datastream.impl.attribute.AttributeParser; import org.apache.flink.datastream.impl.operators.ProcessOperator; import org.apache.flink.datastream.impl.operators.TwoInputBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.TwoInputNonBroadcastProcessOperator; @@ -75,6 +76,7 @@ public ProcessConfigurableAndNonKeyedPartitionStream process( ProcessOperator operator = new ProcessOperator<>(processFunction); OneInputTransformation outputTransform = StreamUtils.getOneInputTransformation("Process", this, outType, operator); + outputTransform.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outputTransform); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outputTransform)); @@ -101,6 +103,7 @@ public ProcessConfigurableAndTwoNonKeyedPartitionStream outTransformation = StreamUtils.getOneInputTransformation( "Two-Output-Operator", this, firstOutputType, operator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); NonKeyedPartitionStreamImpl firstStream = new NonKeyedPartitionStreamImpl<>(environment, outTransformation); NonKeyedPartitionStreamImpl secondStream = @@ -141,6 +144,7 @@ public ProcessConfigurableAndNonKeyedPartitionStream connect (NonKeyedPartitionStreamImpl) other, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); @@ -170,6 +174,7 @@ public ProcessConfigurableAndNonKeyedPartitionStream connect (BroadcastStreamImpl) other, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableTwoNonKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableTwoNonKeyedPartitionStreamImpl.java index 672b6ca4123013..46e9a5e728e036 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableTwoNonKeyedPartitionStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableTwoNonKeyedPartitionStreamImpl.java @@ -19,8 +19,8 @@ package org.apache.flink.datastream.impl.stream; import org.apache.flink.api.dag.Transformation; -import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; import org.apache.flink.datastream.impl.utils.StreamUtils; diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java new file mode 100644 index 00000000000000..e29240e85e8f6d --- /dev/null +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link StreamingJobGraphGenerator} with different attributes. 
*/ +class StreamingJobGraphGeneratorWithAttributeTest { + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask2()) + .withParallelism(2) + .process(new TestMapTask1()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void 
testNoOutputUntilEndOfInputWithOperatorChainCase3() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream< + Integer, Integer, Integer> + twoOutputStream = + source.process(new TestMapTask2()) + .withParallelism(2) + .process(new TestTwoOutputProcessFunction()) + .withParallelism(2); + NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream firstStream = + twoOutputStream.getFirst(); + NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream + secondStream = twoOutputStream.getSecond(); + firstStream + .process(new TestMapTask1()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + secondStream + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Process -> Two-Output-Operator -> (Process, Process)"), + ResultPartitionType.BLOCKING); + assertThat( + vertexMap + .get("Process -> Two-Output-Operator -> (Process, Process)") + .isAnyOutputBlocking()) + .isTrue(); + } + + @Test + void testNoOutputUntilEndOfInputWithoutOperatorChain() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + env.getConfiguration().set(PipelineOptions.OPERATOR_CHAINING, false); + NonKeyedPartitionStream source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(4); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType(vertexMap.get("KeyedProcess"), ResultPartitionType.BLOCKING); + assertHasOutputPartitionType( + vertexMap.get("Process"), ResultPartitionType.PIPELINED_BOUNDED); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Process").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse(); + } + + private void assertHasOutputPartitionType( + JobVertex jobVertex, ResultPartitionType partitionType) { + assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType); + } + + private void assertManagedMemoryWeightsSize(StreamNode node, int weightSize) { + assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize); + } + + @NoOutputUntilEndOfInput + private static class TestMapTask1 implements OneInputStreamProcessFunction { + + @Override + public void processRecord( + Integer record, Collector output, PartitionedContext ctx) { + output.collect(record + 1); + } + } + + private static class TestMapTask2 implements OneInputStreamProcessFunction { + + @Override + public void processRecord( + Integer record, Collector output, PartitionedContext ctx) { + if (record != 2) { + output.collect(record + 1); + } + } + } + + /** The test {@link TwoOutputStreamProcessFunction}. */ + private static class TestTwoOutputProcessFunction + implements TwoOutputStreamProcessFunction { + + @Override + public void processRecord( + Integer record, + Collector output1, + Collector output2, + PartitionedContext ctx) { + output1.collect(record + 1); + output2.collect(record - 1); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index e56ffdbf10ab1c..aa466e16b39485 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -19,6 +19,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.CheckpointingOptions; @@ -119,6 +120,9 @@ public class StreamConfig implements Serializable { private static final String TIME_CHARACTERISTIC = "timechar"; private static final String MANAGED_MEMORY_FRACTION_PREFIX = "managedMemFraction."; + + private static final String ATTRIBUTE = "attribute"; + private static final ConfigOption STATE_BACKEND_USE_MANAGED_MEMORY = ConfigOptions.key("statebackend.useManagedMemory") .booleanType() @@ -800,6 +804,20 @@ public boolean isGraphContainingLoops() { return config.getBoolean(GRAPH_CONTAINING_LOOPS, false); } + public void setAttribute(Attribute attribute) { + if (attribute != null) { + toBeSerializedConfigObjects.put(ATTRIBUTE, attribute); + } + } + + public Attribute getAttribute(ClassLoader cl) { + try { + return InstantiationUtil.readObjectFromConfig(this.config, ATTRIBUTE, cl); + } catch (Exception e) { + throw new StreamTaskException("Could not instantiate checkpoint storage.", e); + } + } + /** * In general, we don't clear any configuration. 
However, the {@link #SERIALIZED_UDF} may be * very large when operator includes some large objects, the SERIALIZED_UDF is used to create a diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index fc794022bf8c27..0823ed29e32196 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.OutputFormat; @@ -1085,4 +1086,10 @@ public void setSupportsConcurrentExecutionAttempts( streamNode.setSupportsConcurrentExecutionAttempts(supportsConcurrentExecutionAttempts); } } + + public void setAttribute(Integer vertexId, Attribute attribute) { + if (getStreamNode(vertexId) != null) { + getStreamNode(vertexId).setAttribute(attribute); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index d86cec1059e3ca..f47f1814332f2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.operators.ResourceSpec; @@ -98,6 +99,8 @@ public class StreamNode { private boolean parallelismConfigured = false; + private Attribute attribute = new Attribute.Builder().build(); + @VisibleForTesting public StreamNode( Integer id, @@ -189,6 +192,14 @@ public int getId() { return id; } + public void setAttribute(Attribute attribute) { + this.attribute = attribute; + } + + public Attribute getAttribute() { + return attribute; + } + public int getParallelism() { return parallelism; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 7b0a6cf90b4f54..4ad87b35f84964 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.operators.ResourceSpec; @@ -187,9 +188,6 @@ public static JobGraph createJobGraph( private final Map chainedInputOutputFormats; - // the ids of nodes whose output result partition type should be set to BLOCKING - 
private final Set outputBlockingNodesID; - private final StreamGraphHasher defaultStreamGraphHasher; private final List legacyStreamGraphHashers; @@ -230,7 +228,6 @@ private StreamingJobGraphGenerator( this.chainedMinResources = new HashMap<>(); this.chainedPreferredResources = new HashMap<>(); this.chainedInputOutputFormats = new HashMap<>(); - this.outputBlockingNodesID = new HashSet<>(); this.physicalEdgesInOrder = new ArrayList<>(); this.serializationExecutor = Preconditions.checkNotNull(serializationExecutor); this.chainInfos = new HashMap<>(); @@ -682,10 +679,12 @@ private List createChain( List nonChainableOutputs = new ArrayList(); StreamNode currentNode = streamGraph.getStreamNode(currentNodeId); - - boolean isOutputOnlyAfterEndOfStream = currentNode.isOutputOnlyAfterEndOfStream(); - if (isOutputOnlyAfterEndOfStream) { - outputBlockingNodesID.add(currentNode.getId()); + Attribute attribute = currentNode.getAttribute(); + boolean isNoOutputUntilEndOfInput = + currentNode.isOutputOnlyAfterEndOfStream() + || attribute.isNoOutputUntilEndOfInput(); + if (isNoOutputUntilEndOfInput) { + attribute.setNoOutputUntilEndOfInput(true); } for (StreamEdge outEdge : currentNode.getOutEdges()) { @@ -697,9 +696,13 @@ private List createChain( } for (StreamEdge chainable : chainableOutputs) { - // Mark downstream nodes in the same chain as outputBlocking - if (isOutputOnlyAfterEndOfStream) { - outputBlockingNodesID.add(chainable.getTargetId()); + // Only modify the attribute of downstream nodes in the same chain. + if (isNoOutputUntilEndOfInput) { + StreamNode targetNode = streamGraph.getStreamNode(chainable.getTargetId()); + Attribute targetNodeAttribute = targetNode.getAttribute(); + if (targetNodeAttribute != null) { + targetNodeAttribute.setNoOutputUntilEndOfInput(true); + } } transitiveOutEdges.addAll( createChain( @@ -707,10 +710,6 @@ private List createChain( chainIndex + 1, chainInfo, chainEntryPoints)); - // Mark upstream nodes in the same chain as outputBlocking - if (outputBlockingNodesID.contains(chainable.getTargetId())) { - outputBlockingNodesID.add(currentNodeId); - } } for (StreamEdge nonChainable : nonChainableOutputs) { @@ -757,7 +756,7 @@ private List createChain( : new StreamConfig(new Configuration()); tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs); - + config.setAttribute(attribute); setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources()); setOperatorChainedOutputsConfig(config, chainableOutputs); @@ -1504,7 +1503,9 @@ private ResultPartitionType getResultPartitionType(StreamEdge edge) { } private ResultPartitionType determineUndefinedResultPartitionType(StreamEdge edge) { - if (outputBlockingNodesID.contains(edge.getSourceId())) { + Attribute sourceNodeAttribute = + streamGraph.getStreamNode(edge.getSourceId()).getAttribute(); + if (sourceNodeAttribute.isNoOutputUntilEndOfInput()) { edge.setBufferTimeout(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT); return ResultPartitionType.BLOCKING; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java index 54cfda86339c87..3f5269c97a77b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java @@ -69,7 +69,7 @@ protected Collection translateInternal( inputType, transformation.getOutputType(), transformation.getName()); - + streamGraph.setAttribute(transformationId, transformation.getAttribute()); if (stateKeySelector != null) { TypeSerializer keySerializer = stateKeyType.createSerializer(executionConfig.getSerializerConfig()); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java index 134c10ab806b18..653c1f430243e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java @@ -75,7 +75,7 @@ protected Collection translateInternal( secondInputTransformation.getOutputType(), transformation.getOutputType(), transformation.getName()); - + streamGraph.setAttribute(transformationId, transformation.getAttribute()); if (firstKeySelector != null || secondKeySelector != null) { checkState( keyTypeInfo != null, diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java index be2980f3ab609c..c52ade1d638ebf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java @@ -96,7 +96,7 @@ private Collection translateInternal( transformation.getInputTypes(), transformation.getOutputType(), transformation.getName()); - + streamGraph.setAttribute(transformationId, transformation.getAttribute()); final int parallelism = transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? 
transformation.getParallelism() diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java index 6d7ae8103f1266..e17941d6551ffc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java @@ -67,7 +67,8 @@ public Collection translateForStreamingInternal( transformation.getStateKeyType(), context); - if (transformation.isOutputOnlyAfterEndOfStream()) { + if (transformation.isOutputOnlyAfterEndOfStream() + || transformation.getAttribute().isNoOutputUntilEndOfInput()) { maybeApplyBatchExecutionSettings(transformation, context); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java index 94c9e2ec17347c..09f17da0959d9b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java @@ -49,8 +49,9 @@ /** Tests for {@link StreamingJobGraphGenerator} with internal sorter. */ public class StreamingJobGraphGeneratorWithOperatorAttributesTest { + @Test - void testOutputOnlyAfterEndOfStream() { + void testOutputOnlyAfterEndOfStreamCase1() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); @@ -97,10 +98,46 @@ void testOutputOnlyAfterEndOfStream() { assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse(); assertThat(vertexMap.get("transform -> Map").isAnyOutputBlocking()).isTrue(); assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testOutputOnlyAfterEndOfStreamCase2() { + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + + final DataStream source = env.fromData(1, 2, 3).name("source"); + source.keyBy(x -> x) + .transform( + "transform", + Types.INT, + new StreamOperatorWithConfigurableOperatorAttributes<>( + x -> x, + new OperatorAttributesBuilder() + .setOutputOnlyAfterEndOfStream(true) + .build())) + .map(x -> x) + .sinkTo(new DiscardingSink<>()) + .disableChaining() + .name("sink"); + + final StreamGraph streamGraph = env.getStreamGraph(false); + Map nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream()).isFalse(); + assertThat(nodeMap.get("transform").isOutputOnlyAfterEndOfStream()).isTrue(); + assertThat(nodeMap.get("Map").isOutputOnlyAfterEndOfStream()).isFalse(); + assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse(); + assertManagedMemoryWeightsSize(nodeMap.get("Source: source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("transform"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Map"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0); env.disableOperatorChaining(); - 
jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph(false)); - vertexMap = new HashMap<>(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph(false)); + Map vertexMap = new HashMap<>(); for (JobVertex vertex : jobGraph.getVertices()) { vertexMap.put(vertex.getName(), vertex); } @@ -146,10 +183,6 @@ void testApplyBatchExecutionSettingsOnTwoInputOperator() { assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0); } - private static void assertManagedMemoryWeightsSize(StreamNode node, int weightSize) { - assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize); - } - @Test void testOneInputOperatorWithInternalSorterSupported() { final StreamExecutionEnvironment env = @@ -361,6 +394,10 @@ public void processElement2( IN2 value, CoProcessFunction.Context ctx, Collector out) {} } + private void assertManagedMemoryWeightsSize(StreamNode node, int weightSize) { + assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize); + } + private void assertHasOutputPartitionType( JobVertex jobVertex, ResultPartitionType partitionType) { assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType);
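
Usage sketch (illustrative, not part of this patch): a minimal end-to-end example of opting into the new attribute, assembled from the API surface exercised by the tests above (ExecutionEnvironment.getInstance, DataStreamV2SourceUtils.fromData, WrappedSink, PrintSink), plus ExecutionEnvironment#execute, which is assumed here as the submission entry point; the class names below are hypothetical. Annotating the function is the only opt-in step: AttributeParser.parseAttribute reads the annotation when the transformation is created, and StreamingJobGraphGenerator then disables the network buffer timeout and marks the annotated operator's output partitions as BLOCKING.

import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSink;

import java.util.Arrays;

public class NoOutputUntilEndOfInputExample {

    // The annotation is the whole opt-in: AttributeParser sets
    // isNoOutputUntilEndOfInput on the transformation built for this function.
    @NoOutputUntilEndOfInput
    private static class SuppressedUntilEndOfInput
            implements OneInputStreamProcessFunction<Integer, Integer> {

        @Override
        public void processRecord(
                Integer record, Collector<Integer> output, PartitionedContext ctx) {
            // A real function would typically buffer here and emit from its
            // end-of-input callback; emitting directly keeps the sketch short.
            output.collect(record + 1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
        // The source must be bounded, per the annotation's javadoc.
        env.fromSource(DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "source")
                .process(new SuppressedUntilEndOfInput())
                .toSink(new WrappedSink<>(new PrintSink<>()));
        env.execute("NoOutputUntilEndOfInput example");
    }
}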