Reaching 100M MQTT Connections With EMQX 5.0
To stress-test the scalability of our open-source MQTT messaging broker, we established 100M MQTT connections to a 23-node EMQX cluster.
The ever-increasing scale of IoT device connections and deployments requires IoT messaging platforms to be massively scalable and robust at scale. To stress-test the scalability of our open-source MQTT messaging broker, EMQX, we established 100 million MQTT connections to a 23-node EMQX cluster.
In this test, each MQTT client subscribed to a unique non-wildcard topic. For publishing, we chose a 1-to-1 publisher/subscriber topology with a low, constant publishing rate of 90k messages per second, then provoked a burst of messages that reached 1M messages processed per second at peak. We also compared how increasing the cluster size affected the maximum subscription rate with two different database backends: the new RLOG mode and plain Mnesia. Here we detail our setup and some of the challenges we faced along the way.
Introduction
EMQX is an open-source, highly scalable, distributed MQTT messaging broker written in Erlang/OTP that can support millions of concurrent clients. As such, it needs to persist and replicate various data across the cluster nodes: MQTT topics and their subscribers, routing information, ACL rules, various configurations, and more. Since its beginning, EMQX has used Mnesia as the database backend for these needs.
Mnesia is an embedded ACID distributed database that comes with Erlang/OTP. It uses a full-mesh peer-to-peer Erlang distribution for transaction coordination and replication. Because of this characteristic, it has trouble scaling horizontally: the more nodes and replicas of the data there are, the bigger the overhead for write-transaction coordination and the bigger the risk of split-brain scenarios.
In EMQX 5.0, we attempted to mitigate this issue with a new DB mode called RLOG (as in "replication log"), implemented in Mria. Mria is an extension to the Mnesia database that helps it scale horizontally by defining two types of nodes:
- Core nodes, which behave as usual Mnesia nodes and participate in write transactions
- Replicant nodes, which do not take part in transactions, but instead delegate them to core nodes, while keeping a read-only replica of the data locally
This reduces the risk of split-brain scenarios and lessens the coordination needed for transactions, since fewer nodes participate in them, while keeping read access fast because the data is available locally on every node.
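To make the split between the two roles more concrete, here is a minimal conceptual sketch, assuming a hypothetical route table: reads are served from the local replica with a dirty read, while writes are delegated to a core node over RPC. This is not Mria's actual API; the module name and the pick_core_node/0 helper are purely illustrative.

```erlang
%% Conceptual sketch only -- not the real Mria API. It shows how a replicant
%% node can serve reads from its local replica while delegating writes to a
%% core node. The 'route' table and pick_core_node/0 helper are hypothetical.
-module(rlog_sketch).
-export([read_route/1, add_route/2]).

%% Reads hit the local read-only replica, so they stay fast on every node.
read_route(Topic) ->
    mnesia:dirty_read(route, Topic).

%% Writes are delegated to a core node, which runs the actual Mnesia
%% transaction; replicant nodes never coordinate transactions themselves.
add_route(Topic, Node) ->
    CoreNode = pick_core_node(),
    rpc:call(CoreNode, mnesia, transaction,
             [fun() -> mnesia:write({route, Topic, Node}) end]).

pick_core_node() ->
    %% In Mria, the core node list comes from configuration/discovery;
    %% a hard-coded example node name is used here.
    'emqx@core1'.
```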
Before deciding to make this DB backend our new default, we needed to stress-test it and verify that it does indeed scale well horizontally. To that end, we performed tests in which a 23-node EMQX cluster sustained 100 million concurrent connections, split evenly between publishers and subscribers, publishing messages one-to-one both at a constant rate and in an aggressive burst. We also compared the RLOG DB mode with conventional Mnesia and confirmed that RLOG can indeed sustain higher arrival rates.
How We Tested It
For deploying and running our cluster tests, we used the AWS CDK, which allowed us to experiment with different instance types and counts as well as different development branches of EMQX. You can check out our scripts in this GitHub repo. On our load generator nodes ("loadgens" for short), we used our emqtt-bench tool to generate the connection, publishing, and subscribing traffic with various options. EMQX's Dashboard and Prometheus were used to monitor the progress of the test and the health of the instances.
We experimented gradually with various instance types and counts, and in the last runs we settled on c6g.metal instances for both EMQX nodes and loadgens, with a "3+20" topology for our cluster: 3 nodes of type "core" and 20 nodes of type "replicant." As for our loadgens, we observed that publisher clients required considerably more resources than subscribers: connecting and subscribing 100 million connections required only 13 loadgen instances, while publishing plus subscribing required 17.
We did not use any load balancers for these tests. Instead, loadgen clients connected directly to the replicant nodes in an evenly distributed fashion, so all replicants had about the same number of connections and similar resource requirements. Loadgens did not connect to the core nodes, leaving those dedicated solely to managing database write transactions.
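As an illustration of that even spread, a loadgen could assign each client to a replicant host round-robin by client index, as in the hypothetical helper below. The host names are placeholders; this is not emqtt-bench code.

```erlang
%% Hypothetical helper: assign client N to a replicant node round-robin so
%% that every replicant ends up with roughly the same number of connections.
%% Host names are placeholders; the real test used 20 replicant nodes.
-module(target_sketch).
-export([host_for/1]).

-define(REPLICANTS, ["replicant-1.example", "replicant-2.example",
                     "replicant-3.example"]).

host_for(ClientIndex) ->
    Hosts = ?REPLICANTS,
    lists:nth((ClientIndex rem length(Hosts)) + 1, Hosts).
```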
Each subscribing client subscribed to a non-wildcard topic of the form bench/%i/test with QoS 1, where %i stands for a unique subscriber number, and each publisher published with QoS 1 to the topic of the same form, with the same %i as the corresponding subscriber. This pattern ensured that each publisher had exactly one subscriber. The message payload was always 100 bytes. Each subscriber and publisher connected to a random broker, so the two sides of a pair did not necessarily connect to the same server, which means that on average 95% of the messages were forwarded between brokers.
In our tests, we first connected all publishers and only then started connecting subscribers. Once connected, each publisher published a message every 600 seconds, creating a "background" load of approximately 90,000 published messages per second. We then spawned 10 extra publishers per loadgen, each responsible for a short burst of intense publishing targeting 1,000 random subscribers: it published 10 QoS 0 messages (100 bytes each) to each randomly chosen subscriber and repeated this process 1,000 times. For the 100M connection test reported here, subscribers and publishers connected to the broker cluster at up to 20,000 connections/s at peak, although we believe the cluster can sustain an even higher connection rate.
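The burst phase can be pictured with the following hypothetical sketch of a single extra burst publisher: 1,000 rounds, each picking 1,000 random subscriber indices and sending 10 QoS 0 messages of 100 bytes to each. The burst/2 function and its arguments are illustrative, not emqtt-bench code.

```erlang
%% Hypothetical sketch of one extra burst publisher, mirroring the description
%% above: 1,000 rounds, each targeting 1,000 random subscriber topics with
%% 10 QoS 0 messages of 100 bytes. Client is an already-connected emqtt pid.
-module(burst_sketch).
-export([burst/2]).

burst(Client, NumSubscribers) ->
    Payload = binary:copy(<<"x">>, 100),
    lists:foreach(
      fun(_Round) ->
              Targets = [rand:uniform(NumSubscribers) || _ <- lists:seq(1, 1000)],
              lists:foreach(
                fun(I) ->
                        Topic = iolist_to_binary(["bench/", integer_to_list(I), "/test"]),
                        %% QoS 0: fire-and-forget publishes
                        [emqtt:publish(Client, Topic, Payload, 0) || _ <- lists:seq(1, 10)]
                end, Targets)
      end, lists:seq(1, 1000)).
```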
Challenges Along the Way
As we experimented with such large volumes of connections and throughput, we encountered several challenges, and investigated and fixed a number of performance bottlenecks along the way. For tracking down memory and CPU usage in Erlang processes, system_monitor was quite a helpful tool. It is basically "htop for BEAM processes," allowing us to find processes with long message queues and high memory and/or CPU usage. Based on what we observed, we made a few performance tunings in Mria, detailed in the GitHub links below (a minimal example of this kind of process introspection follows the list):
- Move message queues to off_heap
- Improve performance of the agent and the replicant
- Remove mria_status process
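For context, here is a small sketch of the kind of BEAM introspection such tooling builds on (this is plain Erlang, not system_monitor itself): rank processes by message-queue length and memory usage.

```erlang
%% Minimal sketch of the kind of BEAM introspection such a tool relies on
%% (not system_monitor itself): list the N processes with the longest message
%% queues, together with their memory usage and registered names.
-module(procs_sketch).
-export([top_by_queue/1]).

top_by_queue(N) ->
    Info = [begin
                case erlang:process_info(Pid, [message_queue_len, memory,
                                               registered_name]) of
                    undefined ->
                        {0, 0, Pid, []};   % process already exited
                    KV ->
                        {proplists:get_value(message_queue_len, KV),
                         proplists:get_value(memory, KV),
                         Pid,
                         proplists:get_value(registered_name, KV)}
                end
            end || Pid <- erlang:processes()],
    %% Sort descending by queue length (then memory) and keep the top N.
    lists:sublist(lists:reverse(lists:sort(Info)), N).
```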
In our initial tests with Mria, without going into too much detail, the replication mechanism logged all transactions to a "phantom" Mnesia table, to which replicant nodes subscribed. This generated noticeable network overhead between the core nodes, because each transaction was essentially written twice. In our OTP fork, we added a new Mnesia module that lets us capture committed transaction logs directly, removing the need for duplicate writes, reducing network usage significantly, and allowing the cluster to sustain higher connection/transaction rates. While stressing the cluster further after those optimizations, we found new bottlenecks that prompted more performance tuning, as shown in the links below; a simplified sketch of the original log-table idea follows the list.
- Store transactions in replayq in normal mode
- Remove redundant data from the mnesia ops
- Batch transaction imports
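The sketch below is a much-simplified, hypothetical illustration of the original "phantom table" idea described above: core nodes duplicate every write into an extra log table, and replicants subscribe to that table's events to replay them locally. The table, record, and function names are made up; it is precisely this duplicate write that the new Mnesia module in our OTP fork removes.

```erlang
%% Much-simplified, hypothetical sketch of the "phantom table" idea -- not the
%% real Mria implementation. Table and record names are made up.
-module(phantom_sketch).
-export([log_and_write/2, replay_loop/0]).

%% On a core node: every logical write is duplicated into a log table, which
%% is the extra traffic between core nodes that the OTP-fork change removes.
log_and_write(Key, Value) ->
    mnesia:transaction(fun() ->
        mnesia:write({routes, Key, Value}),
        mnesia:write({rlog_phantom, erlang:unique_integer([monotonic]),
                      {write, routes, Key, Value}})
    end).

%% On a replicant node: subscribe to the log table's events and replay each
%% logged operation into the local read-only replica.
replay_loop() ->
    {ok, _} = mnesia:subscribe({table, rlog_phantom, simple}),
    loop().

loop() ->
    receive
        {mnesia_table_event, {write, {rlog_phantom, _Seq, {write, Tab, Key, Value}}, _Tid}} ->
            mnesia:dirty_write({Tab, Key, Value}),
            loop();
        _Other ->
            loop()
    end.
```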
Even our benchmarking tool, emqtt-bench, needed a few adjustments to deal with such a large volume and rate of connections. Several quality-of-life improvements were made, such as:
- New waiting options for publishing
- Option to retry connections
- Support for rate control for 1000+ conns/s
- Support for multi-target hosts
Additionally, a couple of performance optimizations were made. In our pub/sub tests, we even needed to use a special fork (not in the current master branch) so that memory usage could be further lowered, solely for the purpose of performing the test.
Results
The animation above illustrates our final results for the 1-to-1 publish/subscribe + burst tests. We established 100 million connections, 50M of which were subscribers and 50M publishers. With each publisher publishing every 600 seconds, the average inbound and outbound rate was about 90k messages per second; once the aggressive publishing burst started, the cluster processed over 1M messages per second.

During the publishing phase, each of the 20 replicant nodes (which, as a reminder, are the ones accepting connections) consumed at most 90% of its memory (about 113 GiB) and around 60% CPU (64 arm64 cores) during the constant-publishing plateau, reaching 100% CPU during the publishing bursts. The 3 core nodes handling the transactions were nearly idle in CPU (less than 1% usage) and used only 32% of their memory (about 40 GiB). Network traffic during the constant publishing of 100-byte payloads was around 36 MB/s and reached 500 MB/s during the bursts. The loadgens used approximately 60% (~80 GiB) of their total memory and ~20% CPU during both the constant-publishing plateau and the bursts.
Grafana screenshot of CPU, memory, and network usage of EMQX nodes during the test
Final Remarks
After seeing these optimistic results, we believe that the RLOG DB mode offered by Mria is ready for production use in EMQX 5.0. It is already the default DB backend in the current master branch.
Published at DZone with permission of Thales Macedo Garitezi.