recoveredDirtyJobResults,
- DispatcherBootstrapFactory dispatcherBootstrapFactory,
- PartialDispatcherServicesWithJobPersistenceComponents
- partialDispatcherServicesWithJobPersistenceComponents)
- throws Exception {
- final ExecutionPlan recoveredExecutionPlan = Iterables.getOnlyElement(recoveredJobs, null);
- final JobResult recoveredDirtyJob =
- Iterables.getOnlyElement(recoveredDirtyJobResults, null);
-
- Preconditions.checkArgument(
- recoveredExecutionPlan == null ^ recoveredDirtyJob == null,
- "Either the ExecutionPlan or the recovered JobResult needs to be specified.");
-
- final Configuration configuration =
- partialDispatcherServicesWithJobPersistenceComponents.getConfiguration();
- final String executionModeValue = configuration.get(INTERNAL_CLUSTER_EXECUTION_MODE);
- final ClusterEntrypoint.ExecutionMode executionMode =
- ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue);
-
- return new MiniDispatcher(
- rpcService,
- fencingToken,
- DispatcherServices.from(
- partialDispatcherServicesWithJobPersistenceComponents,
- JobMasterServiceLeadershipRunnerFactory.INSTANCE,
- CheckpointResourcesCleanupRunnerFactory.INSTANCE),
- recoveredExecutionPlan,
- recoveredDirtyJob,
- dispatcherBootstrapFactory,
- executionMode);
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
deleted file mode 100644
index e74b12632eacd4..00000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
-import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.streaming.api.graph.ExecutionPlan;
-import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.FlinkException;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Mini Dispatcher which is instantiated as the dispatcher component by the {@link
- * JobClusterEntrypoint}.
- *
- * The mini dispatcher is initialized with a single {@link ExecutionPlan} which it runs.
- *
- *
Depending on the {@link ClusterEntrypoint.ExecutionMode}, the mini dispatcher will directly
- * terminate after job completion if its execution mode is {@link
- * ClusterEntrypoint.ExecutionMode#DETACHED}.
- */
-public class MiniDispatcher extends Dispatcher {
-
- private final JobClusterEntrypoint.ExecutionMode executionMode;
- private boolean jobCancelled = false;
-
- public MiniDispatcher(
- RpcService rpcService,
- DispatcherId fencingToken,
- DispatcherServices dispatcherServices,
- @Nullable ExecutionPlan executionPlan,
- @Nullable JobResult recoveredDirtyJob,
- DispatcherBootstrapFactory dispatcherBootstrapFactory,
- JobClusterEntrypoint.ExecutionMode executionMode)
- throws Exception {
- super(
- rpcService,
- fencingToken,
- CollectionUtil.ofNullable(executionPlan),
- CollectionUtil.ofNullable(recoveredDirtyJob),
- dispatcherBootstrapFactory,
- dispatcherServices);
-
- this.executionMode = checkNotNull(executionMode);
- }
-
- @VisibleForTesting
- public MiniDispatcher(
- RpcService rpcService,
- DispatcherId fencingToken,
- DispatcherServices dispatcherServices,
- @Nullable ExecutionPlan executionPlan,
- @Nullable JobResult recoveredDirtyJob,
- DispatcherBootstrapFactory dispatcherBootstrapFactory,
- JobManagerRunnerRegistry jobManagerRunnerRegistry,
- ResourceCleanerFactory resourceCleanerFactory,
- JobClusterEntrypoint.ExecutionMode executionMode)
- throws Exception {
- super(
- rpcService,
- fencingToken,
- CollectionUtil.ofNullable(executionPlan),
- CollectionUtil.ofNullable(recoveredDirtyJob),
- dispatcherBootstrapFactory,
- dispatcherServices,
- jobManagerRunnerRegistry,
- resourceCleanerFactory);
-
- this.executionMode = checkNotNull(executionMode);
- }
-
- @Override
- public CompletableFuture submitJob(ExecutionPlan executionPlan, Duration timeout) {
- final CompletableFuture acknowledgeCompletableFuture =
- super.submitJob(executionPlan, timeout);
-
- acknowledgeCompletableFuture.whenComplete(
- (Acknowledge ignored, Throwable throwable) -> {
- if (throwable != null) {
- onFatalError(
- new FlinkException(
- "Failed to submit job "
- + executionPlan.getJobID()
- + " in job mode.",
- throwable));
- }
- });
-
- return acknowledgeCompletableFuture;
- }
-
- @Override
- public CompletableFuture requestJobResult(JobID jobId, Duration timeout) {
- final CompletableFuture jobResultFuture = super.requestJobResult(jobId, timeout);
-
- if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
- // terminate the MiniDispatcher once we served the first JobResult successfully
- jobResultFuture.thenAccept(
- (JobResult result) -> {
- ApplicationStatus status =
- result.getSerializedThrowable().isPresent()
- ? ApplicationStatus.FAILED
- : ApplicationStatus.SUCCEEDED;
-
- if (!ApplicationStatus.UNKNOWN.equals(result.getApplicationStatus())) {
- log.info(
- "Shutting down cluster because someone retrieved the job result"
- + " and the status is globally terminal.");
- shutDownFuture.complete(status);
- }
- });
- } else {
- log.info("Not shutting down cluster after someone retrieved the job result.");
- }
-
- return jobResultFuture;
- }
-
- @Override
- public CompletableFuture cancelJob(JobID jobId, Duration timeout) {
- jobCancelled = true;
- return super.cancelJob(jobId, timeout);
- }
-
- @Override
- protected void runPostJobGloballyTerminated(JobID jobId, JobStatus jobStatus) {
- super.runPostJobGloballyTerminated(jobId, jobStatus);
-
- if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
- // shut down if job is cancelled or we don't have to wait for the execution
- // result retrieval
- log.info(
- "Shutting down cluster after job with state {}, jobCancelled: {}, executionMode: {}",
- jobStatus,
- jobCancelled,
- executionMode);
- shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
- }
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java
index a490e01f86b2a9..fdd9333df4740a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java
@@ -20,7 +20,6 @@
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
-import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -67,10 +66,4 @@ public static DefaultDispatcherRunnerFactory createSessionRunner(
return new DefaultDispatcherRunnerFactory(
SessionDispatcherLeaderProcessFactoryFactory.create(dispatcherFactory));
}
-
- public static DefaultDispatcherRunnerFactory createJobRunner(
- JobGraphRetriever jobGraphRetriever) {
- return new DefaultDispatcherRunnerFactory(
- JobDispatcherLeaderProcessFactoryFactory.create(jobGraphRetriever));
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java
deleted file mode 100644
index 4a48f821ec06a1..00000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcess.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher.runner;
-
-import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.highavailability.JobResultStore;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.ThrowingExecutionPlanWriter;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.UUID;
-
-/** {@link DispatcherLeaderProcess} implementation for the per-job mode. */
-public class JobDispatcherLeaderProcess extends AbstractDispatcherLeaderProcess {
-
- private final DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory;
-
- @Nullable private final JobGraph jobGraph;
- @Nullable private final JobResult recoveredDirtyJobResult;
-
- private final JobResultStore jobResultStore;
-
- JobDispatcherLeaderProcess(
- UUID leaderSessionId,
- DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory,
- @Nullable JobGraph jobGraph,
- @Nullable JobResult recoveredDirtyJobResult,
- JobResultStore jobResultStore,
- FatalErrorHandler fatalErrorHandler) {
- super(leaderSessionId, fatalErrorHandler);
- this.dispatcherGatewayServiceFactory = dispatcherGatewayServiceFactory;
- this.jobGraph = jobGraph;
- this.recoveredDirtyJobResult = recoveredDirtyJobResult;
- this.jobResultStore = Preconditions.checkNotNull(jobResultStore);
- }
-
- @Override
- protected void onStart() {
- final DispatcherGatewayService dispatcherService =
- dispatcherGatewayServiceFactory.create(
- DispatcherId.fromUuid(getLeaderSessionId()),
- CollectionUtil.ofNullable(jobGraph),
- CollectionUtil.ofNullable(recoveredDirtyJobResult),
- ThrowingExecutionPlanWriter.INSTANCE,
- jobResultStore);
-
- completeDispatcherSetup(dispatcherService);
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactory.java
deleted file mode 100644
index 1716a1e961a191..00000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactory.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher.runner;
-
-import org.apache.flink.runtime.highavailability.JobResultStore;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.UUID;
-
-/** Factory for the {@link JobDispatcherLeaderProcess}. */
-public class JobDispatcherLeaderProcessFactory implements DispatcherLeaderProcessFactory {
-
- private final AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory
- dispatcherGatewayServiceFactory;
-
- @Nullable private final JobGraph jobGraph;
- @Nullable private final JobResult recoveredDirtyJobResult;
-
- private final JobResultStore jobResultStore;
-
- private final FatalErrorHandler fatalErrorHandler;
-
- JobDispatcherLeaderProcessFactory(
- AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory
- dispatcherGatewayServiceFactory,
- @Nullable JobGraph jobGraph,
- @Nullable JobResult recoveredDirtyJobResult,
- JobResultStore jobResultStore,
- FatalErrorHandler fatalErrorHandler) {
- this.dispatcherGatewayServiceFactory = dispatcherGatewayServiceFactory;
- this.jobGraph = jobGraph;
- this.recoveredDirtyJobResult = recoveredDirtyJobResult;
- this.jobResultStore = Preconditions.checkNotNull(jobResultStore);
- this.fatalErrorHandler = fatalErrorHandler;
- }
-
- @Override
- public DispatcherLeaderProcess create(UUID leaderSessionID) {
- return new JobDispatcherLeaderProcess(
- leaderSessionID,
- dispatcherGatewayServiceFactory,
- jobGraph,
- recoveredDirtyJobResult,
- jobResultStore,
- fatalErrorHandler);
- }
-
- @Nullable
- JobGraph getJobGraph() {
- return this.jobGraph;
- }
-
- @Nullable
- JobResult getRecoveredDirtyJobResult() {
- return this.recoveredDirtyJobResult;
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
deleted file mode 100644
index 8d38825bc58d51..00000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher.runner;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
-import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
-import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
-import org.apache.flink.runtime.highavailability.JobResultStore;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
-
-/** Factory for the {@link JobDispatcherLeaderProcessFactory}. */
-public class JobDispatcherLeaderProcessFactoryFactory
- implements DispatcherLeaderProcessFactoryFactory {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(JobDispatcherLeaderProcessFactoryFactory.class);
-
- private final JobGraphRetriever jobGraphRetriever;
-
- @VisibleForTesting
- JobDispatcherLeaderProcessFactoryFactory(JobGraphRetriever jobGraphRetriever) {
- this.jobGraphRetriever = jobGraphRetriever;
- }
-
- @Override
- public JobDispatcherLeaderProcessFactory createFactory(
- JobPersistenceComponentFactory jobPersistenceComponentFactory,
- Executor ioExecutor,
- RpcService rpcService,
- PartialDispatcherServices partialDispatcherServices,
- FatalErrorHandler fatalErrorHandler) {
-
- final JobGraph jobGraph;
-
- try {
- jobGraph =
- Preconditions.checkNotNull(
- jobGraphRetriever.retrieveJobGraph(
- partialDispatcherServices.getConfiguration()));
- } catch (FlinkException e) {
- throw new FlinkRuntimeException("Could not retrieve the JobGraph.", e);
- }
-
- final JobResultStore jobResultStore = jobPersistenceComponentFactory.createJobResultStore();
- final Collection recoveredDirtyJobResults = getDirtyJobResults(jobResultStore);
-
- final Optional maybeRecoveredDirtyJobResult =
- extractDirtyJobResult(recoveredDirtyJobResults, jobGraph);
- final Optional maybeJobGraph =
- getJobGraphBasedOnDirtyJobResults(jobGraph, recoveredDirtyJobResults);
-
- final DefaultDispatcherGatewayServiceFactory defaultDispatcherServiceFactory =
- new DefaultDispatcherGatewayServiceFactory(
- JobDispatcherFactory.INSTANCE, rpcService, partialDispatcherServices);
-
- return new JobDispatcherLeaderProcessFactory(
- defaultDispatcherServiceFactory,
- maybeJobGraph.orElse(null),
- maybeRecoveredDirtyJobResult.orElse(null),
- jobResultStore,
- fatalErrorHandler);
- }
-
- public static JobDispatcherLeaderProcessFactoryFactory create(
- JobGraphRetriever jobGraphRetriever) {
- return new JobDispatcherLeaderProcessFactoryFactory(jobGraphRetriever);
- }
-
- private static Collection getDirtyJobResults(JobResultStore jobResultStore) {
- try {
- return jobResultStore.getDirtyResults();
- } catch (IOException e) {
- throw new FlinkRuntimeException(
- "Could not retrieve the JobResults of dirty jobs from the underlying JobResultStore.",
- e);
- }
- }
-
- private static Optional extractDirtyJobResult(
- Collection dirtyJobResults, JobGraph jobGraph) {
- Optional actualDirtyJobResult = Optional.empty();
- for (JobResult dirtyJobResult : dirtyJobResults) {
- if (dirtyJobResult.getJobId().equals(jobGraph.getJobID())) {
- actualDirtyJobResult = Optional.of(dirtyJobResult);
- } else {
- LOG.warn(
- "Unexpected dirty JobResultStore entry: Job '{}' is listed as dirty, isn't part of this single-job cluster, though.",
- dirtyJobResult.getJobId());
- }
- }
-
- return actualDirtyJobResult;
- }
-
- private static Optional getJobGraphBasedOnDirtyJobResults(
- JobGraph jobGraph, Collection dirtyJobResults) {
- final Set jobIdsOfFinishedJobs =
- dirtyJobResults.stream().map(JobResult::getJobId).collect(Collectors.toSet());
- if (jobIdsOfFinishedJobs.contains(jobGraph.getJobID())) {
- LOG.info(
- "Skipping recovery of a job with job id {}, because it already reached a globally terminal state",
- jobGraph.getJobID());
- return Optional.empty();
- }
- return Optional.of(jobGraph);
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 122db8d639fc34..c311d76da73351 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -43,7 +43,6 @@
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
-import org.apache.flink.runtime.dispatcher.MiniDispatcher;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
@@ -743,7 +742,7 @@ public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
System.exit(returnCode);
}
- /** Execution mode of the {@link MiniDispatcher}. */
+ /** Execution mode of the dispatcher. */
public enum ExecutionMode {
/** Waits until the job result has been served. */
NORMAL,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
deleted file mode 100644
index 629fcc58896c54..00000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.entrypoint;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
-import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-
-/**
- * Base class for per-job cluster entry points.
- *
- * @deprecated Per-job mode has been deprecated in Flink 1.15 and will be removed in the future.
- * Please use application mode instead.
- */
-@Deprecated
-public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
-
- public JobClusterEntrypoint(Configuration configuration) {
- super(configuration);
- }
-
- @Override
- protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
- Configuration configuration, ScheduledExecutor scheduledExecutor) {
- return new MemoryExecutionGraphInfoStore();
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index 094d37af3c2d51..c1178130c9f87f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -46,7 +46,6 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl;
-import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.rest.RestEndpointFactory;
import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
@@ -301,12 +300,4 @@ public static DefaultDispatcherResourceManagerComponentFactory createSessionComp
resourceManagerFactory,
SessionRestEndpointFactory.INSTANCE);
}
-
- public static DefaultDispatcherResourceManagerComponentFactory createJobComponentFactory(
- ResourceManagerFactory> resourceManagerFactory, JobGraphRetriever jobGraphRetriever) {
- return new DefaultDispatcherResourceManagerComponentFactory(
- DefaultDispatcherRunnerFactory.createJobRunner(jobGraphRetriever),
- resourceManagerFactory,
- JobRestEndpointFactory.INSTANCE);
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
index ad258ab0d13474..30fe4ef2d12cc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
@@ -20,7 +20,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.TransientBlobService;
-import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
@@ -35,7 +34,7 @@
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
-/** REST endpoint for the {@link JobClusterEntrypoint}. */
+/** REST endpoint for the ApplicationClusterEntryPoint. */
public class MiniDispatcherRestEndpoint extends WebMonitorEndpoint {
public MiniDispatcherRestEndpoint(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
deleted file mode 100644
index 05bd86a53df99f..00000000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
-import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
-import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
-import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
-import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.TestingMiniCluster;
-import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
-import org.apache.flink.runtime.rest.JobRestEndpointFactory;
-import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorExtension;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.TestLoggerExtension;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Supplier;
-
-import static java.nio.file.StandardOpenOption.CREATE;
-import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
-
-/** An integration test which recovers from checkpoint after regaining the leadership. */
-@ExtendWith(TestLoggerExtension.class)
-public class JobDispatcherITCase {
- @RegisterExtension
- static final TestExecutorExtension EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorExtension();
-
- private Supplier
- createJobModeDispatcherResourceManagerComponentFactorySupplier(
- Configuration configuration) {
- return () -> {
- try {
- return new DefaultDispatcherResourceManagerComponentFactory(
- new DefaultDispatcherRunnerFactory(
- JobDispatcherLeaderProcessFactoryFactory.create(
- FileJobGraphRetriever.createFrom(configuration, null))),
- StandaloneResourceManagerFactory.getInstance(),
- JobRestEndpointFactory.INSTANCE);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- };
- }
-
- @Test
- public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership(@TempDir Path tmpPath)
- throws Exception {
- final Configuration configuration = new Configuration();
- configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
- final TestingMiniClusterConfiguration clusterConfiguration =
- TestingMiniClusterConfiguration.newBuilder()
- .setConfiguration(configuration)
- .build();
- final EmbeddedHaServicesWithLeadershipControl haServices =
- new EmbeddedHaServicesWithLeadershipControl(EXECUTOR_RESOURCE.getExecutor());
-
- final Configuration newConfiguration =
- new Configuration(clusterConfiguration.getConfiguration());
- final long checkpointInterval = 100;
- final JobID jobID =
- generateAndPersistJobGraph(newConfiguration, checkpointInterval, tmpPath);
-
- final TestingMiniCluster.Builder clusterBuilder =
- TestingMiniCluster.newBuilder(clusterConfiguration)
- .setHighAvailabilityServicesSupplier(() -> haServices)
- .setDispatcherResourceManagerComponentFactorySupplier(
- createJobModeDispatcherResourceManagerComponentFactorySupplier(
- newConfiguration));
- AtLeastOneCheckpointInvokable.reset();
-
- try (final MiniCluster cluster = clusterBuilder.build()) {
- // start mini cluster and submit the job
- cluster.start();
-
- AtLeastOneCheckpointInvokable.atLeastOneCheckpointCompleted.await();
-
- final CompletableFuture firstJobResult = cluster.requestJobResult(jobID);
- // make sure requestJobResult was processed by job master
- cluster.getJobStatus(jobID).get();
-
- haServices.revokeDispatcherLeadership();
- // make sure the leadership is revoked to avoid race conditions
- Assertions.assertEquals(
- ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus());
-
- haServices.grantDispatcherLeadership();
-
- // job is suspended, wait until it's running
- awaitJobStatus(cluster, jobID, JobStatus.RUNNING);
-
- CommonTestUtils.waitUntilCondition(
- () ->
- cluster.getArchivedExecutionGraph(jobID)
- .get()
- .getCheckpointStatsSnapshot()
- .getLatestRestoredCheckpoint()
- != null);
- }
- }
-
- private JobID generateAndPersistJobGraph(
- Configuration configuration, long checkpointInterval, Path tmpPath) throws Exception {
- final JobVertex jobVertex = new JobVertex("jobVertex");
- jobVertex.setInvokableClass(AtLeastOneCheckpointInvokable.class);
- jobVertex.setParallelism(1);
-
- final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
- CheckpointCoordinatorConfiguration.builder()
- .setCheckpointInterval(checkpointInterval)
- .build();
- final JobCheckpointingSettings checkpointingSettings =
- new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
- final JobGraph jobGraph =
- JobGraphBuilder.newStreamingJobGraphBuilder()
- .addJobVertex(jobVertex)
- .setJobCheckpointingSettings(checkpointingSettings)
- .build();
-
- final Path jobGraphPath = tmpPath.resolve(JOB_GRAPH_FILE_PATH.defaultValue());
- try (ObjectOutputStream objectOutputStream =
- new ObjectOutputStream(Files.newOutputStream(jobGraphPath, CREATE))) {
- objectOutputStream.writeObject(jobGraph);
- }
- configuration.setString(JOB_GRAPH_FILE_PATH.key(), jobGraphPath.toString());
- return jobGraph.getJobID();
- }
-
- private static void awaitJobStatus(MiniCluster cluster, JobID jobId, JobStatus status)
- throws Exception {
- CommonTestUtils.waitUntilCondition(
- () -> {
- try {
- return cluster.getJobStatus(jobId).get() == status;
- } catch (ExecutionException e) {
- if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class)
- .isPresent()) {
- // job may not be yet submitted
- return false;
- }
- throw e;
- }
- });
- }
-
- /**
- * An invokable that supports checkpointing and counts down when there is at least one
- * checkpoint.
- */
- public static class AtLeastOneCheckpointInvokable
- extends AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable {
-
- private static volatile CountDownLatch atLeastOneCheckpointCompleted;
-
- private static void reset() {
- atLeastOneCheckpointCompleted = new CountDownLatch(1);
- }
-
- public AtLeastOneCheckpointInvokable(Environment environment) {
- super(environment);
- }
-
- @Override
- public Future notifyCheckpointCompleteAsync(long checkpointId) {
- atLeastOneCheckpointCompleted.countDown();
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public Future notifyCheckpointAbortAsync(
- long checkpointId, long latestCompletedCheckpointId) {
- return CompletableFuture.completedFuture(null);
- }
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
deleted file mode 100644
index f0d1b86ffc8544..00000000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.dispatcher.cleanup.TestingCleanupRunnerFactory;
-import org.apache.flink.runtime.dispatcher.cleanup.TestingResourceCleanerFactory;
-import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.TestingJobResultStore;
-import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.concurrent.FutureUtils;
-
-import org.assertj.core.api.Assertions;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.TimeoutException;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-
-/** Tests for the {@link MiniDispatcher}. */
-public class MiniDispatcherTest extends TestLogger {
-
- private static final Duration timeout = Duration.ofSeconds(10L);
-
- @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Rule
- public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource =
- new TestingFatalErrorHandlerResource();
-
- private static JobGraph jobGraph;
-
- private static ExecutionGraphInfo executionGraphInfo;
-
- private static TestingRpcService rpcService;
-
- private static Configuration configuration;
-
- private static BlobServer blobServer;
-
- private final TestingResourceManagerGateway resourceManagerGateway =
- new TestingResourceManagerGateway();
-
- private final HeartbeatServices heartbeatServices = new HeartbeatServicesImpl(1000L, 1000L);
-
- private final ExecutionGraphInfoStore executionGraphInfoStore =
- new MemoryExecutionGraphInfoStore();
-
- private TestingHighAvailabilityServices highAvailabilityServices;
-
- private TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactory;
- private TestingCleanupRunnerFactory testingCleanupRunnerFactory;
-
- private CompletableFuture localCleanupResultFuture;
- private CompletableFuture globalCleanupResultFuture;
-
- @BeforeClass
- public static void setupClass() throws IOException {
- jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
-
- executionGraphInfo =
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder()
- .setJobID(jobGraph.getJobID())
- .setState(JobStatus.FINISHED)
- .build());
-
- rpcService = new TestingRpcService();
- configuration = new Configuration();
-
- blobServer =
- new BlobServer(configuration, temporaryFolder.newFolder(), new VoidBlobStore());
- }
-
- @Before
- public void setup() throws Exception {
- highAvailabilityServices = new TestingHighAvailabilityServicesBuilder().build();
-
- testingJobManagerRunnerFactory = new TestingJobMasterServiceLeadershipRunnerFactory();
- testingCleanupRunnerFactory = new TestingCleanupRunnerFactory();
-
- // the default setting shouldn't block the cleanup
- localCleanupResultFuture = FutureUtils.completedVoidFuture();
- globalCleanupResultFuture = FutureUtils.completedVoidFuture();
- }
-
- @AfterClass
- public static void teardownClass()
- throws IOException, InterruptedException, ExecutionException, TimeoutException {
- if (blobServer != null) {
- blobServer.close();
- }
-
- if (rpcService != null) {
- RpcUtils.terminateRpcService(rpcService);
- }
- }
-
- /** Tests that the {@link MiniDispatcher} recovers the single job with which it was started. */
- @Test
- public void testSingleJobRecovery() throws Exception {
- final MiniDispatcher miniDispatcher =
- createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
-
- miniDispatcher.start();
-
- try {
- final TestingJobManagerRunner testingJobManagerRunner =
- testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
-
- assertThat(testingJobManagerRunner.getJobID(), is(jobGraph.getJobID()));
- } finally {
- RpcUtils.terminateRpcEndpoint(miniDispatcher);
- }
- }
-
- /** Tests that the {@link MiniDispatcher} recovers the single job with which it was started. */
- @Test
- public void testDirtyJobResultCleanup() throws Exception {
- final JobID jobId = new JobID();
- final MiniDispatcher miniDispatcher =
- createMiniDispatcher(
- ClusterEntrypoint.ExecutionMode.DETACHED,
- null,
- TestingJobResultStore.createSuccessfulJobResult(jobId));
-
- miniDispatcher.start();
-
- try {
- final TestingJobManagerRunner testingCleanupRunner =
- testingCleanupRunnerFactory.takeCreatedJobManagerRunner();
- assertThat(testingCleanupRunner.getJobID(), is(jobId));
- } finally {
- RpcUtils.terminateRpcEndpoint(miniDispatcher);
- }
- }
-
- /**
- * Tests that in detached mode, the {@link MiniDispatcher} will complete the future that signals
- * job termination.
- */
- @Test
- public void testTerminationAfterJobCompletion() throws Exception {
- globalCleanupResultFuture = new CompletableFuture<>();
- final MiniDispatcher miniDispatcher =
- createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
-
- miniDispatcher.start();
-
- try {
- // wait until we have submitted the job
- final TestingJobManagerRunner testingJobManagerRunner =
- testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
-
- testingJobManagerRunner.completeResultFuture(executionGraphInfo);
-
- CommonTestUtils.waitUntilCondition(
- () ->
- !highAvailabilityServices
- .getJobResultStore()
- .getDirtyResults()
- .isEmpty());
-
- assertFalse(
- "The shutdownFuture should not be completed before the cleanup is triggered.",
- miniDispatcher.getShutDownFuture().isDone());
-
- globalCleanupResultFuture.complete(null);
-
- miniDispatcher.getShutDownFuture().get();
- } finally {
- // we have to complete the future to make the job and, as a consequence, the
- // MiniDispatcher terminate
- globalCleanupResultFuture.complete(null);
- RpcUtils.terminateRpcEndpoint(miniDispatcher);
- }
- }
-
- /**
- * Tests that in detached mode, the {@link MiniDispatcher} will not complete the future that
- * signals job termination if the JobStatus is not globally terminal state.
- */
- @Test
- public void testNotTerminationWithoutGloballyTerminalState() throws Exception {
- final MiniDispatcher miniDispatcher =
- createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
- miniDispatcher.start();
-
- try {
- // wait until we have submitted the job
- final TestingJobManagerRunner testingJobManagerRunner =
- testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
-
- testingJobManagerRunner.completeResultFuture(
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder()
- .setJobID(jobGraph.getJobID())
- .setState(JobStatus.SUSPENDED)
- .build()));
-
- testingJobManagerRunner.getTerminationFuture().get();
- Assertions.assertThat(miniDispatcher.getShutDownFuture()).isNotDone();
- } finally {
- RpcUtils.terminateRpcEndpoint(miniDispatcher);
- }
- }
-
- /**
- * Tests that the {@link MiniDispatcher} only terminates in {@link
- * ClusterEntrypoint.ExecutionMode#NORMAL} after it has served the {@link
- * org.apache.flink.runtime.jobmaster.JobResult} once.
- */
- @Test
- public void testJobResultRetrieval() throws Exception {
- final MiniDispatcher miniDispatcher =
- createMiniDispatcher(ClusterEntrypoint.ExecutionMode.NORMAL);
-
- miniDispatcher.start();
-
- try {
- // wait until we have submitted the job
- final TestingJobManagerRunner testingJobManagerRunner =
- testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
-
- testingJobManagerRunner.completeResultFuture(executionGraphInfo);
-
- assertFalse(miniDispatcher.getTerminationFuture().isDone());
-
- final DispatcherGateway dispatcherGateway =
- miniDispatcher.getSelfGateway(DispatcherGateway.class);
-
- final CompletableFuture jobResultFuture =
- dispatcherGateway.requestJobResult(jobGraph.getJobID(), timeout);
-
- final JobResult jobResult = jobResultFuture.get();
-
- assertThat(jobResult.getJobId(), is(jobGraph.getJobID()));
- } finally {
- RpcUtils.terminateRpcEndpoint(miniDispatcher);
- }
- }
-
- @Test
- public void testShutdownIfJobCancelledInNormalMode() throws Exception {
- final MiniDispatcher miniDispatcher =
- createMiniDispatcher(ClusterEntrypoint.ExecutionMode.NORMAL);
- miniDispatcher.start();
-
- try {
- // wait until we have submitted the job
- final TestingJobManagerRunner testingJobManagerRunner =
- testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
-
- assertFalse(miniDispatcher.getTerminationFuture().isDone());
-
- final DispatcherGateway dispatcherGateway =
- miniDispatcher.getSelfGateway(DispatcherGateway.class);
-
- dispatcherGateway.cancelJob(jobGraph.getJobID(), Duration.ofSeconds(10L));
- testingJobManagerRunner.completeResultFuture(
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder()
- .setJobID(jobGraph.getJobID())
- .setState(JobStatus.CANCELED)
- .build()));
-
- ApplicationStatus applicationStatus = miniDispatcher.getShutDownFuture().get();
- assertThat(applicationStatus, is(ApplicationStatus.CANCELED));
- } finally {
- RpcUtils.terminateRpcEndpoint(miniDispatcher);
- }
- }
-
- // --------------------------------------------------------
- // Utilities
- // --------------------------------------------------------
-
- private MiniDispatcher createMiniDispatcher(ClusterEntrypoint.ExecutionMode executionMode)
- throws Exception {
- return createMiniDispatcher(executionMode, jobGraph, null);
- }
-
- private MiniDispatcher createMiniDispatcher(
- ClusterEntrypoint.ExecutionMode executionMode,
- @Nullable JobGraph recoveredJobGraph,
- @Nullable JobResult recoveredDirtyJob)
- throws Exception {
- final JobManagerRunnerRegistry jobManagerRunnerRegistry =
- new DefaultJobManagerRunnerRegistry(2);
- return new MiniDispatcher(
- rpcService,
- DispatcherId.generate(),
- new DispatcherServices(
- configuration,
- highAvailabilityServices,
- () -> CompletableFuture.completedFuture(resourceManagerGateway),
- blobServer,
- heartbeatServices,
- executionGraphInfoStore,
- testingFatalErrorHandlerResource.getFatalErrorHandler(),
- VoidHistoryServerArchivist.INSTANCE,
- null,
- new DispatcherOperationCaches(),
- UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
- highAvailabilityServices.getExecutionPlanStore(),
- highAvailabilityServices.getJobResultStore(),
- testingJobManagerRunnerFactory,
- testingCleanupRunnerFactory,
- ForkJoinPool.commonPool(),
- Collections.emptySet()),
- recoveredJobGraph,
- recoveredDirtyJob,
- (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(),
- jobManagerRunnerRegistry,
- TestingResourceCleanerFactory.builder()
- // JobManagerRunnerRegistry needs to be added explicitly
- // because cleaning it will trigger the closeAsync latch
- // provided by TestingJobManagerRunner
- .withLocallyCleanableResource(jobManagerRunnerRegistry)
- .withGloballyCleanableResource(
- (jobId, ignoredExecutor) -> globalCleanupResultFuture)
- .withLocallyCleanableResource(
- (jobId, ignoredExecutor) -> localCleanupResultFuture)
- .build(),
- executionMode);
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactoryTest.java
deleted file mode 100644
index b09d5da3cbd29c..00000000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactoryTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher.runner;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.dispatcher.TestingPartialDispatcherServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
-import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
-import org.apache.flink.runtime.jobmanager.StandaloneExecutionPlanStore;
-import org.apache.flink.runtime.jobmanager.TestingJobPersistenceComponentFactory;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.testutils.TestingJobResultStore;
-import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.concurrent.Executors;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-@ExtendWith(TestLoggerExtension.class)
-class JobDispatcherLeaderProcessFactoryFactoryTest {
-
- @TempDir private Path temporaryFolder;
-
- @Test
- public void testJobGraphWithoutDirtyJobResult() throws IOException {
- final JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
-
- final JobDispatcherLeaderProcessFactory factory =
- createDispatcherLeaderProcessFactoryFromTestInstance(
- jobGraph, null, temporaryFolder);
-
- assertThat(factory.getJobGraph()).isEqualTo(jobGraph);
- assertThat(factory.getRecoveredDirtyJobResult()).isNull();
- }
-
- @Test
- public void testJobGraphWithMatchingDirtyJobResult() throws IOException {
- final JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
- final JobResult jobResult =
- TestingJobResultStore.createSuccessfulJobResult(jobGraph.getJobID());
-
- final JobDispatcherLeaderProcessFactory factory =
- createDispatcherLeaderProcessFactoryFromTestInstance(
- jobGraph, jobResult, temporaryFolder);
-
- assertThat(factory.getJobGraph()).isNull();
- assertThat(factory.getRecoveredDirtyJobResult()).isEqualTo(jobResult);
- }
-
- @Test
- public void testJobGraphWithNotMatchingDirtyJobResult() throws IOException {
- final JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
- final JobResult jobResult = TestingJobResultStore.createSuccessfulJobResult(new JobID());
-
- final JobDispatcherLeaderProcessFactory factory =
- createDispatcherLeaderProcessFactoryFromTestInstance(
- jobGraph, jobResult, temporaryFolder);
-
- assertThat(factory.getJobGraph()).isEqualTo(jobGraph);
- assertThat(factory.getRecoveredDirtyJobResult()).isNull();
- }
-
- @Test
- public void testMissingJobGraph() throws IOException {
- assertThatThrownBy(
- () ->
- createDispatcherLeaderProcessFactoryFromTestInstance(
- null,
- TestingJobResultStore.createSuccessfulJobResult(
- new JobID()),
- temporaryFolder))
- .isInstanceOf(NullPointerException.class);
- }
-
- private static JobDispatcherLeaderProcessFactory
- createDispatcherLeaderProcessFactoryFromTestInstance(
- @Nullable JobGraph jobGraph,
- @Nullable JobResult dirtyJobResult,
- Path storageDir)
- throws IOException {
- final JobDispatcherLeaderProcessFactoryFactory testInstance =
- new JobDispatcherLeaderProcessFactoryFactory(ignoredConfig -> jobGraph);
-
- final TestingJobResultStore jobResultStore =
- TestingJobResultStore.builder()
- .withGetDirtyResultsSupplier(
- () -> CollectionUtil.ofNullable(dirtyJobResult))
- .build();
- final ExecutionPlanStore executionPlanStore = new StandaloneExecutionPlanStore();
- return testInstance.createFactory(
- new TestingJobPersistenceComponentFactory(executionPlanStore, jobResultStore),
- Executors.directExecutor(),
- new TestingRpcService(),
- TestingPartialDispatcherServices.builder()
- .withHighAvailabilityServices(
- new TestingHighAvailabilityServicesBuilder()
- .setExecutionPlanStore(executionPlanStore)
- .setJobResultStore(jobResultStore)
- .build())
- .build(storageDir.toFile(), new Configuration()),
- NoOpFatalErrorHandler.INSTANCE);
- }
-}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
deleted file mode 100644
index 387343ffc7cb1b..00000000000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.RpcOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.yarn.configuration.YarnConfigOptions;
-import org.apache.flink.yarn.util.TestUtils;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Test cases for the deployment of Yarn Flink clusters with customized file replication numbers.
- */
-class YARNFileReplicationITCase extends YarnTestBase {
-
- private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
- private static final int sleepIntervalInMS = 100;
-
- @BeforeAll
- static void setup() {
- YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
- startYARNWithConfig(YARN_CONFIGURATION, true);
- }
-
- @Test
- void testPerJobModeWithCustomizedFileReplication() throws Exception {
- final Configuration configuration = getDefaultConfiguration();
- configuration.set(YarnConfigOptions.FILE_REPLICATION, 4);
-
- runTest(() -> deployPerJob(configuration, getTestingJobGraph()));
- }
-
- @Test
- void testPerJobModeWithDefaultFileReplication() throws Exception {
- runTest(() -> deployPerJob(getDefaultConfiguration(), getTestingJobGraph()));
- }
-
- private void deployPerJob(Configuration configuration, JobGraph jobGraph) throws Exception {
- try (final YarnClusterDescriptor yarnClusterDescriptor =
- createYarnClusterDescriptor(configuration)) {
-
- yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
- yarnClusterDescriptor.addShipFiles(
- Arrays.stream(Objects.requireNonNull(flinkLibFolder.listFiles()))
- .map(file -> new Path(file.toURI()))
- .collect(Collectors.toList()));
-
- final int masterMemory =
- yarnClusterDescriptor
- .getFlinkConfiguration()
- .get(JobManagerOptions.TOTAL_PROCESS_MEMORY)
- .getMebiBytes();
- final ClusterSpecification clusterSpecification =
- new ClusterSpecification.ClusterSpecificationBuilder()
- .setMasterMemoryMB(masterMemory)
- .setTaskManagerMemoryMB(1024)
- .setSlotsPerTaskManager(1)
- .createClusterSpecification();
-
- File testingJar =
- TestUtils.findFile("..", new TestUtils.TestJarFinder("flink-yarn-tests"));
-
- jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
- try (ClusterClient clusterClient =
- yarnClusterDescriptor
- .deployJobCluster(clusterSpecification, jobGraph, false)
- .getClusterClient()) {
-
- ApplicationId applicationId = clusterClient.getClusterId();
-
- extraVerification(configuration, applicationId);
-
- final CompletableFuture jobResultCompletableFuture =
- clusterClient.requestJobResult(jobGraph.getJobID());
-
- final JobResult jobResult = jobResultCompletableFuture.get();
-
- assertThat(jobResult).isNotNull();
- jobResult
- .getSerializedThrowable()
- .ifPresent(
- serializedThrowable -> {
- throw new AssertionError(
- "Job failed",
- serializedThrowable.deserializeError(
- YARNFileReplicationITCase.class
- .getClassLoader()));
- });
-
- waitApplicationFinishedElseKillIt(
- applicationId,
- yarnAppTerminateTimeout,
- yarnClusterDescriptor,
- sleepIntervalInMS);
- }
- }
- }
-
- private JobGraph getTestingJobGraph() {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
-
- env.addSource(new NoDataSource()).shuffle().sinkTo(new DiscardingSink<>());
-
- return env.getStreamGraph().getJobGraph();
- }
-
- private Configuration getDefaultConfiguration() {
- final Configuration configuration = new Configuration();
- configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768));
- configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
- configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30));
- configuration.set(CLASSPATH_INCLUDE_USER_JAR, YarnConfigOptions.UserJarInclusion.DISABLED);
-
- return configuration;
- }
-
- private void extraVerification(Configuration configuration, ApplicationId applicationId)
- throws Exception {
- final FileSystem fs = FileSystem.get(getYarnConfiguration());
-
- String suffix = ".flink/" + applicationId.toString() + "/" + flinkUberjar.getName();
-
- Path uberJarHDFSPath = new Path(fs.getHomeDirectory(), suffix);
-
- assertTrue(
- fs.exists(uberJarHDFSPath),
- "The Flink uber jar needs to exist. If it does not exist, then this "
- + "indicates that the Flink cluster has already terminated and Yarn has "
- + "already deleted the working directory.");
-
- FileStatus fsStatus = fs.getFileStatus(uberJarHDFSPath);
-
- final int flinkFileReplication = configuration.get(YarnConfigOptions.FILE_REPLICATION);
- final int replication =
- YARN_CONFIGURATION.getInt(
- DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT);
-
- // If YarnConfigOptions.FILE_REPLICATION is not set. The replication number should equals to
- // yarn configuration value.
- int expectedReplication = flinkFileReplication > 0 ? flinkFileReplication : replication;
- assertThat((int) fsStatus.getReplication()).isEqualTo(expectedReplication);
- }
-}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
deleted file mode 100644
index d975b1fd27f86c..00000000000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.RpcOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobType;
-import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.yarn.configuration.YarnConfigOptions;
-import org.apache.flink.yarn.testjob.YarnTestArchiveJob;
-import org.apache.flink.yarn.testjob.YarnTestCacheJob;
-import org.apache.flink.yarn.util.TestUtils;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test cases for the deployment of Yarn Flink clusters. */
-class YARNITCase extends YarnTestBase {
-
- private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
- private static final int sleepIntervalInMS = 100;
-
- @BeforeAll
- static void setup() {
- YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
- startYARNWithConfig(YARN_CONFIGURATION, true);
- }
-
- @Test
- void testPerJobModeWithEnableSystemClassPathIncludeUserJar() throws Exception {
- runTest(
- () ->
- deployPerJob(
- createDefaultConfiguration(
- YarnConfigOptions.UserJarInclusion.FIRST),
- getTestingJobGraph(),
- true));
- }
-
- @Test
- void testPerJobModeWithDisableSystemClassPathIncludeUserJar() throws Exception {
- runTest(
- () ->
- deployPerJob(
- createDefaultConfiguration(
- YarnConfigOptions.UserJarInclusion.DISABLED),
- getTestingJobGraph(),
- true));
- }
-
- @Test
- void testPerJobModeWithDistributedCache(@TempDir File tempDir) throws Exception {
- runTest(
- () ->
- deployPerJob(
- createDefaultConfiguration(
- YarnConfigOptions.UserJarInclusion.DISABLED),
- YarnTestCacheJob.getDistributedCacheJobGraph(tempDir),
- true));
- }
-
- @Test
- void testPerJobWithProvidedLibDirs() throws Exception {
- final Path remoteLib =
- new Path(
- miniDFSCluster.getFileSystem().getUri().toString() + "/flink-provided-lib");
- miniDFSCluster
- .getFileSystem()
- .copyFromLocalFile(new Path(flinkLibFolder.toURI()), remoteLib);
- miniDFSCluster.getFileSystem().setPermission(remoteLib, new FsPermission("755"));
-
- final Configuration flinkConfig =
- createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
- flinkConfig.set(
- YarnConfigOptions.PROVIDED_LIB_DIRS,
- Collections.singletonList(remoteLib.toString()));
- runTest(() -> deployPerJob(flinkConfig, getTestingJobGraph(), false));
- }
-
- @Test
- void testPerJobWithArchive(@TempDir File tempDir) throws Exception {
- final Configuration flinkConfig =
- createDefaultConfiguration(YarnConfigOptions.UserJarInclusion.DISABLED);
- final JobGraph archiveJobGraph =
- YarnTestArchiveJob.getArchiveJobGraph(tempDir, flinkConfig);
- runTest(() -> deployPerJob(flinkConfig, archiveJobGraph, true));
- }
-
- private void deployPerJob(Configuration configuration, JobGraph jobGraph, boolean withDist)
- throws Exception {
- jobGraph.setJobType(JobType.STREAMING);
- try (final YarnClusterDescriptor yarnClusterDescriptor =
- withDist
- ? createYarnClusterDescriptor(configuration)
- : createYarnClusterDescriptorWithoutLibDir(configuration)) {
-
- final int masterMemory =
- yarnClusterDescriptor
- .getFlinkConfiguration()
- .get(JobManagerOptions.TOTAL_PROCESS_MEMORY)
- .getMebiBytes();
- final ClusterSpecification clusterSpecification =
- new ClusterSpecification.ClusterSpecificationBuilder()
- .setMasterMemoryMB(masterMemory)
- .setTaskManagerMemoryMB(1024)
- .setSlotsPerTaskManager(1)
- .createClusterSpecification();
-
- File testingJar =
- TestUtils.findFile("..", new TestUtils.TestJarFinder("flink-yarn-tests"));
-
- jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
- try (ClusterClient clusterClient =
- yarnClusterDescriptor
- .deployJobCluster(clusterSpecification, jobGraph, false)
- .getClusterClient()) {
-
- for (DistributedCache.DistributedCacheEntry entry :
- jobGraph.getUserArtifacts().values()) {
- assertThat(Utils.isRemotePath(entry.filePath)).isTrue();
- }
-
- ApplicationId applicationId = clusterClient.getClusterId();
-
- final CompletableFuture jobResultCompletableFuture =
- clusterClient.requestJobResult(jobGraph.getJobID());
-
- final JobResult jobResult = jobResultCompletableFuture.get();
-
- assertThat(jobResult).isNotNull();
- assertThat(jobResult.getSerializedThrowable()).isNotPresent();
-
- checkStagingDirectory(configuration, applicationId);
-
- waitApplicationFinishedElseKillIt(
- applicationId,
- yarnAppTerminateTimeout,
- yarnClusterDescriptor,
- sleepIntervalInMS);
- }
- }
- }
-
- private void checkStagingDirectory(Configuration flinkConfig, ApplicationId appId)
- throws IOException {
- final List providedLibDirs = flinkConfig.get(YarnConfigOptions.PROVIDED_LIB_DIRS);
- final boolean isProvidedLibDirsConfigured =
- providedLibDirs != null && !providedLibDirs.isEmpty();
-
- try (final FileSystem fs = FileSystem.get(YARN_CONFIGURATION)) {
- final Path stagingDirectory =
- new Path(fs.getHomeDirectory(), ".flink/" + appId.toString());
- if (isProvidedLibDirsConfigured) {
- assertThat(fs.exists(new Path(stagingDirectory, flinkLibFolder.getName())))
- .isFalse();
- } else {
- assertThat(fs.exists(new Path(stagingDirectory, flinkLibFolder.getName())))
- .isTrue();
- }
- }
- }
-
- private JobGraph getTestingJobGraph() {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
-
- env.addSource(new NoDataSource()).shuffle().sinkTo(new DiscardingSink<>());
-
- return env.getStreamGraph().getJobGraph();
- }
-
- private Configuration createDefaultConfiguration(
- YarnConfigOptions.UserJarInclusion userJarInclusion) {
- Configuration configuration = new Configuration();
- configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768));
- configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
- configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30));
- configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion);
-
- return configuration;
- }
-}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 9af20b0d2e6c76..c10f947dc1f382 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -51,10 +51,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
@@ -95,6 +93,9 @@ class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
+ private static final ApplicationId TEST_YARN_APPLICATION_ID =
+ ApplicationId.newInstance(System.currentTimeMillis(), 42);
+
/** RestClient to query Flink cluster. */
private static RestClient restClient;
@@ -173,100 +174,6 @@ void testStartYarnSessionClusterInQaTeamQueue() throws Exception {
0));
}
- /**
- * Test per-job yarn cluster
- *
- * This also tests the prefixed CliFrontend options for the YARN case We also test if the
- * requested parallelism of 2 is passed through. The parallelism is requested at the YARN client
- * (-ys).
- */
- @Test
- @Disabled("per-job mode is deprecated, we will be removed in 2.0 version.")
- void perJobYarnCluster() throws Exception {
- runTest(
- () -> {
- LOG.info("Starting perJobYarnCluster()");
- File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
- runWithArgs(
- new String[] {
- "run",
- "-m",
- "yarn-cluster",
- "-yj",
- flinkUberjar.getAbsolutePath(),
- "-yt",
- flinkLibFolder.getAbsolutePath(),
- "-ys",
- "2", // test that the job is executed with a DOP of 2
- "-yjm",
- "768m",
- "-ytm",
- "1024m",
- exampleJarLocation.getAbsolutePath()
- },
- /* test succeeded after this string */
- "Program execution finished",
- /* prohibited strings: (to verify the parallelism) */
- // (we should see "DataSink (...) (1/2)" and "DataSink (...) (2/2)"
- // instead)
- new String[] {"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"},
- RunTypes.CLI_FRONTEND,
- 0,
- cliLoggerAuditingExtension::getMessages);
- LOG.info("Finished perJobYarnCluster()");
- });
- }
-
- /**
- * Test per-job yarn cluster and memory calculations for off-heap use (see FLINK-7400) with the
- * same job as {@link #perJobYarnCluster()}.
- *
- *
This ensures that with (any) pre-allocated off-heap memory by us, there is some off-heap
- * memory remaining for Flink's libraries. Creating task managers will thus fail if no off-heap
- * memory remains.
- */
- @Test
- @Disabled("per-job mode is deprecated, we will be removed in 2.0 version.")
- void perJobYarnClusterOffHeap() throws Exception {
- runTest(
- () -> {
- LOG.info("Starting perJobYarnCluster()");
- File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
-
- // set memory constraints (otherwise this is the same test as
- // perJobYarnCluster() above)
- final long taskManagerMemoryMB = 1024;
-
- runWithArgs(
- new String[] {
- "run",
- "-m",
- "yarn-cluster",
- "-yj",
- flinkUberjar.getAbsolutePath(),
- "-yt",
- flinkLibFolder.getAbsolutePath(),
- "-ys",
- "2", // test that the job is executed with a DOP of 2
- "-yjm",
- "768m",
- "-ytm",
- taskManagerMemoryMB + "m",
- exampleJarLocation.getAbsolutePath()
- },
- /* test succeeded after this string */
- "Program execution finished",
- /* prohibited strings: (to verify the parallelism) */
- // (we should see "DataSink (...) (1/2)" and "DataSink (...) (2/2)"
- // instead)
- new String[] {"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"},
- RunTypes.CLI_FRONTEND,
- 0,
- cliLoggerAuditingExtension::getMessages);
- LOG.info("Finished perJobYarnCluster()");
- });
- }
-
/**
* Starts a session cluster on YARN, and submits a streaming job.
*
@@ -480,81 +387,6 @@ void testNonexistingQueueWARNmessage() throws Exception {
});
}
- /**
- * Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN
- * client.
- */
- @Test
- @Disabled("per-job mode is deprecated, we will be removed in 2.0 version.")
- void perJobYarnClusterWithParallelism() throws Exception {
- runTest(
- () -> {
- LOG.info("Starting perJobYarnClusterWithParallelism()");
- File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
- runWithArgs(
- new String[] {
- "run",
- "-p",
- "2", // test that the job is executed with a DOP of 2
- "-m",
- "yarn-cluster",
- "-yj",
- flinkUberjar.getAbsolutePath(),
- "-yt",
- flinkLibFolder.getAbsolutePath(),
- "-ys",
- "2",
- "-yjm",
- "768m",
- "-ytm",
- "1024m",
- exampleJarLocation.getAbsolutePath()
- },
- /* test succeeded after this string */
- "Program execution finished",
- /* prohibited strings: (we want to see "DataSink (...) (2/2) switched to FINISHED") */
- new String[] {"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"},
- RunTypes.CLI_FRONTEND,
- 0,
- cliLoggerAuditingExtension::getMessages);
- LOG.info("Finished perJobYarnClusterWithParallelism()");
- });
- }
-
- /** Test a fire-and-forget job submission to a YARN cluster. */
- @Test
- @Disabled("per-job mode is deprecated, we will be removed in 2.0 version.")
- void testDetachedPerJobYarnCluster(@TempDir File tempDir) throws Exception {
- runTest(
- () -> {
- LOG.info("Starting testDetachedPerJobYarnCluster()");
-
- File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
-
- testDetachedPerJobYarnClusterInternal(
- tempDir, exampleJarLocation.getAbsolutePath());
-
- LOG.info("Finished testDetachedPerJobYarnCluster()");
- });
- }
-
- /** Test a fire-and-forget job submission to a YARN cluster. */
- @Test
- @Disabled("per-job mode is deprecated, we will be removed in 2.0 version.")
- void testDetachedPerJobYarnClusterWithStreamingJob(@TempDir File tempDir) throws Exception {
- runTest(
- () -> {
- LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
-
- File exampleJarLocation = getTestJarPath("StreamingWordCount.jar");
-
- testDetachedPerJobYarnClusterInternal(
- tempDir, exampleJarLocation.getAbsolutePath());
-
- LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
- });
- }
-
private void testDetachedPerJobYarnClusterInternal(File tempDir, String job) throws Exception {
YarnClient yc = YarnClient.createYarnClient();
yc.init(YARN_CONFIGURATION);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 5a506732ed8aab..15b310245a782c 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -19,16 +19,16 @@
package org.apache.flink.yarn;
import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -47,7 +47,6 @@
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;
-import java.io.File;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
@@ -79,6 +78,10 @@ void testFlinkContainerMemory() throws Exception {
configuration.set(
JobManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(masterMemory));
+ configuration.set(DeploymentOptions.TARGET, "yarn-application");
+ configuration.setString(
+ PipelineOptions.JARS.key(),
+ getTestJarPath("WindowJoin.jar").getAbsolutePath());
final TaskExecutorProcessSpec tmResourceSpec =
TaskExecutorProcessUtils.processSpecFromConfig(configuration);
@@ -100,14 +103,6 @@ void testFlinkContainerMemory() throws Exception {
.map(file -> new Path(file.toURI()))
.collect(Collectors.toList()));
- final File streamingWordCountFile = getTestJarPath("WindowJoin.jar");
-
- final PackagedProgram packagedProgram =
- PackagedProgram.newBuilder().setJarFile(streamingWordCountFile).build();
- final JobGraph jobGraph =
- PackagedProgramUtils.createJobGraph(
- packagedProgram, configuration, 1, false);
-
try {
final ClusterSpecification clusterSpecification =
new ClusterSpecification.ClusterSpecificationBuilder()
@@ -118,7 +113,10 @@ void testFlinkContainerMemory() throws Exception {
final ClusterClient clusterClient =
clusterDescriptor
- .deployJobCluster(clusterSpecification, jobGraph, true)
+ .deployApplicationCluster(
+ clusterSpecification,
+ ApplicationConfiguration.fromConfiguration(
+ configuration))
.getClusterClient();
final ApplicationId clusterId = clusterClient.getClusterId();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index c1c77c197d0c23..c6da95e9bb705e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -19,8 +19,6 @@
package org.apache.flink.yarn;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
@@ -71,7 +69,6 @@
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint;
-import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
import org.apache.hadoop.fs.FileStatus;
@@ -313,14 +310,6 @@ protected String getYarnSessionClusterEntrypoint() {
return YarnSessionClusterEntrypoint.class.getName();
}
- /**
- * The class to start the application master with. This class runs the main method in case of
- * the job cluster.
- */
- protected String getYarnJobClusterEntrypoint() {
- return YarnJobClusterEntrypoint.class.getName();
- }
-
public Configuration getFlinkConfiguration() {
return flinkConfiguration;
}
@@ -550,25 +539,6 @@ public ClusterClientProvider deployApplicationCluster(
}
}
- @Override
- public ClusterClientProvider deployJobCluster(
- ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
- throws ClusterDeploymentException {
-
- LOG.warn(
- "Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead.");
- try {
- return deployInternal(
- clusterSpecification,
- "Flink per-job cluster",
- getYarnJobClusterEntrypoint(),
- jobGraph,
- detached);
- } catch (Exception e) {
- throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
- }
- }
-
@Override
public void killCluster(ApplicationId applicationId) throws FlinkException {
try {
@@ -943,23 +913,6 @@ private ApplicationReport startAppMaster(
userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
}
- // only for per job mode
- if (jobGraph != null) {
- for (Map.Entry entry :
- jobGraph.getUserArtifacts().entrySet()) {
- // only upload local files
- if (!Utils.isRemotePath(entry.getValue().filePath)) {
- Path localPath = new Path(entry.getValue().filePath);
- Tuple2 remoteFileInfo =
- fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
- jobGraph.setUserArtifactRemotePath(
- entry.getKey(), remoteFileInfo.f0.toString());
- }
- }
-
- jobGraph.writeUserArtifactEntriesToConfiguration();
- }
-
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
addLibFoldersToShipFiles(systemShipFiles);
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java
index e7e2891c3506d2..6bedf795128e96 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java
@@ -22,7 +22,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
-import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
import org.apache.commons.cli.CommandLine;
@@ -59,9 +58,7 @@ public boolean isActive(CommandLine commandLine) {
|| configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
final boolean hasYarnExecutor =
YarnSessionClusterExecutor.NAME.equalsIgnoreCase(
- configuration.get(DeploymentOptions.TARGET))
- || YarnJobClusterExecutor.NAME.equalsIgnoreCase(
- configuration.get(DeploymentOptions.TARGET));
+ configuration.get(DeploymentOptions.TARGET));
return hasYarnExecutor || yarnJobManager || hasYarnAppId;
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 4386ebc3c8958d..66fe7b1b5ea199 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -48,7 +48,6 @@
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
-import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
import org.apache.commons.cli.CommandLine;
@@ -404,10 +403,8 @@ public Configuration toConfiguration(CommandLine commandLine) throws FlinkExcept
effectiveConfiguration.set(HA_CLUSTER_ID, zooKeeperNamespace);
effectiveConfiguration.set(YarnConfigOptions.APPLICATION_ID, applicationId.toString());
- effectiveConfiguration.set(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
- } else {
- effectiveConfiguration.set(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
}
+ effectiveConfiguration.set(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
if (commandLine.hasOption(jmMemory.getOpt())) {
String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnDeploymentTarget.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnDeploymentTarget.java
index c7d22c77ca2822..672da106aa4193 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnDeploymentTarget.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnDeploymentTarget.java
@@ -31,10 +31,7 @@
@Internal
public enum YarnDeploymentTarget {
SESSION("yarn-session"),
- APPLICATION("yarn-application"),
-
- @Deprecated
- PER_JOB("yarn-per-job");
+ APPLICATION("yarn-application");
public static final String ERROR_MESSAGE =
"No Executor found. Please make sure to export the HADOOP_CLASSPATH environment variable "
@@ -81,9 +78,7 @@ private static YarnDeploymentTarget getFromName(final String deploymentTarget) {
return null;
}
- if (PER_JOB.name.equalsIgnoreCase(deploymentTarget)) {
- return PER_JOB;
- } else if (SESSION.name.equalsIgnoreCase(deploymentTarget)) {
+ if (SESSION.name.equalsIgnoreCase(deploymentTarget)) {
return SESSION;
} else if (APPLICATION.name.equalsIgnoreCase(deploymentTarget)) {
return APPLICATION;
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 820a91e93e4384..bfe36d5e7502ab 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -41,10 +41,7 @@
import static org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils.tryFindUserLibDirectory;
import static org.apache.flink.util.Preconditions.checkState;
-/**
- * This class contains utility methods for the {@link YarnSessionClusterEntrypoint} and {@link
- * YarnJobClusterEntrypoint}.
- */
+/** This class contains utility methods for the {@link YarnSessionClusterEntrypoint}. */
public class YarnEntrypointUtils {
public static Configuration loadConfiguration(
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
deleted file mode 100644
index 04a48c7d5cd976..00000000000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn.entrypoint;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
-import org.apache.flink.runtime.entrypoint.DynamicParametersConfigurationParserFactory;
-import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
-import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.configuration.YarnConfigOptions;
-
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Entry point for Yarn per-job clusters.
- *
- * @deprecated Per-mode has been deprecated in Flink 1.15 and will be removed in the future. Please
- * use application mode instead.
- */
-@Deprecated
-public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
-
- public YarnJobClusterEntrypoint(Configuration configuration) {
- super(configuration);
- }
-
- @Override
- protected String getRPCPortRange(Configuration configuration) {
- return configuration.get(YarnConfigOptions.APPLICATION_MASTER_PORT);
- }
-
- @Override
- protected DefaultDispatcherResourceManagerComponentFactory
- createDispatcherResourceManagerComponentFactory(Configuration configuration)
- throws IOException {
- return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory(
- YarnResourceManagerFactory.getInstance(),
- FileJobGraphRetriever.createFrom(
- configuration,
- YarnEntrypointUtils.getUsrLibDir(configuration).orElse(null)));
- }
-
- // ------------------------------------------------------------------------
- // The executable entry point for the Yarn Application Master Process
- // for a single Flink job.
- // ------------------------------------------------------------------------
-
- public static void main(String[] args) {
-
- LOG.warn(
- "Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead.");
-
- // startup checks and logging
- EnvironmentInformation.logEnvironmentInfo(
- LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
- SignalHandler.register(LOG);
- JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
- Map env = System.getenv();
-
- final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
- Preconditions.checkArgument(
- workingDirectory != null,
- "Working directory variable (%s) not set",
- ApplicationConstants.Environment.PWD.key());
-
- try {
- YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
- } catch (IOException e) {
- LOG.warn("Could not log YARN environment information.", e);
- }
-
- final Configuration dynamicParameters =
- ClusterEntrypointUtils.parseParametersOrExit(
- args,
- new DynamicParametersConfigurationParserFactory(),
- YarnJobClusterEntrypoint.class);
- final Configuration configuration =
- YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);
-
- YarnJobClusterEntrypoint yarnJobClusterEntrypoint =
- new YarnJobClusterEntrypoint(configuration);
-
- ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
- }
-}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
deleted file mode 100644
index f797a4c9791c03..00000000000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn.executors;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor;
-import org.apache.flink.core.execution.PipelineExecutor;
-import org.apache.flink.yarn.YarnClusterClientFactory;
-import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-/**
- * The {@link PipelineExecutor} to be used when executing a job in isolation. This executor will
- * start a cluster specifically for the job at hand and tear it down when the job is finished either
- * successfully or due to an error.
- */
-@Internal
-@Deprecated
-public class YarnJobClusterExecutor
- extends AbstractJobClusterExecutor {
-
- public static final String NAME = YarnDeploymentTarget.PER_JOB.getName();
-
- public YarnJobClusterExecutor() {
- super(new YarnClusterClientFactory());
- }
-}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
deleted file mode 100644
index 4ef44b76826a83..00000000000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn.executors;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.core.execution.PipelineExecutor;
-import org.apache.flink.core.execution.PipelineExecutorFactory;
-import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
-
-import javax.annotation.Nonnull;
-
-/** An {@link PipelineExecutorFactory} for executing jobs on dedicated (per-job) clusters. */
-@Internal
-@Deprecated
-public class YarnJobClusterExecutorFactory implements PipelineExecutorFactory {
-
- @Override
- public String getName() {
- return YarnJobClusterExecutor.NAME;
- }
-
- @Override
- public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
- return YarnJobClusterExecutor.NAME.equalsIgnoreCase(
- configuration.get(DeploymentOptions.TARGET));
- }
-
- @Override
- public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
- try {
- return new YarnJobClusterExecutor();
- } catch (NoClassDefFoundError e) {
- throw new IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE);
- }
- }
-}
diff --git a/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory
index 6899f90d2546af..b3e53fc8504512 100644
--- a/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory
@@ -13,5 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index cc2b719107e3bb..29b9e8f2e1ceac 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -40,7 +40,6 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
-import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -180,21 +179,6 @@ void testNodeLabelProperty() throws Exception {
assertThat(descriptor.getNodeLabel()).isEqualTo(nodeLabelCliInput);
}
- @Test
- void testExecutorCLIisPrioritised() throws Exception {
- final File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
-
- final Configuration configuration = new Configuration();
- configuration.set(
- YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
-
- validateYarnCLIisActive(configuration);
-
- final String[] argsUnderTest = new String[] {"-e", YarnJobClusterExecutor.NAME};
-
- validateExecutorCLIisPrioritised(configuration, argsUnderTest);
- }
-
private void validateExecutorCLIisPrioritised(
Configuration configuration, String[] argsUnderTest)
throws IOException, CliArgsException {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
index 8c51c3184b2a8e..db6c3efd24ffdb 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
@@ -23,7 +23,6 @@
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -34,11 +33,6 @@
/** Test for the {@link YarnClusterClientFactory} discovery. */
class YarnClusterClientFactoryTest {
- @Test
- void testYarnClusterClientFactoryDiscoveryWithPerJobExecutor() {
- testYarnClusterClientFactoryDiscoveryHelper(YarnJobClusterExecutor.NAME);
- }
-
@Test
void testYarnClusterClientFactoryDiscoveryWithSessionExecutor() {
testYarnClusterClientFactoryDiscoveryHelper(YarnSessionClusterExecutor.NAME);
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypointTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypointTest.java
deleted file mode 100644
index 9d3eaa52305715..00000000000000
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypointTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn.entrypoint;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.yarn.configuration.YarnConfigOptions;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Tests for {@link YarnJobClusterEntrypoint}. */
-class YarnJobClusterEntrypointTest {
-
- @Test
- void testCreateDispatcherResourceManagerComponentFactoryFailIfUsrLibDirDoesNotExist() {
- final Configuration configuration = new Configuration();
- configuration.set(
- YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR,
- YarnConfigOptions.UserJarInclusion.DISABLED);
- final YarnJobClusterEntrypoint yarnJobClusterEntrypoint =
- new YarnJobClusterEntrypoint(configuration);
- assertThatThrownBy(
- () ->
- yarnJobClusterEntrypoint
- .createDispatcherResourceManagerComponentFactory(
- configuration))
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("the usrlib directory does not exist.");
- }
-}