Hadoop: YARN Federation

Архитектура

Известно, что YARN масштабируется до тысяч узлов. Масштабируемость YARN определяется Resource Manager, и она пропорциональна количеству узлов, активных приложений и контейнеров, а так же частоте heartbeat-сообщений (как узлов, так и приложений). Снижение heartbeat-сообщений может обеспечить увеличение масштабируемости, однако это отрицательно сказывается на использовании.

В данной главе описан подход на основе Federation для масштабирования одного кластера YARN до десятков тысяч узлов путем интеграции нескольких подкластеров YARN. Предлагаемый метод заключается в разделении большого кластера (10-100 тысяч узлов) на более мелкие блоки, называемые субкластерами (sub-cluster), каждый из которых имеет свой собственный YARN Resource Manager и вычислительные узлы. Система Federation объединяет эти субкластеры и делает их одним большим кластером YARN для приложений. Приложения при этом видят один массивный кластер YARN и могут планировать задачи на любом его узле, в то время как в рамках системы Federation ведутся переговоры с Resource Manager субкластеров для предоставления ресурсов приложению. Цель состоит в том, чтобы позволить отдельной задаче бесшовно “охватить” субкластеры.

Такая конструкция является структурно масштабируемой, поскольку она связывает количество узлов, за которые отвечает каждый Resource Manager, а соответствующие политики пытаются обеспечить, чтобы большинство приложений находилось в одном субкластере, таким образом, число видимых приложений для каждого Resource Manager также ограничено. Это означает, что масштабирумость может быть почти линейной, просто добавляя субкластеры (поскольку для них требуется очень небольшая координация).

Такая архитектура может обеспечить очень строгое соблюдение инвариантов планирования в каждом субкластере (просто наследуется от YARN), в то время как непрерывная перебалансировка по субкластеру обеспечивает (менее строго) то, что эти свойства также соблюдаются на глобальном уровне (например, если субкластер теряет большое количество узлов, можно переназначить очереди на другие субкластеры, чтобы обеспечить исключение несправедливого воздействия на пользователей, работающих в поврежденном субкластере).

Federation спроектирована как “слой” поверх существующей кодовой базы YARN с ограниченными изменениями в основных механизмах (Рис.37.).

../../_images/federation_architecture.png

Рис. 37. Основные компоненты, составляющие кластер Federation

YARN Sub-cluster

Субкластер (sub-cluster) – это кластер YARN размером до нескольких тысяч узлов. Точный размер субкластера определяется с учетом простоты развертывания/обслуживания, согласования с сетевыми зонами и их доступности, а также общими рекомендациями.

Resource Manager субкластера YARN работает с высоким уровнем доступности (HA) с сохранением работоспособности, то есть необходимо быть в состоянии к сбоям Resource Manager и Node Manager с минимальными нарушениями. Если весь субкластер скомпрометирован, внешние механизмы обеспечивают повторную передачу заданий в отдельный субкластер (в дальнейшем это может быть включено в Federation).

Субкластер также является единицей масштабируемости в среде Federation – ее можно расширить, добавив один или несколько субкластеров.

По своей структуре каждый субкластер является полностью функциональным Resource Manager, и его вклад в Federation может быть установлен лишь на долю его общей емкости, т.е. субкластер может иметь “частичное” обязательство перед Federation, сохраняя при этом способность выдавать часть своих возможностей локальным способом.

Router

Приложения YARN отправляются на один из маршрутизаторов (Router), который, в свою очередь, применяет политику маршрутизации (полученную из Policy Store), запрашивает в State Store URL-адрес субкластера и перенаправляет запрос на отправку приложения в соответствующий Resource Manager субкластера. Субкластер, в котором запускается задание, называется “домашним субкластером” (home sub-cluster), а “вторичными” (secondary sub-clusters) называются все остальные субкластеры, на которые распространяется задание.

Маршрутизатор предоставляет ApplicationClientProtocol внешнему миру, прозрачно скрывая присутствие нескольких Resource Manager. Для этого маршрутизатор также сохраняет соответствие между приложением и его домашним субкластером в State Store. Это позволяет маршрутизаторам быть в мягком состоянии, недорого поддерживая при этом запросы пользователей, так как любой маршрутизатор может восстановить приложение для маппинга домашнего субкластера и направить запросы к нужному Resource Manager. Целесообразно для кэширования производительности и балансировки нагрузки. Состояние Federation (включая приложения и узлы) отображается через веб-интерфейс.

AMRMProxy

AMRMProxy является ключевым компонентом, позволяющим приложению масштабироваться и работать в субкластерах. AMRMProxy работает на всех машинах Node Manager и действует как прокси-сервер для YARN Resource Manager для Application Master, реализуя ApplicationMasterProtocol. Приложениям не разрешается напрямую связываться с Resource Manager субкластера. Система принудительно подключает их только к конечной точке AMRMProxy, что обеспечивает прозрачный доступ к нескольким YARN Resource Manager (путем динамической маршрутизации / разделения / маппинга коммуникаций). В любой момент времени задание может охватывать один домашний и несколько вторичных субкластеров, но работающие в AMRMProxy политики пытаются ограничить площадь каждого задания, чтобы минимизировать накладные расходы на инфраструктуру планирования (Рис.38.).

../../_images/amrmproxy_architecture.png

Рис. 38. Архитектура цепочки перехватчиков AMRMProxy

Роль AMRMProxy:

  • Защита субкластера YARN Resource Manager от некорректно работающих Application Master. AMRMProxy может предотвратить DDOS-атаки, дросселируя/уничтожая требующих слишком много ресурсов Application Master;
  • Маскировка нескольких YARN Resource Manager в кластере и прозрачный допуск Application Master к распределению по субкластерам. Все распределения контейнеров выполняются инфраструктурой YARN Resource Manager, которая состоит из AMRMProxy, выходящего в домашний и другие субкластера Resource Manager;
  • Перехват всех запросов, поэтому может принудительно применять квоты приложений, которые не могут быть выполнены субкластером Resource Manager (поскольку каждый из них видит только часть запросов Application Master);
  • Может применять политики балансировки нагрузки / переполнения.

Global Policy Generator

Global Policy Generator (GPG) контролирует всю Federation и гарантирует, что система все время настроена должным образом. Ключевым моментом идеи является то, что доступность кластера не зависит от постоянно включенного GPG. При этом GPG работает непрерывно, но вне зоны действия всех операций кластера, и предоставляет уникальную точку обзора, которая позволяет применять глобальные инварианты, влиять на балансировку нагрузки, инициировать дренаж субкластеров, которые будут подвергаться техническому обслуживанию, и т.д. GPG точнее обновляет маппинг распределения пропускной способности пользователя субкластеру и реже меняет политики, выполняющиеся в Routers, AMRMProxy (и возможных Resource Manager).

В случае если GPG недоступен, операции кластера продолжаются с момента последней публикации политик GPG, и хотя долгосрочная недоступность может означать, что некоторые из желательных свойств баланса, оптимального использования кластера и глобальных инвариантов могут исчезнуть, вычисления и доступ к данным не будут скомпрометированы.

В текущей реализации Global Policy Generator представляет собой процесс ручной настройки, представленный через CLI (YARN-3657).

Эта часть системы Federation является частью будущей работы в YARN-5597.

Federation State-Store

Federation State определяет дополнительное состояние, которое необходимо поддерживать для свободного объединения нескольких отдельных субкластеров в один большой кластер Federation. Включает в себя:

  • Sub-cluster Membership

Члены YARN Resource Manager непрерывно передают heartbeat-сообщения в State Store для keep-alive и публикации своей текущей мощности/загрузке. Эта информация используется GPG для принятия необходимых политических решений. Также эта информация может использоваться маршрутизаторами для выбора лучшего домашнего субдкластера. Этот механизм позволяет динамически увеличивать/уменьшать “кластерный парк”, добавляя или удаляя субкластеры, а также позволяет легко обслуживать каждый из них. Это новая функциональность, которую необходимо добавить в YARN Resource Manager, при этом механизмы между собой хорошо понятны, поскольку функциональность аналогична индивидуальной высокой доступности (HA) YARN Resource Manager.

  • Application’s Home Sub-cluster

Субкластер, в котором выполняется Application Master, называется “домашним субкластером приложения” (home sub-cluster). При этом Application Master не ограничивается ресурсами только домашнего субкластера и может запрашивать ресурсы из других, называемых “вторичными” (secondary sub-clusters). Среда Federation настраивается и периодически налаживается таким образом, чтобы при размещении Application Master в субкластере он мог найти большую часть ресурсов в домашнем субкластере и только в определенных случаях запрашивал ресурсы у других субкластеров.

Federation Policy Store

Federation Policy Store – это логически отдельное хранилище (хотя оно может поддерживаться одним и тем же физическим компонентом), которое содержит информацию о том, как приложения и запросы ресурсов направляются в разные субкластеры. Текущая реализация предоставляет несколько политик – от случайных/ хэширующих/ циклических/ приоритетных до более сложных, которые учитывают нагрузку субкластера и запрашивают потребности в локальности.

Запуск приложений через субкластеры

При отправке приложения система определяет наиболее подходящий субкластер для его запуска, и он становится домашним. Все коммуникации от Application Master к Resource Manager осуществляются через AMRMProxy, работающий локально на машине Application Master. AMRMProxy предоставляет ту же конечную точку протокола ApplicationMasterService, что и YARN Resource Manager. Application Master может запрашивать контейнеры, используя информацию о местоположении, предоставляемую уровнем хранения.

В идеальном случае приложение размещается в субкластере, где доступны все ему необходимые ресурсы и данные, но если ему нужны контейнеры на узлах в других субкластерах, AMRMProxy прозрачно согласовывает с их Resource Manager и предоставляет ресурсы, что позволяет приложению рассматривать всю среду Federation как один массивный кластер YARN. AMRMProxy, Global Policy Generator и Router работают вместе для бесшовной реализации (Рис.39.).

../../_images/federation_sequence_diagram.png

Рис. 39. Диаграмма последовательности

На рисунке показана диаграмма последовательности для следующего потока выполнения задания:

  1. Router получает запрос на отправку приложения, являющийся жалобой на YARN Application Client Protocol.
  2. Маршрутизатор опрашивает таблицу/политику маршрутизации, чтобы выбрать домашний Resource Manager для задания (конфигурация политики принимается из State Store по heartbeat-сообщению).
  3. Маршрутизатор запрашивает состояние membership, чтобы определить конечную точку домашнего Resource Manager.
  4. Затем маршрутизатор перенаправляет запрос на отправку приложения в домашний Resource Manager.
  5. Маршрутизатор обновляет состояние приложения с помощью идентификатора домашнего субкластера.
  6. Как только приложение отправляется в домашний Resource Manager, запускается поток YARN, то есть приложение добавляется в очередь планировщика, и его Application Master запускается в домашнем субкластере в первом NodeManager с доступными ресурсами.
  • Во время этого процесса среда Application Master изменяется, указывая адрес AMRMProxy в качестве YARN Resource Manager для связи;
  • Токены безопасности также изменяются NodeManager при запуске Application Master, так что Application Master может общаться только с AMRMProxy. Любые дальнейшие коммуникации от Application Master до YARN Resource Manager осуществляются посредством AMRMProxy.
  1. Затем Application Master запрашивает контейнеры, используя информацию о местонахождении, предоставляемую HDFS.
  2. На основе политики AMRMProxy может олицетворять Application Master в других субкластерах, отправляя Unmanaged Application Master и перенаправляя heartbeats-сообщения Application Master соответствующим субкластерам.
  • Federation поддерживает несколько попыток приложения с помощью AMRMProxy HA. Контейнеры Application Master имеют разные идентификаторы попыток в домашнем субкластере, но один и тот же Unmanaged Application Master во вторичных;
  • Когда AMRMProxy HA включен, токен Unmanaged Application Master хранится в Yarn Registry. При вызове registerApplicationMaster от каждой попытки приложения AMRMProxy извлекает существующие токены Unmanaged Application Master из реестра (если таковые имеются) и повторно подключается к существующим Unmanaged Application Master.
  1. AMRMProxy использует как информацию о местонахождении, так и подключаемую политику, настроенную в State Store, чтобы решить, следует ли перенаправлять полученные от Application Master запросы ресурсов в домашний Resource Manager или во вторичный (один или более). На рисунке отображен случай, когда AMRMProxy решает переслать запрос на вторичный Resource Manager.
  2. Вторичный Resource Manager предоставляет AMRMProxy актуальные токены контейнера для запуска нового контейнера на узле в его субкластере. Такой механизм гарантирует, что каждый субкластер использует свои собственные токены безопасности и избегает необходимости общего секрета кластера для создания токенов.
  3. AMRMProxy пересылает ответ распределения обратно в Application Master.
  4. Application Master запускает контейнер на целевом NodeManager (в субкластере 2), используя стандартные протоколы YARN.

Конфигурация

Настройка YARN для использования Federation осуществляется через ряд свойств в файле conf/yarn-site.xml.

Общие для всех

yarn.federation.enabled – включена Federation или нет. Пример значения:

true

yarn.resourcemanager.cluster-id – уникальный идентификатор субкластера для данного Resource Manager (такой же, что используется для HA). Пример значения:

<unique-subcluster-id>

State Store:

В настоящее время поддерживается реализации State Store на основе ZooKeeper и SQL.

Обязательные настройки ZooKeeper для Hadoop:

yarn.federation.state-store.class – тип State Store. Пример значения:

org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore

hadoop.zk.address – адрес для ансамбля ZooKeeper. Пример значения:

host:port

Обязательные параметры SQL:

yarn.federation.state-store.class – тип State Store. Пример значения:

org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore

yarn.federation.state-store.sql.url – имя базы данных для SQLFederationStateStore, в которой хранится состояние. Пример значения:

jdbc:mysql://<host>:<port>/FederationStateStore

yarn.federation.state-store.sql.jdbc-class – используемый класс jdbc для SQLFederationStateStore. Пример значения:

com.mysql.jdbc.jdbc2.optional.MysqlDataSource

yarn.federation.state-store.sql.username – имя пользователя для соединения с БД для SQLFederationStateStore. Пример значения:

<dbuser>

yarn.federation.state-store.sql.password – пароль для подключения к БД для SQLFederationStateStore. Пример значения:

<dbpass>

Для MySQL и Microsoft SQL Server предоставляются скрипты.

Для MySQL необходимо загрузить последнюю версию jar 5.x из MVN Repository и добавить ее в CLASSPATH. Затем схема БД создается путем выполнения следующих скриптов SQL в базе данных:

  • sbin/FederationStateStore/MySQL/FederationStateStoreDatabase.sql
  • sbin/FederationStateStore/MySQL/FederationStateStoreUser.sql
  • sbin/FederationStateStore/MySQL/FederationStateStoreTables.sql
  • sbin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql

В том же каталоге предоставляются скрипты для удаления хранимых процедур, таблиц, пользователя и базы данных.

Important

FederationStateStoreUser.sql определяет для БД пользователя/пароль по умолчанию, для которого настоятельно рекомендуется установить собственный надежный пароль

Для SQL-сервера процесс аналогичен, но драйвер jdbc уже включен. Скрипты SQL-сервера находятся в каталоге sbin/FederationStateStore/SQLServer/.

Optional:

yarn.federation.failover.enabled – следует ли повторить попытку, учитывая отказоустойчивость Resource Manager в каждом субкластере. Пример значения:

true

yarn.federation.blacklist-subclusters – список черных списков субкластеров, используемых для отключения субкластера. Пример значения:

<subcluster-id>

yarn.federation.policy-manager – выбор диспетчера политик, определяющий как Applications и ResourceRequests маршрутизируются через систему. Пример значения:

org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager

yarn.federation.policy-manager-params – полезная нагрузка, которая настраивает политику. В примере набор весов для политик маршрутизатора и AMRMProxy. Обычно генерируется путем сериализации policymanager, который был сконфигурирован программно, или путем заполнения State Store его сериализованной формой .json. Пример значения:

<binary>

yarn.federation.subcluster-resolver.class – класс, используемый для определения, к какому субкластеру принадлежит узел, и к какому субкластеру(ам) принадлежит стойка. Пример значения:

org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl

yarn.federation.machine-list – путь к файлу со списком машин, используемых SubClusterResolver. Каждая строка файла представляет собой узел с информацией о субкластере и стойке (например: node1, subcluster1, rack1 / node2, subcluster2, rack1 / node3, subcluster3, rack2 / node4, subcluster3, rack2). Пример значения:

<path of machine-list file>

Resource Manager

Дополнительная конфигурация, которая должна отображаться в файле conf/yarn-site.xml в каждом Resource Manager.

yarn.resourcemanager.epoch – начальное значение для ряда идентификаторов контейнеров, гарантирующих уникальность container-IDs, генерируемых различными Resource Manager. Поэтому значение параметра должно быть уникальным среди субкластеров и быть достаточно разнесенным, чтобы учитывать сбои. Приращения 1000 допускают большое количество субкластеров и гарантируют практически нулевую вероятность коллизий (коллизия может произойти только в том случае, если контейнер все еще жив при 1000 перезапусках одного Resource Manager, в то время как следующий Resource Manager никогда не перезапускается, и приложение запрашивает больше контейнеров). Пример значения:

<unique-epoch>

Опционально:

yarn.federation.state-store.heartbeat-interval-secs – интервал частоты, с которой Resource Manager сообщают о своем membership в Federation центральному State Store. Пример значения:

60

Router

Дополнительные конфигурации, которые должны отображаться в файле conf/yarn-site.xml в каждом Router.

yarn.router.bind-host – IP-адрес хоста для привязки маршрутизатора. Фактический адрес, к которому привывается сервер. Если этот адрес установлен, серверы RPC и webapp привязываются к нему и к указанному в yarn.router.*.address порту. Для того, чтобы маршрутизатор слушал все интерфейсы, рекомендуется значение:

0.0.0.0

yarn.router.clientrm.interceptor-class.pipeline – разделенный запятыми список классов перехватчиков, которые должны запускаться на маршрутизаторе при взаимодействии с клиентом. Последним этапом этого конвейера должен быть Federation Client Interceptor. Пример значения:

org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor

Опционально:

yarn.router.hostname – имя хоста маршрутизатора. Пример значения:

0.0.0.0

yarn.router.clientrm.address – адрес клиента маршрутизатора. Пример значения:

0.0.0.0:8050

yarn.router.webapp.address – адрес веб-приложения на маршрутизаторе. Пример значения:

0.0.0.0:8089

yarn.router.admin.address – админ-адрес на маршрутизаторе. Пример значения:

0.0.0.0:8052

yarn.router.webapp.https.address – безопасный адрес веб-приложения на маршрутизаторе. Пример значения:

0.0.0.0:8091

yarn.router.submit.retry – количество попыток в маршрутизаторе, перед отказом. Пример значения:

3

yarn.federation.statestore.max-connections – максимальное количество параллельных подключений, которые каждый маршрутизатор устанавливает в State Store. Пример значения:

10

yarn.federation.cache-ttl.secs – маршрутизатор кеширует информацию, и это время, чтобы уйти до того, как кеш становится недействительным. Пример значения:

60

yarn.router.webapp.interceptor-class.pipeline – разделенный запятыми список классов перехватчиков, которые должны запускаться на маршрутизаторе при взаимодействии с клиентом через интерфейс REST. Последним этапом этого конвейера должен быть Federation Interceptor REST. Пример значения:

org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST

NodeManager

Дополнительные конфигурации, которые должны отображаться в файле conf/yarn-site.xml в каждом NodeManager.

yarn.nodemanager.amrmproxy.enabled – определяет, включен ли AMRMProxy. Пример значения:

true

yarn.nodemanager.amrmproxy.interceptor-class.pipeline – разделенный запятыми список перехватчиков, которые необходимо запустить в AMRMProxy. Для Federation последним этапом этого конвейера должен быть FederationInterceptor. Пример значения:

org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor

Опционально:

yarn.nodemanager.amrmproxy.ha.enable – определяет, включен ли AMRMProxy HA для поддержки нескольких попыток приложения. Пример значения:

true

yarn.federation.statestore.max-connections – максимальное количество параллельных подключений от каждого AMRMProxy к State Store. Это значение обычно ниже, чем у маршрутизатора, поскольку всегда есть много AMRMProxy, которые могут быстро прожечь многие соединения с БД. Пример значения:

1

yarn.federation.cache-ttl.secs – время для кэша AMRMProxy. Это значение обычно больше, чем у маршрутизатора, так как количество AMRMProxy велико, и целесообразно ограничить нагрузку центральным State Store. Пример значения:

300

Запуск тестового задания

Для отправки заданий в кластер Federation необходимо создать отдельный набор конфигураций для клиента, из которого будут отправляться задания. В них conf/yarn-site.xml должен иметь следующие дополнительные конфигурации:

yarn.resourcemanager.address – перенаправляет запущенные на клиенте задания на клиентский порт маршрутизатора Resource Manager. Пример значения:

<router_host>:8050

yarn.resourcemanager.scheduler.address – перенаправляет задания на порт федерации AMRMProxy. Пример значения:

localhost:8049

Любые задания YARN для кластера могут быть отправлены из описанных выше конфигураций клиента. Чтобы запустить задание через Federation, сначала необходимо запустить все участвующие в ней кластеры. Затем выполнить старт маршрутизатора на компьютере маршрутизатора с помощью команды:

$HADOOP_HOME/bin/yarn --daemon start router

Теперь, когда $HADOOP_CONF_DIR указывает на папку конфигураций клиента, необходимо запустить задание обычным способом. Конфигурации направляют задание на клиентский порт маршрутизатора Resource Manager, где Router должен прослушиваться после запуска. Пример запуска задания Pi на кластере Federation с клиента:

$HADOOP_HOME/bin/yarn jar hadoop-mapreduce-examples-3.0.0.jar pi 16 1000

Задание передается на маршрутизатор, который использует сгенерированную политику из GPG, чтобы выбрать домашний Resource Manager для задания.

Выходные данные приведенного примера задания должны быть примерно такими:

2017-07-13 16:29:25,055 INFO mapreduce.Job: Job job_1499988226739_0001 running in uber mode : false
2017-07-13 16:29:25,056 INFO mapreduce.Job:  map 0% reduce 0%
2017-07-13 16:29:33,131 INFO mapreduce.Job:  map 38% reduce 0%
2017-07-13 16:29:39,176 INFO mapreduce.Job:  map 75% reduce 0%
2017-07-13 16:29:45,217 INFO mapreduce.Job:  map 94% reduce 0%
2017-07-13 16:29:46,228 INFO mapreduce.Job:  map 100% reduce 100%
2017-07-13 16:29:46,235 INFO mapreduce.Job: Job job_1499988226739_0001 completed successfully
.
.
.
Job Finished in 30.586 seconds
Estimated value of Pi is 3.14250000......

Состояние задания также можно отслеживать в веб-интерфейсе маршрутизатора по адресу routerhost:8089.

Важно обратить внимание, что для использования Federation не потребовалось никаких изменений в коде или перекомпиляции входного jar. Кроме того, выходные данные приведенного задания такие же, как и при запуске без Federation. Чтобы получить все преимущества Federation, рекомендуется использовать большее количество mappers, чем того требует кластер. Для приведенного примера это число составляет 16.