
Anatomy of a Scala Spark Program

An annotated code snippet with examples of how to write great Spark code with Scala.


Here is a simple Spark 1.6.x program written in Scala. Read through the annotated source below for an example of how to write a Spark job.

package com.YOURCOMPANY.YOURMODULE

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.{KryoSerializer}

// Case classes are JavaBeans without the work.
case class LogRecord( clientIp: String, clientIdentity: String, user: String, dateTime: String, request:String,
                      statusCode:Int, bytesSent:Long, referer:String, userAgent:String )

// Singleton Spark Job
object Logs {

    // A regular expression for parsing a combined-format access log line
    val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)"""".r

    // This simple method extracts the fields of a log line into a LogRecord
    def parseLogLine(log: String): LogRecord = {
      try {
        val res = PATTERN.findFirstMatchIn(log)

        if (res.isEmpty) {
          println("Rejected Log Line: " + log)
          LogRecord("Empty", "-", "-", "", "",  -1, -1, "-", "-" )
        }
        else {
          val m = res.get
          // NOTE: HEAD requests do not have a content size.
          if (m.group(9).equals("-")) {
            LogRecord(m.group(1), m.group(2), m.group(3), m.group(4),
              m.group(5), m.group(8).toInt, 0, m.group(10), m.group(11))
          }
          else {
            LogRecord(m.group(1), m.group(2), m.group(3), m.group(4),
              m.group(5), m.group(8).toInt, m.group(9).toLong, m.group(10), m.group(11))
          }
        }
      } catch {
        case e: Exception =>
          println("Exception on line:" + log + ":" + e.getMessage)
          LogRecord("Empty", "-", "-", "", "-", -1, -1, "-", "-")
      }
    }
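
    // Illustrative example (not from the original article): a typical combined-format
    // access log line such as
    //   127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08"
    // would parse to
    //   LogRecord("127.0.0.1", "-", "frank", "10/Oct/2000:13:55:36 -0700", "GET", 200, 2326, "http://www.example.com/start.html", "Mozilla/4.08")
    // Note that only the HTTP method (group 5) lands in the request field; the URL and
    // protocol (groups 6 and 7) are dropped by the constructor calls above.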


  // Main Spark program
  def main(args: Array[String]) {

    // Set the name of the spark program
    val sparkConf = new SparkConf().setAppName("Logs")

    // Only use what you need; this job only wants 4 cores
    sparkConf.set("spark.cores.max", "4")

    // Use the faster, more efficient Kryo serializer instead of the built-in Java one
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)

    // Turn Spark up to 11 and enable the Tungsten accelerator
    sparkConf.set("spark.sql.tungsten.enabled", "true")

    // Turn on event logging so we can see this job in the history server
    sparkConf.set("spark.eventLog.enabled", "true")

    // Give the app a fixed ID so it is easy to find in the logs and history server
    sparkConf.set("spark.app.id", "Logs")

    // snappy compression works well for me
    sparkConf.set("spark.io.compression.codec", "snappy")

    // compress RDDs to save space
    sparkConf.set("spark.rdd.compress", "true")

    // Create the SparkContext from this configuration. This is required before anything else can run.
    val sc = new SparkContext(sparkConf)

    // read my log file
    val logFile = sc.textFile("access2.log")

    // Map each line through the parser, filter out the rejected ("Empty") records,
    // and cache the result for reuse
    val accessLogs = logFile.map(parseLogLine).filter(!_.clientIp.equals("Empty")).cache()

    try {
      println("Log Count: %s".format(accessLogs.count()))

      // Let's bring back 25 rows to the driver to examine in the console

      accessLogs.take(25).foreach(println)

      // Calculate statistics based on the content size.
      val contentSizes = accessLogs.map(log => log.bytesSent).cache()
      val contentTotal = contentSizes.reduce(_ + _)

      println("Number of Log Records: %s  Content Size Total: %s, Avg: %s, Min: %s, Max: %s".format(
        contentSizes.count,
        contentTotal,
        contentTotal / contentSizes.count,
        contentSizes.min,
        contentSizes.max))
    } catch {
      case e: Exception =>
        println("Exception:" + e.getMessage)
        e.printStackTrace()
    }

    // Stop the job; we are done.
    sc.stop()
  }
}
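
The parsed accessLogs RDD is a convenient starting point for further analysis. The sketch below is not part of the original program: it assumes the accessLogs RDD and the LogRecord case class from the listing above and would sit inside the same try block in main. It counts requests per HTTP status code and lists the ten busiest client IPs.

// Illustrative extension (assumes the accessLogs RDD defined above, Spark 1.6.x)

// Count requests per HTTP status code (e.g. 200, 404, 500)
val statusCounts = accessLogs
  .map(log => (log.statusCode, 1L))
  .reduceByKey(_ + _)
  .collect()

statusCounts.sortBy(_._1).foreach { case (code, count) =>
  println("Status %s: %s requests".format(code, count))
}

// Find the ten busiest client IPs by request count
val topClients = accessLogs
  .map(log => (log.clientIp, 1L))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(10)

topClients.foreach { case (ip, count) =>
  println("%s -> %s requests".format(ip, count))
}

Because reduceByKey combines counts on each executor before shuffling, both aggregations stay cheap even on large logs; collect() and take(10) only bring the small, already-reduced results back to the driver.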



Topics: apache spark, spark, scala, big data
