Applying SQL Analytics and Windowing Functions to Spark Data Processing


The purpose of this post is to share my latest experience with Talend in the field, which is also the first time I have gotten to see the capacity Talend has to perform SQL queries inside any Talend big data batch jobs using the Spark framework. In doing so, I want to teach you how to apply SQL analytics and windowing functions to process data inside Spark!

Depending on how familiar you are with the Talend platform, you may or may not know that our big data integration solution gives developers and power users the ability to generate code that runs natively on a Hadoop cluster, whether it's MapReduce, Spark, Spark Streaming, or Storm.

Technically, Talend does not require an agent to be installed on your Hadoop cluster; a connection to YARN is the only prerequisite. This is the beauty of a solution based on open-source standards that has always taken a no-vendor-lock-in approach.

Because Talend's framework is open, users can inject code inside their integration jobs. Most of the time, they reuse a Java routine when a processing step can't easily be achieved otherwise. In today's blog, I want to focus on how to leverage Spark SQL code within a big data Spark job, and why it matters a lot!

The Need for Speed!

Everything started with what looked like a simple use case for a recent big data POC. It was a classic banking use case where we needed to show how to calculate a running balance from transaction data. Basically, we had two data sources:

  1. The list of transactions of the day to process (3M+ records)
  2. The historical table of end-of-the-day balance for all the accounts in the bank (8M+ records)

While this scenario looked simple on the whiteboard, when it finally came time to implement the use case in Talend Studio, and to make it performant, that's when the rubber met the road!

Being kind of a brute-force approach guy, I thought:

"Get the transactions table sorted, add a sequence number, get the end-of-day balance value you need from that 8M+ row historical table, then put that in the cache (thanks to a nice tCacheOutput component). Then do a join between your cached transactions table and the original transactions table, and BOOM, calculation, computation, magic, BOOM, result."

Of course, that sounds like overkill, as it would have required a lot of memory and comparing a lot of rows together just to get the previous amount for each row. This approach wouldn't have worked anyway, given that the prospect informed us, "Oh, by the way, your competition did that in three minutes."

All right, it was time to think about a smarter way; brute force has its limitations after all!

SQL Analytics Functions, Baby!

When it comes to smart stuff in big data, truth be told, my peers don't look at me first; they look at our internal Black Belt Bullet Proof Certified Fellows here at Talend. I did the same and got this Jedi-like response: "Little Padawan, the way of tSQLRow, take you must."

My Jedi Data Master got quite excited when the challenge came, so he helped build the first draft of the job using a tSQLRow.

The beauty of tSQLRow in a Spark job is that you can use SQL queries inside your job, and the query applies ON THE JOB DATA! Yeah, yeah: not a database or any other hybrid stuff, the actual data gathered inside your job! So all the WHERE, ORDER BY, GROUP BY, SUM(), COUNT(), and other fun operations can be done through that SQL API inside a job. Yes, that's cool!

These functions have existed since Spark 1.4; Talend is already at Spark 2.1, so they're usable there.

Thinking about the use case, it was about:

  • Getting the latest value for an end-of-day balance
  • Summing up transaction amount row by row inside each account

Not to mention the need for some temporary variables to deal with the other constraints of the use case that are not part of this explanation (i.e., dealing with future transaction values, generating a row number, etc.).

And this is where I discovered the existence of analytics and windowing functions in SQL; probably not a surprise for some of you reading this article, but a totally new discovery for me!

This is where I started getting my hands dirty, and I must say, I just couldn't get enough!

Partition it, sort it, window it, compute it, filter it, shuffle it...

First, let's have a look at the data used for my local test with the local Spark engine in my Talend Studio (BTW, I'm using Talend Studio 6.3.1, the Big Data Platform edition).

EOD balance sample data:

100|12345|2016-06-08 00:00:00|1.02
100|12345|2016-06-07 00:00:00|0.02
102|20006|2016-06-07 00:00:00|5.02
102|20006|2016-06-08 00:00:00|6.02

Transactions sample data:

103|20007|2016-06-09 02:00:00|105508585836|2016-06-10 00:00:00|F|6.90|D|20160609
100|12345|2016-06-09 06:00:00|111454018830|2016-06-12 00:00:00|C|0.6|D|20160609
102|20006|2016-06-09 01:00:00|125508585836|2016-06-09 00:00:00|F|5.50|D|20160609
100|12345|2016-06-09 02:00:00|33042764824|2016-06-08 00:00:00|B|0.05|D|20160609
101|22222|2016-06-09 02:00:00|121554018830|2016-06-09 00:00:00|C|0.5|D|20160609
100|12345|2016-06-09 02:00:00|33042764825|2016-06-08 00:00:00|B|0.08|D|20160609
100|12345|2016-06-09 03:00:00|33042764830|2016-06-09 00:00:00|C|1.06|D|20160609
100|12345|2016-06-09 05:00:00|110451035129|2016-06-11 00:00:00|C|0.21|D|20160609
100|12345|2016-06-09 07:00:00|185508585836|2016-06-13 00:00:00|F|0.38|D|20160609
100|12345|2016-06-09 04:00:00|33042766082|2016-06-10 00:00:00|C|4.51|D|20160609
101|22222|2016-06-09 01:00:00|101554018830|2016-06-08 00:00:00|C|0.8|C|20160609

See below the job design I used to test my logic on dummy data. The first two components are tFixedFlow components, and I used the above sample data as my test data to validate that the logic worked correctly.

The initial steps are mainly to filter, sort the data correctly (by account number and transaction date), and retrieve the latest EOD balance for each account when it exists (otherwise, it means this account is new and this is the first transaction on it). It also creates a unique transaction ID for each transaction row using a sequence function (available in the Numeric routine library in Talend). This is not just for fun; one of the main headaches I experienced was understanding the behavior of analytics functions such as SUM() or LAST_VALUE(). Having a unique identifier to sort the data inside the partition is mandatory to get the result you want in some cases. This was the case for the SUM() function.

The first tSQLRow component happens right after the join of the data is done. Here is the content of it:

"select posting_transit, posting_acct_num, business_date, system_time, posting_date, business_date_indi, txn_amt, dr_cr_ind, proc_dt, end_of_day_bal, isknownaccnt,
ROW_NUMBER() OVER (PARTITION BY posting_transit, posting_acct_num ORDER BY business_date, posting_date ASC) as rowNum,
SUM(txn_amt) OVER (PARTITION BY posting_transit, posting_acct_num ORDER BY business_date, posting_date, seq ASC) as balance
from out1"

The focus here is on the two functions and the partitioning of the data:

  1. ROW_NUMBER(): returns the row number in order of appearance inside the partition created (so it follows the ORDER BY operation).
  2. SUM(txn_amt): sums the txn_amt value row by row as long as the ORDER BY criteria is unique. That's a critical step for the running balance calculation. If the ORDER BY criteria of the partition (here business_date, posting_date, seq ASC) were not unique (uniqueness is achieved thanks to the seq variable), we would end up with a sum of all the txn_amt that happen on the same business_date and posting_date, which is not what we want.

This is pretty well explained here with sample data.
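To see the difference that unique ORDER BY key makes, here is a tiny, hypothetical replay of that first query using Python's built-in sqlite3 module (SQLite has supported the same standard window functions since 3.25). The table and column names are made up for illustration, and amounts are in cents to keep the sums exact:

```python
import sqlite3

# Hypothetical miniature of the job's data: (acct, business_date, seq, txn_cents).
# The first two rows share the same business_date; seq is the unique tiebreaker.
rows = [
    ("12345", "2016-06-08", 1, 5),
    ("12345", "2016-06-08", 2, 8),
    ("12345", "2016-06-09", 3, 106),
    ("20006", "2016-06-09", 4, 550),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE txns (acct TEXT, business_date TEXT, seq INTEGER, txn_cents INTEGER)")
conn.executemany("INSERT INTO txns VALUES (?, ?, ?, ?)", rows)

# With seq in the ORDER BY, the running SUM() advances one row at a time;
# without it, the two rows sharing 2016-06-08 would both get the lumped total.
results = list(conn.execute("""
    SELECT acct, txn_cents,
           ROW_NUMBER() OVER (PARTITION BY acct ORDER BY business_date, seq) AS row_num,
           SUM(txn_cents) OVER (PARTITION BY acct ORDER BY business_date, seq) AS running
    FROM txns
    ORDER BY acct, row_num
"""))
for r in results:
    print(r)
```

Note how the running column grows by exactly one txn_cents per row, and both ROW_NUMBER() and the sum restart when the account changes.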

When It Comes to Size, Partition Matters!

The OVER(PARTITION BY a,b ORDER BY c,d) instruction is key in our job. Because Spark is an in-memory framework that runs in parallel across a grid, and especially across a Hadoop cluster, partitioning the data makes a lot of sense for performance!

Without going into the internals, PARTITION BY a,b logically selects a group of data so that a function applies only to that group.

In my use case, I wanted to partition the data by account number (a concatenation of transit_number and acct_num), so that my ORDER BY instruction and my analytic functions SUM() and ROW_NUMBER() apply only to that particular chunk of data. So, instead of 3M+ transactions in one group, I now have X transactions spread across Y partitions. Here, I would estimate the number of groups at around 600,000, with an average of five transactions per partition.

So instead of one huge processing pass over all the data, I now have very quick, parallel, in-memory processing of numerous very small groups of data.

Yeah, I know, that's cool! And that should also lead to a pretty good performance result! (Well, that depends on a lot of things, but this partitioning definitely helps!)
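As a mental model (my own sketch, not Talend's generated code), the effect of PARTITION BY is the same as splitting the rows into per-key groups and computing inside each group independently, which is exactly what makes the work parallelizable:

```python
from itertools import groupby
from operator import itemgetter

# Toy rows: (account_key, business_date, amount_in_cents); names are illustrative.
txns = [
    ("100|12345", "2016-06-08", -5),
    ("101|22222", "2016-06-09", 80),
    ("100|12345", "2016-06-09", -8),
]

# Sort by the partition key, then process each group on its own; in Spark,
# each such group can be handled by a different executor in parallel.
totals = {}
for acct, group in groupby(sorted(txns, key=itemgetter(0, 1)), key=itemgetter(0)):
    totals[acct] = sum(amt for _, _, amt in group)
```

Each group is tiny (about five transactions on average in my use case), so each unit of work is cheap even though there are hundreds of thousands of them.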

After that first tSQLRow, my data looks like this:

The SUM(txn_amt) Mechanism

The data is sorted and grouped, and the Double value you see (with a lot of digits after the decimal point) is the running balance, calculated here without the EOD balance taken into consideration yet. This incremental row-by-row behavior is really due to the fact that we ordered the partition with a unique identifier for each row (yes, I insist on that, as it took me half a day to understand it and create that seq sequence). Without that, you'll end up (like me at first sight) with a real sum of all the txn_amt sharing the same business_date and posting_date.

Look at the first seven rows — they are part of the same group (account: 100 12345).

  • 1.02 (row 1 EOD balance) - 0.05 (row 1 txn_amount) = 0.97 (row 1 balance)

  • 0.97 (row 1 balance, previously calculated) - 0.08 (row 2 txn_amount) = 0.89 (row 2 balance)

  • 0.89 (row 2 balance, previously calculated) - 1.06 (row 3 txn_amount) = -0.17 (row 3 balance)

  • And so on!
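Those bullets can be replayed in a few lines of Python (the numbers come straight from the sample data above; rounding to two decimals stands in for the job's decimal handling):

```python
# Start from the EOD balance and subtract each debit in partition order.
eod_balance = 1.02
debits = [0.05, 0.08, 1.06]  # first three txn_amt values for account 100|12345

running = []
balance = eod_balance
for amt in debits:
    balance = round(balance - amt, 2)  # keep two decimals, like a statement
    running.append(balance)
```

Each step starts from the previous row's balance, which is exactly what the windowed SUM() with a unique ORDER BY key gives you in one pass.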

The ROW_NUMBER() Mechanism

You can easily see the result of the ROW_NUMBER() function applied per partition, as it resets to 1 for each new transit_number, accnt_number pair.

Last, But Not Least!

The final step of my job was to deal with the future transactions. These transactions don't require the running balance calculation that the others (the current and backdated transactions) do. A future transaction (indicated with an F inside the business_date_indi field) should carry, as its running balance value, the previous end-of-day balance, or "null", or the LAST value calculated before.

Let's say I already have three current or backdated transactions for my account 100|12345 with an already-calculated running balance. If I have one or more future transactions for that same account 100|12345, then I want to set their running balance to the last calculated running balance value (yeah, I know... it sounds logical on your bank account report or your credit card report).

That's where I looked at the function called LAST_VALUE(), and I used it in the last tSQLRow component in my job.

See the content below:

"select posting_transit, posting_acct_num, business_date, system_time, posting_date, business_date_indi, txn_amt, dr_cr_ind, proc_dt, end_of_day_bal, rowNum, balance,
last_value(balance, true) OVER (PARTITION BY posting_transit, posting_acct_num) as last_balance,
max(F_tx_only) OVER (PARTITION BY posting_transit, posting_acct_num) as F_tx_only,
isknownaccnt
from bal
ORDER BY posting_transit, posting_acct_num, business_date, system_time"

Let's focus on this particular piece:

last_value(balance , true) OVER(PARTITION BY posting_transit, posting_acct_num ) as last_balance

So, now you get the partition part of the story. You'll notice the absence of an ORDER BY instruction. This is on purpose, as using another ORDER BY here would result in an error. I think it's likely because we previously ORDER BY-ed the same partition, but honestly, there might be an explanation I don't get (comments appreciated).
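For what it's worth, one plausible explanation, shown here with SQLite (whose window functions follow the same SQL standard): when you add ORDER BY inside OVER(), the default frame ends at the current row, so LAST_VALUE just echoes each row's own value; without ORDER BY, the frame is the whole partition, which is what we want here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (grp TEXT, pos INTEGER, val INTEGER)")
conn.executemany("INSERT INTO t VALUES (?, ?, ?)",
                 [("a", 1, 10), ("a", 2, 20), ("a", 3, 30)])

# With ORDER BY, the default frame is RANGE UNBOUNDED PRECEDING TO CURRENT
# ROW, so LAST_VALUE never sees anything past the current row.
with_order = [r[0] for r in conn.execute(
    "SELECT LAST_VALUE(val) OVER (PARTITION BY grp ORDER BY pos) FROM t ORDER BY pos")]

# Without ORDER BY, every row is a peer, so the frame covers the whole partition.
whole_partition = [r[0] for r in conn.execute(
    "SELECT LAST_VALUE(val) OVER (PARTITION BY grp) FROM t ORDER BY pos")]
```

So even where an ORDER BY is accepted rather than rejected, it would quietly change what LAST_VALUE returns.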

So, last_value(balance) returns the last value of the balance calculated inside that partition; but in the case of a future transaction, we don't have any running balance calculated.

So one of the tricks I used was to set the balance to null in a previous step when it was a future transaction. But last_value(balance) would still return that null value by default.

To avoid that, this makes the difference: last_value(balance, true). I hope you've read this far, because it took me five hours of Googling to get it right, and when I found it, that's when I decided to write my first blog article!

So, this true parameter tells the last_value() function to skip null values in its computation. Now you understand why I set those future transaction balance values to null earlier: it was so I could ignore them later!
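SQLite has no ignore-nulls flag on LAST_VALUE, so here is the same idea in plain Python: a sketch of what last_value(balance, true) computes over one partition, with the function name being mine, not a library's:

```python
def last_non_null(values):
    """Return the last value in the sequence that is not None, like
    Spark SQL's last_value(col, true); None if every value is null."""
    for v in reversed(values):
        if v is not None:
            return v
    return None

# Balances for one account partition; the future transactions were
# deliberately set to null in the previous step of the job.
partition = [0.97, 0.89, -0.17, None, None]
last_balance = last_non_null(partition)
```

Applied per partition, every future transaction picks up -0.17, the last real running balance for the account.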

(<private joke> For French readers: I tried the so-called "Rémy OffTheWood" trick and replaced "true" with "trou"... well, it doesn't work, don't try it at home. </private joke>)


So, how did all this end up in terms of performance against our competition? Well, our first test of the job with no tuning at all took something like six minutes to compute. Of course, we wanted to do better, so we applied the Spark configuration tuning capabilities inside Talend jobs.

After one round of tuning, we got down to a 2-minute-30-second execution time: the most performant result overall!

Next, our prospect tested against more volume, increasing both the transactions table and the EOD balance table (70 million EOD balance rows and 38 million transaction rows).

With no change to the job (not even the tuning properties), we ran in ten minutes!

So, multiplying the volume by roughly 10x in both the lookup and main data only took 4x the time to process, without any change. This is what I call native scalability. Oh, and the competition wasn't even close.


I hope this quick read saves you the hours of Googling for answers to a common integration scenario that it cost me. This functionality inside the Spark framework is great, and combined with Talend Studio's capabilities, it's just awesome!

Ease-of-use-wise, it's just great to be able to reuse SQL skills and apply them to the big data world this easily!

Here is the result of my job:

|posting_transit|posting_acct_num|business_date|system_time |posting_date |business_date_indi|txn_amt|dr_cr_ind|proc_dt |end_of_day_bal|running_balance|
|100 |12345 |1465437600000|33042764824 |1465344000000|B |-0.05 |D |20160609|1.02 |0.97 |
|100 |12345 |1465437600000|33042764825 |1465344000000|B |-0.08 |D |20160609|1.02 |0.89 |
|100 |12345 |1465441200000|33042764830 |1465430400000|C |-1.06 |D |20160609|1.02 |-0.17 |
|100 |12345 |1465444800000|33042766082 |1465516800000|C |-4.51 |D |20160609|1.02 |-4.68 |
|100 |12345 |1465448400000|110451035129|1465603200000|C |-0.21 |D |20160609|1.02 |-4.89 |
|100 |12345 |1465452000000|111454018830|1465689600000|C |-0.6 |D |20160609|1.02 |-5.49 |
|100 |12345 |1465455600000|185508585836|1465776000000|F |-0.38 |D |20160609|1.02 |-5.49 |
|101 |22222 |1465434000000|101554018830|1465344000000|C |0.8 |C |20160609|0.0 |0.8 |
|101 |22222 |1465437600000|121554018830|1465430400000|C |-0.5 |D |20160609|0.0 |0.3 |
|102 |20006 |1465434000000|125508585836|1465430400000|F |-5.5 |D |20160609|6.02 |6.02 |
|103 |20007 |1465437600000|105508585836|1465516800000|F |-6.9 |D |20160609|0.0 |null |



Published at DZone with permission of Adrien Lacombe , DZone MVB. See the original article here.

