Mirror Maker 2
Обзор репликации на базе Kafka Connect
MirrorMaker 2.0 — основанный на платформе сервиса Kafka Connect механизм репликации данных из исходного кластера на удаленный.
Kafka Connect доступен к установке в ADS, начиная с версии 1.7.1.
При архитектуре репликации Active/Active кластеры получают данные непосредственно от источников данных и реплицированные данные, вводимые из удаленного кластера.
При архитектуре репликации Active/Standby целевoй кластер находится в пассивном режиме (не имеет подключенных к нему потребителей и производителей) и получает только реплицированные данные.
На рисунке показана последовательность механизма репликации.
Для перемещения данных используется высокоуровневая пара потребитель (consumer) - производитель (producer). Потребитель читает и обрабатывает исходный топик, производитель помещает данные в топик-реплику.
Replicator — специальный компонент сервиса Kafka Connect, который обращается к данным ZooKeeper, копирует метаданные топика и создает такой же топик в целевом кластере.
MirrorMaker 2.0 в составе Kafka Connect создает специальные коннекторы для включения сложных потоков между кластерами ADS:
-
MirrorSourceConnector осуществляет репликацию топиков из исходного кластера в целевой кластер.
-
MirrorCheckpointConnector создает контрольные точки смещения потребителя и синхронизирует смещение со служебным топиком
__consumer_offsets
исходного кластера. -
MirrorHeartbeatConnector периодически проверяет подключение между кластерами, создавая сообщения в специальном топике
heartbeats
в исходном кластере через заданный период времени и считывая их в целевом кластере.
ПРИМЕЧАНИЕ
В данной статье описана репликация топиков при помощи встроенных средств Mirror Maker 2 в Kafka. Настройка Mirror Maker 2 c использованием ADS Control описана в статье Mirror Maker 2 в ADS Control. |
Настройка
Репликация топика на удаленый кластер осуществляется при помощи скрипта connect-mirror-maker.sh c любого хоста целевого кластера ADS.
Скрипт connect-mirror-maker.sh создает и настраивает коннекторы на основе предоставленного файла свойств mm2.properties.
Для настройки конфигурационного файла mm2.properties необходимо:
-
Ввести команду для открытия (создания) файла:
$ sudo vim /usr/lib/kafka/bin/mm2.properties
-
Заполнить файл данными:
clusters = <source_cluster_alias>, <target_cluster_alias> <target_cluster_alias>.bootstrap.servers = hostname1:9092,hostname2:9092,hostname3:9092 <source_cluster_alias>.bootstrap.servers = hostname4:9092,hostname5:9092,hostname6:9092 topics topics =<test_topic_name> groups = * <source_cluster_alias>-><target_cluster_alias>.enabled=true replication.factor=3 checkpoints.topic.replication.factor=3 heartbeats.topic.replication.factor=3 offset-syncs.topic.replication.factor=3 offset.storage.replication.factor=3 status.storage.replication.factor=3 config.storage.replication.factor=3
Описание содержания mm2.propertiesПараметры Описание clusters = <source_cluster_alias>, <target_cluster_alias>
Имена для каждого кластера, которые будут использоваться в репликации
<source_cluster_alias>.bootstrap.servers = hostname1:9092,hostname2:9092,hostname3:9092
<target_cluster_alias>.bootstrap.servers = hostname4:9092,hostname5:9092,hostname6:9092
Информация о соединении для каждого кластера — пары
хост:порт 9092
, разделенные запятыми<source_cluster_alias>→<target_cluster_alias>.enabled = true
Включение и обозначение направления репликации, в примере включена репликация из кластера
<source_cluster_alias>
в кластер<target_cluster_alias>
topics
topics = <test_topic_name>
groups = *
Название топиков или групп, предназначенных для репликации
replication.factor=1
Коэффициент репликации вновь созданных топиков-реплик
heartbeats.topic.replication.factor=3
checkpoints.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
Фактор репликации для служебных топиков MM2:
-
heartbeats
— топик проверки соединения в каждом исходном кластере, который реплицируется для демонстрации подключения через коннекторы. Сообщения в этом топике содержат информацию об исходном кластере, целевом кластере и отметке времени создания heartbeat. -
source.checkpoints.internal
— топик в целевом кластере для записи смещений для каждой группы потребителей в исходном кластере. -
mm2-offset-syncs.<target_cluster_alias>.internal
— заполняется MirrorSourceConnector, а затем используется MirrorCheckpointConnector для изменения смещений группы потребителей.
offset.storage.replication.factor=3
status.storage.replication.factor=3 config.storage.replication.factor=3
Фактор репликации для создания внутренних топиков кластеров:
-
mm2-configs.<target_cluster_alias>.internal
-
mm2-offsets.<target_cluster_alias>.internal
-
mm2-status.<target_cluster_alias>.internal
replication.policy.separator = _
Если значение оставить пустым, топик-реплика будет иметь то же имя, что и исходный топик
sync.topic.acls.enabled = false
Включение мониторинга исходного кластера на предмет изменений ACL
emit.heartbeats.interval.seconds = 5
Интервал для сообщений heartbeat
-
Проверка репликации топиков
Для того чтобы запустить скрипт connect-mirror-maker.sh и создать коннекторы, необходимо ввести на любом брокере целевого кластера команду:
$ /usr/lib/kafka/bin/connect-mirror-maker.sh mm2.properties
Далее запускаются коннекторы, которые циклично опрашивают исходный топик для считывания сообщений. Эту терминальную сессию можно не завершать, тогда сообщения можно будет считывать в реальном времени.
В процессе работы коннекторов создаются внутренние топики кластера:
-
на стороне исходного кластера:
-
mm2-configs.<target_cluster_alias>.internal
-
mm2-offset-syncs.<target_cluster_alias>.internal
-
mm2-offsets.<target_cluster_alias>.internal
-
mm2-status.<target_cluster_alias>.internal
где
<target_cluster_alias>
соответствует обозначению целевого кластера, введенному при создании mm2.properties.
-
-
на стороне целевого кластера:
-
mm2-configs.<source_cluster_alias>.internal
-
mm2-offset-syncs.<source_cluster_alias>.internal
-
mm2-offsets.<source_cluster_alias>.internal
-
mm2-status.<source_cluster_alias>.internal
где
<source_cluster_alias>
соответствует обозначению исходного кластера, введенному при создании mm2.properties.
-
Для того чтобы проверить наличие служебных топиков, необходимо ввести на любом брокере кластера (в новой терминальной сессии) команду для вывода списка топиков:
$ /usr/lib/kafka/bin/kafka-topics.sh --list --bootstrap-server hostname:9092
В списке топиков должны присутствовать нужные топики.
Создание реплицируемого исходного топика и запись в него сообщений происходит на стороне исходного кластера.
Для создания исходного топика необходимо ввести команду:
$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic <test_topic_name> --bootstrap-server hostname:9092
где <test_topic_name>
— имя исходного топика, введенное при создании mm2.properties.
Для записи сообщений в исходный топик необходимо ввести команду:
$ /usr/lib/kafka/bin/kafka-console-producer.sh --topic <test_topic_name> --bootstrap-server hostname:9092
После записи сообщений в исходный топик в списке топиков на целевом кластере появляется топик-реплика с названием в формате <source_cluster_alias>.<test_topic_name>
, где:
-
<source_cluster_alias> — соответствующее обозначение исходного кластера, введенное при создании mm2.properties.
-
<test_topic_name> — имя исходного топика, введенное при создании mm2.properties.
Для того чтобы считать сообщения из топика-реплики на целевом кластере, необходимо ввести команду:
$ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic <source_cluster_alias>.<test_topic_name> --from-beginning --bootstrap-server hostname:9092
Считываемые сообщения в целевом кластере полностью повторяют записанные в исходном кластере.