Hive replication
Overview
Replication in the context of Hive is the process of duplicating Hive data from one Hive warehouse to another warehouse on a different ADH cluster. The main goal of replication is to have an up-to-date replica of a Hive database that includes all the changes made to the source database. The unit of replication can be an entire Hive database, a table, or a partition.
This article highlights the built-in Hive replication capabilities available in ADH out-of-the-box, describes the core Hive replication concepts, and covers the replication commands. The step-by-step example walks you through the major replication phases and shows the proper sequence of actions to keep two databases in sync using Hive replication.
The replication approach discussed in this article is fully manual, which is rarely acceptable in a real production environment. Automating the replication steps requires external tools or custom software responsible for coordinating the proper sequence of replication commands, fault tolerance/failure handling, and configuration. The information and examples provided in this article give you insights into building a custom framework to automate the replication steps.
How it works
Review the following replication concepts that make up the core of the Hive replication mechanism.
Events and notification listeners
A notification listener is a pluggable Java class that is responsible for catching the updates to Hive data/metadata (events) and logging these updates somewhere for further processing.
The listener class is set using the hive.metastore.transactional.event.listeners
property in ADCM (Clusters → <clusterName> → Services → Hive → Primary Configuration → hive-site.xml).
By default, Hive uses the org.apache.hive.hcatalog.listener.DbNotificationListener
listener that stores Hive modification events in the Hive Metastore database (the hive.NOTIFICATION_LOG
table).
For any CRUD operation on a Hive database, the listener creates corresponding records, as shown in the example below.
MariaDB [hive]> select * from NOTIFICATION_LOG \G *************************** 1. row *************************** NL_ID: 26 EVENT_ID: 25 EVENT_TIME: 1713876508 EVENT_TYPE: ALTER_TABLE CAT_NAME: hive DB_NAME: default TBL_NAME: demo MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjBeforeJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1712916672\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","tableObjAfterJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","isTruncateOp":"false","timestamp":1713876508} MESSAGE_FORMAT: json-0.2 *************************** 2. 
row *************************** NL_ID: 27 EVENT_ID: 26 EVENT_TIME: 1713876508 EVENT_TYPE: INSERT CAT_NAME: hive DB_NAME: default TBL_NAME: demo MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","ptnObjJson":null,"timestamp":1713876508,"replace":"false","files":["hdfs://adh2/apps/hive/warehouse/demo/000000_0_copy_3###"]} MESSAGE_FORMAT: json-0.2 *************************** 3. 
row *************************** NL_ID: 28 EVENT_ID: 27 EVENT_TIME: 1713876508 EVENT_TYPE: ALTER_TABLE CAT_NAME: hive DB_NAME: default TBL_NAME: demo MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjBeforeJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","tableObjAfterJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"27\",\"numRows\":\"4\",\"rawDataSize\":\"23\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"4\",\"transient_lastDdlTime\":\"1713876508\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","isTruncateOp":"false","timestamp":1713876508} MESSAGE_FORMAT: json-0.2
The MESSAGE
column stores the information about operations applied to the corresponding Hive database/table, the state of data before and after the operation, and so on.
The EVENT_ID
labels each update with an identifier (similarly to Git commits).
This allows Hive to granularly reproduce any specific data update operation (for example, the results of one or more INSERT
commands) on a different ADH cluster, thus making the replication possible.
TIP
The event records generated by listeners can be continuously parsed to track the updates to Hive entities in real time.
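For instance, an external poller could periodically run a query like the one below against the Hive Metastore database (MariaDB in the example above) and process every event newer than the last one it has already handled. This is a minimal sketch; the threshold value 25 is purely illustrative.
SELECT EVENT_ID,
       EVENT_TYPE,
       DB_NAME,
       TBL_NAME,
       FROM_UNIXTIME(EVENT_TIME) AS event_time   -- convert epoch seconds to a readable timestamp
FROM NOTIFICATION_LOG
WHERE EVENT_ID > 25                              -- the last event already processed by the poller
ORDER BY EVENT_ID;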
Bootstrap vs incremental replication
Any replication scenario involves the two data replication phases described below.
-
Bootstrap. The initial replication activity that copies all data from the source Hive database to the target cluster. This operation runs once at the very beginning of the replication flow. Further updates to the replicated entity are chained relative to the initial bootstrap dump.
-
Incremental. After the bootstrap phase, subsequent updates to the source entity are transferred as smaller portions of data (deltas) that synchronize the differences between the replicated entities.
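In terms of the commands covered below, the two phases might look as follows; the database name and the event ID are illustrative:
REPL DUMP sales;           -- bootstrap: full dump of the database, returns a last_repl_id (e.g. 120)
REPL DUMP sales FROM 120;  -- incremental: dumps only the events recorded after event 120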
Replication commands
Hive provides the following commands to orchestrate the replication flow.
REPL DUMP
The REPL DUMP command creates a dump directory with Hive data/metadata in HDFS. The command runs on the source cluster.
TIP
You might want to run this command on a cron schedule for periodic dumps, or run it whenever a specific update occurs in the source entity.
Syntax:
REPL DUMP <repl_policy>
[FROM <evid-start> [TO <end-evid>] [LIMIT <num-evids>] ]
[WITH ('key1'='value1', 'key2'='value2')];
<repl_policy>
defines an object to be dumped (Hive database, table, or partition) with include/exclude filters.
It has the following syntax:
<dbname>{{.[<comma_separated_include_tables_regex_list>]}{.[<comma_separated_exclude_tables_regex_list>]}}
NOTE
Hive table/view names are case-insensitive by nature, so the table names test_table and TEST_TABLE refer to the same table in terms of include/exclude filters.
The FROM <evid-start> [TO <end-evid>]
expression allows you to create incremental dumps.
That is, you can specify a range of events to dump only those pieces of data related to the given events.
The REPL
commands have the optional WITH
clause used to set Hive replication parameters.
Configuration parameters specified in this way are effective only for a single REPL …
query and are not used for other queries running in the same session.
For example, WITH ('hive.repl.rootdir'='/user/hive/repl/')
can be used to explicitly specify an HDFS directory to store the dump files.
Running REPL DUMP
returns a result set like the one below:
+------------------------------------------------------+---------------+
|                        dump_dir                       |  last_repl_id |
+------------------------------------------------------+---------------+
| /user/hive/repl/eabe08dd-a524-4b8e-9520-e924cac73761  | 77            |
+------------------------------------------------------+---------------+
Where:
-
dump_dir — an HDFS directory with the dump data.
-
last_repl_id — an ID that represents the last state of the database/table when it was dumped. When several dumps are created one-by-one, these IDs are needed to identify the dumps (and the data they carry) relative to each other.
Below are several REPL DUMP
usage examples with comments.
REPL DUMP hr; (1)
REPL DUMP hr WITH ('hive.repl.rootdir'='/custom/hdfs/dir'); (2)
REPL DUMP hr.['employees', '[a-z]+']; (3)
REPL DUMP hr.['.*?'].['Q3[0-9]+', 'Q4']; (4)
REPL DUMP hr FROM 100; (5)
REPL DUMP hr FROM 100 TO 1000; (6)
REPL DUMP hr.['[a-z]+'] REPLACE hr FROM 200; (7)
1 | Creates a bootstrap dump of the entire hr database, including all tables/views. |
2 | Specifies a custom HDFS directory to store the dump files. |
3 | Dumps the table/view named employees as well as any tables/views that match the [a-z]+ regular expression, for example, departments, accounts, and so on. |
4 | Dumps tables/views with any name except Q4 and except those that have the Q3 prefix followed by a numeric string of any length (for example, Q3100, Q320). |
5 | Creates an incremental dump starting from the specific event. This means that the resulting dump will contain only the updates made to the source entity after the specified event. |
6 | Creates an incremental dump that includes data updates that fall into the specified range of events (100—1000). |
7 | Dynamically changes the replication policy for the current incremental phase.
The hr replication policy is replaced to include only tables/views whose names consist of alphabetic characters.
Further loading of such a dump will automatically drop the tables which are excluded as per the new policy. |
REPL LOAD
The REPL LOAD command is executed on the target cluster to load dump files from HDFS into Hive.
Syntax:
REPL LOAD <db_name>
FROM <dir_name>
[WITH ('key1'='value1', 'key2'='value2')];
If <db_name> is specified and the dump was created as a database-level dump, Hive renames the database on import.
If <db_name> is not set, the original database name (as recorded in the dump) is used.
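For example, assuming a database-level dump, both of the following forms should work; the dump directory is a placeholder:
REPL LOAD hr_replica FROM '<dump_dir>';   -- load the dump under a new database name
REPL LOAD FROM '<dump_dir>';              -- keep the original database name recorded in the dump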
The command’s output contains detailed information about the imported entities.
INFO : Completed compiling command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 0.019 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983): REPL LOAD demo_repl_db_replica FROM '/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804'
INFO : Starting task [Stage-0:REPL_BOOTSTRAP_LOAD] in serial mode
INFO : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadStartTime":1716280801}
INFO : Root Tasks / Total Tasks : 1 / 8
INFO : completed load task run : 1
INFO : Starting task [Stage-0:DDL] in serial mode
INFO : Starting task [Stage-1:DDL] in serial mode
INFO : Starting task [Stage-2:DDL] in serial mode
INFO : Starting task [Stage-3:COPY] in serial mode
INFO : Copying data from hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804/demo_repl_db/demo_repl_tbl/data to hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
INFO : Starting task [Stage-4:MOVE] in serial mode
INFO : Loading data to table demo_repl_db_replica.demo_repl_tbl from hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
INFO : Starting task [Stage-5:DDL] in serial mode
INFO : Starting task [Stage-6:REPL_STATE_LOG] in serial mode
INFO : REPL::TABLE_LOAD: {"dbName":"demo_repl_db_replica","tableName":"demo_repl_tbl","tableType":"MANAGED_TABLE","tablesLoadProgress":"1/1","loadTime":1716280802}
INFO : Starting task [Stage-7:REPL_STATE_LOG] in serial mode
INFO : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadEndTime":1716280802,"dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","lastReplId":"152"}
INFO : Starting task [Stage-8:DDL] in serial mode
INFO : Completed executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 1.033 seconds
INFO : OK
INFO : Concurrency mode is disabled, not creating a lock manager
REPL STATUS
The REPL STATUS command runs on the target cluster and returns the last replicated event_id
of the target database/table.
Using REPL STATUS, you can determine the state up to which your target entity has been replicated.
TIP
Typically, the event_id returned by this command is used to construct the next REPL DUMP command for incremental replication.
Syntax:
REPL STATUS <db_name>;
Sample output:
+---------------+
| last_repl_id  |
+---------------+
| 180           |
+---------------+
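To illustrate how REPL STATUS fits into the incremental cycle (the names and IDs below are illustrative), the ID it returns on the target cluster becomes the starting event for the next dump on the source cluster:
REPL STATUS hr_replica;   -- run on the target cluster; returns, for example, 180
REPL DUMP hr FROM 180;    -- run on the source cluster; dumps only the events after event 180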
Replication process example
The following scenario walks you through the major replication steps and shows how to keep two Hive databases in sync using the replication commands. The steps described below can be (and in practice should be) automated and adapted to your business needs to form a full-fledged replication utility.
-
Create a test Hive database.
DROP DATABASE IF EXISTS demo_repl_db;
CREATE DATABASE demo_repl_db;
USE demo_repl_db;
-
In the test database, create a sample table and populate it with dummy data.
CREATE TABLE demo_repl_tbl (
    `txn_id` int,
    `acc_id` int,
    `txn_amount` decimal(10,2),
    `txn_date` date);
INSERT INTO demo_repl_tbl VALUES
(1, 1002, 10.00, '2024-01-01'),
(2, 1002, 20.00, '2024-01-03'),
(3, 1002, 30.00, '2024-01-02'),
(4, 1001, 100.50, '2024-01-02'),
(5, 1001, 150.50, '2024-01-04'),
(6, 1001, 200.50, '2024-01-03'),
(7, 1003, 50.00, '2024-01-03'),
(8, 1003, 50.00, '2024-01-01'),
(9, 1003, 75.00, '2024-01-04');
-
Set the test database as a replication source and specify the replication policy using the command:
ALTER DATABASE demo_repl_db SET DBPROPERTIES ("repl.source.for"="testrepl");
Where testrepl is an arbitrary string used to logically identify the current replication session.
-
Create a bootstrap dump of the entire database using the command:
REPL DUMP demo_repl_db;
The command returns a result set with two columns. A sample output is below.
+------------------------------------------------------+---------------+
|                        dump_dir                       |  last_repl_id |
+------------------------------------------------------+---------------+
| /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106  | 120           |
+------------------------------------------------------+---------------+
Where:
-
last_repl_id indicates the state of the created dump, i.e. it reflects the events that have been included in the dump; this ID will be needed on the next dump iteration.
-
dump_dir is an HDFS directory where the dump is stored.
Check the dump_dir contents using the command:
$ hdfs dfs -ls -R <dump_dir>
Sample output:
-rw-r--r-- 3 hive hadoop 41 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/_dumpmetadata
drwxr-xr-x - hive hadoop 0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db
-rw-r--r-- 3 hive hadoop 257 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/_metadata
drwxr-xr-x - hive hadoop 0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl
-rw-r--r-- 3 hive hadoop 1653 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/_metadata
drwxr-xr-x - hive hadoop 0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/data
-rw-r--r-- 3 hive hadoop 73 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/data/_files
The HDFS directory contains several _metadata files that store Hive database/table metadata and the file named _files. The latter stores a pointer to the Hive warehouse location where actual table data is stored.
-
Load the dump into Hive on the target cluster. You can use distcp to copy the dump directory between the clusters. If you load the dump into the same ADH cluster, be sure to use a different name for the target database; this is the approach used further in this example.
REPL LOAD demo_repl_db_replica FROM '<dump_dir>';
This loads the bootstrap dump into Hive as a full copy, without replaying individual events. The output contains detailed information about the imported entities, events, load type, etc.
Sample output:
INFO : Completed compiling command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 0.019 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983): REPL LOAD demo_repl_db_replica FROM '/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804'
INFO : Starting task [Stage-0:REPL_BOOTSTRAP_LOAD] in serial mode
INFO : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadStartTime":1716280801}
INFO : Root Tasks / Total Tasks : 1 / 8
INFO : completed load task run : 1
INFO : Starting task [Stage-0:DDL] in serial mode
INFO : Starting task [Stage-1:DDL] in serial mode
INFO : Starting task [Stage-2:DDL] in serial mode
INFO : Starting task [Stage-3:COPY] in serial mode
INFO : Copying data from hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804/demo_repl_db/demo_repl_tbl/data to hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
INFO : Starting task [Stage-4:MOVE] in serial mode
INFO : Loading data to table demo_repl_db_replica.demo_repl_tbl from hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
INFO : Starting task [Stage-5:DDL] in serial mode
INFO : Starting task [Stage-6:REPL_STATE_LOG] in serial mode
INFO : REPL::TABLE_LOAD: {"dbName":"demo_repl_db_replica","tableName":"demo_repl_tbl","tableType":"MANAGED_TABLE","tablesLoadProgress":"1/1","loadTime":1716280802}
INFO : Starting task [Stage-7:REPL_STATE_LOG] in serial mode
INFO : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadEndTime":1716280802,"dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","lastReplId":"152"}
INFO : Starting task [Stage-8:DDL] in serial mode
INFO : Completed executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 1.033 seconds
INFO : OK
INFO : Concurrency mode is disabled, not creating a lock manager
As a result of running the command, Hive creates a new database.
+-----------------------+
| database_name         |
+-----------------------+
| default               |
| demo_repl_db          |
| demo_repl_db_replica  | <-
+-----------------------+
At this moment, the source and target Hive databases have identical contents.
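As an optional sanity check (not part of the replication flow itself), you can compare row counts on both sides; at this point both queries should return 9:
SELECT COUNT(*) FROM demo_repl_db.demo_repl_tbl;
SELECT COUNT(*) FROM demo_repl_db_replica.demo_repl_tbl;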
-
Insert some data into the source table:
INSERT INTO demo_repl_tbl VALUES
(10, 1003, 60.00, '2024-01-03'),
(11, 1003, 50.00, '2024-01-05'),
(12, 1003, 75.00, '2024-01-05');
-
To sync the inserts with the target database, create one more dump using the following command.
REPL DUMP demo_repl_db FROM <last_repl_id>; (1)
1 | Instead of <last_repl_id>, use the ID returned by the previous REPL DUMP operation. |
The presence of the FROM <last_repl_id> clause makes the dump incremental, i.e. the dump includes only the chunks of data that are missing on the target cluster. The above command can be interpreted as follows: "create a dump that includes all the updates made to the source database starting from the <last_repl_id> event."
Sample output:
+------------------------------------------------------+---------------+
|                        dump_dir                       |  last_repl_id |
+------------------------------------------------------+---------------+
| /user/hive/repl/765fc939-4270-4aca-bb9d-e0eada6b5980  | 165           |
+------------------------------------------------------+---------------+
Just like with the previous dump iteration, the newly created dump_dir contains the _files file that points to the actual dumped data. The sample _files contents are below.
[admin@ka-adh-2 ~]$ hdfs dfs -cat /user/hive/repl/765fc939-4270-4aca-bb9d-e0eada6b5980/164/data/_files
hdfs://adh/apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1###
Check the contents of the /apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1 file:
$ hdfs dfs -cat /apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1
The following sample output indicates that the dump contains only the missing (delta) records.
7100360.002024-01-03
8100350.002024-01-05
9100375.002024-01-05
-
Load the incremental dump into the target database:
REPL LOAD demo_repl_db_replica FROM '<dump_dir>';
-
Run SELECT queries against the source and target tables to ensure both tables contain identical data.
SELECT * FROM demo_repl_db.demo_repl_tbl AS src;
SELECT * FROM demo_repl_db_replica.demo_repl_tbl AS tgt;
+-------------+-------------+-----------------+---------------+
| src.txn_id  | src.acc_id  | src.txn_amount  | src.txn_date  |
+-------------+-------------+-----------------+---------------+
| 1           | 1002        | 10.00           | 2024-01-01    |
| 2           | 1002        | 20.00           | 2024-01-03    |
| 3           | 1002        | 30.00           | 2024-01-02    |
| 4           | 1001        | 100.50          | 2024-01-02    |
| 5           | 1001        | 150.50          | 2024-01-04    |
| 6           | 1001        | 200.50          | 2024-01-03    |
| 7           | 1003        | 50.00           | 2024-01-03    |
| 8           | 1003        | 50.00           | 2024-01-01    |
| 9           | 1003        | 75.00           | 2024-01-04    |
| 10          | 1003        | 60.00           | 2024-01-03    |
| 11          | 1003        | 50.00           | 2024-01-05    |
| 12          | 1003        | 75.00           | 2024-01-05    |
+-------------+-------------+-----------------+---------------+
+-------------+-------------+-----------------+---------------+
| tgt.txn_id  | tgt.acc_id  | tgt.txn_amount  | tgt.txn_date  |
+-------------+-------------+-----------------+---------------+
| 1           | 1002        | 10.00           | 2024-01-01    |
| 2           | 1002        | 20.00           | 2024-01-03    |
| 3           | 1002        | 30.00           | 2024-01-02    |
| 4           | 1001        | 100.50          | 2024-01-02    |
| 5           | 1001        | 150.50          | 2024-01-04    |
| 6           | 1001        | 200.50          | 2024-01-03    |
| 7           | 1003        | 50.00           | 2024-01-03    |
| 8           | 1003        | 50.00           | 2024-01-01    |
| 9           | 1003        | 75.00           | 2024-01-04    |
| 10          | 1003        | 60.00           | 2024-01-03    |
| 11          | 1003        | 50.00           | 2024-01-05    |
| 12          | 1003        | 75.00           | 2024-01-05    |
+-------------+-------------+-----------------+---------------+
-
Delete the test table in the source database:
USE demo_repl_db;
DROP TABLE demo_repl_db.demo_repl_tbl;
-
Run another dump iteration to transfer the DROP TABLE operation results to the target database. For this, follow the steps below.
Create a dump:
REPL DUMP demo_repl_db FROM <last_repl_id>; (1)
1 | Instead of <last_repl_id>, use the ID returned by the previous REPL DUMP operation. |
Sample output:
+------------------------------------------------------+---------------+
|                        dump_dir                       |  last_repl_id |
+------------------------------------------------------+---------------+
| /user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72  | 180           |
+------------------------------------------------------+---------------+
Load the dump:
REPL LOAD demo_repl_db_replica FROM '<dump_dir>';
The "eventType":"EVENT_DROP_TABLE" string in the output confirms that the DROP TABLE operation results have been applied to the target database.
Sample output:
INFO : Compiling command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e): repl load demo_repl_db_replica from '/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72'
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72","loadType":"INCREMENTAL","numEvents":1,"loadStartTime":1716294874}
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO : EXPLAIN output for queryid hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e : STAGE DEPENDENCIES: Stage-0 is a root stage [DEPENDENCY_COLLECTION] Stage-1 depends on stages: Stage-0 [DDL] Stage-2 depends on stages: Stage-1 [DEPENDENCY_COLLECTION] Stage-3 depends on stages: Stage-2 [DDL] Stage-4 depends on stages: Stage-3 [REPL_STATE_LOG] Stage-5 depends on stages: Stage-4 [REPL_STATE_LOG] STAGE PLANS: Stage: Stage-0 Dependency Collection Stage: Stage-1 Drop Table Operator: Drop Table table: demo_repl_db_replica.demo_repl_tbl Stage: Stage-2 Dependency Collection Stage: Stage-3 Stage: Stage-4 Repl State Log Stage: Stage-5 Repl State Log
INFO : Completed compiling command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e); Time taken: 0.019 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e): repl load demo_repl_db_replica from '/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72'
INFO : Starting task [Stage-0:DEPENDENCY_COLLECTION] in serial mode
INFO : Starting task [Stage-1:DDL] in serial mode
INFO : Starting task [Stage-2:DEPENDENCY_COLLECTION] in serial mode
INFO : Starting task [Stage-3:DDL] in serial mode
INFO : Starting task [Stage-4:REPL_STATE_LOG] in serial mode
INFO : REPL::EVENT_LOAD: {"dbName":"demo_repl_db_replica","eventId":"180","eventType":"EVENT_DROP_TABLE","eventsLoadProgress":"1/1","loadTime":1716294874}
INFO : Starting task [Stage-5:REPL_STATE_LOG] in serial mode
INFO : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"INCREMENTAL","numEvents":1,"loadEndTime":1716294874,"dumpDir":"hdfs://adh/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72","lastReplId":"180"}
INFO : Completed executing command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e); Time taken: 0.183 seconds
INFO : OK
INFO : Concurrency mode is disabled, not creating a lock manager
No rows affected (0.207 seconds)
-
Run
SHOW TABLES
for source and target databases to ensure that thedemo_repl_tbl
tables have been removed from both Hive databases.
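A minimal check might look like this; neither result set should contain demo_repl_tbl anymore:
SHOW TABLES IN demo_repl_db;
SHOW TABLES IN demo_repl_db_replica;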
Limitations
There are several limitations on using the Hive replication feature:
-
Only managed tables can be replicated. External tables are replicated as managed.
-
Replication of ACID tables is not supported.
-
The table contents stored in HDFS must be owned by the hive user.
-
The current replication capabilities are unsuitable for load balancing since there is no built-in mechanism that guarantees the source and target entities are in sync at any given moment. Load balancing should be implemented by external tools/custom software.