
Apache Spark Structured Streaming Integration With Apache NiFi 1.5: Scala Edition


Learn how to easily integrate Apache Spark Structured Streaming with Apache NiFi 1.5 using Scala.


Hortonworks Sandbox for HDP and HDF is your chance to get started on learning, developing, testing and trying out new features. Each download comes preconfigured with interactive tutorials, sample data and developments from the Apache community.

This tutorial uses Apache Spark 2.2.0 with Scala 2.11.8 and Java 1.8.0_112 on HDP 2.6.4, called from HDF 3.1 with Apache NiFi 1.5.

This is a follow-up to this article.

We are using the same Apache NiFi flow to send messages to Apache Kafka. What is nice is that the structured streaming version, the non-structured streaming version, and other consumers can all listen to the same topic and receive the same messages sent by Apache NiFi.

When we start, we have no data, but we quickly get a ton of it:

By default, a Kafka cluster has three nodes, so a replication factor of 3 is a good default. I have only one node, so I had to lower the replication factor; until I did, there were tons of warnings in the /usr/hdf/current/kafka-broker/logs directory.
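For example, on a single-node install you could recreate the topic with a replication factor of 1. This is a sketch assuming the default ZooKeeper address and the smartPlug2 topic used below; adjust both for your environment:

```shell
# Kafka 1.x-era tooling (as shipped with HDF 3.1) uses --zookeeper for topic commands.
/usr/hdf/current/kafka-broker/bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic smartPlug2
```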

The simplest Apache Spark client is one run in the shell:

/usr/hdp/current/spark2-client/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0

The code is a simple fork from this excellent, highly recommended tutorial. If you are submitting this as a job rather than running it in the shell, wrap the code in an object with a main method. At the end, block until the query finishes with sq.awaitTermination().

// Read from Kafka as a streaming source, subscribed to the topic NiFi publishes to.
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "smartPlug2").
  option("kafka.bootstrap.servers", "mykafkabroker:6667").load

records.printSchema

// Kafka delivers key and value as binary; cast them to strings
// and keep the topic/partition/offset metadata columns.
val result = records.
  select(
    $"key" cast "string",
    $"value" cast "string",
    $"topic",
    $"partition",
    $"offset")

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

// Write each micro-batch to the console every 10 seconds,
// without truncating the long JSON values.
val sq = result.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Append).
  queryName("scalastrstrclient").
  start

sq.status
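For reference, here is what the same job might look like packaged for spark-submit instead of the shell. This is a minimal sketch with an object name of my choosing; spark-shell provides the spark session and the $ column syntax automatically, while a standalone application must create them itself:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

object SmartPlugStream {
  def main(args: Array[String]): Unit = {
    // spark-shell creates this for you; a submitted job must build it.
    val spark = SparkSession.builder.appName("scalastrstrclient").getOrCreate()
    import spark.implicits._

    val records = spark.readStream.
      format("kafka").
      option("subscribe", "smartPlug2").
      option("kafka.bootstrap.servers", "mykafkabroker:6667").
      load

    val result = records.select(
      $"key" cast "string",
      $"value" cast "string",
      $"topic", $"partition", $"offset")

    val sq = result.writeStream.
      format("console").
      option("truncate", false).
      trigger(Trigger.ProcessingTime(10.seconds)).
      outputMode(OutputMode.Append).
      start

    // Block so the driver does not exit while the stream runs.
    sq.awaitTermination()
  }
}
```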

Example run:

Spark context Web UI available at http://myipiscool:4045
Spark context available as 'sc' (master = local[*], app id = local-1519248053841).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.2.6.4.0-91
      /_/


Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val records = spark.
     |   readStream.
     |   format("kafka"). 
     |   option("subscribe", "smartPlug2").
     |   option("kafka.bootstrap.servers", "server:6667").load
records: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> records.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

scala> val result = records.
     |   select(
     |     $"key" cast "string",  
     |     $"value" cast "string", 
     |     $"topic",
     |     $"partition",
     |     $"offset")
result: org.apache.spark.sql.DataFrame = [key: string, value: string ... 3 more fields]

scala> import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}


scala> import scala.concurrent.duration._
import scala.concurrent.duration._

scala> val sq = result.
     |   writeStream.
     |   format("console").
     |   option("truncate", false).
     |   trigger(Trigger.ProcessingTime(10.seconds)).
     |   outputMode(OutputMode.Append).
     |   queryName("scalastrstrclient").
     |   start
sq: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3638a852

scala> sq.status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Getting offsets from KafkaSource[Subscribe[smartPlug2]]",
  "isDataAvailable" : false,
  "isTriggerActive" : true
}

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+-----+---------+------+
|key|value|topic|partition|offset|
+---+-----+-----+---------+------+
+---+-----+-----+---------+------+


-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------+------+
|key                |value                                                                                                                                                                          |topic     |partition|offset|
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------+------+
|02/21/2018 16:22:00|{"day1":1.204,"day2":1.006,"day3":1.257,"day4":1.053,"day5":1.597,"day6":1.642,"day7":1.439,"day8":1.178,"day9":1.259,"day10":0.995,"day11":0.569,"day12":1.287,"day13":1.371,"day14":1.404,"day15":1.588,"day16":1.426,"day17":1.707,"day18":1.153,"day19":1.155,"day20":1.732,"day21":1.333,"day22":1.497,"day23":1.151,"day24":1.227,"day25":1.387,"day26":1.138,"day27":1.204,"day28":1.401,"day29":1.288,"day30":1.439,"day31":0.126,"sw_ver":"1.1.1 Build 160725 Rel.164033","hw_ver":"1.0","mac":"50:C7:BF:B1:95:D5","type":"IOT.SMARTPLUGSWITCH","hwId":"7777","fwId":"777","oemId":"FFF22CFF774A0B89F7624BFC6F50D5DE","dev_name":"Wi-Fi Smart Plug With Energy Monitoring","model":"HS110(US)","deviceId":"777","alias":"Tim Spann's MiniFi Controller SmartPlug - Desk1","icon_hash":"","relay_state":1,"on_time":452287,"active_mode":"schedule","feature":"TIM:ENE","updating":0,"rssi":-33,"led_off":0,"latitude":41,"longitude":-77,"index":18,"zone_str":"(UTC-05:00) Eastern Daylight Time (US & Canada)","tz_str":"EST5EDT,M3.2.0,M11.1.0","dst_offset":60,"month12":null,"current":0.888908,"voltage":118.880856,"power":103.141828,"total":8.19,"time":"02/21/2018 16:22:00","ledon":true,"systemtime":"02/21/2018 16:22:00"}|smartPlug2|0        |14    |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------+------+

Example JSON data:

|02/21/2018 16:23:58|{"day1":1.204,"day2":1.006,"day3":1.257,"day4":1.053,"day5":1.597,"day6":1.642,"day7":1.439,"day8":1.178,"day9":1.259,"day10":0.995,"day11":0.569,"day12":1.287,"day13":1.371,"day14":1.404,"day15":1.588,"day16":1.426,"day17":1.707,"day18":1.153,"day19":1.155,"day20":1.732,"day21":1.337,"day22":1.497,"day23":1.151,"day24":1.227,"day25":1.387,"day26":1.138,"day27":1.204,"day28":1.401,"day29":1.288,"day30":1.439,"day31":0.126,"sw_ver":"1.1.1 Build 160725 Rel.164033","hw_ver":"1.0","mac":"50:C7:88:95:D5","type":"IOT.SMARTPLUGSWITCH","hwId":"8888","fwId":"6767","oemId":"6767","dev_name":"Wi-Fi Smart Plug With Energy Monitoring","model":"HS110(US)","deviceId":"7676","alias":"Tim Spann's MiniFi Controller SmartPlug - Desk1","icon_hash":"","relay_state":1,"on_time":452404,"active_mode":"schedule","feature":"TIM:ENE","updating":0,"rssi":-33,"led_off":0,"latitude":41.3241234,"longitude":-74.1234234,"index":18,"zone_str":"(UTC-05:00) Eastern Daylight Time (US & Canada)","tz_str":"EST5EDT,M3.2.0,M11.1.0","dst_offset":60,"month12":null,"current":0.932932,"voltage":118.890282,"power":107.826982,"total":8.194,"time":"02/21/2018 16:23:58","ledon":true,"systemtime":"02/21/2018 16:23:58"}|smartPlug2|0        |24
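Since the value column arrives as a raw JSON string, you could go one step further and parse it into typed columns with from_json and an explicit schema. This is a sketch covering only a few of the fields visible in the sample payload above; extend the schema (for example with the day1..day31 readings) to match your own device's JSON:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Partial schema inferred by hand from the sample smart plug payload.
val plugSchema = new StructType().
  add("deviceId", StringType).
  add("model", StringType).
  add("current", DoubleType).
  add("voltage", DoubleType).
  add("power", DoubleType).
  add("total", DoubleType).
  add("systemtime", StringType)

// Parse the JSON string into a struct, then flatten it into top-level columns.
val parsed = result.
  select(from_json($"value", plugSchema) as "plug").
  select($"plug.*")

parsed.printSchema
```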

You can learn more here.


Topics:
scala, apache spark, apache nifi, apache kafka, big data, tutorial

Opinions expressed by DZone contributors are their own.
