[FLINK-36310] Remove per-job mode
codenohup committed Jan 2, 2025
1 parent 08990c7 commit 484f211
Showing 53 changed files with 53 additions and 2,731 deletions.
17 changes: 1 addition & 16 deletions docs/content.zh/docs/concepts/flink-architecture.md
Original file line number Diff line number Diff line change
@@ -97,7 +97,7 @@ The _JobManager_ has a number of responsibilities related to coordinating the distributed execution of Flink applications

A _Flink Application_ is any user program that spawns one or multiple Flink jobs from its ``main()`` method. Execution of these jobs can happen in a local JVM (`LocalEnvironment`) or on a remote setup of clusters with multiple machines (``RemoteEnvironment``). For each program, the `ExecutionEnvironment` provides methods to control the job execution (e.g. setting the parallelism) and to interact with the outside world (see [Anatomy of a Flink Program]({{< ref "docs/dev/datastream/overview" >}}#anatomy-of-a-flink-program)).

The jobs of a Flink Application can be submitted to a long-running [Flink Session Cluster]({{< ref "docs/concepts/glossary" >}}#flink-session-cluster), a dedicated [Flink Job Cluster]({{< ref "docs/concepts/glossary" >}}#flink-job-cluster), or a [Flink Application Cluster]({{< ref "docs/concepts/glossary" >}}#flink-application-cluster). The difference between these options is mainly related to the cluster's lifecycle and to resource isolation guarantees.
The jobs of a Flink Application can be submitted to a long-running [Flink Session Cluster]({{< ref "docs/concepts/glossary" >}}#flink-session-cluster) or a [Flink Application Cluster]({{< ref "docs/concepts/glossary" >}}#flink-application-cluster). The difference between these options is mainly related to the cluster's lifecycle and to resource isolation guarantees.

### Flink Session Cluster

@@ -111,21 +111,6 @@ The jobs of a Flink Application can be submitted to a long-running [Flink Session
Formerly, a Flink Session Cluster was also known as a Flink Cluster in <i>session mode</i>.
{{< /hint >}}

### Flink Job Cluster

* **Cluster Lifecycle**: in a Flink Job Cluster, the available cluster manager (like YARN) is used to spin up a cluster for each submitted job, and this cluster is available to that job only. Here, the client first requests resources from the cluster manager to start the JobManager, and then submits the job to the Dispatcher running inside this process. TaskManagers are then lazily allocated based on the resource requirements of the job. Once the job is finished, the Flink Job Cluster is torn down.

* **Resource Isolation**: a fatal error in the JobManager only affects the one job running in that Flink Job Cluster.

* **Other considerations**: because the ResourceManager has to apply for and wait on external resource management components to start the TaskManager processes and allocate resources, Flink Job Clusters are more suited to large jobs that are long-running, have high-stability requirements, and are not sensitive to longer startup times.

{{< hint info >}}
Formerly, a Flink Job Cluster was also known as a Flink Cluster in <i>job (or per-job) mode</i>.
{{< /hint >}}
{{< hint info >}}
Flink Job Clusters are not supported by Kubernetes. See [Standalone Kubernetes]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}#per-job-cluster-mode) and [Native Kubernetes]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#per-job-cluster-mode).
{{< /hint >}}

### Flink Application Cluster

* **Cluster Lifecycle**: a Flink Application Cluster is a dedicated Flink cluster that only executes jobs from one Flink Application, and where the ``main()`` method runs on the cluster rather than on the client. The job submission is a one-step process: you don't need to start a Flink cluster first and then submit a job to an existing session cluster; instead, you package your application logic and dependencies into an executable job JAR, and the cluster entrypoint (``ApplicationClusterEntryPoint``) is responsible for calling the ``main()`` method to extract the JobGraph. This allows you to deploy a Flink Application like any other application on Kubernetes, for example. The lifetime of a Flink Application Cluster is therefore bound to the lifetime of the Flink Application.
15 changes: 3 additions & 12 deletions docs/content.zh/docs/deployment/cli.md
Expand Up @@ -79,7 +79,7 @@ $ export JOB_ID="cca7bc1061d61cf15238e92312c2fc20"
The `run` command supports passing additional configuration parameters via the
`-D` argument. For example, setting the [maximum parallelism]({{< ref "docs/deployment/config#pipeline-max-parallelism" >}})
for a job can be done by setting `-Dpipeline.max-parallelism=120`. This argument is very useful for
configuring per-job or application mode clusters, because you can pass any configuration parameter
configuring application mode clusters, because you can pass any configuration parameter
to the cluster, without changing the configuration file.

When submitting a job to an existing session cluster, only [execution configuration parameters]({{< ref "docs/deployment/config#execution" >}}) are supported.
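As a sketch (the flag values are illustrative, and the example jar is the one used elsewhere in these docs, not a prescription), an application-mode submission with configuration overrides might look like:

```bash
# Sketch: override configuration at submission time with -D key/value pairs,
# without touching the configuration file. Values here are illustrative.
./bin/flink run \
    --target yarn-application \
    -Dpipeline.max-parallelism=120 \
    -Dparallelism.default=4 \
    ./examples/streaming/TopSpeedWindowing.jar
```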
@@ -403,13 +403,11 @@ Resource Provider section. Jobs can be submitted in different [Deployment Modes]
The parameterization of a job submission differs based on the underlying framework and Deployment Mode.

`bin/flink` offers a parameter `--target` to handle the different options. In addition to that, jobs
have to be submitted using `run` (for [Session]({{< ref "docs/deployment/overview" >}}#session-mode),
[Per-Job Mode]({{< ref "docs/deployment/overview" >}}#per-job-mode) and
[Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). See the following summary of
have to be submitted using `run` (for [Session]({{< ref "docs/deployment/overview" >}}#session-mode)
and [Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). See the following summary of
parameter combinations:
* YARN
* `./bin/flink run --target yarn-session`: Submission to an already running Flink on YARN cluster
* `./bin/flink run --target yarn-per-job`: Submission spinning up a Flink on YARN cluster in Per-Job Mode
* `./bin/flink run --target yarn-application`: Submission spinning up Flink on YARN cluster in Application Mode
* Kubernetes
* `./bin/flink run --target kubernetes-session`: Submission to an already running Flink on Kubernetes cluster
@@ -474,13 +472,6 @@ $ ./bin/flink run \
--python examples/python/table/word_count.py
```

- Run a PyFlink job using a [YARN cluster in Per-Job Mode]({{< ref "docs/deployment/resource-providers/yarn" >}}#per-job-cluster-mode):
```bash
$ ./bin/flink run \
--target yarn-per-job
--python examples/python/table/word_count.py
```

- Run a PyFlink job using a [YARN cluster in Application Mode]({{< ref "docs/deployment/resource-providers/yarn" >}}#application-mode):
```bash
$ ./bin/flink run -t yarn-application \
24 changes: 5 additions & 19 deletions docs/content.zh/docs/deployment/overview.md
@@ -74,7 +74,6 @@ When deploying Flink, there are often multiple options available for each building
JobManager <a href="#deployment-modes">modes for job submissions</a>:
<ul>
<li><b>Application Mode</b>: runs the cluster exclusively for one application. The job's main method (or client) gets executed on the JobManager. Calling `execute`/`executeAsync` multiple times in an application is supported.</li>
<li><b>Per-Job Mode</b>: runs the cluster exclusively for one job. The job's main method (or client) runs only prior to the cluster creation.</li>
<li><b>Session Mode</b>: one JobManager instance manages multiple jobs sharing the same cluster of TaskManagers</li>
</ul>
</td>
@@ -173,7 +172,6 @@ covered by [FLINK-26606](https://issues.apache.org/jira/browse/FLINK-26606).

Flink can execute applications in one of two ways:
- in Application Mode,
- in a Per-Job Mode,
- in Session Mode.

The above modes differ in:
@@ -182,7 +180,7 @@ Flink can execute applications in one of three ways:


<!-- Image source: https://docs.google.com/drawings/d/1EfloufuOp1A7YDwZmBEsHKRLIrrbtRkoWRPcfZI5RYQ/edit?usp=sharing -->
{{< img class="img-fluid" width="80%" style="margin: 15px" src="/fig/deployment_modes.svg" alt="Figure for Deployment Modes" >}}
{{< img class="img-fluid" width="70%" style="margin: 15px" src="/fig/deployment_modes.png" alt="Figure for Deployment Modes" >}}

#### Application Mode

Expand All @@ -196,8 +194,8 @@ network bandwidth to download dependencies and ship binaries to the cluster, and
Building on this observation, the *Application Mode* creates a cluster per submitted application, but this time,
the `main()` method of the application is executed on the JobManager. Creating a cluster per application can be
seen as creating a session cluster shared only among the jobs of a particular application, and torn down when
the application finishes. With this architecture, the *Application Mode* provides the same resource isolation
and load balancing guarantees as the *Per-Job* mode, but at the granularity of a whole application. Executing
the application finishes. With this architecture, the *Application Mode* provides resource isolation
and load-balancing guarantees at the granularity of a whole application. Executing
the `main()` on the JobManager saves the CPU cycles required, and also saves the bandwidth required
for downloading the dependencies locally. Furthermore, it allows for a more even spread of the network load for
downloading the dependencies of the applications in the cluster, as there is one JobManager per application.
@@ -208,7 +206,7 @@ as in the other modes. This may have implications for your code as, for example,
your environment using the `registerCachedFile()` must be accessible by the JobManager of your application.
{{< /hint >}}

Compared to the *Per-Job* mode, the *Application Mode* allows the submission of applications consisting of
The *Application Mode* allows the submission of applications consisting of
multiple jobs. The order of job execution is not affected by the deployment mode but by the call used
to launch the job. Using `execute()`, which is blocking, establishes an order and it will lead to the
execution of the "next" job being postponed until "this" job finishes. Using `executeAsync()`, which is
@@ -224,16 +222,6 @@ Additionally, when any of multiple running jobs in Application Mode (submitted f
Regular job completions (by the sources shutting down) are supported.
{{< /hint >}}

#### Per-Job Mode

Aiming at providing better resource isolation guarantees, the *Per-Job* mode uses the available resource provider
framework (e.g. YARN, Kubernetes) to spin up a cluster for each submitted job. This cluster is available to
that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc) are
cleared up. This provides better resource isolation, as a misbehaving job can only bring down its own
TaskManagers. In addition, it spreads the load of book-keeping across multiple JobManagers, as there is
one per job. For these reasons, the *Per-Job* resource allocation model is the preferred mode in many
production setups.

#### Session Mode

*Session mode* assumes an already running cluster and uses the resources of that cluster to execute any
Expand All @@ -250,9 +238,7 @@ is responsible for the book-keeping of all the jobs in the cluster.
#### Summary

In *Session Mode*, the cluster lifecycle is independent of that of any job running on the cluster
and the resources are shared across all jobs. The *Per-Job* mode pays the price of spinning up a cluster
for every submitted job, but this comes with better isolation guarantees as the resources are not shared
across jobs. In this case, the lifecycle of the cluster is bound to that of the job. Finally, the
and the resources are shared across all jobs. The
*Application Mode* creates a session cluster per application and executes the application's `main()`
method on the cluster.

@@ -179,10 +179,6 @@ $ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-

You can override configurations set in [Flink configuration file]({{< ref "docs/deployment/config#flink-配置文件" >}}) by passing key-value pairs `-Dkey=value` to `bin/flink`.

### Per-Job Cluster Mode

Flink on Kubernetes does not support Per-Job Cluster Mode.

### Session Mode

You have seen the deployment of a Session cluster in the [Getting Started](#getting-started) guide at the top of this page.
@@ -136,12 +136,6 @@ $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
$ kubectl delete -f jobmanager-application-non-ha.yaml
```

<a name="per-job-cluster-mode"></a>

### Per-Job Cluster Mode

Per-Job Cluster Mode is not supported when deploying a Standalone cluster on Kubernetes.

<a name="session-mode"></a>

### Session Cluster Mode
28 changes: 3 additions & 25 deletions docs/content.zh/docs/deployment/resource-providers/yarn.md
@@ -83,7 +83,7 @@ Congratulations! You have successfully run a Flink application by deploying Flink

## Deployment Modes Supported by Flink on YARN

For production use, we recommend deploying Flink Applications in the [Per-job or Application Mode]({{< ref "docs/deployment/overview" >}}#deployment-modes), as these modes provide a better isolation for the Applications.
For production use, we recommend deploying Flink Applications in the [Application Mode]({{< ref "docs/deployment/overview" >}}#deployment-modes), as this mode provides better isolation for the Applications.

### Application Mode

@@ -120,28 +120,6 @@ The above will allow the job submission to be extra lightweight as the needed Flink jars
are going to be picked up by the specified remote locations rather than be shipped to the cluster by the
client.

### Per-Job Cluster Mode

The Per-job Cluster mode will launch a Flink cluster on YARN, then run the provided application jar locally and finally submit the JobGraph to the JobManager on YARN. If you pass the `--detached` argument, the client will stop once the submission is accepted.

The YARN cluster will stop once the job has stopped.

```bash
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
```

Once a Per-Job Cluster is deployed, you can interact with it for operations like cancelling or taking a savepoint.

```bash
# List running job on the cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
```

Note that cancelling your job on a Per-Job Cluster will stop the cluster.


### Session Mode

We describe deployment with the Session Mode in the [Getting Started](#getting-started) guide at the top of the page.
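As a minimal sketch of that workflow (assuming a configured Hadoop/YARN environment; `yarn-session.sh` and its `--detached` flag come from the Session Mode instructions on this page):

```bash
# Start a long-running session cluster on YARN, detached from the client.
./bin/yarn-session.sh --detached

# Submit a job to the already running session cluster.
./bin/flink run --target yarn-session ./examples/streaming/TopSpeedWindowing.jar
```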
@@ -241,9 +219,9 @@ The configuration parameter for specifying the REST endpoint port is [rest.bind-

When deploying Flink with Session Mode on Yarn, only the JAR file specified in the startup command will be recognized as a user-jar and included in the user classpath.

**PerJob Mode & Application Mode**
**Application Mode**

When deploying Flink with PerJob/Application Mode on Yarn, the JAR file specified in startup command and all JAR files in Flink's `usrlib` folder will be recognized as user-jars.
When deploying Flink with Application Mode on Yarn, the JAR file specified in the startup command and all JAR files in Flink's `usrlib` folder will be recognized as user-jars.
By default, Flink will include the user-jars in the system classpath. This behavior can be controlled with the [yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar) parameter.

When setting this to `DISABLED` Flink will include the jar in the user classpath instead.
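For illustration (a hypothetical Application Mode invocation on YARN, reusing the example jar from earlier in this page):

```bash
# Load the user jar through FlinkUserCodeClassLoader (user classpath) instead
# of the system classpath by disabling user-jar inclusion.
./bin/flink run -t yarn-application \
    -Dyarn.classpath.include-user-jar=DISABLED \
    ./examples/streaming/TopSpeedWindowing.jar
```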
10 changes: 1 addition & 9 deletions docs/content.zh/docs/ops/debugging/debugging_classloading.md
@@ -58,19 +58,11 @@ created for a job/application and will contain the job/application's jar files.
-->

**Per-Job Mode (deprecated) (Yarn)**

Currently, only Yarn supports Per-Job mode. By default, a Flink cluster running in Per-Job mode includes the user's jar files in the system classpath.
This behavior can be controlled with the [yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar) parameter.
When this parameter is set to `DISABLED`, Flink places the user jar files in the user classpath and loads them dynamically with the *FlinkUserCodeClassLoader*.

See [Flink on Yarn]({{< ref "docs/deployment/resource-providers/yarn" >}}) for details.

**Application Mode (Standalone/Yarn/Kubernetes)**

When a Flink cluster in Application mode runs on Standalone or Kubernetes, the user jar files (the JAR specified in the startup command and the JARs in Flink's `usrlib` directory) are loaded dynamically by the *FlinkUserCodeClassLoader*.

When a Flink cluster runs in Application mode, the user jar files (the JAR specified in the startup command and the JARs in Flink's `usrlib` directory) are included in the system classpath (*AppClassLoader*) by default. As in Per-Job mode, when [yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar) is set to `DISABLED`, Flink places the user jar files in the user classpath and loads them dynamically with the *FlinkUserCodeClassLoader*.
When a Flink cluster runs in Application mode, the user jar files (the JAR specified in the startup command and the JARs in Flink's `usrlib` directory) are included in the system classpath (*AppClassLoader*) by default. When [yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar) is set to `DISABLED`, Flink places the user jar files in the user classpath and loads them dynamically with the *FlinkUserCodeClassLoader*.


## Inverted Class Loading and ClassLoader Resolution Order
31 changes: 0 additions & 31 deletions docs/content/docs/concepts/flink-architecture.md
@@ -217,35 +217,4 @@ isolation guarantees.
Formerly, a Flink Session Cluster was also known as a Flink Cluster in `session mode`.
{{< /hint >}}

### Flink Job Cluster (deprecated)

{{< hint danger >}}
Per-job mode is only supported by YARN and has been deprecated in Flink 1.15.
It will be dropped in [FLINK-26000](https://issues.apache.org/jira/browse/FLINK-26000).
Please consider application mode to launch a dedicated cluster per-job on YARN.
{{< /hint >}}

* **Cluster Lifecycle**: in a Flink Job Cluster, the available cluster manager
(like YARN) is used to spin up a cluster for each submitted job
and this cluster is available to that job only. Here, the client first
requests resources from the cluster manager to start the JobManager and
submits the job to the Dispatcher running inside this process. TaskManagers
are then lazily allocated based on the resource requirements of the job. Once
the job is finished, the Flink Job Cluster is torn down.

* **Resource Isolation**: a fatal error in the JobManager only affects the one job running in that Flink Job Cluster.

* **Other considerations**: because the ResourceManager has to apply and wait
for external resource management components to start the TaskManager
processes and allocate resources, Flink Job Clusters are more suited to large
jobs that are long-running, have high-stability requirements and are not
sensitive to longer startup times.

{{< hint info >}}
Formerly, a Flink Job Cluster was also known as a Flink Cluster in `job (or per-job) mode`.
{{< /hint >}}
{{< hint info >}}
Flink Job Clusters are only supported with YARN.
{{< /hint >}}

{{< top >}}