Hive replication

Overview

Replication in the context of Hive is the process of duplicating Hive data from one Hive warehouse to another warehouse on a different ADH cluster. The main goal of replication is to have an up-to-date replica of a Hive database that includes all the changes made to the source database. The unit of replication can be an entire Hive database, a table, or a partition.

This article describes the built-in Hive replication capabilities available in ADH out-of-the-box, covering the core replication concepts and the replication commands. The step-by-step example walks you through the major replication phases and shows the proper sequence of actions for keeping two databases in sync using Hive replication.

The replication approach discussed in this article is fully manual, unlike what you would use in a real production environment. Automating the replication steps requires external tools or custom software responsible for coordinating the proper sequence of replication commands, fault tolerance/failure handling, and configuration options. The information and examples in this article give you the insights needed to build a custom framework that automates the replication steps.

How it works

Review the following replication concepts that make up the core of the Hive replication mechanism.

Events and notification listeners

A notification listener is a pluggable Java class that is responsible for catching updates to Hive data/metadata (events) and logging these updates for further processing. The listener class is set using the hive.metastore.transactional.event.listeners property in ADCM (Clusters → <clusterName> → Services → Hive → Primary Configuration → hive-site.xml). By default, Hive uses the org.apache.hive.hcatalog.listener.DbNotificationListener listener, which stores Hive modification events in the Hive Metastore database (the hive.NOTIFICATION_LOG table). For any CRUD operation on a Hive database, the listener creates corresponding records, as shown in the example below.

Sample NOTIFICATION_LOG contents
MariaDB [hive]> select * from NOTIFICATION_LOG \G
*************************** 1. row ***************************
         NL_ID: 26
      EVENT_ID: 25
    EVENT_TIME: 1713876508
    EVENT_TYPE: ALTER_TABLE
      CAT_NAME: hive
       DB_NAME: default
      TBL_NAME: demo
       MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjBeforeJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1712916672\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","tableObjAfterJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","isTruncateOp":"false","timestamp":1713876508}
MESSAGE_FORMAT: json-0.2
*************************** 2. row ***************************
         NL_ID: 27
      EVENT_ID: 26
    EVENT_TIME: 1713876508
    EVENT_TYPE: INSERT
      CAT_NAME: hive
       DB_NAME: default
      TBL_NAME: demo
       MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","ptnObjJson":null,"timestamp":1713876508,"replace":"false","files":["hdfs://adh2/apps/hive/warehouse/demo/000000_0_copy_3###"]}
MESSAGE_FORMAT: json-0.2
*************************** 3. row ***************************
         NL_ID: 28
      EVENT_ID: 27
    EVENT_TIME: 1713876508
    EVENT_TYPE: ALTER_TABLE
      CAT_NAME: hive
       DB_NAME: default
      TBL_NAME: demo
       MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjBeforeJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","tableObjAfterJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"27\",\"numRows\":\"4\",\"rawDataSize\":\"23\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"4\",\"transient_lastDdlTime\":\"1713876508\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","isTruncateOp":"false","timestamp":1713876508}
MESSAGE_FORMAT: json-0.2

The MESSAGE column stores information about the operation applied to the corresponding Hive database/table, the state of data before and after the operation, and so on. The EVENT_ID column labels each update with an identifier (similar to Git commits). This allows Hive to granularly reproduce any specific data update operation (for example, the results of one or more INSERT commands) on a different ADH cluster, thus making replication possible.

TIP
The event records generated by listeners can be continuously parsed to track the updates to Hive entities in real time.
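For example, a minimal polling query against the Hive Metastore database could look like the sketch below (the starting event ID 25 is arbitrary and corresponds to the sample log above):

SELECT EVENT_ID, EVENT_TIME, EVENT_TYPE, DB_NAME, TBL_NAME
FROM NOTIFICATION_LOG       -- the event log table in the Hive Metastore database
WHERE EVENT_ID > 25         -- the last event ID processed so far
ORDER BY EVENT_ID;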

Bootstrap vs incremental replication

Any replication flow consists of the two phases described below.

  • Bootstrap. This is the initial replication activity that performs a full copy of data from the source Hive database to the target cluster. This operation runs once at the very beginning of the replication flow. All further updates to the replicated entity are chained relative to this initial bootstrap dump.

  • Incremental. After the bootstrap phase, each subsequent update to the source entity is transferred as a portion of data (a delta) that synchronizes the differences between the replicated entities.
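In terms of the commands described below, the two phases differ only in the presence of the FROM clause (a minimal sketch; the hr database and event ID 120 are illustrative):

REPL DUMP hr;           -- bootstrap: a one-time full copy of the hr database
REPL DUMP hr FROM 120;  -- incremental: only the events that occurred after event ID 120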

Replication commands

Hive provides the following commands to orchestrate the replication flow.

REPL DUMP

The REPL DUMP command creates a dump directory with Hive data/metadata in HDFS. The command runs on the source cluster.

TIP
You might want to run this command on a cron schedule for periodic dumps or run it whenever a specific update occurs in the source entity.

Syntax:

REPL DUMP <repl_policy>
    [FROM <start-evid> [TO <end-evid>] [LIMIT <num-evids>]]
    [WITH ('key1'='value1', 'key2'='value2')];

<repl_policy> defines the object to be dumped (a Hive database, table, or partition) with optional include/exclude filters. It has the following syntax:

<dbname>{{.[<comma_separated_include_tables_regex_list>]}{.[<comma_separated_exclude_tables_regex_list>]}}

NOTE
Hive table/view names are case-insensitive by nature, so the table names test_table and TEST_TABLE refer to the same table in terms of include/exclude filters.

The FROM <start-evid> [TO <end-evid>] expression allows you to create incremental dumps. That is, you can specify a range of events to dump only the data related to those events.

The REPL commands support the optional WITH clause used to set Hive replication parameters. Configuration parameters specified in this way are effective only for a single REPL query and are not used for other queries running in the same session. For example, WITH ('hive.repl.rootdir'='/user/hive/repl/') can be used to explicitly specify an HDFS directory to store the dump files.

Running REPL DUMP returns a result set like the one below:

+------------------------------------------------------+---------------+
|                      dump_dir                        | last_repl_id  |
+------------------------------------------------------+---------------+
| /user/hive/repl/eabe08dd-a524-4b8e-9520-e924cac73761 | 77            |
+------------------------------------------------------+---------------+

Where:

  • dump_dir — an HDFS directory with the dump data.

  • last_repl_id — an ID that represents the state of the database/table at the moment it was dumped. When several dumps are created one after another, these IDs identify the dumps (and the data they carry) relative to each other.

Below are several REPL DUMP usage examples with comments.

REPL DUMP hr; (1)
REPL DUMP hr WITH ('hive.repl.rootdir'='/custom/hdfs/dir'); (2)
REPL DUMP hr.['employees', '[a-z]+']; (3)
REPL DUMP hr.['.*?'].['Q3[0-9]+', 'Q4']; (4)
REPL DUMP hr FROM 100; (5)
REPL DUMP hr FROM 100 TO 1000; (6)
REPL DUMP hr.['[a-z]+'] REPLACE hr FROM 200; (7)
1 Creates a bootstrap dump of the entire hr database, including all tables/views.
2 Specifies a custom HDFS directory to store the dump files.
3 Dumps the table/view named employees as well as any tables/views that match the [a-z]+ regular expression, for example, departments, accounts, and so on.
4 Dumps tables/views with any name except Q4 and except those that have the Q3 prefix followed by a numeric string of any length (for example, Q3100, Q320).
5 Creates an incremental dump starting from the specific event. This means that the resulting dump will contain only the updates made to the source entity after the specified event.
6 Creates an incremental dump that includes data updates that fall into the specified range of events (100—​1000).
7 Dynamically changes the replication policy for the current incremental phase. The hr replication policy is replaced with one that includes only tables/views whose names consist of alphabetic characters. Subsequent loading of such a dump automatically drops the tables that are excluded by the new policy.

REPL LOAD

The REPL LOAD command is executed on the target cluster to load dump files from HDFS into Hive.

Syntax:

REPL LOAD [<db_name>]
    FROM <dir_name>
    [WITH ('key1'='value1', 'key2'='value2')];

If <db_name> is specified and the dump was created as a database-level dump, Hive renames the database on import. If <db_name> is not set, the original database name (as recorded in the dump) is used. The command’s output contains detailed information about the imported entities.
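For example (the dump directory below is illustrative):

REPL LOAD hr_replica FROM '/user/hive/repl/eabe08dd-a524-4b8e-9520-e924cac73761'; -- load under a new name
REPL LOAD FROM '/user/hive/repl/eabe08dd-a524-4b8e-9520-e924cac73761';            -- keep the original name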

Sample output
INFO  : Completed compiling command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 0.019 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983): REPL LOAD demo_repl_db_replica FROM '/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804'
INFO  : Starting task [Stage-0:REPL_BOOTSTRAP_LOAD] in serial mode
INFO  : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadStartTime":1716280801}
INFO  : Root Tasks / Total Tasks : 1 / 8
INFO  : completed load task run : 1
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Starting task [Stage-1:DDL] in serial mode
INFO  : Starting task [Stage-2:DDL] in serial mode
INFO  : Starting task [Stage-3:COPY] in serial mode
INFO  : Copying data from hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804/demo_repl_db/demo_repl_tbl/data to hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
INFO  : Starting task [Stage-4:MOVE] in serial mode
INFO  : Loading data to table demo_repl_db_replica.demo_repl_tbl from hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
INFO  : Starting task [Stage-5:DDL] in serial mode
INFO  : Starting task [Stage-6:REPL_STATE_LOG] in serial mode
INFO  : REPL::TABLE_LOAD: {"dbName":"demo_repl_db_replica","tableName":"demo_repl_tbl","tableType":"MANAGED_TABLE","tablesLoadProgress":"1/1","loadTime":1716280802}
INFO  : Starting task [Stage-7:REPL_STATE_LOG] in serial mode
INFO  : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadEndTime":1716280802,"dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","lastReplId":"152"}
INFO  : Starting task [Stage-8:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 1.033 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager

REPL STATUS

The REPL STATUS command runs on the target cluster and returns the last replicated event_id of the target database/table. Using REPL STATUS, you can determine the state up to which the target entity has been replicated.

TIP
Typically, the event_id returned by this command is used to construct the next REPL DUMP command for incremental replication.

Syntax:

REPL STATUS <db_name>;

Sample output:

+---------------+
| last_repl_id  |
+---------------+
| 180           |
+---------------+
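For example, the returned ID can be fed directly into the next incremental dump (a sketch; the database names are illustrative):

REPL STATUS hr_replica;  -- run on the target cluster; returns, e.g., 180
REPL DUMP hr FROM 180;   -- run on the source cluster; dumps only the events after event 180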

Replication process example

The following scenario walks you through major replication steps and shows how to keep two Hive databases in sync using replication commands. The steps described below can be (and actually should be) automated and adjusted to your business needs to become a full-fledged automatic replication utility.

  1. Create a test Hive database.

    DROP DATABASE IF EXISTS demo_repl_db;
    CREATE DATABASE demo_repl_db;
    USE demo_repl_db;
  2. In the test database, create a sample table and populate it with dummy data.

    CREATE TABLE demo_repl_tbl (
        `txn_id` int,
        `acc_id` int,
        `txn_amount` decimal(10,2),
        `txn_date` date);
    INSERT INTO demo_repl_tbl VALUES
    (1, 1002, 10.00, '2024-01-01'),
    (2, 1002, 20.00, '2024-01-03'),
    (3, 1002, 30.00, '2024-01-02'),
    (4, 1001, 100.50, '2024-01-02'),
    (5, 1001, 150.50, '2024-01-04'),
    (6, 1001, 200.50, '2024-01-03'),
    (7, 1003, 50.00, '2024-01-03'),
    (8, 1003, 50.00, '2024-01-01'),
    (9, 1003, 75.00, '2024-01-04');
  3. Set the test database as a replication source and specify the replication policy using the command:

    ALTER DATABASE demo_repl_db
    SET DBPROPERTIES ("repl.source.for"="testrepl");

    Where testrepl is an arbitrary string used to logically identify the current replication session.
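    To verify that the property has been set, you can inspect the database parameters (an optional check):

    DESCRIBE DATABASE EXTENDED demo_repl_db;

    The repl.source.for=testrepl entry should appear among the database parameters in the output.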

  4. Create a bootstrap dump of the entire database using the command:

    REPL DUMP demo_repl_db;

    The command returns a result set with two columns. A sample output is below.

    +------------------------------------------------------+---------------+
    |                      dump_dir                        | last_repl_id  |
    +------------------------------------------------------+---------------+
    | /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106 | 120           |
    +------------------------------------------------------+---------------+

    Where:

    • last_repl_id indicates the state of the created dump, i.e. it reflects the events that have been included in the dump; this ID will be needed on the next dump iteration.

    • dump_dir is an HDFS directory where the dump is stored.

    Check the dump_dir contents using the command:

    $ hdfs dfs -ls -R <dump_dir>

    Sample output:

    -rw-r--r--   3 hive hadoop         41 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/_dumpmetadata
    drwxr-xr-x   - hive hadoop          0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db
    -rw-r--r--   3 hive hadoop        257 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/_metadata
    drwxr-xr-x   - hive hadoop          0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl
    -rw-r--r--   3 hive hadoop       1653 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/_metadata
    drwxr-xr-x   - hive hadoop          0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/data
    -rw-r--r--   3 hive hadoop         73 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/data/_files

    The HDFS directory contains several _metadata files that store Hive database/table metadata, as well as the file named _files. The latter stores a pointer to the Hive warehouse location where the actual table data is stored.

  5. Load the dump into Hive on the target cluster. You can use distcp to copy the dump directory between the clusters. If you load the dump into the same ADH cluster, be sure to use a different name for the target database; this is the approach used in the steps below.

    REPL LOAD demo_repl_db_replica FROM '<dump_dir>';

    This loads the bootstrap dump into Hive as a full copy, without replaying individual events. The output contains detailed information about the imported entities, events, load type, etc.

    Sample output
    INFO  : Completed compiling command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 0.019 seconds
    INFO  : Concurrency mode is disabled, not creating a lock manager
    INFO  : Executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983): REPL LOAD demo_repl_db_replica FROM '/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804'
    INFO  : Starting task [Stage-0:REPL_BOOTSTRAP_LOAD] in serial mode
    INFO  : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadStartTime":1716280801}
    INFO  : Root Tasks / Total Tasks : 1 / 8
    INFO  : completed load task run : 1
    INFO  : Starting task [Stage-0:DDL] in serial mode
    INFO  : Starting task [Stage-1:DDL] in serial mode
    INFO  : Starting task [Stage-2:DDL] in serial mode
    INFO  : Starting task [Stage-3:COPY] in serial mode
    INFO  : Copying data from hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804/demo_repl_db/demo_repl_tbl/data to hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
    INFO  : Starting task [Stage-4:MOVE] in serial mode
    INFO  : Loading data to table demo_repl_db_replica.demo_repl_tbl from hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
    INFO  : Starting task [Stage-5:DDL] in serial mode
    INFO  : Starting task [Stage-6:REPL_STATE_LOG] in serial mode
    INFO  : REPL::TABLE_LOAD: {"dbName":"demo_repl_db_replica","tableName":"demo_repl_tbl","tableType":"MANAGED_TABLE","tablesLoadProgress":"1/1","loadTime":1716280802}
    INFO  : Starting task [Stage-7:REPL_STATE_LOG] in serial mode
    INFO  : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadEndTime":1716280802,"dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","lastReplId":"152"}
    INFO  : Starting task [Stage-8:DDL] in serial mode
    INFO  : Completed executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 1.033 seconds
    INFO  : OK
    INFO  : Concurrency mode is disabled, not creating a lock manager

    As a result of running the command, Hive creates a new database.

    +-----------------------+
    |     database_name     |
    +-----------------------+
    | default               |
    | demo_repl_db          |
    | demo_repl_db_replica  | <-
    +-----------------------+

    At this moment, the source and target Hive databases have identical contents.

  6. Insert some data to the source table:

    INSERT INTO demo_repl_tbl VALUES
    (10, 1003, 60.00, '2024-01-03'),
    (11, 1003, 50.00, '2024-01-05'),
    (12, 1003, 75.00, '2024-01-05');
  7. To sync the inserts with the target database, create one more dump using the following command.

    REPL DUMP demo_repl_db
    FROM <last_repl_id>; (1)
    1 Instead of <last_repl_id>, use the ID returned by the previous REPL DUMP operation.

    The FROM <last_repl_id> clause makes the dump incremental, i.e. the dump includes only the chunks of data that are missing on the target cluster. The above command can be interpreted as follows: "create a dump that includes all the updates made to the source database after the <last_repl_id> event."
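    For instance, given the bootstrap dump output above (last_repl_id=120), the concrete command would be:

    REPL DUMP demo_repl_db
    FROM 120;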

    Sample output:

    +-----------------------------------------------------+---------------+
    |                      dump_dir                       | last_repl_id  |
    +-----------------------------------------------------+---------------+
    | /user/hive/repl/765fc939-4270-4aca-bb9d-e0eada6b5980| 165           |
    +-----------------------------------------------------+---------------+

    Just like with the previous dump iteration, the newly created dump_dir contains the _files file that points to actual dumped data. The sample _files contents are below.

    [admin@ka-adh-2 ~]$ hdfs dfs -cat /user/hive/repl/765fc939-4270-4aca-bb9d-e0eada6b5980/164/data/_files
    hdfs://adh/apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1###

    Check the contents of the /apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1 file:

    $ hdfs dfs -cat /apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1

    The following sample output indicates that the dump contains only the missing (delta) records.

    10100360.002024-01-03
    11100350.002024-01-05
    12100375.002024-01-05
  8. Load the incremental dump to the target database:

    REPL LOAD demo_repl_db_replica
    FROM '<dump_dir>';
  9. SELECT from source and target tables to ensure both tables have identical data.

    SELECT * FROM demo_repl_db.demo_repl_tbl AS src;
    SELECT * FROM demo_repl_db_replica.demo_repl_tbl AS tgt;
    +-------------+-------------+-----------------+---------------+
    | src.txn_id  | src.acc_id  | src.txn_amount  | src.txn_date  |
    +-------------+-------------+-----------------+---------------+
    | 1           | 1002        | 10.00           | 2024-01-01    |
    | 2           | 1002        | 20.00           | 2024-01-03    |
    | 3           | 1002        | 30.00           | 2024-01-02    |
    | 4           | 1001        | 100.50          | 2024-01-02    |
    | 5           | 1001        | 150.50          | 2024-01-04    |
    | 6           | 1001        | 200.50          | 2024-01-03    |
    | 7           | 1003        | 50.00           | 2024-01-03    |
    | 8           | 1003        | 50.00           | 2024-01-01    |
    | 9           | 1003        | 75.00           | 2024-01-04    |
    | 10          | 1003        | 60.00           | 2024-01-03    |
    | 11          | 1003        | 50.00           | 2024-01-05    |
    | 12          | 1003        | 75.00           | 2024-01-05    |
    +-------------+-------------+-----------------+---------------+
    +-------------+-------------+-----------------+---------------+
    | tgt.txn_id  | tgt.acc_id  | tgt.txn_amount  | tgt.txn_date  |
    +-------------+-------------+-----------------+---------------+
    | 1           | 1002        | 10.00           | 2024-01-01    |
    | 2           | 1002        | 20.00           | 2024-01-03    |
    | 3           | 1002        | 30.00           | 2024-01-02    |
    | 4           | 1001        | 100.50          | 2024-01-02    |
    | 5           | 1001        | 150.50          | 2024-01-04    |
    | 6           | 1001        | 200.50          | 2024-01-03    |
    | 7           | 1003        | 50.00           | 2024-01-03    |
    | 8           | 1003        | 50.00           | 2024-01-01    |
    | 9           | 1003        | 75.00           | 2024-01-04    |
    | 10          | 1003        | 60.00           | 2024-01-03    |
    | 11          | 1003        | 50.00           | 2024-01-05    |
    | 12          | 1003        | 75.00           | 2024-01-05    |
    +-------------+-------------+-----------------+---------------+
  10. Delete the test table in the source database:

    USE demo_repl_db;
    DROP TABLE demo_repl_db.demo_repl_tbl;
  11. Run another dump iteration to transfer the DROP TABLE operation results to the target database. For this, follow the steps below.

    Create a dump:

    REPL DUMP demo_repl_db
    FROM <last_repl_id>; (1)
    1 Instead of <last_repl_id>, use the ID returned by the previous REPL DUMP operation.

    Sample output:

    +-----------------------------------------------------+---------------+
    |                      dump_dir                       | last_repl_id  |
    +-----------------------------------------------------+---------------+
    | /user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72| 180           |
    +-----------------------------------------------------+---------------+

    Load the dump:

    REPL LOAD demo_repl_db_replica FROM '<dump_dir>';

    The "eventType":"EVENT_DROP_TABLE" string in the output confirms that the DROP TABLE operation results have been picked to the target table.

    Sample output
    INFO  : Compiling command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e): repl load demo_repl_db_replica from '/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72'
    INFO  : Concurrency mode is disabled, not creating a lock manager
    INFO  : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72","loadType":"INCREMENTAL","numEvents":1,"loadStartTime":1716294874}
    INFO  : Semantic Analysis Completed (retrial = false)
    INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
    INFO  : EXPLAIN output for queryid hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e : STAGE DEPENDENCIES:
      Stage-0 is a root stage [DEPENDENCY_COLLECTION]
      Stage-1 depends on stages: Stage-0 [DDL]
      Stage-2 depends on stages: Stage-1 [DEPENDENCY_COLLECTION]
      Stage-3 depends on stages: Stage-2 [DDL]
      Stage-4 depends on stages: Stage-3 [REPL_STATE_LOG]
      Stage-5 depends on stages: Stage-4 [REPL_STATE_LOG]
    
    STAGE PLANS:
      Stage: Stage-0
        Dependency Collection
    
      Stage: Stage-1
          Drop Table Operator:
            Drop Table
              table: demo_repl_db_replica.demo_repl_tbl
    
      Stage: Stage-2
        Dependency Collection
    
      Stage: Stage-3
    
      Stage: Stage-4
        Repl State Log
    
      Stage: Stage-5
        Repl State Log
    
    
    INFO  : Completed compiling command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e); Time taken: 0.019 seconds
    INFO  : Concurrency mode is disabled, not creating a lock manager
    INFO  : Executing command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e): repl load demo_repl_db_replica from '/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72'
    INFO  : Starting task [Stage-0:DEPENDENCY_COLLECTION] in serial mode
    INFO  : Starting task [Stage-1:DDL] in serial mode
    INFO  : Starting task [Stage-2:DEPENDENCY_COLLECTION] in serial mode
    INFO  : Starting task [Stage-3:DDL] in serial mode
    INFO  : Starting task [Stage-4:REPL_STATE_LOG] in serial mode
    INFO  : REPL::EVENT_LOAD: {"dbName":"demo_repl_db_replica","eventId":"180","eventType":"EVENT_DROP_TABLE","eventsLoadProgress":"1/1","loadTime":1716294874}
    INFO  : Starting task [Stage-5:REPL_STATE_LOG] in serial mode
    INFO  : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"INCREMENTAL","numEvents":1,"loadEndTime":1716294874,"dumpDir":"hdfs://adh/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72","lastReplId":"180"}
    INFO  : Completed executing command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e); Time taken: 0.183 seconds
    INFO  : OK
    INFO  : Concurrency mode is disabled, not creating a lock manager
    No rows affected (0.207 seconds)
  12. Run SHOW TABLES for source and target databases to ensure that the demo_repl_tbl tables have been removed from both Hive databases.
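    A minimal check (run each command on the respective cluster):

    SHOW TABLES IN demo_repl_db;          -- on the source cluster
    SHOW TABLES IN demo_repl_db_replica;  -- on the target cluster

    Neither output should list the demo_repl_tbl table.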

Limitations

There are several limitations on using the Hive replication feature:

  • Only managed tables can be replicated; external tables, if included in a dump, are created as managed tables on the target cluster.

  • Replication of ACID tables is not supported.

  • The table contents stored in HDFS must be owned by the hive user.

  • The current replication capabilities are unsuitable for load balancing since there is no built-in mechanism to guarantee that the source and target entities are in sync at any given moment. Load balancing should be implemented using external tools/custom software.
