Real-Time TensorFlow Camera Analysis With Sensors
Real-Time TensorFlow Camera Analysis With Sensors
Check out a sample architecture and a MiniFi script designed to bring TensorFlow's analytical capabilities to a Raspberry Pi camera.
Join the DZone community and get the full member experience.
Join For FreeToday, we're going to use a Raspberry Pi camera, sensors, TensorFlow, and MiniFi to create a real-time camera analysis setup. Let's start with a look at the flow we'll be working with today. We'll touch on what it means below:
The MiniFi flow executes two scripts: one to call TensorFlow Python, which captures an OpenCV Raspberry Pi camera and runs Inception on it. That message is formatted as JSON and sent on. The second script reads GPS values from a USB GSP sensor and outputs JSON. GetFile reads the Pi camera image produced by the ClassifyImages process. CleanupLogs is a standalone timed script that cleans up old logs on the Raspberry Pi.
Using InferredAvroSchema, I created a schema for the GPS unit and stored it in the Hortonworks Schema Registry.
This is the provenance event for a typical GPS message sent. You can see what shell script we ran and from what host.
In Apache NiFi, we process the message, routing to the correct place, setting a schema, and querying it for a latitude. Then we convert the AVRO record to ORC to save as a Hive table.
MiniFi requires that we change the NiFi-created template to a configuration file via the command-line MiniFi Toolkit.
minifi-toolkit-0.2.0/bin/config.sh
transform gpstensorflowpiminifi2.xml config.yml
scp config.yml pi@192.168.1.167:/opt/demo/minifi/conf/
./gpsrun.sh
{"ipaddress": "192.168.1.167", "utc": "2017-08-21T20:00:06.000Z", "epx": "10.301", "epv": "50.6", "serialno": "000000002a1f1e34", "altitude": "38.393", "cputemp": 58.0, "eps": "37.16", "longitude": "-74.52923472", "ts": "2017-08-21 20:00:03", "public_ip": "71.168.184.247", "track": "236.6413", "host": "vid5", "mode": "3", "time": "2017-08-21T20:00:06.000Z", "latitude": "40.268194845", "climb": "-0.054", "speed": "0.513", "ept": "0.005"}
2017-08-21 16:20:33,199 INFO [Timer-Driven Process Thread-6] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes: PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
2017-08-21 16:20:34,261 INFO [Timer-Driven Process Thread-6] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi TensorFlowImage,targets=http://hw13125.local:8080/nifi]
Successfully sent [StandardFlowFileRecord[uuid=f84767ec-c627-4b63-9e88-bba1dfb4eb9b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1503346615133-2, container=default, section=2], offset=2198, length=441],offset=0,name=3460526041973,size=441]] (441 bytes) to http://HW13125.local:8080/nifi-api in 117 milliseconds at a rate of 3.65 KB/sec {"ipaddress": "192.168.1.167", "utc": "2017-08-21T20:17:21.010Z", "epx": "10.301", "epv": "50.6", "serialno": "000000002a1f1e34", "altitude": "43.009", "cputemp": 52.0, "eps": "1.33", "longitude": "-74.529242206", "ts": "2017-08-21 20:16:55", "public_ip": "71.168.184.247", "track": "190.894", "host": "vid5", "mode": "3", "time": "2017-08-21T20:17:21.010Z", "latitude": "40.268159632", "climb": "0.022", "speed": "0.353", "ept": "0.005"}
To collect our GPS information, below is my script called by MiniFi.
#! /usr/bin/python
import os
from gps import *
from time import *
import time
import threading
import json
import time
import colorsys
import os
import json
import sys, socket
import subprocess
import time
import datetime
from time import sleep
from time import gmtime, strftime
import signal
import time
import urllib2
# Need sudo apt-get install gpsd gpsd-clients python-gps ntp
# Based on
#Author: Callum Pritchard, Joachim Hummel
#Project Name: Flick 3D Gesture
#Project Description: Sending Flick 3D Gesture sensor data to mqtt
#Version Number: 0.1
#Date: 15/6/17
#Release State: Alpha testing
#Changes: Created
# Based on
# Written by Dan Mandle http://dan.mandle.me September 2012
# License: GPL 2.0
# Based on: https://hortonworks.com/tutorial/analyze-iot-weather-station-data-via-connected-data-architecture/section/3/
#### Initialization
# yyyy-mm-dd hh:mm:ss
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())
external_IP_and_port = ('198.41.0.4', 53)
# a.root-servers.net
socket_family = socket.AF_INET
host = os.uname()[1]
def getCPUtemperature():
res = os.popen('vcgencmd measure_temp').readline()
return(res.replace("temp=","").replace("'C\n",""))
def IP_address():
try:
s = socket.socket(socket_family, socket.SOCK_DGRAM)
s.connect(external_IP_and_port)
answer = s.getsockname()
s.close()
return answer[0]
if answer else None
except socket.error:
return None
# Get Raspberry Pi Serial Number
def get_serial():
# Extract serial from cpuinfo file
cpuserial = "0000000000000000"
try:
f = open('/proc/cpuinfo','r')
for line in f: if line[0:6]=='Serial':
cpuserial = line[10:26] f.close()
except: cpuserial = "ERROR000000000"
return cpuserial
# Get Raspberry Pi Public IP via IPIFY Rest Call
def get_public_ip():
ip = json.load(urllib2.urlopen('https://api.ipify.org/?format=json'))['ip']
return ip
cpuTemp=int(float(getCPUtemperature()))
ipaddress = IP_address()
# Attempt to get Public IP
public_ip = get_public_ip()
# Attempt to get Raspberry Pi Serial Number
serial = get_serial()
gpsd = None
class GpsPoller(threading.Thread): def __init__(self): threading.Thread.__init__(self) global gpsd #bring it in scope gpsd = gps(mode=WATCH_ENABLE) #starting the stream of info self.current_value = None self.running = True #setting the thread running to true def run(self): global gpsd while gpsp.running: gpsd.next() #this will continue to loop and grab EACH set of gpsd info to clear the buffer if __name__ == '__main__': gpsp = GpsPoller() # create the thread stopthis = False try: gpsp.start() # start it up while not stopthis: if gpsd.fix.latitude > 0: row = { 'latitude': str(gpsd.fix.latitude), 'longitude': str(gpsd.fix.longitude), 'utc': str(gpsd.utc), 'time': str(gpsd.fix.time), 'altitude': str(gpsd.fix.altitude), 'eps': str(gpsd.fix.eps), 'epx': str(gpsd.fix.epx), 'epv': str(gpsd.fix.epv), 'ept': str(gpsd.fix.ept), 'speed': str(gpsd.fix.speed), 'climb': str(gpsd.fix.climb), 'track': str(gpsd.fix.track), 'ts': currenttime, 'public_ip': public_ip, 'serialno': serial, 'host': host, 'cputemp': round(cpuTemp,2), 'ipaddress': ipaddress, 'mode': str(gpsd.fix.mode)} json_string = json.dumps(row) print json_string gpsp.running = False stopthis = True except (KeyboardInterrupt, SystemExit): #when you press ctrl+c gpsp.running = False gpsp.join() # wait for the thread to finish what it's doing
And a link to it on GitHub.
Opinions expressed by DZone contributors are their own.
{{ parent.title || parent.header.title}}
{{ parent.tldr }}
{{ parent.linkDescription }}
{{ parent.urlSource.name }}