Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

IoT and Vert.x: Building a Simple MQTT Server

DZone's Guide to

IoT and Vert.x: Building a Simple MQTT Server

Here, we'll use the reactive framework Vert.x and MQTT Dash to build a simple MQTT server to turn your lights on and off, all from your phone.

· IoT Zone
Free Resource

Download Red Hat’s blueprint for building an open IoT platform—open source from cloud to gateways to devices.

Almost a year ago, Dreamix adopted Vert.x in its technology stack as a need for a framework with reactive capabilities. Its good documentation and strong community are excellent additions to the various features which the framework provides.

The term IoT (Internet of Things) is very popular these days, since everyone talks how all sorts of devices from TVs, refrigerators, air conditioners, etc. will be connected to the Internet. The implementation of IoT standards indicates that the field has grown tremendously, and every developer with an interest in this ecosystem should be aware of them.

This blog will cover the implementation of an MQTT server with Vert.x and the corresponding communication with an Android MQTT client.

“MQTT is a machine-to-machine (M2M)/”Internet of Things” connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.”

Prerequisites

  1. Download the MQTT Dash (IoT, Smart Home) Android application. This will serve as our client.
  2. Git clone https://github.com/boykodimitroff/vertx-mqtt.git (Optional)

Hands on the Project

We are going to implement simple home automation project and more specifically remote switching of your home lights. But first little theory.

As the quote above states, the MQTT is as simple publish/subscribe protocol one of the de-facto IoT standards. The protocol is developed by Andy Stanford-Clark and Arlen Nipper in 1999 for the purposes of oil pipeline monitoring through the desert.

The main communication unit is called Broker and is responsible for dispatching all messaging between the sender and the receiver. In the context of our project we are going to implement exactly an MQTT broker which will dispatch messages from our Android client to a LightsController class which will simulate the switching of the lights.

Our Android client will publish a message to the broker with included special string in it’s message. This string is called Topic. Based on the topic, the broker will know how to reroute the message to a previously subscribed receiver to the same topic.

Let’s build our MQTT broker.

<dependencies>
    <dependency>
         <groupId>io.vertx</groupId>
         <artifactId>vertx-mqtt-server</artifactId>
         <version>3.4.2</version>
    </dependency>
</dependencies>


/*/src/main/java/eu/dreamix/server/MQTTBroker.java*/

package eu.dreamix.broker;

import eu.dreamix.controllers.LightsController;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.MqttPublishMessage;

import java.util.ArrayList;
import java.util.List;

public class MQTTBroker {

    private static final String TOPIC_LIGHTS = "lights";
    public static void main(String args[]) {
        MqttServer mqttServer = MqttServer.create(Vertx.vertx());
        init(mqttServer);
    }

    private static void init(MqttServer mqttServer) {
        mqttServer.endpointHandler(endpoint - > {

            System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
            endpoint.accept(false);

            handleSubscription(endpoint);
            handleUnsubscription(endpoint);
            publishHandler(endpoint);
            handleClientDisconnect(endpoint);
        }).listen(ar - > {
            if (ar.succeeded()) {
                System.out.println("MQTT server is listening on port " + ar.result().actualPort());
            } else {
                System.out.println("Error on starting the server");
                ar.cause().printStackTrace();
            }
        });
    }

    private static void handleSubscription(MqttEndpoint endpoint) {
        endpoint.subscribeHandler(subscribe - > {

            List grantedQosLevels = new ArrayList < > ();
            for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
                System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
                grantedQosLevels.add(s.qualityOfService());
            }

            endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);

        });
    }

    private static void handleUnsubscription(MqttEndpoint endpoint) {
        endpoint.unsubscribeHandler(unsubscribe - > {

            for (String t: unsubscribe.topics()) {
                System.out.println("Unsubscription for " + t);
            }

            endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
        });
    }

    private static void publishHandler(MqttEndpoint endpoint) {
        endpoint.publishHandler(message - > {
            handleQoS(message, endpoint);
        }).publishReleaseHandler(messageId - > {
            endpoint.publishComplete(messageId);
        });
    }

    private static void handleQoS(MqttPublishMessage message, MqttEndpoint endpoint) {
        if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
            String topicName = message.topicName();

            switch (topicName) {
                case TOPIC_LIGHTS:
                    LightsController.handler(message);
                    break;
                    // Future implementation for Camera
                    // case TOPIC_CAMERA:
                    // CamController.handler(message);
            }

            endpoint.publishAcknowledge(message.messageId());

        } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
            endpoint.publishRelease(message.messageId());
        }
    }

    private static void handleClientDisconnect(MqttEndpoint endpoint) {
        endpoint.disconnectHandler(h - > {
            System.out.println("The remote client has closed the connection.");
        });
    }
}


The broker implements publish/subscribe handlers, which are the backbone of MQTT communication. Also, the implementation provides an unsubscription mechanism, which, for the purpose of the demo, just prints a message in the console.

The topic that the broker must handle is called “lights”. Based on the topic, the broker will know how to dispatch the message. In our case, it gives the message sent from our client to the LightsController class.

MQTT QoS (Quality of Service) ensures that the messaging between the sender and receiver will be guaranteed according to the specific QoS levels. There are three QoS levels:

  • At most once (0): This is the minimum level, or the so called “fire and forget”. The receiver is not obliged to return a response to the sender and the sender is not obliged to resend the message.
  • At least once (1): This ensures that the message will be delivered at least once to the receiver. The receiver must return a message to the sender. However, the message can be delivered more than once, so duplicate messages should be considered.
  • Exactly once (2): The safest and slowest level. It ensures that the message will be delivered only once.

In our case, we are using QoS level 1. When the LightsController finishes with its task, a response to the client will be sent.

Let’s look at the LightsController implementation.

/*/src/main/java/eu/dreamix/controllers/LightsController*/
package eu.dreamix.controllers;
 
import io.vertx.mqtt.messages.MqttPublishMessage;
 
/**
* @author Boyko Dimitrov on 6/26/17.
*/
public class LightsController {
 
   private static final String TURN_ON_INDICATOR = "1";
   private static final String TURN_OFF_INDICATOR = "0";
 
   public static void handler(MqttPublishMessage message) {
       if(TURN_ON_INDICATOR.equals(message.payload().toString())) {
           turnOn();
       } else if(TURN_OFF_INDICATOR.equalsIgnoreCase(message.payload().toString())) {
           turnOff();
       }
   }
 
   private static void turnOn() {
       System.out.println("Lights are on!");
   }
 
   private static void turnOff() {
       System.out.println("Lights are off!");
   }
 
}


This simple expects the message to contain a special value, based on which the controller will “turnOn” or “turnOff” the lights.

Run the application. “MQTT server is listening on port 1883” should appear in the console.

Android Client Configuration

Open the MQTT Dash Android application. The following screen should appear:

MQTT Dash

Click on the “+” sign in the upper right corner. Enter a preferred name of your dashboard in the “name” field (in my case, it will be Demo) and also enter the IP address of the running MQTT Broker application. Click on the upper right floppy disk icon in order to save the configuration. The following screen should appear:

MQTT Dash Demo

Click on your newly created dashboard and check the console of the MQTT Broker application. “MQTT client [<client-id>] request to connect” should appear in the console.

Let’s improve our dashboard and add a switch button to it.

Click on the “+” sign and choose “Switch/button”.  The following screen should appear:

MQTT Dash readme

Insert the following configuration on the screen and click the floppy disk icon in order to save:

  • Name – Home Lights
  • Topic(sub) – lights
  • Other settings – QoS(1)

The following screen should appear:

MQTT light

Click on the checkbox and, again, check the console of the broker application.

“Subscription for lights with QoS AT_LEAST_ONCE”

“Lights are on!”

If you click again on the checkbox, you will see another message which indicates that the “Lights are off!”

Conclusion

The MQTT protocol is a very powerful tool and should be kept in mind when developing IoT projects. If you want to get deeper into the Vert.x implementation of this protocol, check out http://vertx.io/docs/#mqtt-server.

The code from this basic project can be reused and improved without any concerns.

Build an open IoT platform with Red Hat—keep it flexible with open source software.

Topics:
iot ,mqtt ,vert.x ,iot app development ,tutorial

Published at DZone with permission of Boyko Dimitrov, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}