Skip to content

Commit

Permalink
[FLINK-36069][runtime/rest] Extending job detail rest API to expose j…
Browse files Browse the repository at this point in the history
…son stream graph
  • Loading branch information
yuchen-ecnu authored and JunRuiLee committed Dec 28, 2024
1 parent cb75a58 commit 08990c7
Show file tree
Hide file tree
Showing 22 changed files with 821 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,9 @@ void testJobDetailsContainsSlotSharingGroupId() throws Exception {
Collections.singletonMap(JobStatus.RUNNING, 1L),
jobVertexDetailsInfos,
Collections.singletonMap(ExecutionState.RUNNING, 1),
new JobPlanInfo.RawJson("{\"id\":\"1234\"}"));
new JobPlanInfo.RawJson("{\"id\":\"1234\"}"),
new JobPlanInfo.RawJson("{\"id\":\"1234\"}"),
0);
final TestJobDetailsInfoHandler jobDetailsInfoHandler =
new TestJobDetailsInfoHandler(jobDetailsInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@
package org.apache.flink.runtime.webmonitor;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand All @@ -35,9 +44,13 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
Expand Down Expand Up @@ -523,6 +536,63 @@ void testCancelYarn(
BlockingInvokable.reset();
}

/**
 * Submits a blocking batch job as a {@link StreamGraph} and verifies that the job details REST
 * endpoint ({@code /jobs/<jobId>}) answers with a JSON document containing the {@code
 * stream-graph} and {@code pending-operators} fields. The job is cancelled afterwards.
 */
@Test
void getStreamGraphFromBatchJobDetailsHandler(
        @InjectClusterClient ClusterClient<?> clusterClient,
        @InjectClusterRESTAddress URI restAddress)
        throws Exception {
    // this only works if there is no active job at this point
    // (the assertion must be applied to the list itself; wrapping the boolean result of
    // isEmpty() in assertThat(...) without a follow-up check verifies nothing)
    assertThat(getRunningJobs(clusterClient)).isEmpty();

    // Create a batch job whose source blocks at end of input until the job is cancelled
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.fromSource(
                    new BlockingNumberSequenceSource(),
                    WatermarkStrategy.noWatermarks(),
                    "block-source")
            .setParallelism(2)
            .addSink(new SinkFunction<>() {});
    StreamGraph streamGraph = env.getStreamGraph();
    final JobID jid = streamGraph.getJobID();

    clusterClient.submitJob(streamGraph).get();

    // wait for job to show up
    while (getRunningJobs(clusterClient).isEmpty()) {
        Thread.sleep(10);
    }

    // wait for tasks to be properly running
    BlockingInvokable.latch.await();

    final Duration testTimeout = Duration.ofMinutes(2);
    final Deadline deadline = Deadline.fromNow(testTimeout);
    try (HttpTestClient client = new HttpTestClient("localhost", restAddress.getPort())) {
        // Request the job details from the REST endpoint
        client.sendGetRequest("/jobs/" + jid, deadline.timeLeft());

        HttpTestClient.SimpleHttpResponse response =
                client.getNextResponse(deadline.timeLeft());

        assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.OK);
        assertThat(response.getType()).isEqualTo("application/json; charset=UTF-8");
        String content = response.getContent();
        JsonNode jsonNode = OBJECT_MAPPER.readTree(content);
        // both fields added to the job details response must be present
        assertThat(jsonNode.has("stream-graph")).isTrue();
        assertThat(jsonNode.has("pending-operators")).isTrue();
    }

    clusterClient.cancel(jid).get();

    // ensure cancellation is finished
    while (!getRunningJobs(clusterClient).isEmpty()) {
        Thread.sleep(20);
    }

    BlockingInvokable.reset();
}

private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
Collection<JobStatusMessage> statusMessages = client.listJobs().get();
return statusMessages.stream()
Expand Down Expand Up @@ -616,6 +686,55 @@ private static String postAndGetHttpResponse(String url, String jsonData) throws
"Could not get HTTP response in time since the service is still unavailable.");
}

/**
 * Number sequence source for tests, constructed with the bounds {@code 0} and {@code 1}, whose
 * readers are {@link BlockingIteratorSourceReader} instances. Such a reader calls into a blocker
 * once it reaches end of input, so the source stays alive until the reader is closed (e.g. when
 * the job is cancelled).
 */
private static class BlockingNumberSequenceSource extends NumberSequenceSource {
    public BlockingNumberSequenceSource() {
        super(0, 1);
    }

    /** Creates the blocking reader variant instead of the default iterator reader. */
    @Override
    public SourceReader<Long, NumberSequenceSplit> createReader(
            SourceReaderContext readerContext) {
        final SourceReader<Long, NumberSequenceSplit> blockingReader =
                new BlockingIteratorSourceReader<>(readerContext);
        return blockingReader;
    }
}

/**
 * Iterator source reader for tests that, after the wrapped reader reports {@link
 * InputStatus#END_OF_INPUT}, invokes a {@link BlockingInvokable} before handing the status back.
 * The blocker is created in {@link #start()} and cancelled in {@link #close()}.
 *
 * @param <E> element type produced by the reader
 * @param <IterT> iterator type of the splits
 * @param <SplitT> split type consumed by the reader
 */
private static class BlockingIteratorSourceReader<
                E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReader<E, IterT, SplitT> {
    /** Created in {@link #start()}; stays {@code null} if the reader is never started. */
    private transient BlockingInvokable blocker;

    public BlockingIteratorSourceReader(SourceReaderContext context) {
        super(context);
    }

    @Override
    public void start() {
        super.start();
        blocker = new BlockingInvokable(new DummyEnvironment());
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) {
        InputStatus inputStatus = super.pollNext(output);
        if (inputStatus == InputStatus.END_OF_INPUT) {
            try {
                blocker.invoke();
            } catch (Exception ignored) {
                // Deliberately best-effort: whatever ends the blocking call, the reader still
                // reports END_OF_INPUT to the caller.
                // NOTE(review): presumably invoke() is aborted by cancel() on job cancellation -
                // confirm against BlockingInvokable.
            }
        }
        return inputStatus;
    }

    @Override
    public void close() {
        // The blocker only exists after start(); closing a reader that was never started
        // must not fail with a NullPointerException.
        if (blocker != null) {
            blocker.cancel();
        }
    }
}

/** Test invokable that allows waiting for all subtasks to be running. */
public static class BlockingInvokable extends AbstractInvokable {

Expand Down
7 changes: 7 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,13 @@
"plan" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson"
},
"stream-graph" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson"
},
"pending-operators" : {
"type" : "integer"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ public interface AccessExecutionGraph extends JobStatusProvider {
*/
String getJsonPlan();

/**
 * Returns the stream graph of this job rendered as a JSON string.
 *
 * <p>Callers must handle a {@code null} result, which signals that no stream graph JSON is
 * available for this job.
 *
 * @return the stream graph as a JSON string, or {@code null} if the job was submitted with a
 *     JobGraph or if it is a streaming job.
 */
@Nullable
String getStreamGraphJson();

/**
* Returns the {@link JobID} for this execution graph.
*
Expand Down Expand Up @@ -199,4 +208,13 @@ public interface AccessExecutionGraph extends JobStatusProvider {
* @return The changelog storage name, or an empty Optional in the case of batch jobs
*/
Optional<String> getChangelogStorageName();

/**
 * Retrieves the number of pending operators, i.e. operators that are still waiting to be
 * transferred to job vertices in the adaptive execution of batch jobs.
 *
 * <p>The value is zero if the job was submitted with a JobGraph or if it is a streaming job.
 *
 * @return the number of pending operators.
 */
int getPendingOperatorCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl

@Nullable private final String changelogStorageName;

@Nullable private final String streamGraphJson;

private final int pendingOperatorCount;

public ArchivedExecutionGraph(
JobID jobID,
String jobName,
Expand All @@ -132,7 +136,9 @@ public ArchivedExecutionGraph(
@Nullable String stateBackendName,
@Nullable String checkpointStorageName,
@Nullable TernaryBoolean stateChangelogEnabled,
@Nullable String changelogStorageName) {
@Nullable String changelogStorageName,
@Nullable String streamGraphJson,
int pendingOperatorCount) {

this.jobID = Preconditions.checkNotNull(jobID);
this.jobName = Preconditions.checkNotNull(jobName);
Expand All @@ -153,6 +159,8 @@ public ArchivedExecutionGraph(
this.checkpointStorageName = checkpointStorageName;
this.stateChangelogEnabled = stateChangelogEnabled;
this.changelogStorageName = changelogStorageName;
this.streamGraphJson = streamGraphJson;
this.pendingOperatorCount = pendingOperatorCount;
}

// --------------------------------------------------------------------------------------------
Expand All @@ -162,6 +170,11 @@ public String getJsonPlan() {
return jsonPlan;
}

/** Returns the archived stream graph JSON; the backing field is declared nullable. */
@Override
public String getStreamGraphJson() {
    return this.streamGraphJson;
}

@Override
public JobID getJobID() {
return jobID;
Expand Down Expand Up @@ -298,6 +311,11 @@ public Optional<String> getChangelogStorageName() {
return Optional.ofNullable(changelogStorageName);
}

/** Returns the pending operator count that was captured when this archive was created. */
@Override
public int getPendingOperatorCount() {
    return this.pendingOperatorCount;
}

/**
* Create a {@link ArchivedExecutionGraph} from the given {@link ExecutionGraph}.
*
Expand Down Expand Up @@ -366,7 +384,9 @@ public static ArchivedExecutionGraph createFrom(
executionGraph.getStateBackendName().orElse(null),
executionGraph.getCheckpointStorageName().orElse(null),
executionGraph.isChangelogStateBackendEnabled(),
executionGraph.getChangelogStorageName().orElse(null));
executionGraph.getChangelogStorageName().orElse(null),
executionGraph.getStreamGraphJson(),
executionGraph.getPendingOperatorCount());
}

/**
Expand Down Expand Up @@ -487,6 +507,8 @@ private static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
checkpointingSettings == null
? TernaryBoolean.UNDEFINED
: checkpointingSettings.isChangelogStateBackendEnabled(),
checkpointingSettings == null ? null : "Unknown");
checkpointingSettings == null ? null : "Unknown",
null,
0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,11 @@ public Optional<String> getChangelogStorageName() {
return Optional.ofNullable(changelogStorageName);
}

/** Reports the pending operator count as provided by the execution plan scheduling context. */
@Override
public int getPendingOperatorCount() {
    final int pendingOperators = executionPlanSchedulingContext.getPendingOperatorCount();
    return pendingOperators;
}

@Override
public void enableCheckpointing(
CheckpointCoordinatorConfiguration chkConfig,
Expand Down Expand Up @@ -612,6 +617,11 @@ public void setJsonPlan(String jsonPlan) {
this.jsonPlan = jsonPlan;
}

/** Reports the stream graph JSON as provided by the execution plan scheduling context. */
@Override
public String getStreamGraphJson() {
    final String streamGraphAsJson = executionPlanSchedulingContext.getStreamGraphJson();
    return streamGraphAsJson;
}

@Override
public String getJsonPlan() {
return jsonPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
Expand All @@ -34,6 +37,7 @@

import java.io.StringWriter;
import java.util.List;
import java.util.Map;

@Internal
public class JsonPlanGenerator {
Expand Down Expand Up @@ -170,4 +174,54 @@ public static String generatePlan(
throw new RuntimeException("Failed to generate plan", e);
}
}

/**
 * Renders the given stream graph as a JSON document of the form {@code {"nodes": [...]}}.
 *
 * <p>Each node entry carries {@code id}, {@code parallelism}, {@code operator}, {@code
 * description}, an optional {@code job_vertex_id} (only when the node id is a key of {@code
 * jobVertexIdMap}) and an {@code inputs} array with one entry per incoming edge ({@code num},
 * {@code id} of the edge source, {@code ship_strategy} and {@code exchange}).
 *
 * @param sg the stream graph to render
 * @param jobVertexIdMap mapping from stream node id to the id of its job vertex
 * @return the JSON representation of the stream graph
 * @throws RuntimeException if generating the JSON fails for any reason
 */
public static String generateStreamGraphJson(
        StreamGraph sg, Map<Integer, JobVertexID> jobVertexIdMap) {
    try (final StringWriter out = new StringWriter(1024)) {
        // The generator is closed (and thereby flushed) before the writer content is read.
        try (final JsonGenerator json = new JsonFactory().createGenerator(out)) {
            // root object with the "nodes" array
            json.writeStartObject();
            json.writeArrayFieldStart("nodes");

            // one JSON object per stream node
            for (StreamNode streamNode : sg.getStreamNodes()) {
                final int nodeId = streamNode.getId();

                json.writeStartObject();
                json.writeStringField("id", String.valueOf(nodeId));
                json.writeNumberField("parallelism", streamNode.getParallelism());
                json.writeStringField("operator", streamNode.getOperatorName());
                json.writeStringField("description", streamNode.getOperatorDescription());
                if (jobVertexIdMap.containsKey(nodeId)) {
                    final JobVertexID jobVertexId = jobVertexIdMap.get(nodeId);
                    json.writeStringField("job_vertex_id", jobVertexId.toString());
                }

                // incoming edges, numbered in list order
                json.writeArrayFieldStart("inputs");
                int inputIndex = 0;
                for (StreamEdge inEdge : streamNode.getInEdges()) {
                    json.writeStartObject();
                    json.writeNumberField("num", inputIndex);
                    json.writeStringField("id", String.valueOf(inEdge.getSourceId()));
                    json.writeStringField("ship_strategy", inEdge.getPartitioner().toString());
                    json.writeStringField("exchange", inEdge.getExchangeMode().name());
                    json.writeEndObject();
                    inputIndex++;
                }
                json.writeEndArray();

                json.writeEndObject();
            }

            // close the "nodes" array and the root object
            json.writeEndArray();
            json.writeEndObject();
        }
        return out.toString();
    } catch (Exception e) {
        throw new RuntimeException("Failed to generate json stream plan", e);
    }
}
}
Loading

0 comments on commit 08990c7

Please sign in to comment.