
Joining a Billion Rows 20x Faster Than Apache Spark


Spark is lightning fast when joining a billion records; it can join at about the same speed as a single sum. So how can SnappyData join nearly 20x faster?


One of Databricks' most well-known blog posts is the one in which they describe joining a billion rows in a second on a laptop. Since this is a fairly easy benchmark to replicate, we thought: why not try it on SnappyData and see what happens? We found that for a join on two columns over a billion rows, SnappyData is nearly 20x faster. SnappyData is open source on GitHub.

Let's start with the benchmark as in the original post. The machine is a Dell Latitude E7450 laptop with a Core(TM) i7-5600U CPU at 2.60GHz and 16GB of RAM.

Start the SnappyData shell with some decent memory (required for the data load test):

./bin/spark-shell --master local[4] --driver-memory 14g --driver-java-options "-XX:+U

The GC options are similar to those used by a SnappyData cluster by default.

Define a simple benchmark util function:

// times a block of code and prints the elapsed wall-clock time in seconds
def benchmark(name: String)(f: => Unit): Unit = {
  val startTime = System.nanoTime
  f
  val endTime = System.nanoTime
  println(s"Time taken in $name: " + (endTime - startTime).toDouble / 1000000000 + " seconds")
}

Let's do a warmup first that will also initialize some Spark components, summing half a billion numbers:

spark.range(1000L * 1000 * 500).selectExpr("sum(id)").show()

Spark 2.0 can sum even a billion numbers really fast:

benchmark("Spark 2.0 (sum of a billion)") {
  spark.range(1000L * 1000 * 1000).selectExpr("sum(id)").show()
}

Time taken in Spark 2.0 (sum of a billion): 0.70 seconds.

Very impressive sub-second timing.

Let's load the billion numbers into a SnappyData column table and run the same query (this uses SnappySession, which is the entry point for SnappyData's extensions).

// turn off compression for this test
val snappy = org.apache.spark.sql.SnappyContext(sc).snappySession
snappy.sql("set spark.sql.inMemoryColumnarStorage.compressed=false")
snappy.sql("drop table if exists rangeTest")
snappy.sql("create table rangeTest (id bigint not null) using column")
snappy.range(1000L * 1000 * 1000).write.insertInto("rangeTest")
snappy.table("rangeTest").selectExpr("sum(id)").show()
benchmark("SnappyData (sum of a billion)") {
  snappy.table("rangeTest").selectExpr("sum(id)").show()
}

Time taken in SnappyData (sum of a billion): 0.44 seconds.

This is somewhat faster than even direct evaluation. One might say, "Oh, but all you have to do is read longs from memory and add them." Let's try with Spark memory caching.

Drop the table first because there is not enough memory to hold both on the laptop:

snappy.sql("drop table rangeTest")
System.gc()
System.runFinalization()

spark.sql("set spark.sql.inMemoryColumnarStorage.compressed=false")
val rangeData = spark.range(1000L * 1000 * 1000).toDF()
rangeData.cache()
// force materialize the cache
rangeData.count()

rangeData.selectExpr("sum(id)").show()
benchmark("Spark 2.0 cache (sum of a billion)") {
  rangeData.selectExpr("sum(id)").show()
}

Time taken in Spark 2.0 cache (sum of a billion): 7.5 seconds.

Whoa! More than 15X slower.

Some technical folks may recognize that this is due to the lack of vectorization in the Spark 2.0 in-memory caching mechanism. One could try Spark's vectorized Parquet reader, but that comparison is not apples to apples even if the Parquet file is in the OS cache. Incidentally, the Parquet reader is actually quite a bit faster than Spark caching when compression is disabled and the file is completely in the OS cache, again showing the power of vectorization and code generation.
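To get a feel for why vectorization matters here, consider a minimal, illustrative sketch (not Spark's actual code path; RowLike is a made-up stand-in for a generic row object) that reuses the benchmark helper defined above: summing a primitive long array in a tight loop versus pulling each value through a per-row wrapper.

// Illustrative only: a tight loop over primitives vs. per-row access through
// a wrapper object (RowLike is hypothetical), mimicking the extra indirection
// and copying of a non-vectorized, row-at-a-time scan.
final case class RowLike(values: Array[Long]) {
  def getLong(i: Int): Long = values(i)
}

val n = 1000 * 1000 * 10
val column: Array[Long] = Array.tabulate(n)(_.toLong)

benchmark("direct columnar sum") {
  var i = 0; var sum = 0L
  while (i < n) { sum += column(i); i += 1 }
  println(sum)
}

benchmark("row-at-a-time sum") {
  var i = 0; var sum = 0L
  while (i < n) {
    val row = RowLike(Array(column(i))) // per-row copy, as in a row-wise scan
    sum += row.getLong(0); i += 1
  }
  println(sum)
}

The second loop pays for an allocation and an indirection on every row, which is roughly the tax the non-vectorized cached scan pays.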

Spark is lightning fast when joining a billion records.

spark.range(1000L * 1000 * 500).join(spark.range(1000L).toDF(), "id").count()
benchmark("Spark 2.0 (join)") {
  spark.range(1000L * 1000 * 1000).join(spark.range(1000L).toDF(), "id").count()
}

Time taken in Spark 2.0 (join): 0.63 seconds.

It's almost unbelievable that Spark can join at about the same speed as a simple sum. The trick lies in Spark's optimized implementation for a single-column join on integral types when the values are contiguous: instead of a full hash map, it can use a dense array bounded by the minimum and maximum key.
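As a rough sketch of that idea (illustrative only, not Spark's actual implementation), when the build-side keys are contiguous longs, the "hash table" degenerates into a plain array indexed by key minus the lower bound, and each probe is just a bounds check plus an array read:

// Illustrative dense-array join for contiguous integral keys (not Spark's code).
// Build side: the 1000 keys of the small table.
val buildKeys: Array[Long] = (0L until 1000L).toArray
val minKey = buildKeys.min
val maxKey = buildKeys.max

// Dense "map": a presence flag indexed by (key - minKey); no hashing needed.
val present = new Array[Boolean]((maxKey - minKey + 1).toInt)
buildKeys.foreach(k => present((k - minKey).toInt) = true)

// Probe side: count matches with a range check plus an array lookup.
def probeCount(streamKeys: Iterator[Long]): Long =
  streamKeys.count(k => k >= minKey && k <= maxKey && present((k - minKey).toInt)).toLong

println(probeCount(Iterator.range(0, 1000000).map(_.toLong))) // 1000 matches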

Let’s try the same with a SnappyData replicated table.

snappy.sql("drop table if exists ref")
// table without any options will create a replicated table (a "reference" table)
snappy.sql("create table ref (id bigint not null)")
snappy.range(1000L).write.insertInto("ref")
snappy.range(1000L * 1000 * 1000).join(snappy.table("ref"), "id").count()
benchmark("SnappyData (join)") {
  snappy.range(1000L * 1000 * 1000).join(snappy.table("ref"), "id").count()
}

Time taken in SnappyData (join): 0.64 seconds.

Close enough. Perhaps SnappyData is also using a similar implementation. Let's make it a bit more complex by joining on two columns.

benchmark("Spark 2.0 (two column join)") {
  spark.range(1000L * 1000 * 1000).selectExpr("id", "(id * 5L) as k")
    .join(spark.range(1000L).selectExpr("id", "(id * 5) as k").toDF(), Seq("id", "k"))
    .count()
}

Time taken in Spark 2.0 (two column join): 15.6 seconds.

A whopping 25x drop in performance. Let’s try the same with SnappyData:

snappy.sql("drop table if exists ref2")
snappy.sql("create table ref2 (id bigint not null, k bigint not null)")
snappy.range(1000L).selectExpr("id", "(id * 5) as k").write.insertInto("ref2")
benchmark("SnappyData (two column join)") {
  snappy.range(1000L * 1000 * 1000).selectExpr("id", "(id * 5L) as k")
    .join(snappy.table("ref2"), Seq("id", "k"))
    .count()
}

Time taken in SnappyData (two column join): 0.81 seconds.

Still sub-second and ~19x faster than Spark 2.0!

The improvement seen in this particular case is due to a more generic approach to optimizing joins on contiguous values. While Spark uses a single-column dense vector to optimize only the single-column join case, SnappyData's hash-join implementation uses per-column min/max tracking to quickly reject streamed rows whenever any of the join keys lies beyond those limits. Thus, while Spark's optimization works only for the specialized single-column case, the approach in SnappyData works for a much wider range of queries. Beyond this specific optimization, the hash grouping and join operators in SnappyData are themselves tuned to work much better with its column store.
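Here is a hedged sketch of that min/max filtering idea (illustrative names only, not SnappyData's generated code): the build side records per-column bounds, and each streamed row is rejected with a couple of comparisons, before any hash lookup, if either key falls outside them.

// Illustrative per-column min/max filter in front of a hash lookup
// (Bounds and the Set-based "hash map" are stand-ins, not SnappyData's code).
final case class Bounds(min: Long, max: Long) {
  def contains(v: Long): Boolean = v >= min && v <= max
}

// Build side: (id, k) pairs, as in the "ref2" table above.
val build: Array[(Long, Long)] = (0L until 1000L).map(id => (id, id * 5)).toArray
val idBounds = Bounds(build.map(_._1).min, build.map(_._1).max)
val kBounds  = Bounds(build.map(_._2).min, build.map(_._2).max)
val buildSet: Set[(Long, Long)] = build.toSet

def probe(id: Long, k: Long): Boolean =
  // cheap range checks first; fall through to the hash lookup only if both pass
  idBounds.contains(id) && kBounds.contains(k) && buildSet((id, k))

println((0L until 1000000L).count(id => probe(id, id * 5))) // 1000 matches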

A more general join will be an order of magnitude slower (or more, depending on the hash map size) than the examples above. Let's try an example closer to the real world. This one is a shortened version of NYSE TAQ, with a "quote" table holding random values for the BID and SYMBOL columns.

// clear previously cached data
spark.catalog.clearCache()
snappy.sql("drop table if exists rangeTest")

// create the quote dataset
val quotes = spark.range(1000L * 1000 * 100).selectExpr("(rand() * 1000.0) as bid", "concat('sym', cast((id % 500) as string)) as sym")
// the symbol dataset holds the top 50 symbols which are of interest
val syms = spark.range(50).selectExpr("concat('sym', cast(id as string)) as sym")
// now join and group on the symbols of interest with maximum BID price for each symbol
quotes.join(syms, "sym").groupBy("sym").agg("bid" -> "max").collect()
benchmark("Spark (join and groupBy)") {
  quotes.join(syms, "sym").groupBy("sym").agg("bid" -> "max").collect()
}

Time taken in Spark (join and groupBy): 12.4 seconds.

So, the performance drops by more than two orders of magnitude for such a join and groupBy (the data size here is 100 million rows). Much of the time is spent generating the random values on the fly, so let's try again after caching the datasets:

quotes.cache()
quotes.count()
syms.cache()
syms.count()

quotes.join(syms, "sym").groupBy("sym").agg("bid" -> "max").collect()
benchmark("Spark (join and groupBy)") {
  quotes.join(syms, "sym").groupBy("sym").agg("bid" -> "max").collect()
}

Time taken in Spark (join and groupBy): 6.6 seconds.

Caching improves the performance quite a bit as was expected. How well does SnappyData do for such a query? Let’s find out:

val snappy = org.apache.spark.sql.SnappyContext(sc).snappySession
val quotes = snappy.range(1000L * 1000 * 100).selectExpr("(rand() * 1000.0) as bid", "concat('sym', cast((id % 500) as string)) as sym")
val syms = snappy.range(50).selectExpr("concat('sym', cast(id as string)) as sym")

// create and populate the tables
snappy.sql("drop table if exists quote")
snappy.sql("create table quote (bid double not null, sym string not null) using column")
quotes.write.insertInto("quote")
snappy.sql("drop table if exists symbol")
snappy.sql("create table symbol (sym string not null)")
syms.write.insertInto("symbol")

// same query as before but with datasets on the SnappyData tables
val quotes = snappy.table("quote")
val syms = snappy.table("symbol")
quotes.join(syms, "sym").groupBy("sym").agg("bid" -> "max").collect()
benchmark("SnappyData (join and groupBy)") {
  quotes.join(syms, "sym").groupBy("sym").agg("bid" -> "max").collect()
}

Time taken in SnappyData (join and groupBy): 0.49 seconds.

This is ~13x faster than Spark cache. Beyond numbers, it brings many such queries on larger datasets and clusters into the realm of real-time analytics.

Why does SnappyData achieve this performance gain?

Apache Spark's optimizations are designed for disparate data sources, which tend to be external, such as HDFS or Alluxio. For better response times to queries on a non-changing dataset, Spark recommends caching data from external sources as cached tables and running queries on these cached data structures, where the data is stored in an optimized columnar format. While this dramatically improves performance, we found a number of areas for further improvement. For instance, a scan of columns managed as byte arrays copies each row into an UnsafeRow object, and the column values are then read from this row, breaking vectorization and introducing a lot of expensive copying.

Addressing these inefficiencies, however, is not that easy, as the data in the column store may have been compressed using a number of different algorithms such as dictionary encoding and run-length encoding. SnappyData has implemented alternate decoders for these, so it gets the full benefit of code generation and vector processing. Compared to Spark's vectorized readers (Parquet), which need to copy the data into an in-memory ColumnVector, SnappyData's column table can directly overlay the decoders on the serialized data without any additional copying. The end result: the scan speed of column tables now matches custom code that simply stores and reads directly from column arrays (e.g. reading/writing an integer array for an integer column).
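As a rough illustration of the "overlay the decoder on the serialized data" idea (the layout below is hypothetical and uncompressed, not SnappyData's actual column format), the scan reads long values straight out of the column's byte buffer, with no intermediate row or ColumnVector copy:

import java.nio.ByteBuffer

// Hypothetical uncompressed column batch: long values serialized back-to-back.
val numRows = 1000 * 1000
val serialized: ByteBuffer = ByteBuffer.allocate(numRows * 8)
var v = 0L
while (v < numRows) { serialized.putLong(v); v += 1 }

// "Decoder" overlaid directly on the serialized bytes: the scan reads each
// value in place instead of copying it into a row object first.
def sumColumn(buf: ByteBuffer, rows: Int): Long = {
  var i = 0; var sum = 0L
  while (i < rows) { sum += buf.getLong(i * 8); i += 1 }
  sum
}

println(sumColumn(serialized, numRows))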

Similar optimizations have also been applied to row table scans through the Spark Engine, thus giving about 5x better query response times compared to the pre-0.7 state of the product.

Hash joins and grouped aggregations have also seen significant improvements compared to the default Spark operators. The key idea is to avoid creating UnsafeRows both for key lookups and for storing the joined/grouped result. The key's hash code is now generated inline from the key variables, as is the equality comparison, without requiring the creation of a lookup key object. These changes, combined with a finely tuned hash map implementation, provide multi-fold improvements in the join and group-by operators compared even to Spark's single-column vectorized operators (and even more when Spark has to use non-vectorized ones).
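A hedged sketch of that approach (illustrative only, not the code SnappyData generates): an open-addressing table keyed directly on two primitive long columns, where the hash and the equality check are computed inline from the key values rather than from an allocated key row.

// Illustrative open-addressing set keyed on two long columns (hypothetical,
// not SnappyData's generated operator). The hash and the equality comparison
// use the primitive key values in place; no per-lookup key object is built.
class TwoLongSet(capacity: Int) {
  private val tableSize = Integer.highestOneBit(capacity * 2) * 2 // power of two, < 50% full
  private val keys1 = new Array[Long](tableSize)
  private val keys2 = new Array[Long](tableSize)
  private val used  = new Array[Boolean](tableSize)

  private def slot(k1: Long, k2: Long): Int = {
    val h = (k1 ^ (k1 >>> 32)) * 31 + (k2 ^ (k2 >>> 32))
    (h & (tableSize - 1)).toInt
  }
  def add(k1: Long, k2: Long): Unit = {
    var i = slot(k1, k2)
    while (used(i) && !(keys1(i) == k1 && keys2(i) == k2)) i = (i + 1) & (tableSize - 1)
    keys1(i) = k1; keys2(i) = k2; used(i) = true
  }
  def contains(k1: Long, k2: Long): Boolean = {
    var i = slot(k1, k2)
    while (used(i)) {
      if (keys1(i) == k1 && keys2(i) == k2) return true
      i = (i + 1) & (tableSize - 1)
    }
    false
  }
}

val keySet = new TwoLongSet(1000)
(0L until 1000L).foreach(id => keySet.add(id, id * 5))
println((0L until 10000L).count(id => keySet.contains(id, id * 5))) // 1000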

Some of the other significant optimizations include:

  • SnappyData's storage layer now allows for collocation of partitioned tables. This information is used to eliminate the most expensive portions of many joins (like shuffles) and turn them into collocated one-to-one joins. This release significantly enhances the physical plan strategy phase to aggressively eliminate data shuffle and movement in many more cases.

  • Support for plan caching to avoid query parsing, analysis, optimization, strategy, and preparation phases.

  • Spark has been changed to broadcast data with tasks themselves to significantly reduce task latencies. A set of tasks scheduled from the driver to an executor is grouped as a single TaskSet message with common task data sent only once instead of separate messages for each task. Task data is also compressed to further reduce network usage.

  • An efficient, pooled version of the Kryo serializer has been added and is now used by default for data, closures, and lower-level Netty messaging. Together with the improvements mentioned in the previous point, this significantly reduces the overall latency of short tasks (from close to 100ms down to a few ms). These changes also reduce the CPU consumed by the driver, enhancing the concurrency of tasks, especially shorter ones.

  • Enhancements to column-level statistics allow column batches to be skipped based on query predicates where possible. For example, time-range queries will now scan only the batches that fall in the given range and skip the others, providing a huge boost to such queries on time-series data (see the sketch after this list).

  • Alternate hash aggregation and hash join operators have been added that are finely optimized for SnappyData storage, making full use of the storage layout, vectorization, and dictionary encoding to provide an order-of-magnitude performance advantage over Spark's default implementations.
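To make the batch-skipping point concrete, here is a small sketch under assumed, simplified batch metadata (ColumnBatch and its min/max fields are hypothetical, not SnappyData's actual layout): a time-range predicate first prunes whole batches by their statistics and only scans the survivors.

// Hypothetical column batches with per-batch min/max statistics on a
// timestamp column (epoch millis); not SnappyData's actual metadata format.
final case class ColumnBatch(minTs: Long, maxTs: Long, rows: Array[Long])

def scanTimeRange(batches: Seq[ColumnBatch], from: Long, to: Long): Long =
  batches.iterator
    // batch pruning: skip any batch whose [minTs, maxTs] misses the range
    .filter(b => b.maxTs >= from && b.minTs <= to)
    // only the surviving batches are actually scanned row by row
    .map(_.rows.count(ts => ts >= from && ts <= to).toLong)
    .sum

// Example: 100 batches of 10,000 rows each; a narrow range touches only two.
val batches = (0 until 100).map { i =>
  val rows = Array.tabulate(10000)(j => i * 10000L + j)
  ColumnBatch(rows.head, rows.last, rows)
}
println(scanTimeRange(batches, from = 250000L, to = 260000L)) // 10001 matching rows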

We are working on several other performance ideas, such as updatable global and local indexes on column tables to optimize join queries (row tables already support these indexes). We are also using whole-stage code generation (WSCG) to optimize several other areas, such as faster stream and batch ingest. Stay tuned for more performance improvements in the next release.

Topics:
apache spark ,performance ,speed ,joining ,snappydata

Published at DZone with permission of Sumedh Wale.

