Using Apache Spark and Apache NiFi to Run TensorFlow

DZone's Guide to

Using Apache Spark and Apache NiFi to Run TensorFlow

Using Apache NiFi and Apache Livy to run TensorFlow jobs on Spark clusters.

· AI Zone ·
Free Resource

Executing TensorFlow Classifications From Apache NiFi Using Apache Spark 2.3 and Apache Livy


Apache Spark 2.3 + Apache Livy + Apache NiFi 1.5 + TensorFlow + Python

Python Code

TIP: In this version of Apache NiFi, you need to use double quotes (") instead of single quotes (') in your Python code.

Python Code for NiFi ExecuteSparkInteractive

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import errno as _errno
import sys as _sys

from tensorflow.python.platform import flags
from tensorflow.python.util.all_util import remove_undocumented
from tensorflow.python.util.tf_export import tf_export

import argparse
import os.path
import re
import sys
import tarfile
import os
import datetime
import math
import numpy as np
from six.moves import urllib
import tensorflow as tf

# NOTE: this script is submitted to Livy through NiFi ExecuteSparkInteractive,
# which needs double quotes instead of single quotes; avoid apostrophes,
# including in comments, to be safe.

# Parsed command-line flags; assigned from parser.parse_known_args() at the
# bottom of the script.
FLAGS = None

# Model archive (frozen graph plus label files) that gets unpacked into
# FLAGS.model_dir. Fetched over HTTPS because the archive is later extracted
# with tarfile.extractall, so a tampered download could write arbitrary files.
# pylint: disable=line-too-long
DATA_URL = "https://storage.googleapis.com/download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz"
# pylint: enable=line-too-long

class NodeLookup(object):
  """Convert integer class IDs produced by the classifier into readable labels.

  The lookup table is built once in __init__ from two label files, which are
  read from FLAGS.model_dir unless explicit paths are passed in.
  """

  def __init__(self,
               label_lookup_path=None,
               uid_lookup_path=None):
    """Build the node ID to label table.

    Args:
      label_lookup_path: Path to imagenet_2012_challenge_label_map_proto.pbtxt.
        When empty or None, that file inside FLAGS.model_dir is used.
      uid_lookup_path: Path to imagenet_synset_to_human_label_map.txt.
        When empty or None, that file inside FLAGS.model_dir is used.
    """
    if not label_lookup_path:
      label_lookup_path = os.path.join(
          FLAGS.model_dir, "imagenet_2012_challenge_label_map_proto.pbtxt")
    if not uid_lookup_path:
      uid_lookup_path = os.path.join(
          FLAGS.model_dir, "imagenet_synset_to_human_label_map.txt")
    self.node_lookup = self.load(label_lookup_path, uid_lookup_path)

  def load(self, label_lookup_path, uid_lookup_path):
    """Return a dict mapping integer node ID to a human readable label.

    Args:
      label_lookup_path: Label map text file whose lines starting with
        "  target_class:" and "  target_class_string:" pair an integer node
        ID with a synset UID.
      uid_lookup_path: Text file with one UID and its label per line.

    Returns:
      dict of int node ID -> label string.
    """
    if not tf.gfile.Exists(uid_lookup_path):
      tf.logging.fatal("File does not exist %s", uid_lookup_path)
    if not tf.gfile.Exists(label_lookup_path):
      tf.logging.fatal("File does not exist %s", label_lookup_path)

    # UID -> label. For a line like "<uid><tab><label>" the regex presumably
    # yields [uid, "", label, ...], hence the label is the third match.
    p = re.compile(r"[n\d]*[ \S,]*")
    uid_to_human = {}
    with tf.gfile.GFile(uid_lookup_path) as uid_file:
      for line in uid_file.readlines():
        parsed_items = p.findall(line)
        uid_to_human[parsed_items[0]] = parsed_items[2]

    # Node ID -> UID. Each target_class_string line is paired with the most
    # recent target_class line; [1:-2] drops the first character and the last
    # two (presumably the quotes around the UID and the newline).
    node_id_to_uid = {}
    with tf.gfile.GFile(label_lookup_path) as label_file:
      for line in label_file.readlines():
        if line.startswith("  target_class:"):
          target_class = int(line.split(": ")[1])
        if line.startswith("  target_class_string:"):
          target_class_string = line.split(": ")[1]
          node_id_to_uid[target_class] = target_class_string[1:-2]

    # Node ID -> label, logging any UID missing from the UID table.
    node_id_to_name = {}
    for key, val in node_id_to_uid.items():
      if val not in uid_to_human:
        tf.logging.fatal("Failed to locate: %s", val)
      node_id_to_name[key] = uid_to_human[val]

    return node_id_to_name

  def id_to_string(self, node_id):
    """Return the label for node_id, or an empty string if it is unknown."""
    return self.node_lookup.get(node_id, "")

def create_graph():
  """Load the frozen classify_image_graph_def.pb into the default graph.

  The file is read from FLAGS.model_dir. Nodes are imported with an empty
  name prefix, so tensors keep their original names (e.g. "softmax:0").
  """
  graph_path = os.path.join(FLAGS.model_dir, "classify_image_graph_def.pb")
  with tf.gfile.FastGFile(graph_path, "rb") as f:
    graph_def = tf.GraphDef()
    # The serialized bytes must be parsed into the GraphDef; importing an
    # unparsed GraphDef would add no nodes at all.
    graph_def.ParseFromString(f.read())
    _ = tf.import_graph_def(graph_def, name="")

def run_inference_on_image(image):
  """Classify one image and print the top predictions.

  Args:
    image: Path to the image file; its raw bytes are fed to the
      "DecodeJpeg/contents:0" tensor, so a JPEG is expected.

  For each of the FLAGS.num_top_predictions highest scores, prints one line:
  node id, label and score in percent, tab separated.
  """
  if not tf.gfile.Exists(image):
    tf.logging.fatal("File does not exist %s", image)
  with tf.gfile.FastGFile(image, "rb") as image_file:
    image_data = image_file.read()

  # Load the model into the default graph so the softmax tensor exists.
  create_graph()

  with tf.Session() as sess:
    softmax_tensor = sess.graph.get_tensor_by_name("softmax:0")
    predictions = sess.run(softmax_tensor,
                           {"DecodeJpeg/contents:0": image_data})
    predictions = np.squeeze(predictions)
    node_lookup = NodeLookup()

    # Highest scores first. Slicing from the front of the descending order
    # keeps num_top_predictions == 0 empty (a [-0:] slice would return all).
    num_top = max(FLAGS.num_top_predictions, 0)
    top_k = predictions.argsort()[::-1][:num_top]

    # Print each prediction as a tab separated row (renders as a table in
    # Zeppelin and comes back as text through Livy).
    for node_id in top_k:
      human_string = node_lookup.id_to_string(node_id)
      score = predictions[node_id]
      print("{0}\t{1}\t{2}%\n".format(str(node_id), str(human_string),  float(score) * 100.0))

def maybe_download_and_extract():
  """Ensure the model archive is in FLAGS.model_dir and unpack it there.

  The archive is downloaded from DATA_URL only when it is not already present
  locally. It is extracted on every call.
  """
  dest_directory = FLAGS.model_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split("/")[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      # urlretrieve passes a non-positive total_size when the size is
      # unknown; skip the percentage then instead of dividing by it.
      if total_size > 0:
        percent = min(100.0, 100.0 * count * block_size / total_size)
        sys.stdout.write("\r>> Downloading %s %.1f%%" % (filename, percent))
        sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print("Successfully downloaded", filename, statinfo.st_size, "bytes.")
  # NOTE(review): extractall trusts the member paths inside the archive; this
  # is only acceptable because the archive comes from a trusted source.
  with tarfile.open(filepath, "r:gz") as archive:
    archive.extractall(dest_directory)

def main(_):
  """Make sure the model is available, then classify one image.

  The positional argument is ignored (presumably kept for tf.app.run style
  callers). FLAGS.image_file, when set, overrides the demo image path.
  """
  img_name = "/opt/demo/images/photo1.jpg"
  image_file = getattr(FLAGS, "image_file", None)
  if image_file:
    img_name = image_file
  maybe_download_and_extract()
  run_inference_on_image(img_name)

parser = argparse.ArgumentParser()
# Default model_dir follows the upstream TensorFlow classify_image example;
# adjust it for the Spark cluster if needed.
parser.add_argument(
    "--model_dir",
    type=str,
    default="/tmp/imagenet",
    help=("Path to classify_image_graph_def.pb, "
          "imagenet_synset_to_human_label_map.txt, and "
          "imagenet_2012_challenge_label_map_proto.pbtxt."))
parser.add_argument(
    "--image_file",
    type=str,
    default="",
    help="Absolute path to image file.")
parser.add_argument(
    "--num_top_predictions",
    type=int,
    default=5,
    help="Display this many predictions.")
# parse_known_args (rather than parse_args) returns unrecognized arguments in
# unparsed instead of exiting with an error.
FLAGS, unparsed = parser.parse_known_args()

Simple Apache NiFi Flow To Execute TensorFlow Python Applications via Apache Livy

I am just using Apache Livy as the transport from Apache NiFi to Apache Spark. My Apache Spark 2.3 cluster is not doing any Spark-specific processing; PySpark is just running a vanilla TensorFlow Python application in this version. We could also call TensorFlow on Spark code in this way. My goal was to run TensorFlow on my Spark cluster, triggered from Apache NiFi, and get back the results.

Results Returned in Success From ExecuteSparkInteractive Call

  "text/plain" : "273\tracer, race car, racing car\t37.4601334333%\n\n274\tsports car, sport car\t25.3520905972%\n\n267\tcab, hack, taxi, taxicab\t11.1182622612%\n\n268\tconvertible\t9.85431224108%\n\n271\tminivan\t3.22951599956%"

Apache Livy UI Showing Results of Runs

This is the ExecuteSparkInteractive processor. We can put the code in the Code property, or leave that property empty and pass the code in as the content of the incoming flowfile.

Let's configure a PySpark Apache Livy controller service.


There is a technical preview of LogSearch, which is great for finding issues in HDF or HDP components. It is easier than searching the raw logs, though I could also easily write NiFi code to search logs.

ai artificial intelligence, apache hadoop, apache livy, apache nifi, apache spark, artificial intelligence, big data, deep learning, python, tensorflow

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}