DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
View Events Video Library
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Integrating PostgreSQL Databases with ANF: Join this workshop to learn how to create a PostgreSQL server using Instaclustr’s managed service

Mobile Database Essentials: Assess data needs, storage requirements, and more when leveraging databases for cloud and edge applications.

Monitoring and Observability for LLMs: Datadog and Google Cloud discuss how to achieve optimal AI model performance.

Automated Testing: The latest on architecture, TDD, and the benefits of AI and low-code tools.

Related

  • What Is a Modern Developer?
  • How to Create a Real-Time Scalable Streaming App Using Apache NiFi, Apache Pulsar, and Apache Flink SQL
  • Building Real-Time Weather Dashboards With Apache Pinot
  • Best Practices for Data Pipeline Error Handling in Apache NiFi

Trending

  • DevSecOps: Integrating Security Into Your DevOps Workflow
  • Researcher Finds GitHub Admin Credentials of Car Company Thanks to Misconfiguration
  • Creating a Deep vs. Shallow Copy of an Object in Java
  • What Is GitOps?
  1. DZone
  2. Data Engineering
  3. Data
  4. Sensor Reading With Apache NiFi 1.0.0

Sensor Reading With Apache NiFi 1.0.0

See how to use MQTT to push data from IoT devices to Apache NiFi servers using Python applications. Also examples on using Python with Flask for REST API.

Timothy Spann user avatar by
Timothy Spann
·
Sep. 13, 16 · Tutorial
Like (4)
Save
Tweet
Share
9.49K Views

Join the DZone community and get the full member experience.

Join For Free

There are many types of sensors, devices, and meters that can be great sources of data. Some can push data, some can pull data, some provide APIs, and some give you access to install software.

How to Access Sensors

One option is to install MiNiFi on the device if you have root access. This will provide fast access to allow you to script and manage your local data. Another option for bigger devices is to install a full Java-based NiFi node.

It starts becoming harder once you have tens of thousands of devices. You can install an HDF Edge and communicate from this node to your HDF cluster via Site-to-Site protocol. From this Edge Node that acts as an accumulator for many devices (a good idea so that you don't send 10,000 network requests a second from each set of devices, keep as much traffic locally to save time, time-outs, networking, and cloud costs). You can also now aggregate and send larger batches of data and also process some summaries and aggregates locally in NiFi. This will also let you populate local databases, dashboards, and statistics that may only be of interest to the local source of the sensors (perhaps a plant manager or automated monitoring system).

Another option is to have devices push or pull to a local or remote NiFi install via various protocols including TCP/IP, UDP/IP, REST HTTP, JMS, MQTT, SFTP and Email.

Device Push to NiFi

Your device can send messages to NiFi via any number of protocols listed. For my example, I push via MQTT. My local NiFi node will consume these messages via ConsumeMQTT.

Reference: Paho-MQTT

Your device will need to run Linux (or something related), have Python 2.7 or better and PiP installed. With Pip, you can install the Eclipse library that you need to send MQTT messages. pip install paho-mqtt

import paho.mqtt.client as paho
client = paho.Client()
client.connect("servername", 1883, 60)
client.publish("sensor", payload="Test", qos=0, retain=True)


Where "servername" is the name of the server you are sending the message to (it could also be on the NiFi Node, another server, a bigger device, a central aggregator or messaging server). I would recommend having it as close in the network as possible. "sensor" is the name of the topic that we are publishing the message to, NiFi will consume this message. I have cron job setup to run every minute and publish messages (* * * * * /opt/demo/sendit.sh )

NiFi Poll Device

NiFi can poll your device and consume from various protocols like JMS, MQTT, SFTP, TCP and UDP. For my example, I chose a REST API over HTTP to get past hurdles of firewalls and such.

I set up a Flask Server on RPI, to run my REST API, I run this in a shell script.

export FLASK_APP=hello.py
flask run --host=0.0.0.0 --port=8888 --no-debugger

To install Flask, you need to run pip install flask

Sensor Reading Code

#!flask/bin/python
from flask import Flask, jsonify
import sys
import time
import datetime
import subprocess
import sys
import urllib2
import json
import paho.mqtt.client as paho
from sense_hat import SenseHat

sense = SenseHat()
sense.clear()

app = Flask(__name__)

@app.route('/pi/api/v1.0/sensors', methods=['GET'])
def get_sensors():
       p = subprocess.Popen(['/opt/vc/bin/vcgencmd','measure_temp'], stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
       out, err = p.communicate()
       temp = sense.get_temperature()
       temp = round(temp, 1)
       temph = sense.get_temperature_from_humidity()
        temph = round(temph, 1)
        tempp = sense.get_temperature_from_pressure()
        tempp = round(tempp, 1)
       humidity = sense.get_humidity()
       humidity = round(humidity, 1)
       pressure = sense.get_pressure()
       pressure = round(pressure, 1)
       tasks = [ { 'tempp': tempp, 'temph': temph, 'cputemp': out, 'temp': temp, 'tempf': ((temp * 1.8) + 12), 'humidity': humidity, 'pressure': pressure } ]

# As an option we can push this message when we get called as well
     client = paho.Client()
      client.connect("mqttmessageserver", 1883, 60)
     client.publish("sensor", payload=jsonify({'readings': tasks}), qos=0, retain=True)
       return jsonify({'readings': tasks})

@app.route('/pi/api/v1.0/location', methods=['GET'])
def get_loc():
       orientation = sense.get_orientation()
        pitch = orientation['pitch']
        roll = orientation['roll']
        yaw = orientation['yaw']
        acceleration = sense.get_accelerometer_raw()
       x = acceleration['x']
       y = acceleration['y']
       z = acceleration['z']
       x=round(x, 0)
       y=round(y, 0)
       z=round(z, 0)
       tasks = [ { 'pitch': pitch, 'roll': roll, 'yaw': yaw, 'x': x, 'y': y, 'z': z } ]
       return jsonify({'readings': tasks})

@app.route('/pi/api/v1.0/show', methods=['GET'])
def get_pi():
       temp = sense.get_temperature()
       temp = round(temp, 1)
       humidity = sense.get_humidity()
       humidity = round(humidity, 1)
       pressure = sense.get_pressure()
       pressure = round(pressure, 1)
       # 8x8 RGB
        sense.clear()
        info = 'T(C): ' + str(temp) + 'H: ' + str(humidity) + 'P: ' + str(pressure)
        sense.show_message(info, text_colour=[255, 0, 0])
       sense.clear()
       tasks = [ { 'temp': temp, 'tempf': ((temp * 1.8) + 12), 'humidity': humidity, 'pressure': pressure } ]
       return jsonify({'readings': tasks})

if __name__ == '__main__':
    app.run(debug=True)


The device I am testing is a Raspberry Pi 3 Model B with a Sense Hat sensor attachment. Besides having sensors for temperature, humidity, and barometric pressures it also has an 8x8 light grid for displaying text and simple graphics. We can use this to print messages (sense.show_message) or warnings that we send from NiFi. This allows for two-way very visceral communication to remote devices. This could be used to notify local personnel of conditions.

NiFi 1.0.0 Flows

JSON File Landed in HDFS in our HDP 2.5 Cluster

[root@myserverhdp sensors]# hdfs dfs -ls /sensor
Found 2 items
-rw-r--r--   3 root hdfs        202 2016-09-09 17:26 /sensor/181528179026826
drwxr-xr-x   - hdfs hdfs          0 2016-09-09 15:43 /sensor/failure
[root@tspanndev13 sensors]# hdfs dfs -cat /sensor/181528179026826
{
  "readings": [
    {
      "cputemp": "temp=55.8'C\n",
      "humidity": 40.8,
      "pressure": 1014.1,
      "temp": 40.0,
      "tempf": 84.0,
      "temph": 40.0,
      "tempp": 39.1
    }
  ]
}

The final results of our flow is a JSON file on HDFS. We could easily send a copy of the data to Phoenix via PutSQL or to via PutQL or to Spark Streaming for additional processing via Site-To-Site or Kafka.

Apache NiFi

Published at DZone with permission of Timothy Spann, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • What Is a Modern Developer?
  • How to Create a Real-Time Scalable Streaming App Using Apache NiFi, Apache Pulsar, and Apache Flink SQL
  • Building Real-Time Weather Dashboards With Apache Pinot
  • Best Practices for Data Pipeline Error Handling in Apache NiFi

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends: