RocketMQ: HA Implementation
Explore an HA implementation with RocketMQ.
Introduction
When we talk about HA, people normally think about the failover mechanism. However, keeping the cluster available for message delivery is also part of HA. To a certain extent, I think it is more important than just keeping the brokers available, because users can and will feel the impact of this kind of availability.
Code Snippets
Here is the scenario:
Assume there are 2 brokers in a cluster: master-a and master-b. Each has four queues: master-a (q0, q1, q2, q3) and master-b (q0, q1, q2, q3). The last message was sent to master-a q0. Now, master-a is down.
The goal here is to try our best to continue delivering the message.
There are two possibilities:
If the Outage Has Not Been Detected
In that case, a synchronous send will be attempted up to 3 times in total, in org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
for (; times < timesTotal; times++) {
// ...
}
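Three attempts is the default (one initial send plus a retryTimesWhenSendFailed of 2), and the retry count is configurable. There are three sending mechanisms: SYNC, ASYNC, and ONEWAY. As the expression above shows, only SYNC gets the extra attempts in this loop; for the other two modes timesTotal is 1.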
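To make the attempt counting concrete, here is a minimal, self-contained sketch of a bounded retry loop. It is not RocketMQ's code: RetrySketch, Sender, and sendWithRetry are hypothetical names made up for illustration, and only the timesTotal expression is taken from the snippet above.

```java
// Hypothetical stand-in for the producer's send loop; not a RocketMQ class.
class RetrySketch {
    enum CommunicationMode { SYNC, ASYNC, ONEWAY }

    /** Hypothetical send operation that may fail with an exception. */
    interface Sender {
        String send(int attempt) throws Exception;
    }

    // Same expression as the timesTotal line quoted above.
    static int timesTotal(CommunicationMode mode, int retryTimesWhenSendFailed) {
        return mode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Tries up to timesTotal times and returns the first successful result.
    static String sendWithRetry(CommunicationMode mode, int retryTimesWhenSendFailed, Sender sender) {
        int total = timesTotal(mode, retryTimesWhenSendFailed);
        Exception last = null;
        for (int times = 0; times < total; times++) {
            try {
                return sender.send(times);
            } catch (Exception e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException("send failed after " + total + " attempt(s)", last);
    }

    public static void main(String[] args) {
        // Simulated broker that fails twice and then accepts the message.
        String result = sendWithRetry(CommunicationMode.SYNC, 2, attempt -> {
            if (attempt < 2) {
                throw new Exception("broker unreachable");
            }
            return "sent on attempt " + (attempt + 1);
        });
        System.out.println(result); // prints: sent on attempt 3
    }
}
```

With a retry setting of 2, a SYNC send survives two consecutive failures, while a ONEWAY send gives up after the first one.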
If the Outage Has Been Detected
Good. Now the question becomes: How do we avoid going to master-a?
First, we need to set sendLatencyFaultEnable=true, then pick a message queue in org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // Round-robin over the queues, checking whether each queue's broker is currently available.
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // No queue was returned by the loop: fall back to a broker picked by pickOneAtLeast().
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
The important part is the availability check inside the for loop:
latencyFaultTolerance.isAvailable(mq.getBrokerName())
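Applied to the scenario above (master-a down, master-b healthy), the effect of that check can be sketched as follows. This is a simplified illustration, not the RocketMQ implementation: QueueSelectionSketch, Queue, and markUnavailable are hypothetical names, the set of avoided brokers stands in for latencyFaultTolerance, and Math.floorMod replaces the Math.abs(...) % size arithmetic of the original.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical model of "round-robin, but skip brokers being avoided".
class QueueSelectionSketch {
    /** Hypothetical minimal queue descriptor: broker name plus queue id. */
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    private final Set<String> avoidedBrokers = new HashSet<>(); // stand-in for latencyFaultTolerance
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    QueueSelectionSketch(List<Queue> queues) {
        this.queues = queues;
    }

    void markUnavailable(String brokerName) {
        avoidedBrokers.add(brokerName);
    }

    Queue selectOne() {
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            Queue candidate = queues.get(Math.floorMod(index + i, queues.size()));
            if (!avoidedBrokers.contains(candidate.brokerName)) {
                return candidate; // first queue whose broker is not being avoided
            }
        }
        // Every broker is being avoided: fall back to plain round-robin.
        return queues.get(Math.floorMod(index, queues.size()));
    }

    // Builds the article's scenario: master-a and master-b with four queues each.
    static QueueSelectionSketch twoBrokerCluster() {
        List<Queue> all = new ArrayList<>();
        for (String broker : new String[] {"master-a", "master-b"}) {
            for (int q = 0; q < 4; q++) {
                all.add(new Queue(broker, q));
            }
        }
        return new QueueSelectionSketch(all);
    }

    public static void main(String[] args) {
        QueueSelectionSketch selector = twoBrokerCluster();
        selector.markUnavailable("master-a"); // the outage has been detected
        Queue chosen = selector.selectOne();
        System.out.println(chosen.brokerName + " q" + chosen.queueId);
    }
}
```

Once master-a is marked as avoided, every selection lands on a master-b queue, which is exactly what we want while the outage lasts.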
Under normal conditions, every successful send calls updateFaultItem() with the last argument set to false, in org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
When the send fails because of an outage, the same method is called with the last argument set to true:
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
The second argument, currentLatency, indicates the current latency. The third, isolation, is the flag that says whether or not to avoid this broker: when the transmission is successful, do not avoid this broker; otherwise, avoid it. latencyMax and notAvailableDuration are the two core variables in the algorithm. They also decide the value of startTimestamp in case of an outage.
In the code, if a broker is out, its latency is treated as 30 s. We then scan latencyMax from the last entry back to the first until we hit a value that is not greater than currentLatency, and take the notAvailableDuration entry at the same index as the avoidance period. Here are the default values of latencyMax and notAvailableDuration:
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
If isolation=true, this broker is avoided for 10 minutes (the last entry of notAvailableDuration). Otherwise, the avoidance period depends on the measured latency of the send.
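The lookup itself is not shown in the snippets here, so the following is a reconstruction from the description above rather than a verbatim copy. The class name NotAvailableDurationSketch, the method names, and the use of >= for the comparison are assumptions; the two arrays are the defaults quoted above.

```java
// Reconstruction of the latency-to-avoidance lookup; names are assumptions.
class NotAvailableDurationSketch {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scans LATENCY_MAX from the last entry to the first and returns the
    // avoidance period (ms) paired with the first threshold the latency reaches.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L; // faster than the smallest threshold: no avoidance
    }

    // With isolation, the measured latency is ignored and 30 s is used instead.
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000L : currentLatency);
    }

    public static void main(String[] args) {
        System.out.println(durationFor(10L, false));  // 0: fast send, no avoidance
        System.out.println(durationFor(600L, false)); // 30000: avoid for 30 s
        System.out.println(durationFor(10L, true));   // 600000: isolated, avoid for 10 min
    }
}
```

Note that a send slower than 100 ms but faster than 550 ms still maps to a duration of 0, so only clearly slow sends trigger avoidance.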
FaultItem stores the information about a failed broker, including its name, its latency, and the time from which it can be used again (startTimestamp).
org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem:
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}
This class updates the status of the failed brokers. It also decides whether a recovered broker can re-join the cluster, using this method:
org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable:
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}
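To see how startTimestamp and isAvailable() work together over time, here is a small sketch with an injectable clock so the behaviour can be checked without waiting. FaultTableSketch is a hypothetical class that keeps only the timestamp per broker. Treating a broker with no entry as available is an assumption of this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical, stripped-down fault table: broker name -> time it may be used again.
class FaultTableSketch {
    private final ConcurrentHashMap<String, Long> startTimestamps = new ConcurrentHashMap<>();
    private final LongSupplier clock; // injected so tests can control "now"

    FaultTableSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Same idea as setStartTimestamp(now + notAvailableDuration) above.
    void updateFaultItem(String name, long notAvailableDuration) {
        startTimestamps.put(name, clock.getAsLong() + notAvailableDuration);
    }

    // Same comparison as FaultItem#isAvailable; unknown brokers count as available (assumption).
    boolean isAvailable(String name) {
        Long startTimestamp = startTimestamps.get(name);
        return startTimestamp == null || clock.getAsLong() - startTimestamp >= 0;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock, in milliseconds
        FaultTableSketch table = new FaultTableSketch(() -> now[0]);

        table.updateFaultItem("master-a", 600000L);       // outage: avoid for 10 min
        System.out.println(table.isAvailable("master-a")); // false
        now[0] = 600000L;                                  // 10 minutes later
        System.out.println(table.isAvailable("master-a")); // true
    }
}
```

The broker re-joins simply because time passes; nothing has to actively mark it as recovered. A later successful send with low latency maps to a duration of 0, which sets startTimestamp to the current time and keeps the broker available.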
Conclusion
So far, we've examined the main code blocks that implement the HA mechanism. Basically, the client makes a best effort to deliver the message: we see a lot of constant polling and retries. That really is the reality of an enterprise-grade solution. By showing this code, we would like to demonstrate the effort that goes into ensuring the high availability of the cluster.