
How Mutable DataFrames Improve Join Performance in Spark SQL


The ability to combine database-like mutability with Spark provides a single platform for stream processing and SQL querying, all within the comforts of a familiar Spark API.


Recently, a user wrote to the Spark mailing list asking how to refresh the data in a Spark DataFrame without restarting the application. The user stated:

“We have a Structured Streaming application that gets [credit card] accounts from Kafka into a streaming data frame. We have a blacklist of accounts stored in S3 and we want to filter out all the accounts that are blacklisted. So, we are loading the blacklisted accounts into a batch data frame and joining it with the streaming data frame to filter out the bad accounts. ... We wanted to cache the blacklist data frame to prevent going out to S3 every time. Since the blacklist might change, we want to be able to refresh the cache at a cadence, without restarting the whole app.”

This application makes perfect sense. A credit card issuer is liable for charges made on cards that are stolen, misplaced, or otherwise misused. In 2012, unauthorized/fraudulent credit card transactions cost banks $6.1 billion. It is in the credit card issuer’s interest to ensure that transactions involving a blacklisted card are caught right after the card has been flagged.

The definitive reply came later in the thread:

“Yes, you will have to recreate the streaming Dataframe along with the static DataFrame, and restart the query. There isn't a currently feasible [way] to do this without a query restart. But restarting a query WITHOUT restarting the whole application + Spark cluster is reasonably fast. If your application can tolerate 10-second latencies, then stopping and restarting a query within the same Spark application is a reasonable solution.”

In short, a streaming DataFrame collects credit card transactions coming in over a messaging system like Apache Kafka. Existing blacklisted credit cards are loaded from a data store (in this case, S3) into a static DataFrame in Spark’s cache. Spark SQL joins the streaming DataFrame with the static DataFrame to detect any incoming blacklisted cards. This works great until a new blacklisted card is added to the data store (S3), at which point the DataFrame containing the blacklisted cards must be reloaded from S3. During the reload, the streaming query must be stopped and then restarted while the static DataFrame reloads. As stated in the reply, this can take 10 seconds or longer, depending on data volumes. In that window, transactions could be occurring on a newly blacklisted card that has not yet been recognized. Catching blacklisted accounts in a stream of credit card transactions in real time is therefore impractical in a pure Spark solution.
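The vanilla approach from the thread can be sketched roughly as follows. This is an illustrative reconstruction, not code from the original discussion; the paths, topic names, broker address, and column names are all assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cc-fraud-check").getOrCreate()

// Streaming DataFrame of incoming transactions from Kafka.
// Parsing the Kafka value into transaction columns (CC_NUMBER, etc.) is
// omitted here; assume `txns` has already been parsed into those columns.
val txns = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "cc_transactions_topic")
  .load()

// Static DataFrame of blacklisted cards, loaded once from S3 and cached.
// This is a snapshot: later changes to the S3 data are not visible to the join.
val blacklist = spark.read.parquet("s3a://some-bucket/blacklist/").cache()

// Join the stream against the cached snapshot to flag blacklisted cards.
val flagged = txns.join(blacklist, txns("CC_NUMBER") === blacklist("CARD_NUMBER"), "leftsemi")
val query = flagged.writeStream.format("console").start()

// To pick up new blacklist entries, the query must be stopped, the static
// DataFrame reloaded, and the query restarted -- roughly a 10-second window
// in which transactions on a newly blacklisted card go undetected.
query.stop()
val refreshed = spark.read.parquet("s3a://some-bucket/blacklist/").cache()
```

The restart in the last two lines is the latency the rest of this article sets out to remove.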

So how can we use mutable DataFrames to make this solution faster?

Using SnappyData, we can simplify the process of adding new blacklisted cards to the blacklist DataFrame and greatly improve latency. Here is an overview of the approach; code snippets follow:

  1. Define a streaming DataFrame that represents incoming credit card transactions.

  2. Define a DataFrame that represents the blacklisted cards table. (Note: This DF will be mutable.)

  3. Join the streaming DF with the blacklist DF and remove fraudulent cards. Write those transactions out to a rejected transactions table. Write the safe transactions to their own table.

  4. Optional: Define a machine learning job using Spark MLLib that operates on the rejected transactions table to train models to detect fraud automatically.
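Step 4 is not covered in the walkthrough below, but a minimal sketch of such a job might look like the following. The feature columns (AMOUNT, MERCHANT_ID) and label column (IS_FRAUD) are assumptions for illustration and are not part of the original example's schema:

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler

// Read the rejected transactions table (populated in step 3) as a DataFrame
val rejected = snSession.table(RejectedTransactionsTable)

// Assemble the assumed numeric columns into an MLlib feature vector
val assembler = new VectorAssembler()
  .setInputCols(Array("AMOUNT", "MERCHANT_ID"))
  .setOutputCol("features")

// IS_FRAUD is a hypothetical label column; MLlib expects it to be named "label"
val training = assembler.transform(rejected.withColumnRenamed("IS_FRAUD", "label"))

// Fit a simple logistic regression classifier to flag likely fraud
val model = new LogisticRegression().fit(training)
```

Because SnappyData exposes its tables through the standard Spark APIs, the same session that maintains the mutable blacklist can feed the MLlib pipeline.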

So what does this look like in code?

Below, we create the SnappySession object and use it to create a mutable row table for the blacklisted cards and then a column table for the rejected transactions. These tables are immediately materialized as DataFrames.

val snSession = new SnappySession(spark.sparkContext)

// Create a row table to hold blacklisted card details; row tables are mutable
snSession.createTable(tableName = blackListTable,
  provider = "row",
  schema = blackListCardsSchema,
  options = Map.empty[String, String])

// Create a SnappyData column table to store all failed transactions
snSession.createTable(tableName = RejectedTransactionsTable,
  provider = "column",
  schema = rejectedTableSchema,
  options = Map.empty[String, String])

Next, we create a SnappyStreamingContext object to handle the streaming transactions, then build the Kafka stream and create an object for each message in it. Finally, we set up a separate stream for the blacklisted cards. In our example, we build the blacklist DF from a Kafka stream rather than from S3 for simplicity. This does not change how the join between the DataFrames works; the blacklist DF is still being continually updated, just by Kafka.

val snStreamSession = new SnappyStreamingContext(snSession.sparkContext, Seconds(batchIntervalSeconds))

// Create a transaction stream (txnKafkaStream) that pulls messages from Kafka (cc_transactions_topic)
val txnKafkaStream =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    snStreamSession, kafkaParams, Set("cc_transactions_topic")).map(_._2)

// Create a CardTransaction object for each message in txnKafkaStream
val cardTxns = txnKafkaStream.map(s => StringToTxn.parseLine(s))

// Create a blacklist stream (blackListKafkaStream) that pulls messages from Kafka (cc_blacklist_topic)
val blackListKafkaStream =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    snStreamSession, kafkaParams, Set("cc_blacklist_topic")).map(_._2)

// Create a BlackListEntry object for each message in blackListKafkaStream
val blackListCards = blackListKafkaStream.map(s => StringToBlackList.parseLine(s))

Next, we insert the new blacklisted cards into the blackList table and join the main transaction stream with the blackList table to discover fraudulent transactions and write them into the rejected transactions table.

// For each batch in the blackListCards stream, either insert into or delete from CC_BLACKLIST_CARDS
blackListCards.foreachRDD(rdd => {
  val streamDS = rdd.toDS()
  // deleteFrom is a SnappyData extension that deletes matching rows from the table
  streamDS.where("ACTION = 'DELETE'").write.deleteFrom(blackListTable)
  // putInto is a SnappyData extension that inserts a row, or updates it
  // if the row already exists in the table
  streamDS.where("ACTION = 'INSERT'").write.putInto(blackListTable)
})

// For each batch of the transaction stream (cardTxns), join with the CC_BLACKLIST_CARDS
// table to reject transactions made with blacklisted cards
cardTxns.foreachRDD(rdd => {
  val streamDS = rdd.toDS()

  // Create a DataFrame from the blacklist table
  val blackListedCards = snSession.table(blackListTable)

  // Use a left semi-join to select only those rows that have a match in the blacklist table
  val failedTxns = streamDS.join(blackListedCards, $"CC_NUMBER" === $"CARD_NUMBER", "leftsemi")
  val failedTransactions = failedTxns.select("CC_NUMBER", "CC_TYPE", "CC_NAME", "MERCHANT_ID", "CC_PUBLICKEY")

  // Insert the failed transactions DataFrame into the column table (RejectedTransactionsTable)
  failedTransactions.write.insertInto(RejectedTransactionsTable)

  val totalFailedTxnCount = snSession.table(RejectedTransactionsTable).count
  println(s"Total failed transaction count = $totalFailedTxnCount")

Finally, we write the valid transactions into a Parquet file:

// Use a left anti-join to keep only the valid transactions and write them to Parquet files
  val passedTxns = streamDS.join(blackListedCards, $"CC_NUMBER" === $"CARD_NUMBER", "leftanti")
  val batchTime = System.currentTimeMillis()
  // Write each batch to its own Parquet directory (the path here is illustrative)
  passedTxns.write.parquet(s"passed_txns_$batchTime")
})

The ability to combine database-like mutability (and reliability) into Apache Spark provides users with a single platform that can handle stream processing, SQL querying, and machine learning all within the comforts of a very familiar Spark API, and all with live data.

In the above code, we’ve shown how simple it is to have our blacklisted cards DataFrame continually updated with new data (in our case, from a Kafka stream) while maintaining a join to a streaming DataFrame of transactions, without restarting or reloading anything. Because of this mutability, SnappyData avoids the 10-second latency that vanilla Spark incurs every time the Spark SQL join needs to be restarted. Further, because of our 20x speedup over Spark’s native cache, the performance of the above application will be even faster.

A data platform built for the big data era should offer reliability, security, elasticity, scale, and low latency, and democratize the ability for users to build live applications that are reactive, responsive, and capable of true predictive analytics.



Opinions expressed by DZone contributors are their own.
