Mirror Maker 2

Обзор репликации на базе Kafka Connect

MirrorMaker 2.0 — основанный на платформе сервиса Kafka Connect механизм репликации данных из исходного кластера на удаленный.

Kafka Connect доступен к установке в ADS, начиная с версии 1.7.1.

При архитектуре репликации Active/Active кластеры получают данные непосредственно от источников данных и реплицированные данные, вводимые из удаленного кластера.

При архитектуре репликации Active/Standby целевoй кластер находится в пассивном режиме (не имеет подключенных к нему потребителей и производителей) и получает только реплицированные данные.

На рисунке показана последовательность механизма репликации.

Схема репликации с использованием Mirror Maker 2
Схема репликации с использованием Mirror Maker 2
Схема репликации с использованием Mirror Maker 2
Схема репликации с использованием Mirror Maker 2

Для перемещения данных используется высокоуровневая пара потребитель (consumer) - производитель (producer). Потребитель читает и обрабатывает исходный топик, производитель помещает данные в топик-реплику.

Replicator — специальный компонент сервиса Kafka Connect, который обращается к данным ZooKeeper, копирует метаданные топика и создает такой же топик в целевом кластере.

MirrorMaker 2.0 в составе Kafka Connect создает специальные коннекторы для включения сложных потоков между кластерами ADS:

  • MirrorSourceConnector осуществляет репликацию топиков из исходного кластера в целевой кластер.

  • MirrorCheckpointConnector создает контрольные точки смещения потребителя и синхронизирует смещение со служебным топиком __consumer_offsets исходного кластера.

  • MirrorHeartbeatConnector периодически проверяет подключение между кластерами, создавая сообщения в специальном топике heartbeats в исходном кластере через заданный период времени и считывая их в целевом кластере.

ПРИМЕЧАНИЕ

В данной статье описана репликация топиков при помощи встроенных средств Mirror Maker 2 в Kafka. Настройка Mirror Maker 2 c использованием ADS Control описана в статье Mirror Maker 2 в ADS Control.

Настройка

Репликация топика на удаленый кластер осуществляется при помощи скрипта connect-mirror-maker.sh c любого хоста целевого кластера ADS.

Скрипт connect-mirror-maker.sh создает и настраивает коннекторы на основе предоставленного файла свойств mm2.properties.

Для настройки конфигурационного файла mm2.properties необходимо:

  1. Ввести команду для открытия (создания) файла:

    $ sudo vim /usr/lib/kafka/bin/mm2.properties
  2. Заполнить файл данными:

    clusters = <source_cluster_alias>, <target_cluster_alias>
    <target_cluster_alias>.bootstrap.servers = hostname1:9092,hostname2:9092,hostname3:9092
    <source_cluster_alias>.bootstrap.servers = hostname4:9092,hostname5:9092,hostname6:9092
    
    topics
    topics =<test_topic_name>
    groups = *
    
    <source_cluster_alias>-><target_cluster_alias>.enabled=true
    
    replication.factor=3
    
    checkpoints.topic.replication.factor=3
    heartbeats.topic.replication.factor=3
    offset-syncs.topic.replication.factor=3
    
    offset.storage.replication.factor=3
    status.storage.replication.factor=3
    config.storage.replication.factor=3
    Описание содержания mm2.properties
    Параметры Описание

    clusters = <source_cluster_alias>, <target_cluster_alias>

    Имена для каждого кластера, которые будут использоваться в репликации

    <source_cluster_alias>.bootstrap.servers = hostname1:9092,hostname2:9092,hostname3:9092

    <target_cluster_alias>.bootstrap.servers = hostname4:9092,hostname5:9092,hostname6:9092

    Информация о соединении для каждого кластера — пары хост:порт 9092, разделенные запятыми

    <source_cluster_alias>→<target_cluster_alias>.enabled = true

    Включение и обозначение направления репликации, в примере включена репликация из кластера <source_cluster_alias> в кластер <target_cluster_alias>

    topics

    topics = <test_topic_name>

    groups = *

    Название топиков или групп, предназначенных для репликации

    replication.factor=1

    Коэффициент репликации вновь созданных топиков-реплик

    heartbeats.topic.replication.factor=3

    checkpoints.topic.replication.factor=3

    offset-syncs.topic.replication.factor=3

    Фактор репликации для служебных топиков MM2:

    • heartbeats — топик проверки соединения в каждом исходном кластере, который реплицируется для демонстрации подключения через коннекторы. Сообщения в этом топике содержат информацию об исходном кластере, целевом кластере и отметке времени создания heartbeat.

    • source.checkpoints.internal — топик в целевом кластере для записи смещений для каждой группы потребителей в исходном кластере.

    • mm2-offset-syncs.<target_cluster_alias>.internal — заполняется MirrorSourceConnector, а затем используется MirrorCheckpointConnector для изменения смещений группы потребителей.

    offset.storage.replication.factor=3

    status.storage.replication.factor=3 config.storage.replication.factor=3

    Фактор репликации для создания внутренних топиков кластеров:

    • mm2-configs.<target_cluster_alias>.internal

    • mm2-offsets.<target_cluster_alias>.internal

    • mm2-status.<target_cluster_alias>.internal

    replication.policy.separator = _

    Если значение оставить пустым, топик-реплика будет иметь то же имя, что и исходный топик

    sync.topic.acls.enabled = false

    Включение мониторинга исходного кластера на предмет изменений ACL

    emit.heartbeats.interval.seconds = 5

    Интервал для сообщений heartbeat

Проверка репликации топиков

Для того чтобы запустить скрипт connect-mirror-maker.sh и создать коннекторы, необходимо ввести на любом брокере целевого кластера команду:

$ /usr/lib/kafka/bin/connect-mirror-maker.sh mm2.properties

Далее запускаются коннекторы, которые циклично опрашивают исходный топик для считывания сообщений. Эту терминальную сессию можно не завершать, тогда сообщения можно будет считывать в реальном времени.

В процессе работы коннекторов создаются внутренние топики кластера:

  • на стороне исходного кластера:

    • mm2-configs.<target_cluster_alias>.internal

    • mm2-offset-syncs.<target_cluster_alias>.internal

    • mm2-offsets.<target_cluster_alias>.internal

    • mm2-status.<target_cluster_alias>.internal

      где <target_cluster_alias> соответствует обозначению целевого кластера, введенному при создании mm2.properties.

  • на стороне целевого кластера:

    • mm2-configs.<source_cluster_alias>.internal

    • mm2-offset-syncs.<source_cluster_alias>.internal

    • mm2-offsets.<source_cluster_alias>.internal

    • mm2-status.<source_cluster_alias>.internal

      где <source_cluster_alias> соответствует обозначению исходного кластера, введенному при создании mm2.properties.

Для того чтобы проверить наличие служебных топиков, необходимо ввести на любом брокере кластера (в новой терминальной сессии) команду для вывода списка топиков:

$ /usr/lib/kafka/bin/kafka-topics.sh --list --bootstrap-server hostname:9092

В списке топиков должны присутствовать нужные топики.

Создание реплицируемого исходного топика и запись в него сообщений происходит на стороне исходного кластера.

Для создания исходного топика необходимо ввести команду:

$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic <test_topic_name> --bootstrap-server hostname:9092

где <test_topic_name> — имя исходного топика, введенное при создании mm2.properties.

Для записи сообщений в исходный топик необходимо ввести команду:

$ /usr/lib/kafka/bin/kafka-console-producer.sh --topic <test_topic_name> --bootstrap-server hostname:9092

После записи сообщений в исходный топик в списке топиков на целевом кластере появляется топик-реплика с названием в формате <source_cluster_alias>.<test_topic_name>, где:

  • <source_cluster_alias> — соответствующее обозначение исходного кластера, введенное при создании mm2.properties.

  • <test_topic_name> — имя исходного топика, введенное при создании mm2.properties.

Для того чтобы считать сообщения из топика-реплики на целевом кластере, необходимо ввести команду:

$ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic <source_cluster_alias>.<test_topic_name> --from-beginning --bootstrap-server hostname:9092

Считываемые сообщения в целевом кластере полностью повторяют записанные в исходном кластере.

Нашли ошибку? Выделите текст и нажмите Ctrl+Enter чтобы сообщить о ней