Mirror Maker v2.0

Disaster recovery, replication and aggregation among Kafka datacenters.

Before we start, let's define some abbreviations.

  • Mirror maker v1.0 -> mmv1
  • Mirror maker v2.0 -> mmv2

Find the Project

Find all the stuff on this document over here. Here is what the repository contains.

  • docker_kafka has instructions on how to build Kafka for Docker. Check this. Otherwise, download the Docker image from my Docker Hub:

    Shell

    docker pull lozuwa/kafka:v2.5.0


  • The mmv2 folder has yamls to deploy Kafka nodes. Use this script to deploy the Kafka nodes.
  • After deploying the Kafka nodes, run another Docker container to start the mirror maker v2. Use this script.

Have questions or wanna know more? Follow me on twitter Rodrigo_Loza_L

Introduction 

The new release of mirror maker introduces a lot of new features usable in disaster recovery, backup, streaming and aggregation scenarios. Before mmv2, Confluent's Replicator product offered some of the same features. In my case, I wrote a lot of custom code to give mmv1 the features of mmv2: not as dynamic or scalable, but still usable.

This document presents a tutorial of mmv2 that is as detailed as possible. Specifically, it covers Kafka 2.5.0, the latest release as of May 29th.

Active to active vs active to passive

Comparison Between mmv1 and mmv2

There are a lot of differences in architecture, replication policies, monitoring, etc. Here is a summary.

Mirror Maker v1.0

  • Consumer/Producer architecture.
  • Topic replication but configurations are not copied.
  • Monitoring via consumer group id lag.
  • SSL/SASL support.

mmv1

Mirror Maker v2.0

  • Kafka connectors architecture.
  • Checkpoints, offsets and topics replicated.
  • Topics have their configurations copied.
  • Monitoring via Kafka connect integration (This is still a dark place).
  • SSL/SASL support.
  • Multiple deployment modes.
    • MM Kafka dedicated cluster
    • Connect standalone
    • Connectors deployed in a distributed connect cluster

MirrorSourceConnector and workers

Examples

Let's move on to the examples. I suggest you read the catch-ups before we start, since most of them are neither documented nor explicitly detailed. They might be useful when debugging problems.

Catch-ups

  • Topic configurations are not copied immediately. The config copy job is periodic, with a default interval of 10 minutes, so the configs are eventually copied.
  • If you need the configurations copied sooner, restart the mirror maker. Configurations are copied at startup.
  • The default replication factor for replicated topics is 2 (set in the code). Override it with replication.factor according to your node/cluster.
  • The following configurations are related to replication factor. Their default value is 3, so exceptions will pop up if you are using a single node. Make sure to configure them according to your cluster.
    • config.storage.replication.factor
    • offset.storage.replication.factor
    • status.storage.replication.factor
  • The following configurations are related to replication factor as well. These belong to mmv2 itself and also default to 3.
    • offset-syncs.topic.replication.factor
    • heartbeats.topic.replication.factor
    • checkpoints.topic.replication.factor
  • There is an interface in the mmv2 code that blacklists internal topics matching the pattern (.internal|-internal|__.*). This is not configurable; you will have to override the method.
  • The topic blacklist defaults to topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]. Check this value if you see your topics are not being replicated.
  • The group blacklist defaults to groups.blacklist = [console-consumer-.*, connect-.*, __.*]. Check this value if you see your groups or offsets are not being replicated.
  • Remote topics might be tricky. The DefaultReplicationPolicy class hardcodes the remote topic name to source-cluster-alias.topic-name.
  • Producers must be as close as possible to the cluster they write to, while consumers can be remote. This means we usually want to run the mmv2 dedicated cluster as close as possible to the target DC.
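
To see how the default topic blacklist treats a given topic name, the patterns above can be tried out in a shell. The following is a minimal sketch, not mmv2's own code: is_blacklisted_topic is a hypothetical helper, the patterns are rewritten as one anchored POSIX extended regular expression, and it assumes mmv2 matches each pattern against the whole topic name.

```shell
# Hypothetical helper, not part of Kafka: succeeds when a topic name matches
# one of the default topics.blacklist patterns listed above.
# Assumption: each pattern must match the whole name, hence the ^( ... )$.
# Note: the Java character class [\-\.] is written [-.] in POSIX syntax.
is_blacklisted_topic() {
  printf '%s\n' "$1" | grep -Eq '^(.*[-.]internal|.*\.replica|__.*)$'
}

# Try a few sample names (the names themselves are only illustrations).
for topic in topic_1 my-app-internal orders.replica __consumer_offsets; do
  if is_blacklisted_topic "$topic"; then
    echo "$topic: blacklisted by default"
  else
    echo "$topic: not blacklisted"
  fi
done
```

If one of your own topics shows up as blacklisted here, that is a good hint to set topics.blacklist explicitly in mm2.properties.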

Active/Passive Datacenter Replication

Architecture

Architecture

Configuration File

Create a file named mm2.properties and fill it with the following content.

Properties files

# Kafka datacenters.
clusters = source, target
source.bootstrap.servers = kafka-source:9092
target.bootstrap.servers = kafka-target:9092

# Source and target cluster configurations.
source.config.storage.replication.factor = 1
target.config.storage.replication.factor = 1

source.offset.storage.replication.factor = 1
target.offset.storage.replication.factor = 1

source.status.storage.replication.factor = 1
target.status.storage.replication.factor = 1

source->target.enabled = true
target->source.enabled = false

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 1
heartbeats.topic.replication.factor = 1
checkpoints.topic.replication.factor = 1

topics = .*
groups = .*

tasks.max = 1
replication.factor = 1
refresh.topics.enabled = true
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 30

topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
groups.blacklist = console-consumer-.*, connect-.*, __.*

# Enable heartbeats and checkpoints.
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true


What's important to highlight:

  • Clusters have been given explicit names: source and target.
  • The bootstrap string is different for each DC.
  • Replication factor variables have been set to 1 since we have 1 node on each side.
  • All topics and groups have been whitelisted, with the exception of internal topics, replica topics and consumer offsets. Check the topics and groups blacklist variables.
  • The configuration variable sync.topic.configs.enabled allows mmv2 to replicate not only the records but also the topic configurations.
  • refresh.topics.interval.seconds makes mmv2 check for new topics every 30 seconds. Note this is a separate interval from the configuration copy job mentioned in the catch-ups.
  • Heartbeats and checkpoints are enabled from source to target. This means a heartbeats topic will be created on each side and populated by mmv2. Checkpoints will also be published to an internal topic on the target cluster (source.checkpoints.internal with the default replication policy); this will be useful to check for lag.
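
Since a replication factor larger than the number of brokers is the most common source of exceptions on a single-node setup, it can help to scan the properties file before starting mmv2. The following is a minimal sketch under stated assumptions: check_replication_factors is a hypothetical helper (not part of Kafka), and it only looks at keys ending in replication.factor in a plain key = value file.

```shell
# Hypothetical helper: print every *replication.factor setting in a
# properties file whose value is larger than the number of brokers.
check_replication_factors() {   # usage: check_replication_factors <file> <brokers>
  awk -F'=' -v brokers="$2" '
    /^[ \t]*#/ { next }                      # skip comment lines
    $1 ~ /replication\.factor[ \t]*$/ {
      key = $1; val = $2
      gsub(/[ \t\r]/, "", key); gsub(/[ \t\r]/, "", val)
      if (val + 0 > brokers + 0) print key " = " val
    }
  ' "$1"
}

# Demo on a scratch file: with a single broker, only the second line is reported.
props="$(mktemp)"
printf '%s\n' 'replication.factor = 1' 'heartbeats.topic.replication.factor = 3' > "$props"
check_replication_factors "$props" 1
rm -f "$props"
```

An empty output means every replication factor in the file fits the given broker count. Keys left out of the file still fall back to their defaults, so the check does not cover them.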

Once you have your infrastructure and configuration file set up, run the following commands in the Docker container used for mmv2:

Shell

./run-kakfa-mirror-maker.sh
cd kafka/bin/
./connect-mirror-maker.sh mm2.properties


The logs are very verbose, so it is hard to catch everything at first. Make sure you familiarize yourself with Kafka Connect and its initialization steps. If the following issue pops up, verify that you are not moving the jars manually at some point in your installation. Otherwise, upgrade your Kafka installation to release 2.5.0, which fixed this problem for me.

Once mmv2 starts, it periodically logs that the connectors are copying the topics, offsets and heartbeats. Check for warnings or errors, as they may reflect a misconfiguration.

Checking for warnings and errors

Let's make sure mmv2 has successfully connected the two Kafka nodes. To do so, let's list the topics on both clusters.

On the source cluster.

Shell

./kafka-topics.sh --zookeeper zookeeper-source:2181 --list



Source cluster output

On the target cluster.

Shell

./kafka-topics.sh --zookeeper zookeeper-target:2181 --list


Target cluster output

Let's check the topic replication. Create a bunch of topics: normal, compacted, etc.

Shell

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 1 --replication-factor 1 --topic topic_1 --config cleanup.policy=delete

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 2 --replication-factor 1 --topic topic_2 --config cleanup.policy=delete

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 5 --replication-factor 1 --topic compact_3 --config cleanup.policy=delete

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 1 --replication-factor 1 --topic compacted_topic_1 --config cleanup.policy=compact

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 1 --replication-factor 1 --topic compacted_topic_2 --config cleanup.policy=compact


The topics on the source node look as follows.

Topics on source node

The topics on the target node look as follows.

Topics on target node

Note the topics have the alias of the source cluster as a prefix. This is how mmv2 ensures topics do not collide with each other in an active/active replication architecture.
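
The naming rule can be sketched in a couple of shell functions. This is only an illustration of the source-cluster-alias.topic-name convention described in the catch-ups, assuming the DefaultReplicationPolicy and its "." separator; remote_topic and topic_source are hypothetical helper names, not Kafka commands.

```shell
# Hypothetical helpers illustrating the default remote topic naming.
remote_topic() {    # usage: remote_topic <source-alias> <topic>
  printf '%s.%s\n' "$1" "$2"
}

topic_source() {    # usage: topic_source <remote-topic>
  # Everything before the first "." is taken as the source cluster alias.
  # A local topic without a "." is returned unchanged.
  printf '%s\n' "${1%%.*}"
}

remote_topic source topic_1      # prints source.topic_1
topic_source source.topic_1      # prints source
```

Note that a local topic whose name already contains a dot cannot be told apart from a remote one by name alone, which is worth keeping in mind when choosing topic names.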

Let's describe the topics one by one to verify whether the configurations have been replicated. Check the config on compacted_topic_1: it does not have cleanup.policy=compact yet. Remember this is eventually replicated. If you are in a hurry, restart mmv2.

Restart mmv2

After 10 minutes the config is copied.

Copied config
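
If waiting 10 minutes or restarting is inconvenient, the sync period itself can be lowered in the properties file. As far as I know the relevant property is sync.topic.configs.interval.seconds; treat that name as an assumption and check it against your Kafka version. The sketch below uses a hypothetical set_property helper to set or replace a key, and the value 60 is only an example.

```shell
# Hypothetical helper: set (or replace) one key in a properties file.
set_property() {   # usage: set_property <file> <key> <value>
  tmp="$(mktemp)"
  # Copy every line except an existing assignment of the same key...
  awk -F'=' -v key="$2" '{ k = $1; gsub(/[ \t]/, "", k); if (k != key) print }' "$1" > "$tmp"
  # ...then append the new assignment.
  printf '%s = %s\n' "$2" "$3" >> "$tmp"
  mv "$tmp" "$1"
}

# Demo on a scratch file; point it at your real mm2.properties when ready.
props="$(mktemp)"
printf '%s\n' 'sync.topic.configs.enabled = true' > "$props"
set_property "$props" sync.topic.configs.interval.seconds 60
cat "$props"
rm -f "$props"
```

mmv2 reads the file at startup, so a restart is still needed for the new interval to take effect.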

Active/Active Datacenter Replication 

Architecture

Active to active replication

Configuration File

Create a properties file (the run command further down expects it at /data/active-to-active-mm2.properties) and fill it with the following content.

Properties files

# Kafka datacenters.
clusters = source, target
source.bootstrap.servers = kafka-source:9092
target.bootstrap.servers = kafka-target:9092

# Source and target clusters configurations.
source.config.storage.replication.factor = 1
target.config.storage.replication.factor = 1

source.offset.storage.replication.factor = 1
target.offset.storage.replication.factor = 1

source.status.storage.replication.factor = 1
target.status.storage.replication.factor = 1

source->target.enabled = true
target->source.enabled = true

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 1
heartbeats.topic.replication.factor = 1
checkpoints.topic.replication.factor = 1

topics = .*
groups = .*

tasks.max = 1
replication.factor = 1
refresh.topics.enabled = true
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 10

topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
groups.blacklist = console-consumer-.*, connect-.*, __.*

# Enable heartbeats and checkpoints.
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true


What's important to highlight:

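  • Same as the previous properties file, except that target->source.enabled is now true, which enables replication in both directions, and refresh.topics.interval.seconds is set to 10.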
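
A quick way to confirm which directions a properties file enables is to list its "<a>-><b>.enabled = true" lines. This is a minimal sketch; enabled_flows is a hypothetical helper, not a Kafka tool, and it only inspects the file, not the running mmv2.

```shell
# Hypothetical helper: print every replication flow that a properties file
# enables through a "<a>-><b>.enabled = true" line.
enabled_flows() {   # usage: enabled_flows <file>
  awk -F'=' '
    /^[ \t]*#/ { next }                      # skip comment lines
    {
      key = $1; val = $2
      gsub(/[ \t\r]/, "", key); gsub(/[ \t\r]/, "", val)
      # Match "<a>-><b>.enabled" only, not e.g. "<a>-><b>.emit.heartbeats.enabled".
      if (key ~ /^[^.]+->[^.]+\.enabled$/ && val == "true") {
        sub(/\.enabled$/, "", key)
        print key
      }
    }
  ' "$1"
}

# Demo on a scratch file holding the flow lines from the configuration above.
props="$(mktemp)"
printf '%s\n' 'source->target.enabled = true' 'target->source.enabled = true' \
  'source->target.emit.heartbeats.enabled = true' > "$props"
enabled_flows "$props"
rm -f "$props"
```

For an active/active setup the output should list both source->target and target->source; for the active/passive file only the first one.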

Once you have your infrastructure and configuration file set up, run the following commands in the Docker container used for mmv2:

Shell

./run-kakfa-mirror-maker.sh
cd kafka/bin/
./connect-mirror-maker.sh /data/active-to-active-mm2.properties


Check the topics on the source cluster.

Source cluster topics

Check the topics on the target cluster.

Target cluster topics

Let's create some topics, each on a different Kafka node. Note the ZooKeeper address is different in each command.

Shell

./kafka-topics.sh --zookeeper zookeeper-source:2181 --create --partitions 1 --replication-factor 1 --topic topic_1
./kafka-topics.sh --zookeeper zookeeper-target:2181 --create --partitions 1 --replication-factor 1 --topic topic_2


Let's list the topics.

On the source cluster, check the local topics and the one replicated from the target cluster.

Source cluster topics

On the target cluster, check the local topics and the one replicated from the source cluster.

local topics on target cluster

That is it for this post. Check the next one for more examples such as aggregation. Also I'll show you how to create your own replication policy using Java.

Topics:
apache kafka, integration, kafka, mirror maker, replication
