Sensor Reading With Apache NiFi 1.0.0
See how to use MQTT to push data from IoT devices to Apache NiFi using Python applications, plus an example of using Python with Flask to expose a REST API for NiFi to poll.
There are many types of sensors, devices, and meters that can be great sources of data. Some can push data, some can pull data, some provide APIs, and some give you access to install software.
How to Access Sensors
One option is to install MiNiFi on the device, if you have root access. This gives you a fast, lightweight way to script and manage your local data. Another option, for bigger devices, is to install a full Java-based NiFi node.
Things become harder once you have tens of thousands of devices. One approach is to install an HDF edge node that communicates with your HDF cluster via the Site-to-Site protocol. This edge node acts as an accumulator for many devices, which is a good idea: rather than sending 10,000 network requests a second from each set of devices, you keep as much traffic as possible local, saving time, timeouts, network bandwidth, and cloud costs. You can also aggregate readings into larger batches and compute some summaries locally in NiFi. This lets you populate local databases, dashboards, and statistics that may only be of interest at the source of the sensors (perhaps a plant manager or an automated monitoring system).
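To make the batching idea concrete, here is a minimal Python sketch of what the edge node does conceptually. The function name is hypothetical; in practice, NiFi processors such as MergeContent handle this grouping for you.

import json

def batch_readings(readings, batch_size=100):
    """Group individual sensor readings into larger JSON batches so the
    edge node makes one upstream call per batch instead of one per reading."""
    for i in range(0, len(readings), batch_size):
        yield json.dumps({"readings": readings[i:i + batch_size]})

# Example: 250 readings become 3 upstream messages instead of 250
batches = list(batch_readings([{"temp": 20.0}] * 250))

Fewer, larger messages also compress better and make downstream back-pressure easier to manage.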
Another option is to have devices push to, or be pulled from, a local or remote NiFi install via various protocols, including TCP/IP, UDP/IP, REST over HTTP, JMS, MQTT, SFTP, and email.
Device Push to NiFi
Your device can send messages to NiFi via any of the protocols listed above. For my example, I push via MQTT. My local NiFi node consumes these messages via the ConsumeMQTT processor.
Reference: Paho-MQTT
Your device will need to run Linux (or something similar) and have Python 2.7 or better and pip installed. With pip, you can install the Eclipse Paho library that you need to send MQTT messages:
pip install paho-mqtt
import paho.mqtt.client as paho
# Connect to the MQTT broker (1883 is the default port; 60-second keepalive)
client = paho.Client()
client.connect("servername", 1883, 60)
# Publish one message to the "sensor" topic that NiFi will consume
client.publish("sensor", payload="Test", qos=0, retain=True)
client.disconnect()
Here, "servername" is the host you are sending the message to; it could be the NiFi node itself, another server, a bigger device, a central aggregator, or a messaging server. I recommend having it as close in the network as possible. "sensor" is the name of the topic we publish to, and NiFi consumes messages from that topic. I have a cron job set up to run every minute and publish messages (* * * * * /opt/demo/sendit.sh).
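The cron script itself is not shown in the article. As a sketch, here is what such a publisher could look like: the file name sendit.py and the hardcoded sample values are hypothetical stand-ins for real sensor reads.

#!/usr/bin/env python
# Hypothetical publisher a cron job could run every minute
import json

def build_payload(temp, humidity, pressure):
    """Package one reading as the JSON document NiFi will consume."""
    return json.dumps({"readings": [{"temp": temp,
                                     "humidity": humidity,
                                     "pressure": pressure}]})

if __name__ == "__main__":
    import paho.mqtt.client as paho  # pip install paho-mqtt
    client = paho.Client()
    client.connect("servername", 1883, 60)
    client.publish("sensor", payload=build_payload(21.5, 40.2, 1013.1),
                   qos=0, retain=True)
    client.disconnect()

Keeping the payload construction in its own function makes it easy to test without a broker.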
NiFi Poll Device
NiFi can poll your device and consume from various protocols like JMS, MQTT, SFTP, TCP, and UDP. For my example, I chose a REST API over HTTP to get past hurdles like firewalls.
I set up a Flask server on the RPi to serve my REST API; I run it from a shell script:
export FLASK_APP=hello.py
flask run --host=0.0.0.0 --port=8888 --no-debugger
To install Flask, run:
pip install flask
Sensor Reading Code
#!flask/bin/python
from flask import Flask, jsonify
import json
import subprocess
import paho.mqtt.client as paho
from sense_hat import SenseHat

sense = SenseHat()
sense.clear()
app = Flask(__name__)

@app.route('/pi/api/v1.0/sensors', methods=['GET'])
def get_sensors():
    # Read the CPU temperature from the Raspberry Pi firmware tool
    p = subprocess.Popen(['/opt/vc/bin/vcgencmd', 'measure_temp'],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    # Sense HAT readings, rounded to one decimal place
    temp = round(sense.get_temperature(), 1)
    temph = round(sense.get_temperature_from_humidity(), 1)
    tempp = round(sense.get_temperature_from_pressure(), 1)
    humidity = round(sense.get_humidity(), 1)
    pressure = round(sense.get_pressure(), 1)
    tasks = [{'tempp': tempp, 'temph': temph, 'cputemp': out, 'temp': temp,
              'tempf': ((temp * 1.8) + 12), 'humidity': humidity,
              'pressure': pressure}]
    # As an option, we can also push this reading over MQTT when we get called
    client = paho.Client()
    client.connect("mqttmessageserver", 1883, 60)
    client.publish("sensor", payload=json.dumps({'readings': tasks}),
                   qos=0, retain=True)
    return jsonify({'readings': tasks})

@app.route('/pi/api/v1.0/location', methods=['GET'])
def get_loc():
    # Orientation in degrees and raw accelerometer values
    orientation = sense.get_orientation()
    pitch = orientation['pitch']
    roll = orientation['roll']
    yaw = orientation['yaw']
    acceleration = sense.get_accelerometer_raw()
    x = round(acceleration['x'], 0)
    y = round(acceleration['y'], 0)
    z = round(acceleration['z'], 0)
    tasks = [{'pitch': pitch, 'roll': roll, 'yaw': yaw, 'x': x, 'y': y, 'z': z}]
    return jsonify({'readings': tasks})

@app.route('/pi/api/v1.0/show', methods=['GET'])
def get_pi():
    temp = round(sense.get_temperature(), 1)
    humidity = round(sense.get_humidity(), 1)
    pressure = round(sense.get_pressure(), 1)
    # Scroll the readings across the 8x8 RGB LED grid
    sense.clear()
    info = 'T(C): ' + str(temp) + ' H: ' + str(humidity) + ' P: ' + str(pressure)
    sense.show_message(info, text_colour=[255, 0, 0])
    sense.clear()
    tasks = [{'temp': temp, 'tempf': ((temp * 1.8) + 12),
              'humidity': humidity, 'pressure': pressure}]
    return jsonify({'readings': tasks})

if __name__ == '__main__':
    app.run(debug=True)
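On the consuming side, NiFi's GetHTTP or InvokeHTTP processors do the polling, but you can sanity-check the endpoint with a few lines of standard-library Python. The host name below is a hypothetical placeholder; the route matches the Flask code above.

import json

def parse_readings(body):
    """Extract the list of readings from the endpoint's JSON envelope."""
    return json.loads(body)["readings"]

if __name__ == "__main__":
    # Python 2.7 on the device, matching the article's environment
    import urllib2
    body = urllib2.urlopen("http://raspberrypi:8888/pi/api/v1.0/sensors").read()
    print(parse_readings(body))

If this prints a list of reading dictionaries, NiFi should have no trouble pulling from the same URL.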
The device I am testing is a Raspberry Pi 3 Model B with a Sense HAT sensor attachment. Besides having sensors for temperature, humidity, and barometric pressure, it also has an 8x8 light grid for displaying text and simple graphics. We can use this to print messages (sense.show_message) or warnings that we send from NiFi. This allows for very visceral two-way communication with remote devices, and could be used to notify local personnel of conditions.
NiFi 1.0.0 Flows
JSON File Landed in HDFS in our HDP 2.5 Cluster
[root@myserverhdp sensors]# hdfs dfs -ls /sensor
Found 2 items
-rw-r--r-- 3 root hdfs 202 2016-09-09 17:26 /sensor/181528179026826
drwxr-xr-x - hdfs hdfs 0 2016-09-09 15:43 /sensor/failure
[root@tspanndev13 sensors]# hdfs dfs -cat /sensor/181528179026826
{
"readings": [
{
"cputemp": "temp=55.8'C\n",
"humidity": 40.8,
"pressure": 1014.1,
"temp": 40.0,
"tempf": 84.0,
"temph": 40.0,
"tempp": 39.1
}
]
}
The final result of our flow is a JSON file on HDFS. We could easily send a copy of the data to Phoenix via PutSQL, or to Spark Streaming for additional processing via Site-to-Site or Kafka.
Published at DZone with permission of Timothy Spann, DZone MVB.