Mirror Maker 2

Overview of replication based on Kafka Connect

MirrorMaker 2.0 — mechanism for replicating data from a source cluster to a remote one, based on the Kafka Connect service platform.

Kafka Connect is available for installation in ADS starting from version 1.7.1.

With the Active/Active replication architecture, clusters receive data directly from data sources, and replicated data entered from a remote cluster.

With Active/Standby replication architecture target cluster is in passive mode (has no consumers and producers connected to it) and receives only replicated data.

The figure shows the sequence of the replication mechanism.

mm2 dark
Replication scheme using Mirror Maker 2
mm2 light
Replication scheme using Mirror Maker 2

To move data, a high-level pair consumer - producer is used. The consumer reads and processes the original topic, the producer places the data in the replica topic.

Replicator — a special component of the Kafka Connect service that accesses ZooKeeper data, copies the topic metadata, and creates the same topic in the target cluster.

MirrorMaker 2.0 as part of Kafka Connect creates special connectors to enable complex flows between ADS clusters:

  • MirrorSourceConnector replicates topics from a source cluster to a target cluster.

  • MirrorCheckpointConnector creates consumer offset checkpoints and synchronizes the offset with the __consumer_offsets service topic of the source cluster.

  • MirrorHeartbeatConnector periodically checks connectivity between clusters by creating messages in a special heartbeats topic on the source cluster after a given period of time and reading them in the target cluster.

NOTE

This article describes how to replicate topics using the built-in Mirror Maker 2 tools in Kafka. Setting up Mirror Maker 2 using ADS Command Center is described in Mirror Maker 2 in ADSCC.

Setting

A topic is replicated to a remote cluster using the connect-mirror-maker.sh script from any host of the target ADS cluster.

The connect-mirror-maker.sh script creates and configures connectors based on the provided properties file mm2.properties.

To set up the mm2.properties configuration file, do the following:

  1. Enter the command to open (create) the file:

    $ sudo vim /usr/lib/kafka/bin/mm2.properties
  2. Fill in the file with data:

    clusters = <source_cluster_alias>, <target_cluster_alias>
    <target_cluster_alias>.bootstrap.servers = hostname1:9092,hostname2:9092,hostname3:9092
    <source_cluster_alias>.bootstrap.servers = hostname4:9092,hostname5:9092,hostname6:9092
    
    topics
    topics =<test_topic_name>
    groups = *
    
    <source_cluster_alias>-><target_cluster_alias>.enabled=true
    
    replication.factor=3
    
    checkpoints.topic.replication.factor=3
    heartbeats.topic.replication.factor=3
    offset-syncs.topic.replication.factor=3
    
    offset.storage.replication.factor=3
    status.storage.replication.factor=3
    config.storage.replication.factor=3
    mm2.properties contents description
    Parameters Description

    clusters = <source_cluster_alias>, <target_cluster_alias>

    Names for each cluster to be used in the replication

    <source_cluster_alias>.bootstrap.servers = hostname1:9092,hostname2:9092,hostname3:9092

    <target_cluster_alias>.bootstrap.servers = hostname4:9092,hostname5:9092,hostname6:9092

    Connection information for each cluster — comma-separated host:port 9092 pairs

    <source_cluster_alias>→<target_cluster_alias>.enabled = true

    Enabling and designating the direction of replication. In the example, replication is enabled from the <source_cluster_alias> cluster to the <target_cluster_alias> cluster

    topics

    topics = <test_topic_name>

    groups = *

    The name of the topics or groups intended for replication

    replication.factor=1

    Replication ratio of newly created replica topics

    heartbeats.topic.replication.factor=3

    checkpoints.topic.replication.factor=3

    offset-syncs.topic.replication.factor=3

    Replication factor for MM2 service topics:

    • heartbeats — heartbeat topic in each source cluster that is replicated to demonstrate connectivity through connectors. The messages in this topic contain information about the source cluster, target cluster, and the timestamp of the heartbeat.

    • source.checkpoints.internal — topic in target cluster to record offsets for each consumer group in source cluster.

    • mm2-offset-syncs.<target_cluster_alias>.internal — populates the MirrorSourceConnector and then uses the MirrorCheckpointConnector to change consumer group offsets.

    offset.storage.replication.factor=3

    status.storage.replication.factor=3 config.storage.replication.factor=3

    Replication factor for creating internal cluster topics:

    • mm2-configs.<target_cluster_alias>.internal

    • mm2-offsets.<target_cluster_alias>.internal

    • mm2-status.<target_cluster_alias>.internal

    replication.policy.separator = _

    If the value is left blank, the replica topic will have the same name as the original topic

    sync.topic.acls.enabled = false

    Enable monitoring of the source cluster for ACL changes

    emit.heartbeats.interval.seconds = 5

    Interval for heartbeat messages

Topic replication check

In order to run the connect-mirror-maker.sh script and create connectors, enter the following command on any broker of the target cluster:

$ /usr/lib/kafka/bin/bin/connect-mirror-maker.sh mm2.properties

Next, connectors are launched that cyclically poll the source topic to read messages. This terminal session may not be terminated, then the messages can be read in real time.

During the operation of the connectors, internal cluster topics are created:

  • on the side of the source cluster:

    • mm2-configs.<target_cluster_alias>.internal

    • mm2-offset-syncs.<target_cluster_alias>.internal

    • mm2-offsets.<target_cluster_alias>.internal

    • mm2-status.<target_cluster_alias>.internal

      where <target_cluster_alias> corresponds to the target cluster alias entered when mm2.properties was created.

  • on the side of the target cluster:

    • mm2-configs.<source_cluster_alias>.internal

    • mm2-offset-syncs.<source_cluster_alias>.internal

    • mm2-offsets.<source_cluster_alias>.internal

    • mm2-status.<source_cluster_alias>.internal

      where <source_cluster_alias> corresponds to the source cluster alias entered when creating mm2.properties.

In order to check the presence of service topics, you must enter the command on any cluster broker (in a new terminal session) to display a list of topics:

$ /usr/lib/kafka/bin/kafka-topics.sh --list --bootstrap-server hostname:9092

The list of topics should contain the necessary topics.

The creation of a replicated source topic and the writing of messages to it occurs on the side of the source cluster.

To create an source topic, enter the command:

$ /usr/lib/kafka/bin/kafka-topics.sh --create --topic <test_topic_name> --bootstrap-server hostname:9092

where <test_topic_name> — the name of the source topic, as entered when creating mm2.properties.

To write messages to the original topic, enter the command:

$ /usr/lib/kafka/bin/kafka-console-producer.sh --topic <test_topic_name> --bootstrap-server hostname:9092

After writing messages to the source topic in the list of topics on the target cluster, a replica topic appears with the name in the format <source_cluster_alias>.<test_topic_name>, where:

  • <source_cluster_alias> is the corresponding source cluster alias entered when creating mm2.properties.

  • <test_topic_name> is the name of the original topic, entered when creating mm2.properties.

In order to read messages from the replica topic on the target cluster, enter the command:

$ /usr/lib/kafka/bin/kafka-console-consumer.sh --topic <source_cluster_alias>.<test_topic_name> --from-beginning --bootstrap-server hostname:9092

Read messages in the target cluster completely repeat those written in the source cluster.

Found a mistake? Seleсt text and press Ctrl+Enter to report it