
Apache Spark and Apache NiFi Integration (Part 2 of 2)

Let's finish off our journey of integrating Apache Spark and Apache NiFi to cover both data ingestion and running Apache Spark jobs.

By Tim Spann · Feb. 15, 18 · Tutorial

We'll briefly start by going over our use case: ingesting energy data and running an Apache Spark job as part of the flow.

We will be using the new (in Apache NiFi 1.5/HDF 3.1) ExecuteSparkInteractive processor with the LivyController to accomplish that integration. As we mentioned in the first part of the article, it's pretty easy to set this up.
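
Under the hood, ExecuteSparkInteractive talks to Apache Livy's REST API: it opens a session, posts code statements, and polls for results. Here is a rough sketch of that conversation in Python; the host name matches the cluster placeholder used later in this article, and port 8998 (Livy's default) is an assumption:

import json
import time

import requests

# Sketch of the Livy REST conversation that ExecuteSparkInteractive drives;
# the host is a placeholder and 8998 is Livy's default port.
LIVY = "http://yourhdp264server:8998"
HEADERS = {"Content-Type": "application/json"}

# Open a PySpark session
session = requests.post(LIVY + "/sessions",
                        data=json.dumps({"kind": "pyspark"}),
                        headers=HEADERS).json()
session_url = "%s/sessions/%s" % (LIVY, session["id"])

# Wait until the session is idle (ready to accept statements)
while requests.get(session_url, headers=HEADERS).json()["state"] != "idle":
    time.sleep(5)

# Submit a code statement -- essentially what the processor sends
stmt = requests.post(session_url + "/statements",
                     data=json.dumps({"code": "spark.range(10).count()"}),
                     headers=HEADERS).json()

# Poll for the statement result
stmt_url = "%s/statements/%s" % (session_url, stmt["id"])
result = requests.get(stmt_url, headers=HEADERS).json()
while result["state"] != "available":
    time.sleep(2)
    result = requests.get(stmt_url, headers=HEADERS).json()
print(result["output"])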

Since this is a modern Apache NiFi project, we use version control on our code.

On a local machine, a Python script talks to an electricity sensor over Wi-Fi. The data is processed, cleaned, and sent to a cloud-hosted Apache NiFi instance via site-to-site (S2S) over HTTP.
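
The production flow pushes via site-to-site, but for a quick local test you could POST the same JSON to a ListenHTTP processor instead. A minimal sketch, where the host, port, and base path "contentListener" (ListenHTTP's default) are assumptions:

import json

import requests

# Hedged alternative to S2S: POST one JSON record to a hypothetical
# ListenHTTP endpoint; "contentListener" is that processor's default base path.
NIFI_URL = "http://your-nifi-host:8080/contentListener"

record = {"voltage": 121.740428, "power": 91.380606}  # sample fields
resp = requests.post(NIFI_URL, data=json.dumps(record),
                     headers={"Content-Type": "application/json"})
resp.raise_for_status()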

In the cloud, we receive the pushed messages.

Once we open the Spark It Up processor group, we have a flow to process the data.

Flow Overview

  • QueryRecord: Determines how to route the streaming data based on a SQL query (see the example query after this list). It also converts JSON to Apache Avro.

Path for all files:

  • UpdateAttribute: Set a schema.
  • MergeContent: Do an Apache Avro merge on our data to make bigger files.
  • ConvertAvroToORC: Build an Apache ORC file from the merged Apache Avro file.
  • PutHDFS: Store our Apache ORC file in an HDFS directory on our HDP 2.6.4 cluster.

Path for large voltage:

  • ExecuteSparkInteractive: Call our PySpark job.
  • PutHDFS: Store the results in HDFS.
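
For illustration, the routing in QueryRecord is just SQL over the incoming records. A minimal sketch of what the large-voltage route might look like (the property name and the 120-volt threshold are assumptions, not the exact values from the flow):

-- Hypothetical "largevoltage" routing property on QueryRecord; records
-- matching this query are routed down the Spark path.
SELECT * FROM FLOWFILE WHERE voltage > 120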

We could take all the metadata attributes and send them somewhere or store them as a JSON file.

We tested our PySpark program in Apache Zeppelin and then copied it into our processor.
Our ExecuteSparkInteractive Processor:

In our QueryRecord processor, we send messages with large voltages to the Apache Spark executor to run a PySpark job that does some further processing.

Once we have submitted a job via Apache Livy, we can follow it during and after execution in the detailed Apache Livy and Spark UIs. In the Apache Livy UI screen below, we can see the PySpark code that was executed and its output.

Apache Livy UI:

Apache Spark Jobs UI: Jobs

Apache Spark Jobs UI: SQL

Apache Spark Jobs UI: Executors

Apache Zeppelin SQL search of the data:


Hive/Spark SQL table DDL generated automagically by Apache NiFi:
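
ConvertAvroToORC also writes a hive.ddl attribute to each flow file, which is where this DDL comes from. Based on the inferred schema above, the generated statement looks roughly like this (abridged; the LOCATION clause is an assumption added to make the table definition usable):

CREATE EXTERNAL TABLE IF NOT EXISTS smartPlug (
  day19 DOUBLE, day20 DOUBLE,
  -- ... remaining day and month columns elided ...
  sw_ver STRING, hw_ver STRING, mac STRING, model STRING,
  current DOUBLE, voltage DOUBLE, power DOUBLE, total DOUBLE,
  time STRING, ledon BOOLEAN, systemtime STRING)
STORED AS ORC
LOCATION '/smartPlug'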

Below is the source code related to this article:

  • Source code

PySpark code:

# Read the Spark2 history logs from HDFS as JSON
shdf = spark.read.json("hdfs://yourhdp264server:8020/spark2-history")
shdf.printSchema()

# Register a temporary view and run a simple Spark SQL query over it
shdf.createOrReplaceTempView("sparklogs")
stuffdf = spark.sql("SELECT * FROM sparklogs")
stuffdf.count()


This is a pretty simple PySpark application: it reads the JSON results of the Spark2 history, prints a schema inferred from them, and then does a simple SELECT and count. We could just as easily do Spark machine learning or other processing there; a minimal sketch follows below. You can run this with Python 2.x or 3.x under PySpark. I am running it on Apache Spark 2.2.0 hosted on an HDP 2.6.4 cluster running CentOS 7. The fun part is that every time I run this Spark job, it produces more history for itself to read. I should probably just read that log in Apache NiFi, but it was a fun little example. Clearly, you can run any kind of job in here; my next article will be about running Apache MXNet and Spark MLlib jobs through Apache Livy and Apache NiFi.
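
Here is that sketch: a small follow-on aggregation over the same temp view. The Event field is the standard key Spark writes into its JSON event logs; everything else reuses the sparklogs view registered above.

# A minimal sketch of follow-on processing: count Spark history events by
# type, reusing the "sparklogs" temp view registered above. The "Event"
# field is the standard key in Spark's JSON event logs.
eventdf = spark.sql("SELECT Event, COUNT(*) AS cnt FROM sparklogs GROUP BY Event ORDER BY cnt DESC")
eventdf.show()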

As a quick side note, you have a lot of options for working with schemas now:

Schema for energy data:

inferred.avro.schema
{ "type" : "record", "name" : "smartPlug", "fields" : [ { "name" : "day19", "type" : "double", "doc" : "Type inferred from '2.035'" }, { "name" : "day20", "type" : "double", "doc" : "Type inferred from '1.191'" }, { "name" : "day21", "type" : "double", "doc" : "Type inferred from '0.637'" }, { "name" : "day22", "type" : "double", "doc" : "Type inferred from '1.497'" }, { "name" : "day23", "type" : "double", "doc" : "Type inferred from '1.151'" }, { "name" : "day24", "type" : "double", "doc" : "Type inferred from '1.227'" }, { "name" : "day25", "type" : "double", "doc" : "Type inferred from '1.387'" }, { "name" : "day26", "type" : "double", "doc" : "Type inferred from '1.138'" }, { "name" : "day27", "type" : "double", "doc" : "Type inferred from '1.204'" }, { "name" : "day28", "type" : "double", "doc" : "Type inferred from '1.401'" }, { "name" : "day29", "type" : "double", "doc" : "Type inferred from '1.288'" }, { "name" : "day30", "type" : "double", "doc" : "Type inferred from '1.439'" }, { "name" : "day31", "type" : "double", "doc" : "Type inferred from '0.126'" }, { "name" : "day1", "type" : "double", "doc" : "Type inferred from '1.204'" }, { "name" : "day2", "type" : "double", "doc" : "Type inferred from '1.006'" }, { "name" : "day3", "type" : "double", "doc" : "Type inferred from '1.257'" }, { "name" : "day4", "type" : "double", "doc" : "Type inferred from '1.053'" }, { "name" : "day5", "type" : "double", "doc" : "Type inferred from '1.597'" }, { "name" : "day6", "type" : "double", "doc" : "Type inferred from '1.642'" }, { "name" : "day7", "type" : "double", "doc" : "Type inferred from '0.443'" }, { "name" : "day8", "type" : "double", "doc" : "Type inferred from '0.01'" }, { "name" : "day9", "type" : "double", "doc" : "Type inferred from '0.009'" }, { "name" : "day10", "type" : "double", "doc" : "Type inferred from '0.009'" }, { "name" : "day11", "type" : "double", "doc" : "Type inferred from '0.075'" }, { "name" : "day12", "type" : "double", "doc" : "Type inferred from '1.149'" }, { "name" : "day13", "type" : "double", "doc" : "Type inferred from '1.014'" }, { "name" : "day14", "type" : "double", "doc" : "Type inferred from '0.851'" }, { "name" : "day15", "type" : "double", "doc" : "Type inferred from '1.134'" }, { "name" : "day16", "type" : "double", "doc" : "Type inferred from '1.54'" }, { "name" : "day17", "type" : "double", "doc" : "Type inferred from '1.438'" }, { "name" : "day18", "type" : "double", "doc" : "Type inferred from '1.056'" }, { "name" : "sw_ver", "type" : "string", "doc" : "Type inferred from '\"1.1.1 Build 160725 Rel.164033\"'" }, { "name" : "hw_ver", "type" : "string", "doc" : "Type inferred from '\"1.0\"'" }, { "name" : "mac", "type" : "string", "doc" : "Type inferred from '\"50:C7:BF:B1:95:D5\"'" }, { "name" : "type", "type" : "string", "doc" : "Type inferred from '\"IOT.SMARTPLUGSWITCH\"'" }, { "name" : "hwId", "type" : "string", "doc" : "Type inferred from '\"60FF6B258734EA6880E186F8C96DDC61\"'" }, { "name" : "fwId", "type" : "string", "doc" : "Type inferred from '\"060BFEA28A8CD1E67146EB5B2B599CC8\"'" }, { "name" : "oemId", "type" : "string", "doc" : "Type inferred from '\"FFF22CFF774A0B89F7624BFC6F50D5DE\"'" }, { "name" : "dev_name", "type" : "string", "doc" : "Type inferred from '\"Wi-Fi Smart Plug With Energy Monitoring\"'" }, { "name" : "model", "type" : "string", "doc" : "Type inferred from '\"HS110(US)\"'" }, { "name" : "deviceId", "type" : "string", "doc" : "Type inferred from '\"8006ECB1D454C4428953CB2B34D9292D18A6DB0E\"'" }, { "name" : "alias", 
"type" : "string", "doc" : "Type inferred from '\"Tim Spann's MiniFi Controller SmartPlug - Desk1\"'" }, { "name" : "icon_hash", "type" : "string", "doc" : "Type inferred from '\"\"'" }, { "name" : "relay_state", "type" : "int", "doc" : "Type inferred from '1'" }, { "name" : "on_time", "type" : "int", "doc" : "Type inferred from '1995745'" }, { "name" : "active_mode", "type" : "string", "doc" : "Type inferred from '\"schedule\"'" }, { "name" : "feature", "type" : "string", "doc" : "Type inferred from '\"TIM:ENE\"'" }, { "name" : "updating", "type" : "int", "doc" : "Type inferred from '0'" }, { "name" : "rssi", "type" : "int", "doc" : "Type inferred from '-34'" }, { "name" : "led_off", "type" : "int", "doc" : "Type inferred from '0'" }, { "name" : "latitude", "type" : "double", "doc" : "Type inferred from '40.268216'" }, { "name" : "longitude", "type" : "double", "doc" : "Type inferred from '-74.529088'" }, { "name" : "index", "type" : "int", "doc" : "Type inferred from '18'" }, { "name" : "zone_str", "type" : "string", "doc" : "Type inferred from '\"(UTC-05:00) Eastern Daylight Time (US & Canada)\"'" }, { "name" : "tz_str", "type" : "string", "doc" : "Type inferred from '\"EST5EDT,M3.2.0,M11.1.0\"'" }, { "name" : "dst_offset", "type" : "int", "doc" : "Type inferred from '60'" }, { "name" : "month1", "type" : "double", "doc" : "Type inferred from '32.674'" }, { "name" : "month2", "type" : "double", "doc" : "Type inferred from '8.202'" }, { "name" : "current", "type" : "double", "doc" : "Type inferred from '0.772548'" }, { "name" : "voltage", "type" : "double", "doc" : "Type inferred from '121.740428'" }, { "name" : "power", "type" : "double", "doc" : "Type inferred from '91.380606'" }, { "name" : "total", "type" : "double", "doc" : "Type inferred from '48.264'" }, { "name" : "time", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" }, { "name" : "ledon", "type" : "boolean", "doc" : "Type inferred from 'true'" }, { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" } ] }


Python source (updated to include 31 days):

from pyHS100 import SmartPlug
import datetime
import json

# Connect to the TP-Link smart plug on the local network
plug = SmartPlug("192.168.1.203")

row = {}

# Daily energy readings for December 2017 through February 2018
emeterdaily = plug.get_emeter_daily(year=2017, month=12)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

emeterdaily = plug.get_emeter_daily(year=2018, month=1)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

emeterdaily = plug.get_emeter_daily(year=2018, month=2)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

# Hardware, system, and timezone metadata
hwinfo = plug.hw_info
for k, v in hwinfo.items():
    row["%s" % k] = v

sysinfo = plug.get_sysinfo()
for k, v in sysinfo.items():
    row["%s" % k] = v

timezone = plug.timezone
for k, v in timezone.items():
    row["%s" % k] = v

# Monthly energy totals for the current year
emetermonthly = plug.get_emeter_monthly(year=2018)
for k, v in emetermonthly.items():
    row["month%s" % k] = v

# Real-time voltage, current, and power readings
realtime = plug.get_emeter_realtime()
for k, v in realtime.items():
    row["%s" % k] = v

row['alias'] = plug.alias
row['time'] = plug.time.strftime('%m/%d/%Y %H:%M:%S')
row['ledon'] = plug.led
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')

# Emit one JSON record for NiFi to ingest
json_string = json.dumps(row)
print(json_string)


Example output (the Livy response with the schema inferred from the Spark2 history, truncated):

{"text\/plain":"root\n |-- App Attempt ID: string (nullable = true)\n |-- App ID: string (nullable = true)\n |-- App Name: string (nullable = true)\n |-- Block Manager ID: struct (nullable = true)\n |    |-- Executor ID: string (nullable = true)\n |    |-- Host: string (nullable = true)\n |    |-- Port: long (nullable = true)\n |-- Classpath Entries: struct (nullable = true)\n |    |-- \/etc\/hadoop\/conf\/: string (nullable = true)\n |    |-- \/etc\/hadoop\/conf\/secure: string (nullable = true)\n |    |-- \/etc\/zeppelin\/conf\/external-dependency-conf\/: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_conf__: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/JavaEWAH-0.3.2.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/RoaringBitmap-0.5.11.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/ST4-4.0.4.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/activation-1.1.1.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/aircompressor-0.8.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-2.7.7.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-runtime-3.4.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr4-runtime-4.5.3.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/


Shell tip: Apache MXNet may send some warnings to STDERR. I don't want these, so I send them to /dev/null:

python3 -W ignore analyze.py 2>/dev/null


Software

  • PySpark
  • Python
  • Apache NiFi
  • Apache Spark
  • HDF 3.1
  • HDP 2.6.4
  • Apache Hive
  • Apache Avro
  • Apache ORC
  • Apache Ambari
  • Apache Zeppelin