Оптимизация производительности Hive

В этой статье описаны методы оптимизации и лучшие практики для повышения производительности вашего Hive-кластера.

Партиционирование (Partitioning)

Партиционирование в Hive — это способ разбиения одной массивной таблицы на меньшие единицы (партиции) в зависимости от данных в столбцах. Hive создает отдельную партицию для каждого уникального значения в столбце. В файловой системе HDFS для каждой партиции создается директория, где хранятся данные каждой партиции.

Партиционирование может значительно ускорить работу Hive, избавляя от необходимости сканировать данные, не имеющие отношения к текущему запросу. Партиционирование наиболее эффективно, когда данные равномерно распределяются между партициями. При сильном "перекосе" (skew) данных в партициях у одних worker-узлов будет гораздо больше данных для обработки, чем у других, что приведет к неэффективному использованию ресурсов кластера. Также эффективность партиционирования во многом зависит от того, как будут использоваться столбцы разделения. Например, рекомендуется разделять данные на партиции по столбцам, которые регулярно участвуют в операциях GROUP BY.

Синтаксис для создания таблицы с партициями выглядит следующим образом:

CREATE TABLE transactions(
    txn_id int, acc_id int, txn_amount decimal(10,2)
 ) PARTITIONED BY (txn_date date);

Данная команда создает таблицу с партициями по столбцу txn_date. Вывод команды DESCRIBE transactions:

+--------------------------+----------------+----------+
|         col_name         |   data_type    | comment  |
+--------------------------+----------------+----------+
| txn_id                   | int            |          |
| acc_id                   | int            |          |
| txn_amount               | decimal(10,2)  |          |
| txn_date                 | date           |          |
|                          | NULL           | NULL     |
| # Partition Information  | NULL           | NULL     |
| # col_name               | data_type      | comment  |
| txn_date                 | date           |          |
+--------------------------+----------------+----------+

Для каждой уникальной даты в столбце txn_date Hive создаст новую партицию для хранения данных, относящихся к этой дате. Выполните следующую команду, чтобы увидеть HDFS-файлы, используемые Hive для хранения данных таблицы:

$ hdfs dfs -ls /apps/hive/warehouse/transactions

Вывод:

Found 3 items
drwxr-xr-x   - hive hadoop          0 2024-05-01 00:48 /apps/hive/warehouse/transactions/txn_date=2023-01-01
drwxr-xr-x   - hive hadoop          0 2024-05-01 00:49 /apps/hive/warehouse/transactions/txn_date=2023-01-02
drwxr-xr-x   - hive hadoop          0 2024-05-01 00:49 /apps/hive/warehouse/transactions/txn_date=2023-01-03

Hive создает директорию для каждой партиции и записывает соответствующие данные в эту директорию. Такое разбиение очень эффективно для дальнейших операций чтения, особенно на больших массивах данных. В последующих запросах Hive должен будет сканировать только определенные директории вместо сканирования всего набора данных.

ПРИМЕЧАНИЕ
Стоит внимательно выбирать столбец для партиционирования таблицы. Слишком большое количество партиций (а значит, и создаваемых HDFS-файлов) может избыточно нагружать NameNode.

Вы также можете разделить таблицу на партиции по нескольким столбцам одновременно для более детального разбиения. Пример создания таблицы с несколькими партициями показан ниже.

CREATE TABLE transactions_1 (
    id int, txn_amount decimal(10,2)
 ) PARTITIONED BY (txn_date date, acc_id int);

Получите информацию о партициях таблицы с помощью команды:

DESCRIBE transactions_1;

Вывод:

+--------------------------+----------------+----------+
|         col_name         |   data_type    | comment  |
+--------------------------+----------------+----------+
| id                       | int            |          |
| txn_amount               | decimal(10,2)  |          |
| txn_date                 | date           |          |
| acc_id                   | int            |          |
|                          | NULL           | NULL     |
| # Partition Information  | NULL           | NULL     |
| # col_name               | data_type      | comment  |
| txn_date                 | date           |          |
| acc_id                   | int            |          |
+--------------------------+----------------+----------+

При записи данных в такую таблицу Hive сохраняет данные в HDFS, как показано ниже.

$ hdfs dfs -ls -R /apps/hive/warehouse/transactions_1

Вывод:

drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-01
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-01/acc_id=1
-rw-r--r--   3 hive hadoop          8 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-01/acc_id=1/000000_0
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-01/acc_id=2
-rw-r--r--   3 hive hadoop          9 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-01/acc_id=2/000000_0
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-02
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-02/acc_id=3
-rw-r--r--   3 hive hadoop          8 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-02/acc_id=3/000000_0
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-03
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-03/acc_id=1
-rw-r--r--   3 hive hadoop          8 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-03/acc_id=1/000000_0
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-03/acc_id=2
-rw-r--r--   3 hive hadoop          8 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-03/acc_id=2/000000_0

Обратите внимание, что HDFS-директории txn_date={date} могут содержать поддиректории acc_id=N, где хранятся данные партиций по столбцу acc_id.

Бакетинг (Bucketing)

Использование бакетов (buckets) в Hive — это способ оптимизации, предполагающий разбиение одной большой таблицы на несколько файлов, которые легче сканировать. Основное различие между партициями и бакетами заключается в том, что Hive создает новые партиции на основе данных таблицы — по одной партиции на каждое уникальное значение в столбце. В случае с бакетами вы можете указать заранее определенное количество бакетов в момент создания таблицы.

В следующей таблице приведены основные различия между партициями и бакетами.

Партиционирование Бакетинг

Для каждой партиции создается HDFS-директория

Для каждого бакета создается HDFS-файл

Разбиение таблицы на бакеты осуществляется по одному или по нескольким столбцам

Для разбиения таблицы на бакеты может использоваться только один столбец

Количество партиций определяется содержимым таблицы: одна партиция на каждое уникальное значение в столбце

Количество бакетов может быть указано в момент создания таблицы

Используется предложение PARTITIONED BY

Используется предложение CLUSTERED BY

Использование бакетов оказывает следующий эффект на Hive-кластер:

  • Уменьшается общий размер хранимых данных.

  • Снижается скорость вставки данных.

  • Выборка по ключу сортировки выполняется быстрее.

  • Объединение двух таблиц с помощью sort merge join может быть выполнено без предварительной сортировки.

  • Hive-бакеты несовместимы с бакетами Spark из-за различий в алгоритмах хеширования.

Синтаксис для создания таблицы с бакетами имеет следующий вид:

CREATE TABLE employees(
    emp_id int,
    first_name string,
    last_name string,
    department_id int
)
CLUSTERED BY (department_id) SORTED BY (emp_id) INTO 5 BUCKETS;

Эта команда создает таблицу Hive, данные которой будут храниться в 5 HDFS-файлах (значение 5 в предложении CLUSTERED BY). При вставке данных в такую таблицу Hive равномерно распределяет данные между 5 файлами HDFS. В каждом бакете (файле) данные хранятся отсортированными по столбцу emp_id (предложение SORTED BY).

Как и в случае с партиционированием, распределение данных по отдельным бакетам сокращает общее время чтения. Однако количество бакетов должно быть разумным, так как слишком большое количество маленьких файлов может свести на нет все усилия по оптимизации.

Вы можете использовать бакеты вместе с партициями для расширенного разделения данных и повышения производительности. Например:

CREATE TABLE employees(
    emp_id int,
    first_name string,
    last_name string,
    office_num int
)
PARTITIONED BY (department_id int)
CLUSTERED BY (office_num) INTO 5 BUCKETS;
ПРИМЕЧАНИЕ
Столбец, используемый для партиционирования, указывается только в предложении PARTITIONED BY. Столбец, используемый для разбиения на бакеты, определяется и в спецификации столбцов, и в предложении CLUSTERED BY.

Векторизация

Векторное выполнение запросов — это встроенная функция оптимизации Hive, которая помогает снизить нагрузку на CPU за счет обработки блоков строк, а не одной строки за раз. При обработке блоков строк каждый столбец хранится в памяти как массив (вектор) примитивных типов данных. Арифметические операции и операции сравнения выполняются над примитивными значениями из массива. Такой подход сокращает количество проверок условий для каждого значения в столбце, позволяет более эффективно использовать процессор, активно задействуя кеширование, а также уменьшает общее время обработки данных.

Векторизация в Hive работает только для определенных типов данных и имеет ограниченную поддержку пользовательских функций. По умолчанию векторизация в Hive отключена. Для использования векторизованных запросов необходимо выполнить следующие условия:

  • В качестве формата хранения должен использоваться ORC.

  • Установлено конфигурационное свойство hive.vectorized.execution.enabled = true.

Для выполнения векторизованного запроса используйте следующие шаги:

  1. Создайте Hive-таблицу с форматом данных ORC:

    CREATE TABLE test_table_vectorized (
        id int, value string
    ) STORED AS ORC;
  2. Активируйте функцию векторизации с помощью свойства:

    SET hive.vectorized.execution.enabled=true;
  3. Выполните команду EXPLAIN EXTENDED для функции агрегации:

    EXPLAIN EXTENDED
        SELECT COUNT(*) FROM test_table_vectorized;

    Вывод:

+----------------------------------------------------+
|                      Explain                       |
+----------------------------------------------------+
| STAGE DEPENDENCIES:                                |
|   Stage-1 is a root stage                          |
|   Stage-0 depends on stages: Stage-1               |
|                                                    |
| STAGE PLANS:                                       |
|   Stage: Stage-1                                   |
|     Tez                                            |
|       DagId: hive_20240504214716_0b61644b-d042-4333-8566-05a678208479:12 |
|       Edges:                                       |
|         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)    |
|       DagName: hive_20240504214716_0b61644b-d042-4333-8566-05a678208479:12 |
|       Vertices:                                    |
|         Map 1                                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   alias: test_table_vectorized     |
|                   Statistics: Num rows: 2 Data size: 182 Basic stats: COMPLETE Column stats: COMPLETE |
|                   GatherStats: false               |
|                   Select Operator                  |
|                     Statistics: Num rows: 2 Data size: 182 Basic stats: COMPLETE Column stats: COMPLETE |
|                     Group By Operator              |
|                       aggregations: count()        |
|                       mode: hash                   |
|                       outputColumnNames: _col0     |
|                       Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE |
|                       Reduce Output Operator       |
|                         null sort order:           |
|                         sort order:                |
|                         Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE |
|                         tag: -1                    |
|                         value expressions: _col0 (type: bigint) |
|                         auto parallelism: false    |
|             Execution mode: vectorized             |
|             ...
|         Reducer 2                                  |
|             Execution mode: vectorized             |
|             ...
|                                                    |
|   Stage: Stage-0                                   |
|     Fetch Operator                                 |
|       limit: -1                                    |
|       Processor Tree:                              |
|         ListSink                                   |
|                                                    |
+----------------------------------------------------+

Как можно увидеть в выводе, определенные этапы выполнения запроса содержат строку Execution mode: vectorized, что свидетельствует о том, что данный этап был выполнен в режиме векторизации.

Если установить значение hive.vectorized.execution.enabled=false (значение по умолчанию), Hive вернется к построчной обработке без использования векторов.

Оптимизация затрат (Cost-based optimization)

Механизм оптимизации затрат (Сost-Based Optimization, CBO) — это встроенная функция Hive, которая выполняет такие оптимизации, как переписывание запросов (query rewriting), переупорядочивание JOIN-операций для более эффективной работы, исключение JOIN-операций из AST и так далее.

В основе данного функционала лежит фреймворк Apache Calcite, а основным источником данных для выполнения оптимизаций является статистика таблиц. По умолчанию механизм оптимизации затрат отключен. Для активации CBO выполните следующие шаги.

  1. Установите следующие конфигурационные свойства в ADCM (Clusters → <clusterName> → Services → Hive → Primary configuration):

    • hive.cbo.enable=true

    • hive.compute.query.using.stats=true

    • hive.stats.fetch.column.stats=true

  2. Выполните сбор статистики для необходимой таблицы. Это обновит актуальную статистику таблицы в базе данных Hive Metastore.

    ANALYZE TABLE transactions COMPUTE STATISTICS;

Использование TEZ

Apache TEZ — это платформа для создания высокопроизводительных приложений, которая используется в качестве движка исполнения для Hive. TEZ считается гораздо более гибким и мощным преемником фреймворка MapReduce.

TEZ является предпочтительным выбором для вычислительных задач Hive, поскольку он уменьшает количество обращений к диску и уменьшает общее время выполнения.

В ADH TEZ доступен в виде отдельного компонента (Hive Tez) сервиса Hive. Если в кластере ADH установлен компонент Hive Tez, он по умолчанию используется в качестве основного движка исполнения (параметр hive.execution.engine).

Формат данных ORC

Использование подходящего формата хранения данных имеет основополагающее значение для производительности Hive. Формат Optimized Row Columnar (ORC) — это формат данных, разработанный специально для экосистемы Hadoop. Будучи столбцовым форматом, ORC оптимизирован для операций на основе столбцов, например, фильтрация и агрегация. По сравнению с другими форматами данных, поддерживаемыми Hive (Parquet, Avro, TextFile и так далее), ORC опережает их по показателям компрессии и скорости, позволяя экономить петабайты данных.

По умолчанию Hive использует формат хранения TextFile. Чтобы создать таблицу, хранящую данные в формате ORC, используйте предложение STORED AS ORC, например:

CREATE TABLE test_tbl_orc (
  id int,
  data String
) STORED AS ORC;
Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней