RocketMQ: HA Implementation

Explore an HA implementation with RocketMQ.


When we talk about high availability (HA), people usually think about the failover mechanism. However, keeping the cluster able to accept messages is also part of HA. To a certain extent, I think it is more important than just keeping every broker available. After all, users can and will feel the impact when messages cannot be delivered.

Code Snippets

Here is the scenario:

Assume there are two brokers in a cluster: master-a and master-b. Each has four queues: master-a (q0, q1, q2, q3) and master-b (q0, q1, q2, q3). The last message was sent to master-a q0. Now, master-a is down.

The goal here is to try our best to continue delivering messages.

There are two possibilities:

If the Outage Has Not Been Detected

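In that case, RocketMQ will try to send up to 3 times in total:


int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
for (; times < timesTotal; times++) {
    // ...
}

Three is the default total: one initial attempt plus the default retryTimesWhenSendFailed of 2, which is configurable. There are three sending mechanisms: SYNC, ASYNC, and ONEWAY. As the expression above shows, only SYNC sends get the extra attempts in this loop.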
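To make the attempt count concrete, here is a minimal stand-alone sketch of the same arithmetic. The RetryBudget class, its Mode enum, and the totalAttempts helper are hypothetical names used only for illustration; only the formula itself comes from the snippet above.

```java
// Hypothetical illustration of the attempt count; this is not RocketMQ code.
class RetryBudget {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // Same formula as the snippet above: only SYNC gets extra attempts.
    static int totalAttempts(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    public static void main(String[] args) {
        int retries = 2; // retry setting assumed for this example
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + totalAttempts(mode, retries) + " attempt(s)");
        }
    }
}
```

With a retry setting of 2, a SYNC send gets three attempts and the other two modes get one.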

If the Outage Has Been Detected

Good. Now the question becomes: How do we avoid going to master-a?

First, we need to set sendLatencyFaultEnable=true, then pick a message queue:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

  if (this.sendLatencyFaultEnable) {
    try {

      int index = tpInfo.getSendWhichQueue().getAndIncrement();
      for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
        if (pos < 0)
          pos = 0;
        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);

        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
          if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
            return mq;
        }
      }

      // No queue passed the availability check: pick a broker from the fault table instead.
      final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
      int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
      if (writeQueueNums > 0) {
        final MessageQueue mq = tpInfo.selectOneMessageQueue();
        if (notBestBroker != null) {
          mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
        }
        return mq;
      } else {
        // ...
      }
    } catch (Exception e) {
      log.error("Error occurred when selecting message queue", e);
    }

    return tpInfo.selectOneMessageQueue();
  }

  // Fault tolerance disabled: default selection based on the last broker used.
  return tpInfo.selectOneMessageQueue(lastBrokerName);
}
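To connect this back to the scenario, here is a simplified, runnable model of round-robin selection that skips an avoided broker. It is a sketch under assumptions, not the RocketMQ implementation: the QueuePicker class, its Queue type, and the avoid and pick methods are hypothetical, and the availability check is reduced to a set of broker names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of the scenario; not the RocketMQ implementation.
class QueuePicker {
    static final class Queue {
        final String broker;
        final int id;
        Queue(String broker, int id) { this.broker = broker; this.id = id; }
        @Override public String toString() { return broker + " q" + id; }
    }

    private final List<Queue> queues = new ArrayList<>();
    private final Set<String> avoided = new HashSet<>();
    private int next = 0; // round-robin cursor

    // Each broker gets four queues, as in the scenario.
    QueuePicker(String... brokers) {
        for (String broker : brokers)
            for (int q = 0; q < 4; q++) queues.add(new Queue(broker, q));
    }

    void avoid(String broker) { avoided.add(broker); }

    // Walk the queue list round-robin and return the first queue whose broker
    // is not avoided. If every broker is avoided, fall back to plain round-robin.
    Queue pick() {
        for (int i = 0; i < queues.size(); i++) {
            Queue candidate = queues.get(Math.floorMod(next++, queues.size()));
            if (!avoided.contains(candidate.broker)) return candidate;
        }
        return queues.get(Math.floorMod(next++, queues.size()));
    }

    public static void main(String[] args) {
        QueuePicker picker = new QueuePicker("master-a", "master-b");
        picker.avoid("master-a");           // the outage has been detected
        System.out.println(picker.pick());  // master-b q0
        System.out.println(picker.pick());  // master-b q1
    }
}
```

Once master-a is marked as avoided, every pick lands on a master-b queue, which is the behavior we want while the outage lasts.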

The important part is this:

latencyFaultTolerance.isAvailable(mq.getBrokerName()); (line 13)

During normal operation, each time a message is sent successfully, the producer calls updateFaultItem():

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

When the send fails because of an outage, the same method is called, this time with the last argument set to true:

endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

The second argument, currentLatency, is the measured latency of the send. The third, isolation, is the flag that says whether or not to avoid this broker: when the transmission is successful, the broker is not avoided; otherwise it is.

latencyMax and notAvailableDuration are the two core variables in the algorithm. They also decide the value of startTimestamp in case of an outage.

If a broker is out, its latency is treated as 30 s. We then scan latencyMax from the last entry back to the first until we hit a value that is no larger than currentLatency; the notAvailableDuration entry at the same index is the avoidance time. Here are the default values of latencyMax and notAvailableDuration:

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

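If isolation=true, the broker is avoided for 10 minutes. Otherwise, the avoidance time depends on the latency of the message. For example, a latency of 600 ms falls into the 550 ms bucket and results in a 30 s avoidance.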
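The lookup can be sketched as follows. This is a minimal illustration rather than the library's code: the AvoidanceTable class and both method names are hypothetical, and it assumes that the 30 s figure is fed into the lookup as the latency when isolation is true, which is consistent with the 10 min avoidance stated above. Only the two arrays are taken from the text.

```java
// Hypothetical sketch of the latency-to-avoidance lookup; only the two arrays come from the text.
class AvoidanceTable {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down and return the duration paired with
    // the first threshold the latency reaches. Below the smallest threshold,
    // the broker is not avoided at all.
    static long notAvailableDuration(long latencyMs) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latencyMs >= LATENCY_MAX[i]) return NOT_AVAILABLE_DURATION[i];
        }
        return 0L;
    }

    // Assumption: with isolation, the measured latency is replaced by 30 s.
    static long durationFor(long measuredLatencyMs, boolean isolation) {
        return notAvailableDuration(isolation ? 30000L : measuredLatencyMs);
    }

    public static void main(String[] args) {
        System.out.println(durationFor(40L, false));   // 0
        System.out.println(durationFor(600L, false));  // 30000
        System.out.println(durationFor(40L, true));    // 600000
    }
}
```

Note that the first two buckets map to 0 ms, so a send that completes in under 550 ms never causes its broker to be avoided.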

FaultItem stores the information about a failed broker, including its name, its latency, and the time until which it should be avoided.

public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {

  FaultItem old = this.faultItemTable.get(name);
  if (null == old) {
    // No entry for this broker yet: create one.
    final FaultItem faultItem = new FaultItem(name);
    faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

    // Another thread may have inserted an entry in the meantime; update that one instead.
    old = this.faultItemTable.putIfAbsent(name, faultItem);
    if (old != null) {
      old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
  } else {
    // Existing entry: move the end of the avoidance window.
    old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
  }
}

This class updates the status of the failed brokers. It also decides whether a recovered broker can rejoin the cluster. This is the method:


public boolean isAvailable() {
  return (System.currentTimeMillis() - startTimestamp) >= 0;
}
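To show how startTimestamp and the availability check work together, here is a small deterministic sketch. The FaultWindow class is a hypothetical stand-in for the fault item, and the current time is passed in as a parameter (an assumption made for this example) so the result does not depend on the wall clock.

```java
// Hypothetical stand-in for the fault item; the clock is passed in explicitly
// so the example is deterministic.
class FaultWindow {
    private volatile long startTimestamp;

    // Avoid the broker until nowMs + notAvailableDurationMs.
    void update(long nowMs, long notAvailableDurationMs) {
        this.startTimestamp = nowMs + notAvailableDurationMs;
    }

    // Same comparison as isAvailable() above, with the current time as a parameter.
    boolean isAvailable(long nowMs) {
        return (nowMs - startTimestamp) >= 0;
    }

    public static void main(String[] args) {
        FaultWindow masterA = new FaultWindow();
        masterA.update(0L, 600000L);                       // isolated at t = 0 for 10 minutes
        System.out.println(masterA.isAvailable(599999L));  // false
        System.out.println(masterA.isAvailable(600000L));  // true
    }
}
```

A broker therefore rejoins the rotation on its own once the avoidance window has passed; no explicit recovery signal is needed in this model.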


So far, we've examined the main code blocks that implement the HA mechanism. Basically, a best effort is made to deliver each message. We see a lot of constant polling and retries. That really is the reality of an enterprise-grade solution. By showing this code, we would like to demonstrate the effort that goes into ensuring the high availability of the cluster.

