
Apache Spark and Apache NiFi Integration (Part 2 of 2)


Let's finish off our journey of integrating Apache Spark and Apache NiFi to cover both data ingestion and running Apache Spark jobs.


We'll briefly start by going over our use case: ingesting energy data and running an Apache Spark job as part of the flow.

We will be using the ExecuteSparkInteractive processor (new in Apache NiFi 1.5/HDF 3.1) together with the LivySessionController controller service to accomplish that integration. As we mentioned in the first part of the article, it's pretty easy to set this up.

Since this is a modern Apache NiFi project, we use version control on our code:

On a local machine, I am talking to an electricity sensor over Wi-Fi from a Python script. The data is processed, cleaned, and sent to a cloud-hosted Apache NiFi instance via Site-to-Site (S2S) over HTTP.

In the cloud, we receive the pushed messages.

Once we open the Spark It Up processor group, we have a flow to process the data.

Flow Overview

  • QueryRecord: Determine how to route based on a query on the streaming data. Converts JSON to Apache Avro.

Path for all files:

  • UpdateAttribute: Set a schema
  • MergeContent: Do an Apache Avro merge on our data to make bigger files.
  • ConvertAvroToORC: Build an Apache ORC file from the merged Apache Avro file.
  • PutHDFS: Store our Apache ORC file in an HDFS directory on our HDP 2.6.4 cluster.

Path for large voltage:

  • ExecuteSparkInteractive: Call our PySpark job
  • PutHDFS: Store the results to HDFS.
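
The routing decision that QueryRecord makes at the top of this flow can be sketched in plain Python. This is only an illustration: inside NiFi, QueryRecord expresses the condition as a SQL query over the records, and the threshold of 120 volts and the names `VOLTAGE_THRESHOLD` and `route_record` are assumptions made up for this example, not values taken from the actual flow.

```python
import json

# Hypothetical threshold; the article does not state the value used in the flow.
VOLTAGE_THRESHOLD = 120.0


def route_record(record, threshold=VOLTAGE_THRESHOLD):
    """Return the names of the flow paths a record should follow."""
    paths = ["all_files"]  # every record is merged, converted to ORC, and stored
    if record.get("voltage", 0.0) > threshold:
        paths.append("large_voltage")  # large voltages also go to the PySpark job
    return paths


if __name__ == "__main__":
    message = json.loads('{"voltage": 121.740428, "power": 91.380606}')
    print(route_record(message))
```

Every record follows the storage path, and only records above the threshold also take the Spark path, which mirrors the two paths listed above.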

We could take all the metadata attributes and send them somewhere or store them as a JSON file.

We tested our PySpark program in Apache Zeppelin and then copied it to our processor.







Our ExecuteSparkInteractive Processor:

In our QueryRecord processor, we send messages with large voltages to the ExecuteSparkInteractive processor, which runs a PySpark job to do some more processing.
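
For context, Apache Livy exposes a REST API for interactive sessions, and code is run by posting a statement to an open session. The sketch below only builds such a request with the Python standard library and does not send it. The host name `yourlivyserver`, the session id `0`, and the helper name `build_statement_request` are placeholders for this example; 8998 is Livy's default port. It is not a description of how ExecuteSparkInteractive is implemented internally.

```python
import json
import urllib.request


def build_statement_request(livy_url, session_id, code):
    """Build (but do not send) a POST that runs `code` in a Livy session."""
    url = "%s/sessions/%d/statements" % (livy_url.rstrip("/"), session_id)
    body = json.dumps({"code": code}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Placeholder host and session id, for illustration only.
    request = build_statement_request(
        "http://yourlivyserver:8998", 0, "spark.range(10).count()"
    )
    print(request.get_method(), request.full_url)
    print(request.data.decode("utf-8"))
```

Passing the request to `urllib.request.urlopen` against a running Livy server would submit the statement; that step is left out here so the sketch runs without a cluster.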

Once we have submitted a job via Apache Livy, we are now able to see the job during and after execution with detailed Apache Livy UI screens and Spark screens. In the Apache Livy UI screen below, we can see the PySpark code executed and its output.

Apache Livy UI:

Apache Spark Jobs UI: Jobs

Apache Spark Jobs UI: SQL

Apache Spark Jobs UI: Executors

Apache Zeppelin SQL search of the data:


Hive/Spark SQL table DDL generated automagically by Apache NiFi:

Below is the source code related to this article:

PySpark code:

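# `spark` is the SparkSession that the Livy PySpark session provides.
# Read the Spark2 History logs, which are stored as JSON.
shdf = spark.read.json("hdfs://yourhdp264server:8020/spark2-history")

# Print the schema that Spark inferred from the JSON.
shdf.printSchema()

# Register the DataFrame as a temporary view so we can query it with SQL.
shdf.createOrReplaceTempView("sparklogs")

stuffdf = spark.sql("SELECT * FROM sparklogs")

# Count the rows returned by the query.
stuffdf.count()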


This is a pretty simple PySpark application that reads the JSON results of Spark2 History, prints the schema inferred from them, and then does a simple SELECT and count. We could do Spark machine learning or other processing in there very easily. You can run Python 2.x or 3.x for this with PySpark. I am running this in Apache Spark 2.2.0 hosted on an HDP 2.6.4 cluster running CentOS 7. The fun part is that every time I run this Spark job, it produces more results for it to read. I should probably just read that log in Apache NiFi, but it was a fun little example. Clearly, you can run any kind of job in here. My next article will be about running Apache MXNet and Spark MLlib jobs through Apache Livy and Apache NiFi.

For a quick side note, you have a lot of options for working with schemas now:

Schema for energy data:

inferred.avro.schema
{ "type" : "record", "name" : "smartPlug", "fields" : [
  { "name" : "day19", "type" : "double", "doc" : "Type inferred from '2.035'" },
  { "name" : "day20", "type" : "double", "doc" : "Type inferred from '1.191'" },
  { "name" : "day21", "type" : "double", "doc" : "Type inferred from '0.637'" },
  { "name" : "day22", "type" : "double", "doc" : "Type inferred from '1.497'" },
  { "name" : "day23", "type" : "double", "doc" : "Type inferred from '1.151'" },
  { "name" : "day24", "type" : "double", "doc" : "Type inferred from '1.227'" },
  { "name" : "day25", "type" : "double", "doc" : "Type inferred from '1.387'" },
  { "name" : "day26", "type" : "double", "doc" : "Type inferred from '1.138'" },
  { "name" : "day27", "type" : "double", "doc" : "Type inferred from '1.204'" },
  { "name" : "day28", "type" : "double", "doc" : "Type inferred from '1.401'" },
  { "name" : "day29", "type" : "double", "doc" : "Type inferred from '1.288'" },
  { "name" : "day30", "type" : "double", "doc" : "Type inferred from '1.439'" },
  { "name" : "day31", "type" : "double", "doc" : "Type inferred from '0.126'" },
  { "name" : "day1", "type" : "double", "doc" : "Type inferred from '1.204'" },
  { "name" : "day2", "type" : "double", "doc" : "Type inferred from '1.006'" },
  { "name" : "day3", "type" : "double", "doc" : "Type inferred from '1.257'" },
  { "name" : "day4", "type" : "double", "doc" : "Type inferred from '1.053'" },
  { "name" : "day5", "type" : "double", "doc" : "Type inferred from '1.597'" },
  { "name" : "day6", "type" : "double", "doc" : "Type inferred from '1.642'" },
  { "name" : "day7", "type" : "double", "doc" : "Type inferred from '0.443'" },
  { "name" : "day8", "type" : "double", "doc" : "Type inferred from '0.01'" },
  { "name" : "day9", "type" : "double", "doc" : "Type inferred from '0.009'" },
  { "name" : "day10", "type" : "double", "doc" : "Type inferred from '0.009'" },
  { "name" : "day11", "type" : "double", "doc" : "Type inferred from '0.075'" },
  { "name" : "day12", "type" : "double", "doc" : "Type inferred from '1.149'" },
  { "name" : "day13", "type" : "double", "doc" : "Type inferred from '1.014'" },
  { "name" : "day14", "type" : "double", "doc" : "Type inferred from '0.851'" },
  { "name" : "day15", "type" : "double", "doc" : "Type inferred from '1.134'" },
  { "name" : "day16", "type" : "double", "doc" : "Type inferred from '1.54'" },
  { "name" : "day17", "type" : "double", "doc" : "Type inferred from '1.438'" },
  { "name" : "day18", "type" : "double", "doc" : "Type inferred from '1.056'" },
  { "name" : "sw_ver", "type" : "string", "doc" : "Type inferred from '\"1.1.1 Build 160725 Rel.164033\"'" },
  { "name" : "hw_ver", "type" : "string", "doc" : "Type inferred from '\"1.0\"'" },
  { "name" : "mac", "type" : "string", "doc" : "Type inferred from '\"50:C7:BF:B1:95:D5\"'" },
  { "name" : "type", "type" : "string", "doc" : "Type inferred from '\"IOT.SMARTPLUGSWITCH\"'" },
  { "name" : "hwId", "type" : "string", "doc" : "Type inferred from '\"60FF6B258734EA6880E186F8C96DDC61\"'" },
  { "name" : "fwId", "type" : "string", "doc" : "Type inferred from '\"060BFEA28A8CD1E67146EB5B2B599CC8\"'" },
  { "name" : "oemId", "type" : "string", "doc" : "Type inferred from '\"FFF22CFF774A0B89F7624BFC6F50D5DE\"'" },
  { "name" : "dev_name", "type" : "string", "doc" : "Type inferred from '\"Wi-Fi Smart Plug With Energy Monitoring\"'" },
  { "name" : "model", "type" : "string", "doc" : "Type inferred from '\"HS110(US)\"'" },
  { "name" : "deviceId", "type" : "string", "doc" : "Type inferred from '\"8006ECB1D454C4428953CB2B34D9292D18A6DB0E\"'" },
  { "name" : "alias", "type" : "string", "doc" : "Type inferred from '\"Tim Spann's MiniFi Controller SmartPlug - Desk1\"'" },
  { "name" : "icon_hash", "type" : "string", "doc" : "Type inferred from '\"\"'" },
  { "name" : "relay_state", "type" : "int", "doc" : "Type inferred from '1'" },
  { "name" : "on_time", "type" : "int", "doc" : "Type inferred from '1995745'" },
  { "name" : "active_mode", "type" : "string", "doc" : "Type inferred from '\"schedule\"'" },
  { "name" : "feature", "type" : "string", "doc" : "Type inferred from '\"TIM:ENE\"'" },
  { "name" : "updating", "type" : "int", "doc" : "Type inferred from '0'" },
  { "name" : "rssi", "type" : "int", "doc" : "Type inferred from '-34'" },
  { "name" : "led_off", "type" : "int", "doc" : "Type inferred from '0'" },
  { "name" : "latitude", "type" : "double", "doc" : "Type inferred from '40.268216'" },
  { "name" : "longitude", "type" : "double", "doc" : "Type inferred from '-74.529088'" },
  { "name" : "index", "type" : "int", "doc" : "Type inferred from '18'" },
  { "name" : "zone_str", "type" : "string", "doc" : "Type inferred from '\"(UTC-05:00) Eastern Daylight Time (US & Canada)\"'" },
  { "name" : "tz_str", "type" : "string", "doc" : "Type inferred from '\"EST5EDT,M3.2.0,M11.1.0\"'" },
  { "name" : "dst_offset", "type" : "int", "doc" : "Type inferred from '60'" },
  { "name" : "month1", "type" : "double", "doc" : "Type inferred from '32.674'" },
  { "name" : "month2", "type" : "double", "doc" : "Type inferred from '8.202'" },
  { "name" : "current", "type" : "double", "doc" : "Type inferred from '0.772548'" },
  { "name" : "voltage", "type" : "double", "doc" : "Type inferred from '121.740428'" },
  { "name" : "power", "type" : "double", "doc" : "Type inferred from '91.380606'" },
  { "name" : "total", "type" : "double", "doc" : "Type inferred from '48.264'" },
  { "name" : "time", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" },
  { "name" : "ledon", "type" : "boolean", "doc" : "Type inferred from 'true'" },
  { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" }
] }
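
The schema above was inferred from a sample record. To give a feel for what type inference like this involves, here is a simplified Python sketch that maps the values of one JSON record to Avro primitive types. It is an illustration only, not NiFi's implementation; the function names `avro_type` and `infer_schema` are made up for this example, and it handles only flat records with boolean, integer, floating-point, and string values.

```python
import json


def avro_type(value):
    """Map a Python value parsed from JSON to an Avro primitive type name."""
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "double"
    return "string"


def infer_schema(name, record):
    """Build an Avro record schema (as a dict) from one flat JSON record."""
    fields = [
        {
            "name": key,
            "type": avro_type(value),
            "doc": "Type inferred from '%s'" % json.dumps(value),
        }
        for key, value in record.items()
    ]
    return {"type": "record", "name": name, "fields": fields}


if __name__ == "__main__":
    sample = {"voltage": 121.740428, "relay_state": 1, "ledon": True, "model": "HS110(US)"}
    print(json.dumps(infer_schema("smartPlug", sample), indent=2))
```

A schema inferred from a single record only reflects the values seen in that record, so a field that happens to hold a whole number once will be typed as an integer even if later readings are fractional.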


Python source (updated to include 31 days):

from pyHS100 import SmartPlug
import json
import datetime

plug = SmartPlug("192.168.1.203")

row = {}

# Daily energy usage for three months. The keys are day numbers, so a later
# month overwrites any day already written by an earlier month.
for year, month in [(2017, 12), (2018, 1), (2018, 2)]:
    emeterdaily = plug.get_emeter_daily(year=year, month=month)
    for k, v in emeterdaily.items():
        row["day%s" % k] = v

# Hardware info, system info, and time zone settings, copied as-is.
for info in (plug.hw_info, plug.get_sysinfo(), plug.timezone):
    for k, v in info.items():
        row["%s" % k] = v

# Monthly energy usage for 2018, keyed as month1, month2, ...
emetermonthly = plug.get_emeter_monthly(year=2018)
for k, v in emetermonthly.items():
    row["month%s" % k] = v

# Real-time readings (current, voltage, power, total).
realtime = plug.get_emeter_realtime()
for k, v in realtime.items():
    row["%s" % k] = v

row['alias'] = plug.alias
row['time'] = plug.time.strftime('%m/%d/%Y %H:%M:%S')
row['ledon'] = plug.led
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')

# Emit the whole row as a single JSON document on STDOUT.
json_string = json.dumps(row)
print(json_string)


Example output:

{"text\/plain":"root\n |-- App Attempt ID: string (nullable = true)\n |-- App ID: string (nullable = true)\n |-- App Name: string (nullable = true)\n |-- Block Manager ID: struct (nullable = true)\n |    |-- Executor ID: string (nullable = true)\n |    |-- Host: string (nullable = true)\n |    |-- Port: long (nullable = true)\n |-- Classpath Entries: struct (nullable = true)\n |    |-- \/etc\/hadoop\/conf\/: string (nullable = true)\n |    |-- \/etc\/hadoop\/conf\/secure: string (nullable = true)\n |    |-- \/etc\/zeppelin\/conf\/external-dependency-conf\/: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_conf__: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/JavaEWAH-0.3.2.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/RoaringBitmap-0.5.11.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/ST4-4.0.4.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/activation-1.1.1.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/aircompressor-0.8.jar: string (nullable = true)\n |    |-- 
\/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-2.7.7.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-runtime-3.4.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr4-runtime-4.5.3.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/
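
The output arrives as JSON with the printed text stored under the `text/plain` key, so a downstream step could pull the schema text back out. A minimal Python sketch follows; the payload is a shortened, made-up stand-in with the same shape as the output above, and `extract_plain_text` is a name chosen for this example.

```python
import json


def extract_plain_text(output_json):
    """Return the text stored under the "text/plain" key of a JSON result."""
    return json.loads(output_json)["text/plain"]


if __name__ == "__main__":
    # Shortened stand-in for the real output, which is much longer.
    sample = '{"text\\/plain":"root\\n |-- App ID: string (nullable = true)\\n"}'
    for line in extract_plain_text(sample).splitlines():
        print(line)
```

The escaped `\/` in the key is ordinary JSON escaping for a forward slash, so after parsing the key is simply `text/plain`.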


Shell tip: Apache MXNet may send some warnings to STDERR. I don't want these, so I send them to /dev/null:

python3 -W ignore analyze.py 2>/dev/null
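
If the script is launched from Python rather than from a shell, the same effect can be had with `subprocess`. This is a sketch under the assumption that you call the script from another Python program; `run_quietly` is a hypothetical helper name, and the inline `-c` program below stands in for `analyze.py`.

```python
import subprocess
import sys


def run_quietly(args):
    """Run a Python program with warnings ignored and STDERR discarded."""
    completed = subprocess.run(
        [sys.executable, "-W", "ignore"] + list(args),
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,  # same idea as 2>/dev/null
        universal_newlines=True,
    )
    return completed.stdout


if __name__ == "__main__":
    # Stand-in for analyze.py: writes noise to STDERR and a result to STDOUT.
    program = "import sys; sys.stderr.write('noise\\n'); print('result')"
    print(run_quietly(["-c", program]), end="")
```

Only STDOUT is captured and returned, so anything written to STDERR never reaches the caller.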


Software

  • PySpark
  • Python
  • Apache NiFi
  • Apache Spark
  • HDF 3.1
  • HDP 2.6.4
  • Apache Hive
  • Apache Avro
  • Apache ORC
  • Apache Ambari
  • Apache Zeppelin


Topics:
apache spark, apache livy, apache nifi, integration, pyspark, big data, tutorial
