
Kafka Replication — Manipulate Your Topics and Records in Mirror Maker V1.0 and V2.0

Using the out-of-the-box features of a tool might not be enough. Check how to insert your own Java code into the Kafka MirrorMaker tools.

By Rodrigo Loza · Updated Jun. 18, 20 · Tutorial

In this article, we will be working with both MirrorMaker v1.0 and v2.0, so we will use the following abbreviations:

  • MirrorMaker v1.0 -> mmv1
  • MirrorMaker v2.0 -> mmv2

Find the Project

Find all the Docker images and code over here.

Have questions or want to know more? Follow me on Twitter: Rodrigo_Loza_L

Introduction

Apache Kafka has released tools that can be used to replicate data between Kafka data centers. Each has its own degree of customization, architecture, performance, and reliability. In this document, we will explore how to alter a data replication pipeline using both versions of the Kafka MirrorMaker tool.

Manipulate the Replication Pipeline Records and Topics Using Kafka Mirror Maker V1.0

This version of the MirrorMaker tool lets you build the following architecture:

[Image: source-to-target replication architecture]

This architecture replicates topics and data from the source cluster to the target cluster. Some disadvantages worth mentioning are the following:

  • Topic configurations are not copied, which means that topics in the target cluster are created with the default topic configurations.
  • A consumer and a producer are used to replicate the data, and the pipeline sometimes gets stuck or stops replicating. This forces the administrator to change the consumer group ID in order to replicate all the data again; otherwise, a restart is needed to re-register the consumer group in the source cluster.

Let's check how to manipulate this flow of data between the Kafka clusters. In mmv1 this is possible thanks to KIP-3, which introduced the ability to override the handle method of the MirrorMaker.MirrorMakerMessageHandler class.

[Image: KIP-3 proposed changes]

So what is it that we want to solve? A common case is topic mapping: you have a topic on the source cluster, and your task is to replicate its data to another cluster. But you don't want the topic to have the same name, so you need to rename it.

[Image: my_topic_1 in the source cluster mapped to my_topic in the target cluster]

In the image above, the topic named my_topic_1 in the source cluster is renamed to my_topic in the target cluster. This is made possible by the TopicMapping class. Let's see how to implement it.

Create a Java project and add the following dependencies. In my case, I used Gradle.

```groovy
dependencies {
    compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.0'
    compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.3'
    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.3'
}
```
Next, let's code the handle method that deals with the topic mapping.

```java
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicMapping implements MirrorMaker.MirrorMakerMessageHandler {

  private static final Logger logger = LoggerFactory.getLogger(TopicMapping.class);

  private static final String TOPIC_LIST_SPLITTER_TOKEN = ",";
  private static final String TOPIC_MAP_SPLITTER_TOKEN = ":";

  private final HashMap<String, String> topicMaps = new HashMap<>();

  // The handler argument has the form "source1:target1,source2:target2".
  public TopicMapping(String topicMappingArgument) {
    String[] listOfTopics = topicMappingArgument.split(TOPIC_LIST_SPLITTER_TOKEN);
    for (String entry : listOfTopics) {
      String[] topicMap = entry.split(TOPIC_MAP_SPLITTER_TOKEN);
      logger.info(stringFormatter("Topic map. Source: {0} Target: {1}", topicMap[0], topicMap[1]));
      topicMaps.put(topicMap[0], topicMap[1]);
    }
  }

  @Override
  public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
    try {
      // Get record information. Key, value, and headers may be null
      // depending on how the record was produced.
      String headers = record.headers().toString();
      String value = new String(record.value(), StandardCharsets.UTF_8);
      String topic = record.topic();
      Long timestamp = record.timestamp();
      logger.debug(stringFormatter("New record arrived with headers: {0} value: {1} topic: {2}", headers, value, topic));
      // If the topic is in topicMaps, redirect the record to the mapped
      // topic; otherwise, keep the original topic name.
      String targetTopic = topicMaps.getOrDefault(topic, topic);
      return Collections.singletonList(new ProducerRecord<>(
          targetTopic,
          null,       // partition: let the producer pick one
          timestamp,
          record.key(),
          record.value(),
          record.headers()
      ));
    } catch (Exception e) {
      logger.error("Failed to handle record (likely a null key, value, or headers).", e);
      return Collections.emptyList();
    }
  }

  private String stringFormatter(String line, Object... params) {
    return new MessageFormat(line).format(params);
  }
}
```

The try/catch is very important in the handle method. If you try to decode the record, you may get a lot of NPEs depending on the data you are sending: a record may or may not carry a key, headers, a value, etc.

Compile the project and produce the JAR. The JAR must then be copied to kafka/libs/ so that it can be loaded through the MirrorMaker message handler parameters.
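For reference, this is roughly how mmv1 is launched with the handler wired in. The --message.handler and --message.handler.args options come from KIP-3; the config file names below are placeholders, and the handler class name assumes TopicMapping sits in the default package:

```shell
./kafka-mirror-maker.sh \
  --consumer.config source-consumer.properties \
  --producer.config target-producer.properties \
  --whitelist "my_topic_.*" \
  --message.handler TopicMapping \
  --message.handler.args "my_topic_1:my_topic,my_topic_2:my_other_topic"
```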

This setup will take you some time, so I recommend checking my repository with the examples here. Follow these steps:

  • Run build.sh
  • Deploy the Kafka nodes with this script
  • Deploy mmv1 with this script

Check the entrypoint script of the Docker image. Note that the topic whitelist is hardcoded to my_topic_.*, so only topics that start with the pattern my_topic_ will be replicated. Check the message handler arguments in the YAML file. The mapping follows these rules:

  • my_topic_1 will be translated to my_topic
  • my_topic_2 will be translated to my_other_topic

Let's test the replication pipeline. I created my_topic_1 and my_topic_2 in the source cluster manually.

[Image: my_topic_1 and my_topic_2 created in the source cluster]
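For reference, the topics can be created with the standard CLI; the zookeeper-source hostname matches my Docker setup and is an assumption:

```shell
./kafka-topics.sh --zookeeper zookeeper-source:2181 --partitions 1 --replication-factor 1 --topic my_topic_1 --create
./kafka-topics.sh --zookeeper zookeeper-source:2181 --partitions 1 --replication-factor 1 --topic my_topic_2 --create
```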

Let's write some data to the topics.

[Image: writing data to the source topics]
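Any producer will do; for example, the console producer (the broker address is a placeholder for the source node):

```shell
./kafka-console-producer.sh --broker-list kafka-source:9092 --topic my_topic_1
```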

Finally, let's verify the data has been written in the topics of the target cluster.

[Image: data replicated to the mapped topics in the target cluster]
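For example, you can read the mapped topic on the target node (the broker address is a placeholder):

```shell
./kafka-console-consumer.sh --bootstrap-server kafka-target:9092 --topic my_topic --from-beginning
```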

Note the data has been written to the correct mapped topics.

Before we move on to the next example, note that the potential of the message handler method is huge. You can apply any logic and transform the actual Kafka record as you want.

Manipulate the Replication Pipeline Topics Using Kafka Mirror Maker V2.0

The next example is another replication pipeline customization, but using Kafka MirrorMaker version 2. For this example, we will use an active-to-passive architecture. Sadly, I haven't figured out how to manipulate the actual Kafka records yet (I suspect it requires tweaking this class).

Nonetheless, we can play around with the topics and create the topic mappings as we did in the previous example. This is what you really want to do, because mmv2 replicates not only your data and topics but also the topic configs. So if your topic is compacted on one side, it will be compacted on the other side as well.
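If you want to confirm that the configs were carried over, a describe on the target node shows them (the hostname is a placeholder from my Docker setup):

```shell
./kafka-topics.sh --zookeeper zookeeper-target:2181 --describe --topic target_topic_1
```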

The methods that you need to override are in this class. It is a bit more complicated to understand, but I got some help from checking this repository. The replication policy that we will implement is called the identity replication policy.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IdentityMapperTopicReplicationPolicy extends DefaultReplicationPolicy {

  private static final Logger logger = LoggerFactory.getLogger(IdentityMapperTopicReplicationPolicy.class);

  public static final String TOPIC_REPLICATION_MAPS_ENVIRONMENT_VARIABLE_NAME = "TOPIC_REPLICATION_MAPS";
  public static final String TOPICS_REPLICATION_MAPS_SEPARATOR = ":";
  public static final String TOPICS_REPLICATION_MAP_SEPARATOR = ",";

  private String sourceClusterAlias;
  private final HashMap<String, String> topicMappings = new HashMap<>();

  @Override
  public void configure(Map<String, ?> props) {
    // Load the source cluster alias from the props.
    sourceClusterAlias = props.get("source.cluster.alias").toString();
    // Load the topic mapping parameter from the environment variables.
    String topicMappingEnvVar = System.getenv(TOPIC_REPLICATION_MAPS_ENVIRONMENT_VARIABLE_NAME);
    if (topicMappingEnvVar == null) {
      // Utils.StringFormatter is a small MessageFormat helper from the project repository.
      String errorMessage = Utils.StringFormatter("{0} environment variable has not been set.", TOPIC_REPLICATION_MAPS_ENVIRONMENT_VARIABLE_NAME);
      throw new RuntimeException(errorMessage);
    }
    logger.info("TOPIC_REPLICATION_MAPS: " + topicMappingEnvVar);
    // Load the mappings into a hashmap. The format is "src1,dst1:src2,dst2".
    List<String> topicsMaps = Arrays.asList(topicMappingEnvVar.split(TOPICS_REPLICATION_MAPS_SEPARATOR));
    for (String topicsMap : topicsMaps) {
      List<String> kafkaTopicsMap = Arrays.asList(topicsMap.split(TOPICS_REPLICATION_MAP_SEPARATOR));
      String sourceTopic = kafkaTopicsMap.get(0);
      String targetTopic = kafkaTopicsMap.get(1);
      topicMappings.put(sourceTopic, targetTopic);
    }
  }

  // Produces the topic name used in the target cluster.
  @Override
  public String formatRemoteTopic(String sourceClusterAlias, String topic) {
    String targetTopic = topicMappings.getOrDefault(topic, null);
    logger.info(Utils.StringFormatter("Source topic: {0} Target topic: {1}", topic, targetTopic));
    return targetTopic != null ? targetTopic : topic;
  }

  @Override
  public String topicSource(String topic) {
    return topic == null ? null : sourceClusterAlias;
  }

  // Returning null here disables the default "sourceAlias.topic" prefixing.
  @Override
  public String upstreamTopic(String topic) {
    return null;
  }

  @Override
  public boolean isInternalTopic(String topic) {
    return topic.endsWith(".internal") || topic.endsWith("-internal") || topic.matches("__[a-zA-Z]+.*") || topic.startsWith(".");
  }
}
```
Code highlights:

  • The configure method is similar to the constructor we saw in the mmv1 message handler: it works as a starting point. We use it to load the topic mappings.
  • The formatRemoteTopic method is the one that produces the topic name in the target cluster, so any topic remapping should be done here. In this example, we load a map of topic mappings: if the whitelisted topic is in the map, it is renamed; otherwise, it is copied with the same name.
  • The topicSource and upstreamTopic methods are a bit confusing. The way to achieve identity replication is to make upstreamTopic always return null, while topicSource must return null if the topic is null and the source cluster alias otherwise.
  • The isInternalTopic() method can be overridden in case it conflicts with your own topics. In a production scenario, I had multiple topics that started with ___ (triple underscore), and these could not be replicated until I changed the code.

Let's move on to the example. Follow these steps:

  • Use this Dockerfile to build mmv2. It builds a Docker image for Kafka and adds the JAR of the replication policy to the libs folder.
  • Deploy the Kafka nodes using this bash script.
  • Run this bash script to start mmv2 so that replication starts between the two Kafka nodes.

Note this is the YAML of mmv2. It sets an environment variable with the topic mappings (a sketch of the relevant settings follows the list below):

  • source_topic_1 is mapped to target_topic_1.
  • source_topic_2 is mapped to target_topic_2.
  • Any other topic is copied as an identity.
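Here is a minimal sketch of the moving parts. The source->target.* and replication.policy.class keys are standard MirrorMaker 2 properties, but the cluster aliases and broker addresses are assumptions from my Docker setup:

```shell
# Mapping pairs are "source,target" and are joined by ":"
# (note the separators are inverted with respect to mmv1).
export TOPIC_REPLICATION_MAPS="source_topic_1,target_topic_1:source_topic_2,target_topic_2"

# In the mm2 properties file, point MirrorMaker 2 at the custom policy:
#   clusters = source, target
#   source.bootstrap.servers = kafka-source:9092
#   target.bootstrap.servers = kafka-target:9092
#   source->target.enabled = true
#   source->target.topics = .*
#   replication.policy.class = IdentityMapperTopicReplicationPolicy
```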

Let's test the replication policy.

First, let's create source_topic_1, source_topic_2 and rodrigo_topic_1 in the source Kafka node.

```shell
./kafka-topics.sh --zookeeper zookeeper-source:2181 --partitions 1 --replication-factor 1 --topic source_topic_1 --create
./kafka-topics.sh --zookeeper zookeeper-source:2181 --partitions 1 --replication-factor 1 --topic source_topic_2 --create
./kafka-topics.sh --zookeeper zookeeper-source:2181 --partitions 1 --replication-factor 1 --topic rodrigo_topic_1 --create
```

[Image: topics created in the source cluster]

Let's check the target Kafka node to verify that the replication policy has created the mapped and identity topics.

[Image: mapped and identity topics in the target cluster]

The mapped topics and the identity topic have been created.
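You can also list the topics on the target node directly (the hostname is a placeholder):

```shell
./kafka-topics.sh --zookeeper zookeeper-target:2181 --list
```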

Finally, let's produce some data on the source topics and validate that the data is streamed to the mapped and identity topics.
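The same console tools from before can verify the flow; for instance, for the first mapping (broker addresses are placeholders):

```shell
./kafka-console-producer.sh --broker-list kafka-source:9092 --topic source_topic_1
./kafka-console-consumer.sh --bootstrap-server kafka-target:9092 --topic target_topic_1 --from-beginning
```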

source_topic_1 into target_topic_1

[Image: producing to source_topic_1 and consuming from target_topic_1]

source_topic_2 into target_topic_2

[Image: producing to source_topic_2 and consuming from target_topic_2]

rodrigo_topic_1 into rodrigo_topic_1

[Image: producing to rodrigo_topic_1 and consuming from rodrigo_topic_1]

That's it. You have the power to manipulate data streaming in mmv2.

Conclusion

In this document, we have reviewed both MirrorMaker releases and seen how to tweak their code in order to alter the replication pipelines. The tools offer different degrees of customization and performance, and each has its own architecture with its own pros and cons. Thus, the boundaries of the problem must be defined before picking the Kafka MirrorMaker tool and its version.
