Запуск Flink в YARN
Данная статья описывает использование сервиса Flink в режиме YARN.
Обзор
Изначально сервис Flink поддерживает два режима развертывания:
-
Режим standalone. Данный режим предполагает, что компоненты TaskManager и JobManager установлены на хостах ADH-кластера. В этом режиме запуск и остановка кластера Flink выполняется с помощью ADCM, а кластер постоянно активен, ожидая новых Flink-задач. Количество ресурсов, выделяемых компонентам JobManager/TaskManager, фиксировано и указывается с помощью настроек Flink-сервиса.
-
Режим YARN. В этом режиме сервис Flink не требует установки компонентов JobManager/TaskManager на хостах ADH-кластера. Для выполнения Flink-задач эти компоненты создаются динамически в контейнерах YARN, а затем утилизируются (жизненный цикл контейнеров зависит от используемого режима YARN). При запуске Flink-приложения в режиме YARN JAR-файл передается компоненту YARN ResourceManager, который создает YARN-контейнеры на хостах ADH под управлением YARN NodeManager. Непосредственно на этапе выполнения кода Flink динамически аллоцирует ресурсы для TaskManager. Подробности использования Flink в YARN с примерами приведены далее в статье.
Ключевым отличием режима standalone от YARN является то, где именно код Flink-приложения компилируется в JobGraph.
В standalone-режиме компиляция выполняется компонентом Flink client, а затем JobGraph отправляется в JobManager.
Как следствие, компоненты Flink client могут потреблять значительное количество ресурсов CPU/RAM и дополнительно создавать нагрузку на сеть, перемещая объекты JobGraph.
В режиме YARN метод main() приложения Flink выполняется в выделенном YARN-контейнере с JobManager, что снижает нагрузку на компоненты Flink client.
Подготовка пользователя к взаимодействию с YARN
Перед использованием Flink в YARN выполните следующие шаги для подготовки локального пользователя к взаимодействию с YARN-кластером.
-
Создайте директорию в HDFS и установите права владельца. YARN использует эту директорию для хранения временных данных.
$ hadoop fs -mkdir /user/<username> $ hadoop fs -chown -R <username>:hadoop /user/<username>где
<username>— локальный пользователь, от которого запускаются командыflink run …. -
Если кластер керберизирован и имя локального пользователя отличается от принципала, укажите сопоставление пользователь-принципал, используя правила
auth_to_local. Например:RULE:[1:$1@$0](k_alpashkin_krb2@AD.RANGER-TEST)s/.*/admin/ -
В сервисе Core configuration выполните действие Update Core configuration.
-
Перезапустите сервисы HDFS, YARN и Flink.
-
Если плагины Ranger включены для HDFS и YARN, предоставьте соответствующие права пользователю.
Режимы работы Flink в YARN
Кластер Flink может работать в YARN в двух режимах:
Ниже описаны особенности каждого из режимов с примерами.
Режим приложения
При запуске Flink в YARN в режиме приложения (application mode) создается временный кластер Flink для выполнения одного Flink-приложения. После завершения работы приложения кластер Flink автоматически уничтожается. Данный режим обеспечивает наилучшую изоляцию при выполнении нескольких Flink-задач, поскольку задачи выполняются в выделенных кластерах Flink, не мешая друг другу.
Далее отражены основные события, которые происходят при запуске Flink в YARN в режиме приложения:
-
JAR-файл с приложением Flink передается клиенту Flink.
-
Клиент Flink запрашивает у YARN выделенный контейнер (Application Master) для развертывания в нем компонента JobManager. Код приложения со всеми зависимостями переносится в контейнер JobManager. Метод
main()выполняется внутри этого контейнера. -
В процессе выполнения JobManager запрашивает у YARN дополнительные контейнеры для TaskManager, которые необходимы для выполнения отдельных Flink-задач. Количество создаваемых TaskManager зависит от параметров запуска приложения и настроек сервиса Flink (параллелизм, количество слотов).
-
После завершения задачи (успешного, неудачного или отмены) все контейнеры, выделенные YARN, уничтожаются.
Ниже показан пример использования Flink в YARN в режиме приложения:
-
Запустите приложение Flink, указав путь к JAR-файлу:
$ flink run-application \ (1) -t yarn-application \ (2) /usr/lib/flink/examples/batch/WordCount.jar1 Команда run-applicationзапускает кластер Flink в режиме приложения.2 Указывает YARN в качестве среды для развертывания. В выводе содержится информация о взаимодействии с компонентами YARN, а также Application Master ID:
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set. SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] 2025-06-24 15:37:27,169 INFO org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032 2025-06-24 15:37:27,411 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200 2025-06-24 15:37:27,429 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2025-06-24 15:37:27,668 INFO org.apache.hadoop.conf.Configuration [] - resource-types.xml not found 2025-06-24 15:37:27,669 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to find 'resource-types.xml'. 2025-06-24 15:37:27,758 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=2048, slotsPerTaskManager=1} 2025-06-24 15:37:28,517 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 2025-06-24 15:37:29,279 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided. 2025-06-24 15:37:29,304 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1750778541795_0003 2025-06-24 15:37:29,363 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1750778541795_0003 2025-06-24 15:37:29,363 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated 2025-06-24 15:37:29,367 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED 2025-06-24 15:37:34,956 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully. 2025-06-24 15:37:34,957 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface adh-ka-2.ru-central1.internal:8081 of application 'application_1750778541795_0003'. -
После запуска приложения вы можете использовать такие команды, как отмена, получение списка задач или создание точки сохранения (savepoint). Например:
$ flink list \ (1) -t yarn-application \ -Dyarn.application.id=application_1750778541795_0003 $ flink cancel \ (2) -t yarn-application \ -Dyarn.application.id=application_1750778541795_0003 <jobId>1 Выводит список задач Flink, принадлежащих YARN-приложению application_1750778541795_0003.2 Отменяет выполнение Flink-задачи <jobId>. Отмена задачи останавливает кластер Flink. -
Проверьте статус приложения в веб-интерфейсе YARN ResourceManger. Актуальная ссылка на веб-интерфейс доступна в ADCM (Clusters → <ADH_cluster> → Services → YARN → Info).
Web-интерфейс YARN
Web-интерфейс YARN
|
РЕКОМЕНДАЦИЯ
Для более "легковесного" запуска задач используйте параметр
|
Дополнительная информация об использовании Flink с YARN в режиме приложения доступна в документации Flink.
Режим сессии
В этом режиме один кластер Flink (обычно долго работающий) разворачивается в контейнерах YARN для обслуживания множества Flink-задач. Когда одна задача Flink завершается, сессия Flink-кластера остается активной.
Ниже представлена последовательность событий, которые происходят при запуске Flink в YARN в режиме сессии:
-
Клиент Flink запускает сессию YARN c Flink-кластером. Для этого создается YARN-контейнер с JobManager (Application Master). Контейнеры для TaskManager инициализируются динамически с учетом параметров запуска и конфигурации сервиса Flink.
-
ID запущенного приложения YARN сохраняется (например, c помощью веб-интерфейса YARN, логов или команды
yarn application -list). -
С помощью клиента Flink одна или несколько задач отправляется на выполнение в Flink-кластер.
-
При необходимости сессия Flink-кластера завершается вручную.
Ниже показан пример использования Flink в YARN в режиме сессии:
-
Запустите сессию Flink-кластера в YARN:
$ flink-yarn-session \ -d \ (1) -nm my_flink_job \ (2) -s 2 \ (3) -tm 2048 (4) -yj /usr/lib/flink/lib/flink-dist-1.20.1.jar (5)1 Запуск сессии в фоновом (detached) режиме. 2 Указание пользовательского имени для YARN-приложения. 3 Установка количества слотов для TaskManager. 4 Установка максимального объема памяти в мегабайтах, выделяемой для контейнеров с TaskManager. 5 Переопределение параметра конфигурации yarn.flink-dist-jar. Без указания данного параметра в логах может появиться ошибка:No path for the flink jar passed…. -
Получите ID приложения YARN. Для этого используйте веб-интерфейс YARN или команду:
$ yarn application -listВывод содержит ID YARN-приложения:
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS. 2025-06-26 14:24:07,737 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032 2025-06-26 14:24:07,974 INFO client.AHSProxy: Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200 Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1750778541795_0007 my_flink_job Apache Flink flink default RUNNING UNDEFINED 100% http://10.92.43.237:8081 -
Запустите приложение Flink, подключившись к сессии YARN:
$ flink run -t yarn-session -Dyarn.application.id=application_1750778541795_0007 /usr/lib/flink/examples/batch/WordCount.jarПример выводаSetting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set. SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] 2025-06-26 14:27:37,600 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /var/tmp/.yarn-properties-flink. 2025-06-26 14:27:37,600 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /var/tmp/.yarn-properties-flink. Executing WordCount example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. 2025-06-26 14:27:38,422 INFO org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032 2025-06-26 14:27:38,659 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200 2025-06-26 14:27:38,673 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2025-06-26 14:27:38,815 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface adh-ka-3.ru-central1.internal:8081 of application 'application_1750778541795_0007'. Job has been submitted with JobID 5fb3965a1ed2ee3ed5790a3fe4df84c1 Program execution finished Job with JobID 5fb3965a1ed2ee3ed5790a3fe4df84c1 has finished. Job Runtime: 7027 ms Accumulator Results: - 180f560f3af3522f5cb4c5ba2d0f4297 (java.util.ArrayList) [170 elements] (a,5) (action,1) ...
-
Проверьте детали выполнения задачи в веб-интерфейсе YARN ResourceManger. Актуальная ссылка на веб-интерфейс доступна в ADCM (Clusters → <ADH_cluster> → Services → YARN → Info).
Web-интерфейс YARN
Web-интерфейс YARN -
Когда сессия Flink-кластера больше не нужна, завершите приложение YARN с помощью команды:
$ yarn application -kill application_1750778541795_0007Вывод:
2025-06-26 14:33:19,976 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at adh-ka-2.ru-central1.internal/10.92.41.75:8032 2025-06-26 14:33:20,226 INFO client.AHSProxy: Connecting to Application History server at adh-ka-3.ru-central1.internal/10.92.43.237:10200 Killing application application_1750778541795_0007 2025-06-26 14:33:20,609 INFO impl.YarnClientImpl: Killed application application_1750778541795_0007
Дополнительная информация об использовании Flink в режиме сессии YARN доступна в документации Flink.