Real-Time TensorFlow Camera Analysis With Sensors

Check out a sample architecture and a MiniFi script designed to bring TensorFlow's analytical capabilities to a Raspberry Pi camera.

Today, we're going to use a Raspberry Pi camera, sensors, TensorFlow, and MiniFi to create a real-time camera analysis setup. Let's start with a look at the flow we'll be working with, then walk through what each part does:

The MiniFi flow executes two scripts. The first calls a TensorFlow Python script, which captures an image from the Raspberry Pi camera via OpenCV, runs Inception on it, and formats the result as a JSON message that is sent on. The second reads GPS values from a USB GPS sensor and outputs JSON. GetFile reads the Pi camera image produced by the ClassifyImages process. CleanupLogs is a standalone timed script that cleans up old logs on the Raspberry Pi.
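The CleanupLogs script itself is not shown in this article. As a rough sketch of what such a timed cleanup could look like, the function below deletes log files older than a given age. The function name, the `.log` suffix, and the seven-day default are all assumptions made for illustration, not details taken from the actual script.

```python
import os
import time


def cleanup_logs(log_dir, max_age_days=7, suffix=".log", now=None):
    """Delete files in log_dir that end with suffix and are older than max_age_days.

    Hypothetical helper: the directory, suffix, and age threshold are
    illustrative assumptions, not values from the article's script.
    Returns the sorted list of file names that were removed.
    """
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400  # seconds per day
    removed = []
    for name in sorted(os.listdir(log_dir)):
        path = os.path.join(log_dir, name)
        is_old_log = (
            os.path.isfile(path)
            and name.endswith(suffix)
            and os.path.getmtime(path) < cutoff
        )
        if is_old_log:
            os.remove(path)
            removed.append(name)
    return removed
```

A script like this could be run on a timer by MiniFi or by cron. Returning the removed names makes it easy to log what was deleted.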

Using InferAvroSchema, I created a schema for the GPS unit and stored it in the Hortonworks Schema Registry.
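To give a feel for what schema inference produces, here is a minimal sketch that derives an Avro-style record schema from an abbreviated GPS message. It is not the processor's actual algorithm. The type mapping is simplified, and the record name `gps` and namespace `example.gps` are placeholders chosen for illustration.

```python
import json

# Abbreviated version of the GPS message shown in this article.
SAMPLE = (
    '{"utc": "2017-08-21T20:00:06.000Z", "cputemp": 58.0, '
    '"latitude": "40.268194845", "longitude": "-74.52923472", "host": "vid5"}'
)


def infer_avro_type(value):
    """Map a JSON value to a primitive Avro type name (simplified assumption)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"


def infer_schema(record, name="gps", namespace="example.gps"):
    """Build an Avro-style record schema dict from one sample record."""
    fields = [
        {"name": key, "type": infer_avro_type(value)}
        for key, value in sorted(record.items())
    ]
    return {"type": "record", "name": name, "namespace": namespace, "fields": fields}


schema = infer_schema(json.loads(SAMPLE))
print(json.dumps(schema, indent=2))
```

Note that the GPS script emits most values as strings, so an inferred schema will type `latitude` and `longitude` as strings unless the script or the schema is adjusted.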

This is the provenance event for a typical GPS message sent. You can see what shell script we ran and from what host.

In Apache NiFi, we process the message: routing it to the correct place, setting a schema, and querying it for a latitude. Then we convert the Avro record to ORC so it can be saved as a Hive table.
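The exact query used in the flow is not listed here. As an illustration of the idea only, the sketch below parses a GPS message and routes it depending on whether it carries a usable latitude. The route labels `valid`, `invalid`, and `failure` are hypothetical, as is the assumption that the query is checking for a plausible latitude value.

```python
import json


def route_gps(message):
    """Return (route, record) for a raw JSON GPS message.

    Hypothetical routing rule: a record is 'valid' when its latitude
    parses as a number between -90 and 90, 'invalid' when it parses but
    falls outside that range, and 'failure' when it cannot be read.
    """
    try:
        record = json.loads(message)
        latitude = float(record["latitude"])
    except (KeyError, TypeError, ValueError):
        # Covers malformed JSON, a missing field, and non-numeric values.
        return "failure", None
    if -90.0 <= latitude <= 90.0:
        return "valid", record
    return "invalid", record
```

For example, `route_gps('{"latitude": "40.268194845", "host": "vid5"}')` returns the `valid` route along with the parsed record, while a message without a latitude field is routed to `failure`.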

MiniFi requires that we change the NiFi-created template to a configuration file via the command-line MiniFi Toolkit.

transform gpstensorflowpiminifi2.xml config.yml
scp config.yml pi@

{"ipaddress": "", "utc": "2017-08-21T20:00:06.000Z", "epx": "10.301", "epv": "50.6", "serialno": "000000002a1f1e34", "altitude": "38.393", "cputemp": 58.0, "eps": "37.16", "longitude": "-74.52923472", "ts": "2017-08-21 20:00:03", "public_ip": "", "track": "236.6413", "host": "vid5", "mode": "3", "time": "2017-08-21T20:00:06.000Z", "latitude": "40.268194845", "climb": "-0.054", "speed": "0.513", "ept": "0.005"}

2017-08-21 16:20:33,199 INFO [Timer-Driven Process Thread-6] o.apache.nifi.remote.client.PeerSelector New Weighted Distribution of Nodes: PeerStatus[hostname=HW13125.local,port=8080,secure=false,flowFileCount=0] will receive 100.0% of data
2017-08-21 16:20:34,261 INFO [Timer-Driven Process Thread-6] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=MiniFi TensorFlowImage,targets=http://hw13125.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=f84767ec-c627-4b63-9e88-bba1dfb4eb9b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1503346615133-2, container=default, section=2], offset=2198, length=441],offset=0,name=3460526041973,size=441]] (441 bytes) to http://HW13125.local:8080/nifi-api in 117 milliseconds at a rate of 3.65 KB/sec

{"ipaddress": "", "utc": "2017-08-21T20:17:21.010Z", "epx": "10.301", "epv": "50.6", "serialno": "000000002a1f1e34", "altitude": "43.009", "cputemp": 52.0, "eps": "1.33", "longitude": "-74.529242206", "ts": "2017-08-21 20:16:55", "public_ip": "", "track": "190.894", "host": "vid5", "mode": "3", "time": "2017-08-21T20:17:21.010Z", "latitude": "40.268159632", "climb": "0.022", "speed": "0.353", "ept": "0.005"}

To collect our GPS information, below is my script called by MiniFi.

#!/usr/bin/python
import os
import json
import socket
import threading
import urllib2
from time import gmtime, strftime
from gps import gps, WATCH_ENABLE
# Need sudo apt-get install gpsd gpsd-clients python-gps ntp    
# Based on    
#Author: Callum Pritchard, Joachim Hummel    
#Project Name: Flick 3D Gesture    
#Project Description: Sending Flick 3D Gesture sensor data to mqtt    
#Version Number: 0.1    
#Date: 15/6/17    
#Release State: Alpha testing    
#Changes: Created    
# Based on    
# Written by Dan Mandle http://dan.mandle.me September 2012    
# License: GPL 2.0         
# Based on:  https://hortonworks.com/tutorial/analyze-iot-weather-station-data-via-connected-data-architecture/section/3/         
#### Initialization    
# yyyy-mm-dd hh:mm:ss    
currenttime= strftime("%Y-%m-%d %H:%M:%S",gmtime())         
external_IP_and_port = ('', 53)  
# a.root-servers.net    
socket_family = socket.AF_INET         
host = os.uname()[1]        

# Get Raspberry Pi CPU temperature in degrees Celsius
def getCPUtemperature():
    # vcgencmd prints a line such as: temp=58.0'C
    res = os.popen('vcgencmd measure_temp').readline()
    return float(res.replace("temp=", "").replace("'C", "").strip())

# Get the local IP address by opening a UDP socket toward external_IP_and_port
def IP_address():
    try:
        s = socket.socket(socket_family, socket.SOCK_DGRAM)
        s.connect(external_IP_and_port)
        answer = s.getsockname()
        s.close()
        return answer[0] if answer else None
    except socket.error:
        return None

# Get Raspberry Pi Serial Number
def get_serial():
    # Extract serial from cpuinfo file
    cpuserial = "0000000000000000"
    try:
        f = open('/proc/cpuinfo', 'r')
        for line in f:
            if line[0:6] == 'Serial':
                cpuserial = line[10:26]
        f.close()
    except:
        cpuserial = "ERROR000000000"
    return cpuserial

# Get Raspberry Pi Public IP via IPIFY Rest Call
def get_public_ip():
    ip = json.load(urllib2.urlopen('https://api.ipify.org/?format=json'))['ip']
    return ip

ipaddress = IP_address()
# Attempt to get Public IP
public_ip = get_public_ip()
# Attempt to get Raspberry Pi Serial Number
serial = get_serial()
# Read the CPU temperature once for the output row
cpuTemp = getCPUtemperature()
gpsd = None

class GpsPoller(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        global gpsd  # bring it in scope
        gpsd = gps(mode=WATCH_ENABLE)  # starting the stream of info
        self.current_value = None
        self.running = True  # setting the thread running to true

    def run(self):
        global gpsd
        while self.running:
            gpsd.next()  # this will continue to loop and grab EACH set of gpsd info to clear the buffer

if __name__ == '__main__':
    gpsp = GpsPoller()  # create the thread
    stopthis = False
    try:
        gpsp.start()  # start it up
        while not stopthis:
            if gpsd.fix.latitude > 0:
                row = {'latitude': str(gpsd.fix.latitude),
                       'longitude': str(gpsd.fix.longitude),
                       'utc': str(gpsd.utc),
                       'time': str(gpsd.fix.time),
                       'altitude': str(gpsd.fix.altitude),
                       'eps': str(gpsd.fix.eps),
                       'epx': str(gpsd.fix.epx),
                       'epv': str(gpsd.fix.epv),
                       'ept': str(gpsd.fix.ept),
                       'speed': str(gpsd.fix.speed),
                       'climb': str(gpsd.fix.climb),
                       'track': str(gpsd.fix.track),
                       'ts': currenttime,
                       'public_ip': public_ip,
                       'serialno': serial,
                       'host': host,
                       'cputemp': round(cpuTemp, 2),
                       'ipaddress': ipaddress,
                       'mode': str(gpsd.fix.mode)}
                json_string = json.dumps(row)
                print(json_string)
                # One reading is enough: stop the poller and leave the loop
                gpsp.running = False
                stopthis = True
    except (KeyboardInterrupt, SystemExit):  # when you press ctrl+c
        gpsp.running = False
        gpsp.join()  # wait for the thread to finish what it's doing

And a link to it on GitHub.
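One thing to watch in the script: it treats `gpsd.fix.latitude > 0` as the sign that a fix has arrived, which works in New Jersey but never becomes true south of the equator. A more general check, sketched here as a suggestion rather than as part of the original script, looks at the fix mode that gpsd reports (2 for a 2D fix, 3 for a 3D fix). The helper name `has_fix` is hypothetical.

```python
import math

MODE_2D = 2  # gpsd reports mode 2 for a 2D fix and 3 for a 3D fix


def has_fix(mode, latitude, longitude):
    """Return True when gpsd reports at least a 2D fix with numeric coordinates.

    Hypothetical helper: pass in gpsd.fix.mode, gpsd.fix.latitude, and
    gpsd.fix.longitude from the poller in the script.
    """
    if mode < MODE_2D:
        return False
    # Before a fix arrives, coordinates may be reported as NaN.
    return not (math.isnan(latitude) or math.isnan(longitude))
```

In the main loop, the condition would then read `if has_fix(gpsd.fix.mode, gpsd.fix.latitude, gpsd.fix.longitude):` in place of the latitude comparison.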
