Run Flink on YARN

This article describes the use of the Flink service on top of a YARN cluster.

Overview

Generally, the Flink service supports two deployment modes:

  • Standalone mode. This mode assumes that the TaskManager and JobManager components are installed on the ADH cluster hosts. In this mode, a Flink cluster is started and stopped using ADCM and is continuously running, waiting for new jobs to be submitted. The amount of resources allocated to JobManager/TaskManager components is fixed and can be configured using the Flink service settings.

  • YARN mode. In this mode, the Flink service setup does not require JobManager/TaskManager components to be installed on ADH hosts. Instead, to execute a Flink application, these components are created dynamically in YARN containers and get disposed of once the job is complete (the containers' lifecycle actually varies based on the YARN mode used). When a Flink application runs on YARN, it is transferred to the YARN ResourceManager first, which spawns YARN containers on ADH hosts managed by YARN NodeManagers. During the job execution phase, Flink dynamically allocates and de-allocates TaskManager resources to complete the job. More details on running Flink on YARN are described below.

An important difference between the Standalone and YARN modes is where the Flink application code is compiled into a JobGraph. In the Standalone mode, the Flink client performs the compilation and sends the resulting JobGraph to the JobManager for execution. This makes Flink clients heavy CPU/RAM consumers and requires additional bandwidth for moving JobGraphs over the network. In the YARN application mode, the application’s main() method runs in a dedicated YARN container together with the JobManager, which reduces the load on the hosts that run Flink clients.

Prepare a user for the YARN mode

Before using Flink in the YARN mode, complete the steps below to prepare the local user for interaction with a YARN cluster.

  1. Create a user directory in HDFS and set its ownership. This directory is used by YARN to store temporary execution data.

    $ hadoop fs -mkdir /user/<username>
    $ hadoop fs -chown -R <username>:hadoop /user/<username>

    where <username> is the local user that runs commands like flink run.

  2. If Kerberos is enabled and the local user name differs from the principal name, specify the principal-to-user mapping using an auth_to_local rule (a way to verify the mapping is shown after this list). For example:

    RULE:[1:$1@$0](k_alpashkin_krb2@AD.RANGER-TEST)s/.*/admin/
  3. Run the Update Core configuration action of the Core configuration service.

  4. Restart HDFS, YARN, and Flink services.

  5. If Ranger plugins are enabled for HDFS and YARN, grant permissions to the user.
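
To check that an auth_to_local rule (step 2) maps a principal to the expected local user, you can evaluate the configured rules with the HadoopKerberosName utility shipped with Hadoop. For the example rule above, the output should map the principal to admin:

$ hadoop org.apache.hadoop.security.HadoopKerberosName k_alpashkin_krb2@AD.RANGER-TEST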

A Flink cluster can run on YARN in two modes:

  • Application mode. A dedicated Flink cluster is created for a single Flink application and is destroyed when the application finishes.

  • Session mode. A long-running Flink cluster is shared by multiple Flink jobs submitted to the same session.

These modes are described below with usage examples.

Application mode

Running Flink on YARN in the application mode creates a short-lived, per-application Flink cluster that serves only one Flink application. This cluster exists only for the lifetime of the submitted application and gets automatically destroyed when the application exits. This mode provides better isolation for multiple Flink jobs, since every job runs in its own dedicated Flink cluster without any interference.

The following sequence of events takes place when you submit a Flink application to YARN in the application mode:

  1. The process starts with submitting the application JAR using the Flink client.

  2. The Flink client requests YARN to create a dedicated container (Application Master) to deploy a JobManager. The application code with all the dependencies gets transferred to the JobManager’s container. The main() method runs inside this container.

  3. The JobManager requests additional containers from YARN to deploy TaskManagers required to execute the Flink tasks. The number of spawned TaskManagers depends on the application’s startup parameters and the Flink service settings (parallelism, number of slots); see the sketch after this list.

  4. When the Flink job completes (successfully or not) or gets cancelled, all the containers allocated by YARN are shut down.
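
The parallelism and slot settings mentioned above can be overridden per application by passing -D options to flink run-application. Below is a minimal sketch with illustrative values, using the WordCount example JAR shipped with the Flink service; a job running at parallelism 4 with 2 slots per TaskManager makes YARN spawn two TaskManager containers:

$ flink run-application \
    -t yarn-application \
    -Dtaskmanager.numberOfTaskSlots=2 \
    -Dparallelism.default=4 \
    -Dtaskmanager.memory.process.size=2048m \
    /usr/lib/flink/examples/batch/WordCount.jar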

Below is an example of running Flink on YARN in the application mode:

  1. Submit the application JAR to the Flink client:

    $ flink run-application \ (1)
    -t yarn-application \ (2)
    /usr/lib/flink/examples/batch/WordCount.jar
    1 The run-application command instructs Flink to run in the application mode.
    2 Specifies YARN as the deployment target.

    The output indicates the interaction with YARN components and contains the ID of the YARN Application Master:

    Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    2025-06-24 15:37:27,169 INFO  org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032
    2025-06-24 15:37:27,411 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200
    2025-06-24 15:37:27,429 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
    2025-06-24 15:37:27,668 INFO  org.apache.hadoop.conf.Configuration                         [] - resource-types.xml not found
    2025-06-24 15:37:27,669 INFO  org.apache.hadoop.yarn.util.resource.ResourceUtils           [] - Unable to find 'resource-types.xml'.
    2025-06-24 15:37:27,758 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=2048, slotsPerTaskManager=1}
    2025-06-24 15:37:28,517 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
    2025-06-24 15:37:29,279 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
    2025-06-24 15:37:29,304 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1750778541795_0003
    2025-06-24 15:37:29,363 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1750778541795_0003
    2025-06-24 15:37:29,363 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
    2025-06-24 15:37:29,367 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
    2025-06-24 15:37:34,956 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
    2025-06-24 15:37:34,957 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface adh-ka-2.ru-central1.internal:8081 of application 'application_1750778541795_0003'.
  2. Once the application is submitted, you can run management commands against it, such as listing jobs, cancelling a job, or creating a savepoint. For example:

    $ flink list \ (1)
      -t yarn-application \
      -Dyarn.application.id=application_1750778541795_0003
    
    $ flink cancel \ (2)
      -t yarn-application \
      -Dyarn.application.id=application_1750778541795_0003 <jobId>
    1 Lists Flink jobs belonging to the application_1750778541795_0003 YARN application.
    2 Cancels the Flink job identified by <jobId>. Cancelling a job stops the Flink cluster.
  3. Verify that the application has succeeded in the YARN ResourceManager web UI. The up-to-date web UI link can be found in ADCM (Clusters → <ADH_cluster> → Services → YARN → Info).

    YARN web UI
TIP

For a more lightweight job submission, consider using the yarn.provided.lib.dirs parameter and upload the Flink JARs to an HDFS location accessible from all ADH hosts. In this case, the Flink client does not have to transfer the heavy JARs to the JobManager, since they are fetched from the remote location instead. For example:

$ flink run-application \
    -t yarn-application \
    -Dyarn.provided.lib.dirs="hdfs:///user/admin/flink_demo" \
    hdfs:///user/admin/flink_demo/test_flink.jar
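
The remote HDFS location has to be populated beforehand. A minimal sketch, assuming the illustrative paths from the example above and a locally available test_flink.jar:

$ hdfs dfs -mkdir -p /user/admin/flink_demo
$ hdfs dfs -put /usr/lib/flink/lib/*.jar /user/admin/flink_demo/
$ hdfs dfs -put ./test_flink.jar /user/admin/flink_demo/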

For more information on running Flink on YARN in the application mode, see the Flink documentation.

Session mode

In this mode, a single (typically long-running) Flink cluster is deployed in YARN containers to serve multiple Flink jobs. When a Flink job completes, the Flink cluster session remains alive.

Running Flink in this mode assumes the following sequence of events:

  1. The process starts with creating a Flink cluster session in YARN. This creates a YARN container (Application Master) with a JobManager. Containers for TaskManagers are initialized dynamically based on the startup parameters and the configuration of the Flink service.

  2. The ID of the launched YARN application is retrieved (for example, from the YARN web UI, logs, or by using the yarn application -list command).

  3. One or more Flink jobs are attached to the running Flink cluster session.

  4. When required, the Flink cluster session is terminated manually.

Below is an example of running a Flink application on YARN in the session mode:

  1. Launch a Flink cluster session on YARN:

    $ flink-yarn-session \
        -d \ (1)
        -nm my_flink_job \ (2)
        -s 2 \ (3)
        -tm 2048 \ (4)
        -yj /usr/lib/flink/lib/flink-dist-1.20.1.jar (5)
    1 Starts a Flink cluster session in the detached mode.
    2 Sets a custom name for the YARN application.
    3 Sets the number of slots per TaskManager.
    4 Sets the amount of memory in megabytes to allocate per TaskManager container.
    5 Overrides the yarn.flink-dist-jar configuration property. Without this option, a warning may appear in the logs: No path for the flink jar passed….
  2. Get the YARN application ID. For this, use the YARN web UI or run the command:

    $ yarn application -list

    The output indicates the running Flink cluster:

    WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
    2025-06-26 14:24:07,737 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032
    2025-06-26 14:24:07,974 INFO client.AHSProxy: Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200
    Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
                    Application-Id      Application-Name        Application-Type          User           Queue                   State             Final-State             Progress                        Tracking-URL
    application_1750778541795_0007          my_flink_job            Apache Flink         flink         default                 RUNNING               UNDEFINED                 100%            http://10.92.43.237:8081
  3. Attach a Flink job to the YARN session:

    $ flink run \
        -t yarn-session \
        -Dyarn.application.id=application_1750778541795_0007 \
        /usr/lib/flink/examples/batch/WordCount.jar
    Sample output

    Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    2025-06-26 14:27:37,600 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /var/tmp/.yarn-properties-flink.
    2025-06-26 14:27:37,600 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /var/tmp/.yarn-properties-flink.
    Executing WordCount example with default input data set.
    Use --input to specify file input.
    Printing result to stdout. Use --output to specify output path.
    2025-06-26 14:27:38,422 INFO  org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032
    2025-06-26 14:27:38,659 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200
    2025-06-26 14:27:38,673 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
    2025-06-26 14:27:38,815 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface adh-ka-3.ru-central1.internal:8081 of application 'application_1750778541795_0007'.
    Job has been submitted with JobID 5fb3965a1ed2ee3ed5790a3fe4df84c1
    Program execution finished
    Job with JobID 5fb3965a1ed2ee3ed5790a3fe4df84c1 has finished.
    Job Runtime: 7027 ms
    Accumulator Results:
    - 180f560f3af3522f5cb4c5ba2d0f4297 (java.util.ArrayList) [170 elements]
    
    (a,5)
    (action,1)
    ...
  4. View the application execution details in the YARN ResourceManager web UI. The up-to-date web UI link can be found in ADCM (Clusters → <ADH_cluster> → Services → YARN → Info).

    YARN web UI
  5. When the Flink cluster session is no longer needed, kill the YARN application using the command:

    $ yarn application -kill application_1750778541795_0007

    The output:

    2025-06-26 14:33:19,976 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032
    2025-06-26 14:33:20,226 INFO client.AHSProxy: Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200
    Killing application application_1750778541795_0007
    2025-06-26 14:33:20,609 INFO impl.YarnClientImpl: Killed application application_1750778541795_0007
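
Alternatively, a detached session can be stopped gracefully by attaching the session client to the running YARN application and sending the stop command. A sketch, assuming that flink-yarn-session forwards the -id option to the underlying yarn-session.sh script:

$ echo "stop" | flink-yarn-session -id application_1750778541795_0007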

For more information on running Flink on YARN in the session mode, see the Flink documentation.
