Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

There and Back Again: Mqtt-Based Data Transfer With Kafka Cloud

DZone 's Guide to

There and Back Again: Mqtt-Based Data Transfer With Kafka Cloud

Not quite as long as the road Mordor, but certainly not something you want to brave alone.

· Big Data Zone ·
Free Resource

hobbit-hole-lord-of-the-rings

Still not sure why they didn't just use the eagles from the beginning

Through this article, I would like to list down the detailed steps for communicating from a device to a server in the cloud running a scalable Kafka infrastructure and back from Kafka to the device using Mosquitto broker and Kafka connect.

You may also like: Building a Custom Kafka Connect Connector.

Project architecture

Project architecture

Device to Cloud Connectivity 

Step One

Download and install Mosquito for windows here.

Step Two

Navigate to D:\Program Files\mosquitto\mosquitto.conf and modify the following: 

# =================================================================
# Extra listeners
# =================================================================

# Listen on a port/ip address combination. By using this variable
# multiple times, mosquitto can listen on more than one port. If
# this variable is used and neither bind_address nor port given,
# Note that for a websockets listener it is not possible to bind to a host
# name.
# listener port-number [ip address/host name]
listener 1883 0.0.0.0


Step Three

Run the command

D:\Program Files\mosquitto>mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic" -m "This request is coming from LOCALLAPTOP007 to Avengers"
D:\Program Files\mosquitto>mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic"


Step Four

Install kafka on the server, avengers@12.XX.XXX.XXX and test the producer-consumer message interchange using the following commands. 

./kafka-topics.sh --list -zookeeper 12.XX.XXX.XXX:2181
bin/kafka-console-consumer.sh --bootstrap-server 12.XX.XXX.XXX:9092 –topic HelloKafkaTopic
bin/kafka-console-producer.sh --broker-list 12.XX.XXX.XXX:9092 –topic HelloKafkaTopic


Step Five

modify /etc/hosts  and add the entries as show below

10.XXX.XX.XX avengers.eastus.cloudapp.azure.com
#Above is the private ip of the cloud machine
12.XX.XXX.XXX avengers.eastus.cloudapp.azure.com
#Above is the public ip of the cloud machine


Step Six

Install mosquito on the cloud (just for testing), using the instructions at this url

yum install mosquitto
# service mosquitto start
# systemctl enable mosquitto


Step Seven

Check if port 1883 and 9092 are open in the cloud for inbound traffic using the following commands. 

netstat | grep 1883
netstat | grep 9092


If the ports are not open, open them for inbound traffic in Azure. You can also run the following commands on the server to open them

firewall-cmd --zone=public --add-port=1883/tcp --permanent
firewall-cmd --reload
iptables-save | grep 1883


Step Eight

Navigate to /etc/mosquitto/mosquitto.conf and modify the following:

# =================================================================
# Extra listeners
# =================================================================

# Listen on a port/ip address combination. By using this variable
# multiple times, mosquitto can listen on more than one port. If
# this variable is used and neither bind_address nor port given,
# then the default listener will not be started.
# The port number to listen on must be given. Optionally, an ip
# address or host name may be supplied as a second argument. In
# this case, mosquitto will attempt to bind the listener to that
# address and so restrict access to the associated network and
# interface. By default, mosquitto will listen on all interfaces.
# Note that for a websockets listener it is not possible to bind to a host
# name.
# listener port-number [ip address/host name]
listener 1883 0.0.0.0


Step Nine

Test, publish, and subscribe between the mosquito broker using the following commands: 

mosquitto_sub -h 127.0.0.1 -t dummy
mosquitto_pub -h 127.0.0.1 -t dummy -m "Hello world"


Step 10

Navigate here and unzip the copy of the following files: 

  •  kafka-connect-mqtt-1.0-SNAPSHOT.jar

  •  org.eclipse.paho.client.mqttv3-1.0.2.jar

to /opt/kafka_2.12-2.3.0/libs. 


Step 11

Navigate to /opt/kafka_2.12-2.3.0/config, vi server.properties, and just make the following change:

Navigate to /opt/kafka_2.12-2.3.0/config and vi connect-standalone.properties and just make the following change:

advertised.listeners=PLAINTEXT://12.XX.XXX.XXX:9092
bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092


Then, navigate to /opt/kafka_2.12-2.3.0/config, vi mqtt.propertiesand ensure the file has the following entry: 

name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1

kafka.topic=mqtt-mosquitto-topic-kafka
mqtt.client_id=mqtt-kafka-123456789

mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60

mqtt.server_uris=tcp://avengers.eastus.cloudapp.azure.com:1883
mqtt.topic=mqtt-mosquitto-topic


Step 12

cd /opt/kafka_2.12-2.3.0


After running the above command, run all the following on separate putty windows. 

./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
./bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
bin/kafka-console-consumer.sh --bootstrap-server avengers.eastus.cloudapp.azure.com:9092 –topic mqtt-mosquitto-topic-kafka


Run the following first from within the server itself, and then from your Windows local laptop.

mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic" -m "This request is coming from LOCALLAPTOP007 to Avengers"


Ensure that the message sent from mosquito publisher on local laptop arrives in base64 encoded format on the server kafka consumer.

Cloud to Device Connectivity

Step One

Download the Kafka stream connector library and copy the jar to the /home/plugins folder: 

wget https://github.com/Landoop/stream-reactor/releases/download/1.2.2/kafka-connect-mqtt-1.2.2-2.1.0-all.tar.gz
tar -xf kafka-connect-mqtt-1.2.2-2.1.0-all.tar.gz
cp kafka-connect-mqtt-1.2.2-2.1.0-all.jar /home/plugins


Step Two

Create a new connect.properties file under config folder as shown below:

[root@avengers config]# pwd
/opt/kafka_2.12-2.3.0/config
[root@avengers config]# cat connect.properties
# Kafka broker IP addresses to connect to
bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092

# Path to directory containing the connector jar
plugin.path=/root/plugins

# Converters to use to convert keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false

# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
[root@avengers config]#


Step Three

Create a new Kafka topic called "mqtt-sink" by using the instructions below :

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mqtt-sink
bin/kafka-topics.sh --list -zookeeper localhost:2181


Step Four

Create a new mqtt-sink.properties file under config folder as below:

[root@avengers config]# pwd
/opt/kafka_2.12-2.3.0/config
[root@avengers config]# cat mqtt-sink.properties
name=mqtt-sink
connector.class=com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=mqtt-sink
connect.mqtt.hosts=tcp://avengers.eastus.cloudapp.azure.com:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.kcql=INSERT INTO /avengerskafka/test SELECT * FROM mqtt-sink
WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
connect.progress.enabled=true
[root@avengers config]#


Step Five

cd /opt/kafka_2.12-2.3.0


After running the command above, run all of the following on separate putty windows

./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
./bin/connect-standalone.sh config/connect.properties config/mqtt-sink.properties
bin/kafka-console-producer.sh --broker-list avengers.eastus.cloudapp.azure.com:9092 -topic mqtt-sink
Sent a test json message like
{"id":3,"temp":21.9,"timestamp":1530511201,"Note":"This message going from My Avengers server to the device LOCALLAPTOP007"}


Run the following first from within the server itself and then from your Windows laptop. 

mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t "/avengerskafka/test" -q 1
Verify if the message sent from server is received by the laptop


References

Related Articles

Topics:
kafka connect platform ,mqtt protocol ,internet of things ,mosquitto ,mqtt

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}