From 08990c7d21260dff55e3a499354f5a5980ae68e6 Mon Sep 17 00:00:00 2001 From: Yu Chen Date: Sun, 1 Sep 2024 14:12:53 +0800 Subject: [PATCH] [FLINK-36069][runtime/rest] Extending job detail rest API to expose json stream graph --- .../program/rest/RestClusterClientTest.java | 4 +- .../runtime/webmonitor/WebFrontendITCase.java | 119 +++++++++ .../src/test/resources/rest_api_v1.snapshot | 7 + .../executiongraph/AccessExecutionGraph.java | 18 ++ .../ArchivedExecutionGraph.java | 28 ++- .../executiongraph/DefaultExecutionGraph.java | 10 + .../jobgraph/jsonplan/JsonPlanGenerator.java | 54 +++++ .../rest/handler/job/JobDetailsHandler.java | 9 +- .../rest/messages/job/JobDetailsInfo.java | 48 +++- ...daptiveExecutionPlanSchedulingContext.java | 5 + .../ExecutionPlanSchedulingContext.java | 11 + ...daptiveExecutionPlanSchedulingContext.java | 5 + .../api/graph/AdaptiveGraphManager.java | 29 ++- .../api/graph/DefaultStreamGraphContext.java | 26 ++ .../api/graph/StreamGraphContext.java | 7 + .../jobgraph/jsonplan/JsonGeneratorTest.java | 77 ++++++ .../jsonplan/StreamGraphJsonSchema.java | 228 ++++++++++++++++++ .../handler/job/JobDetailsHandlerTest.java | 119 +++++++++ .../utils/ArchivedExecutionGraphBuilder.java | 10 +- .../rest/messages/job/JobDetailsInfoTest.java | 4 +- .../adapter/DefaultExecutionTopologyTest.java | 5 + .../StateTrackingMockExecutionGraph.java | 10 + 22 files changed, 821 insertions(+), 12 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/StreamGraphJsonSchema.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 29968f192b944..19e5b2684c8bb 100644 --- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -1277,7 +1277,9 @@ void testJobDetailsContainsSlotSharingGroupId() throws Exception { Collections.singletonMap(JobStatus.RUNNING, 1L), jobVertexDetailsInfos, Collections.singletonMap(ExecutionState.RUNNING, 1), - new JobPlanInfo.RawJson("{\"id\":\"1234\"}")); + new JobPlanInfo.RawJson("{\"id\":\"1234\"}"), + new JobPlanInfo.RawJson("{\"id\":\"1234\"}"), + 0); final TestJobDetailsInfoHandler jobDetailsInfoHandler = new TestJobDetailsInfoHandler(jobDetailsInfo); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index 0f70ba2f5a2bb..5fd8033747314 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -19,13 +19,22 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; 
import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -35,9 +44,13 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; +import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest; import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator; @@ -523,6 +536,63 @@ void testCancelYarn( BlockingInvokable.reset(); } + @Test + void getStreamGraphFromBatchJobDetailsHandler( + @InjectClusterClient ClusterClient clusterClient, + @InjectClusterRESTAddress URI restAddress) + throws Exception { + // this only works if there is no active job at this point + assertThat(getRunningJobs(clusterClient).isEmpty()); + + // Create a task + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.fromSource( + new BlockingNumberSequenceSource(), + WatermarkStrategy.noWatermarks(), + "block-source") + .setParallelism(2) + .addSink(new SinkFunction<>() {}); + 
StreamGraph streamGraph = env.getStreamGraph(); + final JobID jid = streamGraph.getJobID(); + + clusterClient.submitJob(streamGraph).get(); + + // wait for job to show up + while (getRunningJobs(clusterClient).isEmpty()) { + Thread.sleep(10); + } + + // wait for tasks to be properly running + BlockingInvokable.latch.await(); + + final Duration testTimeout = Duration.ofMinutes(2); + final Deadline deadline = Deadline.fromNow(testTimeout); + try (HttpTestClient client = new HttpTestClient("localhost", restAddress.getPort())) { + // Request the file from the web server + client.sendGetRequest("/jobs/" + jid, deadline.timeLeft()); + + HttpTestClient.SimpleHttpResponse response = + client.getNextResponse(deadline.timeLeft()); + + assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.OK); + assertThat(response.getType()).isEqualTo("application/json; charset=UTF-8"); + String content = response.getContent(); + JsonNode jsonNode = OBJECT_MAPPER.readTree(content); + assertThat(jsonNode.has("stream-graph")).isTrue(); + assertThat(jsonNode.has("pending-operators")).isTrue(); + } + + clusterClient.cancel(jid).get(); + + // ensure cancellation is finished + while (!getRunningJobs(clusterClient).isEmpty()) { + Thread.sleep(20); + } + + BlockingInvokable.reset(); + } + private static List getRunningJobs(ClusterClient client) throws Exception { Collection statusMessages = client.listJobs().get(); return statusMessages.stream() @@ -616,6 +686,55 @@ private static String postAndGetHttpResponse(String url, String jsonData) throws "Could not get HTTP response in time since the service is still unavailable."); } + /** + * Test sequence source with blocker, which blocks reader by blocker at EOI and only exit when + * job cancelled. 
+ */ + private static class BlockingNumberSequenceSource extends NumberSequenceSource { + public BlockingNumberSequenceSource() { + super(0, 1); + } + + @Override + public SourceReader createReader( + SourceReaderContext readerContext) { + return new BlockingIteratorSourceReader<>(readerContext); + } + } + + private static class BlockingIteratorSourceReader< + E, IterT extends Iterator, SplitT extends IteratorSourceSplit> + extends IteratorSourceReader { + private transient BlockingInvokable blocker; + + public BlockingIteratorSourceReader(SourceReaderContext context) { + super(context); + } + + @Override + public void start() { + super.start(); + blocker = new BlockingInvokable(new DummyEnvironment()); + } + + @Override + public InputStatus pollNext(ReaderOutput output) { + InputStatus inputStatus = super.pollNext(output); + if (inputStatus == InputStatus.END_OF_INPUT) { + try { + blocker.invoke(); + } catch (Exception ignored) { + } + } + return inputStatus; + } + + @Override + public void close() { + blocker.cancel(); + } + } + /** Test invokable that allows waiting for all subtasks to be running. 
*/ public static class BlockingInvokable extends AbstractInvokable { diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 2c3601bf15b81..c1801407d1c6c 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -967,6 +967,13 @@ "plan" : { "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson" + }, + "stream-graph" : { + "type" : "object", + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson" + }, + "pending-operators" : { + "type" : "integer" } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java index e9998b1a78fc8..1cac672a8969b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java @@ -47,6 +47,15 @@ public interface AccessExecutionGraph extends JobStatusProvider { */ String getJsonPlan(); + /** + * Returns the stream graph as a JSON string. + * + * @return stream graph as a JSON string, or null if the job is submitted with a JobGraph or if + * it's a streaming job. + */ + @Nullable + String getStreamGraphJson(); + /** * Returns the {@link JobID} for this execution graph. * @@ -199,4 +208,13 @@ public interface AccessExecutionGraph extends JobStatusProvider { * @return The changelog storage name, or an empty Optional in the case of batch jobs */ Optional getChangelogStorageName(); + + /** + * Retrieves the count of pending operators waiting to be transferred to job vertices in the + * adaptive execution of batch jobs. This value will be zero if the job is submitted with a + * JobGraph or if it's a streaming job. 
+ * + * @return the number of pending operators. + */ + int getPendingOperatorCount(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java index f322523bcbf2a..f93eaa4a6aef4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java @@ -113,6 +113,10 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl @Nullable private final String changelogStorageName; + @Nullable private final String streamGraphJson; + + private final int pendingOperatorCount; + public ArchivedExecutionGraph( JobID jobID, String jobName, @@ -132,7 +136,9 @@ public ArchivedExecutionGraph( @Nullable String stateBackendName, @Nullable String checkpointStorageName, @Nullable TernaryBoolean stateChangelogEnabled, - @Nullable String changelogStorageName) { + @Nullable String changelogStorageName, + @Nullable String streamGraphJson, + int pendingOperatorCount) { this.jobID = Preconditions.checkNotNull(jobID); this.jobName = Preconditions.checkNotNull(jobName); @@ -153,6 +159,8 @@ public ArchivedExecutionGraph( this.checkpointStorageName = checkpointStorageName; this.stateChangelogEnabled = stateChangelogEnabled; this.changelogStorageName = changelogStorageName; + this.streamGraphJson = streamGraphJson; + this.pendingOperatorCount = pendingOperatorCount; } // -------------------------------------------------------------------------------------------- @@ -162,6 +170,11 @@ public String getJsonPlan() { return jsonPlan; } + @Override + public String getStreamGraphJson() { + return streamGraphJson; + } + @Override public JobID getJobID() { return jobID; @@ -298,6 +311,11 @@ public Optional getChangelogStorageName() { return Optional.ofNullable(changelogStorageName); } + 
@Override + public int getPendingOperatorCount() { + return pendingOperatorCount; + } + /** * Create a {@link ArchivedExecutionGraph} from the given {@link ExecutionGraph}. * @@ -366,7 +384,9 @@ public static ArchivedExecutionGraph createFrom( executionGraph.getStateBackendName().orElse(null), executionGraph.getCheckpointStorageName().orElse(null), executionGraph.isChangelogStateBackendEnabled(), - executionGraph.getChangelogStorageName().orElse(null)); + executionGraph.getChangelogStorageName().orElse(null), + executionGraph.getStreamGraphJson(), + executionGraph.getPendingOperatorCount()); } /** @@ -487,6 +507,8 @@ private static ArchivedExecutionGraph createSparseArchivedExecutionGraph( checkpointingSettings == null ? TernaryBoolean.UNDEFINED : checkpointingSettings.isChangelogStateBackendEnabled(), - checkpointingSettings == null ? null : "Unknown"); + checkpointingSettings == null ? null : "Unknown", + null, + 0); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 1114671a1b56c..2d551ec6dfd09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -457,6 +457,11 @@ public Optional getChangelogStorageName() { return Optional.ofNullable(changelogStorageName); } + @Override + public int getPendingOperatorCount() { + return executionPlanSchedulingContext.getPendingOperatorCount(); + } + @Override public void enableCheckpointing( CheckpointCoordinatorConfiguration chkConfig, @@ -612,6 +617,11 @@ public void setJsonPlan(String jsonPlan) { this.jsonPlan = jsonPlan; } + @Override + public String getStreamGraphJson() { + return executionPlanSchedulingContext.getStreamGraphJson(); + } + @Override public String getJsonPlan() { return jsonPlan; diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java index 6cdcc77cc537a..6005c32464272 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java @@ -26,6 +26,9 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -34,6 +37,7 @@ import java.io.StringWriter; import java.util.List; +import java.util.Map; @Internal public class JsonPlanGenerator { @@ -170,4 +174,54 @@ public static String generatePlan( throw new RuntimeException("Failed to generate plan", e); } } + + public static String generateStreamGraphJson( + StreamGraph sg, Map jobVertexIdMap) { + try (final StringWriter writer = new StringWriter(1024)) { + try (final JsonGenerator gen = new JsonFactory().createGenerator(writer)) { + // start of everything + gen.writeStartObject(); + + gen.writeArrayFieldStart("nodes"); + + // info per vertex + for (StreamNode node : sg.getStreamNodes()) { + gen.writeStartObject(); + gen.writeStringField("id", String.valueOf(node.getId())); + gen.writeNumberField("parallelism", node.getParallelism()); + gen.writeStringField("operator", node.getOperatorName()); + gen.writeStringField("description", node.getOperatorDescription()); + if (jobVertexIdMap.containsKey(node.getId())) { + gen.writeStringField( + "job_vertex_id", 
jobVertexIdMap.get(node.getId()).toString()); + } + + // write the input edge properties + gen.writeArrayFieldStart("inputs"); + + List inEdges = node.getInEdges(); + for (int inputNum = 0; inputNum < inEdges.size(); inputNum++) { + StreamEdge edge = inEdges.get(inputNum); + gen.writeStartObject(); + gen.writeNumberField("num", inputNum); + gen.writeStringField("id", String.valueOf(edge.getSourceId())); + gen.writeStringField("ship_strategy", edge.getPartitioner().toString()); + gen.writeStringField("exchange", edge.getExchangeMode().name()); + gen.writeEndObject(); + } + + gen.writeEndArray(); + + gen.writeEndObject(); + } + + // end of everything + gen.writeEndArray(); + gen.writeEndObject(); + } + return writer.toString(); + } catch (Exception e) { + throw new RuntimeException("Failed to generate json stream plan", e); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index efc49c1d8c711..0ee703cfa51e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -141,6 +141,11 @@ private static JobDetailsInfo createJobDetailsInfo( executionState, jobVerticesPerState[executionState.ordinal()]); } + JobPlanInfo.RawJson streamGraphJson = null; + if (executionGraph.getStreamGraphJson() != null) { + streamGraphJson = new JobPlanInfo.RawJson(executionGraph.getStreamGraphJson()); + } + return new JobDetailsInfo( executionGraph.getJobID(), executionGraph.getJobName(), @@ -155,7 +160,9 @@ private static JobDetailsInfo createJobDetailsInfo( timestamps, jobVertexInfos, jobVerticesPerStateMap, - new JobPlanInfo.RawJson(executionGraph.getJsonPlan())); + new JobPlanInfo.RawJson(executionGraph.getJsonPlan()), + streamGraphJson, + executionGraph.getPendingOperatorCount()); } 
private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java index 250407005c155..250afd3a0415a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java @@ -37,12 +37,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; import io.swagger.v3.oas.annotations.media.Schema; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.Map; import java.util.Objects; @@ -79,6 +82,14 @@ public class JobDetailsInfo implements ResponseBody { public static final String FIELD_NAME_JSON_PLAN = "plan"; + /** + * The {@link JobPlanInfo.RawJson} of the submitted stream graph, or null if the job is + * submitted with a JobGraph or if it's a streaming job. 
+ */ + public static final String FIELD_NAME_STREAM_GRAPH_JSON = "stream-graph"; + + public static final String FIELD_NAME_PENDING_OPERATORS = "pending-operators"; + @JsonProperty(FIELD_NAME_JOB_ID) @JsonSerialize(using = JobIDSerializer.class) private final JobID jobId; @@ -122,6 +133,14 @@ public class JobDetailsInfo implements ResponseBody { @JsonProperty(FIELD_NAME_JSON_PLAN) private final JobPlanInfo.RawJson jsonPlan; + @JsonProperty(FIELD_NAME_STREAM_GRAPH_JSON) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + private final JobPlanInfo.RawJson streamGraphJson; + + @JsonProperty(FIELD_NAME_PENDING_OPERATORS) + private final int pendingOperators; + @JsonCreator public JobDetailsInfo( @JsonDeserialize(using = JobIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_ID) @@ -140,7 +159,10 @@ public JobDetailsInfo( Collection jobVertexInfos, @JsonProperty(FIELD_NAME_JOB_VERTICES_PER_STATE) Map jobVerticesPerState, - @JsonProperty(FIELD_NAME_JSON_PLAN) JobPlanInfo.RawJson jsonPlan) { + @JsonProperty(FIELD_NAME_JSON_PLAN) JobPlanInfo.RawJson jsonPlan, + @JsonProperty(FIELD_NAME_STREAM_GRAPH_JSON) @Nullable + JobPlanInfo.RawJson streamGraphJson, + @JsonProperty(FIELD_NAME_PENDING_OPERATORS) int pendingOperators) { this.jobId = Preconditions.checkNotNull(jobId); this.name = Preconditions.checkNotNull(name); this.isStoppable = isStoppable; @@ -155,6 +177,8 @@ public JobDetailsInfo( this.jobVertexInfos = Preconditions.checkNotNull(jobVertexInfos); this.jobVerticesPerState = Preconditions.checkNotNull(jobVerticesPerState); this.jsonPlan = Preconditions.checkNotNull(jsonPlan); + this.streamGraphJson = streamGraphJson; + this.pendingOperators = pendingOperators; } @Override @@ -179,7 +203,9 @@ public boolean equals(Object o) { && Objects.equals(timestamps, that.timestamps) && Objects.equals(jobVertexInfos, that.jobVertexInfos) && Objects.equals(jobVerticesPerState, that.jobVerticesPerState) - && Objects.equals(jsonPlan, that.jsonPlan); + && Objects.equals(jsonPlan, 
that.jsonPlan) + && Objects.equals(streamGraphJson, that.streamGraphJson) + && Objects.equals(pendingOperators, that.pendingOperators); } @Override @@ -198,7 +224,9 @@ public int hashCode() { timestamps, jobVertexInfos, jobVerticesPerState, - jsonPlan); + jsonPlan, + streamGraphJson, + pendingOperators); } @JsonIgnore @@ -271,6 +299,20 @@ public String getJsonPlan() { return jsonPlan.toString(); } + @JsonIgnore + @Nullable + public String getStreamGraphJson() { + if (streamGraphJson != null) { + return streamGraphJson.toString(); + } + return null; + } + + @JsonIgnore + public int getPendingOperators() { + return pendingOperators; + } + // --------------------------------------------------- // Static inner classes // --------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java index 5575f4e27be42..e96c97c1c82a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java @@ -118,6 +118,11 @@ public int getPendingOperatorCount() { return adaptiveGraphManager.getPendingOperatorsCount(); } + @Override + public String getStreamGraphJson() { + return adaptiveGraphManager.getStreamGraphJson(); + } + private int getParallelism(int streamNodeId) { return adaptiveGraphManager .getStreamGraphContext() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java index 86919814e9c9a..ed00382dc4832 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java @@ -21,6 +21,8 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.JobVertexID; +import javax.annotation.Nullable; + import java.util.function.Function; /** Interface for retrieving stream graph context details for adaptive batch jobs. */ @@ -58,4 +60,13 @@ int getConsumersMaxParallelism( * @return the number of pending operators. */ int getPendingOperatorCount(); + + /** + * Retrieves the JSON representation of the stream graph for the original job. + * + * @return the JSON representation of the stream graph, or null if the stream graph is not + * available. + */ + @Nullable + String getStreamGraphJson(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java index 72de8edec66ec..53fae3075eccb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java @@ -107,4 +107,9 @@ public int getConsumersMaxParallelism( public int getPendingOperatorCount() { return 0; } + + @Override + public String getStreamGraphJson() { + return null; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java index f683f586d4ebb..965187fae5b0f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil; import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup; +import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext; import org.apache.flink.streaming.api.graph.util.OperatorChainInfo; @@ -74,7 +75,8 @@ /** Default implementation for {@link AdaptiveGraphGenerator}. */ @Internal -public class AdaptiveGraphManager implements AdaptiveGraphGenerator { +public class AdaptiveGraphManager + implements AdaptiveGraphGenerator, StreamGraphContext.StreamGraphUpdateListener { private final StreamGraph streamGraph; @@ -128,6 +130,8 @@ public class AdaptiveGraphManager implements AdaptiveGraphGenerator { // We need cache all job vertices to create JobEdge for downstream vertex. private final Map startNodeToJobVertexMap; + private final Map streamNodeIdsToJobVertexMap; + // Records the ID of the job vertex that has completed execution. 
private final Set finishedJobVertices; @@ -135,6 +139,8 @@ public class AdaptiveGraphManager implements AdaptiveGraphGenerator { private final SlotSharingGroup defaultSlotSharingGroup; + private String streamGraphJson; + public AdaptiveGraphManager( ClassLoader userClassloader, StreamGraph streamGraph, Executor serializationExecutor) { preValidate(streamGraph, userClassloader); @@ -162,6 +168,7 @@ public AdaptiveGraphManager( this.jobVertexToStartNodeMap = new HashMap<>(); this.jobVertexToChainedStreamNodeIdsMap = new HashMap<>(); + this.streamNodeIdsToJobVertexMap = new HashMap<>(); this.finishedJobVertices = new HashSet<>(); @@ -171,7 +178,8 @@ public AdaptiveGraphManager( steamNodeIdToForwardGroupMap, frozenNodeToStartNodeMap, intermediateOutputsCaches, - consumerEdgeIdToIntermediateDataSetMap); + consumerEdgeIdToIntermediateDataSetMap, + this); this.jobGraph = createAndInitializeJobGraph(streamGraph, streamGraph.getJobID()); @@ -295,6 +303,8 @@ private List createJobVerticesAndUpdateGraph(List streamN generateConfigForJobVertices(jobVertexBuildContext); + generateStreamGraphJson(); + return new ArrayList<>(jobVertexBuildContext.getJobVerticesInOrder().values()); } @@ -438,6 +448,7 @@ private void recordCreatedJobVerticesInfo(JobVertexBuildContext jobVertexBuildCo .computeIfAbsent( jobVertex.getID(), key -> new ArrayList<>()) .add(node.getId()); + streamNodeIdsToJobVertexMap.put(node.getId(), jobVertex.getID()); }); } } @@ -661,4 +672,18 @@ private boolean isNodeFrozen(Integer streamNodeId) { private Integer getStartNodeId(Integer streamNodeId) { return frozenNodeToStartNodeMap.get(streamNodeId); } + + private void generateStreamGraphJson() { + streamGraphJson = + JsonPlanGenerator.generateStreamGraphJson(streamGraph, streamNodeIdsToJobVertexMap); + } + + public String getStreamGraphJson() { + return streamGraphJson; + } + + @Override + public void onStreamGraphUpdated() { + generateStreamGraphJson(); + } } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java index 324f3466e7cb9..f0ce8f8cbd766 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup; @@ -74,12 +75,31 @@ public class DefaultStreamGraphContext implements StreamGraphContext { private final Map consumerEdgeIdToIntermediateDataSetMap; + @Nullable private final StreamGraphUpdateListener streamGraphUpdateListener; + + @VisibleForTesting public DefaultStreamGraphContext( StreamGraph streamGraph, Map steamNodeIdToForwardGroupMap, Map frozenNodeToStartNodeMap, Map> opIntermediateOutputsCaches, Map consumerEdgeIdToIntermediateDataSetMap) { + this( + streamGraph, + steamNodeIdToForwardGroupMap, + frozenNodeToStartNodeMap, + opIntermediateOutputsCaches, + consumerEdgeIdToIntermediateDataSetMap, + null); + } + + public DefaultStreamGraphContext( + StreamGraph streamGraph, + Map steamNodeIdToForwardGroupMap, + Map frozenNodeToStartNodeMap, + Map> opIntermediateOutputsCaches, + Map consumerEdgeIdToIntermediateDataSetMap, + @Nullable StreamGraphUpdateListener streamGraphUpdateListener) { this.streamGraph = checkNotNull(streamGraph); this.steamNodeIdToForwardGroupMap = checkNotNull(steamNodeIdToForwardGroupMap); this.frozenNodeToStartNodeMap = checkNotNull(frozenNodeToStartNodeMap); @@ -87,6 +107,7 @@ public DefaultStreamGraphContext( this.immutableStreamGraph = new 
ImmutableStreamGraph(this.streamGraph); this.consumerEdgeIdToIntermediateDataSetMap = checkNotNull(consumerEdgeIdToIntermediateDataSetMap); + this.streamGraphUpdateListener = streamGraphUpdateListener; } @Override @@ -121,6 +142,11 @@ public boolean modifyStreamEdge(List requestInfos) } } + // Notify the listener that the StreamGraph has been updated. + if (streamGraphUpdateListener != null) { + streamGraphUpdateListener.onStreamGraphUpdated(); + } + return true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java index 82cbac5613520..13d1c8aa5131c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java @@ -62,4 +62,11 @@ public interface StreamGraphContext { * @return true if all modifications were successful and applied atomically, false otherwise. */ boolean modifyStreamEdge(List requestInfos); + + /** Interface for observers that monitor the status of a StreamGraph. */ + @Internal + interface StreamGraphUpdateListener { + /** This method is called whenever the StreamGraph is updated. 
*/ + void onStreamGraphUpdated(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java index 5eaf28a7bc1b5..b187e997f00fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java @@ -25,15 +25,25 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.util.jackson.JacksonMapperFactory; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode; import org.junit.Test; +import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import static org.apache.flink.runtime.util.JobVertexConnectionUtils.connectNewDataSetAsInput; import static org.junit.Assert.assertEquals; @@ -151,4 +161,71 @@ private void checkVertexExists(String vertexId, JobGraph graph) { } fail("could not find vertex with id " + vertexId + " in JobGraph"); } + + @Test + public void testGenerateStreamGraphJson() throws JsonProcessingException { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
+ env.fromSequence(0L, 1L).disableChaining().print(); + StreamGraph streamGraph = env.getStreamGraph(); + Map jobVertexIdMap = new HashMap<>(); + String streamGraphJson = + JsonPlanGenerator.generateStreamGraphJson(streamGraph, jobVertexIdMap); + + ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); + StreamGraphJsonSchema parsedStreamGraph = + mapper.readValue(streamGraphJson, StreamGraphJsonSchema.class); + + List expectedJobVertexIds = new ArrayList<>(); + expectedJobVertexIds.add(null); + expectedJobVertexIds.add(null); + validateStreamGraph(streamGraph, parsedStreamGraph, expectedJobVertexIds); + + jobVertexIdMap.put(1, new JobVertexID()); + jobVertexIdMap.put(2, new JobVertexID()); + streamGraphJson = JsonPlanGenerator.generateStreamGraphJson(streamGraph, jobVertexIdMap); + + parsedStreamGraph = mapper.readValue(streamGraphJson, StreamGraphJsonSchema.class); + validateStreamGraph( + streamGraph, + parsedStreamGraph, + jobVertexIdMap.values().stream() + .map(JobVertexID::toString) + .collect(Collectors.toList())); + } + + private static void validateStreamGraph( + StreamGraph streamGraph, + StreamGraphJsonSchema parsedStreamGraph, + List expectedJobVertexIds) { + List realJobVertexIds = new ArrayList<>(); + parsedStreamGraph + .getNodes() + .forEach( + node -> { + StreamNode streamNode = + streamGraph.getStreamNode(Integer.parseInt(node.getId())); + assertEquals(node.getOperator(), streamNode.getOperatorName()); + assertEquals( + node.getParallelism(), (Integer) streamNode.getParallelism()); + assertEquals( + node.getDescription(), streamNode.getOperatorDescription()); + validateStreamEdge(node.getInputs(), streamNode.getInEdges()); + realJobVertexIds.add(node.getJobVertexId()); + }); + assertEquals(expectedJobVertexIds, realJobVertexIds); + } + + private static void validateStreamEdge( + List jsonStreamEdges, + List streamEdges) { + assertEquals(jsonStreamEdges.size(), streamEdges.size()); + for (int i = 0; i < jsonStreamEdges.size(); i++) { + 
StreamGraphJsonSchema.JsonStreamEdgeSchema edgeToValidate = jsonStreamEdges.get(i); + StreamEdge expectedEdge = streamEdges.get(i); + assertEquals(String.valueOf(expectedEdge.getSourceId()), edgeToValidate.getId()); + assertEquals( + expectedEdge.getPartitioner().toString(), edgeToValidate.getShipStrategy()); + assertEquals(expectedEdge.getExchangeMode().name(), edgeToValidate.getExchange()); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/StreamGraphJsonSchema.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/StreamGraphJsonSchema.java new file mode 100644 index 0000000000000..37774266e8385 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/StreamGraphJsonSchema.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobgraph.jsonplan; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** A utility class for deserializing the JSON string of a stream graph. */ +public class StreamGraphJsonSchema { + public static final String FIELD_NAME_NODES = "nodes"; + + @JsonProperty(FIELD_NAME_NODES) + private final List nodes; + + @JsonCreator + public StreamGraphJsonSchema(@JsonProperty(FIELD_NAME_NODES) List nodes) { + this.nodes = nodes; + } + + @JsonIgnore + public List getNodes() { + return nodes; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StreamGraphJsonSchema that = (StreamGraphJsonSchema) o; + return Objects.equals(nodes, that.nodes); + } + + @Override + public int hashCode() { + return Objects.hash(nodes); + } + + public static class JsonStreamNodeSchema { + public static final String FIELD_NAME_NODE_ID = "id"; + public static final String FIELD_NAME_NODE_PARALLELISM = "parallelism"; + public static final String FIELD_NAME_NODE_OPERATOR = "operator"; + public static final String FIELD_NAME_NODE_DESCRIPTION = "description"; + public static final String FIELD_NAME_NODE_JOB_VERTEX_ID = "job_vertex_id"; + public static final String FIELD_NAME_NODE_INPUTS = "inputs"; + + @JsonProperty(FIELD_NAME_NODE_ID) + private final String id; + + @JsonProperty(FIELD_NAME_NODE_PARALLELISM) + private final Integer parallelism; + + @JsonProperty(FIELD_NAME_NODE_OPERATOR) + private final String operator; + + @JsonProperty(FIELD_NAME_NODE_DESCRIPTION) + private final String description; + + @JsonProperty(FIELD_NAME_NODE_JOB_VERTEX_ID) + private final String jobVertexId; + + 
@JsonProperty(FIELD_NAME_NODE_INPUTS) + private final List inputs; + + @JsonCreator + public JsonStreamNodeSchema( + @JsonProperty(FIELD_NAME_NODE_ID) String id, + @JsonProperty(FIELD_NAME_NODE_PARALLELISM) Integer parallelism, + @JsonProperty(FIELD_NAME_NODE_OPERATOR) String operator, + @JsonProperty(FIELD_NAME_NODE_DESCRIPTION) String description, + @JsonProperty(FIELD_NAME_NODE_JOB_VERTEX_ID) String jobVertexId, + @JsonProperty(FIELD_NAME_NODE_INPUTS) List inputs) { + this.id = id; + this.parallelism = parallelism; + this.operator = operator; + this.description = description; + this.jobVertexId = jobVertexId; + this.inputs = inputs; + } + + @JsonIgnore + public String getId() { + return id; + } + + @JsonIgnore + public Integer getParallelism() { + return parallelism; + } + + @JsonIgnore + public String getOperator() { + return operator; + } + + @JsonIgnore + public String getDescription() { + return description; + } + + @JsonIgnore + public String getJobVertexId() { + return jobVertexId; + } + + @JsonIgnore + public List getInputs() { + return inputs; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JsonStreamNodeSchema that = (JsonStreamNodeSchema) o; + return Objects.equals(id, that.id) + && Objects.equals(parallelism, that.parallelism) + && Objects.equals(operator, that.operator) + && Objects.equals(description, that.description) + && Objects.equals(jobVertexId, that.jobVertexId) + && Objects.equals(inputs, that.inputs); + } + + @Override + public int hashCode() { + return Objects.hash(id, parallelism, operator, description, jobVertexId, inputs); + } + } + + public static class JsonStreamEdgeSchema { + public static final String FIELD_NAME_EDGE_INPUT_NUM = "num"; + public static final String FIELD_NAME_EDGE_ID = "id"; + public static final String FIELD_NAME_EDGE_SHIP_STRATEGY = "ship_strategy"; + public static final String FIELD_NAME_EDGE_EXCHANGE 
= "exchange"; + + @JsonProperty(FIELD_NAME_EDGE_INPUT_NUM) + private final Integer num; + + @JsonProperty(FIELD_NAME_EDGE_ID) + private final String id; + + @JsonProperty(FIELD_NAME_EDGE_SHIP_STRATEGY) + private final String shipStrategy; + + @JsonProperty(FIELD_NAME_EDGE_EXCHANGE) + private final String exchange; + + @JsonCreator + public JsonStreamEdgeSchema( + @JsonProperty(FIELD_NAME_EDGE_INPUT_NUM) Integer num, + @JsonProperty(FIELD_NAME_EDGE_ID) String id, + @JsonProperty(FIELD_NAME_EDGE_SHIP_STRATEGY) String shipStrategy, + @JsonProperty(FIELD_NAME_EDGE_EXCHANGE) String exchange) { + this.num = num; + this.id = id; + this.shipStrategy = shipStrategy; + this.exchange = exchange; + } + + @JsonIgnore + public Integer getNum() { + return num; + } + + @JsonIgnore + public String getId() { + return id; + } + + @JsonIgnore + public String getShipStrategy() { + return shipStrategy; + } + + @JsonIgnore + public String getExchange() { + return exchange; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JsonStreamEdgeSchema that = (JsonStreamEdgeSchema) o; + return Objects.equals(num, that.num) + && Objects.equals(id, that.id) + && Objects.equals(shipStrategy, that.shipStrategy) + && Objects.equals(exchange, that.exchange); + } + + @Override + public int hashCode() { + return Objects.hash(num, id, shipStrategy, exchange); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java new file mode 100644 index 0000000000000..1ecb68cdf7da8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionConfigBuilder; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; +import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; 
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the {@link JobDetailsHandler}. */ +class JobDetailsHandlerTest { + private JobDetailsHandler jobDetailsHandler; + private HandlerRequest handlerRequest; + private AccessExecutionGraph archivedExecutionGraph; + private final String expectedStreamGraphJson = + "{\"pending_operators\":2,\"nodes\":[{\"id\":\"1\",\"parallelism\":1,\"operator\":\"Source: Sequence Source\",\"description\":\"Source: Sequence Source\",\"inputs\":[]},{\"id\":\"2\",\"parallelism\":1,\"operator\":\"Sink: Print to Std.
Out\",\"inputs\":[{\"num\":0,\"id\":\"1\",\"ship_strategy\":\"FORWARD\",\"exchange\":\"UNDEFINED\"}]}]}"; + + private static HandlerRequest createRequest(JobID jobId) + throws HandlerRequestException { + Map pathParameters = new HashMap<>(); + pathParameters.put(JobIDPathParameter.KEY, jobId.toString()); + return HandlerRequest.resolveParametersAndCreate( + EmptyRequestBody.getInstance(), + new TaskManagerMessageParameters(), + pathParameters, + Collections.emptyMap(), + Collections.emptyList()); + } + + @BeforeEach + void setUp() throws HandlerRequestException { + GatewayRetriever leaderRetriever = + () -> CompletableFuture.completedFuture(null); + final RestHandlerConfiguration restHandlerConfiguration = + RestHandlerConfiguration.fromConfiguration(new Configuration()); + final MetricFetcher metricFetcher = + new MetricFetcherImpl<>( + () -> null, + address -> null, + Executors.directExecutor(), + Duration.ofMillis(1000L), + MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue().toMillis()); + final ArchivedExecutionConfig archivedExecutionConfig = + new ArchivedExecutionConfigBuilder().build(); + + archivedExecutionGraph = + new ArchivedExecutionGraphBuilder() + .setArchivedExecutionConfig(archivedExecutionConfig) + .setStreamGraphJson(expectedStreamGraphJson) + .build(); + jobDetailsHandler = + new JobDetailsHandler( + leaderRetriever, + TestingUtils.TIMEOUT, + Collections.emptyMap(), + JobDetailsHeaders.getInstance(), + new DefaultExecutionGraphCache( + restHandlerConfiguration.getTimeout(), + Duration.ofMillis(restHandlerConfiguration.getRefreshInterval())), + Executors.directExecutor(), + metricFetcher); + handlerRequest = createRequest(archivedExecutionGraph.getJobID()); + } + + @Test + void testGetJobDetailsWithStreamGraphJson() throws RestHandlerException { + JobDetailsInfo jobDetailsInfo = + jobDetailsHandler.handleRequest(handlerRequest, archivedExecutionGraph); + assertThat(jobDetailsInfo.getStreamGraphJson()) + .isEqualTo(new 
JobPlanInfo.RawJson(expectedStreamGraphJson).toString()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java index 58255a5b6dc28..da2b84c3f3142 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java @@ -57,6 +57,7 @@ public class ArchivedExecutionGraphBuilder { private boolean isStoppable; private Map>> serializedUserAccumulators; private CheckpointStatsSnapshot checkpointStatsSnapshot; + private String streamGraphJson; public ArchivedExecutionGraphBuilder setJobID(JobID jobID) { this.jobID = jobID; @@ -101,6 +102,11 @@ public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) { return this; } + public ArchivedExecutionGraphBuilder setStreamGraphJson(String streamGraphJson) { + this.streamGraphJson = streamGraphJson; + return this; + } + public ArchivedExecutionGraphBuilder setArchivedUserAccumulators( StringifiedAccumulatorResult[] archivedUserAccumulators) { this.archivedUserAccumulators = archivedUserAccumulators; @@ -171,6 +177,8 @@ public ArchivedExecutionGraph build() { "stateBackendName", "checkpointStorageName", TernaryBoolean.UNDEFINED, - "changelogStorageName"); + "changelogStorageName", + streamGraphJson, + 0); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java index 6bb69b357e0cd..fa346bdfbadb8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java @@ 
-84,7 +84,9 @@ protected JobDetailsInfo getTestResponseInstance() throws Exception { timestamps, jobVertexInfos, jobVerticesPerState, - new JobPlanInfo.RawJson(jsonPlan)); + new JobPlanInfo.RawJson(jsonPlan), + null, + 0); } private JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(Random random) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java index 2b4813b2e9300..e01bc6f248775 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java @@ -449,5 +449,10 @@ public int getConsumersMaxParallelism( public int getPendingOperatorCount() { return 0; } + + @Override + public String getStreamGraphJson() { + return null; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java index ce49ff5bded0e..becb10fdb8a77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java @@ -187,6 +187,11 @@ public String getJsonPlan() { return ""; } + @Override + public String getStreamGraphJson() { + return null; + } + @Override public void setJsonPlan(String jsonPlan) {} @@ -249,6 +254,11 @@ public Optional getChangelogStorageName() { return Optional.empty(); } + @Override + public int getPendingOperatorCount() { + return 0; + } + @Override public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() { return new StringifiedAccumulatorResult[0];