
Using Apache MXNet GluonCV with Apache NiFi


Learn how to use Apache MXNet GluonCV with Apache NiFi for deep learning computer vision.


Using Apache MXNet GluonCV with Apache NiFi for Deep Learning Computer Vision

Source: https://github.com/tspannhw/OpenSourceComputerVision/

Gluon and Apache MXNet have been great for deep learning, especially for newcomers like me. It just got even better: GluonCV is a deep learning toolkit that is easy to use and ships with a number of strong pre-trained models covering common computer vision use cases. So I took a simple, well-documented example and tweaked it to save the final image and send some JSON details via MQTT to Apache NiFi. This may sound familiar: https://community.hortonworks.com/articles/198912/ingesting-apache-mxnet-gluon-deep-learning-results.html. GluonCV makes this even easier. Let's check it out: again, we take a simple Python example, tweak it, run it via a shell script, and send the results over MQTT.

# Based on https://gluon-cv.mxnet.io/build/examples_detection/demo_ssd.html#sphx-glr-build-examples-detection-demo-ssd-py
import datetime
import json
import os
import time
import uuid
from time import gmtime, strftime

import cv2
import psutil
import paho.mqtt.client as mqtt
from gluoncv import model_zoo, data, utils
from matplotlib import pyplot as plt

start = time.time()

# Capture a frame from the webcam (0 = laptop camera, 1 = external camera)
cap = cv2.VideoCapture(1)
ret, frame = cap.read()

# Build a unique id (renamed from `uuid` so it no longer shadows the uuid module)
unique_id = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())
filename = 'images/gluoncv_image_{0}.jpg'.format(unique_id)
filename2 = 'images/gluoncv_image_processed_{0}.jpg'.format(unique_id)
cv2.imwrite(filename, frame)

# Load the pre-trained SSD 512 ResNet-50 v1 VOC model from the model zoo
net = model_zoo.get_model('ssd_512_resnet50_v1_voc', pretrained=True)

# Resize and normalize the captured image for the SSD network
x, img = data.transforms.presets.ssd.load_test(filename, short=512)

end = time.time()

# Collect image metadata and host metrics into a row to publish
row = {}
row['imgname'] = filename
row['host'] = os.uname()[1]
row['shape'] = str(x.shape)
row['end'] = '{0}'.format(str(end))
row['te'] = '{0}'.format(str(end - start))
row['battery'] = psutil.sensors_battery()[0]
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
row['cpu'] = psutil.cpu_percent(interval=1)
usage = psutil.disk_usage("/")
row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
row['memory'] = psutil.virtual_memory().percent
row['id'] = str(unique_id)
json_string = json.dumps(row)

# Publish the JSON metadata over MQTT for Apache NiFi to consume
client = mqtt.Client()
client.username_pw_set("user", "pass")
client.connect("server", 17769, 60)
client.publish("gluoncv", payload=json_string, qos=0, retain=True)

# Run detection and save the annotated image
class_IDs, scores, bounding_boxs = net(x)
ax = utils.viz.plot_bbox(img, bounding_boxs[0], scores[0], class_IDs[0],
                         class_names=net.classes)
plt.savefig(filename2)

Here is the saved annotated figure:

A simple Apache NiFi flow ingests the MQTT data from the GluonCV Python example and stores it to Hive (as ORC), Parquet, and HBase:

  1. ConsumeMQTT
  2. InferAvroSchema
  3. RouteOnContent
  4. MergeRecord (convert batches of JSON into a single Avro record set)
  5. ConvertAvroToORC
  6. PutHDFS
  7. PutParquet
  8. PutHBaseRecord
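As a rough stand-in for what MergeRecord does in step 4 (this is an illustrative sketch, not NiFi code — the processor handles this for you), here is how individual JSON flowfiles get batched into a single record array:

```python
import json

def merge_records(json_lines):
    """Batch individual JSON strings (one per flowfile) into a single
    list of records, the way MergeRecord groups many small JSON
    messages into one larger record set for conversion to Avro."""
    return [json.loads(line) for line in json_lines]

# Two sample MQTT payloads, shaped like those from the GluonCV script
flowfiles = [
    '{"id": "a1", "cpu": 29.3, "memory": 66.5}',
    '{"id": "b2", "cpu": 12.0, "memory": 70.1}',
]
merged = merge_records(flowfiles)
print(len(merged))  # 2
```

Merging before conversion matters because one large Avro or ORC file is far cheaper for HDFS and Hive than thousands of tiny ones.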

Again, Apache NiFi generates a schema for us by examining the data. (There's a really cool project coming out of New Jersey doing more advanced schema generation by looking at tables; I'll report on that later.) We take the generated schema, save it to the Schema Registry, and are ready to merge records. One change you may want to make is turning plain types such as "type": "string" into nullable unions: "type": ["string","null"].
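That nullable tweak can be scripted rather than done by hand. A minimal sketch (the `make_nullable` helper is my own, not part of NiFi or the Schema Registry) that rewrites every plain field type in an inferred Avro schema into a union with null:

```python
import json

def make_nullable(schema):
    """Rewrite each field's plain type (e.g. "string") into a union
    with null (e.g. ["string", "null"]) so records with missing
    values still validate against the schema."""
    for field in schema.get("fields", []):
        if isinstance(field["type"], str):
            field["type"] = [field["type"], "null"]
    return schema

schema = json.loads("""
{"type": "record", "name": "gluoncv", "fields": [
  {"name": "imgname", "type": "string"},
  {"name": "battery", "type": "int"}
]}
""")
print(make_nullable(schema)["fields"][0]["type"])  # ['string', 'null']
```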

Schema

{
 "type": "record",
 "name": "gluoncv",
 "fields": [
  {
   "name": "imgname",
   "type": "string",
   "doc": "Type inferred from '\"images/gluoncv_image_20180615203319_6e0e5f0b-d2aa-4e94-b7e9-8bb7f29c9512.jpg\"'"
  },
  {
   "name": "host",
   "type": "string",
   "doc": "Type inferred from '\"HW13125.local\"'"
  },
  {
   "name": "shape",
   "type": "string",
   "doc": "Type inferred from '\"(1, 3, 512, 910)\"'"
  },
  {
   "name": "end",
   "type": "string",
   "doc": "Type inferred from '\"1529094800.88097\"'"
  },
  {
   "name": "te",
   "type": "string",
   "doc": "Type inferred from '\"2.4256367683410645\"'"
  },
  {
   "name": "battery",
   "type": "int",
   "doc": "Type inferred from '100'"
  },
  {
   "name": "systemtime",
   "type": "string",
   "doc": "Type inferred from '\"06/15/2018 16:33:20\"'"
  },
  {
   "name": "cpu",
   "type": "double",
   "doc": "Type inferred from '23.2'"
  },
  {
   "name": "diskusage",
   "type": "string",
   "doc": "Type inferred from '\"112000.8 MB\"'"
  },
  {
   "name": "memory",
   "type": "double",
   "doc": "Type inferred from '65.8'"
  },
  {
   "name": "id",
   "type": "string",
   "doc": "Type inferred from '\"20180615203319_6e0e5f0b-d2aa-4e94-b7e9-8bb7f29c9512\"'"
  }
 ]
}

Example JSON

{"imgname": "images/gluoncv_image_20180615203615_c83fed6f-2ec8-4841-97e3-40985f7859ad.jpg", "host": "HW13125.local", "shape": "(1, 3, 512, 910)", "end": "1529094976.237143", "te": "1.8907802104949951", "battery": 100, "systemtime": "06/15/2018 16:36:16", "cpu": 29.3, "diskusage": "112008.6 MB", "memory": 66.5, "id": "20180615203615_c83fed6f-2ec8-4841-97e3-40985f7859ad"}
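To sanity-check that a payload like the one above matches the types InferAvroSchema derived before it reaches MergeRecord, here is a small hypothetical validator (not part of the actual flow — NiFi's record readers do this for you):

```python
import json

# Python types corresponding to the inferred Avro field types
EXPECTED_TYPES = {
    "imgname": str, "host": str, "shape": str, "end": str, "te": str,
    "battery": int, "systemtime": str, "cpu": float,
    "diskusage": str, "memory": float, "id": str,
}

def validate(payload):
    """Return True when every expected field is present with the
    type that the inferred schema expects for it."""
    record = json.loads(payload)
    return all(
        name in record and isinstance(record[name], expected)
        for name, expected in EXPECTED_TYPES.items()
    )

sample = ('{"imgname": "images/x.jpg", "host": "HW13125.local", '
          '"shape": "(1, 3, 512, 910)", "end": "1529094976.237143", '
          '"te": "1.89", "battery": 100, "systemtime": "06/15/2018 16:36:16", '
          '"cpu": 29.3, "diskusage": "112008.6 MB", "memory": 66.5, "id": "abc"}')
print(validate(sample))  # True
```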

Table Generated

Note that end is a reserved word in Hive, so it needs backticks in the DDL:

CREATE EXTERNAL TABLE IF NOT EXISTS gluoncv (imgname STRING, host STRING, shape STRING, `end` STRING, te STRING, battery INT, systemtime STRING, cpu DOUBLE, diskusage STRING, memory DOUBLE, id STRING)
STORED AS ORC
LOCATION "/gluoncv";

Parquet Table

CREATE EXTERNAL TABLE IF NOT EXISTS gluoncv_parquet (imgname STRING, host STRING, shape STRING, `end` STRING, te STRING, battery INT, systemtime STRING, cpu DOUBLE, diskusage STRING, memory DOUBLE, id STRING)
STORED AS PARQUET
LOCATION "/gluoncvpar";


