There and Back Again: MQTT-Based Data Transfer With Kafka Cloud
Not quite as long as the road to Mordor, but certainly not something you want to brave alone.
This article lists the detailed steps for sending data from a device to a cloud server running a scalable Kafka infrastructure, and back from Kafka to the device, using the Mosquitto broker and Kafka Connect.
Device to Cloud Connectivity
Step One
Download and install Mosquitto for Windows.
Step Two
Open D:\Program Files\mosquitto\mosquitto.conf and set the listener as follows:
# =================================================================
# Extra listeners
# =================================================================
# Listen on a port/ip address combination. By using this variable
# multiple times, mosquitto can listen on more than one port. If
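# this variable is used and neither bind_address nor port given,
# then the default listener will not be started.
# The port number to listen on must be given. Optionally, an ip
# address or host name may be supplied as a second argument. In
# this case, mosquitto will attempt to bind the listener to that
# address and so restrict access to the associated network and
# interface. By default, mosquitto will listen on all interfaces.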
# Note that for a websockets listener it is not possible to bind to a host
# name.
# listener port-number [ip address/host name]
listener 1883 0.0.0.0
Step Three
Run the following commands, each in its own command prompt window. Start the subscriber first so that it is listening when the message is published.
D:\Program Files\mosquitto>mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic"
D:\Program Files\mosquitto>mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic" -m "This request is coming from LOCALLAPTOP007 to Avengers"
Step Four
Install Kafka on the server (avengers@12.XX.XXX.XXX) and test the producer-consumer message exchange using the following commands.
bin/kafka-topics.sh --list --zookeeper 12.XX.XXX.XXX:2181
bin/kafka-console-consumer.sh --bootstrap-server 12.XX.XXX.XXX:9092 --topic HelloKafkaTopic
bin/kafka-console-producer.sh --broker-list 12.XX.XXX.XXX:9092 --topic HelloKafkaTopic
Step Five
Modify /etc/hosts and add the entries shown below:
10.XXX.XX.XX avengers.eastus.cloudapp.azure.com
#Above is the private ip of the cloud machine
12.XX.XXX.XXX avengers.eastus.cloudapp.azure.com
#Above is the public ip of the cloud machine
Step Six
Install Mosquitto on the cloud server (just for testing), then start it and enable it at boot:
yum install mosquitto
service mosquitto start
systemctl enable mosquitto
Step Seven
Check whether ports 1883 and 9092 are open on the cloud server for inbound traffic using the following commands. The -tln flags list listening TCP sockets with numeric port numbers.
netstat -tln | grep 1883
netstat -tln | grep 9092
If the ports are not open, open them for inbound traffic in Azure. You can also run the following commands on the server to open them in the local firewall:
firewall-cmd --zone=public --add-port=1883/tcp --permanent
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
iptables-save | grep 1883
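Reachability can also be checked from the laptop side, where the server's netstat output is not available. The following is a minimal sketch, assuming Python 3 is installed on the client machine; the helper name is_port_open is hypothetical and not part of Mosquitto or Kafka.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

Calling is_port_open("avengers.eastus.cloudapp.azure.com", 1883) and then the same for port 9092 from the laptop should return True once the Azure inbound rules and the server firewall allow the traffic. A False result does not distinguish between a closed port and a filtered one.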
Step Eight
Open /etc/mosquitto/mosquitto.conf and set the listener as follows:
# =================================================================
# Extra listeners
# =================================================================
# Listen on a port/ip address combination. By using this variable
# multiple times, mosquitto can listen on more than one port. If
# this variable is used and neither bind_address nor port given,
# then the default listener will not be started.
# The port number to listen on must be given. Optionally, an ip
# address or host name may be supplied as a second argument. In
# this case, mosquitto will attempt to bind the listener to that
# address and so restrict access to the associated network and
# interface. By default, mosquitto will listen on all interfaces.
# Note that for a websockets listener it is not possible to bind to a host
# name.
# listener port-number [ip address/host name]
listener 1883 0.0.0.0
Step Nine
Test publishing and subscribing through the Mosquitto broker using the following commands:
mosquitto_sub -h 127.0.0.1 -t dummy
mosquitto_pub -h 127.0.0.1 -t dummy -m "Hello world"
Step Ten
Download and unzip the connector archive, then copy the following files to /opt/kafka_2.12-2.3.0/libs:
kafka-connect-mqtt-1.0-SNAPSHOT.jar
org.eclipse.paho.client.mqttv3-1.0.2.jar
Step Eleven
Navigate to /opt/kafka_2.12-2.3.0/config, open server.properties in vi, and make the following change:
advertised.listeners=PLAINTEXT://12.XX.XXX.XXX:9092
In the same directory, open connect-standalone.properties in vi and make the following change:
bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092
Then, in /opt/kafka_2.12-2.3.0/config, open mqtt.properties in vi and ensure that the file has the following entries:
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
kafka.topic=mqtt-mosquitto-topic-kafka
mqtt.client_id=mqtt-kafka-123456789
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://avengers.eastus.cloudapp.azure.com:1883
mqtt.topic=mqtt-mosquitto-topic
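The topic names have to line up across the chain: the topic passed to mosquitto_pub must equal mqtt.topic, and the Kafka console consumer must read the topic named in kafka.topic. The check can be sketched as follows, assuming Python 3; parse_properties is a hypothetical helper that handles only simple key=value lines, not the full Java properties syntax.

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines that contain no '='
            props[key.strip()] = value.strip()
    return props


# Contents of mqtt.properties as listed in this article.
MQTT_PROPERTIES = """\
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
kafka.topic=mqtt-mosquitto-topic-kafka
mqtt.client_id=mqtt-kafka-123456789
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://avengers.eastus.cloudapp.azure.com:1883
mqtt.topic=mqtt-mosquitto-topic
"""

props = parse_properties(MQTT_PROPERTIES)
# Topic passed to mosquitto_pub with -t in this article.
published_topic = "mqtt-mosquitto-topic"
# Topic read by kafka-console-consumer.sh in this article.
consumed_topic = "mqtt-mosquitto-topic-kafka"

print("MQTT topic matches: ", props["mqtt.topic"] == published_topic)
print("Kafka topic matches:", props["kafka.topic"] == consumed_topic)
```

If either comparison is False, the connector will start without errors but no messages will show up in the consumer, which makes a mismatch easy to overlook.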
Step Twelve
Change to the Kafka installation directory:
cd /opt/kafka_2.12-2.3.0
Then run each of the following commands in a separate PuTTY window.
./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
./bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
bin/kafka-console-consumer.sh --bootstrap-server avengers.eastus.cloudapp.azure.com:9092 --topic mqtt-mosquitto-topic-kafka
Run the following first from within the server itself, and then from your Windows local laptop.
mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic" -m "This request is coming from LOCALLAPTOP007 to Avengers"
Ensure that the message sent from the Mosquitto publisher on the local laptop arrives in base64-encoded format at the Kafka consumer on the server.
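To confirm that the base64 text shown by the consumer corresponds to the published message, the payload can be decoded. The following is a minimal sketch, assuming Python 3. The encoded string is generated inside the script from the sample message and is not copied from a real consumer log; decode_payload is a hypothetical helper name.

```python
import base64

# Message published with mosquitto_pub in this article.
message = "This request is coming from LOCALLAPTOP007 to Avengers"

# Stand-in for the base64 text printed by the Kafka console consumer;
# it is generated here, not captured from a real run.
encoded = base64.b64encode(message.encode("utf-8")).decode("ascii")


def decode_payload(b64_text: str) -> str:
    """Decode a base64 string into UTF-8 text."""
    return base64.b64decode(b64_text).decode("utf-8")


print(encoded)
print(decode_payload(encoded))
```

In practice, paste the base64 string from the consumer window into decode_payload and compare the result with the text passed to -m.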
Cloud to Device Connectivity
Step One
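Download the Stream Reactor MQTT connector for Kafka Connect and copy the jar to the plugins folder. The commands below use /home/plugins; whichever folder you use must match the plugin.path setting in connect.properties.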
wget https://github.com/Landoop/stream-reactor/releases/download/1.2.2/kafka-connect-mqtt-1.2.2-2.1.0-all.tar.gz
tar -xf kafka-connect-mqtt-1.2.2-2.1.0-all.tar.gz
cp kafka-connect-mqtt-1.2.2-2.1.0-all.jar /home/plugins
Step Two
Create a new connect.properties file under the config folder as shown below:
[root@avengers config]# pwd
/opt/kafka_2.12-2.3.0/config
[root@avengers config]# cat connect.properties
# Kafka broker IP addresses to connect to
bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092
# Path to directory containing the connector jar
plugin.path=/root/plugins
# Converters to use to convert keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
[root@avengers config]#
Step Three
Create a new Kafka topic called "mqtt-sink" and confirm that it exists using the commands below:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mqtt-sink
bin/kafka-topics.sh --list --zookeeper localhost:2181
Step Four
Create a new mqtt-sink.properties file under the config folder as shown below:
[root@avengers config]# pwd
/opt/kafka_2.12-2.3.0/config
[root@avengers config]# cat mqtt-sink.properties
name=mqtt-sink
connector.class=com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=mqtt-sink
connect.mqtt.hosts=tcp://avengers.eastus.cloudapp.azure.com:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.kcql=INSERT INTO /avengerskafka/test SELECT * FROM mqtt-sink WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
connect.progress.enabled=true
[root@avengers config]#
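The connect.mqtt.kcql entry is what routes data: records read from the Kafka topic after FROM are written to the MQTT topic after INSERT INTO. To make that mapping explicit, here is a small sketch assuming Python 3. The regular expression is a simplification written for this article's statement only, not the real KCQL grammar, and parse_route is a hypothetical helper.

```python
import re

# KCQL statement from mqtt-sink.properties, written as a single line.
KCQL = (
    "INSERT INTO /avengerskafka/test SELECT * FROM mqtt-sink "
    "WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters"
    ".source.JsonSimpleConverter"
)

# Simplified pattern for illustration; real KCQL supports more clauses.
ROUTE_PATTERN = re.compile(
    r"INSERT\s+INTO\s+(?P<target>\S+)\s+SELECT\s+.+?\s+FROM\s+(?P<source>\S+)",
    re.IGNORECASE,
)


def parse_route(kcql: str) -> tuple:
    """Return (kafka_source_topic, mqtt_target_topic) from a KCQL statement."""
    match = ROUTE_PATTERN.search(kcql)
    if match is None:
        raise ValueError("statement does not look like INSERT INTO ... SELECT ... FROM ...")
    return match.group("source"), match.group("target")


source, target = parse_route(KCQL)
print(f"Kafka topic {source} -> MQTT topic {target}")
```

The source should equal the topics entry in the same file, and the target should equal the topic that mosquitto_sub subscribes to on the device.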
Step Five
Change to the Kafka installation directory:
cd /opt/kafka_2.12-2.3.0
Then run each of the following commands in a separate PuTTY window.
./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
./bin/connect-standalone.sh config/connect.properties config/mqtt-sink.properties
bin/kafka-console-producer.sh --broker-list avengers.eastus.cloudapp.azure.com:9092 --topic mqtt-sink
Send a test JSON message such as:
{"id":3,"temp":21.9,"timestamp":1530511201,"Note":"This message going from My Avengers server to the device LOCALLAPTOP007"}
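The console producer sends each line of input as a separate Kafka message, so the JSON document has to be entered on a single line. One way to produce it is sketched below, assuming Python 3; the field values are the ones from the sample message above.

```python
import json

reading = {
    "id": 3,
    "temp": 21.9,
    "timestamp": 1530511201,
    "Note": "This message going from My Avengers server to the device LOCALLAPTOP007",
}

# separators=(",", ":") drops optional whitespace; omitting indent keeps
# the whole document on one line.
line = json.dumps(reading, separators=(",", ":"))
print(line)
```

The printed line can be pasted directly into the console producer window. Building the message with json.dumps also guards against hand-typed syntax errors such as a missing quote, which the JSON converter would otherwise reject.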
Run the following first from within the server itself and then from your Windows laptop.
mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t "/avengerskafka/test" -q 1
Verify that the message sent from the server is received by the laptop.