Work with Iceberg tables in Spark
Apache Iceberg is an open, high-performance format for large analytic tables. The ADH Spark3 service adopts this format, allowing you to work with Iceberg tables through Spark.
NOTE
Spark3 supports Iceberg tables starting with ADH 3.2.4.3.
The support for Iceberg tables is enabled by default in the Spark3 ADH service. In this article, the Spark SQL module is used to demonstrate working with Iceberg tables through Spark. The Spark SQL module allows running traditional ANSI SQL commands on Spark DataFrame objects.
You can run Spark SQL queries in the spark3-sql shell. To start it, run the following command on a host where the Spark3 Client ADH component is installed:
$ spark3-sql
In this article, all SQL examples are designed for the spark3-sql shell.
Spark catalogs
The Iceberg architecture introduces the concept of catalogs to the Spark API, allowing Spark to manage and load Iceberg tables by name.
An Iceberg catalog is a named repository of tables and their metadata, which are organized into namespaces.
Catalog names and namespaces are used in Spark SQL queries to qualify tables that belong to different catalogs, similarly to the SELECT … FROM <database_name>.<table_name> notation.
The following syntax is used to fully qualify a specific Iceberg table.
SELECT ... FROM <catalog_name>.<namespace>.<table_name> ...
Iceberg offers several catalog backends to track tables, such as REST, Hive, and JDBC. An example of creating a Hive-based Iceberg catalog that loads tables from a Hive Metastore is shown below:
$ spark3-sql \
    --conf spark.sql.catalog.test_catalog_hive=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.test_catalog_hive.type=hive \
    --conf spark.sql.catalog.test_catalog_hive.warehouse=hdfs://ka-adh-1.ru-central1.internal:8020/user/admin/test_warehouse
To load tables from the catalog created above, you should prefix the table names with test_catalog_hive.
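For example, a table registered in this catalog can be queried as follows (the namespace and table name are illustrative):

SELECT * FROM test_catalog_hive.default.transactions;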
NOTE
Catalog configuration properties specified using --conf notation in the example above can also be set in ADCM (Clusters → <clusterName> → Services → Spark3 → Primary Configuration → Custom spark-defaults.conf).
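For reference, the catalog defined above with --conf flags corresponds to the following spark-defaults.conf entries:

spark.sql.catalog.test_catalog_hive=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.test_catalog_hive.type=hive
spark.sql.catalog.test_catalog_hive.warehouse=hdfs://ka-adh-1.ru-central1.internal:8020/user/admin/test_warehouse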
Iceberg provides two catalog implementations:
- org.apache.iceberg.spark.SparkCatalog. Allows working only with Iceberg tables, using a Hive Metastore or a Hadoop warehouse directory to track them.
- org.apache.iceberg.spark.SparkSessionCatalog. Adds support for Iceberg tables to the built-in Spark catalog. When working with non-Iceberg tables, SparkSessionCatalog delegates interaction to the built-in Spark catalog. In the Spark3 service, this implementation is used by default, allowing you to work with both Iceberg and non-Iceberg tables.
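As an illustration, mapping the session catalog to SparkSessionCatalog boils down to properties similar to the following sketch (in ADH these values are preconfigured by the Spark3 service; the hive type is an assumption here):

--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
--conf spark.sql.catalog.spark_catalog.type=hive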
In Spark3, the default catalog is named spark_catalog.
This catalog supports single-level namespaces that correspond to Hive database names.
Throughout this article, the examples use the spark_catalog catalog and the default namespace.
Catalog configuration properties
Iceberg catalogs can be configured using spark.sql.catalog.<catalog-name>.* properties.
Common configuration properties for Hive and Hadoop are presented in the table below.
Property | Description
---|---
spark.sql.catalog.<catalog-name>.type | Defines the underlying Iceberg catalog implementation. The supported values include hive, hadoop, and rest
spark.sql.catalog.<catalog-name>.catalog-impl | A custom Iceberg catalog implementation. If this property is set, the type property should be omitted
spark.sql.catalog.<catalog-name>.io-impl | A custom FileIO implementation
spark.sql.catalog.<catalog-name>.metrics-reporter-impl | A custom MetricsReporter implementation
spark.sql.catalog.<catalog-name>.default-namespace | The default current namespace for the catalog. The default value is default
spark.sql.catalog.<catalog-name>.uri | Hive Metastore URL (thrift://host:port) for Hive-based catalogs or REST URL for REST-based catalogs
spark.sql.catalog.<catalog-name>.warehouse | The base path to the warehouse directory. For example, hdfs://nn:8020/warehouse/path
spark.sql.catalog.<catalog-name>.cache-enabled | Defines whether to enable catalog caching. Defaults to true
spark.sql.catalog.<catalog-name>.cache.expiration-interval-ms | Sets a timeout after which cached catalog entries are expired. This parameter is effective only if cache-enabled is set to true
spark.sql.catalog.<catalog-name>.table-default.propertyKey | Sets a default value for an Iceberg table property with the name propertyKey; the value is applied to tables created by this catalog unless overridden
spark.sql.catalog.<catalog-name>.table-override.propertyKey | Sets a value for an Iceberg table property with the name propertyKey that cannot be overridden by users
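As an illustration, several of these properties can be combined to define a Hadoop-based catalog when starting spark3-sql (the catalog name and warehouse path below are placeholders):

$ spark3-sql \
    --conf spark.sql.catalog.test_catalog_hadoop=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.test_catalog_hadoop.type=hadoop \
    --conf spark.sql.catalog.test_catalog_hadoop.warehouse=hdfs://nn:8020/warehouse/path \
    --conf spark.sql.catalog.test_catalog_hadoop.cache-enabled=false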
For more information on creating and configuring Iceberg catalogs, see the Iceberg documentation.
DDL commands
Create an Iceberg table
To create an Iceberg table, append the USING iceberg clause to the standard CREATE TABLE statement.
Example:
CREATE TABLE spark_catalog.default.transactions(
txn_id int,
acc_id int,
txn_value double,
txn_date date)
USING iceberg;
Run the DESCRIBE FORMATTED command to view information about the newly created table:
DESCRIBE FORMATTED spark_catalog.default.transactions;
Sample output:
txn_id                  int
acc_id                  int
txn_value               double
txn_date                date

# Metadata Columns
_spec_id                int
_partition              struct<>
_file                   string
_pos                    bigint
_deleted                boolean

# Detailed Table Information
Name                    spark_catalog.default.transactions
Type                    MANAGED
Location                hdfs://adh/apps/hive/warehouse/transactions
Provider                iceberg
Owner                   hdfs
Table Properties        [current-snapshot-id=none,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
The Provider value in the output indicates that the table has been created as an Iceberg table.
Create a partitioned table
To create a partitioned table, use the syntax as shown below:
CREATE TABLE spark_catalog.default.transactions_partitioned (
txn_id bigint,
acc_id int,
txn_value double,
txn_date date)
USING iceberg
PARTITIONED BY (acc_id);
You can also use transform expressions in the PARTITIONED BY clause to create hidden partitions instead of specifying partitions explicitly.
An example is below.
CREATE TABLE spark_catalog.default.transactions_partitioned_transform (
id bigint,
acc_id int,
txn_value double,
txn_date timestamp)
USING iceberg
PARTITIONED BY (day(txn_date)); (1)
(1) day(txn_date) is a transform expression that creates partitions by day.
To verify the partitioning, insert some data into the test table as shown below:
INSERT INTO spark_catalog.default.transactions_partitioned_transform VALUES
(1, 1002, 10.00, cast('2023-01-01' as timestamp)),
(2, 1001, 20.00, cast('2023-01-02' as timestamp));
Then, check the warehouse directory contents in HDFS:
$ hdfs dfs -ls -R /apps/hive/warehouse/transactions_partitioned_transform
Sample output:
drwxrwxr-x   - hdfs hadoop       0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data
drwxrwxr-x   - hdfs hadoop       0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-01
-rw-r--r--   3 hdfs hadoop    1206 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-01/00000-12-a029426f-ca10-4efc-b87c-389694c218bc-0-00001.parquet
drwxrwxr-x   - hdfs hadoop       0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-02
-rw-r--r--   3 hdfs hadoop    1206 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/data/txn_date_day=2023-01-02/00000-12-a029426f-ca10-4efc-b87c-389694c218bc-0-00002.parquet
drwxrwxr-x   - hdfs hadoop       0 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata
-rw-r--r--   3 hdfs hadoop    1381 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/00000-28c5f350-f14a-4fcb-b509-10a327fa19cb.metadata.json
-rw-r--r--   3 hdfs hadoop    2519 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/00001-7c83b7d2-0a43-463e-b180-6589a0f696af.metadata.json
-rw-r--r--   3 hdfs hadoop    7157 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/28884384-0539-4ca5-8dcb-95d00de4419e-m0.avro
-rw-r--r--   3 hdfs hadoop    4264 2024-06-19 16:06 /apps/hive/warehouse/transactions_partitioned_transform/metadata/snap-2743337418634934372-1-28884384-0539-4ca5-8dcb-95d00de4419e.avro
According to the sample output, the test data has been distributed among several HDFS directories (partitions) named txn_date_day={date}, each representing a distinct day.
CREATE TABLE using CTAS/RTAS
You can use the CREATE TABLE ... AS SELECT (CTAS) and REPLACE TABLE ... AS SELECT (RTAS) syntax to create or replace an Iceberg table based on an existing table.
The examples are below.
CREATE TABLE spark_catalog.default.transactions_ctas USING iceberg
AS SELECT txn_id, txn_value
FROM spark_catalog.default.transactions;
REPLACE TABLE spark_catalog.default.transactions_rtas USING iceberg
AS SELECT txn_id, txn_value
FROM spark_catalog.default.transactions
WHERE txn_value < 50;
The table creation/replacement operation using the CTAS/RTAS syntax is atomic only with the SparkCatalog implementation. With SparkSessionCatalog (used by default in Spark3), the atomicity of CTAS/RTAS operations is not guaranteed.
DROP TABLE
To delete an Iceberg table, use the standard syntax:
DROP TABLE <table_name> [PURGE]
The optional PURGE flag defines whether the table contents should also be deleted.
If PURGE is not set, only the table metadata is removed from the catalog.
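For example, the following command drops the CTAS table created above together with its data (a sketch; adjust the table name as needed):

DROP TABLE spark_catalog.default.transactions_ctas PURGE;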
ALTER TABLE
The Spark3 service supports rich ALTER TABLE capabilities for Iceberg tables.
Below you can find common ALTER TABLE operations with examples.
More details about Iceberg ALTER TABLE commands can be found in the Iceberg documentation.
The following command renames an Iceberg table.
The command affects only the table’s metadata, keeping the table data intact.
ALTER TABLE spark_catalog.default.transactions
RENAME TO spark_catalog.default.transactions_new;
The following example sets the write format to ORC using the Iceberg table properties.
ALTER TABLE spark_catalog.default.transactions
SET TBLPROPERTIES ('write.format.default'='orc');
TIP
To clear a table property, use the UNSET command.
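For example, the write format property set above can be cleared as follows:

ALTER TABLE spark_catalog.default.transactions
UNSET TBLPROPERTIES ('write.format.default');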
You can add a column to an existing Iceberg table by using the ADD COLUMN clause.
Example:
ALTER TABLE spark_catalog.default.transactions
ADD COLUMN (
description string comment "A transaction's description"
);
The DESCRIBE command output:

DESCRIBE spark_catalog.default.transactions;

txn_id          int
acc_id          int
txn_value       double
txn_date        date
description     string      A transaction's description
To add multiple columns with one command, use the ADD COLUMNS clause and separate the columns with a comma.
Example:
ALTER TABLE spark_catalog.default.transactions
ADD COLUMNS (
is_committed boolean,
is_acknowledged boolean
);
The DESCRIBE command output:

DESCRIBE spark_catalog.default.transactions;

txn_id              int
acc_id              int
txn_value           double
txn_date            date
description         string      A transaction's description
is_committed        boolean
is_acknowledged     boolean
To add a column with a struct data type, such as struct<x, y>, use the following syntax:
ALTER TABLE spark_catalog.default.transactions
ADD COLUMN processor_metadata struct<x: double, y: double>;
For more details on working with composite data types, such as arrays and maps, see the Iceberg documentation.
You can change the definition of a column; for example, change the column data type, make the column nullable, add a comment, etc.
Iceberg allows changing the column data type if the update is safe, for example:
- int → bigint
- float → double
- decimal(i,j) → decimal(i1,j), assuming i1 > i
ALTER TABLE spark_catalog.default.transactions
ALTER COLUMN txn_id TYPE bigint;
The DESCRIBE command output:

DESCRIBE spark_catalog.default.transactions;

txn_id                  bigint
acc_id                  int
txn_value               double
txn_date                date
description             string      A transaction's description
is_committed            boolean
is_acknowledged         boolean
processor_metadata      struct<x:double,y:double>
To make a field nullable, use the syntax as shown below.
ALTER TABLE spark_catalog.default.transactions
ALTER COLUMN acc_id DROP NOT NULL;
However, you cannot change a nullable column to a non-nullable one, since Iceberg does not know whether existing entries contain NULL values.
Iceberg supports partitioning scheme evolution, allowing you to modify partition fields in existing tables.
The modification of a partitioning scheme is a metadata operation that does not affect the existing table data.
New data is written according to the new partitioning spec; the existing data keeps the old partitioning layout.
Old data files have NULL values for the new partition fields in the metadata tables.
Example:
ALTER TABLE spark_catalog.default.transactions_partitioned
ADD PARTITION FIELD txn_date;
Run the DESCRIBE command to ensure the partitioning spec has been updated.
The output:
DESCRIBE spark_catalog.default.transactions_partitioned;

txn_id          bigint
acc_id          int
txn_value       decimal(10,2)
txn_date        date

# Partition Information
# col_name      data_type       comment
acc_id          int
txn_date        date
You can specify the sorting order for an Iceberg table, which is used to automatically sort data written to that table.
For example, Spark's MERGE INTO operation uses the table's sorting order.
ALTER TABLE spark_catalog.default.transactions
WRITE ORDERED BY txn_id;
NOTE
The table write order does not guarantee data order when querying data.
It only affects how data is written to the table.
To unset the sorting order for a table, use the UNORDERED keyword as shown in the example below.
ALTER TABLE spark_catalog.default.transactions
WRITE UNORDERED;
You can create branches for an Iceberg table by using the CREATE BRANCH statement.
Consider the examples below.
ALTER TABLE spark_catalog.default.transactions CREATE BRANCH `test-branch`; (1)
ALTER TABLE spark_catalog.default.transactions CREATE BRANCH IF NOT EXISTS `test-branch` RETAIN 7 DAYS; (2)
ALTER TABLE spark_catalog.default.transactions CREATE OR REPLACE BRANCH `test-branch`; (3)
ALTER TABLE spark_catalog.default.transactions CREATE BRANCH `test-branch` AS OF VERSION 123; (4)
(1) Creates a branch with the default retention policy.
(2) Creates the branch if it does not exist and sets the retention time to 7 days.
(3) Creates or replaces the branch if it already exists.
(4) Creates a branch at snapshot 123 using the default retention. Snapshot 123 must be available when running this command.
To replace and drop branches, use the REPLACE BRANCH and DROP BRANCH commands.
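A minimal sketch of these commands, assuming the test-branch created above and an existing snapshot with ID 123:

ALTER TABLE spark_catalog.default.transactions REPLACE BRANCH `test-branch` AS OF VERSION 123;
ALTER TABLE spark_catalog.default.transactions DROP BRANCH `test-branch`;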
You can create Iceberg tags to "label" the state of a table and return to this table state in the future.
Consider the examples below.
ALTER TABLE spark_catalog.default.transactions CREATE TAG `test-tag`; (1)
ALTER TABLE spark_catalog.default.transactions CREATE TAG IF NOT EXISTS `test-tag` RETAIN 365 DAYS; (2)
ALTER TABLE spark_catalog.default.transactions CREATE OR REPLACE TAG `test-tag`; (3)
ALTER TABLE spark_catalog.default.transactions CREATE TAG `test-tag` AS OF VERSION 123; (4)
(1) Creates a tag with the default retention policy.
(2) Creates the tag at the current snapshot if it does not exist and sets the retention time to 365 days.
(3) Creates or replaces the tag if it already exists.
(4) Creates a tag at snapshot 123 using the default retention. Snapshot 123 must exist when running this command.
Querying the table data using the tag created in the above example looks as follows:
SELECT * FROM spark_catalog.default.transactions
VERSION AS OF 'test-tag';
To replace and drop tags, use the REPLACE TAG and DROP TAG commands.
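Similarly, a minimal sketch for tags, assuming the test-tag created above and an existing snapshot with ID 123:

ALTER TABLE spark_catalog.default.transactions REPLACE TAG `test-tag` AS OF VERSION 123;
ALTER TABLE spark_catalog.default.transactions DROP TAG `test-tag`;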
Query data
To query data from an Iceberg table using Spark SQL, you have to provide a catalog name, namespace, and table name.
Example:
SELECT * FROM spark_catalog.default.transactions;
Where:
- spark_catalog is the default catalog name.
- default is the default namespace.
- transactions is the requested table name.
To load a table as a DataFrame, use the table() method.
For example (PySpark):
df = spark.table("spark_catalog.default.transactions")
Time travel reads
Iceberg tables support the time travel feature that allows you to query data from a specific table snapshot that was created at some point in the past and is identified by an ID or timestamp.
TIP
The information about all the snapshots available for a given table is stored in the snapshots metadata table.
The following scenario shows how to run time-travel queries through Spark.
- Create a test table using spark3-sql:

  CREATE TABLE spark_catalog.default.transactions_tt(
      txn_id int,
      acc_id int,
      txn_value double,
      txn_date date)
  USING iceberg;
- Insert some data into the test table:

  INSERT INTO spark_catalog.default.transactions_tt VALUES
      (1, 1002, 100.00, cast('2023-01-01' as date)),
      (2, 1001, 50.00, cast('2023-01-02' as date));
- Inspect the snapshots metadata table:

  SELECT * FROM spark_catalog.default.transactions_tt.snapshots;

  This produces an output similar to the following:

  2024-06-27 14:54:05.664    6216997363030961663    NULL    append    hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-6216997363030961663-1-f55143c4-0351-4f6a-8978-f0c2fa5666b7.avro    {"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"}

  The output indicates that currently there is one snapshot (6216997363030961663) available for the table. This snapshot was created right after the INSERT operation.
- Add more test data to the table:

  INSERT INTO spark_catalog.default.transactions_tt VALUES
      (3, 1003, 150.00, cast('2024-01-03' as date));
- Query the snapshots metadata table again:

  SELECT * FROM spark_catalog.default.transactions_tt.snapshots;

  The result:

  2024-06-27 14:54:05.664    6216997363030961663    NULL    append    hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-6216997363030961663-1-f55143c4-0351-4f6a-8978-f0c2fa5666b7.avro    {"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"}
  2024-06-27 14:56:20.781    3775233323500475562    6216997363030961663    append    hdfs://adh/apps/hive/warehouse/transactions_tt/metadata/snap-3775233323500475562-1-6500b975-c67e-4171-8c5d-70623933abc3.avro    {"added-data-files":"1","added-files-size":"1126","added-records":"1","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"3380","total-position-deletes":"0","total-records":"3"}

  Now the output indicates one more snapshot, the one created by the second INSERT operation.
- Run the time-travel query using the ID or the timestamp of the older snapshot. You can run time-travel queries in SQL using the TIMESTAMP AS OF <timestamp> or VERSION AS OF <snapshot-id> clauses. The VERSION AS OF clause can accept a snapshot ID, branch name, or tag name. Example:

  SELECT * FROM spark_catalog.default.transactions_tt VERSION AS OF 6216997363030961663; (1)
  SELECT * FROM spark_catalog.default.transactions_tt TIMESTAMP AS OF '2024-06-27 14:55:20.780'; (2)

  (1) Selects data using the ID of the older snapshot.
  (2) Selects data using a timestamp. In this example, the timestamp value is chosen intentionally to pick data from the older snapshot.

  The output:

  1    1002    100.0    2023-01-01
  2    1001    50.0     2023-01-02

  The result set contains only the data that was present in the table right after the first INSERT operation.
Run an ordinary
SELECT
query:SELECT * FROM spark_catalog.default.transactions_tt;
The output:
1 1002 100.0 2023-01-01 2 1001 50.0 2023-01-02 3 1003 150.0 2024-01-03
Unlike the previous time-travel queries, this result set contains all the data related to both
INSERT
operations.
Metadata tables
Iceberg maintains metadata tables, like history, files, snapshots, and so on.
These tables store valuable information about snapshots, files, operations, etc., that can be useful for time-travel, branching, and tag-related queries.
You can inspect metadata tables by adding the metadata table name after the original table name.
Example:
SELECT * FROM spark_catalog.default.transactions.snapshots;
Sample output:
2024-06-21 14:30:36.001    7505187482383765765    NULL    append    hdfs://adh/apps/hive/warehouse/transactions/metadata/snap-7505187482383765765-1-be783958-0d56-4b2c-b002-54cdae7a349d.avro    {"changed-partition-count":"0","total-data-files":"0","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"0","total-position-deletes":"0","total-records":"0"}
2024-06-21 16:09:08.645    5529064852849657419    NULL    append    hdfs://adh/apps/hive/warehouse/transactions/metadata/snap-5529064852849657419-1-af755165-d1e2-476e-85a9-40ea179c2e9f.avro    {"added-data-files":"2","added-files-size":"2522","added-records":"9","changed-partition-count":"1","spark.app.id":"application_1719409181169_0020","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2522","total-position-deletes":"0","total-records":"9"}
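Other metadata tables are queried in the same way. For example, the following illustrative queries inspect the files and history metadata tables (the selected columns are just a subset):

SELECT file_path, record_count FROM spark_catalog.default.transactions.files;
SELECT * FROM spark_catalog.default.transactions.history;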
Write data
In Spark3, you can write data to Iceberg tables using traditional SQL syntax like INSERT INTO, INSERT OVERWRITE, UPDATE, and so on.
Also, Iceberg supports row-level operations like MERGE INTO and DELETE FROM, which are demonstrated below.
When writing to an Iceberg table, Spark data types are converted to Iceberg types according to the Spark types conversion table.
For more details on Iceberg write operations, see the Iceberg documentation.
MERGE INTO is a row-level update operation that modifies a target table using a set of updates from a source table.
The row update condition is defined by the ON clause, which is used similarly to a JOIN condition.
The MERGE INTO command syntax is shown below:
MERGE INTO <target_table>
USING <source_table>
ON <merge_condition>
WHEN MATCHED <matched_condition> THEN <matched_action> |
WHEN NOT MATCHED THEN <not_matched_action>
The updates to rows in the target table are defined using the WHEN [NOT] MATCHED <matched_condition> THEN <matched_action> clause.
Target rows can be updated or deleted, and source rows that do not match can be inserted.
Consider the following example that demonstrates MERGE INTO usage.
Assume there are two Iceberg tables with the following contents:
spark_catalog.default.transactions:

txn_id    acc_id    txn_value    txn_date
1         1001      25.0         2023-02-03
2         1001      50.0         2023-03-03
3         1002      10.0         2023-04-01

spark_catalog.default.transactions_copy:

txn_id    acc_id    txn_value    txn_date
1         1003      20.0         2023-02-03
3         1001      50.0         2023-03-03
5         1002      100.0        2023-04-02
The following MERGE INTO command deletes all rows in the spark_catalog.default.transactions_copy table that have a match in the spark_catalog.default.transactions table by the txn_id field.
MERGE INTO spark_catalog.default.transactions_copy USING spark_catalog.default.transactions
ON spark_catalog.default.transactions_copy.txn_id = spark_catalog.default.transactions.txn_id
WHEN MATCHED THEN DELETE;
After running this command, the tables' contents become as follows.
spark_catalog.default.transactions:

txn_id    acc_id    txn_value    txn_date
1         1001      25.0         2023-02-03
2         1001      50.0         2023-03-03
3         1002      10.0         2023-04-01

spark_catalog.default.transactions_copy:

txn_id    acc_id    txn_value    txn_date
5         1002      100.0        2023-04-02
The following MERGE INTO command updates all rows in the spark_catalog.default.transactions table whose acc_id field matches acc_id in the spark_catalog.default.transactions_copy table.
For rows with matched acc_id values, the query sets txn_value=0.
MERGE INTO spark_catalog.default.transactions USING spark_catalog.default.transactions_copy
ON spark_catalog.default.transactions.acc_id = spark_catalog.default.transactions_copy.acc_id
WHEN MATCHED THEN UPDATE SET txn_value=0;
After running this command, the tables' contents become as follows.
spark_catalog.default.transactions:

txn_id    acc_id    txn_value    txn_date
1         1001      25.0         2023-02-03
2         1001      50.0         2023-03-03
3         1002      0.0          2023-04-01

spark_catalog.default.transactions_copy:

txn_id    acc_id    txn_value    txn_date
5         1002      100.0        2023-04-02
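For completeness, a WHEN NOT MATCHED action can be combined with the update. The following sketch (based on the same test tables) updates matching target rows and inserts source rows that have no match in the target table:

MERGE INTO spark_catalog.default.transactions USING spark_catalog.default.transactions_copy
ON spark_catalog.default.transactions.txn_id = spark_catalog.default.transactions_copy.txn_id
WHEN MATCHED THEN UPDATE SET txn_value = 0
WHEN NOT MATCHED THEN INSERT *;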
DELETE FROM is a row-level operation to remove data from Iceberg tables.
Delete queries accept a filter to match rows to delete, as shown below.
If a delete filter matches an entire table partition, Iceberg only modifies the table's metadata.
If the filter matches individual rows, Iceberg rewrites only the affected data files.
Examples:
DELETE FROM spark_catalog.default.transactions
WHERE txn_date > '2023-03-03';
DELETE FROM spark_catalog.default.transactions
WHERE txn_value < (SELECT min(txn_value) FROM spark_catalog.default.transactions_copy);
Branch writes
To write data to a specific branch of an Iceberg table, you have to specify the branch identifier in your query.
NOTE
The target branch must be created before performing a write.
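For example, the testbranch used below can be created beforehand as described in the branching section:

ALTER TABLE spark_catalog.default.transactions CREATE BRANCH testbranch;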
For example, to write data to the branch named testbranch, use the following syntax:
INSERT INTO spark_catalog.default.transactions.branch_testbranch VALUES
(1, 1003, 20.00, cast('2023-02-03' as date));
If a branch name includes non-alphanumeric characters (e.g. test-branch), the branch name must be backquoted as shown in the example below.
INSERT INTO spark_catalog.default.transactions.`branch_test-branch` VALUES
(1, 1003, 20.00, cast('2023-02-03' as date));
To read data from the testbranch created above, use the following syntax:
SELECT * FROM spark_catalog.default.transactions
VERSION AS OF 'testbranch';
Apart from writing data to a branch using INSERT, you can also use the UPDATE, DELETE, and MERGE INTO operations.
Examples:
UPDATE spark_catalog.default.transactions.branch_testbranch
SET acc_id = 1005
WHERE acc_id = 1003;
DELETE FROM spark_catalog.default.transactions.branch_testbranch
WHERE txn_id = 1;
MERGE INTO spark_catalog.default.transactions.branch_testbranch USING spark_catalog.default.transactions_copy
ON spark_catalog.default.transactions.branch_testbranch.txn_id = spark_catalog.default.transactions_copy.txn_id
WHEN MATCHED THEN DELETE;
Roll back
Iceberg supports the rollback function that allows rolling a table back to an older snapshot. This feature can be useful, for example, for undoing accidental or mistaken table updates. Table data can be rolled back to the desired state as long as the target snapshot has not expired. Consider the following use case that demonstrates the rollback feature usage.
- Create a test Iceberg table with the following contents:

  txn_id    acc_id    txn_value    txn_date
  1         1002      10.0         2024-01-01
  2         1001      20.0         2024-01-02

  For this, execute the following queries in spark3-sql:

  CREATE TABLE spark_catalog.default.transactions_rollback_demo (
      txn_id int,
      acc_id int,
      txn_value double,
      txn_date date)
  USING iceberg;

  INSERT INTO spark_catalog.default.transactions_rollback_demo VALUES
      (1, 1002, 10.00, cast('2024-01-01' as date)),
      (2, 1001, 20.00, cast('2024-01-02' as date));
- Delete a single row from the test table. This operation will be rolled back in further steps.

  DELETE FROM spark_catalog.default.transactions_rollback_demo WHERE txn_id = 2;
- Get information about the snapshots currently available for the test table. For this, query the snapshots metadata table as shown below:

  SELECT * FROM spark_catalog.default.transactions_rollback_demo.snapshots;

  Sample output:

  2024-07-17 07:59:56.31     6142059163774284478    NULL    append    hdfs://adh/apps/hive/warehouse/transactions_rollback_demo/metadata/snap-6142059163774284478-1-c2c3b442-648b-4e94-8b14-594ec65f66c5.avro    {"added-data-files":"2","added-files-size":"2254","added-records":"2","changed-partition-count":"1","spark.app.id":"application_1720628013974_0014","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2254","total-position-deletes":"0","total-records":"2"}
  2024-07-17 08:02:55.859    8329113954968361656    6142059163774284478    delete    hdfs://adh/apps/hive/warehouse/transactions_rollback_demo/metadata/snap-8329113954968361656-1-375044c0-90fa-44f5-a6c6-72ebc12cc473.avro    {"changed-partition-count":"1","deleted-data-files":"1","deleted-records":"1","removed-files-size":"1127","spark.app.id":"application_1720628013974_0015","total-data-files":"1","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"1127","total-position-deletes":"0","total-records":"1"}

  The output indicates that there are two snapshots available for the test table. The first snapshot was created right after inserting the data, and the second one after the delete operation.
- Roll back the table contents to the state before the delete operation:

  CALL spark_catalog.system.rollback_to_snapshot('spark_catalog.default.transactions_rollback_demo', <snapshot-id>);

  Where <snapshot-id> is the ID of the snapshot created before the delete operation.

  TIP
  Alternatively, you can provide a timestamp to roll back to the required snapshot.
- Select data from the test table to verify the rollback results.

  SELECT * FROM spark_catalog.default.transactions_rollback_demo;

  1    1002    10.0    2024-01-01
  2    1001    20.0    2024-01-02

  The output indicates that the deleted row has been restored.
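As noted in the tip above, the rollback can also be driven by a timestamp. A minimal sketch using the Iceberg rollback_to_timestamp procedure is shown below (the timestamp value is illustrative and should precede the delete operation):

CALL spark_catalog.system.rollback_to_timestamp('spark_catalog.default.transactions_rollback_demo', TIMESTAMP '2024-07-17 08:00:00');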