Запуск Flink в YARN

Данная статья описывает использование сервиса Flink в режиме YARN.

Обзор

Изначально сервис Flink поддерживает два режима развертывания:

  • Режим standalone. Данный режим предполагает, что компоненты TaskManager и JobManager установлены на хостах ADH-кластера. В этом режиме запуск и остановка кластера Flink выполняется с помощью ADCM, а кластер постоянно активен, ожидая новых Flink-задач. Количество ресурсов, выделяемых компонентам JobManager/TaskManager, фиксировано и указывается с помощью настроек Flink-сервиса.

  • Режим YARN. В этом режиме сервис Flink не требует установки компонентов JobManager/TaskManager на хостах ADH-кластера. Для выполнения Flink-задач эти компоненты создаются динамически в контейнерах YARN, а затем утилизируются (жизненный цикл контейнеров зависит от используемого режима YARN). При запуске Flink-приложения в режиме YARN JAR-файл передается компоненту YARN ResourceManager, который создает YARN-контейнеры на хостах ADH под управлением YARN NodeManager. Непосредственно на этапе выполнения кода Flink динамически аллоцирует ресурсы для TaskManager. Подробности использования Flink в YARN с примерами приведены далее в статье.

Ключевым отличием режима standalone от YARN является то, где именно код Flink-приложения компилируется в JobGraph. В standalone-режиме компиляция выполняется компонентом Flink client, а затем JobGraph отправляется в JobManager. Как следствие, компоненты Flink client могут потреблять значительное количество ресурсов CPU/RAM и дополнительно создавать нагрузку на сеть, перемещая объекты JobGraph. В режиме YARN метод main() приложения Flink выполняется в выделенном YARN-контейнере с JobManager, что снижает нагрузку на компоненты Flink client.

Подготовка пользователя к взаимодействию с YARN

Перед использованием Flink в YARN выполните следующие шаги для подготовки локального пользователя к взаимодействию с YARN-кластером.

  1. Создайте директорию в HDFS и установите права владельца. YARN использует эту директорию для хранения временных данных.

    $ hadoop fs -mkdir /user/<username>
    $ hadoop fs -chown -R <username>:hadoop /user/<username>

    где <username> — локальный пользователь, от которого запускаются команды flink run …​.

  2. Если кластер керберизирован и имя локального пользователя отличается от принципала, укажите сопоставление пользователь-принципал, используя правила auth_to_local. Например:

    RULE:[1:$1@$0](k_alpashkin_krb2@AD.RANGER-TEST)s/.*/admin/
  3. В сервисе Core configuration выполните действие Update Core configuration.

  4. Перезапустите сервисы HDFS, YARN и Flink.

  5. Если плагины Ranger включены для HDFS и YARN, предоставьте соответствующие права пользователю.

Кластер Flink может работать в YARN в двух режимах:

Ниже описаны особенности каждого из режимов с примерами.

Режим приложения

При запуске Flink в YARN в режиме приложения (application mode) создается временный кластер Flink для выполнения одного Flink-приложения. После завершения работы приложения кластер Flink автоматически уничтожается. Данный режим обеспечивает наилучшую изоляцию при выполнении нескольких Flink-задач, поскольку задачи выполняются в выделенных кластерах Flink, не мешая друг другу.

Далее отражены основные события, которые происходят при запуске Flink в YARN в режиме приложения:

  1. JAR-файл с приложением Flink передается клиенту Flink.

  2. Клиент Flink запрашивает у YARN выделенный контейнер (Application Master) для развертывания в нем компонента JobManager. Код приложения со всеми зависимостями переносится в контейнер JobManager. Метод main() выполняется внутри этого контейнера.

  3. В процессе выполнения JobManager запрашивает у YARN дополнительные контейнеры для TaskManager, которые необходимы для выполнения отдельных Flink-задач. Количество создаваемых TaskManager зависит от параметров запуска приложения и настроек сервиса Flink (параллелизм, количество слотов).

  4. После завершения задачи (успешного, неудачного или отмены) все контейнеры, выделенные YARN, уничтожаются.

Ниже показан пример использования Flink в YARN в режиме приложения:

  1. Запустите приложение Flink, указав путь к JAR-файлу:

    $ flink run-application \ (1)
    -t yarn-application \ (2)
    /usr/lib/flink/examples/batch/WordCount.jar
    1 Команда run-application запускает кластер Flink в режиме приложения.
    2 Указывает YARN в качестве среды для развертывания.

    В выводе содержится информация о взаимодействии с компонентами YARN, а также Application Master ID:

    Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    2025-06-24 15:37:27,169 INFO  org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032
    2025-06-24 15:37:27,411 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200
    2025-06-24 15:37:27,429 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
    2025-06-24 15:37:27,668 INFO  org.apache.hadoop.conf.Configuration                         [] - resource-types.xml not found
    2025-06-24 15:37:27,669 INFO  org.apache.hadoop.yarn.util.resource.ResourceUtils           [] - Unable to find 'resource-types.xml'.
    2025-06-24 15:37:27,758 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=2048, slotsPerTaskManager=1}
    2025-06-24 15:37:28,517 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
    2025-06-24 15:37:29,279 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
    2025-06-24 15:37:29,304 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1750778541795_0003
    2025-06-24 15:37:29,363 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1750778541795_0003
    2025-06-24 15:37:29,363 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
    2025-06-24 15:37:29,367 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
    2025-06-24 15:37:34,956 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
    2025-06-24 15:37:34,957 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface adh-ka-2.ru-central1.internal:8081 of application 'application_1750778541795_0003'.
  2. После запуска приложения вы можете использовать такие команды, как отмена, получение списка задач или создание точки сохранения (savepoint). Например:

    $ flink list \ (1)
      -t yarn-application \
      -Dyarn.application.id=application_1750778541795_0003
    
    $ flink cancel \ (2)
      -t yarn-application \
      -Dyarn.application.id=application_1750778541795_0003 <jobId>
    1 Выводит список задач Flink, принадлежащих YARN-приложению application_1750778541795_0003.
    2 Отменяет выполнение Flink-задачи <jobId>. Отмена задачи останавливает кластер Flink.
  3. Проверьте статус приложения в веб-интерфейсе YARN ResourceManger. Актуальная ссылка на веб-интерфейс доступна в ADCM (Clusters → <ADH_cluster> → Services → YARN → Info).

    Веб-интерфейс YARN
    Web-интерфейс YARN
    Веб-интерфейс YARN
    Web-интерфейс YARN
РЕКОМЕНДАЦИЯ

Для более "легковесного" запуска задач используйте параметр yarn.provided.lib.dirs, предварительно загрузив JAR-файлы Flink в директорию HDFS, доступную на всех хостах ADH. В этом случае клиент Flink не переносит массивные JAR-файлы в JobManager, так как они могут быть считаны напрямую из HDFS. Например:

$ flink run-application \
    -t yarn-application \
    -Dyarn.provided.lib.dirs="hdfs://user/admin/flink_demo/test_flink.jar" \
    hdfs://user/admin/flink_demo/test_flink.jar

Дополнительная информация об использовании Flink с YARN в режиме приложения доступна в документации Flink.

Режим сессии

В этом режиме один кластер Flink (обычно долго работающий) разворачивается в контейнерах YARN для обслуживания множества Flink-задач. Когда одна задача Flink завершается, сессия Flink-кластера остается активной.

Ниже представлена последовательность событий, которые происходят при запуске Flink в YARN в режиме сессии:

  1. Клиент Flink запускает сессию YARN c Flink-кластером. Для этого создается YARN-контейнер с JobManager (Application Master). Контейнеры для TaskManager инициализируются динамически с учетом параметров запуска и конфигурации сервиса Flink.

  2. ID запущенного приложения YARN сохраняется (например, c помощью веб-интерфейса YARN, логов или команды yarn application -list).

  3. С помощью клиента Flink одна или несколько задач отправляется на выполнение в Flink-кластер.

  4. При необходимости сессия Flink-кластера завершается вручную.

Ниже показан пример использования Flink в YARN в режиме сессии:

  1. Запустите сессию Flink-кластера в YARN:

    $ flink-yarn-session \
        -d \ (1)
        -nm my_flink_job \ (2)
        -s 2 \ (3)
        -tm 2048 (4)
        -yj /usr/lib/flink/lib/flink-dist-1.20.1.jar (5)
    1 Запуск сессии в фоновом (detached) режиме.
    2 Указание пользовательского имени для YARN-приложения.
    3 Установка количества слотов для TaskManager.
    4 Установка максимального объема памяти в мегабайтах, выделяемой для контейнеров с TaskManager.
    5 Переопределение параметра конфигурации yarn.flink-dist-jar. Без указания данного параметра в логах может появиться ошибка: No path for the flink jar passed…​.
  2. Получите ID приложения YARN. Для этого используйте веб-интерфейс YARN или команду:

    $ yarn application -list

    Вывод содержит ID YARN-приложения:

    WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
    2025-06-26 14:24:07,737 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032
    2025-06-26 14:24:07,974 INFO client.AHSProxy: Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200
    Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
                    Application-Id      Application-Name        Application-Type          User           Queue                   State             Final-State             Progress                        Tracking-URL
    application_1750778541795_0007          my_flink_job            Apache Flink         flink         default                 RUNNING               UNDEFINED                 100%            http://10.92.43.237:8081
  3. Запустите приложение Flink, подключившись к сессии YARN:

    $ flink run
        -t yarn-session
        -Dyarn.application.id=application_1750778541795_0007
        /usr/lib/flink/examples/batch/WordCount.jar
    Пример вывода

     

    Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    2025-06-26 14:27:37,600 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /var/tmp/.yarn-properties-flink.
    2025-06-26 14:27:37,600 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /var/tmp/.yarn-properties-flink.
    Executing WordCount example with default input data set.
    Use --input to specify file input.
    Printing result to stdout. Use --output to specify output path.
    2025-06-26 14:27:38,422 INFO  org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032
    2025-06-26 14:27:38,659 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200
    2025-06-26 14:27:38,673 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
    2025-06-26 14:27:38,815 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface adh-ka-3.ru-central1.internal:8081 of application 'application_1750778541795_0007'.
    Job has been submitted with JobID 5fb3965a1ed2ee3ed5790a3fe4df84c1
    Program execution finished
    Job with JobID 5fb3965a1ed2ee3ed5790a3fe4df84c1 has finished.
    Job Runtime: 7027 ms
    Accumulator Results:
    - 180f560f3af3522f5cb4c5ba2d0f4297 (java.util.ArrayList) [170 elements]
    
    (a,5)
    (action,1)
    ...
  4. Проверьте детали выполнения задачи в веб-интерфейсе YARN ResourceManger. Актуальная ссылка на веб-интерфейс доступна в ADCM (Clusters → <ADH_cluster> → Services → YARN → Info).

    Веб-интерфейс YARN
    Web-интерфейс YARN
    Веб-интерфейс YARN
    Web-интерфейс YARN
  5. Когда сессия Flink-кластера больше не нужна, завершите приложение YARN с помощью команды:

    $ yarn application -kill application_1750778541795_0007

    Вывод:

    2025-06-26 14:33:19,976 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032
    2025-06-26 14:33:20,226 INFO client.AHSProxy: Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200
    Killing application application_1750778541795_0007
    2025-06-26 14:33:20,609 INFO impl.YarnClientImpl: Killed application application_1750778541795_0007

Дополнительная информация об использовании Flink в режиме сессии YARN доступна в документации Flink.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней