Applying SQL Analytics and Windowing Functions to Spark Data Processing
The purpose of this post is to share my latest experience with Talend in the field, which is also the first time I have seen Talend's capacity to run SQL queries inside any Talend big data batch job using the Spark framework. In doing so, I want to teach you how to apply SQL analytics and windowing functions to process data inside Spark!
Depending on how familiar you are with the Talend platform, you may or may not know how our big data integration solution gives developers and power users the ability to generate code that is natively executable on a Hadoop cluster, whether it's MapReduce, Spark, Spark Streaming, or Storm.
Technically, Talend will not require an agent to be installed on your Hadoop cluster; a connection to YARN is the only prerequisite. This is the beauty of using a solution based on open-source standards that have always taken a no-vendor-lock-in approach.
Because Talend's framework is open, users will be able to inject code inside their integration jobs. Most of the time, they will reuse a Java routine when they can't easily achieve a processing step on the data they are working with. In today's blog, I want to focus on how to leverage Spark SQL code within a big data Spark job, and why it matters a lot!
The Need for Speed!
Everything started with what looked like a simple use case for a recent big data POC. It was a classic banking use case where we needed to show how to calculate a running balance from transaction data. Basically, we had two data sources:
- The list of transactions of the day to process (3M+ records)
- The historical table of end-of-the-day balance for all the accounts in the bank (8M+ records)
While this scenario looked simple on the whiteboard, the rubber met the road when it came time to implement it in Talend Studio and make it performant!
Being kind of a brute-force approach guy, I thought:
"Get the transactions table sorted, add a sequence number, get the end-of-day balance value you need from that 8M+ row historical table, then put that in the cache (thanks to a nice tCacheOutput component). Then you'll need to do a join between your cached transactions table and the original transactions table, do some joining there and BOOM, calculation, computation, magic, BOOM, result."
Of course, that sounds like a bit of overkill, as it would have required a lot of memory and a lot of row-to-row comparisons just to get the previous amount for each row. This approach wouldn't have worked anyway, given that the prospect informed us, "Oh, by the way, your competition did that in three minutes."
All right, it was time to think about a smarter way. Brute force has its limitations, after all!
SQL Analytics Function Baby!
When it comes to smart stuff in big data, truth be told, my peers don't look at me first; they look at our internal Black Belt Bullet Proof Certified Fellows here at Talend. I did the same and got this Jedi-like response: "Little Padawan, the way of tSQLRow, take you must."
My Jedi Data Master got quite excited when the challenge came, so he helped build the first draft of the jobs, in which he used a tSQLRow.
The beauty of tSQLRow in a Spark job is that you can use SQL queries inside your job, and that query will apply ON THE JOB DATA! Yeah, yeah, not a database or any other hybrid stuff, the actual data that is gathered inside your job! So COUNT() and other fun operations can be done through that SQL API inside a job. Yes, that's cool!
These functions have been available since Spark 1.4, and Talend already supports Spark 2.1, so they are usable there.
Thinking about the use case, it was about:
- Getting the latest value for an end-of-day balance
- Summing up transaction amount row by row inside each account
Not to mention the need for some temporary variables to deal with the other constraints of the use case that are not part of the explanation here (e.g., dealing with future transaction values, generating a row number, etc.).
And this is where I discovered the existence of analytics and windowing functions in SQL; probably not a surprise for some of you reading this article, but a totally new discovery for me!
This is where I started getting my hands dirty, and I must say, I just couldn't get enough!
Partition it, sort it, window it, compute it, filter it, shuffle it...
First, let's have a look at the data used for my local test with the local Spark engine in my Talend Studio (BTW, I'm using Talend Studio 6.3.1, Big Data Platform edition).
EOD balance sample data:
100|12345|2016-06-08 00:00:00|1.02
100|12345|2016-06-07 00:00:00|0.02
102|20006|2016-06-07 00:00:00|5.02
102|20006|2016-06-08 00:00:00|6.02
Transactions sample data:
103|20007|2016-06-09 02:00:00|105508585836|2016-06-10 00:00:00|F|6.90|D|20160609
100|12345|2016-06-09 06:00:00|111454018830|2016-06-12 00:00:00|C|0.6|D|20160609
102|20006|2016-06-09 01:00:00|125508585836|2016-06-09 00:00:00|F|5.50|D|20160609
100|12345|2016-06-09 02:00:00|33042764824|2016-06-08 00:00:00|B|0.05|D|20160609
101|22222|2016-06-09 02:00:00|121554018830|2016-06-09 00:00:00|C|0.5|D|20160609
100|12345|2016-06-09 02:00:00|33042764825|2016-06-08 00:00:00|B|0.08|D|20160609
100|12345|2016-06-09 03:00:00|33042764830|2016-06-09 00:00:00|C|1.06|D|20160609
100|12345|2016-06-09 05:00:00|110451035129|2016-06-11 00:00:00|C|0.21|D|20160609
100|12345|2016-06-09 07:00:00|185508585836|2016-06-13 00:00:00|F|0.38|D|20160609
100|12345|2016-06-09 04:00:00|33042766082|2016-06-10 00:00:00|C|4.51|D|20160609
101|22222|2016-06-09 01:00:00|101554018830|2016-06-08 00:00:00|C|0.8|C|20160609
See below the job design I used to test my logic on dummy data. The first two components are tFixedflow components, and I used the above sample data as my test data to validate that it worked correctly.
The initial steps mainly filter and sort the data correctly (by account number and transaction date) and retrieve the latest EOD balance for each account when it exists (otherwise, the account is new and this is the first time there has been a transaction in it). They also create a unique transaction ID for each transaction row using a sequence function (available in the numeric library in Talend). This is not just for fun; one of the main headaches I experienced was understanding the behavior of analytics functions such as LAST_VALUE. Having a unique identifier to sort the data inside the partition is mandatory to get the result you want in some cases. This was the case for the running balance calculation.
The tSQLRow component comes right after the join of the data is done. Here is its content:
"select posting_transit, posting_acct_num, business_date, system_time, posting_date, business_date_indi, txn_amt, dr_cr_ind, proc_dt, end_of_day_bal, isknownaccnt,ROW_NUMBER() OVER(PARTITION BY posting_transit, posting_acct_num ORDER BY business_date, posting_date ASC) as rowNum, sum(txn_amt) OVER(PARTITION BY posting_transit, posting_acct_num ORDER BY business_date, posting_date,seq ASC) as balance from out1"
The focus here is on the two functions and the partitioning of the data:
- ROW_NUMBER(): returns the row number in order of appearance inside the partition created (so it depends on the ORDER BY criteria).
- SUM(txn_amt): sums the txn_amt value row by row as long as the ORDER BY criteria are unique. That's a critical step for the running balance calculation. If the ORDER BY criteria of the partition were not unique (uniqueness is achieved here thanks to the seq variable, as in seq ASC), we would end up with a sum of all the txn_amt values that happen on the same posting_date, which is not what we want.
This is pretty well explained here with sample data.
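To make the uniqueness point concrete, here is a minimal sketch that runs the same kind of windowed SUM with and without a unique tie-breaker. It uses Python's built-in sqlite3 module purely as a stand-in for Spark SQL (an assumption: both follow the standard default window frame, and the SQLite build must be 3.25 or later to have window functions). The table is a hypothetical miniature of out1, with a single acct column instead of the two real key columns and amounts stored in cents so the sums stay exact.

```python
import sqlite3

# Hypothetical miniature of the joined data. Amounts are in cents (integers)
# so that the sums are exact; seq is the unique sequence number.
rows = [
    # (acct, posting_date, seq, txn_amt)
    ("100-12345", "2016-06-08", 1, -5),
    ("100-12345", "2016-06-08", 2, -8),    # same posting_date as seq 1: a tie
    ("100-12345", "2016-06-09", 3, -106),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE out1 (acct TEXT, posting_date TEXT, seq INTEGER, txn_amt INTEGER)"
)
conn.executemany("INSERT INTO out1 VALUES (?, ?, ?, ?)", rows)

query = """
SELECT seq,
       SUM(txn_amt) OVER (PARTITION BY acct ORDER BY posting_date)      AS sum_with_ties,
       SUM(txn_amt) OVER (PARTITION BY acct ORDER BY posting_date, seq) AS running_sum
FROM out1
ORDER BY seq
"""
result = conn.execute(query).fetchall()
for row in result:
    print(row)
# (1, -13, -5)      <- tied rows share one total when the ordering is not unique
# (2, -13, -13)
# (3, -119, -119)
```

Rows 1 and 2 share a posting_date, so without seq they both get the sum of the two amounts; adding seq to the ORDER BY yields a true row-by-row running sum.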
When It Comes to Size, Partition Matters!
The OVER(PARTITION BY a,b ORDER BY c,d) instruction is key in our job. Because Spark is an in-memory framework that can run in parallel on a grid, and especially on a Hadoop cluster, partitioning the data makes a lot of sense for performance!
Without going into details, the job of PARTITION BY a,b is to select a logical group of data so that a function is applied only to that group.
In my use case, I wanted to partition the data by account number (the combination of posting_transit and posting_acct_num). So I only apply my ORDER BY instruction and my analytic function ROW_NUMBER() on that particular chunk of data. So, from 3M+ transactions in one group, I now have X transactions in each of Y partitions. Here, I would estimate the number of groups to be around 600,000, with an average of five transactions per group.
So instead of one huge processing pass over all the data, I now have very quick, parallel, in-memory processing of numerous, very small groups of data.
Yeah, I know, that's cool! And that should also lead to a pretty good performance result! (Well, that depends on a lot of things, but this partitioning definitely helps!)
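As a rough illustration of what the logical grouping looks like, the sketch below groups the transactions sample data by its first two fields, taken here to be posting_transit and posting_acct_num (an assumption based on the column order of the query). It is plain Python, not Spark, so it only shows the grouping itself; how Spark distributes those groups across a cluster is not modeled.

```python
from collections import defaultdict

# The transactions sample data shown earlier, one pipe-delimited record per line.
sample = """\
103|20007|2016-06-09 02:00:00|105508585836|2016-06-10 00:00:00|F|6.90|D|20160609
100|12345|2016-06-09 06:00:00|111454018830|2016-06-12 00:00:00|C|0.6|D|20160609
102|20006|2016-06-09 01:00:00|125508585836|2016-06-09 00:00:00|F|5.50|D|20160609
100|12345|2016-06-09 02:00:00|33042764824|2016-06-08 00:00:00|B|0.05|D|20160609
101|22222|2016-06-09 02:00:00|121554018830|2016-06-09 00:00:00|C|0.5|D|20160609
100|12345|2016-06-09 02:00:00|33042764825|2016-06-08 00:00:00|B|0.08|D|20160609
100|12345|2016-06-09 03:00:00|33042764830|2016-06-09 00:00:00|C|1.06|D|20160609
100|12345|2016-06-09 05:00:00|110451035129|2016-06-11 00:00:00|C|0.21|D|20160609
100|12345|2016-06-09 07:00:00|185508585836|2016-06-13 00:00:00|F|0.38|D|20160609
100|12345|2016-06-09 04:00:00|33042766082|2016-06-10 00:00:00|C|4.51|D|20160609
101|22222|2016-06-09 01:00:00|101554018830|2016-06-08 00:00:00|C|0.8|C|20160609"""

# Emulate PARTITION BY posting_transit, posting_acct_num:
# every record lands in the group identified by its first two fields.
partitions = defaultdict(list)
for line in sample.splitlines():
    fields = line.split("|")
    key = (fields[0], fields[1])
    partitions[key].append(fields)

sizes = {key: len(records) for key, records in partitions.items()}
for key in sorted(sizes):
    print(key, sizes[key])
# ('100', '12345') 7
# ('101', '22222') 2
# ('102', '20006') 1
# ('103', '20007') 1
```

Each of these four groups can then be ordered and numbered on its own, which is what makes the work easy to spread out.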
After that first tSQLRow, my data looks like this:
The SUM(txn_amt) Mechanism
The data is sorted and grouped, and the Double value you see (with a lot of digits after the decimal point) is the running balance calculated without the EOD balance taken into consideration for now. This incremental row-by-row behavior comes from the fact that we ordered the partition with a unique identifier for each row (yes, I insist on that, as it took me half a day to understand it and create that sequence, seq). Without it, you'll end up, like me at first, with a plain sum of all the txn_amt values with the same posting_date.
Look at the first seven rows; they are part of the same group (account: 100 12345).
1.02 (row 1 EOD balance) - 0.05 (row 1 txn_amt) = 0.97 (row 1 balance)
0.97 (row 1 balance, previously calculated) - 0.08 (row 2 txn_amt) = 0.89 (row 2 balance)
0.89 (row 2 balance, previously calculated) - 1.06 (row 3 txn_amt) = -0.17 (row 3 balance)
And so on!
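The same arithmetic can be replayed for the whole account in a few lines. This is only a sketch of the calculation, not the job itself: it assumes the signed amounts as they appear in the final result table (debits negative), leaves out the future transaction, and uses Decimal so the printed values are not blurred by floating-point noise.

```python
from decimal import Decimal
from itertools import accumulate

# Latest EOD balance for account 100/12345 in the sample data.
eod_balance = Decimal("1.02")

# Signed amounts of the current and backdated transactions, in processing
# order (debits negative). The future (F) transaction is left out on purpose.
signed_amounts = [
    Decimal(a) for a in ("-0.05", "-0.08", "-1.06", "-4.51", "-0.21", "-0.6")
]

# accumulate() yields the cumulative sums, i.e. what SUM(...) OVER (...) returns
# row by row; adding the EOD balance gives the running balance.
running = [eod_balance + partial for partial in accumulate(signed_amounts)]
print([str(balance) for balance in running])
# ['0.97', '0.89', '-0.17', '-4.68', '-4.89', '-5.49']
```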
The ROW_NUMBER() Mechanism
You can easily see the result of the ROW_NUMBER() function applied per partition, as it resets to 1 at the start of each new partition.
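Here is a small sketch of that reset behavior, again using Python's sqlite3 module as a stand-in for Spark SQL (assuming SQLite 3.25 or later). The table is a hypothetical cut-down version of out1 holding only the key columns and seq.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE out1 (posting_transit TEXT, posting_acct_num TEXT, seq INTEGER)"
)
conn.executemany(
    "INSERT INTO out1 VALUES (?, ?, ?)",
    [
        ("100", "12345", 1), ("100", "12345", 2), ("100", "12345", 3),
        ("101", "22222", 4), ("101", "22222", 5),
        ("102", "20006", 6),
    ],
)

numbered = conn.execute("""
    SELECT posting_acct_num, seq,
           ROW_NUMBER() OVER (
               PARTITION BY posting_transit, posting_acct_num ORDER BY seq
           ) AS rowNum
    FROM out1
    ORDER BY posting_transit, posting_acct_num, seq
""").fetchall()

for row in numbered:
    print(row)
# ('12345', 1, 1)
# ('12345', 2, 2)
# ('12345', 3, 3)
# ('22222', 4, 1)   <- numbering restarts with the new account
# ('22222', 5, 2)
# ('20006', 6, 1)
```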
Last, But Not Least!
The final step of my job was to deal with the future transactions. These transactions don't need the running balance calculated the way the others (the current and backdated transactions) do. A future transaction (indicated by an F in the business_date_indi field) should take as its running balance the previous end-of-day balance, "null", or the LAST value calculated before.
Let's say I already have three current or backdated transactions for my account 10012345 with an already calculated running balance. If I then have one or more future transactions for the same account 10012345, I want to set their running balance to the last calculated value of the running balance (yeah, I know... it sounds logical on your bank account report or your credit card report).
That's where I looked at the function called LAST_VALUE(), which I used in the last tSQLRow component of my job.
See the content below:
"select posting_transit, posting_acct_num, business_date, system_time, posting_date, business_date_indi, txn_amt, dr_cr_ind, proc_dt, end_of_day_bal, rowNum , balance,last_value(balance , true) OVER(PARTITION BY posting_transit, posting_acct_num ) as last_balance, max(F_tx_only) OVER(PARTITION BY posting_transit, posting_acct_num) as F_tx_only, isknownaccnt from bal ORDER BY posting_transit, posting_acct_num, business_date, system_time"
Let's focus on this particular piece:
last_value(balance , true) OVER(PARTITION BY posting_transit, posting_acct_num ) as last_balance
So, now you get the partition piece of the story. You'll notice the absence of the ORDER BY instruction. This is on purpose, as using another ORDER BY here would result in an error. I think it's likely because we previously ORDER BY-ed the same partition, but honestly, there might be an explanation I don't get (comments appreciated).
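One possible explanation, offered as an assumption rather than a diagnosis of what happened in this job: in standard SQL window semantics, which Spark SQL documents as its default too, adding an ORDER BY without an explicit frame makes the window stop at the current row. LAST_VALUE then simply returns the current row's own value instead of the last value of the partition. The sketch below shows that behavior with Python's sqlite3 module as a stand-in (SQLite 3.25 or later), on a hypothetical three-row bal table with balances in cents.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bal (acct TEXT, seq INTEGER, balance INTEGER)")
conn.executemany(
    "INSERT INTO bal VALUES (?, ?, ?)",
    [("A", 1, 97), ("A", 2, 89), ("A", 3, -17)],
)

frames = conn.execute("""
    SELECT seq,
           -- ORDER BY with the default frame: the window ends at the current row
           LAST_VALUE(balance) OVER (PARTITION BY acct ORDER BY seq) AS default_frame,
           -- Explicit frame covering the whole partition
           LAST_VALUE(balance) OVER (
               PARTITION BY acct ORDER BY seq
               ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
           ) AS whole_partition
    FROM bal
    ORDER BY seq
""").fetchall()

for row in frames:
    print(row)
# (1, 97, -17)
# (2, 89, -17)
# (3, -17, -17)
```

If that is indeed what happens in Spark, an explicit frame covering the whole partition would be another way to combine ORDER BY with LAST_VALUE.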
last_value(balance) will return the last value of the balance calculated inside that partition. In the case of a future transaction, however, we don't have any running balance calculated.
So one of the tricks I used was to set the balance to null in a previous step when it was a future transaction. But last_value(balance) would still return that null value by default.
To avoid that, this makes the difference: last_value(balance, true). I hope you read until this point because it took me five hours of googling to get it right, and when I found that, that's when I decided to write my first blog article!
The ", true" parameter tells the last_value() function to skip null values in its computation. So now you understand why I set those future transaction balance values to null before. It was to skip them later!
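The effect of that second parameter can be emulated in a few lines of plain Python. The helper below, last_value_ignore_nulls, is a hypothetical name made up for this sketch; it only mimics what last_value(balance, true) returns over a whole partition and is not how Spark implements it.

```python
def last_value_ignore_nulls(values):
    """Return the last non-None value of a partition, or None if there is none."""
    for value in reversed(values):
        if value is not None:
            return value
    return None


# Balances of one partition in order; the trailing future transaction
# had its balance set to None (null) in a previous step.
balances = [0.97, 0.89, -0.17, None]

print(balances[-1])                       # None: what a plain "last value" gives
print(last_value_ignore_nulls(balances))  # -0.17: nulls are skipped

# An account with nothing but a future transaction has no value to fall back on.
print(last_value_ignore_nulls([None]))    # None
```

The last case matches account 103/20007 in the sample data, whose only transaction is a future one.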
(<private joke> for French readers. I tried the so-called "Rémy OffTheWood" trick and replaced "true" by "trou"...well it doesn't work, don't try it at home) </private joke>)
So, how did all this end up in terms of performance against our competition? Well, our first test of the job with no tuning at all took something like six minutes to compute, but of course, we wanted to do better, so we applied the Spark configuration tuning capabilities inside Talend jobs.
After one round of tuning, we got the execution time down to 2 minutes 30 seconds: the most performant result overall!
Next, our prospect tested against more volume and increased both the transactions table and the EOD balance table (70 million rows for EOD_balance and 38 million for transactions).
With no change to the job (not even tuning properties), it ran in ten minutes!
So, basically, multiplying the volume by roughly 10x in both the lookup and main data took just 4x the time to process, without any change. This is what I call native scalability. Oh, and the competition wasn't even close.
I hope this quick read saves you the hours of Googling I spent looking for answers to a common integration scenario. This function inside the Spark framework is great, and combined with Talend Studio's capacity, it's just awesome!
Ease-of-use-wise, it's just great to be able to reuse SQL skills and apply them to the big data world this easily!
Here is the result of my job:
|=--------------+----------------+-------------+------------+-------------+------------------+-------+---------+--------+--------------+--------------=|
|posting_transit|posting_acct_num|business_date|system_time |posting_date |business_date_indi|txn_amt|dr_cr_ind|proc_dt |end_of_day_bal|running_balance|
|=--------------+----------------+-------------+------------+-------------+------------------+-------+---------+--------+--------------+--------------=|
|100            |12345           |1465437600000|33042764824 |1465344000000|B                 |-0.05  |D        |20160609|1.02          |0.97           |
|100            |12345           |1465437600000|33042764825 |1465344000000|B                 |-0.08  |D        |20160609|1.02          |0.89           |
|100            |12345           |1465441200000|33042764830 |1465430400000|C                 |-1.06  |D        |20160609|1.02          |-0.17          |
|100            |12345           |1465444800000|33042766082 |1465516800000|C                 |-4.51  |D        |20160609|1.02          |-4.68          |
|100            |12345           |1465448400000|110451035129|1465603200000|C                 |-0.21  |D        |20160609|1.02          |-4.89          |
|100            |12345           |1465452000000|111454018830|1465689600000|C                 |-0.6   |D        |20160609|1.02          |-5.49          |
|100            |12345           |1465455600000|185508585836|1465776000000|F                 |-0.38  |D        |20160609|1.02          |-5.49          |
|101            |22222           |1465434000000|101554018830|1465344000000|C                 |0.8    |C        |20160609|0.0           |0.8            |
|101            |22222           |1465437600000|121554018830|1465430400000|C                 |-0.5   |D        |20160609|0.0           |0.3            |
|102            |20006           |1465434000000|125508585836|1465430400000|F                 |-5.5   |D        |20160609|6.02          |6.02           |
|103            |20007           |1465437600000|105508585836|1465516800000|F                 |-6.9   |D        |20160609|0.0           |null           |
'---------------+----------------+-------------+------------+-------------+------------------+-------+---------+--------+--------------+---------------'
Published at DZone with permission of Adrien Lacombe , DZone MVB. See the original article here.