Работа с таблицами Iceberg в Spark

Apache Iceberg — это открытый, высокопроизводительный формат для больших аналитических таблиц. Сервис ADH Spark3 поддерживает данный формат, позволяя вам работать с таблицами Iceberg через Spark.

ПРИМЕЧАНИЕ
Spark3 поддерживает Iceberg-таблицы начиная с версии ADH 3.2.4.3.

В сервисе Spark3 поддержка Iceberg-таблиц включена по умолчанию. В этой статье для демонстрации примеров работы с Iceberg-таблицами используется Spark SQL — модуль, позволяющий выполнять традиционные ANSI SQL-команды на объектах Spark DataFrame.

Вы можете выполнять запросы Spark SQL, используя оболочку spark3-sql. Для запуска spark3-sql выполните следующую команду на хосте, где установлен компонент Spark3 Client.

$ spark3-sql

В данной статье все SQL-примеры предназначены для запуска в оболочке spark3-sql.

Каталоги Spark

Архитектура Iceberg добавляет в Spark API понятие каталогов (catalogs), которые позволяют Spark обращаться к Iceberg-таблицам по имени. Каталог Iceberg — это именованное хранилище для таблиц и их метаданных, разделенное на пространства имен. Имена каталогов и пространства имен используются в SQL-запросах для идентификации таблиц, принадлежащих к разным каталогам, аналогично нотации SELECT …​ FROM <database_name>.<table_name>. Для доступа к конкретной Iceberg-таблице используется следующий синтаксис:

SELECT ... FROM <catalog_name>.<namespace>.<table_name> ...

Iceberg предоставляет несколько бэкендов каталогов, таких как REST, Hive, JDBC и прочих. Ниже показан пример создания Iceberg-каталога типа Hive, который загружает таблицы из Hive Metastore:

$ spark3-sql
    --conf spark.sql.catalog.test_catalog_hive=org.apache.iceberg.spark.SparkCatalog
    --conf spark.sql.catalog.test_catalog_hive.type=hive
    --conf spark.sql.catalog.test_catalog_hive.warehouse=hdfs://ka-adh-1.ru-central1.internal:8020/user/admin/test_warehouse

Для обращения к таблицам внутри такого каталога необходимо добавить к именам таблиц префикс test_catalog_hive.

ПРИМЕЧАНИЕ
Параметры конфигурации каталогов, указанные с помощью --conf в примере выше, также можно указать в ADCM (Clusters → <clusterName> → Services → Spark3 → Primary Configuration → Custom spark-defaults.conf).

Iceberg предоставляет 2 имплементации каталогов:

  • org.apache.iceberg.spark.SparkCatalog. Позволяет работать только с Iceberg-таблицами, используя Hive Metastore или warehouse-директорию в Hadoop.

  • org.apache.iceberg.spark.SparkSessionCatalog. Добавляет поддержку Iceberg-таблиц к функционалу встроенного Spark-каталога. При работе с обычными таблицами (не Iceberg), SparkSessionCatalog делегирует выполнение встроенному каталогу Spark. В сервисе Spark3 данная реализация используется по умолчанию, позволяя работать как с Iceberg-таблицами, так и с обычными (не Iceberg) таблицами.

В сервисе Spark3 каталог по умолчанию называется spark-catalog. Данный каталог поддерживает одноуровневые пространства имен, которые соответствуют именам баз данных Hive. Примеры запросов в данной статье предназначены для выполнения в дефолтном каталоге spark-catalog с пространством имен default.

Параметры конфигурации каталогов

Каталоги Iceberg можно настроить с помощью конфигурационных свойств spark.sql.catalog.<catalog-name>.*. Общие параметры конфигурации для Hive и Hadoop представлены в таблице ниже.

Свойство Описание

spark.sql.catalog.<catalog-name>.type

Определяет реализацию Iceberg-каталога. Доступны следующие значения

  • hive (HiveCatalog);

  • hadoop (HadoopCatalog);

  • rest (RESTCatalog);

  • glue (GlueCatalog);

  • jdbc (JdbcCatalog);

  • nessie (NessieCatalog);

  • пустое значение, если используется кастомная реализация каталога.

spark.sql.catalog.<catalog-name>.catalog-impl

Кастомная реализация каталога Iceberg. Если не задано значение spark.sql.catalog.<catalog-name>.type, значение свойства spark.sql.catalog.<catalog-name>.catalog-impl не должно быть пустым

spark.sql.catalog.<catalog-name>.io-impl

Пользовательская реализация FileIO

spark.sql.catalog.<catalog-name>.metrics-reporter-impl

Пользовательская реализация MetricsReporter

spark.sql.catalog.<catalog-name>.default-namespace

Пространство имен, используемое по умолчанию для каталога. Значение по умолчанию: default

spark.sql.catalog.<catalog-name>.uri

Hive Metastore URL для каталогов Hive (thrift://host:port) либо REST URL для каталогов типа REST

spark.sql.catalog.<catalog-name>.warehouse

Базовый путь к warehouse-директории. Например, hdfs://nn:8020/warehouse/path

spark.sql.catalog.<catalog-name>.cache-enabled

Включает или отключает кеширование для каталога. По умолчанию используется true (кеширование активно)

spark.sql.catalog.<catalog-name>.cache.expiration-interval-ms

Время, по истечении которого кешированные записи каталога будут удалены. Данный параметр актуален, если spark.sql.catalog.<catalog-name>.cache-enabled=true. Установка значения -1 отключает срок действия кеша, а использование значения 0 полностью отключает механизм кеширования, независимо от значения cache-enabled. Значение по умолчанию: 30000 (30 секунд)

spark.sql.catalog.<catalog-name>.table-default.propertyKey

Устанавливает значение по умолчанию для свойства таблицы Iceberg с ключом propertyKey. Это значение будет использовано для всех таблиц, создаваемых в текущем каталоге, если только оно не переопределено

spark.sql.catalog.<catalog-name>.table-override.propertyKey

Устанавливает значение для свойства таблицы Iceberg с ключом propertyKey, которое не может быть переопределено

Дополнительная информация о создании и настройке каталогов доступна в документации Iceberg.

Команды DDL

Создание таблицы Iceberg

Для создания Iceberg-таблицы добавьте предложение USING iceberg к стандартному оператору CREATE TABLE.

Пример:

CREATE TABLE spark_catalog.default.transactions(
    txn_id int,
    acc_id int,
    txn_value double,
    txn_date date)
USING iceberg;

Выполните команду DESCRIBE FORMATTED, чтобы увидеть информацию о только что созданной таблице:

DESCRIBE FORMATTED spark_catalog.default.transactions;

Пример вывода:

txn_id                  int
acc_id                  int
txn_value               double
txn_date                date

# Metadata Columns
_spec_id                int
_partition              struct<>
_file                   string
_pos                    bigint
_deleted                boolean

# Detailed Table Information
Name                    spark_catalog.default.transactions
Type                    MANAGED
Location                hdfs://adh/apps/hive/warehouse/transactions
Provider                iceberg
Owner                   hdfs
Table Properties        [current-snapshot-id=none,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]

Значение Provider в выводе указывает на то, что таблица была создана как Iceberg-таблица.

Создание партиционированной таблицы

Чтобы создать таблицу с партициями, используйте синтаксис, как показано в следующем примере:

CREATE TABLE spark_catalog.default.transactions_partitioned (
    txn_id bigint,
    acc_id int,
    txn_value double,
    txn_date date)
USING iceberg
PARTITIONED BY (acc_id);

Вы также можете использовать выражения преобразования (transform expressions) в предложении PARTITIONED BY для создания скрытых партиций вместо явного указания партиции. Пример приведен ниже.

CREATE TABLE spark_catalog.default.transactions_partitioned_transform (
    id bigint,
    acc_id int,
    txn_value double,
    txn_date timestamp)
USING iceberg
PARTITIONED BY (day(txn_date)); (1)
1 day(txn_date) — выражение преобразования для создания партиций по дням.

Чтобы проверить разбиение таблицы на партиции, запишите в таблицу тестовые данные с помощью следующей команды:

INSERT INTO spark_catalog.default.transactions_partitioned_transform VALUES
(1, 1002, 10.00, cast('2023-01-01' as timestamp)),
(2, 1001, 20.00, cast('2023-01-02' as timestamp));

Затем проверьте содержимое warehouse-директории в HDFS:

$ hdfs dfs -ls -R /apps/hive/warehouse/transactions_partitioned_transform

Пример вывода:

drwxrwxr-x   - hdfs hadoop          0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data
drwxrwxr-x   - hdfs hadoop          0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-01
-rw-r--r--   3 hdfs hadoop       1206 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-01/00000-12-a029426f-ca10-4efc-b87c-389694c218bc-0-00001.parquet
drwxrwxr-x   - hdfs hadoop          0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-02
-rw-r--r--   3 hdfs hadoop       1206 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-02/00000-12-a029426f-ca10-4efc-b87c-389694c218bc-0-00002.parquet
drwxrwxr-x   - hdfs hadoop          0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata
-rw-r--r--   3 hdfs hadoop       1381 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/00000-28c5f350-f14a-4fcb-b509-10a327fa19cb.metadata.json
-rw-r--r--   3 hdfs hadoop       2519 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/00001-7c83b7d2-0a43-463e-b180-6589a0f696af.metadata.json
-rw-r--r--   3 hdfs hadoop       7157 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/28884384-0539-4ca5-8dcb-95d00de4419e-m0.avro
-rw-r--r--   3 hdfs hadoop       4264 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/snap-2743337418634934372-1-28884384-0539-4ca5-8dcb-95d00de4419e.avro

Согласно выводу, тестовые данные были распределены по нескольким HDFS-директориям (партициям) с именами txn_date_day={date}, каждая из которых представляет отдельный день.

CREATE TABLE: CTAS/RTAS

Вы можете использовать синтаксис CREATE TABLE AS <tbl_name> (CTAS) и REPLACE TABLE AS <tbl_name> (RTAS) для создания/перезаписи таблицы Iceberg на основе уже существующей таблицы. Примеры приведены ниже.

CREATE TABLE spark_catalog.default.transactions_ctas USING iceberg
AS SELECT txn_id, txn_value
FROM spark_catalog.default.transactions;
REPLACE TABLE spark_catalog.default.transactions_rtas USING iceberg
AS SELECT txn_id, txn_value
FROM spark_catalog.default.transactions
WHERE txn_value < 50;

Операции создания/перезаписи таблицы с использованием синтаксиса CTAS/RTAS являются атомарными при использовании реализации каталога SparkCatalog. При использовании SparkSessionCatalog (используется по умолчанию в Spark3) атомарность операций CTAS/RTAS не гарантируется.

DROP TABLE

Для удаления Iceberg-таблицы используйте стандартный синтаксис:

DROP TABLE <table_name> [PURGE]

Необязательный флаг PURGE определяет, нужно ли удалять содержимое таблицы. Если PURGE не установлен, из каталога удаляются только метаданные таблицы.

ALTER TABLE

Сервис Spark3 предоставляет богатую поддержку операций ALTER TABLE для Iceberg-таблиц. Ниже показаны примеры основных операций ALTER TABLE. Более подробную информацию о командах ALTER TABLE для Iceberg-таблиц можно найти в документации Iceberg.

ALTER TABLE …​ RENAME TO

 
Следующая команда переименовывает таблицу Iceberg. Команда изменяет только метаданные таблицы, не затрагивая данные таблицы.

ALTER TABLE spark_catalog.default.transactions
RENAME TO spark_catalog.default.transactions_new;
ALTER TABLE …​ SET TBLPROPERTIES

 
Следующий пример устанавливает формат записи ORC, используя параметры таблиц Iceberg.

ALTER TABLE spark_catalog.default.transactions
SET TBLPROPERTIES ('write.format.default'='orc');
РЕКОМЕНДАЦИЯ
Чтобы очистить свойство таблицы, используйте команду UNSET.
ALTER TABLE …​ ADD COLUMN(S)

 
Вы можете добавить столбец в существующую таблицу Iceberg, используя предложение ADD COLUMN.

Пример:

ALTER TABLE spark_catalog.default.transactions
ADD COLUMN (
    description string comment "A transaction's description"
);

Вывод команды DESCRIBE после выполнения команды:

DESCRIBE spark_catalog.default.transactions;
txn_id                  int
acc_id                  int
txn_value               double
txn_date                date
description             string                  A transaction's description

Чтобы добавить несколько столбцов одной командой, используйте предложение ADD COLUMNS, разделив столбцы запятой.

Пример:

ALTER TABLE spark_catalog.default.transactions
ADD COLUMNS (
    is_committed boolean,
    is_acknowledged boolean
);

Вывод команды DESCRIBE после выполнения команды:

DESCRIBE spark_catalog.default.transactions;
txn_id                  int
acc_id                  int
txn_value               double
txn_date                date
description             string                  A transaction's description
is_committed            boolean
is_acknowledged         boolean

Для добавления столбца с композитным типом данных struct, например struct<x, y>, используйте следующий синтаксис:

ALTER TABLE spark_catalog.default.transactions
ADD COLUMN processor_metadata struct<x: double, y: double>;

Больше информации об использовании составных типов данных (массивы, карты и т. д.) доступно в документации Iceberg.

ALTER TABLE …​ ALTER COLUMN

 
Iceberg позволяет изменять определение столбцов. Например, вы можете изменить тип данных столбца, сделать столбец nullable, добавить комментарий и так далее. Iceberg позволяет расширить тип данных столбца, если расширение безопасно, например:

  • intbigint

  • floatdouble

  • decimal(i,j)decimal(i1,j) при i1 > i

ALTER TABLE spark_catalog.default.transactions
ALTER COLUMN txn_id TYPE bigint;

Вывод команды DESCRIBE после выполнения команды:

DESCRIBE spark_catalog.default.transactions;
txn_id                  bigint
acc_id                  int
txn_value               double
txn_date                date
description             string                  A transaction's description
is_committed            boolean
is_acknowledged         boolean
processor_metadata      struct<x:double,y:double>

Чтобы сделать поле nullable, используйте следующий синтаксис.

ALTER TABLE spark_catalog.default.transactions
ALTER COLUMN acc_id DROP NOT NULL;

Обратное изменение nullable-столбца на non-nullable недопустимо, поскольку Iceberg не располагает информацией о существующих записях со значениями NULL.

ALTER TABLE …​ ADD PARTITION FIELD

 
Iceberg поддерживает эволюцию схемы партиционирования, позволяя изменять партиции в существующих таблицах. Изменение схемы партиционирования является операцией над метаданными, которая не затрагивает существующие данные таблицы. После изменения схемы новые данные будут записаны в соответствии с обновленной схемой партиционирования, однако уже существующие данные будут соответствовать старой схеме. В старых файлах данных будут содержаться значения NULL для новых полей партиций.

Пример:

ALTER TABLE spark_catalog.default.transactions_partitioned
ADD PARTITION FIELD txn_date;

Выполните команду DESCRIBE, чтобы убедиться, что схема партиционирования таблицы была изменена. Пример вывода показан ниже.

DESCRIBE spark_catalog.default.transactions_partitioned;
txn_id                  bigint
acc_id                  int
txn_value               decimal(10,2)
txn_date                date
# Partition Information
# col_name              data_type               comment
acc_id                  int
txn_date                date
ВНИМАНИЕ
Изменение партиций в Iceberg-таблицах следует выполнять с осторожностью. Некоторые изменения могут привести к неожиданным результатам и к повреждению метаданных таблицы. Больше подробной информации доступно в разделах ADD PARTITION, DROP PARTITION и REPLACE PARTITION.
ALTER TABLE …​ WRITE ORDERED BY

 
Вы можете указать порядок сортировки для таблицы Iceberg, который используется для автоматической сортировки данных, записываемых в эту таблицу. Например, операция Spark MERGE INTO использует порядок сортировки таблицы.

ALTER TABLE spark_catalog.default.transactions
WRITE ORDERED BY txn_id;
ПРИМЕЧАНИЕ
Порядок записи в таблицу не гарантирует порядок данных при выборке. Он влияет только на то, как данные записываются в таблицу.

Чтобы отменить порядок сортировки таблицы, используйте ключевое слово UNORDERED, как показано в примере ниже.

ALTER TABLE spark_catalog.default.transactions
WRITE UNORDERED;
ALTER TABLE …​ CREATE BRANCH

 
Вы можете создавать ветки (branches) для Iceberg-таблиц с помощью оператора CREATE BRANCH. Ниже приведены несколько примеров.

ALTER TABLE spark_catalog.default.transactions CREATE BRANCH `test-branch`; (1)
ALTER TABLE spark_catalog.default.transactions CREATE BRANCH IF NOT EXISTS `test-branch` RETAIN 7 DAYS; (2)
ALTER TABLE spark_catalog.default.transactions CREATE OR REPLACE BRANCH `test-branch`; (3)
ALTER TABLE spark_catalog.default.transactions CREATE BRANCH `test-branch` AS OF VERSION 123 (4)
1 Создание ветки с политикой хранения по умолчанию.
2 Создание ветки, если таковой не существует, и установка времени хранения 7 дней.
3 Создание или замена ветки, если ветка уже существует.
4 Создание ветки из снепшота 123 с использованием политики хранения по умолчанию. Снепшот 123 должен быть создан до выполнения этой команды.

Для замены и удаления веток используйте команды REPLACE BRANCH и DROP BRANCH.

ALTER TABLE …​ CREATE TAG

 
Вы можете создавать теги (tags) для "маркировки" состояния таблицы и возврата к этому состоянию таблицы в будущем. Ниже приведены несколько примеров.

ALTER TABLE spark_catalog.default.transactions CREATE TAG `test-tag`; (1)
ALTER TABLE spark_catalog.default.transactions CREATE TAG IF NOT EXISTS `test-tag` RETAIN 365 DAYS; (2)
ALTER TABLE spark_catalog.default.transactions CREATE OR REPLACE TAG `test-tag`; (3)
ALTER TABLE spark_catalog.default.transactions CREATE TAG `test-tag` AS OF VERSION 123 (4)
1 Создание тега с политикой хранения по умолчанию.
2 Создание тега (если такого тега не существует) из текущего снепшота, и установка времени хранения 365 дней.
3 Создание или замена тега, если такой тег уже существует.
4 Создание тега из снепшота 123 с использованием политики хранения по умолчанию. Снепшот 123 должен быть создан до выполнения этой команды.

Запрос данных из таблицы с помощью тега test-tag выглядит следующим образом:

SELECT * FROM spark_catalog.default.transactions
VERSION AS OF 'test-tag';

Для замены и удаления тегов используйте команды REPLACE TAG и DROP TAG.

Выборка данных

Чтобы запросить данные из таблицы Iceberg с помощью Spark SQL, необходимо указать имя каталога, пространство имен и имя таблицы.

Пример:

SELECT * FROM spark_catalog.default.transactions;

Где:

  • spark_catalog — имя каталога по умолчанию.

  • default — имя пространства имен по умолчанию.

  • transactions — имя запрашиваемой таблицы.

Чтобы загрузить таблицу в виде DataFrame, используйте метод table(). Пример PySpark:

df = spark.table("spark_catalog.default.transactions")

Time travel

Iceberg поддерживают функцию time travel, которая позволяет запрашивать данные из определенного снепшота таблицы, созданного в прошлом, используя идентификатор снепшота или временную метку.

РЕКОМЕНДАЦИЯ
Информация о всех снепшотах, доступных для определенной таблицы, хранится в таблице метаданных snapshots.

В следующем сценарии показано, как выполнять time-travel запросы с помощью Spark.

  1. Создайте тестовую таблицу с помощью spark3-sql:

    CREATE TABLE spark_catalog.default.transactions_tt(
        txn_id int,
        acc_id int,
        txn_value double,
        txn_date date)
    USING iceberg;
  2. Запишите тестовые данные в таблицу:

    INSERT INTO spark_catalog.default.transactions_tt VALUES
    (1, 1002, 100.00, cast('2023-01-01' as date)),
    (2, 1001, 50.00, cast('2023-01-02' as date));
  3. Выполните запрос к таблице метаданных snapshot:

    SELECT * FROM spark_catalog.default.transactions_tt.snapshots;

    Вывод команды имеет следующий вид:

    2024-06-27 14:54:05.664 6216997363030961663     NULL    append  hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-6216997363030961663-1-f55143c4-0351-4f6a-8978-f0c2fa5666b7.avro    {"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"}

    Вывод содержит информацию о единственном доступном снепшоте (6216997363030961663). Данный снепшот был создан сразу после выполнения операции INSERT.

  4. Добавьте в таблицу дополнительные тестовые данные:

    INSERT INTO spark_catalog.default.transactions_tt VALUES
    (3, 1003, 150.00, cast('2024-01-03' as date));
  5. Выполните запрос к таблице метаданных snapshot еще раз:

    SELECT * FROM spark_catalog.default.transactions_tt.snapshots;

    Результат:

    2024-06-27 14:54:05.664 6216997363030961663     NULL    append  hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-6216997363030961663-1-f55143c4-0351-4f6a-8978-f0c2fa5666b7.avro    {"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"}
    2024-06-27 14:56:20.781 3775233323500475562     6216997363030961663     append  hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-3775233323500475562-1-6500b975-c67e-4171-8c5d-70623933abc3.avro    {"added-data-files":"1","added-files-size":"1126","added-records":"1","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"3380","total-position-deletes":"0","total-records":"3"}

    Теперь в выводе отображается информация о еще одном снепшоте. Данный снепшот был создан второй операцией INSERT.

  6. Выполните time-travel запрос, используя идентификатор или временную метку более старого снепшота. Для этого вы можете использовать предложения TIMESTAMP AS OF <timestamp> или VERSION AS OF <id>. Предложение VERSION AS OF может принимать идентификатор снепшота, имя ветки или имя тега.

    SELECT * FROM spark_catalog.default.transactions_tt VERSION AS OF 6216997363030961663; (1)
    SELECT * FROM spark_catalog.default.transactions_tt TIMESTAMP AS OF '2024-06-27 14:55:20.780'; (2)
    1 Выборка данных с использованием идентификатора снепшота.
    2 Выборка данных по временной метке. В этом примере значение временной метки подобрано таким образом, чтобы выборка осуществлялась из более старого снепшота.

    Вывод:

    1       1002    100.0   2023-01-01
    2       1001    50.0    2023-01-02

    В выводе содержатся только те данные, которые присутствовали в таблице после первой операции INSERT.

  7. Выполните обычный запрос SELECT:

    SELECT * FROM spark_catalog.default.transactions_tt;

    Вывод:

    1       1002    100.0   2023-01-01
    2       1001    50.0    2023-01-02
    3       1003    150.0   2024-01-03

    В отличие от предыдущих запросов с использованием time-travel, вывод содержит все данные таблицы.

Таблицы метаданных

Iceberg хранит несколько таблиц с метаданными, например history, files, snapshots и так далее. В этих таблицах хранится ценная информация о снепшотах, файлах, операциях и т. д., которая может быть полезна для построения запросов, связанных с time-travel, ветками и тегами. Вы можете просматривать таблицы метаданных, добавляя имя таблицы метаданных после имени исходной таблицы.

Пример:

SELECT * FROM spark_catalog.default.transactions.snapshots;

Пример вывода:

2024-06-21 14:30:36.001 7505187482383765765     NULL    append  hdfs://adh/apps/hive/warehouse/transactions/metadata/snap-7505187482383765765-1-be783958-0d56-4b2c-b002-54cdae7a349d.avro       {"changed-partition-count":"0","total-data-files":"0","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"0","total-position-deletes":"0","total-records":"0"}
2024-06-21 16:09:08.645 5529064852849657419     NULL    append  hdfs://adh/apps/hive/warehouse/transactions/metadata/snap-5529064852849657419-1-af755165-d1e2-476e-85a9-40ea179c2e9f.avro       {"added-data-files":"2","added-files-size":"2522","added-records":"9","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2522","total-position-deletes":"0","total-records":"9"}

Запись данных

Вы можете записывать данные в таблицы Iceberg, используя традиционный SQL-синтаксис, например INSERT INTO, INSERT OVERWRITE, UPDATE. Кроме того, Iceberg поддерживает операции на уровне строк (row-level operations), такие как MERGE INTO и DELETE FROM, примеры использования которых показаны далее в статье. При записи в таблицу Iceberg типы данных Spark преобразуются в типы Iceberg в соответствии с таблицей преобразования типов Spark. Подробная информация о поддерживаемых операциях записи доступна в документации Iceberg.

MERGE INTO

 

Команда MERGE INTO выполняет обновление данных на уровне строк. Эта команда позволяет изменять содержимое целевой таблицы (target table), используя обновления в исходной таблице (source table). Условие обновления определяется предложением ON, которое используется аналогично условию JOIN.

Синтаксис команды MERGE INTO показан ниже:

MERGE INTO <target_table>
   USING <source_table>
   ON <merge_condition>
   WHEN MATCHED <matched_condition> THEN <matched_action> |
   WHEN NOT MATCHED THEN <not_matched_action>

Обновления строк в целевой таблице определяются с помощью предложения WHEN [NOT] MATCHED <matched_condition> THEN <matched_action>. Строки в целевой таблице можно обновлять и удалять, а те строки исходной таблицы, которые не совпадают с целевой, можно добавлять в целевую таблицу. Рассмотрим следующий пример, демонстрирующий использование MERGE INTO.

Предположим, что есть две таблицы Iceberg со следующим содержимым:

spark_catalog.default.transactions spark_catalog.default.transactions_copy
txn_id  acc_id  txn_value txn_date
1       1001    25.0      2023-02-03
2       1001    50.0      2023-03-03
3       1002    10.0      2023-04-01
txn_id  acc_id  txn_value txn_date
1       1003    20.0      2023-02-03
3       1001    50.0      2023-03-03
5       1002    100.0     2023-04-02

Следующая команда MERGE INTO удаляет все строки в таблице spark_catalog.default.transactions_copy, которые имеют совпадение в таблице spark_catalog.default.transactions по полю txn_id.

MERGE INTO spark_catalog.default.transactions_copy USING spark_catalog.default.transactions
  ON spark_catalog.default.transactions_copy.txn_id = spark_catalog.default.transactions.txn_id
  WHEN MATCHED THEN DELETE;

После выполнения этой команды содержимое таблиц имеет следующий вид.

spark_catalog.default.transactions spark_catalog.default.transactions_copy
txn_id  acc_id  txn_value txn_date
1       1001    25.0       2023-02-03
2       1001    50.0       2023-03-03
3       1002    10.0       2023-04-01
txn_id  acc_id  txn_value txn_date
5       1002    100.0     2023-04-02

Следующая команда MERGE INTO обновляет все строки в таблице spark_catalog.default.transactions, если поле acc_id совпадает с acc_id таблицы spark_catalog.default.transactions_copy. Для строк с совпадающими значениями acc_id команда устанавливает значение txn_value=0.

MERGE INTO spark_catalog.default.transactions USING spark_catalog.default.transactions_copy
  ON spark_catalog.default.transactions.acc_id = spark_catalog.default.transactions_copy.acc_id
  WHEN MATCHED THEN UPDATE SET txn_value=0;

После выполнения этой команды содержимое таблиц имеет следующий вид.

spark_catalog.default.transactions spark_catalog.default.transactions_copy
txn_id  acc_id  txn_value txn_date
1       1001    25.0      2023-02-03
2       1001    50.0      2023-03-03
3       1002    10.0      2023-04-01
txn_id  acc_id  txn_value txn_date
5       1002    0.0       2023-04-02
DELETE FROM

 

Операция DELETE FROM позволяет удалять данные из таблиц Iceberg на уровне строк. В запросах на удаление указывается фильтр для поиска удаляемых строк, как показано ниже. Если фильтр удаления соответствует всему разделу таблицы, Iceberg изменит только метаданные таблицы. Если фильтр соответствует отдельным строкам, Iceberg перезапишет только релевантные файлы данных.

Примеры:

DELETE FROM spark_catalog.default.transactions
WHERE txn_date > '2023-03-03';
DELETE FROM spark_catalog.default.transactions
WHERE txn_value < (SELECT min(txn_value) FROM spark_catalog.default.transactions_copy);

Запись данных в ветку

Чтобы записать данные в определенную ветку (branch) Iceberg-таблицы, необходимо указать идентификатор ветки в запросе.

ПРИМЕЧАНИЕ
Перед выполнением записи целевая ветка должна быть создана.

Например, чтобы записать данные в ветку с именем testbranch, используйте следующий синтаксис:

INSERT INTO spark_catalog.default.transactions.branch_testbranch VALUES
(1, 1003, 20.00, cast('2023-02-03' as date));

Если имя ветки содержит не только алфавитно-цифровые символы (например, test-branch), имя ветки должно быть заключено в обратные кавычки, как показано в примере ниже.

INSERT INTO spark_catalog.default.transactions.`branch_test-branch` VALUES
(1, 1003, 20.00, cast('2023-02-03' as date));

Чтобы получить данные из созданной выше ветки testbranch, используйте команду:

SELECT * FROM spark_catalog.default.transactions
VERSION AS OF 'testbranch';

Помимо записи данных в ветку с помощью INSERT, вы также можете использовать команды UPDATE, DELETE и MERGE INTO.

Примеры:

UPDATE spark_catalog.default.transactions.branch_testbranch
SET acc_id = 1005
WHERE acc_id = 1003
DELETE FROM spark_catalog.default.transactions.branch_testbranch
WHERE txn_id = 1;
MERGE INTO spark_catalog.default.transactions.branch_testbranch USING spark_catalog.default.transactions_copy
  ON spark_catalog.default.transactions.branch_testbranch.txn_id = spark_catalog.default.transactions_copy.txn_id
  WHEN MATCHED THEN DELETE;

Roll back

Iceberg предоставляет функцию rollback, которая позволяет откатить таблицу к более раннему состоянию. Данная функция может быть полезна, например, для отмены случайных или ошибочных изменений в таблицах. Данные таблицы можно откатить до нужного состояния, пока целевой снепшот доступен в метаданных Iceberg. Следующий пример демонстрирует базовое использование rollback-функции.

  1. Создайте тестовую таблицу Iceberg со следующим содержимым:

    txn_id  acc_id  txn_value txn_date
    1       1002    10.0      2024-01-01
    2       1001    20.0      2024-01-02

    Для этого выполните запросы в spark3-sql:

    CREATE TABLE spark_catalog.default.transactions_rollback_demo (
        txn_id int,
        acc_id int,
        txn_value double,
        txn_date date)
    USING iceberg;
    INSERT INTO spark_catalog.default.transactions_rollback_demo VALUES
    (1, 1002, 10.00, cast('2024-01-01' as timestamp)),
    (2, 1001, 20.00, cast('2024-01-02' as timestamp));
  2. Удалите одну строку из тестовой таблицы. На следующих шагах будет выполнен откат данной операции.

    DELETE FROM spark_catalog.default.transactions_rollback_demo
    WHERE txn_id = 2;
  3. Запросите информацию о снепшотах таблицы. Для этого выполните запрос к базе метаданных, как показано ниже:

    SELECT * FROM spark_catalog.default.transactions_rollback_demo.snapshots;

    Пример вывода:

    2024-07-17 07:59:56.31  6142059163774284478     NULL    append  hdfs://adh/apps/hive/warehouse/transactions_rollback_demo/metadata/snap-6142059163774284478-1-c2c3b442-648b-4e94-8b14-594ec65f66c5.avro{"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1720628013974_0014","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"}
    2024-07-17 08:02:55.859 8329113954968361656     6142059163774284478     delete  hdfs://adh/apps/hive/warehouse/transactions_rollback_demo/metadata/snap-8329113954968361656-1-375044c0-90fa-44f5-a6c6-72ebc12cc473.avro {"changed-partition-count":"1","deleted-data-files":"1","deleted-records":"1","removed-files-size":"1127","spark.app.id":"application_1720628013974_0015","total-data-files":"1","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"1127","total-position-deletes":"0","total-records":"1"}

    Вывод показывает, что для тестовой таблицы доступны два снепшота. Первый был создан сразу после вставки данных, а второй — после операции удаления.

  4. Откатите содержимое таблицы до состояния, которое предшествовало операции удаления:

    CALL spark_catalog.system.rollback_to_snapshot('spark_catalog.default.transactions_rollback_demo', <snapshot-id>);

    Где <snapshot-id> — ID снепшота, который был создан до выполнения операции DELETE.

    РЕКОМЕНДАЦИЯ
    Вместо ID можно также указать временную метку для отката к нужному снепшоту.
  5. Запросите данные из таблицы, чтобы проверить результат выполнения rollback-функции.

    SELECT * FROM spark_catalog.default.transactions_rollback_demo;
    1       1002    10.0    2023-01-01
    2       1001    20.0    2023-01-02

    Вывод показывает, что удаленная строка была восстановлена.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней