Конференция Arenadata
Новое время — новый Greenplum
Мы приглашаем вас принять участие в конференции, посвященной будущему Open-Source Greenplum 19 сентября в 18:00:00 UTC +3. Встреча будет проходить в гибридном формате — и офлайн, и онлайн. Онлайн-трансляция будет доступна для всех желающих.
Внезапное закрытие Greenplum его владельцем — компанией Broadcom - стало неприятным сюрпризом для всех, кто использует или планирует начать использовать решения на базе этой технологии. Многие ожидают выхода стабильной версии Greenplum 7 и надеются на её дальнейшее активное развитие.
Arenadata не могла допустить, чтобы разрабатываемый годами Open-Source проект Greenplum прекратил своё существование, поэтому 19 сентября мы представим наш ответ на данное решение Broadcom, а участники сообщества получат исчерпывающие разъяснения на все вопросы о дальнейшей судьбе этой технологии.

На конференции вас ждёт обсуждение следующих тем:

  • План возрождения Greenplum;
  • Дорожная карта;
  • Экспертное обсуждение и консультации.
Осталось до события

Work with Iceberg tables in Spark

Apache Iceberg is an open, high-performance format for large analytic tables. The ADH Spark3 service adopts this format allowing you to work with Iceberg tables through Spark.

NOTE
Spark3 supports Iceberg tables starting ADH 3.2.4.3.

The support for Iceberg tables is enabled by default in the Spark3 ADH service. In this article, the Spark SQL module is used to demonstrate the examples of working with Iceberg tables through Spark. The Spark SQL module allows running traditional ANSI SQL commands on Spark DataFrame objects.

You can run Spark SQL queries in the spark3-sql shell. For this, run the command on a host where the Spark3 Client ADH component is installed.

$ spark3-sql

In this article, all SQL examples are designed for spark3-sql shell.

Spark catalogs

The Iceberg architecture introduces the concept of catalogs to the Spark API, allowing Spark to manage and load Iceberg tables by name. An Iceberg catalog is a named repository of tables and their metadata, which are organized into namespaces. Catalog names and namespaces are used in Spark SQL queries to qualify tables that belong to different catalogs, similarly to SELECT …​ FROM <database_name>.<table_name> notation. The following syntax is used to fully qualify a specific Iceberg table.

SELECT ... FROM <catalog_name>.<namespace>.<table_name> ...

Iceberg offers several catalog backends to track tables, such as REST, Hive, JDBC, etc. An example of creating a Hive-based Iceberg catalog that loads tables from a Hive Metastore is shown below:

$ spark3-sql
    --conf spark.sql.catalog.test_catalog_hive=org.apache.iceberg.spark.SparkCatalog
    --conf spark.sql.catalog.test_catalog_hive.type=hive
    --conf spark.sql.catalog.test_catalog_hive.warehouse=hdfs://ka-adh-1.ru-central1.internal:8020/user/admin/test_warehouse

To load tables from the catalog created above, you should prefix the table names with test_catalog_hive.

NOTE
Catalog configuration properties specified using --conf notation in the example above can also be set in ADCM (Clusters → <clusterName> → Services → Spark3 → Primary Configuration → Custom spark-defaults.conf).

Iceberg provides two catalog implementations:

  • org.apache.iceberg.spark.SparkCatalog. Allows working only with Iceberg tables using Hive Metastore or a Hadoop warehouse directory.

  • org.apache.iceberg.spark.SparkSessionCatalog. Adds support for Iceberg tables to the built-in Spark catalog. When working with non-Iceberg tables, SparkSessionCatalog delegates interaction to the built-in Spark catalog. In Spark3 service, this implementation is used by default, allowing you to work with both Iceberg and non-Iceberg tables.

In Spark3, the default catalog is named spark-catalog. This catalog supports single-level namespaces that correspond to Hive database names. Throughout this article, the examples use the catalog named spark-catalog and the default namespace.

Catalog configuration properties

Iceberg catalogs can be configured using spark.sql.catalog.<catalog-name>.* properties. Common configuration properties for Hive and Hadoop are presented in the table below.

Property Description

spark.sql.catalog.<catalog-name>.type

Defines the underlying Iceberg catalog implementation. The following values are supported:

  • hive (HiveCatalog);

  • hadoop (HadoopCatalog);

  • rest (RESTCatalog);

  • glue (GlueCatalog);

  • jdbc (JdbcCatalog);

  • nessie (NessieCatalog);

  • empty value if a custom catalog implementation is used.

spark.sql.catalog.<catalog-name>.catalog-impl

A custom Iceberg catalog implementation. If spark.sql.catalog.<catalog-name>.type is null, spark.sql.catalog.<catalog-name>.catalog-impl must not be null

spark.sql.catalog.<catalog-name>.io-impl

A custom FileIO implementation

spark.sql.catalog.<catalog-name>.metrics-reporter-impl

A custom MetricsReporter implementation

spark.sql.catalog.<catalog-name>.default-namespace

The default current namespace for the catalog. The default value is default

spark.sql.catalog.<catalog-name>.uri

Hive Metastore URL (thrift://host:port) for Hive-based catalogs or REST URL for REST-based catalogs

spark.sql.catalog.<catalog-name>.warehouse

The base path to the warehouse directory. For example, hdfs://nn:8020/warehouse/path

spark.sql.catalog.<catalog-name>.cache-enabled

Defines whether to enable catalog caching. Defaults to true

spark.sql.catalog.<catalog-name>.cache.expiration-interval-ms

Sets a timeout after which cached catalog entries are expired. This parameter is effective only if spark.sql.catalog.<catalog-name>.cache-enabled=true. Setting -1 disables cache expiration and setting 0 disables caching entirely, regardless of the cache-enabled value. Defaults to 30000 (30 seconds)

spark.sql.catalog.<catalog-name>.table-default.propertyKey

Sets a default value for an Iceberg table property with propertyKey. This value will be set for all tables created by the current catalog unless overridden

spark.sql.catalog.<catalog-name>.table-override.propertyKey

Sets a value for an Iceberg table property with propertyKey that cannot be overridden

For more information on creating and configuring Iceberg catalogs, see Iceberg documentation.

DDL commands

Create an Iceberg table

To create an Iceberg table, append the USING iceberg clause to the standard CREATE TABLE statement.

Example:

CREATE TABLE spark_catalog.default.transactions(
    txn_id int,
    acc_id int,
    txn_value double,
    txn_date date)
USING iceberg;

Run the DESCRIBE FORMATTED command to view information about the newly created table:

DESCRIBE FORMATTED spark_catalog.default.transactions;

Sample output:

txn_id                  int
acc_id                  int
txn_value               double
txn_date                date

# Metadata Columns
_spec_id                int
_partition              struct<>
_file                   string
_pos                    bigint
_deleted                boolean

# Detailed Table Information
Name                    spark_catalog.default.transactions
Type                    MANAGED
Location                hdfs://adh/apps/hive/warehouse/transactions
Provider                iceberg
Owner                   hdfs
Table Properties        [current-snapshot-id=none,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]

The Provider value in the output indicates that the table has been created as an Iceberg table.

Create a partitioned table

To create a partitioned table, use the syntax as shown below:

CREATE TABLE spark_catalog.default.transactions_partitioned (
    txn_id bigint,
    acc_id int,
    txn_value double,
    txn_date date)
USING iceberg
PARTITIONED BY (acc_id);

You can also use transform expressions in the PARTITIONED BY clause to create hidden partitions instead of specifying partitions explicitly. An example is below.

CREATE TABLE spark_catalog.default.transactions_partitioned_transform (
    id bigint,
    acc_id int,
    txn_value double,
    txn_date timestamp)
USING iceberg
PARTITIONED BY (day(txn_date)); (1)
1 day(txn_date) is a transformation expression to create partitions by day.

To verify the partitioning, insert some data into the test table as shown below:

INSERT INTO spark_catalog.default.transactions_partitioned_transform VALUES
(1, 1002, 10.00, cast('2023-01-01' as timestamp)),
(2, 1001, 20.00, cast('2023-01-02' as timestamp));

Then, check the warehouse directory contents in HDFS:

$ hdfs dfs -ls -R /apps/hive/warehouse/transactions_partitioned_transform

Sample output:

drwxrwxr-x   - hdfs hadoop          0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data
drwxrwxr-x   - hdfs hadoop          0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-01
-rw-r--r--   3 hdfs hadoop       1206 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-01/00000-12-a029426f-ca10-4efc-b87c-389694c218bc-0-00001.parquet
drwxrwxr-x   - hdfs hadoop          0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-02
-rw-r--r--   3 hdfs hadoop       1206 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-02/00000-12-a029426f-ca10-4efc-b87c-389694c218bc-0-00002.parquet
drwxrwxr-x   - hdfs hadoop          0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata
-rw-r--r--   3 hdfs hadoop       1381 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/00000-28c5f350-f14a-4fcb-b509-10a327fa19cb.metadata.json
-rw-r--r--   3 hdfs hadoop       2519 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/00001-7c83b7d2-0a43-463e-b180-6589a0f696af.metadata.json
-rw-r--r--   3 hdfs hadoop       7157 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/28884384-0539-4ca5-8dcb-95d00de4419e-m0.avro
-rw-r--r--   3 hdfs hadoop       4264 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/snap-2743337418634934372-1-28884384-0539-4ca5-8dcb-95d00de4419e.avro

According to the sample output, the test data has been distributed among several HDFS directories (partitions) named txn_date_day={date}, each representing a distinct day.

CREATE TABLE using CTAS/RTAS

You can use the CREATE TABLE AS <tbl_name> (CTAS) and REPLACE TABLE AS <tbl_name> (RTAS) syntax to create/replace an Iceberg table based on an existing table. The examples are below.

CREATE TABLE spark_catalog.default.transactions_ctas USING iceberg
AS SELECT txn_id, txn_value
FROM spark_catalog.default.transactions;
REPLACE TABLE spark_catalog.default.transactions_rtas USING iceberg
AS SELECT txn_id, txn_value
FROM spark_catalog.default.transactions
WHERE txn_value < 50;

The table creation/replacement operation using CTAS/RTAS syntax is atomic only when using SparkCatalog. With SparkSessionCatalog implementation (used by default in Spark3), the atomicity of the CTAS/RTAS operations is not guaranteed.

DROP TABLE

To delete an Iceberg table, use the standard syntax:

DROP TABLE <table_name> [PURGE]

The optional PURGE flag defines whether the table contents should also be deleted. If PURGE is not set, the table metadata is only deleted from the catalog.

ALTER TABLE

The Spark3 service supports rich ALTER TABLE capabilities for Iceberg tables. Below you can find common ALTER TABLE operations with examples. More details about Iceberg ALTER TABLE commands can be found in Iceberg documentation.

ALTER TABLE …​ RENAME TO

 
The following command renames an Iceberg table. The command affects only the table’s metadata, keeping the table data intact.

ALTER TABLE spark_catalog.default.transactions
RENAME TO spark_catalog.default.transactions_new;
ALTER TABLE …​ SET TBLPROPERTIES

 
The following example sets the write format to ORC using the Iceberg table properties.

ALTER TABLE spark_catalog.default.transactions
SET TBLPROPERTIES ('write.format.default'='orc');
TIP
To clear a table property, use the UNSET command.
ALTER TABLE …​ ADD COLUMN(S)

 
You can add a column to an existing Iceberg table by using the ADD COLUMN clause. Example:

ALTER TABLE spark_catalog.default.transactions
ADD COLUMN (
    description string comment "A transaction's description"
);

The DESCRIBE command output:

DESCRIBE spark_catalog.default.transactions;
txn_id                  int
acc_id                  int
txn_value               double
txn_date                date
description             string                  A transaction's description

To add multiple columns with one command, use the ADD COLUMNS clause and separate the columns by a comma. Example:

ALTER TABLE spark_catalog.default.transactions
ADD COLUMNS (
    is_committed boolean,
    is_acknowledged boolean
);

The DESCRIBE command output:

DESCRIBE spark_catalog.default.transactions;
txn_id                  int
acc_id                  int
txn_value               double
txn_date                date
description             string                  A transaction's description
is_committed            boolean
is_acknowledged         boolean

To add a column with a struct data type, such as struct<x, y>, use the following syntax:

ALTER TABLE spark_catalog.default.transactions
ADD COLUMN processor_metadata struct<x: double, y: double>;

For more details on working with composite data types, like arrays, maps, etc., see Iceberg documentation.

ALTER TABLE …​ ALTER COLUMN

 
You can change the definition of a column; for example, change the column data type, make the column nullable, add a comment, etc. Iceberg allows changing the column data type if the update is safe, for example:

  • intbigint

  • floatdouble

  • decimal(i,j)decimal(i1,j) assuming i1 > i

ALTER TABLE spark_catalog.default.transactions
ALTER COLUMN txn_id TYPE bigint;

The DESCRIBE command output:

DESCRIBE spark_catalog.default.transactions;
txn_id                  bigint
acc_id                  int
txn_value               double
txn_date                date
description             string                  A transaction's description
is_committed            boolean
is_acknowledged         boolean
processor_metadata      struct<x:double,y:double>

To make a field nullable, use the syntax as shown below.

ALTER TABLE spark_catalog.default.transactions
ALTER COLUMN acc_id DROP NOT NULL;

However, you cannot change a nullable column to a non-nullable since Iceberg does not know whether there are existing entries with NULL values.

ALTER TABLE …​ ADD PARTITION FIELD

 
Iceberg supports partitioning scheme evolution, allowing you to modify partition fields in existing tables. The modification of a partitioning scheme is a metadata operation that does not affect the existing table data. New data will be written as per the new partitioning spec, however, the existing data will adhere to the old partitioning layout. Old data files will have NULL values for the new partition fields in the metadata tables. Example:

ALTER TABLE spark_catalog.default.transactions_partitioned
ADD PARTITION FIELD txn_date;

Run the DESCRIBE command to ensure the partitioning spec has been updated. The output:

DESCRIBE spark_catalog.default.transactions_partitioned;
txn_id                  bigint
acc_id                  int
txn_value               decimal(10,2)
txn_date                date
# Partition Information
# col_name              data_type               comment
acc_id                  int
txn_date                date
CAUTION
Changing the partitioning scheme of an Iceberg table should always be done with care. Some changes may lead to unexpected behavior and can result in corrupted table metadata. For more details, see ADD, DROP, and REPLACE partition field commands.
ALTER TABLE …​ WRITE ORDERED BY

 
You can specify the sorting order for an Iceberg table that is used to automatically sort data written to that table. For example, the Spark’s MERGE INTO operation uses table ordering.

ALTER TABLE spark_catalog.default.transactions
WRITE ORDERED BY txn_id;
NOTE
The table write order does not guarantee data order when querying data. It only affects how data is written to the table.

To unset the sorting order for a table, use the UNORDERED keyword as shown in the example below.

ALTER TABLE spark_catalog.default.transactions
WRITE UNORDERED;
ALTER TABLE …​ CREATE BRANCH

 
You can create branches for an Iceberg table by using the CREATE BRANCH statement. Consider the examples below.

ALTER TABLE spark_catalog.default.transactions CREATE BRANCH `test-branch`; (1)
ALTER TABLE spark_catalog.default.transactions CREATE BRANCH IF NOT EXISTS `test-branch` RETAIN 7 DAYS; (2)
ALTER TABLE spark_catalog.default.transactions CREATE OR REPLACE BRANCH `test-branch`; (3)
ALTER TABLE spark_catalog.default.transactions CREATE BRANCH `test-branch` AS OF VERSION 123 (4)
1 Creates a branch with the default retention policy.
2 Creates a branch if it does not exist and sets the retention time to 7 days.
3 Creates or replaces the branch if it already exists.
4 Creates a branch at the 123 snapshot using the default retention. The 123 snapshot must be available when running this command.

To replace and drop branches, use the REPLACE BRANCH and DROP BRANCH commands.

ALTER TABLE …​ CREATE TAG

 
You can create Iceberg tags to "label" the state of a table and return to this table state in the future. Consider the examples below.

ALTER TABLE spark_catalog.default.transactions CREATE TAG `test-tag`; (1)
ALTER TABLE spark_catalog.default.transactions CREATE TAG IF NOT EXISTS `test-tag` RETAIN 365 DAYS; (2)
ALTER TABLE spark_catalog.default.transactions CREATE OR REPLACE TAG `test-tag`; (3)
ALTER TABLE spark_catalog.default.transactions CREATE TAG `test-tag` AS OF VERSION 123 (4)
1 Creates a tag with the default retention policy.
2 Creates a tag if it does not exist at the current snapshot and sets the retention time to 365 days.
3 Creates or replaces the tag if it already exists.
4 Creates a tag at the 123 snapshot using the default retention. The 123 snapshot must be pre-created when running this command.

Querying the table data using the tag created in the above example looks as follows:

SELECT * FROM spark_catalog.default.transactions
VERSION AS OF 'test-tag';

To replace and drop tags, use the REPLACE TAG and DROP TAG commands.

Query data

To query data from an Iceberg table using Spark SQL, you have to provide a catalog name, namespace, and table name.

Example:

SELECT * FROM spark_catalog.default.transactions;

Where:

  • spark_catalog is the default catalog name.

  • default is the default namespace.

  • transactions is the requested table name.

To load a table as a DataFrame, use the table() method. For example (PySpark):

df = spark.table("spark_catalog.default.transactions")

Time travel reads

Iceberg tables support the time travel feature that allows you to query data from a specific table snapshot that was created at some point in the past and is identified by an ID or timestamp.

TIP
The information about all the snapshots available for a given table is stored in the snapshots metadata table.

The following scenario shows how to run time-travel queries through Spark.

  1. Create a test table using spark3-sql:

    CREATE TABLE spark_catalog.default.transactions_tt(
        txn_id int,
        acc_id int,
        txn_value double,
        txn_date date)
    USING iceberg;
  2. Insert some data into the test table:

    INSERT INTO spark_catalog.default.transactions_tt VALUES
    (1, 1002, 100.00, cast('2023-01-01' as date)),
    (2, 1001, 50.00, cast('2023-01-02' as date));
  3. Inspect the snapshot metadata table:

    SELECT * FROM spark_catalog.default.transactions_tt.snapshots;

    This produces an output similar to the following:

    2024-06-27 14:54:05.664 6216997363030961663     NULL    append  hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-6216997363030961663-1-f55143c4-0351-4f6a-8978-f0c2fa5666b7.avro    {"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"}

    The output indicates that currently there is one snapshot (6216997363030961663) available for the table. This snapshot has been created right after the INSERT operation.

  4. Add more test data to the table:

    INSERT INTO spark_catalog.default.transactions_tt VALUES
    (3, 1003, 150.00, cast('2024-01-03' as date));
  5. Query the snapshot metadata table again:

    SELECT * FROM spark_catalog.default.transactions_tt.snapshots;

    The result:

    2024-06-27 14:54:05.664 6216997363030961663     NULL    append  hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-6216997363030961663-1-f55143c4-0351-4f6a-8978-f0c2fa5666b7.avro    {"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"}
    2024-06-27 14:56:20.781 3775233323500475562     6216997363030961663     append  hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-3775233323500475562-1-6500b975-c67e-4171-8c5d-70623933abc3.avro    {"added-data-files":"1","added-files-size":"1126","added-records":"1","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"3380","total-position-deletes":"0","total-records":"3"}

    Now the output indicates one more snapshot — the one that was created by the second INSERT operation.

  6. Run the time-travel query using the ID or the timestamp of the older snapshot. You can run time travel queries in SQL using the TIMESTAMP AS OF <timestamp> or VERSION AS OF <snapshot-id> clauses. The VERSION AS OF clause can accept a snapshot ID, branch name, or tag name.

    Example:

    SELECT * FROM spark_catalog.default.transactions_tt VERSION AS OF 6216997363030961663; (1)
    SELECT * FROM spark_catalog.default.transactions_tt TIMESTAMP AS OF '2024-06-27 14:55:20.780'; (2)
    1 Selects data using the ID of the older snapshot.
    2 Selects data using a timestamp. In this example, the timestamp value is chosen intentionally to pick data from the older snapshot.

    The output:

    1       1002    100.0   2023-01-01
    2       1001    50.0    2023-01-02

    The result set contains only the data that was present in the table right after the first INSERT operation.

  7. Run an ordinary SELECT query:

    SELECT * FROM spark_catalog.default.transactions_tt;

    The output:

    1       1002    100.0   2023-01-01
    2       1001    50.0    2023-01-02
    3       1003    150.0   2024-01-03

    Unlike the previous time-travel queries, this result set contains all the data related to both INSERT operations.

Metadata tables

Iceberg maintains metadata tables, like history, files, snapshots, and so on. These tables store valuable information about snapshots, files, operations, etc., that can be useful for time-travel, branching, and tag-related queries. You can inspect metadata tables by adding the metadata table name after the original table name.

Example:

SELECT * FROM spark_catalog.default.transactions.snapshots;

Sample output:

2024-06-21 14:30:36.001 7505187482383765765     NULL    append  hdfs://adh/apps/hive/warehouse/transactions/metadata/snap-7505187482383765765-1-be783958-0d56-4b2c-b002-54cdae7a349d.avro       {"changed-partition-count":"0","total-data-files":"0","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"0","total-position-deletes":"0","total-records":"0"}
2024-06-21 16:09:08.645 5529064852849657419     NULL    append  hdfs://adh/apps/hive/warehouse/transactions/metadata/snap-5529064852849657419-1-af755165-d1e2-476e-85a9-40ea179c2e9f.avro       {"added-data-files":"2","added-files-size":"2522","added-records":"9","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2522","total-position-deletes":"0","total-records":"9"}

Write data

In Spark3, you can write data to Iceberg tables using traditional SQL syntax like INSERT INTO, INSERT OVERWRITE, UPDATE, and so on. Also, Iceberg supports row-level operations like MERGE INTO and DELETE FROM, which are demonstrated below. When writing to an Iceberg table, Spark data types are converted to Iceberg types according to the Spark types conversion table. For more details on Iceberg write operations, see Iceberg documentation.

MERGE INTO

 

MERGE INTO is a row-level update operation that modifies a target table using a set of updates from a source table. The row update condition is defined by the ON clause that is used similarly to a JOIN condition.

The MERGE INTO command syntax is shown below:

MERGE INTO <target_table>
   USING <source_table>
   ON <merge_condition>
   WHEN MATCHED <matched_condition> THEN <matched_action> |
   WHEN NOT MATCHED THEN <not_matched_action>

The updates to rows in the target table are defined using the WHEN [NOT] MATCHED <matched_condition> THEN <matched_action>. Target rows can be updated and deleted, and the source rows that do not match can be inserted. Consider the following example that demonstrates MERGE INTO usage.

Assume there are two Iceberg tables with the following contents:

spark_catalog.default.transactions spark_catalog.default.transactions_copy
txn_id  acc_id  txn_value txn_date
1       1001    25.0      2023-02-03
2       1001    50.0      2023-03-03
3       1002    10.0      2023-04-01
txn_id  acc_id  txn_value txn_date
1       1003    20.0      2023-02-03
3       1001    50.0      2023-03-03
5       1002    100.0     2023-04-02

The following MERGE INTO command deletes all rows in the spark_catalog.default.transactions_copy table that have a match in the spark_catalog.default.transactions table by the txn_id field.

MERGE INTO spark_catalog.default.transactions_copy USING spark_catalog.default.transactions
  ON spark_catalog.default.transactions_copy.txn_id = spark_catalog.default.transactions.txn_id
  WHEN MATCHED THEN DELETE;

After running this command, the tables contents becomes as follows.

spark_catalog.default.transactions spark_catalog.default.transactions_copy
txn_id  acc_id  txn_value txn_date
1       1001    25.0      2023-02-03
2       1001    50.0      2023-03-03
3       1002    10.0      2023-04-01
txn_id  acc_id  txn_value txn_date
5       1002    100.0     2023-04-02

The following MERGE INTO command updates all rows in the spark_catalog.default.transactions table whose acc_id field matches acc_id in the spark_catalog.default.transactions_copy table. For rows with matched acc_id values, the query sets txn_value=0.

MERGE INTO spark_catalog.default.transactions USING spark_catalog.default.transactions_copy
  ON spark_catalog.default.transactions.acc_id = spark_catalog.default.transactions_copy.acc_id
  WHEN MATCHED THEN UPDATE SET txn_value=0;

After running this command, the tables contents becomes as follows.

spark_catalog.default.transactions spark_catalog.default.transactions_copy
txn_id  acc_id  txn_value txn_date
1       1001    25.0      2023-02-03
2       1001    50.0      2023-03-03
3       1002    10.0      2023-04-01
txn_id  acc_id  txn_value txn_date
5       1002    0.0       2023-04-02
DELETE FROM

 
DELETE FROM is a row-level operation to remove data from Iceberg tables. Delete queries accept a filter to match rows to delete, as shown below. If a delete filter matches the entire table partition, Iceberg will only modify the table’s metadata. If the filter matches individual rows, Iceberg will rewrite only the affected data files.

Examples:

DELETE FROM spark_catalog.default.transactions
WHERE txn_date > '2023-03-03';
DELETE FROM spark_catalog.default.transactions
WHERE txn_value < (SELECT min(txn_value) FROM spark_catalog.default.transactions_copy);

Branch writes

To write data to a specific Iceberg table’s branch, you have to specify a branch identifier in your query.

NOTE
The target branch must be created before performing a write.

For example, to write data to the branch named testbranch, use the following syntax:

INSERT INTO spark_catalog.default.transactions.branch_testbranch VALUES
(1, 1003, 20.00, cast('2023-02-03' as date));

If a branch name includes non-alphanumeric characters (e.g. test-branch), the branch name must be back-quotted as shown in the example below.

INSERT INTO spark_catalog.default.transactions.`branch_test-branch` VALUES
(1, 1003, 20.00, cast('2023-02-03' as date));

To read data from the testbranch created above, use the syntax:

SELECT * FROM spark_catalog.default.transactions
VERSION AS OF 'testbranch';

Apart from writing data to a branch using INSERT, you can also use UPDATE, DELETE, and MERGE INTO operations.

Examples:

UPDATE spark_catalog.default.transactions.branch_testbranch
SET acc_id = 1005
WHERE acc_id = 1003
DELETE FROM spark_catalog.default.transactions.branch_testbranch
WHERE txn_id = 1;
MERGE INTO spark_catalog.default.transactions.branch_testbranch USING spark_catalog.default.transactions_copy
  ON spark_catalog.default.transactions.branch_testbranch.txn_id = spark_catalog.default.transactions_copy.txn_id
  WHEN MATCHED THEN DELETE;

Roll back

Iceberg supports the rollback function that allows rolling a table back to an older snapshot. This feature can be useful, for example, for undoing incidental or mistaken table updates. Table data can be rolled back to the desired state as long as the target snapshot is not expired. Consider the following use case that demonstrates the rollback feature usage.

  1. Create a test Iceberg table with the following contents:

    txn_id  acc_id  txn_value txn_date
    1       1002    10.0      2024-01-01
    2       1001    20.0      2024-01-02

    For this, execute the following queries in spark3-sql:

    CREATE TABLE spark_catalog.default.transactions_rollback_demo (
        txn_id int,
        acc_id int,
        txn_value double,
        txn_date date)
    USING iceberg;
    INSERT INTO spark_catalog.default.transactions_rollback_demo VALUES
    (1, 1002, 10.00, cast('2024-01-01' as timestamp)),
    (2, 1001, 20.00, cast('2024-01-02' as timestamp));
  2. Delete a single row from the test table. This operation will be rolled back on further steps.

    DELETE FROM spark_catalog.default.transactions_rollback_demo
    WHERE txn_id = 2;
  3. Get information about the snapshots currently available for the test table. For this, query the metadata database as shown below:

    SELECT * FROM spark_catalog.default.transactions_rollback_demo.snapshots;

    Sample output:

    2024-07-17 07:59:56.31  6142059163774284478     NULL    append  hdfs://adh/apps/hive/warehouse/transactions_rollback_demo/metadata/snap-6142059163774284478-1-c2c3b442-648b-4e94-8b14-594ec65f66c5.avro{"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1720628013974_0014","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"}
    2024-07-17 08:02:55.859 8329113954968361656     6142059163774284478     delete  hdfs://adh/apps/hive/warehouse/transactions_rollback_demo/metadata/snap-8329113954968361656-1-375044c0-90fa-44f5-a6c6-72ebc12cc473.avro {"changed-partition-count":"1","deleted-data-files":"1","deleted-records":"1","removed-files-size":"1127","spark.app.id":"application_1720628013974_0015","total-data-files":"1","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"1127","total-position-deletes":"0","total-records":"1"}

    The output indicates that there are two snapshots available for the test table. The first snapshot was created right after inserting data, and the second — after the delete operation.

  4. Roll back the table contents to a state before the delete operation:

    CALL spark_catalog.system.rollback_to_snapshot('spark_catalog.default.transactions_rollback_demo', <snapshot-id>);

    Where <snapshot-id> is the ID of the snapshot created before the delete request.

    TIP
    Alternatively, you can provide a timestamp to roll back to the required snapshot.
  5. Select data from the test table to verify the rollback results.

    SELECT * FROM spark_catalog.default.transactions_rollback_demo;
    1       1002    10.0    2023-01-01
    2       1001    20.0    2023-01-02

    The output indicates that the deleted row has been restored.

Found a mistake? Seleсt text and press Ctrl+Enter to report it