Оптимизация производительности Flink

В данной статье описаны методы оптимизации и лучшие практики для повышения производительности сервиса Flink.

Память

Достаточный объем памяти, выделяемый компонентам Flink Task Manager/Job Manager, является одним из основных факторов стабильной работы Flink. Сюда входит не только JVM heap, но также области metaspace и off-heap. Для управления настройками памяти используйте параметры taskmanager.memory.* и jobmanager.memory.* в ADCM (Clusters → <cluster_name> → Services → Flink → Primary Configuration → flink-conf.yaml).

Параллелизм

Использование настроек параллелизма, подходящих под конкретный конвейер данных, играет важную роль для равномерного распределения нагрузки по кластеру Flink. Ниже приведены основные параметры для конфигурации параллелизма Flink. Эти параметры указываются в разделе настроек flink-conf.yaml в ADCM. Информация о том, как программно задать фактор параллелизма для уровней operator/environment/client, доступна в документации Flink.

Свойство конфигурации Описание Значение по умолчанию

parallelism.default

Устанавливает количество подзадач, которые каждый оператор может выполнять параллельно. Если значение параллелизма не указано на уровне оператора или задачи (job), Flink использует это значение по умолчанию

1

taskmanager.numberOfTaskSlots

Количество операторов или пользовательских функций, которые могут выполняться одним TaskManager одновременно. Установка значения больше 1 позволяет TaskManager использовать несколько ядер процессора, однако в таком случае общая выделенная память будет делиться между отдельными операторами/функциями. Этот параметр рекомендуется устанавливать равным количеству физических CPU-ядер хоста TaskManager

1

Контрольные точки и точки сохранения

Контрольные точки Flink (checkpoints) — это механизм обеспечения отказоустойчивости, с помощью которого данные состояния приложения (state data) сохраняются в хранилище (например, HDFS) с возможностью последующего восстановления в случае сбоев. По умолчанию создание контрольных точек отключено.

Основными характеристиками, которые влияют на производительность, являются частота создания контрольных точек и тайм-аут. Данные параметры можно настроить с помощью следующих свойств конфигурации.

Свойство конфигурации Описание Значение по умолчанию

execution.checkpointing.interval

Интервал для создания контрольных точек

 — 

execution.checkpointing.timeout

Максимальное время выполнения операции создания контрольной точки

10 min

Эти и другие параметры контрольных точек следует указывать в разделе настроек Custom flink-conf.yaml в ADCM. Пример конфигурации контрольных точек в коде Flink-приложения:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000); (1)
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); (2)
config.setCheckpointTimeout(60000);
config.setMaxConcurrentCheckpoint(1); (3)
1 Включение контрольных точек каждые 1000 миллисекунд.
2 Установка режима контрольных точек EXACTLY_ONCE.
3 Максимальное количество параллельных контрольных точек.

Точки сохранения Flink (savepoints) являются схожим инструментом и служат для создания снепшотов состояния приложения с возможностью возобновления работы с этой точки. В отличие от контрольных точек, которые создаются автоматически через регулярные промежутки времени, точки сохранения создаются пользователем вручную. Если контрольные точки предназначены для обеспечения отказоустойчивости, то точки сохранения лучше всего подходят для обновления задач, миграции и ручного резервного копирования. Точки сохранения более гибки по сравнению с контрольными точками и позволяют указывать размер, частоту, формат хранения, а также другие характеристики, наиболее подходящие под конкретный ETL-пайплайн.

Управление состоянием

Использование подходящей реализации state backend позволяет значительно улучшить производительность Flink в зависимости от типа нагрузки. Flink поставляется с несколькими реализациями для управления состоянием (также допустима кастомная реализация):

  • HashMapStateBackend. Реализация по умолчанию, которая хранит данные о состоянии приложения в хеш-таблице. Сохраняя данные в памяти, является очень быстрым механизмом для управления состоянием, однако в таком случае максимальный размер данных ограничен объемом памяти на узлах кластера. Этот бэкенд подходит для приложений с небольшим или средним размером state-данных.

  • EmbeddedRocksDBStateBackend. Хранит данные о состоянии в базе данных RocksDB, что подразумевает расходы на (де)сериализацию, а также операции ввода/вывода. Эта реализация лучше всего подходит для приложений с большим объемом state-данных. Объем данных состояния, который может обработать бэкенд, ограничен только объемом дискового пространства на хостах кластера Flink. Для крупномасштабных проектов рекомендуется использовать EmbeddedRocksDBStateBackend или другие реализации.

Бэкенд управления данными состояния можно указать глобально или на уровне отдельных задач. Чтобы указать бэкенд по умолчанию для сервиса Flink, используйте свойство state.backend.type в Custom flink-conf.yaml. Допустимые значения свойства: hashmap, rocksdb или имя класса, реализующего StateBackendFactory. Ниже показан способ указания бэкенда состояния на уровне задачи:

  • Java

  • Scala

  • Python

Configuration conf = new Configuration();
conf.set(StateBackendOptions.STATE_BACKEND, "hashmap");
env.configure(config);
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new HashMapStateBackend())
conf = Configuration()
conf.set_string('state.backend.type', 'hashmap')
env = StreamExecutionEnvironment.get_execution_environment(config)

Оптимизация агрегирования

Flink поддерживает агрегатные функции, такие как SUM, COUNT, DISTINCT и так далее. При большом количестве входящих данных использование агрегатных функций может стать узким местом в ETL-пайплайне. Ниже приведены рекомендации по оптимизации агрегатных функций.

Агрегация MiniBatch

По умолчанию операторы групповой агрегации (group aggregation operators) обрабатывают входящие записи последовательно одну за другой. Такой подход часто приводит к нагрузке на state backend-систему (особенно если используется RocksDB). Более того, неизбежные перекосы данных в продуктовой среде усугубляют проблему, приводя к частым backpressure-состояниям.

Основная идея мини-пакетной агрегации заключается в кешировании поступающих данных в буфере оператора агрегации. При обработке сразу нескольких записей из буфера, движку Flink требуется лишь одна операция доступа к данным состояния по указанному ключу. Это уменьшает количество обращений к данным состояния и обеспечивает более высокую пропускную способность. Однако при таком подходе появляется задержка, так как входящие данные буферизируются, а не обрабатываются мгновенно. Использование мини-пакетной агрегации подразумевает компромисс между стабильной пропускной способностью и задержкой на буферизацию.

Следующая схема иллюстрирует традиционный и мини-пакетный подходы к агрегации Flink.

Агрегация MiniBatch
Агрегация MiniBatch
Агрегация MiniBatch
Агрегация MiniBatch

По умолчанию функция мини-пакетной агрегации отключена. Для включения этой оптимизации используйте следующие конфигурационные свойства в ADCM-настройках Custom flink-conf.yaml.

Свойство конфигурации Описание Значение по умолчанию

table.exec.mini-batch.enabled

Включает/отключает мини-пакетную агрегацию для буферизации входных данных. Flink формирует пакеты в буфере либо через определенный интервал, либо по достижении максимально допустимого количества записей в буфере

false

table.exec.mini-batch.allow-latency

Максимальный интервал задержки для буферизации входных записей. Единица времени указывается вместе со значением, например 5 s, 500 ms и так далее

 — 

table.exec.mini-batch.size

Максимальный размер пакета в буфере

-1

Ниже приведен пример программной установки этих свойств:

  • Java

  • Scala

  • Python

  • SQL CLI

Configuration configuration = new Configuration();

configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "2 s");
configuration.setString("table.exec.mini-batch.size", "1000");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
                               .inStreamingMode()
                               .withConfiguration(configuration)
                               .build();
val configuration = new Configuration;
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "2 s")
configuration.setString("table.exec.mini-batch.size", "1000")
val settings = EnvironmentSettings.newInstance
               .inStreamingMode
               .withConfiguration(configuration)
               .build
val tEnv: TableEnvironment = TableEnvironment.create(settings)
configuration = Configuration()
configuration.set("table.exec.mini-batch.enabled", "true")
configuration.set("table.exec.mini-batch.allow-latency", "2 s")
configuration.set("table.exec.mini-batch.size", "1000")
settings = EnvironmentSettings.new_instance() \
          .in_streaming_mode() \
          .with_configuration(configuration) \
          .build()
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';

Агрегация типа local-global

Этот метод направлен на минимизацию эффекта перекоса входящих данных путем разбиения групповой агрегации на два этапа:

  1. Сначала данные агрегируются локально при попадании в оператор, а затем агрегируются еще раз глобально. Такой подход похож на этапы combine + reduce в парадигме MapReduce.

  2. На этапе глобальной агрегации вместо обработки большого количества входных записей обрабатываются пакеты с накопленными (reduced) данными. Это позволяет создать более равномерную нагрузку на каждый оператор.

Такое разделение позволяет значительно сократить количество shuffle-операций по сети и количество обращений к данным состояния. Объем входящих записей, накапливаемых на этапе локальной агрегации, зависит от интервала мини-пакетной агрегации, что означает, что эта оптимизация работает вместе с оптимизацией MiniBatch.

Рассмотрим следующий SQL-запрос:

SELECT acc_id, SUM(txn_id)
FROM transactions
GROUP BY acc_id;

Если записи во входящем потоке данных имеют перекос, некоторым операторам SUM придется обрабатывать гораздо больше данных, чем другим, что создает неравномерную нагрузку. Добавление этапа локальной агрегации позволяет "накопить" пакет записей с одинаковым ключом и обработать релевантные данные состояния за меньшее количество итераций.

Следующая схема иллюстрирует принцип агрегации типа local-global.

Агрегация типа local-global
Агрегация типа local-global
Агрегация типа local-global
Агрегация типа local-global

Чтобы использовать эту оптимизацию, установите свойство table.optimizer.agg-phase-strategy=TWO_PHASE в разделе настроек Custom flink-conf.yaml в ADCM или установите его программно:

  • Java

  • Scala

  • Python

  • SQL CLI

Configuration configuration = new Configuration();
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "2 s");
configuration.setString("table.exec.mini-batch.size", "1000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
...
val configuration = new Configuration;
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "2 s")
configuration.setString("table.exec.mini-batch.size", "1000")
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")

...
configuration = Configuration()
configuration.set("table.exec.mini-batch.enabled", "true")
configuration.set("table.exec.mini-batch.allow-latency", "2 s")
configuration.set("table.exec.mini-batch.size", "1000")
configuration.set_string("table.optimizer.agg-phase-strategy", "TWO_PHASE")
...
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';

Разделение агрегации DISTINCT

Метод двухэтапной агрегации типа local-global эффективен для минимизации перекосов при работе с операторами SUM, COUNT, MAX, MIN, AVG. Однако этот подход теряет свою эффективность при появлении оператора DISTINCT.

Рассмотрим следующий SQL, используемый для получения количества уникальных аккаунтов по дням:

SELECT txn_date, COUNT(DISTINCT acc_id)
FROM transactions
GROUP BY txn_date;

Использование агрегации типа local-global для такого запроса дает очень небольшой выигрыш в производительности, особенно если значения acc_id в большинстве своем уникальны. В этом случае Flink помещает в аккумулятор (accumulator) почти все входящие записи, а значит, этап глобальной агрегации будет выполняться медленнее.

Решением этой проблемы является разделение агрегации DISTINCT на два этапа:

  1. Сначала Flink выполняет агрегацию по групповому ключу (group key) и дополнительно по ключу бакета. Ключ бакета вычисляется по формуле: . Значение BUCKET_NUM по умолчанию равно 1024 и может быть изменено с помощью свойства table.optimizer.distinct-agg.split.bucket-num.

  2. Затем с помощью еще одной агрегации данные сортируются по исходному групповому ключу, а оператор SUM подсчитывает значения COUNT(DISTINCT …​) из разных бакетов. В этом случае ключ бакета служит дополнительным критерием для группировки, снижая нагрузку на операции по исходной группе.

Используя разделение агрегации DISTINCT, Flink выполняет приведенный выше SQL-запрос так, как если бы он имел следующую структуру:

SELECT txn_date, SUM(cnt)
FROM (
    SELECT txn_date, COUNT(DISTINCT acc_id) as cnt
    FROM transactions
    GROUP BY txn_date, MOD(HASH_CODE(acc_id), 1024)
)
GROUP BY txn_date;

На следующей схеме описан процесс разделения агрегации для вышеуказанного SQL-запроса (цветом обозначены группы txn_date, а буквами — ID аккаунтов).

Разделение агрегации DISTINCT
Разделение агрегации DISTINCT
Разделение агрегации DISTINCT
Разделение агрегации DISTINCT
ПРИМЕЧАНИЕ
На схеме представлен простейший пример, который может выиграть от этой оптимизации. Flink также поддерживает разделение более сложных агрегатных запросов, включающих два или более оператора агрегации с разными DISTINCT-ключами, например, COUNT(DISTINCT k1), SUM(DISTINCT k2).

Чтобы активировать эту оптимизацию, используйте свойство table.optimizer.distinct-agg.split.enabled=true в разделе настроек Custom flink-conf.yaml в ADCM или укажите его в коде Flink-приложения:

  • Java

  • Scala

  • Python

TableEnvironment env = ...;
env.getConfig()
   .set("table.optimizer.distinct-agg.split.enabled", "true");
val tenv: TableEnvironment = ...

tenv.getConfig
    .set("table.optimizer.distinct-agg.split.enabled", "true")
tenv = ...
tenv.get_config().set("table.optimizer.distinct-agg.split.enabled", "true")

Обработка и сериализация данных

Ниже приведены общие рекомендации по обработке данных, которые могут ускорить работу Flink.

  • Использование сетевых буферов. Изменение настроек сетевых буферов влияет на скорость работы приложений Flink. Основная идея состоит в том, чтобы подобрать оптимальный размер буфера, сбалансировав потребление памяти и скорость обработки. Стоит следить за количеством данных, проходящих через сетевые буферы — это один из способов избежать замедления работы приложений.

  • Для управления потоками данных используйте оконные функции Flink (windows).

  • Используйте преимущества таких операторов Flink, как Join и Filter.

  • Эффективная (де)сериализация крайне важна для обеспечения высокой производительности и масштабируемости. Медленная де/сериализация, помноженная на объем входящих данных, может значительно замедлить работу приложений Flink. Используйте фреймворк сериализации, подходящий для вашего приложения, например Avro или Protobuf.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней