Оптимизация производительности в Spark

В этом разделе описаны методы оптимизации производительности Spark, а также приведены соответствующие конфигурационные параметры для тонкой настройки этих функций.

ПРИМЕЧАНИЕ
В данной статье "Spark" подразумевает ADH-сервис Spark3, так как некоторые функции оптимизации не полностью поддерживаются ADH-сервисом Spark.

Параметры конфигурации Spark, описанные в статье, можно указать с помощью sparkSession.setConf("property=value") или с помощью команд Spark SQL типа SET property=value. Больше информации о параметрах, используемых для настройки производительности, доступно в документации Spark.

Кеширование данных в памяти

Spark SQL позволяет кешировать таблицы в памяти, используя специальный столбцовый формат, с помощью методов spark.catalog.cacheTable("tableName") или dataFrame.cache(). После вызова этих методов Spark будет сканировать только необходимые столбцы и автоматически сжимать данные для минимизации использования памяти и нагрузки на GC. Чтобы удалить таблицу из памяти, используйте spark.catalog.uncacheTable("tableName") или dataFrame.unpersist().

Свойство Описание Значение по умолчанию

spark.sql.inMemoryColumnarStorage.compressed

При значении true Spark автоматически подбирает кодек сжатия для каждого столбца, основываясь на статистике о данных в столбце

true

spark.sql.inMemoryColumnarStorage.batchSize

Устанавливает размер пакета при кешировании столбца. Установка большего размера пакета позволяет более эффективно использовать память и обеспечивает лучший уровень сжатия, однако увеличивает риск OOM-исключений

10000

spark.sql.files.maxPartitionBytes

Максимальное количество байт, помещаемых в один раздел (partition) при чтении файла. Данный параметр актуален при работе с файловыми данными типа Parquet, ORC и JSON

134217728 (128 MБ)

spark.sql.files.openCostInBytes

Влияет на количество создаваемых разделов, в которые будут записаны данные при чтении. Расчетная "стоимость" открытия файла, измеряемая в количестве байт, которые могут быть прочитаны за раз. Данный параметр используется при записи нескольких файлов в один раздел. Рекомендуется оценивать стоимость с запасом — в таком случае разделы с маленькими файлами обрабатываются быстрее, чем разделы с бОльшими файлами (запланированные на выполнение первыми). Данный параметр актуален при работе с файловыми данными типа Parquet, ORC и JSON

4194304 (4 МБ)

spark.sql.files.minPartitionNum

Желаемое (однако не гарантируемое) минимальное количество разделов при разбиении файла. Если параметр не задан, используется значение свойства spark.default.parallelism. Данный параметр актуален при работе с файловыми данными типа Parquet, ORC и JSON

Значение свойства spark.default.parallelism

spark.sql.broadcastTimeout

Тайм-аут (в секундах) на выполнение операции трансляции при выполнении broadcast JOIN

300

spark.sql.autoBroadcastJoinThreshold

Максимальный размер таблицы (в байтах), перед тем как данные таблицы будут транслированы (broadcast) на все worker-узлы при выполнении JOIN. Установка значения -1 отключает трансляцию данных

10485760 (10 МБ)

spark.sql.shuffle.partitions

Устанавливает количество разделов, используемых при перетасовке данных в операциях JOIN/агрегации

200

spark.sql.sources.parallelPartitionDiscovery.threshold

Устанавливает предельное значение для активации параллельного листинга при чтении входных путей (input paths). Если количество входных путей превышает это значение, Spark выполняет листинг, используя распределенную задачу. В остальных случаях Spark использует последовательный листинг. Данный параметр актуален при работе с файловыми данными типа Parquet, ORC и JSON

32

spark.sql.sources.parallelPartitionDiscovery.parallelism

Устанавливает значение максимального параллелизма листинга для входных путей задачи. Если количество входных путей больше, чем значение данного параметра, оно будет урезано до значения параметра. Данный параметр актуален при работе с файловыми данными типа Parquet, ORC и JSON

10000

Использование подсказок JOIN

Операции JOIN в Spark SQL — это преобразования, подразумевающие массовую перетасовку данных по сети, которые при неправильной конфигурации могут ухудшать производительность. Используя подсказки для JOIN-стратегий (JOIN hints), можно "рекомендовать" Spark использовать определенную стратегию JOIN для конкретной таблицы. Вы можете значительно улучшить производительность JOIN-операций, выбирая стратегии, которые лучше всего подходят для конкретных таблиц. Ниже показан пример объединения двух таблиц с использованием подсказки BROADCAST.

val joined_table = spark.table("src").join(spark.table("records").hint("broadcast"), "key")

Если для таблицы указано несколько подсказок, Spark применяет подсказки со следующим приоритетом:

  1. BROADCAST

  2. MERGE

  3. SHUFFLE_HASH

  4. SHUFFLE_REPLACE_NL

ПРИМЕЧАНИЕ
Использование подсказки не дает гарантии, что Spark будет использовать именно рекомендуемую стратегию, поскольку указанная стратегия может поддерживать не все типы JOIN.

Использование подсказок разбиения

В Spark SQL подсказки разбиения (partition hints) позволяют "рекомендовать" определенную стратегию для выполнения разбиения (partitioning). Основные подсказки разбиения показаны ниже:

  • COALESCE. Позволяет контролировать количество выходных файлов. Принимает количество партиций в качестве параметра.

  • REPARTITION. Используется для переразбиения на нужное количество партиций. В качестве параметров может принимать количество партиций, имена столбцов, оба значения, либо использоваться без параметров.

  • REPARTITION_BY_RANGE. Принимает имена столбцов и количество партиций (опционально).

  • REBALANCE. Используется для перераспределения результатов запроса с целью сделать партиции примерно одинакового размера, без существенного перекоса.

Примеры подсказок разбиения показаны ниже.

SELECT /*+ COALESCE(5) */ * FROM t
SELECT /*+ REPARTITION(5) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(5, c) */ * FROM t
SELECT /*+ REPARTITION */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(5, c) */ * FROM t
SELECT /*+ REBALANCE */ * FROM t
SELECT /*+ REBALANCE(5) */ * FROM t
SELECT /*+ REBALANCE(c) */ * FROM t
SELECT /*+ REBALANCE(5, c) */ * FROM t

Адаптивное выполнение запросов

Адаптивное выполнение запросов (Adaptive Query Execution, AQE) — это техника оптимизации, которая на основе статистики выполнения подбирает наиболее эффективный план выполнения запроса. AQE включена по умолчанию и может быть отключена с помощью свойства spark.sql.adaptive.enabled. Основные функции AQE описаны ниже.

Объединение разделов после перетасовки

Эта функция AQE объединяет разделы (partitions coalescence) после перетасовки на основе выходной статистики, что позволяет не указывать вручную количество разделов для перетасовки, которое бы подходило вашему набору данных. Вместо этого Spark сам подберет подходящее число разделов во время выполнения (при условии, что задано достаточно большое начальное число разделов с помощью свойства spark.sql.adaptive.coalescePartitions.initialPartitionNum).

Эта функция активируется, если заданы параметры spark.sql.adaptive.enabled=true и spark.sql.adaptive.coalescePartitions.enabled=true.

Свойство Описание Значение по умолчанию

spark.sql.adaptive.coalescePartitions.enabled

Если указано значение true и spark.sql.adaptive.enabled=true, Spark будет объединять смежные разделы перетасовки в соответствии с целевым размером раздела (определяется свойством spark.sql.adaptive.advisoryPartitionSizeInBytes), чтобы избежать слишком большого количества маленьких задач

true

spark.sql.adaptive.coalescePartitions.parallelismFirst

Если значение равно true, то при слиянии смежных разделов Spark игнорирует целевой размер, указанный в spark.sql.adaptive.advisoryPartitionSizeInBytes (по умолчанию равен 64 МБ), и учитывает только минимальный размер раздела, определяемый свойством spark.sql.adaptive.coalescePartitions.minPartitionSize (по умолчанию 1 МБ). Это позволяет добиться максимального эффекта параллелизма. Данные действия позволяют избежать ухудшения производительности при включенном AQE. Рекомендуется использовать значение false и соблюдать целевой размер, устанавливаемый свойством spark.sql.adaptive.advisoryPartitionSizeInBytes

true

spark.sql.adaptive.coalescePartitions.minPartitionSize

Минимальный размер разделов перетасовки после слияния. Значение должно составлять не более 20% от значения параметра spark.sql.adaptive.advisoryPartitionSizeInBytes. Этот параметр может быть полезен, если целевой размер игнорируется при слиянии разделов (поведение по умолчанию)

1 МБ

spark.sql.adaptive.coalescePartitions.initialPartitionNum

Начальное количество разделов перетасовки перед слиянием. Если значение не задано, используется значение параметра spark.sql.shuffle.partitions. Данный параметр имеет эффект только если установлены spark.sql.adaptive.enabled=true и spark.sql.adaptive.coalescePartitions.enabled=true

 — 

spark.sql.adaptive.advisoryPartitionSizeInBytes

Рекомендуемый размер (в байтах) разделов перетасовки при адаптивной оптимизации (spark.sql.adaptive.enabled=true). Данный параметр имеет эффект, если Spark объединяет небольшие разделы перетасовки или разбивает разделы перетасовки с перекосом

64 МБ

Преобразование sort-merge JOIN в broadcast hash JOIN

AQE автоматически преобразует JOIN сортировочного слияния (sort-merge JOIN) в транслируемый хеш-JOIN (broadcast hash JOIN), если статистика любой из сторон JOIN меньше порога адаптивного транслируемого хеш-JOIN (определяется параметром spark.sql.adaptive.autoBroadcastJoinThreshold). Это не так эффективно, как использование изначально транслируемого хеш-JOIN, но все же более эффективно, чем JOIN слияния, так как позволяет сохранить сортировку обеих сторон JOIN и локально обработать файлы перетасовки для уменьшения нагрузки на сеть (при условии, что spark.sql.adaptive.localShuffleReader.enabled=true).

Свойство Описание Значение по умолчанию

spark.sql.adaptive.autoBroadcastJoinThreshold

Устанавливает максимальный размер (в байтах) для таблицы, которая будет транслироваться на все worker-узлы при выполнении JOIN. Установка значения -1 отключает трансляцию. Значение по умолчанию берется из параметра spark.sql.autoBroadcastJoinThreshold. Обратите внимание, что данный параметр используется только в AQE

 — 

Преобразование sort-merge JOIN в shuffled hash JOIN

AQE автоматически преобразует JOIN сортировочного слияния (sort-merge JOIN) в перетасованный хеш-JOIN (shuffled hash JOIN), если все разделы после сортировки меньше предельного значения, определяемого параметром spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold.

Свойство Описание Значение по умолчанию

spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold

Устанавливает максимальный размер (в байтах) для каждого раздела, который может быть использован для построения локальной хеш-таблицы. Если значение параметра больше или равно spark.sql.adaptive.advisoryPartitionSizeInBytes, и все разделы также не превышают значение данного параметра, Spark будет использовать перетасованный хеш-JOIN вместо сортированного JOIN, независимо от значения spark.sql.join.preferSortMergeJoin

0

Оптимизация перекосов в JOIN

Перекос данных (data skew) часто приводит к замедлению JOIN-операций. Данная функция AQE динамически обрабатывает перекос в JOIN сортировочного слияния (sort-merge JOIN), разбивая (и при необходимости реплицируя) перекошенные задачи на примерно одинаковые по размеру. Функция активируется при установке параметров spark.sql.adaptive.enabled=true и spark.sql.adaptive.skewJoin.enabled=true.

Свойство Описание Значение по умолчанию

spark.sql.adaptive.skewJoin.enabled

Если установлено значение true и spark.sql.adaptive.enabled=true, Spark динамически обрабатывает перекос в JOIN сортировочного слияния, разбивая (и реплицируя при необходимости) перекошенные разделы

true

spark.sql.adaptive.skewJoin.skewedPartitionFactor

Раздел считается перекошенным, если его размер (в байтах) больше данного коэффициента, умноженного на медианный размер раздела, а также если размер раздела больше значения spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

5

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Раздел считается перекошенным, если его размер (в байтах) больше данного предельного значения, а также если размер раздела больше значения параметра spark.sql.adaptive.skewJoin.skewedPartitionFactor, умноженного на медианный размер раздела. Значение этого свойства должно быть больше, чем spark.sql.adaptive.advisoryPartitionSizeInBytes

256 МБ

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней