MQTT Client Load Balancing With RabbitMQ and Spring Cloud

Expand your knowledge of MQTT Client and load balancing.

By Radwan Nizam · Feb. 08, 19 · Tutorial
Introduction

MQTT is a machine-to-machine (M2M), IoT connectivity protocol. It was designed as an extremely lightweight publish and subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.

Each MQTT client subscribes to certain topics and receives messages when the publisher starts pushing messages on those topics.

How to Scale Out?

The purpose of horizontal scaling is to distribute the load between multiple instances of the same application. If the MQTT clients in those instances are subscribed to the same topic, then the same MQTT message will be delivered to each instance, which is not the desired behavior.

Figure: MQTT Subscriptions

Competing Consumers

Spring Cloud Stream defines the concept of "Consumer Groups" as follows:

"While the publish-subscribe model makes it easy to connect applications through shared topics, the ability to scale up by creating multiple instances of a given application is equally important. When doing so, different instances of an application are placed in a competing consumer relationship, where only one of the instances is expected to handle a given message."

Based on this definition, Spring Cloud Stream allows distributing the load for a topic across multiple clients: the instances that belong to the same consumer group share one subscription, and each message is handed to only one of them.
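
To make the difference concrete, here is a minimal plain-Java sketch that contrasts the two delivery models. It does not use RabbitMQ or Spring; the class and method names are hypothetical, and the round-robin dispatch is an assumption chosen to mirror how a queue typically serves equally idle consumers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: no broker or Spring involved, all names are hypothetical.
final class DeliveryModels {

    // Publish-subscribe: every subscriber receives its own copy of each message.
    static List<List<String>> fanOut(List<String> messages, int subscribers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            received.add(new ArrayList<>(messages));
        }
        return received;
    }

    // Competing consumers: each message goes to exactly one member of the group.
    // Round-robin dispatch is an assumption made for this sketch.
    static List<List<String>> competing(List<String> messages, int members) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < members; i++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % members).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList(
                "message 1", "message 2", "message 3", "message 4", "message 5");
        System.out.println("fan-out:   " + fanOut(messages, 2));
        System.out.println("competing: " + competing(messages, 2));
    }
}
```

With two plain subscribers, both lists contain all five messages; with two group members, each message appears in only one list.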

Example

In this example, an MQTT client will publish messages to one topic in RabbitMQ, and multiple consumers will share the messages of that topic.

Install RabbitMQ and MQTT Plugin

First, we will run an instance of RabbitMQ using a Docker image. Then, we will install the MQTT plugin.

> docker run  -d --hostname vs29 --name vs29 -p 8081:15672 -p 5672:5672 -p 1883:1883 rabbitmq:3-management


Now, let's check the startup logs of that container:

>docker ps
CONTAINER ID        IMAGE                     COMMAND                  CREATED             STATUS              PORTS                                                                                                               NAMES
fbd443154bf6        rabbitmq:3-management     "docker-entrypoint.s…"   6 seconds ago       Up 2 seconds        4369/tcp, 0.0.0.0:1883->1883/tcp, 5671/tcp, 15671/tcp, 0.0.0.0:5672->5672/tcp, 25672/tcp, 0.0.0.0:8081->15672/tcp   vs29

>docker logs fbd443154bf6 -f
....
....
...

2019-02-03 07:34:16.709 [info] <0.198.0>
node : rabbit@vs29
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.conf
cookie hash : O+z+vUDvSh3J1vK/lV08Xw==
log(s) : <stdout>
database dir : /var/lib/rabbitmq/mnesia/rabbit@vs29


The last few lines of the log indicate that the server is now ready. Next, let's enable the MQTT plugin:

Navigate to the container first:
> docker exec -u 0 -it fbd443154bf6 /bin/bash

Enable the plugin now:
root@vs29:/# rabbitmq-plugins enable rabbitmq_mqtt


Add New User in RabbitMQ

Let's add a new user in RabbitMQ using the administration UI. Open the URL http://RabbitMQhost:8081/ and navigate to the 'Admin' tab (the default credentials in RabbitMQ are guest/guest).

To create a new user:

  • Enter the user name in the 'Username' field; in this tutorial we use 'client1'. Enter the password in the 'Password' field; we also use 'client1' here.
  • Click on 'Add user'


  • By default, a newly created user has no access to any virtual host. Click on 'client1' to edit the permissions of this user. On the new page, click on 'Set Permissions' to give the user access to all virtual hosts.
  • To verify that everything works, use an MQTT client to push data to the new server. In this tutorial, we use the commands 'mosquitto_pub' and 'mosquitto_sub' that are provided by the Mosquitto server. First, subscribe to all topics on the server in one shell window. Then, publish some data to the server from a second shell window.
..>mosquitto_sub -h xxx.xxx.xxx.xxx -t "#" -u client1 -P client1


..>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "Hello World" -u client1 -P client1 -p 1883


If everything goes fine, the 'mosquitto_sub' window prints 'Hello World'.
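
The subscriber above uses the filter "#", which is the MQTT multi-level wildcard. As a rough sketch of how a topic filter is matched against a topic name, the following plain-Java helper handles '#' (this level and everything below it) and '+' (exactly one level). The class name is hypothetical, and the sketch deliberately ignores the special rules for topics that start with '$'.

```java
// Illustration only: a simplified MQTT topic-filter matcher.
// Assumption: topics starting with '$' are not treated specially here.
final class MqttTopicFilter {

    static boolean matches(String filter, String topic) {
        // Keep empty levels, so "/my/topic" becomes ["", "my", "topic"].
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // Multi-level wildcard: matches the rest of the topic.
                return true;
            }
            if (i >= topicLevels.length) {
                return false; // filter is longer than the topic
            }
            boolean singleLevelWildcard = filterLevels[i].equals("+");
            if (!singleLevelWildcard && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        // Without '#', both must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("#", "/my/topic"));         // true
        System.out.println(matches("/my/+", "/my/topic"));     // true
        System.out.println(matches("/my/+", "/my/topic/sub")); // false
        System.out.println(matches("/my/other", "/my/topic")); // false
    }
}
```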

Create the Message Receiver Service

The objective of this tutorial is to distribute the load between multiple instances of the same application. So, let's create a simple service to consume messages using Spring Boot and Spring Cloud Stream.

  • Create a new Spring Boot project. You can use an IDE or Spring Initializr.
  • Adjust your pom.xml file to include the following contents:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-test-support</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
    </repository>
</repositories>


Now, let's add our Stream Listener:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding(MessageSink.InputChannel.class)
public class MessageSink {

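    @StreamListener(InputChannel.SINK)
    public void handle(String message) {
        System.out.println("new message:" + message + ", from worker :" + Thread.currentThread().getName());
        try {
            // Simulate slow processing so the load distribution is easy to observe
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
    }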

    public interface InputChannel {
        String SINK = "message-sink";

        @Input(SINK)
        SubscribableChannel sink();
    }
}


The next step is to define and configure our Channel (which is the most important part of this tutorial). The configuration will be added in the file application.yml:

spring:
  cloud:
    stream:
      bindings:
        message-sink:
          destination: amq.topic
          binder: rabbit
          group: messages-consumer-group
          consumer:
            concurrency: 1
      rabbit:
        bindings:
          message-sink:
            consumer:
              durableSubscription: true
              declareExchange: true
              exchangeDurable: true
              exchangeType: topic
              queueNameGroupOnly: true
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    password: client1
    username: client1
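
Because the consumer reads from the AMQP exchange amq.topic rather than from MQTT directly, it can help to see how an MQTT topic name is expected to show up as an AMQP routing key. The sketch below assumes the mapping used by the RabbitMQ MQTT plugin as I understand it ('/' becomes '.', and the MQTT wildcard '+' becomes the AMQP wildcard '*'); the class name is hypothetical, and the behavior should be checked against the plugin documentation for your RabbitMQ version.

```java
// Illustration only. Assumption: the MQTT plugin maps the MQTT level
// separator '/' to the AMQP word separator '.', and '+' to '*'.
final class MqttTopicToRoutingKey {

    static String toRoutingKey(String mqttTopic) {
        return mqttTopic.replace('/', '.').replace('+', '*');
    }

    public static void main(String[] args) {
        // The topic used in this tutorial
        System.out.println(toRoutingKey("/my/topic")); // .my.topic
        // A subscription filter with a single-level wildcard
        System.out.println(toRoutingKey("/my/+"));     // .my.*
    }
}
```

Under this assumption, a message published to /my/topic reaches amq.topic with the routing key .my.topic, which a queue bound with a catch-all key such as '#' would receive.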


Let's discuss the important configuration in the application.yml:

  • destination: amq.topic is the default Exchange used by the MQTT Plugin, so we need to subscribe to it.
  • group: According to the Spring Cloud Stream documentation, 'All groups that subscribe to a given destination receive a copy of published data, but only one member of each group receives a given message from that destination'
  • consumer.concurrency: The number of threads used to process the received messages in this consumer. We can set this to any positive value, and the concept of 'grouped consumers' still applies.
  • queueNameGroupOnly: According to the Spring Cloud Stream documentation, 'When true, consume from a queue with a name equal to the group. Otherwise, the queue name is destination.group. This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.' This is a very important setting. Omitting this property leads to errors when starting the service, because Spring generates a queue name that starts with 'amq.' (here, amq.topic.messages-consumer-group), and RabbitMQ reserves that prefix for its own use. You can find more details in this thread.
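
The naming rule quoted above can be sketched in a few lines of plain Java. This is not the binder's actual code; the class and method names are hypothetical and only reproduce the documented rule (group only, or destination.group) together with a check for the reserved 'amq.' prefix.

```java
// Illustration only: mirrors the documented queue-naming rule, not the binder's code.
final class QueueNames {

    // RabbitMQ reserves names starting with this prefix for internal use.
    static final String RESERVED_PREFIX = "amq.";

    static String queueName(String destination, String group, boolean queueNameGroupOnly) {
        return queueNameGroupOnly ? group : destination + "." + group;
    }

    static boolean isReserved(String queueName) {
        return queueName.startsWith(RESERVED_PREFIX);
    }

    public static void main(String[] args) {
        String destination = "amq.topic";
        String group = "messages-consumer-group";

        String defaultName = queueName(destination, group, false);
        String groupOnlyName = queueName(destination, group, true);

        // amq.topic.messages-consumer-group -> reserved: true
        System.out.println(defaultName + " -> reserved: " + isReserved(defaultName));
        // messages-consumer-group -> reserved: false
        System.out.println(groupOnlyName + " -> reserved: " + isReserved(groupOnlyName));
    }
}
```

The second name matches the 'inbound.messages-consumer-group' adapter that appears in the startup logs of the consumers.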

Verify Load Distribution

Let's start two instances of the service and push some data using the MQTT client. First, open a command shell window, navigate to your project folder, and build the project using the command:

>mvn clean compile package


Second, open two command shell windows, navigate to your project folder in each, and start the service using the command:

>java -jar target\balanced_mqtt_client-0.0.1-SNAPSHOT.jar


Now, we will push some data from the MQTT client:

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 1" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 2" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 3" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 4" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 5" -u client1 -P client1 -p 1883


On the consumer side, the following messages will be seen:

consumer 1:

2019-02-07 10:33:55.848  INFO 14284 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.messages-consumer-group
2019-02-07 10:33:55.858  INFO 14284 --- [           main] r.n.cloud.rabbitmq.mqtt.MqttApplication  : Started MqttApplication in 8.824 seconds (JVM running for 9.318)
new message:message 1, from worker :messages-consumer-group-1
new message:message 3, from worker :messages-consumer-group-1
new message:message 5, from worker :messages-consumer-group-1


consumer 2:

... INFO 13832 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.messages-consumer-group
... INFO 13832 --- [           main] r.n.cloud.rabbitmq.mqtt.MqttApplication  : Started MqttApplication in 7.8 seconds (JVM running for 8.495)
... worker :messages-consumer-group-1
... worker :messages-consumer-group-1


As we can see, the messages were distributed between the two consumers.

Conclusion

The tutorial shows how to implement a load-balanced MQTT consumer using RabbitMQ server and the feature 'grouped consumers.' You can download the demo from GitHub.
