Репликация Hive
Обзор
Репликация в контексте Hive — это процесс дублирования данных из одного хранилища Hive в другое, обычно расположенное в другом ADH-кластере. Основной задачей репликации является наличие актуальной и обновляемой реплики данных Hive, которая включает все изменения, вносимые в исходные данные. В Hive можно реплицировать базу данных, таблицу или партицию.
В этой статье описаны встроенные возможности репликации Hive, доступные в ADH "из коробки"; также статья содержит информацию о командах и ключевых понятиях механизма репликации. В примере показаны основные этапы репликации и описан порядок действий, которые необходимо выполнить для синхронизации двух баз данных Hive с помощью функции репликации.
Описанный в статье процесс репликации предполагает полностью ручной режим, что вряд ли может являться приемлемым кейсом для production-среды. Для автоматизации процессов репликации Hive необходимо использование внешних инструментов или кастомного программного обеспечения, которое будет отвечать за параметры конфигурации, координацию выполняемых команд, обеспечивать отказоустойчивость и так далее. Информация и примеры, приведенные в этой статье, могут быть полезными для создания кастомного фреймворка, который, учитывая все особенности вашего кластера, может быть использован для автоматизации процесса репликации.
Принцип работы
Обратите внимание на следующие понятия и термины, которые составляют основу механизма репликации Hive.
События и слушатели уведомлений
Слушатель уведомлений (notification listener) — это подключаемый Java-класс, который отслеживает модификацию данных/метаданных Hive и сохраняет информацию о каждом подобном событии (event) для дальнейшей обработки.
Класс слушателя устанавливается с помощью свойства hive.metastore.transactional.event.listeners
в ADCM (Clusters → <clusterName> → Services → Hive → Primary Configuration → hive-site.xml).
По умолчанию Hive использует org.apache.hive.hcatalog.listener.DbNotificationListener
, который сохраняет события в базу данных Hive Metastore (таблица hive.NOTIFICATION_LOG
).
Для каждой CRUD-операции в базе данных слушатель создает соответствующие записи, как показано в примере ниже.
MariaDB [hive]> select * from NOTIFICATION_LOG \G *************************** 1. row *************************** NL_ID: 26 EVENT_ID: 25 EVENT_TIME: 1713876508 EVENT_TYPE: ALTER_TABLE CAT_NAME: hive DB_NAME: default TBL_NAME: demo MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjBeforeJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1712916672\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","tableObjAfterJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","isTruncateOp":"false","timestamp":1713876508} MESSAGE_FORMAT: json-0.2 *************************** 2. row *************************** NL_ID: 27 EVENT_ID: 26 EVENT_TIME: 1713876508 EVENT_TYPE: INSERT CAT_NAME: hive DB_NAME: default TBL_NAME: demo MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","ptnObjJson":null,"timestamp":1713876508,"replace":"false","files":["hdfs://adh2/apps/hive/warehouse/demo/000000_0_copy_3###"]} MESSAGE_FORMAT: json-0.2 *************************** 3. row *************************** NL_ID: 28 EVENT_ID: 27 EVENT_TIME: 1713876508 EVENT_TYPE: ALTER_TABLE CAT_NAME: hive DB_NAME: default TBL_NAME: demo MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjBeforeJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","tableObjAfterJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"27\",\"numRows\":\"4\",\"rawDataSize\":\"23\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"4\",\"transient_lastDdlTime\":\"1713876508\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","isTruncateOp":"false","timestamp":1713876508} MESSAGE_FORMAT: json-0.2
В столбце MESSAGE
хранится информация об операциях с базами данных/таблицами Hive, состоянии данных до и после операции и так далее.
Столбец EVENT_ID
отмечает каждый факт обновления данных специальным идентификатором (аналогично коммитам в Git).
Это позволяет Hive точечно воспроизводить любую операцию по обновлению данных (например, результат одной или нескольких операций INSERT
) в целевом кластере ADH.
РЕКОМЕНДАЦИЯ
Записи событий, генерируемые слушателями, можно непрерывно парсить, чтобы отслеживать обновления сущностей Hive в режиме реального времени.
|
Начальный и инкрементный этапы репликации
Существует два основных этапа репликации данных:
-
Начальная загрузка (bootstrap). Первичная операция репликации, которая предполагает полное копирование данных из исходной базы данных Hive в целевой кластер. Эта операция выполняется один раз в самом начале процесса репликации. Дальнейшие обновления данных в реплицируемой сущности осуществляются по цепочке относительно начальной загрузки.
-
Инкрементная загрузка (incremental). Все модификации данных, выполняемые в исходной сущности после фазы начальной загрузки, предполагают перенос лишь недостающих данных (дельт) в целевой кластер.
Команды репликации
Hive предоставляет следующие команды для организации процесса репликации.
REPL DUMP
Команда REPL DUMP выполняется в исходном кластере и создает дамп с данными/метаданными Hive в HDFS.
РЕКОМЕНДАЦИЯ
Обычно эта команда выполняется либо по расписанию, регулярно создавая новые дамп-файлы, либо запускается по определенному событию в исходной базе/таблице.
|
Синтаксис:
REPL DUMP <repl_policy>
[FROM <evid-start> [TO <end-evid>] [LIMIT <num-evids>] ]
[WITH ('key1'='value1', 'key2'='value2')];
Выражение <repl_policy>
указывает сущность Hive, дамп которой необходимо создать (база данных Hive, таблица или партиция), а также фильтры для включения/исключения сущностей.
Синтаксис показан ниже:
<dbname>{{.[<comma_separated_include_tables_regex_list>]}{.[<comma_separated_exclude_tables_regex_list>]}}
ПРИМЕЧАНИЕ
Имена таблиц и представлений в Hive нечувствительны к регистру.
Поэтому имена таблиц test_table и TEST_TABLE , указанные в фильтре, относятся к одной и той же сущности.
|
Наличие предложения FROM <evid-start> [TO <end-evid>]
позволяет создавать инкрементный дамп данных.
Другими словами, вы можете указать диапазон событий, чтобы включить в дамп лишь те фрагменты данных, которые относятся к этим событиям.
Для команд REPL …
доступно необязательное выражение WITH
, используемое для установки параметров репликации Hive.
Указанные таким способом параметры действуют только для одного запроса REPL …
и не используются для других запросов, выполняющихся в той же сессии.
Например, с помощью WITH("hive.repl.rootdir", "/path/to/hdfs/")
можно явно переопределить HDFS-директорию для сохранения файлов дампа.
Запуск команды REPL DUMP
возвращает результат, как показано ниже.
+------------------------------------------------------+---------------+ | dump_dir | last_repl_id | +------------------------------------------------------+---------------+ | /user/hive/repl/eabe08dd-a524-4b8e-9520-e924cac73761 | 77 | +------------------------------------------------------+---------------+
Где:
-
dump_dir
— HDFS-директория с файлами дампа. -
last_repl_id
— идентификатор, отражающий состояние базы данных/таблицы в момент ее дампа. При последовательном создании множества дампов эти ID необходимы для идентификации дампов и их содержимого относительно друг друга.
Ниже показаны примеры использования REPL DUMP
с комментариями.
REPL DUMP hr; (1)
REPL DUMP hr WITH("hive.repl.rootdir", "/custom/hdfs/dir"); (2)
REPL DUMP hr.['employees', '[a-z]+']; (3)
REPL DUMP hr.['.*?'].['Q3[0-9]+', 'Q4']; (4)
REPL DUMP hr FROM 100; (5)
REPL DUMP hr FROM 100 to 1000 (6)
REPL DUMP hr.['[a-z]+'] REPLACE hr FROM 200 (7)
1 | Создание начального дампа базы данных hr , включая все таблицы и представления. |
2 | Указание кастомной директории HDFS для сохранения файлов дампа. |
3 | Создание дампа таблицы/представления с именем employees , а также включение всех сущностей, соответствующих регулярному выражению [a-z]+ , например departments , accounts и так далее. |
4 | Включает в дамп сущности с любыми именами, кроме Q4 и тех, чье имя содержит префикс Q3 , за которым следует числовая строка любой длины (например, Q3100 , Q320 ). |
5 | Создание инкрементного дампа, который включает в себя данные, начиная с определенного события. |
6 | Создание инкрементного дампа, который включает в себя данные, попадающие в указанный диапазон событий (100—1000). |
7 | Изменение политики репликации "на лету" для текущей инкрементной фазы.
Вместо политики hr указывается новая политика, включающая в дамп только те сущности, имена которых состоят из буквенных символов.
При дальнейшей загрузке такого дампа Hive автоматически удалит сущности, исключенные новой политикой. |
REPL LOAD
Команда REPL LOAD выполняется в целевом кластере для импорта файлов дампа из HDFS в Hive.
Синтаксис:
REPL LOAD <db_name>
FROM <dir_name>
[WITH ('key1'='value1', 'key2'='value2')];
Если указано имя <dbname>
и дамп является дампом базы данных, Hive переименует базу данных при импорте.
Если имя <dbname>
не указано, используется исходное имя базы данных (как записано в дампе).
В выводе этой команды содержится подробная информация об импортированных сущностях.
INFO : Completed compiling command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 0.019 seconds INFO : Concurrency mode is disabled, not creating a lock manager INFO : Executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983): REPL LOAD demo_repl_db_replica FROM '/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804' INFO : Starting task [Stage-0:REPL_BOOTSTRAP_LOAD] in serial mode INFO : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadStartTime":1716280801} INFO : Root Tasks / Total Tasks : 1 / 8 INFO : completed load task run : 1 INFO : Starting task [Stage-0:DDL] in serial mode INFO : Starting task [Stage-1:DDL] in serial mode INFO : Starting task [Stage-2:DDL] in serial mode INFO : Starting task [Stage-3:COPY] in serial mode INFO : Copying data from hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804/demo_repl_db/demo_repl_tbl/data to hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003 INFO : Starting task [Stage-4:MOVE] in serial mode INFO : Loading data to table demo_repl_db_replica.demo_repl_tbl from hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003 INFO : Starting task [Stage-5:DDL] in serial mode INFO : Starting task [Stage-6:REPL_STATE_LOG] in serial mode INFO : REPL::TABLE_LOAD: {"dbName":"demo_repl_db_replica","tableName":"demo_repl_tbl","tableType":"MANAGED_TABLE","tablesLoadProgress":"1/1","loadTime":1716280802} INFO : Starting task [Stage-7:REPL_STATE_LOG] in serial mode INFO : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadEndTime":1716280802,"dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","lastReplId":"152"} INFO : Starting task [Stage-8:DDL] in serial mode INFO : Completed executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 1.033 seconds INFO : OK INFO : Concurrency mode is disabled, not creating a lock manager
REPL STATUS
Команда REPL STATUS запускается на целевом кластере и возвращает последний реплицированный event_id
целевой базы данных/таблицы.
Используя REPL STATUS
, можно узнать, до какого состояния была реплицирована целевая база данных/таблица.
РЕКОМЕНДАЦИЯ
event_id , возвращаемый этой командой, обычно используется для составления последующей команды REPL DUMP для инкрементной репликации.
|
Синтаксис:
REPL STATUS <db_name>;
Пример вывода:
+---------------+ | last_repl_id | +---------------+ | 180 | +---------------+
Пример процесса репликации
Следующий пример демонстрирует основные этапы репликации. Данный сценарий описывает способ синхронизации двух баз данных Hive с помощью команд репликации. Описанные в примере шаги можно автоматизировать и адаптировать в соответствии с потребностями вашего бизнеса, чтобы превратить их из набора отдельных команд в полноценную утилиту для автоматической репликации.
-
Создайте тестовую базу данных Hive.
DROP DATABASE IF EXISTS demo_repl_db; CREATE DATABASE demo_repl_db; USE demo_repl_db;
-
В тестовой базе создайте таблицу и заполните ее тестовыми данными.
CREATE TABLE demo_repl_tbl ( `txn_id` int, `acc_id` int, `txn_amount` decimal(10,2), `txn_date` date);
INSERT INTO demo_repl_tbl VALUES (1, 1002, 10.00, '2024-01-01'), (2, 1002, 20.00, '2024-01-03'), (3, 1002, 30.00, '2024-01-02'), (4, 1001, 100.50, '2024-01-02'), (5, 1001, 150.50, '2024-01-04'), (6, 1001, 200.50, '2024-01-03'), (7, 1003, 50.00, '2024-01-03'), (8, 1003, 50.00, '2024-01-01'), (9, 1003, 75.00, '2024-01-04');
-
Укажите тестовую базу данных в качестве источника репликации и укажите политику репликации с помощью команды:
ALTER DATABASE demo_repl_db SET DBPROPERTIES ("repl.source.for"="testrepl");
Где
testrepl
— произвольная строка, используемая для логической идентификации текущей сессии репликации. -
Создайте начальный дамп всей базы данных, используя команду:
REPL DUMP demo_repl_db;
Запрос возвращает набор результатов с двумя столбцами, как показано ниже.
+------------------------------------------------------+---------------+ | dump_dir | last_repl_id | +------------------------------------------------------+---------------+ | /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106 | 120 | +------------------------------------------------------+---------------+
Где:
-
last_repl_id
— отражает события, которые попали в дамп; этот идентификатор понадобится на следующей фазе репликации. -
dump_dir
— HDFS-директория, в которой были сохранены файлы дампа.
Выведите содержимое
dump_dir
с помощью команды:$ hdfs dfs -ls -R <dump_dir>
Пример вывода:
-rw-r--r-- 3 hive hadoop 41 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/_dumpmetadata drwxr-xr-x - hive hadoop 0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db -rw-r--r-- 3 hive hadoop 257 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/_metadata drwxr-xr-x - hive hadoop 0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl -rw-r--r-- 3 hive hadoop 1653 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/_metadata drwxr-xr-x - hive hadoop 0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/data -rw-r--r-- 3 hive hadoop 73 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/data/_files
Директория дампа содержит несколько файлов _metadata, в которых хранятся метаданные базы данных/таблиц Hive, а также файл _files. В последнем хранится ссылка на локацию в хранилище Hive, где физически хранятся данные таблицы.
-
-
Загрузите дамп в Hive на целевом кластере. Вы можете использовать distcp для копирования дампа между кластерами. При загрузке дампа в один и тот же кластер ADH необходимо указать новое имя для целевой базы данных. Этот подход используется далее в примере.
REPL LOAD demo_repl_db_replica FROM '<dump_dir>';
Команда импортирует начальный дамп в Hive без учета отдельных событий. В выводе содержится подробная информация об импортированных сущностях, событиях, типе загрузки и так далее.
Пример выводаINFO : Completed compiling command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 0.019 seconds INFO : Concurrency mode is disabled, not creating a lock manager INFO : Executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983): REPL LOAD demo_repl_db_replica FROM '/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804' INFO : Starting task [Stage-0:REPL_BOOTSTRAP_LOAD] in serial mode INFO : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadStartTime":1716280801} INFO : Root Tasks / Total Tasks : 1 / 8 INFO : completed load task run : 1 INFO : Starting task [Stage-0:DDL] in serial mode INFO : Starting task [Stage-1:DDL] in serial mode INFO : Starting task [Stage-2:DDL] in serial mode INFO : Starting task [Stage-3:COPY] in serial mode INFO : Copying data from hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804/demo_repl_db/demo_repl_tbl/data to hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003 INFO : Starting task [Stage-4:MOVE] in serial mode INFO : Loading data to table demo_repl_db_replica.demo_repl_tbl from hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003 INFO : Starting task [Stage-5:DDL] in serial mode INFO : Starting task [Stage-6:REPL_STATE_LOG] in serial mode INFO : REPL::TABLE_LOAD: {"dbName":"demo_repl_db_replica","tableName":"demo_repl_tbl","tableType":"MANAGED_TABLE","tablesLoadProgress":"1/1","loadTime":1716280802} INFO : Starting task [Stage-7:REPL_STATE_LOG] in serial mode INFO : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadEndTime":1716280802,"dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","lastReplId":"152"} INFO : Starting task [Stage-8:DDL] in serial mode INFO : Completed executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 1.033 seconds INFO : OK INFO : Concurrency mode is disabled, not creating a lock manager
В результате выполнения команды Hive создает новую базу данных:
+-----------------------+ | database_name | +-----------------------+ | default | | demo_repl_db | | demo_repl_db_replica | <- +-----------------------+
На данном этапе исходная и целевая базы данных Hive имеют идентичное содержимое, так как данные были скопированы полностью.
-
Добавьте данные в исходную таблицу:
INSERT INTO demo_repl_tbl VALUES (10, 1003, 60.00, '2024-01-03'), (11, 1003, 50.00, '2024-01-05'), (12, 1003, 75.00, '2024-01-05');
-
Чтобы перенести результаты операции
INSERT
в целевую базу данных, создайте еще один дамп, как показано ниже:REPL DUMP demo_repl_db FROM <last_repl_id>; (1)
1 Вместо <last_repl_id>
используйте идентификатор, полученный предыдущим запросом REPL DUMP.Наличие предложения
FROM <last_repl_id>
делает дамп инкрементным, то есть в дамп попадут только те данные, которые отсутствуют в целевом кластере. Приведенный выше запрос можно интерпретировать следующим образом: "создать дамп, включающий все обновления исходной базы данных, начиная с события <last_repl_id>".Пример вывода:
+-----------------------------------------------------+---------------+ | dump_dir | last_repl_id | +-----------------------------------------------------+---------------+ | /user/hive/repl/765fc939-4270-4aca-bb9d-e0eada6b5980| 165 | +-----------------------------------------------------+---------------+
Как и в предыдущей фазе репликации, новая директория
dump_dir
содержит файл _files, который указывает на расположение данных дампа. Пример содержимого _files показан ниже.[admin@ka-adh-2 ~]$ hdfs dfs -cat /user/hive/repl/765fc939-4270-4aca-bb9d-e0eada6b5980/164/data/_files hdfs://adh/apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1###
Выведите содержимое файла /apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1:
$ hdfs dfs -cat /apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1
Следующий вывод показывает, что в дамп были включены только записи, отсутствующие в целевой базе данных (дельта).
7100360.002024-01-03 8100350.002024-01-05 9100375.002024-01-05
-
Загрузите инкрементный дамп в целевую базу данных:
REPL LOAD demo_repl_db_replica FROM '<dump_dir>'
-
Выполните
SELECT
для исходной и целевой таблицы, чтобы убедиться, что обе сущности содержат одинаковые данные.demo_repl_db.demo_repl_tbl demo_repl_db_replica.demo_repl_tbl SELECT * FROM demo_repl_db.demo_repl_tbl AS src;
SELECT * FROM demo_repl_db_replica.demo_repl_tbl AS tgt;
+-------------+-------------+-----------------+---------------+ | src.txn_id | src.acc_id | src.txn_amount | src.txn_date | +-------------+-------------+-----------------+---------------+ | 1 | 1002 | 10.00 | 2024-01-01 | | 2 | 1002 | 20.00 | 2024-01-03 | | 3 | 1002 | 30.00 | 2024-01-02 | | 4 | 1001 | 100.50 | 2024-01-02 | | 5 | 1001 | 150.50 | 2024-01-04 | | 6 | 1001 | 200.50 | 2024-01-03 | | 7 | 1003 | 50.00 | 2024-01-03 | | 8 | 1003 | 50.00 | 2024-01-01 | | 9 | 1003 | 75.00 | 2024-01-04 | | 10 | 1003 | 60.00 | 2024-01-03 | | 11 | 1003 | 50.00 | 2024-01-05 | | 12 | 1003 | 75.00 | 2024-01-05 | +-------------+-------------+-----------------+---------------+
+-------------+-------------+-----------------+---------------+ | tgt.txn_id | tgt.acc_id | tgt.txn_amount | tgt.txn_date | +-------------+-------------+-----------------+---------------+ | 1 | 1002 | 10.00 | 2024-01-01 | | 2 | 1002 | 20.00 | 2024-01-03 | | 3 | 1002 | 30.00 | 2024-01-02 | | 4 | 1001 | 100.50 | 2024-01-02 | | 5 | 1001 | 150.50 | 2024-01-04 | | 6 | 1001 | 200.50 | 2024-01-03 | | 7 | 1003 | 50.00 | 2024-01-03 | | 8 | 1003 | 50.00 | 2024-01-01 | | 9 | 1003 | 75.00 | 2024-01-04 | | 10 | 1003 | 60.00 | 2024-01-03 | | 11 | 1003 | 50.00 | 2024-01-05 | | 12 | 1003 | 75.00 | 2024-01-05 | +-------------+-------------+-----------------+---------------+
-
Удалите тестовую таблицу в исходной базе данных:
USE demo_repl_db; DROP TABLE demo_repl_db.demo_repl_tbl;
-
Выполните еще один цикл репликации, чтобы перенести результат операции
DROP TABLE
в целевую базу данных. Для этого выполните следующие действия.Создайте дамп:
REPL DUMP {demo_repl_db} FROM <last_repl_id>; (1)
1 Вместо <last_repl_id>
используйте идентификатор, полученный предыдущим запросом REPL DUMP.Пример вывода:
+-----------------------------------------------------+---------------+ | dump_dir | last_repl_id | +-----------------------------------------------------+---------------+ | /user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72| 180 | +-----------------------------------------------------+---------------+
Загрузите дамп в Hive:
REPL LOAD {demo_repl_db} FROM '<dump_dir>';
Строка
"eventType": "EVENT_DROP_TABLE"
в выводе подтверждает, что результаты операцииDROP TABLE
были импортированы в целевую таблицу.Пример выводаINFO : Compiling command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e): repl load demo_repl_db_replica from '/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72' INFO : Concurrency mode is disabled, not creating a lock manager INFO : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72","loadType":"INCREMENTAL","numEvents":1,"loadStartTime":1716294874} INFO : Semantic Analysis Completed (retrial = false) INFO : Returning Hive schema: Schema(fieldSchemas:null, properties:null) INFO : EXPLAIN output for queryid hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e : STAGE DEPENDENCIES: Stage-0 is a root stage [DEPENDENCY_COLLECTION] Stage-1 depends on stages: Stage-0 [DDL] Stage-2 depends on stages: Stage-1 [DEPENDENCY_COLLECTION] Stage-3 depends on stages: Stage-2 [DDL] Stage-4 depends on stages: Stage-3 [REPL_STATE_LOG] Stage-5 depends on stages: Stage-4 [REPL_STATE_LOG] STAGE PLANS: Stage: Stage-0 Dependency Collection Stage: Stage-1 Drop Table Operator: Drop Table table: demo_repl_db_replica.demo_repl_tbl Stage: Stage-2 Dependency Collection Stage: Stage-3 Stage: Stage-4 Repl State Log Stage: Stage-5 Repl State Log INFO : Completed compiling command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e); Time taken: 0.019 seconds INFO : Concurrency mode is disabled, not creating a lock manager INFO : Executing command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e): repl load demo_repl_db_replica from '/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72' INFO : Starting task [Stage-0:DEPENDENCY_COLLECTION] in serial mode INFO : Starting task [Stage-1:DDL] in serial mode INFO : Starting task [Stage-2:DEPENDENCY_COLLECTION] in serial mode INFO : Starting task [Stage-3:DDL] in serial mode INFO : Starting task [Stage-4:REPL_STATE_LOG] in serial mode INFO : REPL::EVENT_LOAD: {"dbName":"demo_repl_db_replica","eventId":"180","eventType":"EVENT_DROP_TABLE","eventsLoadProgress":"1/1","loadTime":1716294874} INFO : Starting task [Stage-5:REPL_STATE_LOG] in serial mode INFO : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"INCREMENTAL","numEvents":1,"loadEndTime":1716294874,"dumpDir":"hdfs://adh/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72","lastReplId":"180"} INFO : Completed executing command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e); Time taken: 0.183 seconds INFO : OK INFO : Concurrency mode is disabled, not creating a lock manager No rows affected (0.207 seconds)
-
Выполните
SHOW TABLES
для исходной и целевой базы данных и убедитесь, что таблицаdemo_repl_tbl
отсутствует в обеих базах данных Hive.
Ограничения
Существуют некоторые ограничения по использованию функционала репликации Hive:
-
Реплицировать можно только managed-таблицы. Внешние (external) таблицы реплицируются как managed-таблицы.
-
Репликация ACID-таблиц не поддерживается.
-
Пользователь
hive
должен быть владельцем содержимого таблиц, которое хранится в HDFS. -
Текущие возможности репликации не подходят для балансировки нагрузки, поскольку нет встроенного механизма, который гарантировал бы синхронизацию исходных и целевых сущностей в любой момент времени. Балансировка нагрузки должна быть реализована с помощью внешних инструментов или кастомного программного обеспечения.