Work with Iceberg tables in Impala
Apache Iceberg is an open, high-performance format for large analytic tables. The ADH Impala service supports this format, allowing you to work with Iceberg tables using SQL and run analytics on them.
Iceberg catalogs
In Iceberg architecture, a catalog is a named repository of tables and their metadata. Using Impala, you can manage Iceberg tables in the following Iceberg catalogs:
- HiveCatalog. The default catalog used by Impala; it uses Hive Metastore to store table metadata. This catalog is used in the examples throughout the article.
- HadoopCatalog. A path-based catalog that assumes a dedicated catalog location in HDFS under which individual Iceberg tables are stored (see the sketch after this list).
- HadoopTables. A location-based catalog that assumes an HDFS location that contains several Iceberg tables.
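For example, below is a minimal sketch of creating a table in a HadoopCatalog instead of the default HiveCatalog. The iceberg.catalog and iceberg.catalog_location table properties are described in the Impala documentation; the table name and the catalog path are placeholders:
CREATE TABLE default.ice_hadoop_cat_test (
  txn_id int)
STORED AS ICEBERG
TBLPROPERTIES(
  'iceberg.catalog'='hadoop.catalog',
  'iceberg.catalog_location'='/tmp/ice_hadoop_cat');
For HadoopTables, set 'iceberg.catalog'='hadoop.tables' instead; in this case, the table location itself acts as the catalog.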
You can also use custom catalogs for existing tables. However, automatic metadata updates do not work for tables in a custom catalog: you have to manually call REFRESH on such a table whenever it changes outside Impala.
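For example, assuming a table named default.ice_custom_cat_test (a placeholder name) that is registered in a custom catalog and was changed outside Impala:
REFRESH default.ice_custom_cat_test;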
DDL commands
Create a table
To create an Iceberg table in the default HiveCatalog, append the STORED AS ICEBERG
clause to the standard Impala CREATE TABLE
statement.
Example:
CREATE TABLE default.impala_ice_test (
txn_id int,
acc_id int,
txn_value double,
txn_date date)
STORED AS ICEBERG;
Run the DESCRIBE FORMATTED
command to retrieve information about the newly created table:
DESCRIBE FORMATTED default.impala_ice_test;
Query: describe formatted impala_ice_test
+------------------------------+------------------------------------------------------+---------+
| name                         | type                                                 | comment |
+------------------------------+------------------------------------------------------+---------+
| # col_name                   | data_type                                            | comment |
|                              | NULL                                                 | NULL    |
| txn_id                       | int                                                  | NULL    |
| acc_id                       | int                                                  | NULL    |
| txn_value                    | double                                               | NULL    |
| txn_date                     | date                                                 | NULL    |
|                              | NULL                                                 | NULL    |
| # Detailed Table Information | NULL                                                 | NULL    |
| Database:                    | default                                              | NULL    |
| OwnerType:                   | USER                                                 | NULL    |
| Owner:                       | admin                                                | NULL    |
| CreateTime:                  | Thu Jul 04 10:50:39 UTC 2024                         | NULL    |
| LastAccessTime:              | Sun Jan 25 08:15:21 UTC 1970                         | NULL    |
| Retention:                   | 2147483647                                           | NULL    |
| Location:                    | hdfs://adh/apps/hive/warehouse/impala_ice_test       | NULL    |
| Erasure Coding Policy:       | NONE                                                 | NULL    |
| Table Type:                  | EXTERNAL_TABLE                                       | NULL    |
| Table Parameters:            | NULL                                                 | NULL    |
|                              | EXTERNAL                                             | TRUE    |
|                              | OBJCAPABILITIES                                      | EXTREAD,EXTWRITE |
|                              | accessType                                           | 8       |
|                              | current-schema                                       | {"type":"struct","schema-id":0,"fields":[{"id":1,"name":"txn_id","required":false,"type":"int"},{"id":2,"name":"acc_id","required":false,"type":"int"},{"id":3,"name":"txn_value","required":false,"type":"double"},{"id":4,"name":"txn_date","required":false,"type":"date"}]} |
|                              | engine.hive.enabled                                  | true    |
|                              | external.table.purge                                 | TRUE    |
|                              | metadata_location                                    | hdfs://adh/apps/hive/warehouse/impala_ice_test/metadata/00000-c3406504-fe59-4464-8b1f-01971fc3b949.metadata.json |
|                              | numFiles                                             | 1       |
|                              | snapshot-count                                       | 0       |
|                              | storage_handler                                      | org.apache.iceberg.mr.hive.HiveIcebergStorageHandler |
|                              | table_type                                           | ICEBERG |
|                              | totalSize                                            | 1914    |
|                              | transient_lastDdlTime                                | 1720090239 |
|                              | uuid                                                 | 5495460c-53fe-4fb7-9120-0d5dad7fd022 |
|                              | write.format.default                                 | parquet |
|                              | NULL                                                 | NULL    |
| # Storage Information        | NULL                                                 | NULL    |
| SerDe Library:               | org.apache.iceberg.mr.hive.HiveIcebergSerDe          | NULL    |
| InputFormat:                 | org.apache.iceberg.mr.hive.HiveIcebergInputFormat    | NULL    |
| OutputFormat:                | org.apache.iceberg.mr.hive.HiveIcebergOutputFormat   | NULL    |
| Compressed:                  | No                                                   | NULL    |
| Sort Columns:                | []                                                   | NULL    |
|                              | NULL                                                 | NULL    |
| # Constraints                | NULL                                                 | NULL    |
+------------------------------+------------------------------------------------------+---------+
The table_type
parameter in the output indicates that the table has been created as an Iceberg table.
By default, Impala creates Iceberg tables using the Parquet data format.
To use ORC or AVRO, set the table property write.format.default
as shown in the following example:
CREATE TABLE default.impala_ice_test_orc (
txn_id int,
acc_id int,
txn_value double,
txn_date date)
STORED AS ICEBERG
TBLPROPERTIES('write.format.default'='ORC');
TIP
More Iceberg table properties can be found in the Impala documentation.
CREATE TABLE AS SELECT
You can also use the CREATE TABLE <tbl_name> … AS SELECT (CTAS) syntax to create an Iceberg table based on an existing table.
Example:
CREATE TABLE default.impala_ice_test_ctas
STORED AS ICEBERG
AS SELECT txn_value, txn_date FROM default.impala_ice_test;
DESCRIBE FORMATTED default.impala_ice_test_ctas;
+------------------------------+------------------------------------------------------+---------+
| name                         | type                                                 | comment |
+------------------------------+------------------------------------------------------+---------+
| # col_name                   | data_type                                            | comment |
|                              | NULL                                                 | NULL    |
| txn_value                    | double                                               | NULL    |
| txn_date                     | date                                                 | NULL    |
|                              | NULL                                                 | NULL    |
| # Detailed Table Information | NULL                                                 | NULL    |
| Database:                    | default                                              | NULL    |
| OwnerType:                   | USER                                                 | NULL    |
| Owner:                       | admin                                                | NULL    |
| CreateTime:                  | Thu Jul 04 11:07:31 UTC 2024                         | NULL    |
| LastAccessTime:              | Sun Jan 25 08:32:12 UTC 1970                         | NULL    |
| Retention:                   | 2147483647                                           | NULL    |
| Location:                    | hdfs://adh/apps/hive/warehouse/impala_ice_test_ctas  | NULL    |
| Erasure Coding Policy:       | NONE                                                 | NULL    |
| Table Type:                  | EXTERNAL_TABLE                                       | NULL    |
| Table Parameters:            | NULL                                                 | NULL    |
|                              | EXTERNAL                                             | TRUE    |
|                              | OBJCAPABILITIES                                      | EXTREAD,EXTWRITE |
|                              | accessType                                           | 8       |
|                              | current-schema                                       | {"type":"struct","schema-id":0,"fields":[{"id":1,"name":"txn_value","required":false,"type":"double"},{"id":2,"name":"txn_date","required":false,"type":"date"}]} |
|                              | engine.hive.enabled                                  | true    |
|                              | external.table.purge                                 | TRUE    |
|                              | metadata_location                                    | hdfs://adh/apps/hive/warehouse/impala_ice_test_ctas/metadata/00000-c3871ac4-ca86-4c6d-be6c-116f3bf7cdf9.metadata.json |
|                              | numFiles                                             | 1       |
|                              | snapshot-count                                       | 0       |
|                              | storage_handler                                      | org.apache.iceberg.mr.hive.HiveIcebergStorageHandler |
|                              | table_type                                           | ICEBERG |
|                              | totalSize                                            | 1531    |
|                              | transient_lastDdlTime                                | 1720091251 |
|                              | uuid                                                 | 5b792d0b-4e67-4a2b-957c-86e41970c977 |
|                              | write.format.default                                 | parquet |
|                              | NULL                                                 | NULL    |
| # Storage Information        | NULL                                                 | NULL    |
| SerDe Library:               | org.apache.iceberg.mr.hive.HiveIcebergSerDe          | NULL    |
| InputFormat:                 | org.apache.iceberg.mr.hive.HiveIcebergInputFormat    | NULL    |
| OutputFormat:                | org.apache.iceberg.mr.hive.HiveIcebergOutputFormat   | NULL    |
| Compressed:                  | No                                                   | NULL    |
| Sort Columns:                | []                                                   | NULL    |
|                              | NULL                                                 | NULL    |
| # Constraints                | NULL                                                 | NULL    |
+------------------------------+------------------------------------------------------+---------+
Delete a table
You can delete a table using the DROP TABLE <tbl_name>
command.
Example:
DROP TABLE default.impala_ice_test;
If the table property external.table.purge
is set to true
, the command also deletes the data files from HDFS.
By default, the external.table.purge property is set to true when a table is created using Impala’s CREATE TABLE command.
If the table is created using CREATE EXTERNAL TABLE
(the table already exists and gets registered in a catalog), this property is set to false
, so DROP TABLE
doesn’t remove table data, but only removes the table from the catalog.
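For example, to remove the test table from the catalog while keeping its data files in HDFS, you can first turn off the purge behavior (a minimal sketch that uses the table from the examples above):
ALTER TABLE default.impala_ice_test
SET TBLPROPERTIES('external.table.purge'='false');
DROP TABLE default.impala_ice_test;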
Iceberg V2 tables
The Iceberg V2 table format supports row-level DELETE
and UPDATE
operations using the "merge-on-read" approach.
With this approach, instead of rewriting existing data files, Iceberg creates delete files that store information about the deleted records.
These files contain the file paths and row positions of the deleted rows (position deletes).
To create an Iceberg table that uses the Iceberg V2 format, specify the format-version=2
table property when creating a table.
Example:
CREATE TABLE default.impala_ice_test_v2 (
txn_id int,
acc_id int,
txn_value double,
txn_date date)
STORED AS ICEBERG
TBLPROPERTIES('format-version'='2');
You can also upgrade an existing Iceberg V1 table to V2 using ALTER TABLE, as shown below.
ALTER TABLE default.impala_ice_test
SET TBLPROPERTIES('format-version'='2');
Schema evolution
Iceberg supports schema evolution that allows reordering, deleting, or changing columns without affecting the readability of old (pre-evolution) data files.
NOTE
Impala does not support schema evolution for AVRO-formatted tables.
For more information, see the Iceberg documentation.
Below are Iceberg table schema evolution commands supported by Impala.
The following command renames an Iceberg table.
ALTER TABLE default.impala_ice_test
RENAME TO default.impala_ice_test_new;
You can add a column to an existing Iceberg table as shown below.
ALTER TABLE default.impala_ice_test
ADD COLUMN `comment` string;
Sample output:
+----------------------------+
| summary                    |
+----------------------------+
| Column(s) have been added. |
+----------------------------+
You can delete a column in an existing Iceberg table as shown below.
ALTER TABLE default.impala_ice_test
DROP COLUMN `comment`;
Sample output:
+--------------------------+
| summary                  |
+--------------------------+
| Column has been dropped. |
+--------------------------+
You can change a column name and the column type, assuming the new type is compatible with the old one.
ALTER TABLE default.impala_ice_test
CHANGE txn_id txn_id BIGINT;
The output:
+--------------------------+
| summary                  |
+--------------------------+
| Column has been altered. |
+--------------------------+
Create a partitioned table
With Impala, you can create partitioned Iceberg tables using traditional value-based partitioning. The syntax is straightforward and is shown below:
CREATE TABLE default.impala_ice_test_partitioned (
txn_id int,
txn_value double,
txn_date date)
PARTITIONED BY (acc_id int)
STORED AS ICEBERG;
Also, you can create a partitioned table using one or more partition transforms. An example is shown below.
CREATE TABLE default.impala_ice_test_partitioned_transform (
txn_id int,
acc_id int,
txn_value double,
txn_date date)
PARTITIONED BY SPEC (
BUCKET(3, acc_id), (1)
day(txn_date)) (2)
STORED AS ICEBERG;
(1) The first part of the transform expression instructs Iceberg to distribute inserted data among three buckets. For each inserted acc_id value, Iceberg calculates hash(acc_id) mod 3; the result specifies the destination bucket.
(2) Then, within each of the three buckets, Iceberg creates partitions by day of txn_date.
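The directory listing below assumes that a few rows have been inserted into the table, for example as follows (the values are illustrative; the bucket a given row lands in depends on hash(acc_id)):
INSERT INTO default.impala_ice_test_partitioned_transform VALUES
(1, 1001, 10.0, '2023-01-01'),
(2, 1002, 20.0, '2023-01-02'),
(3, 1003, 30.0, '2023-01-03');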
For the table created and populated by the commands above, Impala creates the following HDFS directory structure:
[admin@ka-adh-2 ~]$ hdfs dfs -ls -R /apps/hive/warehouse
drwxrwxr-x   - impala hadoop          0 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform
drwxrwxr-x   - impala hadoop          0 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/data
drwxrwxr-x   - impala hadoop          0 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/data/acc_id_bucket=0
drwxrwxr-x   - impala hadoop          0 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/data/acc_id_bucket=0/txn_date_day=2023-01-03
-rw-r--r--   3 impala hadoop       1153 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/data/acc_id_bucket=0/txn_date_day=2023-01-03/314da83d47eee296-145c34e600000000_1592985449_data.0.parq
drwxrwxr-x   - impala hadoop          0 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/data/acc_id_bucket=1
drwxrwxr-x   - impala hadoop          0 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/data/acc_id_bucket=1/txn_date_day=2023-01-02
-rw-r--r--   3 impala hadoop       1153 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/data/acc_id_bucket=1/txn_date_day=2023-01-02/314da83d47eee296-145c34e600000000_584411471_data.0.parq
drwxrwxr-x   - impala hadoop          0 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/data/acc_id_bucket=2
drwxrwxr-x   - impala hadoop          0 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/data/acc_id_bucket=2/txn_date_day=2023-01-01
-rw-r--r--   3 impala hadoop       1153 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/data/acc_id_bucket=2/txn_date_day=2023-01-01/314da83d47eee296-145c34e600000000_1148089692_data.0.parq
drwxrwxr-x   - impala hadoop          0 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/metadata
-rw-r--r--   3 impala hadoop       2385 2024-07-05 14:00 /apps/hive/warehouse/impala_ice_test_partitioned_transform/metadata/00000-b04f03f7-f770-44a1-825b-8e84e4392f41.metadata.json
-rw-r--r--   3 impala hadoop       3564 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/metadata/00001-13512042-67f5-4327-933c-978d41be5df1.metadata.json
-rw-r--r--   3 impala hadoop       6539 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/metadata/24a8910e-e616-4d4b-ace5-46987f83fe9a-m0.avro
-rw-r--r--   3 impala hadoop       3804 2024-07-05 14:01 /apps/hive/warehouse/impala_ice_test_partitioned_transform/metadata/snap-4359281668985942129-1-24a8910e-e616-4d4b-ace5-46987f83fe9a.avro
You can view partitions using the SHOW PARTITIONS command.
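For example, for the transform-partitioned table created above:
SHOW PARTITIONS default.impala_ice_test_partitioned_transform;
A sample output is below.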
+----------------------------------------------+----------------+-----------------+
| Partition                                    | Number Of Rows | Number Of Files |
+----------------------------------------------+----------------+-----------------+
| {"acc_id_bucket":"0","txn_date_day":"19360"} | 1              | 1               |
| {"acc_id_bucket":"0","txn_date_day":"19361"} | 1              | 1               |
| {"acc_id_bucket":"0","txn_date_day":"19723"} | 1              | 1               |
| {"acc_id_bucket":"0","txn_date_day":"19725"} | 1              | 1               |
| {"acc_id_bucket":"1","txn_date_day":"19359"} | 1              | 1               |
| {"acc_id_bucket":"1","txn_date_day":"19360"} | 1              | 1               |
| {"acc_id_bucket":"2","txn_date_day":"19358"} | 1              | 1               |
| {"acc_id_bucket":"2","txn_date_day":"19361"} | 1              | 1               |
| {"acc_id_bucket":"2","txn_date_day":"19392"} | 1              | 1               |
+----------------------------------------------+----------------+-----------------+
Iceberg supports partition evolution, which allows you to change a table’s partitioning spec without rewriting existing data files: the new spec applies only to data written after the change.
You can change a table’s partitioning using the ALTER TABLE … SET PARTITION SPEC
statement, as shown in the example below.
ALTER TABLE default.impala_ice_test_partitioned
SET PARTITION SPEC (DAY(txn_date));
The output:
+-------------------------+
| summary                 |
+-------------------------+
| Updated partition spec. |
+-------------------------+
DML commands
Insert data
With Impala, you can write data to an Iceberg table using the INSERT INTO
and INSERT OVERWRITE
commands.
Example:
INSERT INTO default.impala_ice_test VALUES
(1, 1001, 150.50, '2023-01-04'),
(2, 1002, 200.50, '2023-01-03'),
(3, 1003, 50.00, '2024-01-03');
Using INSERT OVERWRITE
, you can replace data in a table with the result of a query.
For example:
INSERT OVERWRITE TABLE default.impala_ice_test_tmp
SELECT * from default.impala_ice_test;
NOTE
INSERT OVERWRITE is not allowed for tables that use the BUCKET partition transform.
Delete data
You can delete data from Iceberg V2 tables using the DELETE
command.
Example:
DELETE FROM default.impala_ice_test
WHERE txn_id = 1;
NOTE
The DELETE operation is supported only for Iceberg V2 tables.
Attempting to delete rows in a V1 table throws an exception.
Update data
With Impala, you can update Iceberg V2 tables as shown in the example below.
UPDATE default.impala_ice_test
SET txn_value = txn_value + 100;
NOTE
The UPDATE operation is supported only for Iceberg V2 tables.
Attempting to update rows in a V1 table throws an exception.
You can also use the UPDATE FROM
statement to update a target Iceberg table based on a source table (the source can be a non-Iceberg table).
Example:
UPDATE default.impala_ice_test
SET default.impala_ice_test.acc_id = src.acc_id,
    default.impala_ice_test.txn_date = src.txn_date
FROM default.impala_ice_test, default.impala_ice_test_src src
WHERE default.impala_ice_test.txn_id = src.txn_internal_id;
If multiple source rows match a single target row, Impala throws an error.
The UPDATE FROM command has several limitations, such as Parquet-only write support and restrictions on updating partitioning columns.
For more information, see the Impala documentation.
Time travel
Iceberg tables support the time travel feature that allows you to query data from a specific table snapshot that was created at some point in the past and can be referenced by an ID or timestamp.
You can run time travel queries using the FOR SYSTEM_TIME AS OF <timestamp>
and FOR SYSTEM_VERSION AS OF <snapshot-id>
clauses.
Several usage examples are presented below.
The following example queries data from the closest snapshot that is older than the specified timestamp:
SELECT * FROM default.impala_ice_test
FOR SYSTEM_TIME AS OF '2024-07-02 12:00:00';
The result:
+--------+--------+-----------+------------+
| txn_id | acc_id | txn_value | txn_date   |
+--------+--------+-----------+------------+
| 2      | 1002   | 63.5      | 2024-02-04 |
| 1      | 1001   | 110.0     | 2023-01-01 |
+--------+--------+-----------+------------+
The next query identifies the snapshot to work with by subtracting time units from the current time:
SELECT * FROM default.impala_ice_test
FOR SYSTEM_TIME AS OF now() - interval 1 minute;
The output:
+--------+--------+-----------+------------+
| txn_id | acc_id | txn_value | txn_date   |
+--------+--------+-----------+------------+
| 2      | 1002   | 63.5      | 2024-02-04 |
+--------+--------+-----------+------------+
The following query retrieves data from a snapshot explicitly identified by ID:
SELECT * FROM default.impala_ice_test
FOR SYSTEM_VERSION AS OF 7308000224696874146;
The result:
+--------+--------+-----------+------------+
| txn_id | acc_id | txn_value | txn_date   |
+--------+--------+-----------+------------+
| 1      | 1001   | 10.0      | 2023-01-01 |
+--------+--------+-----------+------------+
To get information about the snapshots available for a given table, use the DESCRIBE HISTORY
command as shown in the following example:
DESCRIBE HISTORY default.impala_ice_test;
Sample output:
+-------------------------------+---------------------+---------------------+---------------------+
| creation_time                 | snapshot_id         | parent_id           | is_current_ancestor |
+-------------------------------+---------------------+---------------------+---------------------+
| 2024-04-07 21:41:56.959000000 | 7308000224696874146 | NULL                | TRUE                |
| 2024-04-07 21:42:19.537000000 | 3739750708916322181 | 7308000224696874146 | TRUE                |
| 2024-04-07 22:04:58.577000000 | 3634103131974849338 | 3739750708916322181 | TRUE                |
+-------------------------------+---------------------+---------------------+---------------------+
Roll back
Whenever an Iceberg table gets modified, Iceberg creates a snapshot for that table.
Using snapshots, you can roll back the table state to some version in the past labeled by the snapshot ID.
When a rollback is done for a table, a new snapshot is created with the same snapshot ID, but with a new creation timestamp.
By using the ALTER TABLE <tbl_name> EXECUTE ROLLBACK
statement, you can roll back a table to a previous snapshot as shown below.
For example, assume there is an Iceberg table with the following contents:
+--------+--------+-----------+------------+
| txn_id | acc_id | txn_value | txn_date   |
+--------+--------+-----------+------------+
| 2      | 1002   | 63.5      | 2024-02-04 |
| 1      | 1001   | 110.0     | 2023-01-01 |
+--------+--------+-----------+------------+
The following command rolls back the table state to a specific snapshot using the snapshot ID:
ALTER TABLE default.impala_ice_test
EXECUTE ROLLBACK(7308000224696874146);
After executing this command, SELECT returns the data that the table contained at the moment the specified snapshot was created.
For example:
+--------+--------+-----------+------------+
| txn_id | acc_id | txn_value | txn_date   |
+--------+--------+-----------+------------+
| 1      | 1001   | 10.0      | 2023-01-01 |
+--------+--------+-----------+------------+
The following command rolls back the table to the most recent snapshot whose creation timestamp is older than the specified timestamp:
ALTER TABLE default.impala_ice_test
EXECUTE ROLLBACK('2024-07-05 12:00:00');
By default, Iceberg accumulates snapshots until they are deleted by a user command.
You can expire unnecessary snapshots by using the ALTER TABLE … EXECUTE expire_snapshots(<timestamp>)
statement.
Examples:
ALTER TABLE default.impala_ice_test
EXECUTE expire_snapshots('2024-07-05 12:00:00');
ALTER TABLE default.impala_ice_test
EXECUTE expire_snapshots(now() - interval 10 days);
Iceberg data types mapping
The following table describes the mapping of Iceberg data types and SQL types in Impala.
Iceberg type  | SQL type in Impala
--------------+---------------------------------
boolean       | BOOLEAN
int           | INTEGER
long          | BIGINT
float         | FLOAT
double        | DOUBLE
decimal(P, S) | DECIMAL(P, S)
date          | DATE
time          | —
timestamp     | TIMESTAMP
timestamptz   | Only read support via TIMESTAMP
string        | STRING
uuid          | —
fixed(L)      | —
binary        | —
struct        | STRUCT (read only)
list          | ARRAY (read only)
map           | MAP (read only)
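As a quick illustration of this mapping, the following sketch (a hypothetical table) declares columns using Impala SQL types from the right-hand column; Iceberg stores them using the corresponding types from the left-hand column:
CREATE TABLE default.ice_types_demo (
  id BIGINT,              -- stored as Iceberg long
  amount DECIMAL(10, 2),  -- stored as Iceberg decimal(10, 2)
  created TIMESTAMP,      -- stored as Iceberg timestamp
  label STRING)           -- stored as Iceberg string
STORED AS ICEBERG;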