{{announcement.body}}
{{announcement.title}}

EdgeAI: Google Coral With Coral Environmental Sensors and TPU

DZone 's Guide to

EdgeAI: Google Coral With Coral Environmental Sensors and TPU

A hands-on guide to developing sensor-driven EdgeAI application with MiNiFi and NiFi and TensorFlow lite.

· IoT Zone ·
Free Resource

Building MiNiFi AI IoT Apps With the New Cloudera EFM 

Starting an IoT application that runs local deep learning classification, business rules, transformations, routing, compression, and sends this data efficiently to a central cloud location for processing and storage is hard. With the advent of modern open source streaming tools from Cloudera, it is now simple.

Edge Flow Manager lets me deploy drag-n-drop GUI code that a generate on EFM's web interface to thousands of remote devices running MiNiFi agents. In my example, I have a MiNiFi Java agent installed on a Raspberry Pi with Coral Sensors and a Google Coral TPU. I can deploy models through EFM as well as my logic.

Once an IoT Application is deployed, it will start running it on device logic and classification and send the results securely over HTTP(s) or over TCP/IP for Kafka/MQTT. Once received in my central NiFi clusters I can do things like convert data types, query live data and store it permanently in a fast data store like Apache Kudu/Impala, Apache HBase, or Apache Hive.

You might also like: Artificial Intelligence and IoT: Why They’re a Winning Combo

It is very easy to build a drag and drop EdgeAI application with EFM and then push to all your MiNiFi agents.







NiFi Flow receiving from MiNiFi Java agent


In a cluster in my CDP-DC Cluster, I consume Kafka messages sent from my remote NiFi gateway to publish alerts to Kafka and push records to Apache HBase and Apache Kudu. We filter our data with Streaming SQL.


We can use SQL to route, create aggregates like averages, choose a subset of fields, and limit data returned. Using the power of Apache Calcite, streaming SQL in NiFi is a game changer against Record Data Types including CSV, XML, Avro, Parquet, JSON and Grokable text. Read and write different formats and convert when your SQL is done. Or just to SELECT * FROM FLOWFILE to get everything.  


We can see this flow from Atlas as we trace the data lineage and provenance from Kafka topic.


We can search Atlas for Kafka topics.


From coral Kafka topic to NiFi to Kudu.


Details on Coral Kafka topic


Examining the Hive Metastore data on the Coral Kudu table


NiFi Flow Details in Atlas


Details on Alerts Topic


Statistics from Atlas



See: https://www.datainmotion.dev/2020/02/connecting-apache-nifi-to-apache-atlas.html

Example Web Camera Image


 Example JSON Record

[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"127.0.1.1","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]

Querying Kudu results in Hue


Pushing Alerts to Slack from NiFi


I am running on Apache NiFi 1.11.3 and wanted to point out a new feature. Download flow: Will download the highlighted flow/pgroup as JSON.


Looking at NiFi counters to monitor progress:

We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.

Shell (coralrun.sh)

Shell
 


x
 
1
#!/bin/bash
2
3
DATE=$(date +"%Y-%m-%d_%H%M%S")
4
5
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
6
7
python3 -W ignore /opt/demo/test.py --image /opt/demo/images/$DATE.jpg 2>/dev/null


Kudu Table DDL

Python 3 (test.py)

Python
 


xxxxxxxxxx
1
122
 
1
import time
2
import sys
3
import subprocess
4
import os
5
import base64
6
import uuid
7
import datetime
8
import traceback
9
import base64
10
import json
11
from time import gmtime, strftime
12
import math
13
import random, string
14
import time
15
import psutil
16
from getmac import get_mac_address
17
from coral.enviro.board import EnviroBoard
18
from luma.core.render import canvas
19
from PIL import Image, ImageDraw, ImageFont
20
import os
21
import argparse
22
from edgetpu.classification.engine import ClassificationEngine
23
24
# Importing socket library 
25
import socket 
26
27
start = time.time()
28
starttf = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
29
30
def ReadLabelFile(file_path):
31
    with open(file_path, 'r') as f:
32
        lines = f.readlines()
33
    ret = {}
34
    for line in lines:
35
        pair = line.strip().split(maxsplit=1)
36
        ret[int(pair[0])] = pair[1].strip()
37
    return ret
38
39
# Google Example Code
40
def update_display(display, msg):
41
    with canvas(display) as draw:
42
        draw.text((0, 0), msg, fill='white')
43
44
def getCPUtemperature():
45
    res = os.popen('vcgencmd measure_temp').readline()
46
    return(res.replace("temp=","").replace("'C\n",""))
47
48
49
# Get MAC address of a local interfaces
50
def psutil_iface(iface):
51
    # type: (str) -> Optional[str]
52
    import psutil
53
    nics = psutil.net_if_addrs()
54
    if iface in nics:
55
        nic = nics[iface]
56
        for i in nic:
57
            if i.family == psutil.AF_LINK:
58
                return i.address
59
60
# /opt/demo/examples-camera/all_models  
61
row = { }
62
63
try:
64
65
#i = 1
66
#while i == 1:
67
    parser = argparse.ArgumentParser()
68
    parser.add_argument('--image', help='File path of the image to be recognized.', required=True)
69
    args = parser.parse_args()
70
71
    # Prepare labels.
72
    labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')
73
74
    # Initialize engine.
75
    engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')
76
77
    # Run inference.
78
    img = Image.open(args.image)
79
80
    scores = {}
81
    kCount = 1
82
83
    # Iterate Inference Results
84
    for result in engine.ClassifyWithImage(img, top_k=5):
85
        scores['label_' + str(kCount)] = labels[result[0]]
86
        scores['score_' + str(kCount)] = "{:.2f}".format(result[1])
87
        kCount = kCount + 1    
88
89
    enviro = EnviroBoard()
90
    host_name = socket.gethostname()
91
    host_ip = socket.gethostbyname(host_name)
92
    cpuTemp=int(float(getCPUtemperature()))
93
    uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S",gmtime()),uuid.uuid4())
94
    usage = psutil.disk_usage("/")
95
    end = time.time()
96
    row.update(scores)
97
    row['host'] = os.uname()[1]
98
    row['ip'] = host_ip
99
    row['macaddress'] = psutil_iface('wlan0')
100
    row['cputemp'] = round(cpuTemp,2)
101
    row['te'] = "{0:.2f}".format((end-start))
102
    row['starttime'] = starttf
103
    row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
104
    row['cpu'] = psutil.cpu_percent(interval=1)
105
    row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
106
    row['memory'] = psutil.virtual_memory().percent
107
    row['id'] = str(uuid2)
108
    row['message'] = "Success"
109
    row['temperature'] = '{0:.2f}'.format(enviro.temperature)
110
    row['humidity'] = '{0:.2f}'.format(enviro.humidity)
111
    row['tempf'] = '{0:.2f}'.format((enviro.temperature * 1.8) + 32)    
112
    row['ambient_light'] = '{0}'.format(enviro.ambient_light)
113
    row['pressure'] = '{0:.2f}'.format(enviro.pressure)
114
    msg = 'Temp: {0}'.format(row['temperature'])
115
    msg += 'IP: {0}'.format(row['ip'])
116
    update_display(enviro.display, msg)
117
#    i = 2
118
119
except:
120
    row['message'] = "Error"
121
122
print(json.dumps(row))


Source Code (GitHub)

Sensors/Devices/Hardware:

  • Humdity-HDC2010 humidity sensor
  • Light-OPT3002 ambient light sensor
  • Barometric-BMP280 barometric pressure sensor
  • PS3 Eye Camera and Microphone USB
  • Raspberry Pi 3B+
  • Google Coral Environmental Sensor Board
  • Google Coral USB Accelerator TPU

References:

Further Reading

AI, IoT, and Mobile Apps for Construction

Why IoT Needs AI to Succeed

Topics:
edge2ai, edgeai, google coral, iot, kudu, minifi, nifi, sensors, tensorflow, tensorflowlite

Published at DZone with permission of Tim Spann , DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}