Работа с таблицами Iceberg в Hive

Apache Iceberg — это открытый высокопроизводительный формат для больших аналитических таблиц. В сервисе ADH Hive данный формат может быть использован для повышения производительности таблиц Hive.

ПРИМЕЧАНИЕ
Hive поддерживает Iceberg-таблицы начиная с версии ADH 3.2.4.3.

Использование Iceberg-таблиц в Hive доступно "из коробки". Поддержка формата Iceberg основана на механизме Hive StorageHandlers. Это значит, что для создания Iceberg-таблицы в Hive необходимо указать обработчик org.apache.iceberg.mr.hive.HiveIcebergStorageHandler в предложении STORED BY. Поддержка таблиц Iceberg включена по умолчанию и может быть отключена с помощью свойства iceberg.engine.hive.enabled (Clusters → <clusterName> → Services → Hive → Primary Configuration → hive-site.xml). Также параметр engine.hive.enabled можно указать программно в качестве свойства таблицы в момент ее создания. Пример показан ниже:

Catalog catalog = ... ;
    Map<String, String> tableProperties=Maps.newHashMap();
    tableProperties.put(TableProperties.ENGINE_HIVE_ENABLED,"true");
    catalog.createTable(tableId,schema,spec,tableProperties);
ПРИМЕЧАНИЕ
Указание свойства для конкретной таблицы имеет приоритет над глобальной конфигурацией ADH.

На момент релиза ADH 3.2.4.3 поддержка Iceberg-таблиц в Hive включает в себя следующие возможности:

  • Создание таблицы.

  • Удаление таблицы.

  • Выборка данных.

  • Вставка данных.

Команды DDL

Создание таблицы Iceberg

Чтобы создать Iceberg-таблицу, используйте предложение STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' в стандартной Hive-команде CREATE TABLE.

Пример:

CREATE TABLE transactions (txn_id int, acc_id int, txn_amount decimal(10,2), txn_date date)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

Выполните команду DESCRIBE FORMATTED, чтобы получить информацию о только что созданной таблице:

DESCRIBE FORMATTED transactions;
Пример вывода
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
|           col_name            |                     data_type                      |                      comment                       |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name                    | data_type                                          | comment                                            |
| id                            | int                                                | from deserializer                                  |
| value                         | string                                             | from deserializer                                  |
|                               | NULL                                               | NULL                                               |
| # Detailed Table Information  | NULL                                               | NULL                                               |
| Database:                     | default                                            | NULL                                               |
| OwnerType:                    | USER                                               | NULL                                               |
| Owner:                        | hive                                               | NULL                                               |
| CreateTime:                   | Wed Jun 05 14:09:36 UTC 2024                       | NULL                                               |
| LastAccessTime:               | UNKNOWN                                            | NULL                                               |
| Retention:                    | 0                                                  | NULL                                               |
| Location:                     | hdfs://adh/apps/hive/warehouse/test_iceberg_table  | NULL                                               |
| Table Type:                   | MANAGED_TABLE                                      | NULL                                               |
| Table Parameters:             | NULL                                               | NULL                                               |
|                               | bucketing_version                                  | 2                                                  |
|                               | current-schema                                     | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"value\",\"required\":false,\"type\":\"string\"}]} |
|                               | engine.hive.enabled                                | true                                               |
|                               | external.table.purge                               | TRUE                                               |
|                               | metadata_location                                  | hdfs://adh/apps/hive/warehouse/test_iceberg_table/metadata/00000-fa65f392-3b34-4e51-915b-8e2aae261098.metadata.json |
|                               | numFiles                                           | 0                                                  |
|                               | numRows                                            | 0                                                  |
|                               | rawDataSize                                        | 0                                                  |
|                               | snapshot-count                                     | 0                                                  |
|                               | storage_handler                                    | org.apache.iceberg.mr.hive.HiveIcebergStorageHandler |
|                               | table_type                                         | ICEBERG                                            |
|                               | totalSize                                          | 0                                                  |
|                               | transient_lastDdlTime                              | 1717596576                                         |
|                               | uuid                                               | f9da6135-095e-491f-919e-f2194253b8f8               |
|                               | write.parquet.compression-codec                    | zstd                                               |
|                               | NULL                                               | NULL                                               |
| # Storage Information         | NULL                                               | NULL                                               |
| SerDe Library:                | org.apache.iceberg.mr.hive.HiveIcebergSerDe        | NULL                                               |
| InputFormat:                  | org.apache.iceberg.mr.hive.HiveIcebergInputFormat  | NULL                                               |
| OutputFormat:                 | org.apache.iceberg.mr.hive.HiveIcebergOutputFormat | NULL                                               |
| Compressed:                   | No                                                 | NULL                                               |
| Num Buckets:                  | 0                                                  | NULL                                               |
| Bucket Columns:               | []                                                 | NULL                                               |
| Sort Columns:                 | []                                                 | NULL                                               |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+

Поле table_type в выводе указывает на то, что таблица была создана именно как Iceberg-таблица.

CREATE TABLE AS SELECT

Вы также можете использовать синтаксис Hive CREATE TABLE AS <tbl_name> (CTAS) для создания таблицы Iceberg на основе уже имеющейся таблицы Hive. В этом случае основное отличие заключается в том, что таблица Iceberg создается в самом начале выполнения запроса, в то время как данные вставляются в таблицу уже после завершения запроса. Таким образом, в течение некоторого времени таблица может не содержать никаких данных.

CREATE TABLE transactions_ctas
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' AS
SELECT * FROM transactions;
Пример вывода команды DESCRIBE FORMATTED
DESCRIBE FORMATTED transactions_ctas;
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
|           col_name            |                     data_type                      |                      comment                       |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name                    | data_type                                          | comment                                            |
| id                            | int                                                | from deserializer                                  |
|                               | NULL                                               | NULL                                               |
| # Detailed Table Information  | NULL                                               | NULL                                               |
| Database:                     | default                                            | NULL                                               |
| OwnerType:                    | USER                                               | NULL                                               |
| Owner:                        | hive                                               | NULL                                               |
| CreateTime:                   | Mon Jun 03 20:59:25 UTC 2024                       | NULL                                               |
| LastAccessTime:               | UNKNOWN                                            | NULL                                               |
| Retention:                    | 0                                                  | NULL                                               |
| Location:                     | hdfs://adh/apps/hive/warehouse/test_iceberg_table1 | NULL                                               |
| Table Type:                   | MANAGED_TABLE                                      | NULL                                               |
| Table Parameters:             | NULL                                               | NULL                                               |
|                               | current-schema                                     | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"}]} |
|                               | engine.hive.enabled                                | true                                               |
|                               | external.table.purge                               | TRUE                                               |
|                               | metadata_location                                  | hdfs://adh/apps/hive/warehouse/test_iceberg_table1/metadata/00000-df221cec-ab76-4de9-95cd-ef7605c8e79f.metadata.json |
|                               | numFiles                                           | 0                                                  |
|                               | numRows                                            | 0                                                  |
|                               | rawDataSize                                        | 0                                                  |
|                               | snapshot-count                                     | 0                                                  |
|                               | storage_handler                                    | org.apache.iceberg.mr.hive.HiveIcebergStorageHandler |
|                               | table_type                                         | ICEBERG                                            |
|                               | totalSize                                          | 0                                                  |
|                               | transient_lastDdlTime                              | 1717448365                                         |
|                               | uuid                                               | eb1933a0-43d9-4dd2-9426-cf7ace5063c8               |
|                               | write.parquet.compression-codec                    | zstd                                               |
|                               | NULL                                               | NULL                                               |
| # Storage Information         | NULL                                               | NULL                                               |
| SerDe Library:                | org.apache.iceberg.mr.hive.HiveIcebergSerDe        | NULL                                               |
| InputFormat:                  | org.apache.iceberg.mr.hive.HiveIcebergInputFormat  | NULL                                               |
| OutputFormat:                 | org.apache.iceberg.mr.hive.HiveIcebergOutputFormat | NULL                                               |
| Compressed:                   | No                                                 | NULL                                               |
| Num Buckets:                  | 0                                                  | NULL                                               |
| Bucket Columns:               | []                                                 | NULL                                               |
| Sort Columns:                 | []                                                 | NULL                                               |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+

CREATE TABLE LIKE

Вы также можете использовать синтаксис CREATE TABLE LIKE <tbl_name> для создания пустой таблицы Iceberg, используя структуру существующей таблицы Hive.

Пример:

CREATE TABLE transactions_ctlt LIKE transactions
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
Пример вывода команды DESCRIBE FORMATTED
DESCRIBE FORMATTED transactions_ctlt;
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
|           col_name            |                     data_type                      |                      comment                       |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name                    | data_type                                          | comment                                            |
| txn_id                        | int                                                | from deserializer                                  |
| txn_amount                    | decimal(10,2)                                      | from deserializer                                  |
| txn_date                      | date                                               | from deserializer                                  |
| acc_id                        | int                                                | from deserializer                                  |
|                               | NULL                                               | NULL                                               |
| # Detailed Table Information  | NULL                                               | NULL                                               |
| Database:                     | default                                            | NULL                                               |
| OwnerType:                    | USER                                               | NULL                                               |
| Owner:                        | hive                                               | NULL                                               |
| CreateTime:                   | Thu Jun 06 13:57:31 UTC 2024                       | NULL                                               |
| LastAccessTime:               | UNKNOWN                                            | NULL                                               |
| Retention:                    | 0                                                  | NULL                                               |
| Location:                     | hdfs://adh/apps/hive/warehouse/transactions        | NULL                                               |
| Table Type:                   | MANAGED_TABLE                                      | NULL                                               |
| Table Parameters:             | NULL                                               | NULL                                               |
|                               | bucketing_version                                  | 2                                                  |
|                               | current-schema                                     | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"txn_id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"txn_amount\",\"required\":false,\"type\":\"decimal(10, 2)\"},{\"id\":3,\"name\":\"txn_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"acc_id\",\"required\":false,\"type\":\"int\"}]} |
|                               | default-partition-spec                             | {\"spec-id\":0,\"fields\":[{\"name\":\"acc_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1000}]} |
|                               | engine.hive.enabled                                | true                                               |
|                               | external.table.purge                               | TRUE                                               |
|                               | metadata_location                                  | hdfs://adh/apps/hive/warehouse/transactions/metadata/00000-b6dde9b3-d1cc-4f92-828e-5dab83b1e5e8.metadata.json |
|                               | snapshot-count                                     | 0                                                  |
|                               | storage_handler                                    | org.apache.iceberg.mr.hive.HiveIcebergStorageHandler |
|                               | table_type                                         | ICEBERG                                            |
|                               | transient_lastDdlTime                              | 1717682251                                         |
|                               | uuid                                               | c6205d6d-ddca-4b04-a780-8b8e1048db0b               |
|                               | write.parquet.compression-codec                    | zstd                                               |
|                               | NULL                                               | NULL                                               |
| # Storage Information         | NULL                                               | NULL                                               |
| SerDe Library:                | org.apache.iceberg.mr.hive.HiveIcebergSerDe        | NULL                                               |
| InputFormat:                  | org.apache.iceberg.mr.hive.HiveIcebergInputFormat  | NULL                                               |
| OutputFormat:                 | org.apache.iceberg.mr.hive.HiveIcebergOutputFormat | NULL                                               |
| Compressed:                   | No                                                 | NULL                                               |
| Num Buckets:                  | 0                                                  | NULL                                               |
| Bucket Columns:               | []                                                 | NULL                                               |
| Sort Columns:                 | []                                                 | NULL                                               |
+-------------------------------+----------------------------------------------------+----------------------------------------------------

Создание партиционированной таблицы

Чтобы создать партиционированную таблицу, используйте синтаксис, как показано в следующем примере:

CREATE TABLE transactions_partitioned (
    txn_id int, txn_amount decimal(10,2), txn_date date)
PARTITIONED BY (acc_id int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

В выводе команды DESCRIBE FORMATTED найдите свойство default-partition-spec. В этом поле хранится спецификация партиций в формате Iceberg.

Пример вывода команды DESCRIBE FORMATTED
DESCRIBE FORMATTED transactions_partitioned;
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
|           col_name            |                     data_type                      |                      comment                       |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name                    | data_type                                          | comment                                            |
| txn_id                        | int                                                | from deserializer                                  |
| txn_amount                    | decimal(10,2)                                      | from deserializer                                  |
| txn_date                      | date                                               | from deserializer                                  |
| acc_id                        | int                                                | from deserializer                                  |
|                               | NULL                                               | NULL                                               |
| # Detailed Table Information  | NULL                                               | NULL                                               |
| Database:                     | default                                            | NULL                                               |
| OwnerType:                    | USER                                               | NULL                                               |
| Owner:                        | hive                                               | NULL                                               |
| CreateTime:                   | Thu Jun 06 13:57:31 UTC 2024                       | NULL                                               |
| LastAccessTime:               | UNKNOWN                                            | NULL                                               |
| Retention:                    | 0                                                  | NULL                                               |
| Location:                     | hdfs://adh/apps/hive/warehouse/transactions        | NULL                                               |
| Table Type:                   | MANAGED_TABLE                                      | NULL                                               |
| Table Parameters:             | NULL                                               | NULL                                               |
|                               | bucketing_version                                  | 2                                                  |
|                               | current-schema                                     | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"txn_id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"txn_amount\",\"required\":false,\"type\":\"decimal(10, 2)\"},{\"id\":3,\"name\":\"txn_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"acc_id\",\"required\":false,\"type\":\"int\"}]} |
|                               | default-partition-spec                             | {\"spec-id\":0,\"fields\":[{\"name\":\"acc_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1000}]} |
|                               | engine.hive.enabled                                | true                                               |
|                               | external.table.purge                               | TRUE                                               |
|                               | metadata_location                                  | hdfs://adh/apps/hive/warehouse/transactions/metadata/00000-b6dde9b3-d1cc-4f92-828e-5dab83b1e5e8.metadata.json |
|                               | snapshot-count                                     | 0                                                  |
|                               | storage_handler                                    | org.apache.iceberg.mr.hive.HiveIcebergStorageHandler |
|                               | table_type                                         | ICEBERG                                            |
|                               | transient_lastDdlTime                              | 1717682251                                         |
|                               | uuid                                               | c6205d6d-ddca-4b04-a780-8b8e1048db0b               |
|                               | write.parquet.compression-codec                    | zstd                                               |
|                               | NULL                                               | NULL                                               |
| # Storage Information         | NULL                                               | NULL                                               |
| SerDe Library:                | org.apache.iceberg.mr.hive.HiveIcebergSerDe        | NULL                                               |
| InputFormat:                  | org.apache.iceberg.mr.hive.HiveIcebergInputFormat  | NULL                                               |
| OutputFormat:                 | org.apache.iceberg.mr.hive.HiveIcebergOutputFormat | NULL                                               |
| Compressed:                   | No                                                 | NULL                                               |
| Num Buckets:                  | 0                                                  | NULL                                               |
| Bucket Columns:               | []                                                 | NULL                                               |
| Sort Columns:                 | []                                                 | NULL                                               |
+-------------------------------+----------------------------------------------------+----------------------------------------------------

Команды DML

На момент релиза ADH 3.2.4.3 для Iceberg-таблиц в Hive доступны DML-операции SELECT и INSERT.

ПРИМЕЧАНИЕ
Для корректной работы DML-команд для таблиц Iceberg в Hive должен использоваться движок MapReduce.

Чтобы сменить движок Hive, используйте свойство hive.execution.engine в ADCM (Clusters → <clusterName> → Services → Hive → Primary Configuration → hive-site.xml). После изменения свойства необходим рестарт Hive.

SELECT

Синтаксис команды SELECT для работы с Iceberg-таблицами идентичен обычной команде SELECT в Hive. Однако под капотом Hive выполняет специфичный набор действий для получения всех преимуществ формата Iceberg. Отличия присутствуют как на этапе компиляции запроса, так и на этапе его выполнения.

Ниже приведено краткое описание алгоритма выборки данных из Iceberg-таблиц:

  1. Hive обращается к каталогу Iceberg для получения файла метаданных (metadata file) таблицы. Данный файл описывает структуру таблицы Iceberg. Каталог Iceberg возвращает указатель на файл метаданных.

  2. Hive анализирует файл метаданных и извлекает запись manifest list. В этой записи содержится информация о файлах манифеста (manifest files), которые вместе составляют единый снепшот, определяют партиции, их границы и так далее. Если таблица разбита на партиции, Hive также считывает спецификацию партиционирования (partition spec). Это позволяет выполнить определенные шаги по оптимизации уже на данном этапе.

  3. Далее Hive анализирует каждый отдельный файл манифеста из списка. В каждом таком файле содержится информация о файлах данных, где физически хранятся данные таблицы.

  4. Наконец, Hive сканирует сами файлы данных. Если таблица разбита на партиции, Hive обрабатывает только те файлы данных, которые соответствуют предложению WHERE или другим фильтрам. Основное преимущество в производительности формата Iceberg заключается именно в этой фазе — к этому моменту движок располагает достаточной информацией для сканирования только тех файлов с данными, которые релевантны указанному запросу SELECT. Такое поведение возможно благодаря тому, что данные отслеживаются на уровне файлов (а не на уровне директорий, как это реализовано в традиционной модели Hive).

INSERT

Iceberg-таблицы в Hive поддерживают классическую команду INSERT INTO.

Пример:

INSERT INTO transactions VALUES
(1, 1002, 10.00, '2023-01-01'),
(2, 1002, 20.00, '2023-01-03'),
(3, 1002, 30.00, '2023-01-02'),
(4, 1001, 100.50, '2023-01-02'),
(5, 1001, 150.50, '2023-01-04'),
(6, 1001, 200.50, '2023-01-03'),
(7, 1003, 50.00, '2023-01-03'),
(8, 1003, 50.00, '2023-01-01'),
(9, 1003, 75.00, '2023-01-04');

Также поддерживается многотабличная вставка данных, однако такие вставки не являются атомарными. Коммиты транзакций выполняются для каждой таблицы. В процессе вставки клиентские приложения могут "видеть" частичные обновления содержимого таблицы, а ошибки могут приводить к частичной записи данных. Изменения в рамках одной таблицы являются атомарной операцией.

В следующем примере показано наполнение двух Iceberg-таблиц данными из имеющейся таблицы Hive с использованием одной операции многотабличной вставки.

FROM transactions
   INSERT INTO transactions11 SELECT txn_id, txn_date
   INSERT INTO transactions22 SELECT txn_id, acc_id;

Кастомные каталоги Hive

В контексте Hive существует только один глобальный каталог данных — HiveCatalog, согласно которому данные хранятся в /apps/hive/warehouse/ директории в HDFS. Однако при использовании Iceberg-таблиц можно добавить несколько различных каталогов данных, например, Hive, Hadoop или их кастомные реализации.

Таблицы Iceberg можно создавать без привязки к определенному каталогу, указывая лишь директорию HDFS для хранения данных. Такие кросс-каталоговые таблицы могут использоваться в качестве временных или промежуточных таблиц, например, для операций JOIN.

В следующем примере создается дополнительный каталог Hive и указывается кастомная локация для хранения файлов, принадлежащих этому каталогу.

SET iceberg.catalog.my_test_hive.type=hive;
SET iceberg.catalog.my_test_hive.warehouse=hdfs:/adh/apps/hive/custom_catalog/warehouse;

Ограничения

Существуют некоторые ограничения при работе с таблицами Iceberg в Hive. Данные ограничения связаны с версией Hive 3.x, используемой в ADH. В ближайших релизах ADH с переходом на версию Hive 4.x поддержка Iceberg таблиц будет улучшена, а перечисленные ниже ограничения сняты.

  • На момент релиза ADH 3.2.4.3 доступны следующие операции DML/DDL:

    • CREATE TABLE

    • DROP TABLE

    • SELECT

    • INSERT

  • На момент релиза ADH 3.2.4.3 для создания Iceberg-таблиц в Hive используется предложение STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler', а не STORED BY ICEBERG.

  • Начиная с версии Apache Iceberg 0.11.0 при использовании движка Tez для Hive необходимо также отключить векторизацию (hive.vectorized.execution.enabled=false).

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней