Работа с таблицами Iceberg в Hive
Apache Iceberg — это открытый высокопроизводительный формат для больших аналитических таблиц. В сервисе ADH Hive данный формат может быть использован для повышения производительности таблиц Hive.
ПРИМЕЧАНИЕ
Hive поддерживает Iceberg-таблицы начиная с версии ADH 3.2.4.3.
|
Использование Iceberg-таблиц в Hive доступно "из коробки".
Поддержка формата Iceberg основана на механизме Hive StorageHandlers.
Это значит, что для создания Iceberg-таблицы в Hive необходимо указать обработчик org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
в предложении STORED BY
.
Поддержка таблиц Iceberg включена по умолчанию и может быть отключена с помощью свойства iceberg.engine.hive.enabled
(Clusters → <clusterName> → Services → Hive → Primary Configuration → hive-site.xml).
Также параметр engine.hive.enabled
можно указать программно в качестве свойства таблицы в момент ее создания.
Пример показан ниже:
Catalog catalog = ... ;
Map<String, String> tableProperties=Maps.newHashMap();
tableProperties.put(TableProperties.ENGINE_HIVE_ENABLED,"true");
catalog.createTable(tableId,schema,spec,tableProperties);
ПРИМЕЧАНИЕ
Указание свойства для конкретной таблицы имеет приоритет над глобальной конфигурацией ADH.
|
На момент релиза ADH 3.2.4.3 поддержка Iceberg-таблиц в Hive включает в себя следующие возможности:
-
Создание таблицы.
-
Удаление таблицы.
-
Выборка данных.
-
Вставка данных.
Команды DDL
Создание таблицы Iceberg
Чтобы создать Iceberg-таблицу, используйте предложение STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
в стандартной Hive-команде CREATE TABLE
.
Пример:
CREATE TABLE transactions (txn_id int, acc_id int, txn_amount decimal(10,2), txn_date date)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
Выполните команду DESCRIBE FORMATTED
, чтобы получить информацию о только что созданной таблице:
DESCRIBE FORMATTED transactions;
+-------------------------------+----------------------------------------------------+----------------------------------------------------+ | col_name | data_type | comment | +-------------------------------+----------------------------------------------------+----------------------------------------------------+ | # col_name | data_type | comment | | id | int | from deserializer | | value | string | from deserializer | | | NULL | NULL | | # Detailed Table Information | NULL | NULL | | Database: | default | NULL | | OwnerType: | USER | NULL | | Owner: | hive | NULL | | CreateTime: | Wed Jun 05 14:09:36 UTC 2024 | NULL | | LastAccessTime: | UNKNOWN | NULL | | Retention: | 0 | NULL | | Location: | hdfs://adh/apps/hive/warehouse/test_iceberg_table | NULL | | Table Type: | MANAGED_TABLE | NULL | | Table Parameters: | NULL | NULL | | | bucketing_version | 2 | | | current-schema | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"value\",\"required\":false,\"type\":\"string\"}]} | | | engine.hive.enabled | true | | | external.table.purge | TRUE | | | metadata_location | hdfs://adh/apps/hive/warehouse/test_iceberg_table/metadata/00000-fa65f392-3b34-4e51-915b-8e2aae261098.metadata.json | | | numFiles | 0 | | | numRows | 0 | | | rawDataSize | 0 | | | snapshot-count | 0 | | | storage_handler | org.apache.iceberg.mr.hive.HiveIcebergStorageHandler | | | table_type | ICEBERG | | | totalSize | 0 | | | transient_lastDdlTime | 1717596576 | | | uuid | f9da6135-095e-491f-919e-f2194253b8f8 | | | write.parquet.compression-codec | zstd | | | NULL | NULL | | # Storage Information | NULL | NULL | | SerDe Library: | org.apache.iceberg.mr.hive.HiveIcebergSerDe | NULL | | InputFormat: | org.apache.iceberg.mr.hive.HiveIcebergInputFormat | NULL | | OutputFormat: | org.apache.iceberg.mr.hive.HiveIcebergOutputFormat | NULL | | Compressed: | No | NULL | | Num Buckets: | 0 | NULL | | Bucket Columns: | [] | NULL | | Sort Columns: | [] | NULL | +-------------------------------+----------------------------------------------------+----------------------------------------------------+
Поле table_type
в выводе указывает на то, что таблица была создана именно как Iceberg-таблица.
CREATE TABLE AS SELECT
Вы также можете использовать синтаксис Hive CREATE TABLE AS <tbl_name>
(CTAS) для создания таблицы Iceberg на основе уже имеющейся таблицы Hive.
В этом случае основное отличие заключается в том, что таблица Iceberg создается в самом начале выполнения запроса, в то время как данные вставляются в таблицу уже после завершения запроса.
Таким образом, в течение некоторого времени таблица может не содержать никаких данных.
CREATE TABLE transactions_ctas
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' AS
SELECT * FROM transactions;
DESCRIBE FORMATTED transactions_ctas; +-------------------------------+----------------------------------------------------+----------------------------------------------------+ | col_name | data_type | comment | +-------------------------------+----------------------------------------------------+----------------------------------------------------+ | # col_name | data_type | comment | | id | int | from deserializer | | | NULL | NULL | | # Detailed Table Information | NULL | NULL | | Database: | default | NULL | | OwnerType: | USER | NULL | | Owner: | hive | NULL | | CreateTime: | Mon Jun 03 20:59:25 UTC 2024 | NULL | | LastAccessTime: | UNKNOWN | NULL | | Retention: | 0 | NULL | | Location: | hdfs://adh/apps/hive/warehouse/test_iceberg_table1 | NULL | | Table Type: | MANAGED_TABLE | NULL | | Table Parameters: | NULL | NULL | | | current-schema | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"}]} | | | engine.hive.enabled | true | | | external.table.purge | TRUE | | | metadata_location | hdfs://adh/apps/hive/warehouse/test_iceberg_table1/metadata/00000-df221cec-ab76-4de9-95cd-ef7605c8e79f.metadata.json | | | numFiles | 0 | | | numRows | 0 | | | rawDataSize | 0 | | | snapshot-count | 0 | | | storage_handler | org.apache.iceberg.mr.hive.HiveIcebergStorageHandler | | | table_type | ICEBERG | | | totalSize | 0 | | | transient_lastDdlTime | 1717448365 | | | uuid | eb1933a0-43d9-4dd2-9426-cf7ace5063c8 | | | write.parquet.compression-codec | zstd | | | NULL | NULL | | # Storage Information | NULL | NULL | | SerDe Library: | org.apache.iceberg.mr.hive.HiveIcebergSerDe | NULL | | InputFormat: | org.apache.iceberg.mr.hive.HiveIcebergInputFormat | NULL | | OutputFormat: | org.apache.iceberg.mr.hive.HiveIcebergOutputFormat | NULL | | Compressed: | No | NULL | | Num Buckets: | 0 | NULL | | Bucket Columns: | [] | NULL | | Sort Columns: | [] | NULL | +-------------------------------+----------------------------------------------------+----------------------------------------------------+
CREATE TABLE LIKE
Вы также можете использовать синтаксис CREATE TABLE LIKE <tbl_name>
для создания пустой таблицы Iceberg, используя структуру существующей таблицы Hive.
Пример:
CREATE TABLE transactions_ctlt LIKE transactions
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
DESCRIBE FORMATTED transactions_ctlt; +-------------------------------+----------------------------------------------------+----------------------------------------------------+ | col_name | data_type | comment | +-------------------------------+----------------------------------------------------+----------------------------------------------------+ | # col_name | data_type | comment | | txn_id | int | from deserializer | | txn_amount | decimal(10,2) | from deserializer | | txn_date | date | from deserializer | | acc_id | int | from deserializer | | | NULL | NULL | | # Detailed Table Information | NULL | NULL | | Database: | default | NULL | | OwnerType: | USER | NULL | | Owner: | hive | NULL | | CreateTime: | Thu Jun 06 13:57:31 UTC 2024 | NULL | | LastAccessTime: | UNKNOWN | NULL | | Retention: | 0 | NULL | | Location: | hdfs://adh/apps/hive/warehouse/transactions | NULL | | Table Type: | MANAGED_TABLE | NULL | | Table Parameters: | NULL | NULL | | | bucketing_version | 2 | | | current-schema | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"txn_id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"txn_amount\",\"required\":false,\"type\":\"decimal(10, 2)\"},{\"id\":3,\"name\":\"txn_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"acc_id\",\"required\":false,\"type\":\"int\"}]} | | | default-partition-spec | {\"spec-id\":0,\"fields\":[{\"name\":\"acc_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1000}]} | | | engine.hive.enabled | true | | | external.table.purge | TRUE | | | metadata_location | hdfs://adh/apps/hive/warehouse/transactions/metadata/00000-b6dde9b3-d1cc-4f92-828e-5dab83b1e5e8.metadata.json | | | snapshot-count | 0 | | | storage_handler | org.apache.iceberg.mr.hive.HiveIcebergStorageHandler | | | table_type | ICEBERG | | | transient_lastDdlTime | 1717682251 | | | uuid | c6205d6d-ddca-4b04-a780-8b8e1048db0b | | | write.parquet.compression-codec | zstd | | | NULL | NULL | | # Storage Information | NULL | NULL | | SerDe Library: | org.apache.iceberg.mr.hive.HiveIcebergSerDe | NULL | | InputFormat: | org.apache.iceberg.mr.hive.HiveIcebergInputFormat | NULL | | OutputFormat: | org.apache.iceberg.mr.hive.HiveIcebergOutputFormat | NULL | | Compressed: | No | NULL | | Num Buckets: | 0 | NULL | | Bucket Columns: | [] | NULL | | Sort Columns: | [] | NULL | +-------------------------------+----------------------------------------------------+----------------------------------------------------
Создание партиционированной таблицы
Чтобы создать партиционированную таблицу, используйте синтаксис, как показано в следующем примере:
CREATE TABLE transactions_partitioned (
txn_id int, txn_amount decimal(10,2), txn_date date)
PARTITIONED BY (acc_id int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
В выводе команды DESCRIBE FORMATTED
найдите свойство default-partition-spec
.
В этом поле хранится спецификация партиций в формате Iceberg.
DESCRIBE FORMATTED transactions_partitioned; +-------------------------------+----------------------------------------------------+----------------------------------------------------+ | col_name | data_type | comment | +-------------------------------+----------------------------------------------------+----------------------------------------------------+ | # col_name | data_type | comment | | txn_id | int | from deserializer | | txn_amount | decimal(10,2) | from deserializer | | txn_date | date | from deserializer | | acc_id | int | from deserializer | | | NULL | NULL | | # Detailed Table Information | NULL | NULL | | Database: | default | NULL | | OwnerType: | USER | NULL | | Owner: | hive | NULL | | CreateTime: | Thu Jun 06 13:57:31 UTC 2024 | NULL | | LastAccessTime: | UNKNOWN | NULL | | Retention: | 0 | NULL | | Location: | hdfs://adh/apps/hive/warehouse/transactions | NULL | | Table Type: | MANAGED_TABLE | NULL | | Table Parameters: | NULL | NULL | | | bucketing_version | 2 | | | current-schema | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"txn_id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"txn_amount\",\"required\":false,\"type\":\"decimal(10, 2)\"},{\"id\":3,\"name\":\"txn_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"acc_id\",\"required\":false,\"type\":\"int\"}]} | | | default-partition-spec | {\"spec-id\":0,\"fields\":[{\"name\":\"acc_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1000}]} | | | engine.hive.enabled | true | | | external.table.purge | TRUE | | | metadata_location | hdfs://adh/apps/hive/warehouse/transactions/metadata/00000-b6dde9b3-d1cc-4f92-828e-5dab83b1e5e8.metadata.json | | | snapshot-count | 0 | | | storage_handler | org.apache.iceberg.mr.hive.HiveIcebergStorageHandler | | | table_type | ICEBERG | | | transient_lastDdlTime | 1717682251 | | | uuid | c6205d6d-ddca-4b04-a780-8b8e1048db0b | | | write.parquet.compression-codec | zstd | | | NULL | NULL | | # Storage Information | NULL | NULL | | SerDe Library: | org.apache.iceberg.mr.hive.HiveIcebergSerDe | NULL | | InputFormat: | org.apache.iceberg.mr.hive.HiveIcebergInputFormat | NULL | | OutputFormat: | org.apache.iceberg.mr.hive.HiveIcebergOutputFormat | NULL | | Compressed: | No | NULL | | Num Buckets: | 0 | NULL | | Bucket Columns: | [] | NULL | | Sort Columns: | [] | NULL | +-------------------------------+----------------------------------------------------+----------------------------------------------------
Команды DML
На момент релиза ADH 3.2.4.3 для Iceberg-таблиц в Hive доступны DML-операции SELECT
и INSERT
.
ПРИМЕЧАНИЕ
Для корректной работы DML-команд для таблиц Iceberg в Hive должен использоваться движок MapReduce.
|
Чтобы сменить движок Hive, используйте свойство hive.execution.engine
в ADCM (Clusters → <clusterName> → Services → Hive → Primary Configuration → hive-site.xml).
После изменения свойства необходим рестарт Hive.
SELECT
Синтаксис команды SELECT
для работы с Iceberg-таблицами идентичен обычной команде SELECT
в Hive.
Однако под капотом Hive выполняет специфичный набор действий для получения всех преимуществ формата Iceberg.
Отличия присутствуют как на этапе компиляции запроса, так и на этапе его выполнения.
Ниже приведено краткое описание алгоритма выборки данных из Iceberg-таблиц:
-
Hive обращается к каталогу Iceberg для получения файла метаданных (metadata file) таблицы. Данный файл описывает структуру таблицы Iceberg. Каталог Iceberg возвращает указатель на файл метаданных.
-
Hive анализирует файл метаданных и извлекает запись manifest list. В этой записи содержится информация о файлах манифеста (manifest files), которые вместе составляют единый снепшот, определяют партиции, их границы и так далее. Если таблица разбита на партиции, Hive также считывает спецификацию партиционирования (partition spec). Это позволяет выполнить определенные шаги по оптимизации уже на данном этапе.
-
Далее Hive анализирует каждый отдельный файл манифеста из списка. В каждом таком файле содержится информация о файлах данных, где физически хранятся данные таблицы.
-
Наконец, Hive сканирует сами файлы данных. Если таблица разбита на партиции, Hive обрабатывает только те файлы данных, которые соответствуют предложению
WHERE
или другим фильтрам. Основное преимущество в производительности формата Iceberg заключается именно в этой фазе — к этому моменту движок располагает достаточной информацией для сканирования только тех файлов с данными, которые релевантны указанному запросуSELECT
. Такое поведение возможно благодаря тому, что данные отслеживаются на уровне файлов (а не на уровне директорий, как это реализовано в традиционной модели Hive).
INSERT
Iceberg-таблицы в Hive поддерживают классическую команду INSERT INTO
.
Пример:
INSERT INTO transactions VALUES
(1, 1002, 10.00, '2023-01-01'),
(2, 1002, 20.00, '2023-01-03'),
(3, 1002, 30.00, '2023-01-02'),
(4, 1001, 100.50, '2023-01-02'),
(5, 1001, 150.50, '2023-01-04'),
(6, 1001, 200.50, '2023-01-03'),
(7, 1003, 50.00, '2023-01-03'),
(8, 1003, 50.00, '2023-01-01'),
(9, 1003, 75.00, '2023-01-04');
Также поддерживается многотабличная вставка данных, однако такие вставки не являются атомарными. Коммиты транзакций выполняются для каждой таблицы. В процессе вставки клиентские приложения могут "видеть" частичные обновления содержимого таблицы, а ошибки могут приводить к частичной записи данных. Изменения в рамках одной таблицы являются атомарной операцией.
В следующем примере показано наполнение двух Iceberg-таблиц данными из имеющейся таблицы Hive с использованием одной операции многотабличной вставки.
FROM transactions
INSERT INTO transactions11 SELECT txn_id, txn_date
INSERT INTO transactions22 SELECT txn_id, acc_id;
Кастомные каталоги Hive
В контексте Hive существует только один глобальный каталог данных — HiveCatalog
, согласно которому данные хранятся в /apps/hive/warehouse/ директории в HDFS.
Однако при использовании Iceberg-таблиц можно добавить несколько различных каталогов данных, например, Hive
, Hadoop
или их кастомные реализации.
Таблицы Iceberg можно создавать без привязки к определенному каталогу, указывая лишь директорию HDFS для хранения данных. Такие кросс-каталоговые таблицы могут использоваться в качестве временных или промежуточных таблиц, например, для операций JOIN.
В следующем примере создается дополнительный каталог Hive и указывается кастомная локация для хранения файлов, принадлежащих этому каталогу.
SET iceberg.catalog.my_test_hive.type=hive;
SET iceberg.catalog.my_test_hive.warehouse=hdfs:/adh/apps/hive/custom_catalog/warehouse;
Ограничения
Существуют некоторые ограничения при работе с таблицами Iceberg в Hive. Данные ограничения связаны с версией Hive 3.x, используемой в ADH. В ближайших релизах ADH с переходом на версию Hive 4.x поддержка Iceberg таблиц будет улучшена, а перечисленные ниже ограничения сняты.
-
На момент релиза ADH 3.2.4.3 доступны следующие операции DML/DDL:
-
CREATE TABLE
-
DROP TABLE
-
SELECT
-
INSERT
-
-
На момент релиза ADH 3.2.4.3 для создания Iceberg-таблиц в Hive используется предложение
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
, а неSTORED BY ICEBERG
. -
Начиная с версии Apache Iceberg 0.11.0 при использовании движка Tez для Hive необходимо также отключить векторизацию (
hive.vectorized.execution.enabled=false
).