Работа с таблицами Iceberg в Spark
Apache Iceberg — это открытый, высокопроизводительный формат для больших аналитических таблиц. Сервис ADH Spark3 поддерживает данный формат, позволяя вам работать с таблицами Iceberg через Spark.
ПРИМЕЧАНИЕ
Spark3 поддерживает Iceberg-таблицы начиная с версии ADH 3.2.4.3.
|
В сервисе Spark3 поддержка Iceberg-таблиц включена по умолчанию. В этой статье для демонстрации примеров работы с Iceberg-таблицами используется Spark SQL — модуль, позволяющий выполнять традиционные ANSI SQL-команды на объектах Spark DataFrame.
Вы можете выполнять запросы Spark SQL, используя оболочку spark3-sql. Для запуска spark3-sql выполните следующую команду на хосте, где установлен компонент Spark3 Client.
$ spark3-sql
В данной статье все SQL-примеры предназначены для запуска в оболочке spark3-sql.
Каталоги Spark
Архитектура Iceberg добавляет в Spark API понятие каталогов (catalogs), которые позволяют Spark обращаться к Iceberg-таблицам по имени.
Каталог Iceberg — это именованное хранилище для таблиц и их метаданных, разделенное на пространства имен.
Имена каталогов и пространства имен используются в SQL-запросах для идентификации таблиц, принадлежащих к разным каталогам, аналогично нотации SELECT … FROM <database_name>.<table_name>
.
Для доступа к конкретной Iceberg-таблице используется следующий синтаксис:
SELECT ... FROM <catalog_name>.<namespace>.<table_name> ...
Iceberg предоставляет несколько бэкендов каталогов, таких как REST, Hive, JDBC и прочих. Ниже показан пример создания Iceberg-каталога типа Hive, который загружает таблицы из Hive Metastore:
$ spark3-sql
--conf spark.sql.catalog.test_catalog_hive=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.test_catalog_hive.type=hive
--conf spark.sql.catalog.test_catalog_hive.warehouse=hdfs://ka-adh-1.ru-central1.internal:8020/user/admin/test_warehouse
Для обращения к таблицам внутри такого каталога необходимо добавить к именам таблиц префикс test_catalog_hive
.
ПРИМЕЧАНИЕ
Параметры конфигурации каталогов, указанные с помощью --conf в примере выше, также можно указать в ADCM (Clusters → <clusterName> → Services → Spark3 → Primary Configuration → Custom spark-defaults.conf).
|
Iceberg предоставляет 2 имплементации каталогов:
-
org.apache.iceberg.spark.SparkCatalog
. Позволяет работать только с Iceberg-таблицами, используя Hive Metastore или warehouse-директорию в Hadoop. -
org.apache.iceberg.spark.SparkSessionCatalog
. Добавляет поддержку Iceberg-таблиц к функционалу встроенного Spark-каталога. При работе с обычными таблицами (не Iceberg),SparkSessionCatalog
делегирует выполнение встроенному каталогу Spark. В сервисе Spark3 данная реализация используется по умолчанию, позволяя работать как с Iceberg-таблицами, так и с обычными (не Iceberg) таблицами.
В сервисе Spark3 каталог по умолчанию называется spark-catalog
.
Данный каталог поддерживает одноуровневые пространства имен, которые соответствуют именам баз данных Hive.
Примеры запросов в данной статье предназначены для выполнения в дефолтном каталоге spark-catalog
с пространством имен default
.
Параметры конфигурации каталогов
Каталоги Iceberg можно настроить с помощью конфигурационных свойств spark.sql.catalog.<catalog-name>.*
.
Общие параметры конфигурации для Hive и Hadoop представлены в таблице ниже.
Свойство | Описание |
---|---|
spark.sql.catalog.<catalog-name>.type |
Определяет реализацию Iceberg-каталога. Доступны следующие значения
|
spark.sql.catalog.<catalog-name>.catalog-impl |
Кастомная реализация каталога Iceberg.
Если не задано значение |
spark.sql.catalog.<catalog-name>.io-impl |
Пользовательская реализация FileIO |
spark.sql.catalog.<catalog-name>.metrics-reporter-impl |
Пользовательская реализация MetricsReporter |
spark.sql.catalog.<catalog-name>.default-namespace |
Пространство имен, используемое по умолчанию для каталога.
Значение по умолчанию: |
spark.sql.catalog.<catalog-name>.uri |
Hive Metastore URL для каталогов Hive (thrift://host:port) либо REST URL для каталогов типа REST |
spark.sql.catalog.<catalog-name>.warehouse |
Базовый путь к warehouse-директории. Например, hdfs://nn:8020/warehouse/path |
spark.sql.catalog.<catalog-name>.cache-enabled |
Включает или отключает кеширование для каталога.
По умолчанию используется |
spark.sql.catalog.<catalog-name>.cache.expiration-interval-ms |
Время, по истечении которого кешированные записи каталога будут удалены.
Данный параметр актуален, если |
spark.sql.catalog.<catalog-name>.table-default.propertyKey |
Устанавливает значение по умолчанию для свойства таблицы Iceberg с ключом |
spark.sql.catalog.<catalog-name>.table-override.propertyKey |
Устанавливает значение для свойства таблицы Iceberg с ключом |
Дополнительная информация о создании и настройке каталогов доступна в документации Iceberg.
Команды DDL
Создание таблицы Iceberg
Для создания Iceberg-таблицы добавьте предложение USING iceberg
к стандартному оператору CREATE TABLE
.
Пример:
CREATE TABLE spark_catalog.default.transactions(
txn_id int,
acc_id int,
txn_value double,
txn_date date)
USING iceberg;
Выполните команду DESCRIBE FORMATTED
, чтобы увидеть информацию о только что созданной таблице:
DESCRIBE FORMATTED spark_catalog.default.transactions;
Пример вывода:
txn_id int acc_id int txn_value double txn_date date # Metadata Columns _spec_id int _partition struct<> _file string _pos bigint _deleted boolean # Detailed Table Information Name spark_catalog.default.transactions Type MANAGED Location hdfs://adh/apps/hive/warehouse/transactions Provider iceberg Owner hdfs Table Properties [current-snapshot-id=none,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
Значение Provider
в выводе указывает на то, что таблица была создана как Iceberg-таблица.
Создание партиционированной таблицы
Чтобы создать таблицу с партициями, используйте синтаксис, как показано в следующем примере:
CREATE TABLE spark_catalog.default.transactions_partitioned (
txn_id bigint,
acc_id int,
txn_value double,
txn_date date)
USING iceberg
PARTITIONED BY (acc_id);
Вы также можете использовать выражения преобразования (transform expressions) в предложении PARTITIONED BY
для создания скрытых партиций вместо явного указания партиции.
Пример приведен ниже.
CREATE TABLE spark_catalog.default.transactions_partitioned_transform (
id bigint,
acc_id int,
txn_value double,
txn_date timestamp)
USING iceberg
PARTITIONED BY (day(txn_date)); (1)
1 | day(txn_date) — выражение преобразования для создания партиций по дням. |
Чтобы проверить разбиение таблицы на партиции, запишите в таблицу тестовые данные с помощью следующей команды:
INSERT INTO spark_catalog.default.transactions_partitioned_transform VALUES
(1, 1002, 10.00, cast('2023-01-01' as timestamp)),
(2, 1001, 20.00, cast('2023-01-02' as timestamp));
Затем проверьте содержимое warehouse-директории в HDFS:
$ hdfs dfs -ls -R /apps/hive/warehouse/transactions_partitioned_transform
Пример вывода:
drwxrwxr-x - hdfs hadoop 0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data drwxrwxr-x - hdfs hadoop 0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-01 -rw-r--r-- 3 hdfs hadoop 1206 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-01/00000-12-a029426f-ca10-4efc-b87c-389694c218bc-0-00001.parquet drwxrwxr-x - hdfs hadoop 0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-02 -rw-r--r-- 3 hdfs hadoop 1206 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-02/00000-12-a029426f-ca10-4efc-b87c-389694c218bc-0-00002.parquet drwxrwxr-x - hdfs hadoop 0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata -rw-r--r-- 3 hdfs hadoop 1381 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/00000-28c5f350-f14a-4fcb-b509-10a327fa19cb.metadata.json -rw-r--r-- 3 hdfs hadoop 2519 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/00001-7c83b7d2-0a43-463e-b180-6589a0f696af.metadata.json -rw-r--r-- 3 hdfs hadoop 7157 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/28884384-0539-4ca5-8dcb-95d00de4419e-m0.avro -rw-r--r-- 3 hdfs hadoop 4264 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/snap-2743337418634934372-1-28884384-0539-4ca5-8dcb-95d00de4419e.avro
Согласно выводу, тестовые данные были распределены по нескольким HDFS-директориям (партициям) с именами txn_date_day={date}, каждая из которых представляет отдельный день.
CREATE TABLE: CTAS/RTAS
Вы можете использовать синтаксис CREATE TABLE AS <tbl_name>
(CTAS) и REPLACE TABLE AS <tbl_name>
(RTAS) для создания/перезаписи таблицы Iceberg на основе уже существующей таблицы.
Примеры приведены ниже.
CREATE TABLE spark_catalog.default.transactions_ctas USING iceberg
AS SELECT txn_id, txn_value
FROM spark_catalog.default.transactions;
REPLACE TABLE spark_catalog.default.transactions_rtas USING iceberg
AS SELECT txn_id, txn_value
FROM spark_catalog.default.transactions
WHERE txn_value < 50;
Операции создания/перезаписи таблицы с использованием синтаксиса CTAS/RTAS являются атомарными при использовании реализации каталога SparkCatalog
.
При использовании SparkSessionCatalog
(используется по умолчанию в Spark3) атомарность операций CTAS/RTAS не гарантируется.
DROP TABLE
Для удаления Iceberg-таблицы используйте стандартный синтаксис:
DROP TABLE <table_name> [PURGE]
Необязательный флаг PURGE
определяет, нужно ли удалять содержимое таблицы.
Если PURGE
не установлен, из каталога удаляются только метаданные таблицы.
ALTER TABLE
Сервис Spark3 предоставляет богатую поддержку операций ALTER TABLE
для Iceberg-таблиц.
Ниже показаны примеры основных операций ALTER TABLE
.
Более подробную информацию о командах ALTER TABLE
для Iceberg-таблиц можно найти в документации Iceberg.
Следующая команда переименовывает таблицу Iceberg.
Команда изменяет только метаданные таблицы, не затрагивая данные таблицы.
ALTER TABLE spark_catalog.default.transactions
RENAME TO spark_catalog.default.transactions_new;
Следующий пример устанавливает формат записи ORC, используя параметры таблиц Iceberg.
ALTER TABLE spark_catalog.default.transactions
SET TBLPROPERTIES ('write.format.default'='orc');
РЕКОМЕНДАЦИЯ
Чтобы очистить свойство таблицы, используйте команду UNSET .
|
Вы можете добавить столбец в существующую таблицу Iceberg, используя предложение ADD COLUMN
.
Пример:
ALTER TABLE spark_catalog.default.transactions
ADD COLUMN (
description string comment "A transaction's description"
);
Вывод команды DESCRIBE
после выполнения команды:
DESCRIBE spark_catalog.default.transactions; txn_id int acc_id int txn_value double txn_date date description string A transaction's description
Чтобы добавить несколько столбцов одной командой, используйте предложение ADD COLUMNS
, разделив столбцы запятой.
Пример:
ALTER TABLE spark_catalog.default.transactions
ADD COLUMNS (
is_committed boolean,
is_acknowledged boolean
);
Вывод команды DESCRIBE
после выполнения команды:
DESCRIBE spark_catalog.default.transactions; txn_id int acc_id int txn_value double txn_date date description string A transaction's description is_committed boolean is_acknowledged boolean
Для добавления столбца с композитным типом данных struct, например struct<x, y>
, используйте следующий синтаксис:
ALTER TABLE spark_catalog.default.transactions
ADD COLUMN processor_metadata struct<x: double, y: double>;
Больше информации об использовании составных типов данных (массивы, карты и т. д.) доступно в документации Iceberg.
Iceberg позволяет изменять определение столбцов.
Например, вы можете изменить тип данных столбца, сделать столбец nullable, добавить комментарий и так далее.
Iceberg позволяет расширить тип данных столбца, если расширение безопасно, например:
-
int
→bigint
-
float
→double
-
decimal(i,j)
→decimal(i1,j)
приi1
>i
ALTER TABLE spark_catalog.default.transactions
ALTER COLUMN txn_id TYPE bigint;
Вывод команды DESCRIBE
после выполнения команды:
DESCRIBE spark_catalog.default.transactions; txn_id bigint acc_id int txn_value double txn_date date description string A transaction's description is_committed boolean is_acknowledged boolean processor_metadata struct<x:double,y:double>
Чтобы сделать поле nullable, используйте следующий синтаксис.
ALTER TABLE spark_catalog.default.transactions
ALTER COLUMN acc_id DROP NOT NULL;
Обратное изменение nullable-столбца на non-nullable недопустимо, поскольку Iceberg не располагает информацией о существующих записях со значениями NULL
.
Iceberg поддерживает эволюцию схемы партиционирования, позволяя изменять партиции в существующих таблицах.
Изменение схемы партиционирования является операцией над метаданными, которая не затрагивает существующие данные таблицы.
После изменения схемы новые данные будут записаны в соответствии с обновленной схемой партиционирования, однако уже существующие данные будут соответствовать старой схеме.
В старых файлах данных будут содержаться значения NULL
для новых полей партиций.
Пример:
ALTER TABLE spark_catalog.default.transactions_partitioned
ADD PARTITION FIELD txn_date;
Выполните команду DESCRIBE
, чтобы убедиться, что схема партиционирования таблицы была изменена.
Пример вывода показан ниже.
DESCRIBE spark_catalog.default.transactions_partitioned; txn_id bigint acc_id int txn_value decimal(10,2) txn_date date # Partition Information # col_name data_type comment acc_id int txn_date date
ВНИМАНИЕ
Изменение партиций в Iceberg-таблицах следует выполнять с осторожностью.
Некоторые изменения могут привести к неожиданным результатам и к повреждению метаданных таблицы.
Больше подробной информации доступно в разделах ADD PARTITION, DROP PARTITION и REPLACE PARTITION.
|
Вы можете указать порядок сортировки для таблицы Iceberg, который используется для автоматической сортировки данных, записываемых в эту таблицу.
Например, операция Spark MERGE INTO использует порядок сортировки таблицы.
ALTER TABLE spark_catalog.default.transactions
WRITE ORDERED BY txn_id;
ПРИМЕЧАНИЕ
Порядок записи в таблицу не гарантирует порядок данных при выборке.
Он влияет только на то, как данные записываются в таблицу.
|
Чтобы отменить порядок сортировки таблицы, используйте ключевое слово UNORDERED
, как показано в примере ниже.
ALTER TABLE spark_catalog.default.transactions
WRITE UNORDERED;
Вы можете создавать ветки (branches) для Iceberg-таблиц с помощью оператора CREATE BRANCH
.
Ниже приведены несколько примеров.
ALTER TABLE spark_catalog.default.transactions CREATE BRANCH `test-branch`; (1)
ALTER TABLE spark_catalog.default.transactions CREATE BRANCH IF NOT EXISTS `test-branch` RETAIN 7 DAYS; (2)
ALTER TABLE spark_catalog.default.transactions CREATE OR REPLACE BRANCH `test-branch`; (3)
ALTER TABLE spark_catalog.default.transactions CREATE BRANCH `test-branch` AS OF VERSION 123 (4)
1 | Создание ветки с политикой хранения по умолчанию. |
2 | Создание ветки, если таковой не существует, и установка времени хранения 7 дней. |
3 | Создание или замена ветки, если ветка уже существует. |
4 | Создание ветки из снепшота 123 с использованием политики хранения по умолчанию.
Снепшот 123 должен быть создан до выполнения этой команды. |
Для замены и удаления веток используйте команды REPLACE BRANCH и DROP BRANCH.
Вы можете создавать теги (tags) для "маркировки" состояния таблицы и возврата к этому состоянию таблицы в будущем.
Ниже приведены несколько примеров.
ALTER TABLE spark_catalog.default.transactions CREATE TAG `test-tag`; (1)
ALTER TABLE spark_catalog.default.transactions CREATE TAG IF NOT EXISTS `test-tag` RETAIN 365 DAYS; (2)
ALTER TABLE spark_catalog.default.transactions CREATE OR REPLACE TAG `test-tag`; (3)
ALTER TABLE spark_catalog.default.transactions CREATE TAG `test-tag` AS OF VERSION 123 (4)
1 | Создание тега с политикой хранения по умолчанию. |
2 | Создание тега (если такого тега не существует) из текущего снепшота, и установка времени хранения 365 дней. |
3 | Создание или замена тега, если такой тег уже существует. |
4 | Создание тега из снепшота 123 с использованием политики хранения по умолчанию.
Снепшот 123 должен быть создан до выполнения этой команды. |
Запрос данных из таблицы с помощью тега test-tag
выглядит следующим образом:
SELECT * FROM spark_catalog.default.transactions
VERSION AS OF 'test-tag';
Для замены и удаления тегов используйте команды REPLACE TAG и DROP TAG.
Выборка данных
Чтобы запросить данные из таблицы Iceberg с помощью Spark SQL, необходимо указать имя каталога, пространство имен и имя таблицы.
Пример:
SELECT * FROM spark_catalog.default.transactions;
Где:
-
spark_catalog
— имя каталога по умолчанию. -
default
— имя пространства имен по умолчанию. -
transactions
— имя запрашиваемой таблицы.
Чтобы загрузить таблицу в виде DataFrame, используйте метод table()
.
Пример PySpark:
df = spark.table("spark_catalog.default.transactions")
Time travel
Iceberg поддерживают функцию time travel, которая позволяет запрашивать данные из определенного снепшота таблицы, созданного в прошлом, используя идентификатор снепшота или временную метку.
РЕКОМЕНДАЦИЯ
Информация о всех снепшотах, доступных для определенной таблицы, хранится в таблице метаданных snapshots .
|
В следующем сценарии показано, как выполнять time-travel запросы с помощью Spark.
-
Создайте тестовую таблицу с помощью spark3-sql:
CREATE TABLE spark_catalog.default.transactions_tt( txn_id int, acc_id int, txn_value double, txn_date date) USING iceberg;
-
Запишите тестовые данные в таблицу:
INSERT INTO spark_catalog.default.transactions_tt VALUES (1, 1002, 100.00, cast('2023-01-01' as date)), (2, 1001, 50.00, cast('2023-01-02' as date));
-
Выполните запрос к таблице метаданных
snapshot
:SELECT * FROM spark_catalog.default.transactions_tt.snapshots;
Вывод команды имеет следующий вид:
2024-06-27 14:54:05.664 6216997363030961663 NULL append hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-6216997363030961663-1-f55143c4-0351-4f6a-8978-f0c2fa5666b7.avro {"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"}
Вывод содержит информацию о единственном доступном снепшоте (
6216997363030961663
). Данный снепшот был создан сразу после выполнения операцииINSERT
. -
Добавьте в таблицу дополнительные тестовые данные:
INSERT INTO spark_catalog.default.transactions_tt VALUES (3, 1003, 150.00, cast('2024-01-03' as date));
-
Выполните запрос к таблице метаданных
snapshot
еще раз:SELECT * FROM spark_catalog.default.transactions_tt.snapshots;
Результат:
2024-06-27 14:54:05.664 6216997363030961663 NULL append hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-6216997363030961663-1-f55143c4-0351-4f6a-8978-f0c2fa5666b7.avro {"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"} 2024-06-27 14:56:20.781 3775233323500475562 6216997363030961663 append hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-3775233323500475562-1-6500b975-c67e-4171-8c5d-70623933abc3.avro {"added-data-files":"1","added-files-size":"1126","added-records":"1","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"3380","total-position-deletes":"0","total-records":"3"}
Теперь в выводе отображается информация о еще одном снепшоте. Данный снепшот был создан второй операцией
INSERT
. -
Выполните time-travel запрос, используя идентификатор или временную метку более старого снепшота. Для этого вы можете использовать предложения
TIMESTAMP AS OF <timestamp>
илиVERSION AS OF <id>
. ПредложениеVERSION AS OF
может принимать идентификатор снепшота, имя ветки или имя тега.SELECT * FROM spark_catalog.default.transactions_tt VERSION AS OF 6216997363030961663; (1) SELECT * FROM spark_catalog.default.transactions_tt TIMESTAMP AS OF '2024-06-27 14:55:20.780'; (2)
1 Выборка данных с использованием идентификатора снепшота. 2 Выборка данных по временной метке. В этом примере значение временной метки подобрано таким образом, чтобы выборка осуществлялась из более старого снепшота. Вывод:
1 1002 100.0 2023-01-01 2 1001 50.0 2023-01-02
В выводе содержатся только те данные, которые присутствовали в таблице после первой операции
INSERT
. -
Выполните обычный запрос
SELECT
:SELECT * FROM spark_catalog.default.transactions_tt;
Вывод:
1 1002 100.0 2023-01-01 2 1001 50.0 2023-01-02 3 1003 150.0 2024-01-03
В отличие от предыдущих запросов с использованием time-travel, вывод содержит все данные таблицы.
Таблицы метаданных
Iceberg хранит несколько таблиц с метаданными, например history
, files
, snapshots
и так далее.
В этих таблицах хранится ценная информация о снепшотах, файлах, операциях и т. д., которая может быть полезна для построения запросов, связанных с time-travel, ветками и тегами.
Вы можете просматривать таблицы метаданных, добавляя имя таблицы метаданных после имени исходной таблицы.
Пример:
SELECT * FROM spark_catalog.default.transactions.snapshots;
Пример вывода:
2024-06-21 14:30:36.001 7505187482383765765 NULL append hdfs://adh/apps/hive/warehouse/transactions/metadata/snap-7505187482383765765-1-be783958-0d56-4b2c-b002-54cdae7a349d.avro {"changed-partition-count":"0","total-data-files":"0","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"0","total-position-deletes":"0","total-records":"0"} 2024-06-21 16:09:08.645 5529064852849657419 NULL append hdfs://adh/apps/hive/warehouse/transactions/metadata/snap-5529064852849657419-1-af755165-d1e2-476e-85a9-40ea179c2e9f.avro {"added-data-files":"2","added-files-size":"2522","added-records":"9","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2522","total-position-deletes":"0","total-records":"9"}
Запись данных
Вы можете записывать данные в таблицы Iceberg, используя традиционный SQL-синтаксис, например INSERT INTO, INSERT OVERWRITE, UPDATE.
Кроме того, Iceberg поддерживает операции на уровне строк (row-level operations), такие как MERGE INTO
и DELETE FROM
, примеры использования которых показаны далее в статье.
При записи в таблицу Iceberg типы данных Spark преобразуются в типы Iceberg в соответствии с таблицей преобразования типов Spark.
Подробная информация о поддерживаемых операциях записи доступна в документации Iceberg.
Команда MERGE INTO
выполняет обновление данных на уровне строк.
Эта команда позволяет изменять содержимое целевой таблицы (target table), используя обновления в исходной таблице (source table).
Условие обновления определяется предложением ON
, которое используется аналогично условию JOIN
.
Синтаксис команды MERGE INTO
показан ниже:
MERGE INTO <target_table>
USING <source_table>
ON <merge_condition>
WHEN MATCHED <matched_condition> THEN <matched_action> |
WHEN NOT MATCHED THEN <not_matched_action>
Обновления строк в целевой таблице определяются с помощью предложения WHEN [NOT] MATCHED <matched_condition> THEN <matched_action>
.
Строки в целевой таблице можно обновлять и удалять, а те строки исходной таблицы, которые не совпадают с целевой, можно добавлять в целевую таблицу.
Рассмотрим следующий пример, демонстрирующий использование MERGE INTO
.
Предположим, что есть две таблицы Iceberg со следующим содержимым:
spark_catalog.default.transactions | spark_catalog.default.transactions_copy |
---|---|
txn_id acc_id txn_value txn_date 1 1001 25.0 2023-02-03 2 1001 50.0 2023-03-03 3 1002 10.0 2023-04-01 |
txn_id acc_id txn_value txn_date 1 1003 20.0 2023-02-03 3 1001 50.0 2023-03-03 5 1002 100.0 2023-04-02 |
Следующая команда MERGE INTO
удаляет все строки в таблице spark_catalog.default.transactions_copy
, которые имеют совпадение в таблице spark_catalog.default.transactions
по полю txn_id
.
MERGE INTO spark_catalog.default.transactions_copy USING spark_catalog.default.transactions
ON spark_catalog.default.transactions_copy.txn_id = spark_catalog.default.transactions.txn_id
WHEN MATCHED THEN DELETE;
После выполнения этой команды содержимое таблиц имеет следующий вид.
spark_catalog.default.transactions | spark_catalog.default.transactions_copy |
---|---|
txn_id acc_id txn_value txn_date 1 1001 25.0 2023-02-03 2 1001 50.0 2023-03-03 3 1002 10.0 2023-04-01 |
txn_id acc_id txn_value txn_date 5 1002 100.0 2023-04-02 |
Следующая команда MERGE INTO
обновляет все строки в таблице spark_catalog.default.transactions
, если поле acc_id
совпадает с acc_id
таблицы spark_catalog.default.transactions_copy
.
Для строк с совпадающими значениями acc_id
команда устанавливает значение txn_value=0
.
MERGE INTO spark_catalog.default.transactions USING spark_catalog.default.transactions_copy
ON spark_catalog.default.transactions.acc_id = spark_catalog.default.transactions_copy.acc_id
WHEN MATCHED THEN UPDATE SET txn_value=0;
После выполнения этой команды содержимое таблиц имеет следующий вид.
spark_catalog.default.transactions | spark_catalog.default.transactions_copy |
---|---|
txn_id acc_id txn_value txn_date 1 1001 25.0 2023-02-03 2 1001 50.0 2023-03-03 3 1002 10.0 2023-04-01 |
txn_id acc_id txn_value txn_date 5 1002 0.0 2023-04-02 |
Операция DELETE FROM
позволяет удалять данные из таблиц Iceberg на уровне строк.
В запросах на удаление указывается фильтр для поиска удаляемых строк, как показано ниже.
Если фильтр удаления соответствует всему разделу таблицы, Iceberg изменит только метаданные таблицы.
Если фильтр соответствует отдельным строкам, Iceberg перезапишет только релевантные файлы данных.
Примеры:
DELETE FROM spark_catalog.default.transactions
WHERE txn_date > '2023-03-03';
DELETE FROM spark_catalog.default.transactions
WHERE txn_value < (SELECT min(txn_value) FROM spark_catalog.default.transactions_copy);
Запись данных в ветку
Чтобы записать данные в определенную ветку (branch) Iceberg-таблицы, необходимо указать идентификатор ветки в запросе.
ПРИМЕЧАНИЕ
Перед выполнением записи целевая ветка должна быть создана.
|
Например, чтобы записать данные в ветку с именем testbranch
, используйте следующий синтаксис:
INSERT INTO spark_catalog.default.transactions.branch_testbranch VALUES
(1, 1003, 20.00, cast('2023-02-03' as date));
Если имя ветки содержит не только алфавитно-цифровые символы (например, test-branch
), имя ветки должно быть заключено в обратные кавычки, как показано в примере ниже.
INSERT INTO spark_catalog.default.transactions.`branch_test-branch` VALUES
(1, 1003, 20.00, cast('2023-02-03' as date));
Чтобы получить данные из созданной выше ветки testbranch
, используйте команду:
SELECT * FROM spark_catalog.default.transactions
VERSION AS OF 'testbranch';
Помимо записи данных в ветку с помощью INSERT
, вы также можете использовать команды UPDATE
, DELETE
и MERGE INTO
.
Примеры:
UPDATE spark_catalog.default.transactions.branch_testbranch
SET acc_id = 1005
WHERE acc_id = 1003
DELETE FROM spark_catalog.default.transactions.branch_testbranch
WHERE txn_id = 1;
MERGE INTO spark_catalog.default.transactions.branch_testbranch USING spark_catalog.default.transactions_copy
ON spark_catalog.default.transactions.branch_testbranch.txn_id = spark_catalog.default.transactions_copy.txn_id
WHEN MATCHED THEN DELETE;
Roll back
Iceberg предоставляет функцию rollback, которая позволяет откатить таблицу к более раннему состоянию. Данная функция может быть полезна, например, для отмены случайных или ошибочных изменений в таблицах. Данные таблицы можно откатить до нужного состояния, пока целевой снепшот доступен в метаданных Iceberg. Следующий пример демонстрирует базовое использование rollback-функции.
-
Создайте тестовую таблицу Iceberg со следующим содержимым:
txn_id acc_id txn_value txn_date 1 1002 10.0 2024-01-01 2 1001 20.0 2024-01-02
Для этого выполните запросы в spark3-sql:
CREATE TABLE spark_catalog.default.transactions_rollback_demo ( txn_id int, acc_id int, txn_value double, txn_date date) USING iceberg;
INSERT INTO spark_catalog.default.transactions_rollback_demo VALUES (1, 1002, 10.00, cast('2024-01-01' as timestamp)), (2, 1001, 20.00, cast('2024-01-02' as timestamp));
-
Удалите одну строку из тестовой таблицы. На следующих шагах будет выполнен откат данной операции.
DELETE FROM spark_catalog.default.transactions_rollback_demo WHERE txn_id = 2;
-
Запросите информацию о снепшотах таблицы. Для этого выполните запрос к базе метаданных, как показано ниже:
SELECT * FROM spark_catalog.default.transactions_rollback_demo.snapshots;
Пример вывода:
2024-07-17 07:59:56.31 6142059163774284478 NULL append hdfs://adh/apps/hive/warehouse/transactions_rollback_demo/metadata/snap-6142059163774284478-1-c2c3b442-648b-4e94-8b14-594ec65f66c5.avro{"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1720628013974_0014","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"} 2024-07-17 08:02:55.859 8329113954968361656 6142059163774284478 delete hdfs://adh/apps/hive/warehouse/transactions_rollback_demo/metadata/snap-8329113954968361656-1-375044c0-90fa-44f5-a6c6-72ebc12cc473.avro {"changed-partition-count":"1","deleted-data-files":"1","deleted-records":"1","removed-files-size":"1127","spark.app.id":"application_1720628013974_0015","total-data-files":"1","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"1127","total-position-deletes":"0","total-records":"1"}
Вывод показывает, что для тестовой таблицы доступны два снепшота. Первый был создан сразу после вставки данных, а второй — после операции удаления.
-
Откатите содержимое таблицы до состояния, которое предшествовало операции удаления:
CALL spark_catalog.system.rollback_to_snapshot('spark_catalog.default.transactions_rollback_demo', <snapshot-id>);
Где
<snapshot-id>
— ID снепшота, который был создан до выполнения операции DELETE.РЕКОМЕНДАЦИЯВместо ID можно также указать временную метку для отката к нужному снепшоту. -
Запросите данные из таблицы, чтобы проверить результат выполнения rollback-функции.
SELECT * FROM spark_catalog.default.transactions_rollback_demo; 1 1002 10.0 2023-01-01 2 1001 20.0 2023-01-02
Вывод показывает, что удаленная строка была восстановлена.