Hive performance tuning

This article describes optimization techniques and best practices that can improve Hive performance.

Partitioning

In Hive parlance, partitioning is a way to split a single large table into smaller parts based on column values. In a partitioned table, Hive creates a new partition for each distinct value of the partition column. For each partition, Hive creates a separate HDFS directory to store the partition data instead of writing all the data to a single table directory.

Partitioning can greatly speed up Hive operations because a query that filters on the partition column reads only the matching partitions and skips the rest of the data. Partitioning is most effective when the data is evenly distributed across partitions. If the data is heavily skewed towards a few partitions, some worker nodes will have much more data to process than others, resulting in inefficient use of cluster resources. The efficiency of partitioning also depends on how the partition columns are used. For example, it is recommended to partition a table by the columns that are frequently used in WHERE filters and GROUP BY operations.

The syntax for creating a partitioned table is as follows:

CREATE TABLE transactions(
    txn_id int, acc_id int, txn_amount decimal(10,2)
 ) PARTITIONED BY (txn_date date);

Run the DESCRIBE command to view the table structure:

DESCRIBE transactions;

The output shows that the table is partitioned by the txn_date column:

+--------------------------+----------------+----------+
|         col_name         |   data_type    | comment  |
+--------------------------+----------------+----------+
| txn_id                   | int            |          |
| acc_id                   | int            |          |
| txn_amount               | decimal(10,2)  |          |
| txn_date                 | date           |          |
|                          | NULL           | NULL     |
| # Partition Information  | NULL           | NULL     |
| # col_name               | data_type      | comment  |
| txn_date                 | date           |          |
+--------------------------+----------------+----------+

For every distinct value in the txn_date column, Hive creates a new partition to store the rows related to that date. After data has been loaded, check the HDFS directories that Hive created to store the table data:

$ hdfs dfs -ls /apps/hive/warehouse/transactions

The output:

Found 3 items
drwxr-xr-x   - hive hadoop          0 2024-05-01 00:48 /apps/hive/warehouse/transactions/txn_date=2023-01-01
drwxr-xr-x   - hive hadoop          0 2024-05-01 00:49 /apps/hive/warehouse/transactions/txn_date=2023-01-02
drwxr-xr-x   - hive hadoop          0 2024-05-01 00:49 /apps/hive/warehouse/transactions/txn_date=2023-01-03

Hive creates a subdirectory for each partition and stores the related data there. Such partitioning benefits subsequent reads: when a query filters on txn_date, Hive scans only the matching partition directories instead of the entire dataset.
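
To illustrate how a query can benefit from this layout, here is a minimal sketch that loads one row into a partition and then reads a single date. The inserted values are made up for illustration and are not taken from the listing above. With a filter on txn_date, Hive should only need to read the txn_date=2023-01-02 directory.

```sql
-- Load a sample row into one partition (the values are illustrative).
INSERT INTO TABLE transactions PARTITION (txn_date = '2023-01-02')
VALUES (1, 1001, 150.00);

-- List the partitions that currently exist for the table.
SHOW PARTITIONS transactions;

-- The filter on the partition column limits the scan to one partition.
SELECT acc_id, SUM(txn_amount) AS total_amount
FROM transactions
WHERE txn_date = DATE '2023-01-02'
GROUP BY acc_id;
```

A query without a condition on txn_date still works, but it has to read every partition directory.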

NOTE
Be careful when choosing a partition column: a column with many distinct values produces many partitions, and the resulting large number of directories and small files can overload the NameNode.

You can also partition a table by multiple columns for more granular splitting of data. An example of creating such a table is shown below.

CREATE TABLE transactions_1 (
    id int, txn_amount decimal(10,2)
 ) PARTITIONED BY (txn_date date, acc_id int);

Check the table partitions using the command:

DESCRIBE transactions_1;

The output:

+--------------------------+----------------+----------+
|         col_name         |   data_type    | comment  |
+--------------------------+----------------+----------+
| id                       | int            |          |
| txn_amount               | decimal(10,2)  |          |
| txn_date                 | date           |          |
| acc_id                   | int            |          |
|                          | NULL           | NULL     |
| # Partition Information  | NULL           | NULL     |
| # col_name               | data_type      | comment  |
| txn_date                 | date           |          |
| acc_id                   | int            |          |
+--------------------------+----------------+----------+

When you insert data into this table, Hive stores it in HDFS as shown below.

$ hdfs dfs -ls -R /apps/hive/warehouse/transactions_1

The output:

drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-01
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-01/acc_id=1
-rw-r--r--   3 hive hadoop          8 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-01/acc_id=1/000000_0
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-01/acc_id=2
-rw-r--r--   3 hive hadoop          9 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-01/acc_id=2/000000_0
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-02
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-02/acc_id=3
-rw-r--r--   3 hive hadoop          8 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-02/acc_id=3/000000_0
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-03
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-03/acc_id=1
-rw-r--r--   3 hive hadoop          8 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-03/acc_id=1/000000_0
drwxr-xr-x   - hive hadoop          0 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-03/acc_id=2
-rw-r--r--   3 hive hadoop          8 2024-05-15 17:45 /apps/hive/warehouse/transactions_1/txn_date=2024-01-03/acc_id=2/000000_0

Notice that each txn_date={date} HDFS directory contains acc_id=N subdirectories that store the data partitioned by the acc_id column values.
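
A nested layout of this kind can be produced with a dynamic-partition insert. The sketch below is an assumption about how such data could be loaded, not the exact statements behind the listing above, and the txn_amount values are illustrative.

```sql
-- Allow all partition columns to be resolved dynamically from the data.
SET hive.exec.dynamic.partition.mode=nonstrict;

-- The partition columns (txn_date, acc_id) come last in each row,
-- in the order they are listed in PARTITIONED BY.
INSERT INTO TABLE transactions_1 PARTITION (txn_date, acc_id)
VALUES
    (1, 10.50, '2024-01-01', 1),
    (2, 20.00, '2024-01-01', 2),
    (3, 35.25, '2024-01-02', 3);

-- Each distinct (txn_date, acc_id) pair appears as a separate partition.
SHOW PARTITIONS transactions_1;
```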

Bucketing

Similar to partitioning, Hive bucketing (also called clustering) is an optimization technique intended to split a large table into several manageable files that are faster to scan. The main distinction is that Hive creates partitions based on table data, one partition per distinct value in the partition column, whereas with bucketing you specify a fixed number of buckets when you create the table.

The following table highlights major differences between partitions and buckets.

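+---------------------------------------------------+-------------------------------------------------------+
| Partitioning                                      | Bucketing                                             |
+---------------------------------------------------+-------------------------------------------------------+
| An HDFS directory is created for each partition   | An HDFS file is created for each bucket               |
| One or more columns can be used to split the data | Rows are split by the hash of the bucketing column(s) |
| One partition per distinct column value           | The number of buckets is specified explicitly         |
| Created using the PARTITIONED BY clause           | Created using the CLUSTERED BY clause                 |
+---------------------------------------------------+-------------------------------------------------------+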

The use of Hive bucketing has the following impact on a Hive cluster:

  • Reduced total data size.

  • Reduced insert speed.

  • Faster selection by the sort key.

  • Joining two tables using sort merge join can be done without pre-sorting.

  • Hive and Spark buckets are incompatible due to different hashing algorithms.

To create a bucketed table, use the following syntax:

CREATE TABLE employees(
    emp_id int,
    first_name string,
    last_name string,
    department_id int
)
CLUSTERED BY (department_id) SORTED BY (emp_id) INTO 5 BUCKETS;

The above command creates a Hive table whose data will be stored in 5 HDFS files (the number is specified in the INTO ... BUCKETS clause). When writing data to the bucketed table, Hive assigns each row to one of the 5 buckets based on the hash of its department_id value (CLUSTERED BY clause), so rows with the same department_id always land in the same bucket. The data in each bucket (file) is sorted by the emp_id column (SORTED BY clause).

As with partitioning, keeping data in separate files can reduce total scan time. However, the number of buckets should be reasonable since too many small files can nullify the optimization efforts.
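
One way to see buckets at work is bucket sampling. The following sketch assumes the employees table defined above; the inserted rows are illustrative. The TABLESAMPLE clause asks Hive to read one bucket out of five rather than the whole table.

```sql
-- Illustrative rows; department_id decides the bucket for each row.
INSERT INTO TABLE employees VALUES
    (1, 'Ann', 'Lee', 10),
    (2, 'Bob', 'Ray', 20),
    (3, 'Eve', 'Kim', 10);

-- Read only the first of the 5 buckets instead of the whole table.
SELECT emp_id, first_name, last_name, department_id
FROM employees TABLESAMPLE (BUCKET 1 OUT OF 5 ON department_id);
```

Which rows land in bucket 1 depends on the hash function of the Hive version in use, so the result set is not shown here.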

You can use buckets along with partitions for advanced splitting of your data. For example:

CREATE TABLE employees_1(
    emp_id int,
    first_name string,
    last_name string,
    office_num int
)
PARTITIONED BY (department_id int)
CLUSTERED BY (office_num) INTO 5 BUCKETS;

NOTE
The partition column is specified only in the PARTITIONED BY clause. The bucketing column is defined both in the table column specification and in the CLUSTERED BY clause.

Vectorization

Vectorized query execution is a built-in Hive optimization feature that helps to reduce the CPU load by processing blocks of rows rather than one row at a time. When processing blocks of rows, each column is stored in memory as an array (vector) of primitive data types. Simple arithmetic operations and comparisons are performed on the primitive values from the vector’s contents. This approach reduces numerous condition checks for each column value, allows more efficient CPU utilization with heavy use of caching, and improves overall processing time.

Vectorized execution works only for specific data types and operations and has limited support for user-defined functions.

By default, Hive vectorization is disabled. To use vectorized queries, the following requirements must be met:

  • Use ORC as the storage format.

  • Set hive.vectorized.execution.enabled = true.

To run a vectorized query, follow these steps:

  1. Create a Hive table stored as ORC:

    CREATE TABLE test_table_vectorized (
        id int, value string
    ) STORED AS ORC;
  2. Enable Hive vectorization by setting the property:

    SET hive.vectorized.execution.enabled=true;
  3. Run EXPLAIN EXTENDED for an arbitrary aggregation query:

    EXPLAIN EXTENDED
        SELECT COUNT(*) FROM test_table_vectorized;

The output:

+----------------------------------------------------+
|                      Explain                       |
+----------------------------------------------------+
| STAGE DEPENDENCIES:                                |
|   Stage-1 is a root stage                          |
|   Stage-0 depends on stages: Stage-1               |
|                                                    |
| STAGE PLANS:                                       |
|   Stage: Stage-1                                   |
|     Tez                                            |
|       DagId: hive_20240504214716_0b61644b-d042-4333-8566-05a678208479:12 |
|       Edges:                                       |
|         Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)    |
|       DagName: hive_20240504214716_0b61644b-d042-4333-8566-05a678208479:12 |
|       Vertices:                                    |
|         Map 1                                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   alias: test_table_vectorized     |
|                   Statistics: Num rows: 2 Data size: 182 Basic stats: COMPLETE Column stats: COMPLETE |
|                   GatherStats: false               |
|                   Select Operator                  |
|                     Statistics: Num rows: 2 Data size: 182 Basic stats: COMPLETE Column stats: COMPLETE |
|                     Group By Operator              |
|                       aggregations: count()        |
|                       mode: hash                   |
|                       outputColumnNames: _col0     |
|                       Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE |
|                       Reduce Output Operator       |
|                         null sort order:           |
|                         sort order:                |
|                         Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE |
|                         tag: -1                    |
|                         value expressions: _col0 (type: bigint) |
|                         auto parallelism: false    |
|             Execution mode: vectorized             |
|             ...
|         Reducer 2                                  |
|             Execution mode: vectorized             |
|             ...
|                                                    |
|   Stage: Stage-0                                   |
|     Fetch Operator                                 |
|       limit: -1                                    |
|       Processor Tree:                              |
|         ListSink                                   |
|                                                    |
+----------------------------------------------------+

The output indicates that certain execution phases have the Execution mode: vectorized label. Setting hive.vectorized.execution.enabled=false reverts execution to the row-at-a-time mode.
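
For a more focused check, Hive also provides an EXPLAIN VECTORIZATION form. The sketch below assumes a Hive version that supports it (it was introduced in Hive 2.3.0) and reuses the test_table_vectorized table from the steps above.

```sql
-- Print the current value of the property for this session.
SET hive.vectorized.execution.enabled;

-- Summarize which parts of the plan are vectorized and,
-- for parts that are not, the reason reported by Hive.
EXPLAIN VECTORIZATION SUMMARY
    SELECT COUNT(*) FROM test_table_vectorized;
```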

Cost-based optimization (CBO)

Hive cost-based optimization (CBO) is a built-in optimization feature that uses table and column statistics to choose a cheaper execution plan. It provides optimizations such as query rewrites, JOIN reordering for more efficient processing, and JOIN elimination on the AST.

The feature is based on Apache Calcite and comes disabled by default. To enable CBO, follow the steps below.

  1. Set the following Hive configuration properties in ADCM (Clusters → <clusterName> → Services → Hive → Primary configuration):

    • hive.cbo.enable=true

    • hive.compute.query.using.stats=true

    • hive.stats.fetch.column.stats=true

  2. Compute statistics for the tables that should be optimized with CBO. The optimizer relies on this data to estimate query costs. The following command refreshes the table statistics in the metastore.

    ANALYZE TABLE transactions COMPUTE STATISTICS;
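
Because the hive.stats.fetch.column.stats property is enabled in the steps above, it may also help to collect column-level statistics. The following is a sketch that assumes the transactions table from the partitioning section; the SELECT query is only an illustration.

```sql
-- Basic table-level statistics, as in step 2 above.
ANALYZE TABLE transactions COMPUTE STATISTICS;

-- Column-level statistics for all columns of the table.
ANALYZE TABLE transactions COMPUTE STATISTICS FOR COLUMNS;

-- The Statistics lines of the plan report whether basic and
-- column statistics are available to the optimizer.
EXPLAIN
    SELECT acc_id, SUM(txn_amount) FROM transactions GROUP BY acc_id;
```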

Use Tez

Apache Tez is an execution engine for Hive. It is considered a much more flexible and powerful successor to the MapReduce framework. Tez should be the preferred choice for Hive computation tasks: it represents a query as a single DAG of tasks, which reduces the number of disk accesses between stages and improves overall execution time.

In ADH, Tez is available as a separate component (Hive Tez) of the Hive service. To use Tez, your ADH cluster must have the Hive Tez component installed as a part of the Hive service. If it is installed, Tez is the default Hive execution engine, as set by the hive.execution.engine property.
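
To check which engine a session actually uses, you can query the property directly. This is a minimal sketch; the explicit assignment is only needed if the reported value is not tez.

```sql
-- Print the engine currently in effect for this session.
SET hive.execution.engine;

-- Switch the current session to Tez explicitly if another engine is set.
SET hive.execution.engine=tez;
```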

Use ORC format

Using a suitable storage format is crucial for Hive performance. Optimized Row Columnar (ORC) is a file format developed specifically for Hadoop workloads. Being columnar by nature, it is optimized for column-oriented operations like filtering and aggregations. Compared to other data formats supported by Hive (Parquet, Avro, TextFile, etc.), ORC generally offers strong compression and fast reads in Hive, which can substantially reduce storage usage.

The default storage format used by Hive is TextFile. To create a Hive table that stores data as ORC, use the STORED AS ORC clause, for example:

CREATE TABLE test_tbl_orc (
  id int,
  data string
) STORED AS ORC;
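
Existing data can be converted to ORC with a CREATE TABLE ... AS SELECT statement. In the sketch below, test_tbl_text is a hypothetical table stored as TextFile and test_tbl_orc_copy is a hypothetical target name. The orc.compress table property selects the compression codec.

```sql
-- test_tbl_text is a hypothetical existing table stored as TextFile.
-- orc.compress accepts NONE, ZLIB, or SNAPPY.
CREATE TABLE test_tbl_orc_copy
STORED AS ORC
TBLPROPERTIES ('orc.compress' = 'ZLIB')
AS
SELECT id, data FROM test_tbl_text;
```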