
Running Spark SQL CERN Queries 5x Faster on SnappyData


This test query consists of a join with two conditions: an equality predicate on the column bucket and a non-equality condition. The result is a 4-5x improvement.


Following the improvements made to Spark SQL execution in SnappyData, we recently revisited a blog by the core Spark engineering team about billion-row joins on Spark. There, we showed how our optimizations deliver ~20X speedups over Apache Spark for identical queries run on identical hardware.

In this blog, we take a look at Luca Canali's CERN blog, which investigated the speed differences between Spark 1.6 and Spark 2.0 in executing a join with two conditions. The blog concluded that based on the results, Spark 2.0 was 7X faster than Spark 1.6 — which is pretty phenomenal considering that Spark 1.6 was no tortoise when it came to running SQL workloads.

The improvements in Spark 2.0 were primarily driven by whole stage code generation. The SnappyData team extended those innovations and also used techniques like colocation to further boost performance by up to 20X for Spark SQL queries running in SnappyData.
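
For readers curious what colocation looks like in practice, here is a minimal, hypothetical sketch (the tables and columns below are illustrative and not part of the benchmark; it assumes a SnappySession named snSession like the one created in the code later in this post). Two partitioned column tables that share a partitioning column and declare COLOCATE_WITH keep matching partitions on the same node, so joins on that column need no shuffle:

      // Hypothetical example of colocation: orders is partitioned the same way as
      // customers and colocated with it, so a join on customer_id stays node-local.
      snSession.sql("create table customers (customer_id int, name string) " +
        "using column options (partition_by 'customer_id')")
      snSession.sql("create table orders (order_id int, customer_id int, amount double) " +
        "using column options (partition_by 'customer_id', colocate_with 'customers')")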

The original blog used a Python program to create the tables and run the query. Being Scala hackers, we chose to reimplement the program in Scala (see below), populate the table with the exact same 10 million rows, and re-run the test. To keep the comparison fair, we ran the tests on Microsoft Azure machines of the same class as those used in the original blog. The results show that on a single machine with 16 cores, SnappyData can run the exact same queries 4-5 times faster than Spark 2.1.

Results

Performance times were averaged over five runs. The first test was on a single machine with a 16-core CPU, 2GB of driver memory, and Ubuntu and Java 8 installed. The second test was on two machines, each with an 8-core CPU, 4GB of executor memory, and Ubuntu and Java 8 installed.

One Machine

  • Spark 2.1.0: ~14.4 minutes.
  • SnappyData: ~2.9 minutes.

Two Machines

  • Spark 2.1.0: ~12 minutes.
  • SnappyData: ~3 minutes.

The Benchmark

Below, we show the Scala code used to run the single-machine benchmark; it mirrors the Python code used in the original blog. This code was run on a single Azure machine with a 16-core CPU and 2GB of driver memory.

 ./spark-shell --driver-memory 2g

      // Runs f `warmups` times to warm up, then `times` more times and
      // reports the average time per run in milliseconds.
      def benchmark(name: String, times: Int = 5, warmups: Int = 2)(f: => Unit): Double = {
        for (i <- 1 to warmups) {
          f
        }
        val startTime = System.nanoTime
        for (i <- 1 to times) {
          f
        }
        val endTime = System.nanoTime
        val timeTaken = (endTime - startTime).toDouble / (times * 1000000.0)
        println(s"Average time taken in $name for $times runs: $timeTaken millis")
        timeTaken
      }

      // Create a SnappySession on top of the shell's SparkContext.
      val snSession = new org.apache.spark.sql.SnappySession(sc)

      val test_numrows = 10000000

      // Temporary view t0 holds the ids 0 to test_numrows - 1.
      snSession.range(0, test_numrows, 1).createTempView("t0")

      // (Re)create the column table t1, partitioned on bucket.
      snSession.dropTable("t1", ifExists = true)

      snSession.sql("Create table t1 using column options(partition_by 'bucket') as (select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0)")

      // Sanity checks: row count and schema.
      snSession.sql("select count(*) from t1").show()
      snSession.sql("describe t1").show()

      // The actual test query: a self-join on bucket with a non-equi condition,
      // followed by an aggregation.
      benchmark("Snappy Perf") {
        snSession.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show()
      }

Following the original test code, we register ten million rows as a temporary view. A column table is then created in SnappyData from those rows, partitioned on the bucket column. We count the rows and display the schema to confirm the table was created correctly. Finally, we pass the actual test query to the benchmark function to measure how long it takes.

This test query is exactly the same as in the original blog: it consists of a join with two conditions, an equality predicate on the column bucket and a non-equality condition. The query also contains an aggregation operation. As reported above, the result is a 4-5x improvement over Spark 2.1. The code used to run this query on Spark 2.1, along with the two-machine code, can be found at the end of this blog; it is identical to the code used in the original Databricks blog.

So, what is causing this improvement? Much like the original blog, the key to the performance boost is in the query plans for both Spark 2.1 and SnappyData. They are shown below:

The Query Plans

(Query plans: Spark 2.1 vs. SnappyData)

A shuffle join in Spark involves moving data from one node to another so that rows with the same key end up on the same node. As one can imagine, shuffling is one of the biggest costs of data processing in Spark SQL, as it involves writing to disk, serializing data over the network, multiple copies of data, and so on.
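
If you want to see the shuffle for yourself, the physical plan can be printed with explain(). Here is a minimal sketch, assuming the plain Spark 2.1 session and the cached t1 temp table from the appendix code at the end of this post:

      // Print the physical plan; each "Exchange hashpartitioning(...)" node in
      // the output marks a network shuffle of that operator's input.
      spark.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b " +
        "where a.bucket=b.bucket and a.val1+b.val1<1000 " +
        "group by a.bucket order by a.bucket").explain()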

In certain cases, SnappyData can remove the shuffle operations altogether, as shown in the query plan above. The LocalJoin operator exploits the knowledge that replicated data is present on all nodes: if a relation or intermediate result is available on every node, it never does a remote fetch or shuffle; it simply scans the records of both joined relations locally. As with broadcast joins, LocalJoin never moves data across nodes.

LocalJoin performs a local hash join of two child relations. If a relation (from a data source) is already replicated across all nodes, then rather than doing a broadcast join, which can be expensive, this join simply scans through the single partition of the replicated relation while streaming through the other relation.
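
As a purely illustrative sketch (the table below is not part of the benchmark), a small dimension table can be created as a non-partitioned row table, which SnappyData replicates to every node; a join against it can then be satisfied locally:

      // Illustrative only: a row table with no partition_by option is replicated
      // to all nodes, so a join against it needs no broadcast or shuffle.
      snSession.sql("create table bucket_labels (bucket int, label string) using row")
      snSession.sql("select a.bucket, l.label, sum(a.val2) tot from t1 a, bucket_labels l " +
        "where a.bucket = l.bucket group by a.bucket, l.label").explain()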

The Exchange node in the SnappyData query plan has been avoided by specifying the option partition_by 'bucket' when creating the table t1. This means the data is already partitioned on bucket as the rows are inserted, and the SnappyData query engine is aware of this fact, so it avoids the expensive shuffle.

Because the Exchange node is avoided, all the nodes of the query plan are compiled into a single piece of generated code. In terms of whole-stage code generation, this is very efficient.
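
One way to check this on your own cluster (a sketch, reusing the snSession and t1 from the listing above) is to print the plan and confirm that no Exchange node appears:

      // Print the SnappyData physical plan for the benchmark query. The absence
      // of an Exchange node means no shuffle; in Spark 2.x-style explain output,
      // operators prefixed with '*' participate in whole-stage code generation.
      snSession.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b " +
        "where a.bucket=b.bucket and a.val1+b.val1<1000 " +
        "group by a.bucket order by a.bucket").explain()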

SnappyHashAggregate: alternative hash aggregation and hash join operators have been added and finely tuned for SnappyData's storage layout, making full use of features such as vectorization and dictionary encoding. These optimizations provide an order-of-magnitude performance advantage over Spark's default implementations.

In short, we took the queries CERN used to test performance in Spark SQL 1.6 vs Spark SQL 2.0 and ran them on an equivalent machine in SnappyData. While CERN found a 7x improvement from Spark 1.6 to Spark 2.0, SnappyData delivered an additional 5x improvement over Spark 2.0 in performance.

SnappyData turns Apache Spark into a hybrid transactional analytics database while remaining fully compatible with Spark, and it continues to make performance improvements that significantly speed up Spark SQL workloads. Stay tuned for significant improvements to ingestion and concurrency in upcoming releases.

Appendix

Below, we show the Scala code used to run the two-machine benchmark. This code was run on two Azure machines, each with an 8-core CPU and 4GB of executor memory.

package org.apache.spark.examples.snappydata

import java.io.{File, PrintWriter}

import com.typesafe.config.Config
import org.apache.spark.sql.{SnappyJobValid, SnappyJobInvalid, SnappyJobValidation, SnappySession,SnappySQLJob}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.{Failure, Success, Try}

object CERNBlogImplementationEnhanced extends SnappySQLJob{

  override def runSnappyJob(snc: SnappySession, jobConfig: Config): Any = {
    def getCurrentDirectory = new java.io.File(".").getCanonicalPath
    val pw = new PrintWriter("CERNBlogImplementationEnhanced.out")

    Try {
      // Runs f `warmups` times to warm up, then `times` more times and
      // reports the average time per run in milliseconds.
      def benchmark(name: String, times: Int = 5, warmups: Int = 2)(f: => Unit): Double = {
        for (i <- 1 to warmups) {
          f
        }
        val startTime = System.nanoTime
        for (i <- 1 to times) {
          f
        }
        val endTime = System.nanoTime
        val timeTaken = (endTime - startTime).toDouble / (times * 1000000.0)
        pw.println(s"Average time taken in $name for $times runs: $timeTaken millis")
        timeTaken
      }

      val test_numrows = 10000000

      // Creating a temporary view t0 with the ids 0 to test_numrows - 1.
      snc.dropTable("t0", ifExists = true)
      snc.range(0, test_numrows, 1).createTempView("t0")

      // (Re)creating the column table t1, partitioned on bucket.
      snc.dropTable("t1", ifExists = true)
      snc.sql("Create table t1 using column options(partition_by 'bucket') as (select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0)")

      // Sanity checks: row count and schema.
      pw.println(snc.sql("select count(*) from t1").show())
      pw.println(snc.sql("describe t1").show())

      // The actual test query: a self-join on bucket with a non-equi condition,
      // followed by an aggregation.
      benchmark("Snappy Perf") {
        pw.println(snc.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show)
      }
    } match {
      case Success(v) => pw.close()
        s"See ${getCurrentDirectory}/CERNBlogImplementationEnhanced.out"

      case Failure(e) => pw.close()
        throw e
    }
  }
  override def isValidJob(snc: SnappySession, config: Config): SnappyJobValidation = {
    SnappyJobValid()
  }
}
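
For reference, a SnappySQLJob like the one above is typically submitted to a running cluster with the snappy-job.sh script. A rough sketch of the command (the lead host, port, application name, and jar path below are placeholders, not values from the original test):

./bin/snappy-job.sh submit --lead <lead-host>:8090 --app-name CERNBlogEnhanced --class org.apache.spark.examples.snappydata.CERNBlogImplementationEnhanced --app-jar /path/to/examples.jar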

Commands for running the queries in the Spark 2.1.0 spark shell on a single Azure machine with a 16-core CPU and 2GB of driver memory:

./spark-shell --driver-memory 2g

def benchmark(name: String, times: Int = 5, warmups: Int = 2)(f: => Unit) : Double = {
  for (i <- 1 to warmups) {
    f
  }
  val startTime = System.nanoTime
  for (i <- 1 to times) {
    f
  }
  val endTime = System.nanoTime
  val timeTaken = (endTime - startTime).toDouble / (times * 1000000.0)
  println(s"Average time taken in $name for $times runs: $timeTaken millis")
  timeTaken
}
val test_numrows = 10000000

spark.range(0,test_numrows,1).registerTempTable("t0")

spark.sql("select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0").cache().registerTempTable("t1")

spark.sql("select count(*) from t1").show()

spark.sql("desc t1").show()

benchmark("Spark Performance") {
spark.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show()
}

Commands for running the queries in the Spark 2.1.0 spark shell on two Azure machines, each with an 8-core CPU and 4GB of executor memory per executor node:

./spark-shell --driver-memory 2g --master spark://IP:PORT

def benchmark(name: String, times: Int = 5, warmups: Int = 2)(f: => Unit) : Double = {
  for (i <- 1 to warmups) {
    f
  }
  val startTime = System.nanoTime
  for (i <- 1 to times) {
    f
  }
  val endTime = System.nanoTime
  val timeTaken = (endTime - startTime).toDouble / (times * 1000000.0)
  println(s"Average time taken in $name for $times runs: $timeTaken millis")
  timeTaken
}
val test_numrows = 10000000

spark.range(0,test_numrows,1).registerTempTable("t0")

spark.sql("select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0").cache().registerTempTable("t1")

spark.sql("select count(*) from t1").show()

spark.sql("desc t1").show()

benchmark("Spark Performance") {
spark.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show()
}

SnappyData is open-source on GitHub.


Topics:
snappydata ,spark sql ,performance ,cern ,speed

Published at DZone with permission of Sudhir Menon. See the original article here.
