Оптимизация производительности в Spark
В этом разделе описаны методы оптимизации производительности Spark, а также приведены соответствующие конфигурационные параметры для тонкой настройки этих функций.
ПРИМЕЧАНИЕ
В данной статье "Spark" подразумевает ADH-сервис Spark3, так как некоторые функции оптимизации не полностью поддерживаются ADH-сервисом Spark.
|
Параметры конфигурации Spark, описанные в статье, можно указать с помощью sparkSession.setConf("property=value")
или с помощью команд Spark SQL типа SET property=value
.
Больше информации о параметрах, используемых для настройки производительности, доступно в документации Spark.
Кеширование данных в памяти
Spark SQL позволяет кешировать таблицы в памяти, используя специальный столбцовый формат, с помощью методов spark.catalog.cacheTable("tableName")
или dataFrame.cache()
.
После вызова этих методов Spark будет сканировать только необходимые столбцы и автоматически сжимать данные для минимизации использования памяти и нагрузки на GC.
Чтобы удалить таблицу из памяти, используйте spark.catalog.uncacheTable("tableName")
или dataFrame.unpersist()
.
Свойство | Описание | Значение по умолчанию |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
При значении |
true |
spark.sql.inMemoryColumnarStorage.batchSize |
Устанавливает размер пакета при кешировании столбца. Установка большего размера пакета позволяет более эффективно использовать память и обеспечивает лучший уровень сжатия, однако увеличивает риск OOM-исключений |
10000 |
spark.sql.files.maxPartitionBytes |
Максимальное количество байт, помещаемых в один раздел (partition) при чтении файла. Данный параметр актуален при работе с файловыми данными типа Parquet, ORC и JSON |
134217728 (128 MБ) |
spark.sql.files.openCostInBytes |
Влияет на количество создаваемых разделов, в которые будут записаны данные при чтении. Расчетная "стоимость" открытия файла, измеряемая в количестве байт, которые могут быть прочитаны за раз. Данный параметр используется при записи нескольких файлов в один раздел. Рекомендуется оценивать стоимость с запасом — в таком случае разделы с маленькими файлами обрабатываются быстрее, чем разделы с бОльшими файлами (запланированные на выполнение первыми). Данный параметр актуален при работе с файловыми данными типа Parquet, ORC и JSON |
4194304 (4 МБ) |
spark.sql.files.minPartitionNum |
Желаемое (однако не гарантируемое) минимальное количество разделов при разбиении файла.
Если параметр не задан, используется значение свойства |
Значение свойства |
spark.sql.broadcastTimeout |
Тайм-аут (в секундах) на выполнение операции трансляции при выполнении broadcast JOIN |
300 |
spark.sql.autoBroadcastJoinThreshold |
Максимальный размер таблицы (в байтах), перед тем как данные таблицы будут транслированы (broadcast) на все worker-узлы при выполнении JOIN.
Установка значения |
10485760 (10 МБ) |
spark.sql.shuffle.partitions |
Устанавливает количество разделов, используемых при перетасовке данных в операциях JOIN/агрегации |
200 |
spark.sql.sources.parallelPartitionDiscovery.threshold |
Устанавливает предельное значение для активации параллельного листинга при чтении входных путей (input paths). Если количество входных путей превышает это значение, Spark выполняет листинг, используя распределенную задачу. В остальных случаях Spark использует последовательный листинг. Данный параметр актуален при работе с файловыми данными типа Parquet, ORC и JSON |
32 |
spark.sql.sources.parallelPartitionDiscovery.parallelism |
Устанавливает значение максимального параллелизма листинга для входных путей задачи. Если количество входных путей больше, чем значение данного параметра, оно будет урезано до значения параметра. Данный параметр актуален при работе с файловыми данными типа Parquet, ORC и JSON |
10000 |
Использование подсказок JOIN
Операции JOIN в Spark SQL — это преобразования, подразумевающие массовую перетасовку данных по сети, которые при неправильной конфигурации могут ухудшать производительность.
Используя подсказки для JOIN-стратегий (JOIN hints), можно "рекомендовать" Spark использовать определенную стратегию JOIN для конкретной таблицы.
Вы можете значительно улучшить производительность JOIN-операций, выбирая стратегии, которые лучше всего подходят для конкретных таблиц.
Ниже показан пример объединения двух таблиц с использованием подсказки BROADCAST
.
val joined_table = spark.table("src").join(spark.table("records").hint("broadcast"), "key")
Если для таблицы указано несколько подсказок, Spark применяет подсказки со следующим приоритетом:
-
BROADCAST
-
MERGE
-
SHUFFLE_HASH
-
SHUFFLE_REPLACE_NL
ПРИМЕЧАНИЕ
Использование подсказки не дает гарантии, что Spark будет использовать именно рекомендуемую стратегию, поскольку указанная стратегия может поддерживать не все типы JOIN.
|
Использование подсказок разбиения
В Spark SQL подсказки разбиения (partition hints) позволяют "рекомендовать" определенную стратегию для выполнения разбиения (partitioning). Основные подсказки разбиения показаны ниже:
-
COALESCE. Позволяет контролировать количество выходных файлов. Принимает количество партиций в качестве параметра.
-
REPARTITION. Используется для переразбиения на нужное количество партиций. В качестве параметров может принимать количество партиций, имена столбцов, оба значения, либо использоваться без параметров.
-
REPARTITION_BY_RANGE. Принимает имена столбцов и количество партиций (опционально).
-
REBALANCE. Используется для перераспределения результатов запроса с целью сделать партиции примерно одинакового размера, без существенного перекоса.
Примеры подсказок разбиения показаны ниже.
SELECT /*+ COALESCE(5) */ * FROM t
SELECT /*+ REPARTITION(5) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(5, c) */ * FROM t
SELECT /*+ REPARTITION */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(5, c) */ * FROM t
SELECT /*+ REBALANCE */ * FROM t
SELECT /*+ REBALANCE(5) */ * FROM t
SELECT /*+ REBALANCE(c) */ * FROM t
SELECT /*+ REBALANCE(5, c) */ * FROM t
Адаптивное выполнение запросов
Адаптивное выполнение запросов (Adaptive Query Execution, AQE) — это техника оптимизации, которая на основе статистики выполнения подбирает наиболее эффективный план выполнения запроса.
AQE включена по умолчанию и может быть отключена с помощью свойства spark.sql.adaptive.enabled
.
Основные функции AQE описаны ниже.
Объединение разделов после перетасовки
Эта функция AQE объединяет разделы (partitions coalescence) после перетасовки на основе выходной статистики, что позволяет не указывать вручную количество разделов для перетасовки, которое бы подходило вашему набору данных.
Вместо этого Spark сам подберет подходящее число разделов во время выполнения (при условии, что задано достаточно большое начальное число разделов с помощью свойства spark.sql.adaptive.coalescePartitions.initialPartitionNum
).
Эта функция активируется, если заданы параметры spark.sql.adaptive.enabled=true
и spark.sql.adaptive.coalescePartitions.enabled=true
.
Свойство | Описание | Значение по умолчанию |
---|---|---|
spark.sql.adaptive.coalescePartitions.enabled |
Если указано значение |
true |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
Если значение равно |
true |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
Минимальный размер разделов перетасовки после слияния.
Значение должно составлять не более 20% от значения параметра |
1 МБ |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
Начальное количество разделов перетасовки перед слиянием.
Если значение не задано, используется значение параметра |
— |
spark.sql.adaptive.advisoryPartitionSizeInBytes |
Рекомендуемый размер (в байтах) разделов перетасовки при адаптивной оптимизации ( |
64 МБ |
Преобразование sort-merge JOIN в broadcast hash JOIN
AQE автоматически преобразует JOIN сортировочного слияния (sort-merge JOIN) в транслируемый хеш-JOIN (broadcast hash JOIN), если статистика любой из сторон JOIN меньше порога адаптивного транслируемого хеш-JOIN (определяется параметром spark.sql.adaptive.autoBroadcastJoinThreshold
).
Это не так эффективно, как использование изначально транслируемого хеш-JOIN, но все же более эффективно, чем JOIN слияния, так как позволяет сохранить сортировку обеих сторон JOIN и локально обработать файлы перетасовки для уменьшения нагрузки на сеть (при условии, что spark.sql.adaptive.localShuffleReader.enabled=true
).
Свойство | Описание | Значение по умолчанию |
---|---|---|
spark.sql.adaptive.autoBroadcastJoinThreshold |
Устанавливает максимальный размер (в байтах) для таблицы, которая будет транслироваться на все worker-узлы при выполнении JOIN.
Установка значения |
— |
Преобразование sort-merge JOIN в shuffled hash JOIN
AQE автоматически преобразует JOIN сортировочного слияния (sort-merge JOIN) в перетасованный хеш-JOIN (shuffled hash JOIN), если все разделы после сортировки меньше предельного значения, определяемого параметром spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
.
Свойство | Описание | Значение по умолчанию |
---|---|---|
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
Устанавливает максимальный размер (в байтах) для каждого раздела, который может быть использован для построения локальной хеш-таблицы.
Если значение параметра больше или равно |
0 |
Оптимизация перекосов в JOIN
Перекос данных (data skew) часто приводит к замедлению JOIN-операций.
Данная функция AQE динамически обрабатывает перекос в JOIN сортировочного слияния (sort-merge JOIN), разбивая (и при необходимости реплицируя) перекошенные задачи на примерно одинаковые по размеру.
Функция активируется при установке параметров spark.sql.adaptive.enabled=true
и spark.sql.adaptive.skewJoin.enabled=true
.
Свойство | Описание | Значение по умолчанию |
---|---|---|
spark.sql.adaptive.skewJoin.enabled |
Если установлено значение |
true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
Раздел считается перекошенным, если его размер (в байтах) больше данного коэффициента, умноженного на медианный размер раздела, а также если размер раздела больше значения |
5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
Раздел считается перекошенным, если его размер (в байтах) больше данного предельного значения, а также если размер раздела больше значения параметра |
256 МБ |