Playing With Docker, MQTT, Grafana, InfluxDB, Python, and Arduino
Take a look at how to gather and visualize IoT data using a time series database, MQTT for communication, Docker containers, and a handy Arduino.
I must admit that this post is just an excuse to play with Grafana and InfluxDB. InfluxDB is a cool database specifically designed to work with time series data. Grafana is an open source tool used for time series analytics. I want to build a simple prototype. The idea is:
- An ESP32 (an Arduino-compatible device) emits an MQTT event to a Mosquitto server. I’ll use a potentiometer to emulate a sensor; imagine a temperature sensor in its place. This is a circuit I’ve used in previous projects.
- A Python script on my Raspberry Pi will listen for the MQTT event and persist the value to the InfluxDB database.
- I will monitor the time series produced by the potentiometer with Grafana.
- I will create an alert in Grafana that fires when the average value over 10 seconds is above a threshold. Grafana will call a WebHook whenever the alert changes state.
- A Python Flask server, a microservice, will listen for the WebHook and emit an MQTT event that depends on the alert state.
- A NodeMcu, another Arduino-compatible device, will listen for this MQTT event and drive two LEDs: red if the alert is ON and green if the alert is OFF.
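To make the alert rule in the list above concrete, here is a minimal Python sketch of "average over the last 10 seconds above a threshold", reporting only state changes. It is not how Grafana implements alerting; the class name `WindowAlert` and the threshold of 50 are assumptions for illustration, and the state names match the ones the WebHook listener checks later in this post.

```python
from collections import deque


class WindowAlert:
    """Track the mean of the samples seen in the last `window` seconds."""

    def __init__(self, threshold, window=10.0):
        self.threshold = threshold
        self.window = window
        self.samples = deque()  # (timestamp, value) pairs, oldest first
        self.state = "ok"

    def add(self, timestamp, value):
        """Record a sample; return the new state if it changed, else None."""
        self.samples.append((timestamp, value))
        # Drop samples that have fallen out of the time window.
        while self.samples and self.samples[0][0] <= timestamp - self.window:
            self.samples.popleft()
        mean = sum(v for _, v in self.samples) / len(self.samples)
        new_state = "alerting" if mean > self.threshold else "ok"
        if new_state != self.state:
            self.state = new_state
            return new_state
        return None


alert = WindowAlert(threshold=50)  # hypothetical threshold
for ts, value in [(0, 10), (1, 100), (2, 100), (20, 0)]:
    changed = alert.add(ts, value)
    if changed is not None:
        print(ts, changed)
```

Only the transitions are reported, which is the moment a WebHook call would be useful.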
The Server
For this prototype, we’ll need three servers:
- MQTT server (Mosquitto)
- InfluxDB server
- Grafana server
We’ll use Docker. The Docker host will be running on a Raspberry Pi 3. The Raspberry Pi is an ARM device, so we will need Docker images built for this architecture.
version: '2'
services:
  mosquitto:
    image: pascaldevink/rpi-mosquitto
    container_name: mosquitto
    ports:
      - "9001:9001"
      - "1883:1883"
    restart: always
  influxdb:
    image: hypriot/rpi-influxdb
    container_name: influxdb
    restart: always
    environment:
      - INFLUXDB_INIT_PWD="password"
      - PRE_CREATE_DB="iot"
    ports:
      - "8083:8083"
      - "8086:8086"
    volumes:
      - ~/docker/rpi-influxdb/data:/data
  grafana:
    image: fg2it/grafana-armhf:v4.6.3
    container_name: grafana
    restart: always
    ports:
      - "3000:3000"
    volumes:
      - grafana-db:/var/lib/grafana
      - grafana-log:/var/log/grafana
      - grafana-conf:/etc/grafana
volumes:
  grafana-db:
    driver: local
  grafana-log:
    driver: local
  grafana-conf:
    driver: local
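Once the containers are up, a quick way to confirm that the three published ports accept connections is a small TCP check. This is a sketch under assumptions: the helper names `port_open` and `check_services` are hypothetical, and a successful connection only shows that something is listening on the port, not that the service is healthy.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Ports published by the compose file above.
SERVICES = {"mosquitto": 1883, "influxdb": 8086, "grafana": 3000}


def check_services(host):
    """Map each service name to whether its port accepts connections."""
    return {name: port_open(host, port) for name, port in SERVICES.items()}
```

Calling `check_services("docker")` would use the same host name that the Python scripts in this post use for the Docker host.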
ESP32
The ESP32 part is very simple. We only need to connect our potentiometer to the ESP32. The potentiometer has three pins: GND, Signal, and Vcc. For Signal, we’ll use pin 32.
In the sketch, we only need to configure our Wi-Fi network, connect to our MQTT server, and emit the potentiometer value on each pass through the loop.
#include <PubSubClient.h>
#include <WiFi.h>

const int potentiometerPin = 32;

// Wi-Fi configuration
const char * ssid = "my_wifi_ssid";
const char * password = "my_wifi_password";

// MQTT configuration
const char * server = "192.168.1.111";
const char * topic = "/pot";
const char * clientName = "com.gonzalo123.esp32";

String payload;
WiFiClient wifiClient;
PubSubClient client(wifiClient);

void wifiConnect() {
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("");
  Serial.print("WiFi connected.");
  Serial.print("IP address: ");
  Serial.println(WiFi.localIP());
}

void mqttReConnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    if (client.connect(clientName)) {
      Serial.println("connected");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
    }
  }
}

void mqttEmit(String topic, String value) {
  client.publish((char * ) topic.c_str(), (char * ) value.c_str());
}

void setup() {
  Serial.begin(115200);
  wifiConnect();
  client.setServer(server, 1883);
  delay(1500);
}

void loop() {
  if (!client.connected()) {
    mqttReConnect();
  }
  int current = (int)((analogRead(potentiometerPin) * 100) / 4095);
  mqttEmit(topic, (String) current);
  delay(500);
}
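The scaling in `loop()` maps the raw 12-bit reading (0 to 4095) onto 0 to 100 with integer division. As a worked example, here is the same arithmetic in Python; the function name `to_percent` and the range check are assumptions added for illustration.

```python
ADC_MAX = 4095  # full-scale reading of the 12-bit ADC


def to_percent(raw):
    """Scale a raw ADC reading to 0..100 with integer math, as the sketch does."""
    if not 0 <= raw <= ADC_MAX:
        raise ValueError("raw reading out of range")
    return (raw * 100) // ADC_MAX


for raw in (0, 2048, 4094, 4095):
    print(raw, to_percent(raw))
```

Because the division truncates, 100 is only reported at the very top of the range; a reading of 4094 still maps to 99.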
MQTT Listener
The ESP32 emits an event (“/pot”) with the value of the potentiometer. So, we’re going to create a listener that subscribes to this topic and persists each value to InfluxDB.
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import datetime
import logging


def persists(msg):
    current_time = datetime.datetime.utcnow().isoformat()
    json_body = [
        {
            "measurement": "pot",
            "tags": {},
            "time": current_time,
            "fields": {
                "value": int(msg.payload)
            }
        }
    ]
    logging.info(json_body)
    influx_client.write_points(json_body)


logging.basicConfig(level=logging.INFO)
influx_client = InfluxDBClient('docker', 8086, database='iot')

client = mqtt.Client()
client.on_connect = lambda self, mosq, obj, rc: self.subscribe("/pot")
client.on_message = lambda client, userdata, msg: persists(msg)
client.connect("docker", 1883, 60)
client.loop_forever()
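In the listener above, `int(msg.payload)` raises an exception if anything other than a number arrives on the topic. One possible hardening, sketched here as an assumption rather than as part of the original script, is to build the point in a separate function that returns `None` for payloads it cannot parse. The name `build_point` is hypothetical, and the timestamp is generated as a timezone-aware UTC value.

```python
import datetime


def build_point(payload, measurement="pot"):
    """Turn a raw MQTT payload (bytes) into a point dict, or None if invalid."""
    try:
        value = int(payload)
    except (TypeError, ValueError):
        return None
    return {
        "measurement": measurement,
        "tags": {},
        "time": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "fields": {"value": value},
    }


print(build_point(b"42"))
print(build_point(b"not-a-number"))
```

The `on_message` callback could then skip the write whenever `build_point` returns `None`.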
Grafana
In Grafana, we need to do two things. First, we will create one data source from our InfluxDB server. From here, it's pretty straightforward.
Next, we’ll create a dashboard. We only have one time series, with the value of the potentiometer. I must admit that my dashboard has a lot of things that I’ve created just for fun.
This is the query that I’m using to plot the main graph:
SELECT last("value")
FROM "pot"
WHERE time >= now() - 5m
GROUP BY time($interval) fill(previous)
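To see what the query asks for, here is a small Python sketch that mimics it on an in-memory list: split the time range into fixed intervals, keep the last value in each one, and carry the previous value into empty intervals. It is an illustration only. The function name `last_per_bucket` is hypothetical, and the buckets are aligned to `start` for simplicity, which is an assumption and not necessarily how InfluxDB chooses its bucket boundaries.

```python
def last_per_bucket(points, start, end, interval):
    """Return [(bucket_start, value)] for [start, end).

    points: iterable of (timestamp, value) with timestamps in seconds.
    Each bucket holds its last value; empty buckets repeat the previous one.
    """
    buckets = {}
    for ts, value in sorted(points):
        if start <= ts < end:
            # Later points overwrite earlier ones, so the last value wins.
            buckets[start + ((ts - start) // interval) * interval] = value
    result, previous = [], None
    t = start
    while t < end:
        previous = buckets.get(t, previous)  # fill(previous) for empty buckets
        result.append((t, previous))
        t += interval
    return result


print(last_per_bucket([(0, 10), (3, 20), (12, 30)], start=0, end=20, interval=5))
```

Buckets that come before the first data point have no previous value to repeat, so they stay empty (`None` in this sketch).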
Here, we can see the dashboard.
Here, we can see my alert configuration:
I’ve also created a notification channel with a WebHook. Grafana will use this WebHook for notifications when the state of the alert changes.
WebHook Listener
Grafana will emit a WebHook, so we’ll need a REST endpoint to collect the WebHook calls. I normally use PHP/Lumen to create REST servers, but, in this project, I’ll use Python and Flask.
We need to handle HTTP Basic Auth and emit an MQTT event. MQTT is a very simple protocol, but it has one very nice feature that fits like a glove here. Let me explain.
Imagine that we’ve got our system up and running and the state is “ok”. Now, we connect a new device (for example, a big red/green light). Since the “ok” event was fired before we connected the light, our green light will not switch on. We would have to wait for the next “alert” event before seeing any light. That’s not cool.
MQTT allows us to “retain” messages. That means that we can publish a message to a topic with the “retain” flag, and the broker will deliver it to any device that subscribes to that topic later. That is exactly what we need here.
from flask import Flask
from flask import request
from flask_httpauth import HTTPBasicAuth
import paho.mqtt.client as mqtt
import json

client = mqtt.Client()
app = Flask(__name__)
auth = HTTPBasicAuth()

# http basic auth credentials
users = {
    "user": "password"
}


@auth.get_password
def get_pw(username):
    if username in users:
        return users.get(username)
    return None


@app.route('/alert', methods=['POST'])
@auth.login_required
def alert():
    client.connect("docker", 1883, 60)
    data = json.loads(request.data.decode('utf-8'))
    if data['state'] == 'alerting':
        client.publish(topic="/alert", payload="1", retain=True)
    elif data['state'] == 'ok':
        client.publish(topic="/alert", payload="0", retain=True)
    client.disconnect()
    return "ok"


if __name__ == "__main__":
    app.run(host='0.0.0.0')
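The effect of the “retain” flag used in the listener above can be illustrated without a real broker. The following toy class is an in-memory stand-in written for this explanation only; `TinyBroker` is a hypothetical name and it models nothing of MQTT beyond the retain behaviour described earlier.

```python
class TinyBroker:
    """In-memory stand-in for an MQTT broker, only to illustrate `retain`."""

    def __init__(self):
        self.retained = {}     # topic -> last retained payload
        self.subscribers = {}  # topic -> list of callbacks

    def publish(self, topic, payload, retain=False):
        if retain:
            self.retained[topic] = payload
        for callback in self.subscribers.get(topic, []):
            callback(topic, payload)

    def subscribe(self, topic, callback):
        self.subscribers.setdefault(topic, []).append(callback)
        # A new subscriber immediately receives the retained message, if any.
        if topic in self.retained:
            callback(topic, self.retained[topic])


broker = TinyBroker()
broker.publish("/alert", "0", retain=True)  # "ok" is sent before the light exists
broker.subscribe("/alert", lambda topic, payload: print("light got", payload))
```

The light subscribes after the message was published and still receives it, which is the behaviour the green LED relies on.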
NodeMcu
Finally, the NodeMcu. This part is similar to the ESP32 one. Our LEDs are on pins 4 and 5. We also need to configure the Wi-Fi and connect to the MQTT server. NodeMcu and ESP32 are similar devices, but not the same. For example, we need to use different libraries to connect to the Wi-Fi.
This device will be listening to the MQTT event and trigger one LED or another, depending on its state.
#include <PubSubClient.h>
#include <ESP8266WiFi.h>

const int ledRed = 4;
const int ledGreen = 5;

// Wi-Fi configuration
const char * ssid = "my_wifi_ssid";
const char * password = "my_wifi_password";

// MQTT configuration
const char * server = "192.168.1.111";
const char * topic = "/alert";
const char * clientName = "com.gonzalo123.nodemcu";

int value;
int percent;
String payload;
WiFiClient wifiClient;
PubSubClient client(wifiClient);

void wifiConnect() {
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("");
  Serial.print("WiFi connected.");
  Serial.print("IP address: ");
  Serial.println(WiFi.localIP());
}

void mqttReConnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    if (client.connect(clientName)) {
      Serial.println("connected");
      client.subscribe(topic);
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
    }
  }
}

void callback(char * topic, byte * payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  String data;
  for (int i = 0; i < length; i++) {
    data += (char) payload[i];
  }
  cleanLeds();
  int value = data.toInt();
  switch (value) {
    case 1:
      digitalWrite(ledRed, HIGH);
      break;
    case 0:
      digitalWrite(ledGreen, HIGH);
      break;
  }
  Serial.print("] value:");
  Serial.println((int) value);
}

void cleanLeds() {
  digitalWrite(ledRed, LOW);
  digitalWrite(ledGreen, LOW);
}

void setup() {
  Serial.begin(9600);
  pinMode(ledRed, OUTPUT);
  pinMode(ledGreen, OUTPUT);
  cleanLeds();
  Serial.println("start");
  wifiConnect();
  client.setServer(server, 1883);
  client.setCallback(callback);
  delay(1500);
}

void loop() {
  Serial.print(".");
  if (!client.connected()) {
    mqttReConnect();
  }
  client.loop();
  delay(500);
}
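One detail of the callback is worth noting: Arduino's `String.toInt()` returns 0 when the text is not a number, so an unexpected payload would switch the green LED on. The Python sketch below models a stricter decision in which only the exact payloads "1" and "0" light an LED. It is a model of the logic, not code for the NodeMcu, and the function name `led_states` is hypothetical.

```python
def led_states(payload):
    """Return (red_on, green_on) for an alert payload; both off if unrecognised."""
    text = payload.decode("ascii", errors="replace").strip()
    if text == "1":
        return (True, False)   # alert is ON: red
    if text == "0":
        return (False, True)   # alert is OFF: green
    return (False, False)      # anything else: leave both LEDs off


for payload in (b"1", b"0", b"garbage"):
    print(payload, led_states(payload))
```

The same idea could be carried over to the sketch by comparing the received text against "1" and "0" before touching the pins.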
Here, you can see the working prototype in action.
And, here is the source code.
Published at DZone with permission of Gonzalo Ayuso, DZone MVB. See the original article here.