
Ingesting Data From Multiple IoT Devices With Apache NiFi 1.7 and MiniFi 0.5


How to ingest data (GPS, Temperature, Humidity, DL Results, Images) from multiple devices and multiple sensors using easy MiniFi and NiFi flows.


Topic: IoT Edge Processing with Apache NiFi and MiniFi and Multiple Deep Learning Libraries

Part 1: Multiple Devices With Data

Keywords: Deep Learning On The Edge, GPS Ingestion, Sense-Hat and Rainbow Hat Sensor Ingest, WebCam Image Ingest.

In preparation for my talk at Strata in NYC, I am updating my IoT demos for more devices, more data types, and more actions.

I have three streams coming from each device including web camera images.

When sending data from a MiniFi agent, we need to define an input port on an Apache NiFi server or cluster to receive it.

I designed my MiniFi flow in the Apache NiFi UI (pretty soon there will be a special designer for this). Highlight everything in the flow and hit 'Create Template', then export the template and convert it to config.yml. This process will soon be automated and connected with the NiFi Registry to reduce the amount of clicking.

This is an example. When you design your flow in the Apache NiFi UI, you connect to this port on the Remote Process Group. If you are editing the file manually (okay, never do this, but sometimes I have to), you can copy the ID from Port Details and paste it into the file.

Once MiniFi has its config.yml and has been started, messages will start arriving at that port.

You can see I have two inputs, one for Movidius and one for Rainbow. I could just have one and route to what I want. It's up to you how you want to segment these flows.

Welcome to Apache NiFi Registry v0.2.0. This one works just as well: very stable, but with some new magic. You can now connect to Git and GitHub!

We have structured JSON, so let's infer a schema, clean it up, and store it in the Hortonworks Schema Registry. That makes it versioned and REST-enabled. I add one schema for each of the two JSON file types I am sending from the rainbow device. You can see the schemas in full at the bottom of the article.
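As a rough illustration of what schema inference does, here is a minimal Python sketch that builds an Avro record schema from one flat JSON sample. It is not the NiFi implementation; the helper names `infer_avro_type` and `infer_schema` are hypothetical, and mapping integers to `long` is an assumption of this sketch.

```python
import json


def infer_avro_type(value):
    """Map a parsed JSON value to a primitive Avro type name."""
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"  # assumption: integers become long in this sketch
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    raise TypeError(f"unsupported value for this sketch: {value!r}")


def infer_schema(name, sample_json):
    """Build an Avro record schema (as a dict) from one flat JSON sample."""
    record = json.loads(sample_json)
    fields = []
    for key, value in record.items():
        fields.append({
            "name": key,
            "type": infer_avro_type(value),
            # Keep the sample value in the doc string, as in the schemas
            # shown at the bottom of the article.
            "doc": f"Type inferred from '{json.dumps(value)}'",
        })
    return {"type": "record", "name": name, "fields": fields}


# A shortened sample using three fields from the rainbow feed.
sample = '{"tempf": 84.15, "host": "rainbow", "diskfree": "4831.2 MB"}'
schema = infer_schema("rainbow", sample)
print(json.dumps(schema, indent=1))
```

Note that a number sent as a quoted string (as in most of the gps fields) is inferred as `string`, which is why cleaning up the inferred schema by hand is still worthwhile.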

The data is received from MiniFi on my local NiFi edge server for simple event processing, filtering, and analysis.

I route based on the two types of files, apply their schemas, do a simple filter via SQL, and send the converted Avro-formatted file to my cloud-hosted cluster.
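The route-then-filter idea can be sketched outside NiFi in a few lines of Python. This is only a stand-in: in the real flow the work is done by NiFi processors, while here an in-memory SQLite table plays the role of the incoming records. The routing rule (presence of a `latitude` field), the table name `flowfile`, the sample `uniqueid` values, and the 50.0 threshold are all assumptions made for illustration.

```python
import sqlite3


def record_type(record):
    """Classify a record by a field unique to one feed (an assumed rule)."""
    return "gps" if "latitude" in record else "rainbow"


def filter_hot(records, min_cputemp):
    """Keep records whose cputemp exceeds a threshold, using a SQL query."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.execute(
        "CREATE TABLE flowfile (host TEXT, uniqueid TEXT, cputemp REAL)"
    )
    conn.executemany(
        "INSERT INTO flowfile VALUES (?, ?, ?)",
        [(r["host"], r["uniqueid"], r["cputemp"]) for r in records],
    )
    rows = conn.execute(
        "SELECT * FROM flowfile WHERE cputemp > ? ORDER BY uniqueid",
        (min_cputemp,),
    ).fetchall()
    conn.close()
    return [dict(row) for row in rows]


# Hypothetical sample records using field names from the two schemas.
records = [
    {"host": "rainbow", "uniqueid": "rainbow_uuid_1", "cputemp": 53.0},
    {"host": "rainbow", "uniqueid": "gps_uuid_1", "cputemp": 54.0,
     "latitude": "40.2681555"},
    {"host": "rainbow", "uniqueid": "rainbow_uuid_2", "cputemp": 48.5},
]

# Route each record to its feed, then filter one feed with SQL.
routed = {"gps": [], "rainbow": []}
for rec in records:
    routed[record_type(rec)].append(rec)

hot = filter_hot(routed["rainbow"], 50.0)
print(hot)
```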

Once I get the data, I send it from my edge server to my cloud HDF 3.2 cluster. For images, I send them to my existing image storage process group. For my other two types of files, I convert them to Apache ORC and store them in HDFS as Apache Hive tables.
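Since each ORC directory gets an external Hive table on top, the table definition can be derived mechanically from the Avro schema. Here is a minimal sketch of that mapping in Python; the `hive_ddl` helper and the `AVRO_TO_HIVE` table are hypothetical names, and only primitive Avro types are handled.

```python
# Assumed mapping from primitive Avro types to Hive column types.
AVRO_TO_HIVE = {
    "string": "STRING",
    "double": "DOUBLE",
    "float": "FLOAT",
    "int": "INT",
    "long": "BIGINT",
    "boolean": "BOOLEAN",
}


def hive_ddl(schema, location):
    """Render a CREATE EXTERNAL TABLE statement for a flat Avro schema."""
    columns = ", ".join(
        f"{field['name']} {AVRO_TO_HIVE[field['type']]}"
        for field in schema["fields"]
    )
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {schema['name']} "
        f"({columns}) STORED AS ORC LOCATION '{location}'"
    )


# A shortened version of the rainbow schema, for illustration only.
rainbow = {
    "type": "record",
    "name": "rainbow",
    "fields": [
        {"name": "tempf", "type": "double"},
        {"name": "host", "type": "string"},
    ],
}

ddl = hive_ddl(rainbow, "/rainbow")
print(ddl)
```

Run against the full schemas, this produces statements of the same shape as the ones in the SQL section at the end of the article.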

Server Dashboard

Rainbow Processing

Routing is Easy

On High Humidity, Send a Slack Message (Query on humidity value)

We can dive into any flowfile as it travels through the system and examine its data and metadata.

Now that my data is saved in HDFS with Hive tables on top, I can use the latest version of Apache Zeppelin to analyze it.

I added some maps to Zeppelin via Helium, which is now available in HDP 3.0.

I found a bunch of new chart types. This one could be insightful.

So with the latest NiFi 1.7.1 and HDP 3.0 I can do a lot of interesting things. Next up, let's run a Dockerized TensorFlow application in my HDP 3.0 cluster.

Strata Talk

Python Scripts

Schemas

rainbow

{
 "type": "record",
 "name": "rainbow",
 "fields": [
  {
   "name": "tempf",
   "type": "double",
   "doc": "Type inferred from '84.15'"
  },
  {
   "name": "cputemp",
   "type": "double",
   "doc": "Type inferred from '53.0'"
  },
  {
   "name": "pressure",
   "type": "double",
   "doc": "Type inferred from '101028.56'"
  },
  {
   "name": "host",
   "type": "string",
   "doc": "Type inferred from '\"rainbow\"'"
  },
  {
   "name": "uniqueid",
   "type": "string",
   "doc": "Type inferred from '\"rainbow_uuid_20180718234222\"'"
  },
  {
   "name": "ipaddress",
   "type": "string",
   "doc": "Type inferred from '\"192.168.1.165\"'"
  },
  {
   "name": "temp",
   "type": "double",
   "doc": "Type inferred from '38.58'"
  },
  {
   "name": "diskfree",
   "type": "string",
   "doc": "Type inferred from '\"4831.2 MB\"'"
  },
  {
   "name": "altitude",
   "type": "double",
   "doc": "Type inferred from '80.65'"
  },
  {
   "name": "ts",
   "type": "string",
   "doc": "Type inferred from '\"2018-07-18 23:42:22\"'"
  },
  {
   "name": "tempf2",
   "type": "double",
   "doc": "Type inferred from '28.97'"
  },
  {
   "name": "memory",
   "type": "double",
   "doc": "Type inferred from '32.3'"
  }
 ]
}

gps

{
 "type": "record",
 "name": "gps",
 "fields": [
  {
   "name": "speed",
   "type": "string",
   "doc": "Type inferred from '\"0.066\"'"
  },
  {
   "name": "diskfree",
   "type": "string",
   "doc": "Type inferred from '\"4830.3 MB\"'"
  },
  {
   "name": "altitude",
   "type": "string",
   "doc": "Type inferred from '\"43.0\"'"
  },
  {
   "name": "ts",
   "type": "string",
   "doc": "Type inferred from '\"2018-07-18 23:46:39\"'"
  },
  {
   "name": "cputemp",
   "type": "double",
   "doc": "Type inferred from '54.0'"
  },
  {
   "name": "latitude",
   "type": "string",
   "doc": "Type inferred from '\"40.2681555\"'"
  },
  {
   "name": "track",
   "type": "string",
   "doc": "Type inferred from '\"0.0\"'"
  },
  {
   "name": "memory",
   "type": "double",
   "doc": "Type inferred from '32.3'"
  },
  {
   "name": "host",
   "type": "string",
   "doc": "Type inferred from '\"rainbow\"'"
  },
  {
   "name": "uniqueid",
   "type": "string",
   "doc": "Type inferred from '\"gps_uuid_20180718234640\"'"
  },
  {
   "name": "ipaddress",
   "type": "string",
   "doc": "Type inferred from '\"192.168.1.165\"'"
  },
  {
   "name": "epd",
   "type": "string",
   "doc": "Type inferred from '\"nan\"'"
  },
  {
   "name": "utc",
   "type": "string",
   "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'"
  },
  {
   "name": "epx",
   "type": "string",
   "doc": "Type inferred from '\"40.135\"'"
  },
  {
   "name": "epy",
   "type": "string",
   "doc": "Type inferred from '\"42.783\"'"
  },
  {
   "name": "epv",
   "type": "string",
   "doc": "Type inferred from '\"171.35\"'"
  },
  {
   "name": "ept",
   "type": "string",
   "doc": "Type inferred from '\"0.005\"'"
  },
  {
   "name": "eps",
   "type": "string",
   "doc": "Type inferred from '\"85.57\"'"
  },
  {
   "name": "longitude",
   "type": "string",
   "doc": "Type inferred from '\"-74.529094\"'"
  },
  {
   "name": "mode",
   "type": "string",
   "doc": "Type inferred from '\"3\"'"
  },
  {
   "name": "time",
   "type": "string",
   "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'"
  },
  {
   "name": "climb",
   "type": "string",
   "doc": "Type inferred from '\"0.0\"'"
  },
  {
   "name": "epc",
   "type": "string",
   "doc": "Type inferred from '\"nan\"'"
  }
 ]
}
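Before a device script ships a record, it can be handy to check it against one of the schemas above. The following is a small sketch, assuming only the two primitive types those schemas use (`string` and `double`); the `validate` helper is a hypothetical name, not part of NiFi or the Schema Registry.

```python
# Python types accepted for each Avro type used in the schemas above.
AVRO_TO_PYTHON = {"string": str, "double": float}


def validate(record, schema):
    """Return a list of problems; an empty list means the record matches."""
    problems = []
    for field in schema["fields"]:
        name = field["name"]
        expected = AVRO_TO_PYTHON[field["type"]]
        if name not in record:
            problems.append(f"missing field: {name}")
        elif not isinstance(record[name], expected):
            # Note: a bare int such as 54 is rejected for a double field here.
            actual = type(record[name]).__name__
            problems.append(f"{name}: expected {field['type']}, got {actual}")
    return problems


# Two fields taken from the gps schema, for illustration only.
gps_subset = {
    "type": "record",
    "name": "gps",
    "fields": [
        {"name": "cputemp", "type": "double"},
        {"name": "latitude", "type": "string"},
    ],
}

good = validate({"cputemp": 54.0, "latitude": "40.2681555"}, gps_subset)
bad = validate({"cputemp": "54.0"}, gps_subset)
print(good)
print(bad)
```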

SQL

%sql


CREATE EXTERNAL TABLE IF NOT EXISTS movidiussense (label5 STRING, runtime STRING, label1 STRING, diskfree STRING, top1 STRING, starttime STRING, label2 STRING, label3 STRING, top3pct STRING, host STRING, top5pct STRING, humidity DOUBLE, currenttime STRING, roll DOUBLE, uuid STRING, label4 STRING, tempf DOUBLE, y DOUBLE, top4pct STRING, cputemp2 DOUBLE, top5 STRING, top2pct STRING, ipaddress STRING, cputemp INT, pitch DOUBLE, x DOUBLE, z DOUBLE, yaw DOUBLE, pressure DOUBLE, top3 STRING, temp DOUBLE, memory DOUBLE, top4 STRING, imagefilename STRING, top1pct STRING, top2 STRING) STORED AS ORC LOCATION '/movidiussense'


%sql


CREATE EXTERNAL TABLE IF NOT EXISTS minitensorflow2 (image STRING, ts STRING, host STRING, score STRING, human_string STRING, node_id INT) STORED AS ORC LOCATION '/minifitensorflow2'

%sql


CREATE EXTERNAL TABLE IF NOT EXISTS gps (speed STRING, diskfree STRING, altitude STRING, ts STRING, cputemp DOUBLE, latitude STRING, track STRING, memory DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, epd STRING, utc STRING, epx STRING, epy STRING, epv STRING, ept STRING, eps STRING, longitude STRING, mode STRING, time STRING, climb STRING, epc STRING) STORED AS ORC LOCATION '/gps'


%sql


CREATE EXTERNAL TABLE IF NOT EXISTS rainbow (tempf DOUBLE, cputemp DOUBLE, pressure DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, temp DOUBLE, diskfree STRING, altitude DOUBLE, ts STRING, tempf2 DOUBLE, memory DOUBLE) STORED AS ORC LOCATION '/rainbow'

Thanks for reading. Let me know if there are enhancements or additional functionality that may be of interest to you; I am looking for more features to add to my demo. I will most likely add another 5-10 devices into one port and see where it makes sense to do aggregates, counts, windowing, and more stream processing.

Stay in touch!


Topics:
nifi, minifi, big data, sensors, raspberry pi
