Using Apache Spark and Apache NiFi to Run TensorFlow

Using Apache NiFi and Apache Livy to run TensorFlow jobs on Spark clusters.

By Tim Spann · Jun. 18, 2018 · Tutorial


Executing TensorFlow Classifications From Apache NiFi Using Apache Spark 2.3 and Apache Livy

Technology:

Apache Spark 2.3 + Apache Livy + Apache NiFi 1.5 + TensorFlow + Python

Python Code

TIP: In this version of Apache NiFi, you need to use double quotes (") instead of single quotes (') in your Python code.
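For example, prefer the first form below. (A hypothetical one-liner; my assumption is that the constraint comes from how NiFi escapes the code when embedding it in the Livy request.)

print("this survives submission through ExecuteSparkInteractive")  # double quotes: OK
# print('this can break under this NiFi version')                  # single quotes: avoid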

Python Code for NiFi ExecuteSparkInteractive

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os
import re
import tarfile

import numpy as np
from six.moves import urllib
import tensorflow as tf
tf.logging.set_verbosity(tf.logging.ERROR)

FLAGS = None

# pylint: disable=line-too-long
DATA_URL = "http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz"
# pylint: enable=line-too-long

class NodeLookup(object):
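  """Converts integer node IDs to human-readable English labels."""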

  def __init__(self,
               label_lookup_path=None,
               uid_lookup_path=None):
    if not label_lookup_path:
      label_lookup_path = os.path.join(
          FLAGS.model_dir, "imagenet_2012_challenge_label_map_proto.pbtxt")
    if not uid_lookup_path:
      uid_lookup_path = os.path.join(
          FLAGS.model_dir, "imagenet_synset_to_human_label_map.txt")
    self.node_lookup = self.load(label_lookup_path, uid_lookup_path)

  def load(self, label_lookup_path, uid_lookup_path):
    if not tf.gfile.Exists(uid_lookup_path):
      tf.logging.fatal("File does not exist %s", uid_lookup_path)
    if not tf.gfile.Exists(label_lookup_path):
      tf.logging.fatal("File does not exist %s", label_lookup_path)

    proto_as_ascii_lines = tf.gfile.GFile(uid_lookup_path).readlines()
    uid_to_human = {}
    p = re.compile(r"[n\d]*[ \S,]*")
    for line in proto_as_ascii_lines:
      parsed_items = p.findall(line)
      uid = parsed_items[0]
      human_string = parsed_items[2]
      uid_to_human[uid] = human_string

    node_id_to_uid = {}
    proto_as_ascii = tf.gfile.GFile(label_lookup_path).readlines()
    for line in proto_as_ascii:
      if line.startswith("  target_class:"):
        target_class = int(line.split(": ")[1])
      if line.startswith("  target_class_string:"):
        target_class_string = line.split(": ")[1]
        node_id_to_uid[target_class] = target_class_string[1:-2]

    node_id_to_name = {}
    for key, val in node_id_to_uid.items():
      if val not in uid_to_human:
        tf.logging.fatal("Failed to locate: %s", val)
      name = uid_to_human[val]
      node_id_to_name[key] = name

    return node_id_to_name

  def id_to_string(self, node_id):
    if node_id not in self.node_lookup:
      return ""
    return self.node_lookup[node_id]

def create_graph():
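  """Creates a graph from the saved GraphDef file."""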
  with tf.gfile.FastGFile(os.path.join(
      FLAGS.model_dir, "classify_image_graph_def.pb"), "rb") as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())
    _ = tf.import_graph_def(graph_def, name="")


def run_inference_on_image(image):
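  """Runs inference on a single image and prints the top predictions."""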
  if not tf.gfile.Exists(image):
    tf.logging.fatal("File does not exist %s", image)
  image_data = tf.gfile.FastGFile(image, "rb").read()

  create_graph()

  with tf.Session() as sess:
    softmax_tensor = sess.graph.get_tensor_by_name("softmax:0")
    predictions = sess.run(softmax_tensor,
                           {"DecodeJpeg/contents:0": image_data})
    predictions = np.squeeze(predictions)
    node_lookup = NodeLookup()

    # sort the predictions in order of score
    top_k = predictions.argsort()[-FLAGS.num_top_predictions:][::-1]

    # output all of our human string and scores
    for node_id in top_k:
      # get the printable string for that id
      human_string = node_lookup.id_to_string(node_id)
      # get the prediction score for that id
      score = predictions[node_id]
      # print each prediction as a tab-separated row that is easy to parse downstream
      print("{0}\t{1}\t{2}%\n".format(str(node_id), str(human_string), float(score) * 100.0))

def maybe_download_and_extract():
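  """Downloads and extracts the pretrained Inception model if not already cached."""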
  dest_directory = FLAGS.model_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split("/")[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      pass  # suppress progress output; there is no console when run via Livy
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
  tarfile.open(filepath, "r:gz").extractall(dest_directory)

def main(_):
  maybe_download_and_extract()
  # NOTE: the image path is hardcoded for this demo; FLAGS.image_file is parsed but unused
  img_name = "/opt/demo/images/photo1.jpg"
  run_inference_on_image(img_name)

parser = argparse.ArgumentParser()

parser.add_argument(
      "--model_dir",
      type=str,
      default="/opt/demo/imagenet",
      help="""\
      Path to classify_image_graph_def.pb,
      imagenet_synset_to_human_label_map.txt, and
      imagenet_2012_challenge_label_map_proto.pbtxt.\
      """)
parser.add_argument(
      "--image_file",
      type=str,
      default="",
      help="Absolute path to image file.")
parser.add_argument(
      "--num_top_predictions",
      type=int,
      default=5,
      help="Display this many predictions.")
FLAGS, unparsed = parser.parse_known_args()
main(FLAGS)

Simple Apache NiFi Flow To Execute TensorFlow Python Applications via Apache Livy

I am just using Apache Livy as the transport from Apache NiFi to Apache Spark. My Apache Spark 2.3 cluster is not doing any Spark-specific processing; in this version, PySpark simply runs a vanilla TensorFlow Python application. We could also call TensorFlow-on-Spark code this way. My goal was to run TensorFlow on my Spark cluster, triggered from Apache NiFi, and get the results back. A sketch of the equivalent Livy REST calls follows.
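Under the hood, ExecuteSparkInteractive is just driving Livy's REST API. Here is a minimal sketch of the equivalent calls using Python requests, assuming a Livy server at livy-host:8998 and a local copy of the script saved as classify_image_livy.py (both hypothetical names):

import json
import time

import requests

LIVY_URL = "http://livy-host:8998"  # placeholder; point at your Livy server

# 1. Open a PySpark session -- this is what NiFi's Livy session controller manages.
session = requests.post(LIVY_URL + "/sessions", json={"kind": "pyspark"}).json()
session_url = "{0}/sessions/{1}".format(LIVY_URL, session["id"])
while requests.get(session_url + "/state").json()["state"] != "idle":
    time.sleep(2)  # wait for the Spark session to finish starting

# 2. Submit the TensorFlow script as a statement -- ExecuteSparkInteractive does this step.
with open("classify_image_livy.py") as f:  # hypothetical local copy of the script above
    stmt = requests.post(session_url + "/statements", json={"code": f.read()}).json()

# 3. Poll until the statement completes, then read the text/plain result shown below.
stmt_url = "{0}/statements/{1}".format(session_url, stmt["id"])
while True:
    result = requests.get(stmt_url).json()
    if result["state"] == "available":
        print(json.dumps(result["output"]["data"], indent=2))
        break
    time.sleep(2)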

Results Returned on Success From the ExecuteSparkInteractive Call

{
  "text/plain" : "273\tracer, race car, racing car\t37.4601334333%\n\n274\tsports car, sport car\t25.3520905972%\n\n267\tcab, hack, taxi, taxicab\t11.1182622612%\n\n268\tconvertible\t9.85431224108%\n\n271\tminivan\t3.22951599956%"
}
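The predictions come back as one tab-separated row per line inside the text/plain field. In NiFi you would typically extract that field with EvaluateJsonPath and split the rows; a minimal Python sketch of the same parsing, using an abridged copy of the payload above, looks like this:

import json

# Abridged copy of the ExecuteSparkInteractive response shown above.
payload = ('{"text/plain": "273\\tracer, race car, racing car\\t37.46%'
           '\\n\\n274\\tsports car, sport car\\t25.35%"}')

rows = json.loads(payload)["text/plain"]
for line in filter(None, rows.split("\n")):  # skip the blank lines between rows
    node_id, label, score = line.split("\t")
    print(node_id, label, score)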

Apache Livy UI Showing Results of Runs

This is the ExecuteSparkInteractive processor. We can put the code in its Code property or pass it in as flowfile content; a sketch of the key properties follows.
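A sketch of that configuration (property names as I recall them from the NiFi 1.5/1.6 documentation; verify against your version):

ExecuteSparkInteractive
  Livy Controller Service : the LivySessionController configured below
  Code                    : the Python script above (or leave empty to pass it in)
  Status Check Interval   : 1 sec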

Let's Configure a PySpark Apache Livy Controller.
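A minimal sketch of the LivySessionController settings; the host is a placeholder for your cluster, and the property names should likewise be checked against your NiFi version:

LivySessionController
  Livy Host         : livy-host.example.com
  Livy Port         : 8998
  Session Type      : pyspark
  Session Pool Size : 1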

LogSearch

There is a technical preview of LogSearch, which is great for finding issues in HDF or HDP components. It is easier than digging through raw log files by hand, though I could also write a NiFi flow to search the logs.

Topics: Apache NiFi, Apache Spark, TensorFlow
