Планирование задач Spark

При использовании Spark в режиме кластера существует две области, где применимо планирование и распределение ресурсов:

  • Планирование на уровне кластера. Предполагает планирование отдельных Spark-приложений, которые выполняются на разных узлах кластера.

  • Планирование на уровне приложения. Предполагает планирование параллельных задач Spark, например collect, save и других действий, выполняющихся внутри одного приложения Spark.

Планирование на уровне кластера

Для запуска Spark в режиме кластера используется кластерный менеджер YARN, задачей которого является планирование Spark-задач и выделение для них ресурсов на каждом узле кластера.

Статическое выделение ресурсов

Один из способов управления распределением ресурсов в YARN — это указание фиксированного объема ресурсов при запуске скрипта spark-submit (статическое распределение ресурсов). Ниже показан пример запуска Spark в YARN, где с помощью специальных параметров указан объем выделяемых ресурсов.

$ ./bin/spark-submit --class <app-class-name> \
    --master yarn \ (1)
    --deploy-mode cluster \ (2)
    --driver-memory 4g \ (3)
    --num-executors (4)
    --executor-memory 2g \ (5)
    --executor-cores 1 \ (6)
    ...
1 Тип менеджера кластера.
2 Режим деплоя, при котором код программы драйвера выполняется на worker-узлах.
3 Максимальное количество памяти, выделяемое для программы-драйвера.
4 Количество Spark executor, выделяемых на кластере.
5 Максимальное количество памяти, выделяемое на один executor.
6 Количество ядер ЦПУ, выделяемое на один executor.

Динамическое выделение ресурсов

Альтернативным подходом к распределению ресурсов кластера является встроенная в Spark функция динамического распределения ресурсов. Данная опция позволяет Spark динамически контролировать количество executor в зависимости от нагрузки. Для активации этой функции в ADH-сервисах Spark/Spark3, используйте действие Switch dynamic resource allocation в интерфейсе ADCM.

Использование внешнего shuffle-сервиса

После включения динамического распределения ресурсов следующим шагом может стать переход на внешний shuffle-сервис для улучшения надежности и ресурсопотребления. Основной задачей Spark shuffle-сервиса (Shuffle Service, SS) является обмен блоками данных между несколькими Spark executor или между executor и Spark-драйвером на этапе перетасовки (shuffle). По умолчанию Spark использует реализацию SS для YARN, которая запускается на каждом NodeManager YARN-кластера. Данная реализация SS доступна в /usr/lib/spark/yarn/spark-{version}-yarn-shuffle.jar. Однако эта реализация SS имеет один недостаток — в случае сбоя Spark executor удаляются все shuffle-данные, записанные на диск.

Для обеспечения максимальной надежности и скорости можно использовать внешний SS. Ниже описаны шаги по его подключению к кластеру Spark:

  1. Добавьте JAR с вашей реализацией SS в classpath, например /usr/lib/spark/*. В качестве примера для создания своей реализации SS можно использовать /usr/lib/spark/yarn/spark-{version}-yarn-shuffle.jar.

  2. В ADCM на странице Clusters → <YOUR_CLUSTER_NAME> → Services → YARN → Primary configuration укажите имя вашего класса SS в поле yarn.nodemanager.aux-services.spark_shuffle.class.

  3. Для каждого NodeManager увеличьте размер heap с помощью параметра YARN_HEAPSIZE (по умолчанию 1000) в файле etc/hadoop/yarn-env.sh. Это необходимо для избежания проблем с GC на этапе перетасовки.

  4. Выполните рестарт YARN.

Вы также можете указать конфигурацию SS в файле spark-shuffle-site.xml и поместить этот файл в classpath SS. Подробная информация о конфигурационных параметрах SS доступна в документации Spark.

Планирование на уровне приложения

Внутри одного приложения Spark (здесь и далее под "приложением" стоит понимать один экземпляр SparkContext) планирование и распределение ресурсов также применимо, поскольку несколько Spark-задач могут выполняться параллельно, если они запущены из разных потоков. В данной статье "задача" означает действие Spark, например reduce, save, collect и прочие, а также любые другие задачи, необходимые для выполнения этого действия.

Режимы FIFO и справедливого распределения

По умолчанию задачи Spark выполняются в порядке FIFO. Каждая задача разбивается на этапы (например, фазы map/reduce), и задача, которая запустилась первой, получает приоритет на все доступные ресурсы, пока не завершены все ее этапы. Затем приоритет получает вторая задача и так далее. Если стоящей в начале очереди задаче не требуются все доступные ресурсы, то последующие задачи могут стартовать параллельно. Однако, если задачи в начале очереди являются трудоемкими, то выполнение последующих задач может существенно задержаться.

Помимо FIFO, Spark может выполнять задачи в режиме справедливого распределения (fair sharing mode). В этом режиме Spark распределяет задачи по принципу round-robin, так что все задачи получают примерно равные доли ресурсов кластера. Таким образом маленькие задачи, запущенные во время выполнения большой задачи, могут сразу же начать получать ресурсы, не дожидаясь завершения большой задачи, и при этом демонстрировать хорошее время отклика. По умолчанию режим справедливого распределения отключен, его можно включить, установив для экземпляра SparkContext значение spark.scheduler.mode=FAIR, как показано ниже:

val conf = new SparkConf()
          .setMaster(...)
          .setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

Пул справедливого планировщика

Spark позволяет группировать задачи в пулы (pool) и устанавливать различные параметры планирования (например, вес) для каждого пула. Это может быть полезно для создания высокоприоритетных пулов для важных задач.

По умолчанию все задачи попадают в дефолтный пул. Чтобы направить задачу в определенный пул, необходимо установить локальное свойство spark.scheduler.pool для вашего экземпляра SparkContext. Это необходимо сделать в том же потоке, из которого запускаются задачи, например:

sc.setLocalProperty("spark.scheduler.pool", "mPool")

По умолчанию каждый пул получает равную долю ресурсов кластера, а внутри каждого пула задачи выполняются в порядке FIFO. Например, если создать один пул для каждого пользователя, то каждый пользователь получит равную долю ресурсов кластера, а запросы от каждого пользователя будут выполняться один за другим.

Поведение пулов планировщика можно настроить с помощью следующих параметров:

  • schedulingMode. Определяет, порядок выполнения задач в пуле. Допустимые значения:

    • FIFO — задачи выполняются по очереди, является значением по умолчанию.

    • FAIR — ресурсы пула распределяются справедливо между задачами.

  • weight. Устанавливает приоритет пула по отношению к другим пулам. Например, если установить вес пула 2, то такой пул получит в 2 раза больше ресурсов, чем другие активные пулы. Используя большие значения веса (например, 1000), можно реализовать приоритет для нескольких пулов. По умолчанию вес равен 1.

  • minShare. Кроме веса, для каждого пула может быть задана минимальная доля (в виде количества ядер ЦПУ). Планировщик Spark всегда пытается удовлетворить минимальные доли всех активных пулов, прежде чем перераспределять дополнительные ресурсы в соответствии с весами. Таким образом, свойство minShare является еще одним способом обеспечить быстрое получение пулом определенного количества ресурсов (например, 10 ядер) без предоставления ему высокого приоритета. По умолчанию для каждого пула свойство minShare равно 0.

Приведенные выше свойства необходимо указать в файле fairscheduler.xml, аналогичном /etc/spark3/conf.dist/fairscheduler.xml.template, как показано ниже.

<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

Затем следует либо поместить файл fairscheduler.xml в classpath, либо установить свойство SparkConf spark.scheduler.allocation.file, которое указывает на файл fairscheduler.xml. Например:

conf.set("spark.scheduler.allocation.file", "file:///path/to/local-file")
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/hdfs-file")

Более детальная информация о свойствах пула для справедливого планировщика Spark доступна в документации Spark.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней