Stream data processing is a style of data processing designed for unbounded data sets. It is an efficient way to handle the ever-growing data that more and more businesses need to analyze. In fact, more than 85% of Presto queries on Treasure Data are recurring queries that process growing data sets at scheduled intervals.
This post walks through a form of stream data processing that uses incremental queries to build intermediate tables, speeding up the analysis of continuously growing data.
Problem at Hand
For this task, you may be writing a query that converts each visitor's IP address into a country and produces a daily summary. That daily summary may then feed a multitude of other data analytics.
You notice that:
- The entire IP address data set, from the first visitor to the most recent one, is processed over and over again for each query.
- Each of these queries takes over an hour to finish.
These are the signs that those processes can be sped up by using stream data processing.
The inefficient query that is run repeatedly looks like this:
-- Inefficient query: computing everything from scratch
SELECT
  TD_DATE_TRUNC('day', time, 'PST') AS time,
  userid,
  TD_IP_TO_COUNTRY_CODE(ip) AS country
FROM visitor_raw
The query does not specify a timeframe and, as a result, processes the entire data set, which takes time and slows down all subsequent processes.
Stream Data Processing
The cumbersome processing of raw data (for example, converting IP addresses to countries) can be done much more efficiently with intermediate tables: only the "new" data (data that came in since the last query) is processed and appended to the already-processed data, rather than reprocessing the entire data set every time. The process goes:
- Create the basis of an intermediate table by processing all the data you have up to a certain point in time. Example: Query the IP addresses of visitors from day 1 up to July 1, 2017, convert them to countries, and output the result to the intermediate table.
- Schedule a query that processes new data that came in since the last query and appends it to the intermediate table. Example: Every day, query the IP addresses of visitors from the past day, convert them to countries, and append the result to the intermediate table.
- Query the intermediate table to aggregate data. Example: Query the intermediate table to create a daily summary of all visitors by country.
- (Bonus) Create a Treasure Workflow.
The process looks like this:
ΔT = IP address of new visitors since the previous query.
ΔT’ = country of new visitors since the previous query.
T’ = intermediate table.
This way, the queries that are run daily (in red) handle much less data than the previous method that queried the entire data set every time. As a result, analyses that used to take over an hour could be shortened to minutes.
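The incremental pattern above can be simulated in a few lines of Python. This is an illustrative sketch only (the real pipeline runs as Presto queries on Treasure Data), and the `GEO` lookup table is a hypothetical stand-in for `TD_IP_TO_COUNTRY_CODE`:

```python
# GEO is a stand-in for TD_IP_TO_COUNTRY_CODE (hypothetical sample data).
GEO = {"203.0.113.5": "US", "198.51.100.7": "JP", "192.0.2.9": "DE"}

def process(rows):
    """Convert raw visitor rows (time, userid, ip) to (time, userid, country)."""
    return [(t, uid, GEO[ip]) for t, uid, ip in rows]

# Day-by-day batches of new raw visitor data (the deltas in the diagram).
batches = [
    [(1, "u1", "203.0.113.5"), (1, "u2", "198.51.100.7")],
    [(2, "u1", "203.0.113.5"), (2, "u3", "192.0.2.9")],
]

intermediate = []                        # the intermediate table
for delta in batches:                    # each scheduled run
    intermediate.extend(process(delta))  # process only the new data, append

# The incremental result matches recomputing everything from scratch.
assert intermediate == process([r for b in batches for r in b])
```

The key property is the final assertion: appending processed deltas yields the same table as reprocessing everything, at a fraction of the per-run cost.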
Let’s look at each step in detail.
1. Process All Data on Hand
First, we create the basis for the intermediate table by processing the raw data up to a certain point in time, in this case one day before the current date. This is the only time the entire data set needs to be queried.
Sample database: sample_db
Intermediate table: visitor_country
Raw data table: visitor_raw

-- Create the basis for the intermediate table
CREATE TABLE visitor_country AS
SELECT
  TD_DATE_TRUNC('day', time, 'PST') AS time,
  userid,
  TD_IP_TO_COUNTRY_CODE(ip) AS country
FROM visitor_raw
WHERE TD_TIME_RANGE(time, null, '2017-07-01', 'PST')
2. Schedule and Append to Intermediate Table
Next, we schedule a query that takes the raw data generated in the day since the last query and outputs the processed data, which is appended to the intermediate table. This repeats daily, each time updating the intermediate table with the data processed since the previous query.
INSERT INTO visitor_country
SELECT
  TD_DATE_TRUNC('day', time, 'PST') AS time,
  userid,
  TD_IP_TO_COUNTRY_CODE(ip) AS country
FROM visitor_raw
WHERE TD_TIME_RANGE(
  time,
  TD_TIME_ADD(TD_SCHEDULED_TIME(), '-1d'),
  TD_SCHEDULED_TIME(),
  'PST');
As a result, the intermediate table contains the website access time and originating country of each visitor up to the point in time the latest incremental query ran.
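Why does this scheme process every row exactly once? Each run covers the window from one day before the scheduled time (start-inclusive) up to the scheduled time (end-exclusive), so consecutive daily runs tile the timeline with no gaps or overlaps. A small Python sketch of that property:

```python
# Sketch of the window each scheduled run covers:
# [scheduled_time - 1 day, scheduled_time), mirroring
# TD_TIME_RANGE(time, TD_TIME_ADD(TD_SCHEDULED_TIME(), '-1d'), TD_SCHEDULED_TIME()).
from datetime import datetime, timedelta

def run_window(scheduled_time):
    """Return the (start, end) window processed by one scheduled run."""
    return (scheduled_time - timedelta(days=1), scheduled_time)

# Three consecutive daily scheduled runs.
runs = [datetime(2017, 7, 2), datetime(2017, 7, 3), datetime(2017, 7, 4)]
windows = [run_window(t) for t in runs]

# Each window ends exactly where the next one begins: no gaps, no overlaps.
for (start, end), (next_start, _) in zip(windows, windows[1:]):
    assert end == next_start
```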
3. Aggregate the Intermediate Table
The intermediate table can be queried to generate summaries of the data, push results to data analytics integrations, and more. Since the time-consuming conversion from IP address to country has already been done incrementally in the intermediate table, queries involving that conversion that once took over an hour can now finish in minutes. Here, we create a summary of the number of visitors from each country.
-- Aggregation of the intermediate table
SELECT
  country,
  approx_distinct(userid) AS num_users
FROM visitor_country
GROUP BY 1
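The logic of this aggregation is just "distinct users per country." A Python sketch over hypothetical intermediate-table rows (note that Presto's approx_distinct returns an estimate, while this sketch counts exactly):

```python
# Sketch of the aggregation: distinct users per country.
# visitor_country rows are (time, userid, country); sample data is hypothetical.
from collections import defaultdict

visitor_country = [
    (1, "u1", "US"), (1, "u2", "JP"),
    (2, "u1", "US"), (2, "u3", "US"),
]

users_by_country = defaultdict(set)
for _time, userid, country in visitor_country:
    users_by_country[country].add(userid)  # sets deduplicate repeat visitors

num_users = {c: len(users) for c, users in users_by_country.items()}
assert num_users == {"US": 2, "JP": 1}  # u1 counted once despite two visits
```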
4. (Bonus) Create a Treasure Workflow
Steps 2 and 3 can be combined into a Treasure Workflow, which lets you define repeatable sets of dependent queries. This way, the incremental data processing and the resulting data export are organized into a scheduled procedure that is less prone to handling errors and has lower end-to-end latency.
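As a rough sketch, such a workflow definition might look like the following. The file names and schedule here are hypothetical placeholders; the `td>` operator runs the referenced query files in sequence:

```yaml
# example_workflow.dig (file names and schedule are hypothetical)
timezone: America/Los_Angeles

schedule:
  daily>: 09:00:00

_export:
  td:
    database: sample_db

+append_to_intermediate:
  td>: queries/incremental_insert.sql   # the INSERT query from step 2

+aggregate:
  td>: queries/daily_summary.sql        # the aggregation from step 3
```

Because `+aggregate` is listed after `+append_to_intermediate`, the summary only runs once the intermediate table has been updated for the day.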
Things to Note
The use of incremental queries and intermediate tables outlined above hopefully makes intuitive sense: process only the new data and append it, rather than recomputing over the entire growing data set every time. The deeper world of stream data processing involves topics such as windowing to account for lags between event time and processing time in event-time-sensitive unbounded data, but that is a topic for another blog post.