HDP 2.6.4-HDF 3.1: Apache Spark Streaming Integration With Apache NiFi 1.5

Let's see HDP, HDF, Apache Spark, Apache NiFi, and Python all work together to create a simple, robust data flow.

· Big Data Zone ·
I push my power data from a local Apache NiFi 1.5 server over Site-to-Site HTTP to a cloud-hosted HDF 3.1 cluster. This cluster has a Remote Input that passes the information on to a version-controlled Process Group called "Spark-Kafka-Streaming". Inside that group, I set a schema name and data type, then push the data to Kafka 1.0 hosted in HDF 3.1.

The PublishKafkaRecord_1_0 settings are very simple. We use the JsonTreeReader and the supplied schema to read the JSON file into records. I chose the JsonRecordSetWriter to push JSON out, though I could just as easily have used Apache Avro, CSV, or another format. JSON is easy to work with in Apache Spark and convenient for a debug display.

This method and code should keep working on later Spark 2.x releases (the pyspark.streaming.kafka module used here was removed in Spark 3.0). I have not tested it against earlier versions.

This article shows how to connect Apache NiFi with Apache Spark via Kafka using Spark Streaming. The example code is in PySpark.

I run the streaming Spark code two different ways for testing.

The first way is via Apache Zeppelin. You will need to load the Apache Spark Kafka Streaming package into Apache Zeppelin.

To add Kafka streaming support, we just add a dependency to the spark2 interpreter and restart the interpreter with its restart button. There is no need to restart Apache Zeppelin itself or the server.

The other way I run this is with spark-submit, using YARN as the master in cluster mode. As you can see here, I also include the Spark Streaming Kafka package.

/usr/hdp/current/spark2-client/bin/spark-submit  --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 kafka_test.py
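
If you launch the job from a script rather than typing the command by hand, the same invocation can be assembled in Python. This is only a minimal sketch: the helper name build_submit_command is hypothetical, and the spark-submit path, package coordinate, and script name are copied from the command above.

```python
import shlex

# Values copied from the spark-submit command above.
SPARK_SUBMIT = "/usr/hdp/current/spark2-client/bin/spark-submit"
KAFKA_PACKAGE = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0"


def build_submit_command(script, packages=(KAFKA_PACKAGE,),
                         master="yarn", deploy_mode="cluster"):
    """Return the spark-submit invocation as an argument list (hypothetical helper)."""
    return [
        SPARK_SUBMIT,
        "--master", master,
        "--deploy-mode", deploy_mode,
        # --packages takes a comma-separated list of Maven coordinates
        "--packages", ",".join(packages),
        script,
    ]


cmd = build_submit_command("kafka_test.py")
# Print a copy-pasteable command line; pass `cmd` to subprocess.run to execute it.
print(" ".join(shlex.quote(arg) for arg in cmd))
```

Keeping the arguments in a list avoids shell-quoting surprises if you later add more packages or options.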


My example PySpark program is very basic, but it shows the integration. It is adapted from the standard Spark example.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafkaTest")
# A single StreamingContext with a 5-second batch interval
ssc = StreamingContext(sc, 5)

print("Connected to spark streaming")

def process(time, rdd):
    # Called once per batch with the batch time and that batch's RDD
    print("========= %s =========" % str(time))
    if not rdd.isEmpty():
        # Print the results so they show up in the job's STDOUT
        print(rdd.count())
        print(rdd.first())

# Receiver-based stream: ZooKeeper quorum, consumer group id, {topic: partitions to consume}
kafkaStream = KafkaUtils.createStream(ssc, "server:2181", "pysparkclient1", {"smartPlug": 1})
kafkaStream.pprint()
kafkaStream.foreachRDD(process)

ssc.start()
ssc.awaitTermination()


This program processes a batch every 5 seconds, receiving the Kafka JSON messages as an RDD. If the RDD is not empty, I run a count and fetch the first row.
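
Each element that KafkaUtils.createStream delivers is a (key, value) pair whose value is the JSON string NiFi published, so a natural next step is to decode it. Here is a minimal sketch of that step in plain Python, so it can be tried without a cluster. The function names and the sample field names are hypothetical, since the actual schema is not shown in this article.

```python
import json


def parse_message(message):
    """Decode one (key, value) pair from the Kafka stream into a dict.

    Returns None when the value is not valid JSON, so that one bad
    message does not fail the whole batch.
    """
    _key, value = message
    try:
        return json.loads(value)
    except (TypeError, ValueError):
        return None


def summarize(messages):
    """Return (count, first_record) for the parseable messages in a batch."""
    records = [r for r in map(parse_message, messages) if r is not None]
    return len(records), (records[0] if records else None)


# Hypothetical sample batch; the field names are made up for illustration.
sample = [
    (None, '{"device": "plug1", "watts": 12.5}'),
    (None, "not json"),
]
print(summarize(sample))
```

In the streaming job, the same function could be applied with kafkaStream.map(parse_message).filter(lambda r: r is not None) before the foreachRDD call.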

You can see the application running in the Apache YARN UI.

From Apache Ambari, we can monitor the data moving through the Kafka Broker topics.

We can also monitor the Spark job via the URL supplied in the output of the submit.

We can see the STDOUT of the submitted Spark job here in the YARN logs.

Example PySpark Run

[root@princeton0 demo]# ./submit.sh
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.6.4.0-91/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.2.0 in central
    found org.apache.kafka#kafka_2.11;0.8.2.1 in central
    found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
    found com.yammer.metrics#metrics-core;2.2.0 in central
    found org.slf4j#slf4j-api;1.7.16 in central
    found org.scala-lang.modules#scala-parser-combinators_2.11;1.0.2 in central
    found com.101tec#zkclient;0.3 in central
    found log4j#log4j;1.2.17 in central
    found org.apache.kafka#kafka-clients;0.8.2.1 in central
    found net.jpountz.lz4#lz4;1.3.0 in central
    found org.xerial.snappy#snappy-java;1.1.2.6 in central
    found org.apache.spark#spark-tags_2.11;2.2.0 in central
    found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 3452ms :: artifacts dl 21ms
    :: modules in use:
    com.101tec#zkclient;0.3 from central in [default]
    com.yammer.metrics#metrics-core;2.2.0 from central in [default]
    log4j#log4j;1.2.17 from central in [default]
    net.jpountz.lz4#lz4;1.3.0 from central in [default]
    org.apache.kafka#kafka-clients;0.8.2.1 from central in [default]
    org.apache.kafka#kafka_2.11;0.8.2.1 from central in [default]
    org.apache.spark#spark-streaming-kafka-0-8_2.11;2.2.0 from central in [default]
    org.apache.spark#spark-tags_2.11;2.2.0 from central in [default]
    org.scala-lang.modules#scala-parser-combinators_2.11;1.0.2 from central in [default]
    org.scala-lang.modules#scala-xml_2.11;1.0.2 from central in [default]
    org.slf4j#slf4j-api;1.7.16 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   13  |   2   |   2   |   0   ||   13  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 13 already retrieved (0kB/23ms)
18/02/17 01:03:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/02/17 01:03:01 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
18/02/17 01:03:01 INFO RMProxy: Connecting to ResourceManager at princeton0.field.hortonworks.com/172.26.200.216:8050
18/02/17 01:03:01 INFO Client: Requesting a new application from cluster with 1 NodeManagers
18/02/17 01:03:02 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (43008 MB per container)
18/02/17 01:03:02 INFO Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
18/02/17 01:03:02 INFO Client: Setting up container launch context for our AM
18/02/17 01:03:02 INFO Client: Setting up the launch environment for our AM container
18/02/17 01:03:02 INFO Client: Preparing resources for our AM container
18/02/17 01:03:04 INFO Client: Use hdfs cache file as spark.yarn.archive for HDP, hdfsCacheFile:hdfs://princeton0.field.hortonworks.com:8020/hdp/apps/2.6.4.0-91/spark2/spark2-hdp-yarn-archive.tar.gz
18/02/17 01:03:04 INFO Client: Source and destination file systems are the same. Not copying hdfs://princeton0.field.hortonworks.com:8020/hdp/apps/2.6.4.0-91/spark2/spark2-hdp-yarn-archive.tar.gz
18/02/17 01:03:04 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.kafka_kafka_2.11-0.8.2.1.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.spark_spark-tags_2.11-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.spark-project.spark_unused-1.0.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/com.yammer.metrics_metrics-core-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/com.101tec_zkclient-0.3.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/com.101tec_zkclient-0.3.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.kafka_kafka-clients-0.8.2.1.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.kafka_kafka-clients-0.8.2.1.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.slf4j_slf4j-api-1.7.16.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/log4j_log4j-1.2.17.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/log4j_log4j-1.2.17.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/net.jpountz.lz4_lz4-1.3.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.xerial.snappy_snappy-java-1.1.2.6.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/opt/demo/kafka_test.py -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/kafka_test.py
18/02/17 01:03:05 INFO Client: Uploading resource file:/usr/hdp/current/spark2-client/python/lib/pyspark.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/pyspark.zip
18/02/17 01:03:06 INFO Client: Uploading resource file:/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/py4j-0.10.4-src.zip
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/com.101tec_zkclient-0.3.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.kafka_kafka-clients-0.8.2.1.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/log4j_log4j-1.2.17.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar added multiple times to distributed cache.
18/02/17 01:03:06 INFO Client: Uploading resource file:/tmp/spark-bc1bedca-6201-4715-812e-cd06f8e6efac/__spark_conf__9099337700911844616.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/__spark_conf__.zip
18/02/17 01:03:06 INFO SecurityManager: Changing view acls to: root
18/02/17 01:03:06 INFO SecurityManager: Changing modify acls to: root
18/02/17 01:03:06 INFO SecurityManager: Changing view acls groups to:
18/02/17 01:03:06 INFO SecurityManager: Changing modify acls groups to:
18/02/17 01:03:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
18/02/17 01:03:06 INFO Client: Submitting application application_1517883514475_0424 to ResourceManager
18/02/17 01:03:06 INFO YarnClientImpl: Submitted application application_1517883514475_0424
18/02/17 01:03:07 INFO Client: Application report for application_1517883514475_0424 (state: ACCEPTED)
18/02/17 01:03:07 INFO Client:
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1518829386408
     final status: UNDEFINED
     tracking URL: http://princeton0.field.hortonworks.com:8088/proxy/application_1517883514475_0424/
     user: root
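
If you capture the submit output in a script, the application ID and tracking URL can be pulled out of it with a couple of regular expressions. This is a rough sketch: the function name extract_app_info is hypothetical, and the patterns assume the log format shown in the run above.

```python
import re

# Patterns based on the submit output shown above.
APP_ID_RE = re.compile(r"\bapplication_\d+_\d+\b")
TRACKING_URL_RE = re.compile(r"tracking URL:\s*(\S+)")


def extract_app_info(output):
    """Return (application_id, tracking_url); either is None if not found."""
    app_match = APP_ID_RE.search(output)
    url_match = TRACKING_URL_RE.search(output)
    return (
        app_match.group(0) if app_match else None,
        url_match.group(1) if url_match else None,
    )


# Two lines copied from the example run above.
sample_output = """\
18/02/17 01:03:06 INFO YarnClientImpl: Submitted application application_1517883514475_0424
     tracking URL: http://princeton0.field.hortonworks.com:8088/proxy/application_1517883514475_0424/
"""

app_id, tracking_url = extract_app_info(sample_output)
print(app_id)
print(tracking_url)
```

The application ID is also what you pass to yarn logs -applicationId to fetch the job's STDOUT from the command line.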

Topics:
big data, apache spark, python, pyspark, kafka, streaming, apache nifi, tutorial
