Integrating Apache Spark 2.x Jobs With Apache NiFi 1.4+

This tutorial will show you several different ways to integrate jobs in Apache NiFi with Apache Spark via Apache Livy for various use cases.

There are many ways to integrate Apache NiFi and Apache Spark.

We can call Apache Spark Streaming via S2S (Apache NiFi's Site-to-Site protocol) or Kafka. If you want to execute a regular Apache Spark job, you can do that via Apache Livy, which is included in HDP 2.6+. This is also how Apache Zeppelin integrates with Apache Spark, so it is a secure and reasonable approach.

I use this approach when I want Spark to handle one step of the processing in the middle of an Apache NiFi flow.

Syntax for Calling a Job

This job is stored in HDFS as /apps/logs*jar with the class name com.dataflowdeveloper.logs.Logs.
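
Outside of NiFi, the equivalent REST call can be sketched in plain Python. This is only a minimal illustration, assuming Livy's batch endpoint (POST /batches) with a JSON body that carries the jar location and the class name. The helper name build_batch_request is hypothetical, yourlivyapi:8999 is the placeholder host used later in this article, and the jar path is copied exactly as written above, so substitute the real file name on your cluster. The optional X-Requested-By header is an assumption: Livy servers with CSRF protection enabled generally expect it.

```python
import json
import urllib.request


def build_batch_request(livy_url, jar_path, class_name, args=None, requested_by=None):
    """Build (but do not send) a POST request for Livy's /batches endpoint."""
    # "file" and "className" identify the jar and the main class to run.
    payload = {"file": jar_path, "className": class_name}
    if args:
        payload["args"] = list(args)

    headers = {"Content-Type": "application/json"}
    if requested_by:
        # Assumption: only needed when CSRF protection is enabled on the server.
        headers["X-Requested-By"] = requested_by

    return urllib.request.Request(
        url=livy_url.rstrip("/") + "/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )


request = build_batch_request(
    "http://yourlivyapi:8999",          # placeholder host from this article
    "/apps/logs*jar",                   # path as written in the article
    "com.dataflowdeveloper.logs.Logs",
)
# To actually submit the job (needs a reachable Livy server):
# urllib.request.urlopen(request)
```

Building the request separately from sending it makes it easy to inspect the URL and body before anything reaches the cluster.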

Schema For Apache Livy Status Messages

{
 "type": "record",
 "name": "livystatus",
 "fields": [
  {
   "name": "id",
   "type": [
    "null",
    "int"
   ]
  },
  {
   "name": "state",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "appId",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "driverLogUrl",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "sparkUiUrl",
   "type": [
    "null",
    "string"
   ]
  }
 ]
}

Example Apache Spark Apache Livy Status Message Reformatted for Usage

{
  "sparkUiUrl" : "http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0022/",
  "id" : "19",
  "state" : "success",
  "driverLogUrl" : "http://princeton-14-2-1.field.hortonworks.com:8188/applicationhistory/logs/princeton-14-2-1.field.hortonworks.com:45454/container_e02_1511839325046_0022_01_000001/container_e02_1511839325046_0022_01_000001/livy",
  "appId" : "application_1511839325046_0022"
}

Apache Livy Status Monitoring Flow

QueryRecord Processor for Querying the Status and Determining What to Do With the Result

Apache Ambari Screen for Turning off CSRF Protection for Apache Livy

Results in JSON from Apache Livy REST Call

Hortonworks Schema Registry

Apache Spark Job Submitted and Running

The Apache Spark Job Environment During the Run

Results From the Apache Spark Job Shown in YARN Logs

Apache YARN Run Information on the Apache Spark Job

The reformatted status message shown earlier is the result of a completed job. As you can see, we get some really useful information here. The state field is the important one: once it reports success, you can carry on with whatever other processing you need.

The driver log URL (driverLogUrl) points to the driver logs, which you could ingest with Apache NiFi.

The Spark UI URL (sparkUiUrl) points to the Spark UI for the running application.
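
The decision on the state field can be sketched as a small function. This is a plain-Python illustration, not the NiFi flow itself. The function name next_action and the set of states treated as failures are assumptions, so adjust them to the states your Livy version actually reports.

```python
# Assumption: these states mean the job will not finish successfully.
FAILED_STATES = {"dead", "killed", "error"}


def next_action(status):
    """Return 'proceed', 'fail', or 'wait' for a flattened Livy status record."""
    state = (status.get("state") or "").lower()
    if state == "success":
        return "proceed"   # job finished; continue with downstream processing
    if state in FAILED_STATES:
        return "fail"      # route to error handling
    return "wait"          # still starting or running; check again later


status = {
    "id": "19",
    "state": "success",
    "appId": "application_1511839325046_0022",
}
print(next_action(status))  # proceed
```

Anything that is neither a success nor a known failure is treated as "still in progress", which keeps the check safe when a state shows up that the sketch does not know about.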

Raw REST JSON Message

{
  "from" : 0,
  "total" : 1,
  "sessions" : [ {
    "id" : 12,
    "state" : "running",
    "appId" : "application_1511839325046_0015",
    "appInfo" : {
      "driverLogUrl" : "http://princeton-14-2-1.field.hortonworks.com:8042/node/containerlogs/container_e02_1511839325046_0015_01_000001/livy",
      "sparkUiUrl" : "http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0015/"
    },
    "log" : [ "\t diagnostics: [Tue Nov 28 19:24:09 +0000 2017] Scheduler has assigned a container for AM, waiting for AM container to be launched", "\t ApplicationMaster host: N/A", "\t ApplicationMaster RPC port: -1", "\t queue: default", "\t start time: 1511897049505", "\t final status: UNDEFINED", "\t tracking URL: http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0015/", "\t user: livy", "17/11/28 19:24:09 INFO ShutdownHookManager: Shutdown hook called", "17/11/28 19:24:09 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8ada7f2-43d0-4823-8816-6e930101f2f1" ]
  } ]
}
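
To show how the raw message relates to the flat livystatus schema above, here is a minimal plain-Python sketch of the reshaping. How the NiFi flow does this step is not shown here. The function name flatten_sessions is hypothetical, and the sample input repeats the raw message above with the log array left out for brevity.

```python
def flatten_sessions(raw):
    """Turn a raw Livy listing into flat records matching the livystatus schema."""
    records = []
    for session in raw.get("sessions", []):
        # driverLogUrl and sparkUiUrl are nested under appInfo in the raw message.
        app_info = session.get("appInfo") or {}
        records.append({
            "id": session.get("id"),
            "state": session.get("state"),
            "appId": session.get("appId"),
            "driverLogUrl": app_info.get("driverLogUrl"),
            "sparkUiUrl": app_info.get("sparkUiUrl"),
        })
    return records


raw_message = {
    "from": 0,
    "total": 1,
    "sessions": [{
        "id": 12,
        "state": "running",
        "appId": "application_1511839325046_0015",
        "appInfo": {
            "driverLogUrl": "http://princeton-14-2-1.field.hortonworks.com:8042/node/containerlogs/container_e02_1511839325046_0015_01_000001/livy",
            "sparkUiUrl": "http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0015/",
        },
    }],
}

records = flatten_sessions(raw_message)
print(records[0]["state"])  # running
```

Missing fields come back as None, which lines up with the nullable types in the schema.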

REST URL: http://yourlivyapi:8999/batches/

You can see this in Ambari.
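
If you want to check the endpoint outside of NiFi, a GET against that URL returns the JSON listing. The sketch below is a minimal standard-library version. The helper name fetch_livy_json and the stub opener are assumptions added so the example runs without a Livy server.

```python
import io
import json
import urllib.request


def fetch_livy_json(url, opener=urllib.request.urlopen):
    """GET the given Livy REST URL and return the decoded JSON body."""
    with opener(url) as response:
        return json.loads(response.read().decode("utf-8"))


def stub_opener(url):
    # Stand-in for a real server: always returns an empty listing.
    return io.BytesIO(b'{"from": 0, "total": 0, "sessions": []}')


listing = fetch_livy_json("http://yourlivyapi:8999/batches/", opener=stub_opener)
print(listing["total"])  # 0

# Against a real server, drop the stub:
# listing = fetch_livy_json("http://yourlivyapi:8999/batches/")
```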

My Apache Spark Job Configuration

My Apache Spark job needs some data.

hdfs dfs -mkdir -p /user/livy/data/
hdfs dfs -put access3.log /user/livy/data
hdfs dfs -chmod -R 777 /user/livy/data

The Run of the Chosen Apache Spark Job

Log Type: stdout
Log Upload Time: Tue Nov 28 17:59:02 +0000 2017
Log Length: 34968
===== Log Count: 206857
LogRecord(194.187.168.230,-,-,20/Feb/2016:00:00:45 -0500,GET,200,187,-,Mozilla/5.0 (compatible; Qwantify/2.2w; +https://www.qwant.com/)/*)
LogRecord(194.187.168.230,-,-,20/Feb/2016:00:00:46 -0500,GET,200,24810,-,Mozilla/5.0 (compatible; Qwantify/2.2w; +https://www.qwant.com/)/*)
LogRecord(192.0.84.33,-,-,20/Feb/2016:00:01:56 -0500,HEAD,200,0,-,jetmon/1.0 (Jetpack Site Uptime Monitor by WordPress.com))
LogRecord(66.249.64.8,-,-,20/Feb/2016:00:02:13 -0500,GET,200,28486,-,Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html))
LogRecord(192.0.84.33,-,-,20/Feb/2016:00:02:15 -0500,HEAD,200,0,-,jetmon/1.0 (Jetpack Site Uptime Monitor by WordPress.com))
root
 |-- clientIp: string (nullable = true)
 |-- clientIdentity: string (nullable = true)
 |-- user: string (nullable = true)
 |-- dateTime: string (nullable = true)
 |-- request: string (nullable = true)
 |-- statusCode: integer (nullable = true)
 |-- bytesSent: long (nullable = true)
 |-- referer: string (nullable = true)
 |-- userAgent: string (nullable = true)

+-------+-----------------+
|summary|        bytesSent|
+-------+-----------------+
|  count|           206857|
|   mean|28017.72503226867|
| stddev|137716.9426060656|
|    min|                0|
|    max|         15379053|
+-------+-----------------+

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).clientIp, true) AS clientIp#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).clientIdentity, true) AS clientIdentity#11, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).user, true) AS user#12, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).dateTime, true) AS dateTime#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).request, true) AS request#14, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).statusCode AS statusCode#15, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).bytesSent AS bytesSent#16L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).referer, true) AS referer#17, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).userAgent, true) AS userAgent#18]
+- Scan ExternalRDDScan[obj#9]
After writing results
===== Number of Log Records: 206857  Content Size Total: 5795662547, Avg: 28017, Min: 0, Max: 15379053
=====Status Code counts: [(404,21611),(200,170127),(302,1467),(206,87),(304,1260),(406,3242),(500,1106),(409,28),(301,4968),(403,2601),(407,123),(429,1),(405,236)]
=====IP Addresses Accessed > 10 times: [51.255.65.87,146.127.253.45,201.239.138.159,157.55.39.157,1.22.196.230,54.211.201.215,180.179.40.44,....]

My simple example Apache Spark job, which parses Apache logs, is included in the GitHub repository referenced below.

Gotcha

You may need to disable the following setting (set it to false) in Ambari, under the Apache Spark area:

livy.server.csrf_protection.enabled

Directories

  • /var/log/livy2/livy-livy-server.out

Source Code on GitHub: https://github.com/tspannhw/apachelivy-nifi-spark2-integration

Topics:
apache nifi, integration, apache livy, scala, apache spark

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.
