Репликация Hive

Обзор

Репликация в контексте Hive — это процесс дублирования данных из одного хранилища Hive в другое, обычно расположенное в другом ADH-кластере. Основной задачей репликации является наличие актуальной и обновляемой реплики данных Hive, которая включает все изменения, вносимые в исходные данные. В Hive можно реплицировать базу данных, таблицу или партицию.

В этой статье описаны встроенные возможности репликации Hive, доступные в ADH "из коробки"; также статья содержит информацию о командах и ключевых понятиях механизма репликации. В примере показаны основные этапы репликации и описан порядок действий, которые необходимо выполнить для синхронизации двух баз данных Hive с помощью функции репликации.

Описанный в статье процесс репликации предполагает полностью ручной режим, что вряд ли может являться приемлемым кейсом для production-среды. Для автоматизации процессов репликации Hive необходимо использование внешних инструментов или кастомного программного обеспечения, которое будет отвечать за параметры конфигурации, координацию выполняемых команд, обеспечивать отказоустойчивость и так далее. Информация и примеры, приведенные в этой статье, могут быть полезными для создания кастомного фреймворка, который, учитывая все особенности вашего кластера, может быть использован для автоматизации процесса репликации.

Принцип работы

Обратите внимание на следующие понятия и термины, которые составляют основу механизма репликации Hive.

События и слушатели уведомлений

Слушатель уведомлений (notification listener) — это подключаемый Java-класс, который отслеживает модификацию данных/метаданных Hive и сохраняет информацию о каждом подобном событии (event) для дальнейшей обработки. Класс слушателя устанавливается с помощью свойства hive.metastore.transactional.event.listeners в ADCM (Clusters → <clusterName> → Services → Hive → Primary Configuration → hive-site.xml). По умолчанию Hive использует org.apache.hive.hcatalog.listener.DbNotificationListener, который сохраняет события в базу данных Hive Metastore (таблица hive.NOTIFICATION_LOG). Для каждой CRUD-операции в базе данных слушатель создает соответствующие записи, как показано в примере ниже.

Пример содержимого таблицы NOTIFICATION_LOG
MariaDB [hive]> select * from NOTIFICATION_LOG \G
*************************** 1. row ***************************
         NL_ID: 26
      EVENT_ID: 25
    EVENT_TIME: 1713876508
    EVENT_TYPE: ALTER_TABLE
      CAT_NAME: hive
       DB_NAME: default
      TBL_NAME: demo
       MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjBeforeJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1712916672\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","tableObjAfterJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","isTruncateOp":"false","timestamp":1713876508}
MESSAGE_FORMAT: json-0.2
*************************** 2. row ***************************
         NL_ID: 27
      EVENT_ID: 26
    EVENT_TIME: 1713876508
    EVENT_TYPE: INSERT
      CAT_NAME: hive
       DB_NAME: default
      TBL_NAME: demo
       MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","ptnObjJson":null,"timestamp":1713876508,"replace":"false","files":["hdfs://adh2/apps/hive/warehouse/demo/000000_0_copy_3###"]}
MESSAGE_FORMAT: json-0.2
*************************** 3. row ***************************
         NL_ID: 28
      EVENT_ID: 27
    EVENT_TIME: 1713876508
    EVENT_TYPE: ALTER_TABLE
      CAT_NAME: hive
       DB_NAME: default
      TBL_NAME: demo
       MESSAGE: {"server":"thrift://ka-adh-5.ru-central1.internal:9083","servicePrincipal":"","db":"default","table":"demo","tableType":"MANAGED_TABLE","tableObjBeforeJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"20\",\"numRows\":\"3\",\"rawDataSize\":\"17\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"3\",\"transient_lastDdlTime\":\"1713876507\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","tableObjAfterJson":"{\"1\":{\"str\":\"demo\"},\"2\":{\"str\":\"default\"},\"3\":{\"str\":\"hive\"},\"4\":{\"i32\":1712915140},\"5\":{\"i32\":0},\"6\":{\"i32\":0},\"7\":{\"rec\":{\"1\":{\"lst\":[\"rec\",2,{\"1\":{\"str\":\"id\"},\"2\":{\"str\":\"int\"}},{\"1\":{\"str\":\"value\"},\"2\":{\"str\":\"string\"}}]},\"2\":{\"str\":\"hdfs://adh2/apps/hive/warehouse/demo\"},\"3\":{\"str\":\"org.apache.hadoop.mapred.TextInputFormat\"},\"4\":{\"str\":\"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\"},\"5\":{\"tf\":0},\"6\":{\"i32\":-1},\"7\":{\"rec\":{\"2\":{\"str\":\"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\"},\"3\":{\"map\":[\"str\",\"str\",1,{\"serialization.format\":\"1\"}]}}},\"8\":{\"lst\":[\"str\",0]},\"9\":{\"lst\":[\"rec\",0]},\"10\":{\"map\":[\"str\",\"str\",0,{}]},\"11\":{\"rec\":{\"1\":{\"lst\":[\"str\",0]},\"2\":{\"lst\":[\"lst\",0]},\"3\":{\"map\":[\"lst\",\"str\",0,{}]}}},\"12\":{\"tf\":0}}},\"8\":{\"lst\":[\"rec\",0]},\"9\":{\"map\":[\"str\",\"str\",7,{\"totalSize\":\"27\",\"numRows\":\"4\",\"rawDataSize\":\"23\",\"COLUMN_STATS_ACCURATE\":\"{\\\"BASIC_STATS\\\":\\\"true\\\",\\\"COLUMN_STATS\\\":{\\\"id\\\":\\\"true\\\",\\\"value\\\":\\\"true\\\"}}\",\"numFiles\":\"4\",\"transient_lastDdlTime\":\"1713876508\",\"bucketing_version\":\"2\"}]},\"12\":{\"str\":\"MANAGED_TABLE\"},\"15\":{\"tf\":0},\"17\":{\"str\":\"hive\"},\"18\":{\"i32\":1}}","isTruncateOp":"false","timestamp":1713876508}
MESSAGE_FORMAT: json-0.2

В столбце MESSAGE хранится информация об операциях с базами данных/таблицами Hive, состоянии данных до и после операции и так далее. Столбец EVENT_ID отмечает каждый факт обновления данных специальным идентификатором (аналогично коммитам в Git). Это позволяет Hive точечно воспроизводить любую операцию по обновлению данных (например, результат одной или нескольких операций INSERT) в целевом кластере ADH.

РЕКОМЕНДАЦИЯ
Записи событий, генерируемые слушателями, можно непрерывно парсить, чтобы отслеживать обновления сущностей Hive в режиме реального времени.

Начальный и инкрементный этапы репликации

Существует два основных этапа репликации данных:

  • Начальная загрузка (bootstrap). Первичная операция репликации, которая предполагает полное копирование данных из исходной базы данных Hive в целевой кластер. Эта операция выполняется один раз в самом начале процесса репликации. Дальнейшие обновления данных в реплицируемой сущности осуществляются по цепочке относительно начальной загрузки.

  • Инкрементная загрузка (incremental). Все модификации данных, выполняемые в исходной сущности после фазы начальной загрузки, предполагают перенос лишь недостающих данных (дельт) в целевой кластер.

Команды репликации

Hive предоставляет следующие команды для организации процесса репликации.

REPL DUMP

Команда REPL DUMP выполняется в исходном кластере и создает дамп с данными/метаданными Hive в HDFS.

РЕКОМЕНДАЦИЯ
Обычно эта команда выполняется либо по расписанию, регулярно создавая новые дамп-файлы, либо запускается по определенному событию в исходной базе/таблице.

Синтаксис:

REPL DUMP <repl_policy>
    [FROM <evid-start> [TO <end-evid>] [LIMIT <num-evids>] ]
    [WITH ('key1'='value1', 'key2'='value2')];

Выражение <repl_policy> указывает сущность Hive, дамп которой необходимо создать (база данных Hive, таблица или партиция), а также фильтры для включения/исключения сущностей. Синтаксис показан ниже:

<dbname>{{.[<comma_separated_include_tables_regex_list>]}{.[<comma_separated_exclude_tables_regex_list>]}}
ПРИМЕЧАНИЕ
Имена таблиц и представлений в Hive нечувствительны к регистру. Поэтому имена таблиц test_table и TEST_TABLE, указанные в фильтре, относятся к одной и той же сущности.

Наличие предложения FROM <evid-start> [TO <end-evid>] позволяет создавать инкрементный дамп данных. Другими словами, вы можете указать диапазон событий, чтобы включить в дамп лишь те фрагменты данных, которые относятся к этим событиям.

Для команд REPL …​ доступно необязательное выражение WITH, используемое для установки параметров репликации Hive. Указанные таким способом параметры действуют только для одного запроса REPL …​ и не используются для других запросов, выполняющихся в той же сессии. Например, с помощью WITH("hive.repl.rootdir", "/path/to/hdfs/") можно явно переопределить HDFS-директорию для сохранения файлов дампа.

Запуск команды REPL DUMP возвращает результат, как показано ниже.

+------------------------------------------------------+---------------+
|                      dump_dir                        | last_repl_id  |
+------------------------------------------------------+---------------+
| /user/hive/repl/eabe08dd-a524-4b8e-9520-e924cac73761 | 77            |
+------------------------------------------------------+---------------+

Где:

  • dump_dir — HDFS-директория с файлами дампа.

  • last_repl_id — идентификатор, отражающий состояние базы данных/таблицы в момент ее дампа. При последовательном создании множества дампов эти ID необходимы для идентификации дампов и их содержимого относительно друг друга.

Ниже показаны примеры использования REPL DUMP с комментариями.

REPL DUMP hr; (1)
REPL DUMP hr WITH("hive.repl.rootdir", "/custom/hdfs/dir"); (2)
REPL DUMP hr.['employees', '[a-z]+']; (3)
REPL DUMP hr.['.*?'].['Q3[0-9]+', 'Q4']; (4)
REPL DUMP hr FROM 100; (5)
REPL DUMP hr FROM 100 to 1000 (6)
REPL DUMP hr.['[a-z]+'] REPLACE hr FROM 200 (7)
1 Создание начального дампа базы данных hr, включая все таблицы и представления.
2 Указание кастомной директории HDFS для сохранения файлов дампа.
3 Создание дампа таблицы/представления с именем employees, а также включение всех сущностей, соответствующих регулярному выражению [a-z]+, например departments, accounts и так далее.
4 Включает в дамп сущности с любыми именами, кроме Q4 и тех, чье имя содержит префикс Q3, за которым следует числовая строка любой длины (например, Q3100, Q320).
5 Создание инкрементного дампа, который включает в себя данные, начиная с определенного события.
6 Создание инкрементного дампа, который включает в себя данные, попадающие в указанный диапазон событий (100—​1000).
7 Изменение политики репликации "на лету" для текущей инкрементной фазы. Вместо политики hr указывается новая политика, включающая в дамп только те сущности, имена которых состоят из буквенных символов. При дальнейшей загрузке такого дампа Hive автоматически удалит сущности, исключенные новой политикой.

REPL LOAD

Команда REPL LOAD выполняется в целевом кластере для импорта файлов дампа из HDFS в Hive.

Синтаксис:

REPL LOAD <db_name>
    FROM <dir_name>
    [WITH ('key1'='value1', 'key2'='value2')];

Если указано имя <dbname> и дамп является дампом базы данных, Hive переименует базу данных при импорте. Если имя <dbname> не указано, используется исходное имя базы данных (как записано в дампе). В выводе этой команды содержится подробная информация об импортированных сущностях.

Пример вывода
INFO  : Completed compiling command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 0.019 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983): REPL LOAD demo_repl_db_replica FROM '/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804'
INFO  : Starting task [Stage-0:REPL_BOOTSTRAP_LOAD] in serial mode
INFO  : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadStartTime":1716280801}
INFO  : Root Tasks / Total Tasks : 1 / 8
INFO  : completed load task run : 1
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Starting task [Stage-1:DDL] in serial mode
INFO  : Starting task [Stage-2:DDL] in serial mode
INFO  : Starting task [Stage-3:COPY] in serial mode
INFO  : Copying data from hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804/demo_repl_db/demo_repl_tbl/data to hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
INFO  : Starting task [Stage-4:MOVE] in serial mode
INFO  : Loading data to table demo_repl_db_replica.demo_repl_tbl from hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
INFO  : Starting task [Stage-5:DDL] in serial mode
INFO  : Starting task [Stage-6:REPL_STATE_LOG] in serial mode
INFO  : REPL::TABLE_LOAD: {"dbName":"demo_repl_db_replica","tableName":"demo_repl_tbl","tableType":"MANAGED_TABLE","tablesLoadProgress":"1/1","loadTime":1716280802}
INFO  : Starting task [Stage-7:REPL_STATE_LOG] in serial mode
INFO  : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadEndTime":1716280802,"dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","lastReplId":"152"}
INFO  : Starting task [Stage-8:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 1.033 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager

REPL STATUS

Команда REPL STATUS запускается на целевом кластере и возвращает последний реплицированный event_id целевой базы данных/таблицы. Используя REPL STATUS, можно узнать, до какого состояния была реплицирована целевая база данных/таблица.

РЕКОМЕНДАЦИЯ
event_id, возвращаемый этой командой, обычно используется для составления последующей команды REPL DUMP для инкрементной репликации.

Синтаксис:

REPL STATUS <db_name>;

Пример вывода:

+---------------+
| last_repl_id  |
+---------------+
| 180           |
+---------------+

Пример процесса репликации

Следующий пример демонстрирует основные этапы репликации. Данный сценарий описывает способ синхронизации двух баз данных Hive с помощью команд репликации. Описанные в примере шаги можно автоматизировать и адаптировать в соответствии с потребностями вашего бизнеса, чтобы превратить их из набора отдельных команд в полноценную утилиту для автоматической репликации.

  1. Создайте тестовую базу данных Hive.

    DROP DATABASE IF EXISTS demo_repl_db;
    CREATE DATABASE demo_repl_db;
    USE demo_repl_db;
  2. В тестовой базе создайте таблицу и заполните ее тестовыми данными.

    CREATE TABLE demo_repl_tbl (
        `txn_id` int,
        `acc_id` int,
        `txn_amount` decimal(10,2),
        `txn_date` date);
    INSERT INTO demo_repl_tbl VALUES
    (1, 1002, 10.00, '2024-01-01'),
    (2, 1002, 20.00, '2024-01-03'),
    (3, 1002, 30.00, '2024-01-02'),
    (4, 1001, 100.50, '2024-01-02'),
    (5, 1001, 150.50, '2024-01-04'),
    (6, 1001, 200.50, '2024-01-03'),
    (7, 1003, 50.00, '2024-01-03'),
    (8, 1003, 50.00, '2024-01-01'),
    (9, 1003, 75.00, '2024-01-04');
  3. Укажите тестовую базу данных в качестве источника репликации и укажите политику репликации с помощью команды:

    ALTER DATABASE demo_repl_db
    SET DBPROPERTIES ("repl.source.for"="testrepl");

    Где testrepl — произвольная строка, используемая для логической идентификации текущей сессии репликации.

  4. Создайте начальный дамп всей базы данных, используя команду:

    REPL DUMP demo_repl_db;

    Запрос возвращает набор результатов с двумя столбцами, как показано ниже.

    +------------------------------------------------------+---------------+
    |                      dump_dir                        | last_repl_id  |
    +------------------------------------------------------+---------------+
    | /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106 | 120           |
    +------------------------------------------------------+---------------+

    Где:

    • last_repl_id — отражает события, которые попали в дамп; этот идентификатор понадобится на следующей фазе репликации.

    • dump_dir — HDFS-директория, в которой были сохранены файлы дампа.

    Выведите содержимое dump_dir с помощью команды:

    $ hdfs dfs -ls -R <dump_dir>

    Пример вывода:

    -rw-r--r--   3 hive hadoop         41 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/_dumpmetadata
    drwxr-xr-x   - hive hadoop          0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db
    -rw-r--r--   3 hive hadoop        257 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/_metadata
    drwxr-xr-x   - hive hadoop          0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl
    -rw-r--r--   3 hive hadoop       1653 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/_metadata
    drwxr-xr-x   - hive hadoop          0 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/data
    -rw-r--r--   3 hive hadoop         73 2024-05-17 16:28 /user/hive/repl/3943a1ca-220a-4cea-95ec-74e39f7bb106/demo_repl_db/demo_repl_tbl/data/_files

    Директория дампа содержит несколько файлов _metadata, в которых хранятся метаданные базы данных/таблиц Hive, а также файл _files. В последнем хранится ссылка на локацию в хранилище Hive, где физически хранятся данные таблицы.

  5. Загрузите дамп в Hive на целевом кластере. Вы можете использовать distcp для копирования дампа между кластерами. При загрузке дампа в один и тот же кластер ADH необходимо указать новое имя для целевой базы данных. Этот подход используется далее в примере.

    REPL LOAD demo_repl_db_replica FROM '<dump_dir>';

    Команда импортирует начальный дамп в Hive без учета отдельных событий. В выводе содержится подробная информация об импортированных сущностях, событиях, типе загрузки и так далее.

    Пример вывода
    INFO  : Completed compiling command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 0.019 seconds
    INFO  : Concurrency mode is disabled, not creating a lock manager
    INFO  : Executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983): REPL LOAD demo_repl_db_replica FROM '/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804'
    INFO  : Starting task [Stage-0:REPL_BOOTSTRAP_LOAD] in serial mode
    INFO  : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadStartTime":1716280801}
    INFO  : Root Tasks / Total Tasks : 1 / 8
    INFO  : completed load task run : 1
    INFO  : Starting task [Stage-0:DDL] in serial mode
    INFO  : Starting task [Stage-1:DDL] in serial mode
    INFO  : Starting task [Stage-2:DDL] in serial mode
    INFO  : Starting task [Stage-3:COPY] in serial mode
    INFO  : Copying data from hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804/demo_repl_db/demo_repl_tbl/data to hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
    INFO  : Starting task [Stage-4:MOVE] in serial mode
    INFO  : Loading data to table demo_repl_db_replica.demo_repl_tbl from hdfs://adh/apps/hive/warehouse/demo_repl_db_replica.db/demo_repl_tbl/.hive-staging_hive_2024-05-21_08-40-01_721_5415086613559185017-28/-ext-10003
    INFO  : Starting task [Stage-5:DDL] in serial mode
    INFO  : Starting task [Stage-6:REPL_STATE_LOG] in serial mode
    INFO  : REPL::TABLE_LOAD: {"dbName":"demo_repl_db_replica","tableName":"demo_repl_tbl","tableType":"MANAGED_TABLE","tablesLoadProgress":"1/1","loadTime":1716280802}
    INFO  : Starting task [Stage-7:REPL_STATE_LOG] in serial mode
    INFO  : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"BOOTSTRAP","numTables":1,"numFunctions":0,"loadEndTime":1716280802,"dumpDir":"hdfs://adh/user/hive/repl/dc680c35-945d-4594-a1cd-43af67792804","lastReplId":"152"}
    INFO  : Starting task [Stage-8:DDL] in serial mode
    INFO  : Completed executing command(queryId=hive_20240521084001_1b49dadd-08b4-4b5d-ac7e-c61eb882a983); Time taken: 1.033 seconds
    INFO  : OK
    INFO  : Concurrency mode is disabled, not creating a lock manager

    В результате выполнения команды Hive создает новую базу данных:

    +-----------------------+
    |     database_name     |
    +-----------------------+
    | default               |
    | demo_repl_db          |
    | demo_repl_db_replica  | <-
    +-----------------------+

    На данном этапе исходная и целевая базы данных Hive имеют идентичное содержимое, так как данные были скопированы полностью.

  6. Добавьте данные в исходную таблицу:

    INSERT INTO demo_repl_tbl VALUES
    (10, 1003, 60.00, '2024-01-03'),
    (11, 1003, 50.00, '2024-01-05'),
    (12, 1003, 75.00, '2024-01-05');
  7. Чтобы перенести результаты операции INSERT в целевую базу данных, создайте еще один дамп, как показано ниже:

    REPL DUMP demo_repl_db
    FROM <last_repl_id>; (1)
    1 Вместо <last_repl_id> используйте идентификатор, полученный предыдущим запросом REPL DUMP.

    Наличие предложения FROM <last_repl_id> делает дамп инкрементным, то есть в дамп попадут только те данные, которые отсутствуют в целевом кластере. Приведенный выше запрос можно интерпретировать следующим образом: "создать дамп, включающий все обновления исходной базы данных, начиная с события <last_repl_id>".

    Пример вывода:

    +-----------------------------------------------------+---------------+
    |                      dump_dir                       | last_repl_id  |
    +-----------------------------------------------------+---------------+
    | /user/hive/repl/765fc939-4270-4aca-bb9d-e0eada6b5980| 165           |
    +-----------------------------------------------------+---------------+

    Как и в предыдущей фазе репликации, новая директория dump_dir содержит файл _files, который указывает на расположение данных дампа. Пример содержимого _files показан ниже.

    [admin@ka-adh-2 ~]$ hdfs dfs -cat /user/hive/repl/765fc939-4270-4aca-bb9d-e0eada6b5980/164/data/_files
    hdfs://adh/apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1###

    Выведите содержимое файла /apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1:

    $ hdfs dfs -cat /apps/hive/warehouse/demo_repl_db.db/demo_repl_tbl/000000_0_copy_1

    Следующий вывод показывает, что в дамп были включены только записи, отсутствующие в целевой базе данных (дельта).

    7100360.002024-01-03
    8100350.002024-01-05
    9100375.002024-01-05
  8. Загрузите инкрементный дамп в целевую базу данных:

    REPL LOAD demo_repl_db_replica
    FROM '<dump_dir>'
  9. Выполните SELECT для исходной и целевой таблицы, чтобы убедиться, что обе сущности содержат одинаковые данные.

    demo_repl_db.demo_repl_tbl demo_repl_db_replica.demo_repl_tbl
    SELECT * FROM demo_repl_db.demo_repl_tbl AS src;
    SELECT * FROM demo_repl_db_replica.demo_repl_tbl AS tgt;
    +-------------+-------------+-----------------+---------------+
    | src.txn_id  | src.acc_id  | src.txn_amount  | src.txn_date  |
    +-------------+-------------+-----------------+---------------+
    | 1           | 1002        | 10.00           | 2024-01-01    |
    | 2           | 1002        | 20.00           | 2024-01-03    |
    | 3           | 1002        | 30.00           | 2024-01-02    |
    | 4           | 1001        | 100.50          | 2024-01-02    |
    | 5           | 1001        | 150.50          | 2024-01-04    |
    | 6           | 1001        | 200.50          | 2024-01-03    |
    | 7           | 1003        | 50.00           | 2024-01-03    |
    | 8           | 1003        | 50.00           | 2024-01-01    |
    | 9           | 1003        | 75.00           | 2024-01-04    |
    | 10          | 1003        | 60.00           | 2024-01-03    |
    | 11          | 1003        | 50.00           | 2024-01-05    |
    | 12          | 1003        | 75.00           | 2024-01-05    |
    +-------------+-------------+-----------------+---------------+
    +-------------+-------------+-----------------+---------------+
    | tgt.txn_id  | tgt.acc_id  | tgt.txn_amount  | tgt.txn_date  |
    +-------------+-------------+-----------------+---------------+
    | 1           | 1002        | 10.00           | 2024-01-01    |
    | 2           | 1002        | 20.00           | 2024-01-03    |
    | 3           | 1002        | 30.00           | 2024-01-02    |
    | 4           | 1001        | 100.50          | 2024-01-02    |
    | 5           | 1001        | 150.50          | 2024-01-04    |
    | 6           | 1001        | 200.50          | 2024-01-03    |
    | 7           | 1003        | 50.00           | 2024-01-03    |
    | 8           | 1003        | 50.00           | 2024-01-01    |
    | 9           | 1003        | 75.00           | 2024-01-04    |
    | 10          | 1003        | 60.00           | 2024-01-03    |
    | 11          | 1003        | 50.00           | 2024-01-05    |
    | 12          | 1003        | 75.00           | 2024-01-05    |
    +-------------+-------------+-----------------+---------------+
  10. Удалите тестовую таблицу в исходной базе данных:

    USE demo_repl_db;
    DROP TABLE demo_repl_db.demo_repl_tbl;
  11. Выполните еще один цикл репликации, чтобы перенести результат операции DROP TABLE в целевую базу данных. Для этого выполните следующие действия.

    Создайте дамп:

    REPL DUMP {demo_repl_db}
    FROM <last_repl_id>; (1)
    1 Вместо <last_repl_id> используйте идентификатор, полученный предыдущим запросом REPL DUMP.

    Пример вывода:

    +-----------------------------------------------------+---------------+
    |                      dump_dir                       | last_repl_id  |
    +-----------------------------------------------------+---------------+
    | /user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72| 180           |
    +-----------------------------------------------------+---------------+

    Загрузите дамп в Hive:

    REPL LOAD {demo_repl_db} FROM '<dump_dir>';

    Строка "eventType": "EVENT_DROP_TABLE" в выводе подтверждает, что результаты операции DROP TABLE были импортированы в целевую таблицу.

    Пример вывода
    INFO  : Compiling command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e): repl load demo_repl_db_replica from '/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72'
    INFO  : Concurrency mode is disabled, not creating a lock manager
    INFO  : REPL::START: {"dbName":"demo_repl_db_replica","dumpDir":"hdfs://adh/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72","loadType":"INCREMENTAL","numEvents":1,"loadStartTime":1716294874}
    INFO  : Semantic Analysis Completed (retrial = false)
    INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
    INFO  : EXPLAIN output for queryid hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e : STAGE DEPENDENCIES:
      Stage-0 is a root stage [DEPENDENCY_COLLECTION]
      Stage-1 depends on stages: Stage-0 [DDL]
      Stage-2 depends on stages: Stage-1 [DEPENDENCY_COLLECTION]
      Stage-3 depends on stages: Stage-2 [DDL]
      Stage-4 depends on stages: Stage-3 [REPL_STATE_LOG]
      Stage-5 depends on stages: Stage-4 [REPL_STATE_LOG]
    
    STAGE PLANS:
      Stage: Stage-0
        Dependency Collection
    
      Stage: Stage-1
          Drop Table Operator:
            Drop Table
              table: demo_repl_db_replica.demo_repl_tbl
    
      Stage: Stage-2
        Dependency Collection
    
      Stage: Stage-3
    
      Stage: Stage-4
        Repl State Log
    
      Stage: Stage-5
        Repl State Log
    
    
    INFO  : Completed compiling command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e); Time taken: 0.019 seconds
    INFO  : Concurrency mode is disabled, not creating a lock manager
    INFO  : Executing command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e): repl load demo_repl_db_replica from '/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72'
    INFO  : Starting task [Stage-0:DEPENDENCY_COLLECTION] in serial mode
    INFO  : Starting task [Stage-1:DDL] in serial mode
    INFO  : Starting task [Stage-2:DEPENDENCY_COLLECTION] in serial mode
    INFO  : Starting task [Stage-3:DDL] in serial mode
    INFO  : Starting task [Stage-4:REPL_STATE_LOG] in serial mode
    INFO  : REPL::EVENT_LOAD: {"dbName":"demo_repl_db_replica","eventId":"180","eventType":"EVENT_DROP_TABLE","eventsLoadProgress":"1/1","loadTime":1716294874}
    INFO  : Starting task [Stage-5:REPL_STATE_LOG] in serial mode
    INFO  : REPL::END: {"dbName":"demo_repl_db_replica","loadType":"INCREMENTAL","numEvents":1,"loadEndTime":1716294874,"dumpDir":"hdfs://adh/user/hive/repl/b42d7198-ae5e-4d41-859e-8bfef5061b72","lastReplId":"180"}
    INFO  : Completed executing command(queryId=hive_20240521123434_be3ff732-f5a2-4ff9-9b66-3da63144710e); Time taken: 0.183 seconds
    INFO  : OK
    INFO  : Concurrency mode is disabled, not creating a lock manager
    No rows affected (0.207 seconds)
  12. Выполните SHOW TABLES для исходной и целевой базы данных и убедитесь, что таблица demo_repl_tbl отсутствует в обеих базах данных Hive.

Ограничения

Существуют некоторые ограничения по использованию функционала репликации Hive:

  • Реплицировать можно только managed-таблицы. Внешние (external) таблицы реплицируются как managed-таблицы.

  • Репликация ACID-таблиц не поддерживается.

  • Пользователь hive должен быть владельцем содержимого таблиц, которое хранится в HDFS.

  • Текущие возможности репликации не подходят для балансировки нагрузки, поскольку нет встроенного механизма, который гарантировал бы синхронизацию исходных и целевых сущностей в любой момент времени. Балансировка нагрузки должна быть реализована с помощью внешних инструментов или кастомного программного обеспечения.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней