Scala Spark Integration With Apache NiFi


You don't want all of your Scala code in one continuous block, as you'd have in Apache Zeppelin, so see how to execute Scala Apache Spark code from JARs in Apache NiFi.


Learn how to execute Scala Apache Spark code in JARs from Apache NiFi, because you don't want all of your Scala code in a continuous block the way you would have it in Apache Zeppelin. We're going to use Apache NiFi, Apache Livy, Apache Spark, and Scala.

Here's the setup, step by step:

- The NiFi flows.
- The inline Scala code.
- Apache Zeppelin running the same Scala job (you have to add the JAR to the Spark interpreter and restart it).
- Grafana charts of the Apache NiFi run.
- Log search, which helps you find errors.
- The run code for your Spark class.
- Setting up your ExecuteSparkInteractive processor (a sketch of what this processor does under the hood follows this list).
- Setting up your Spark service for Scala.
- Tracking the job in the Livy UI.
- Tracking the job in the Spark UI.
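The LivySessionController service keeps a Livy session warm, and ExecuteSparkInteractive POSTs the Scala code from your flow to that session as a statement. Here's a minimal sketch of that handoff against Livy's REST API in plain Scala (Java 11's HttpClient assumed). The host, port, JAR path, and hardcoded session id are placeholders for illustration; a real client would parse the session id out of the create response and poll until the session is idle.

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object LivyHandoffSketch {
  // Hypothetical endpoint: substitute your own Livy host and port.
  val livyUrl = "http://livy-host:8998"
  val client  = HttpClient.newHttpClient()

  def post(path: String, json: String): String = {
    val request = HttpRequest.newBuilder()
      .uri(URI.create(livyUrl + path))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(json))
      .build()
    client.send(request, HttpResponse.BodyHandlers.ofString()).body()
  }

  def main(args: Array[String]): Unit = {
    // 1. Open a Scala session, attaching our JAR from a path the
    //    cluster can see (an HDFS location is assumed here).
    println(post("/sessions",
      """{"kind": "spark", "jars": ["hdfs:///jars/example.jar"]}"""))

    // 2. Submit the same statement the NiFi flow carries.
    //    Session id 0 is assumed; parse it from the response above.
    val code = "import com.dataflowdeveloper.example.Example; " +
      "val job = new Example(); job.run(spark)"
    println(post("/sessions/0/statements", s"""{"code": "$code"}"""))
  }
}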

I was looking at pulling code from Git, putting it into a NiFi attribute, and running it directly. That works for snippets, but bigger projects have many classes and dependencies that call for a full IDE and SBT build cycle. Once I've built a Scala JAR, I want to run against that.
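If you go the SBT route, a minimal build.sbt along these lines will produce that JAR. The versions are assumptions, so match them to the Scala and Spark versions on your cluster; marking Spark as "provided" keeps it out of the JAR, since the Livy session already supplies it at runtime.

name := "example"
organization := "com.dataflowdeveloper"
version := "1.0"

// Assumed versions; match your cluster's Scala and Spark.
scalaVersion := "2.11.8"

// "provided" keeps Spark out of the packaged JAR; the Livy
// session has Spark on its classpath at runtime.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided"

Running sbt package then drops the JAR under target/scala-2.11/, and that artifact is what you attach to the Livy session.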

Example code:

package com.dataflowdeveloper.example

import org.apache.spark.sql.SparkSession

class Example {
  def run(spark: SparkSession): Unit = {
    try {
      println("Started")
      // Read the raw smart plug JSON from HDFS.
      val shdf = spark.read.json("hdfs://princeton0.field.hortonworks.com:8020/smartPlugRaw")
      shdf.printSchema()
      // Register a temporary view so the data can be queried with Spark SQL.
      shdf.createOrReplaceTempView("smartplug")
      val stuffdf = spark.sql("SELECT * FROM smartplug")
      println(s"Row count: ${stuffdf.count()}")
      println("Complete.")
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}


Run that with:


import com.dataflowdeveloper.example.Example

println("Before run")
// "spark" is the SparkSession that the Livy interactive session provides.
val job = new Example()
job.run(spark)
println("After run")


After the run, Livy returns the statement output as JSON:

{"text/plain": "After run"}

Here's the GitHub link.


Topics:
big data, tutorial, scala, apache spark, apache nifi

