Deep Dive Into Apache Flink's TumblingWindow - Part 1
Take a look at this tutorial that demonstrates how to use Apache Flink's TumblingWindow function to get meaningful info from streaming data.
In this article, I will share coding examples that illustrate some of the key aspects of TumblingWindow in Flink. Those not familiar with Flink streaming can get an introduction here.
Before we get into TumblingWindow, let us get a basic understanding of a "Window" in stream processing. In a data stream, a source continuously produces data, which makes it infeasible to compute a final value.
In most use cases, one of two approaches is used to get meaningful information:
- Computation is done for a finite set over time (e.g. HTTP 401 errors per minute)
- Computation is done as a rolling update (e.g. score-board, trending topics)
A “Window” defines a finite set of elements on an unbounded stream over which we can apply computations. This set can be based on time, element counts, a combination of counts and time, or some custom logic to assign elements to windows. For example:
- number of orders received every minute (fixed time)
- average time to complete the last 100 orders (fixed elements)
Streaming framework vendors implement more than one way of defining a “Window”. Flink's built-in window types include (a) tumbling, (b) sliding, and (c) session windows; in this article, I will focus on the first one.
TumblingWindow
This window is simple to understand and easy to get started with. It is a fixed-size window, where the "size" is either a time (30 seconds, 5 minutes) or a count (100 elements).
A time window of 5 minutes collects all elements that arrive during the window and evaluates them when the five minutes are up; a new window starts every five minutes. A count window of 100 collects 100 elements and evaluates the window when the 100th element has been added.
Most importantly, windows never overlap and elements are never duplicated: each element is assigned to exactly one window. If you have specified a key, Flink logically partitions the stream and evaluates windows independently (and in parallel) for each key.
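The assignment rule for tumbling time windows can be modeled in a few lines of plain Java: with a window size s and no offset, an element with timestamp t belongs to the single window that starts at t minus (t mod s), and windows are tracked separately per key. The sketch below is illustrative only (the class and method names are hypothetical and it has no Flink dependency); Flink's own assigner also supports an offset, which is left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: how tumbling time windows partition a keyed stream. */
class TumblingAssignSketch {

    /** Start of the only tumbling window containing ts (zero offset; floorMod also handles ts < 0). */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Groups (key, timestamp) events by key, then by window start; each event lands in exactly one window. */
    static Map<String, Map<Long, List<Long>>> assign(List<Map.Entry<String, Long>> events, long size) {
        Map<String, Map<Long, List<Long>>> windows = new TreeMap<>();
        for (Map.Entry<String, Long> e : events) {
            long ts = e.getValue();
            windows.computeIfAbsent(e.getKey(), k -> new TreeMap<>())
                   .computeIfAbsent(windowStart(ts, size), w -> new ArrayList<>())
                   .add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> events = List.of(
                Map.entry("a", 1L), Map.entry("b", 2L), Map.entry("a", 4L),
                Map.entry("a", 6L), Map.entry("b", 9L));
        // Window size 5: windows are [0, 5), [5, 10), ... for every key
        System.out.println(assign(events, 5));
        // prints {a={0=[1, 4], 5=[6]}, b={0=[2], 5=[9]}}
    }
}
```

Because the window start is a pure function of the timestamp, no element can fall into two windows, which is exactly the "no overlap, no duplicates" property described above.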
Let us look at an example to understand this better. A simple "IntegerGenerator" class acts as a source, producing an integer every second (starting from 1). The following lines initialize a local Flink environment and create a DataStream object.
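```java
// Returns a local environment when the program is run from the IDE
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// IntegerGenerator (the article's custom source) emits 1, 2, 3, ... one per second
DataStream<Integer> intStream = env.addSource( new IntegerGenerator() );
```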
Tumbling Time window
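```java
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

intStream
    // 5-second processing-time tumbling window; equivalent to timeWindowAll( Time.seconds( 5 ) )
    // under the old processing-time default (timeWindowAll is deprecated since Flink 1.12)
    .windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ) ) )
    .process( new ProcessAllWindowFunction<Integer, Integer, TimeWindow>()
    {
        @Override
        public void process( Context context, Iterable<Integer> input, Collector<Integer> output ) throws Exception
        {
            // All elements of the window are buffered and handed over at once
            logger.info( "Computing sum for {}", input );
            int sum = 0;
            for ( int i : input ) {
                sum += i;
            }
            // Emit one result per window to the next operator
            output.collect( sum );
        }
    })
    .print();
env.execute();
```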
- The window definition with Time.seconds( 5 ) creates a tumbling window of five seconds (fixed size by time).
- The process() call defines the computation (business logic) using Flink's ProcessAllWindowFunction API. Here I am simply computing the sum of all integers collected during a given window.
Note - ProcessAllWindowFunction makes Flink buffer all the elements of a window in memory and then pass them to the computation all at once. That is why process() receives an Iterable<> object as an input parameter.
- output.collect( sum ) returns the result of this window to Flink for the next step, which here is just printing it to the console.
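To make the buffering behavior concrete, here is a plain-Java sketch (no Flink dependency) of a "buffer everything, then evaluate" window: every element is held until the window closes, and only then is the whole window handed to the function as an Iterable, similar to what process() receives. The class and method names are hypothetical; this models the semantics and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Illustrative only: a full-window ("buffer everything, then evaluate") operator. */
class BufferingWindowSketch<IN, OUT> {
    private final List<IN> buffer = new ArrayList<>();
    private final Function<Iterable<IN>, OUT> windowFunction;

    BufferingWindowSketch(Function<Iterable<IN>, OUT> windowFunction) {
        this.windowFunction = windowFunction;
    }

    /** Every element is kept in memory until the window closes. */
    void add(IN element) { buffer.add(element); }

    int buffered() { return buffer.size(); }

    /** On close, the whole window is passed at once, then cleared (tumbling: nothing is reused). */
    OUT close() {
        OUT result = windowFunction.apply(new ArrayList<>(buffer));
        buffer.clear();
        return result;
    }

    /** Same logic as the article's process(): sum all integers of the window. */
    static int sum(Iterable<Integer> input) {
        int s = 0;
        for (int i : input) s += i;
        return s;
    }

    public static void main(String[] args) {
        BufferingWindowSketch<Integer, Integer> w = new BufferingWindowSketch<>(BufferingWindowSketch::sum);
        w.add(1); w.add(2);
        System.out.println(w.close());                             // 3
        for (int i = 3; i <= 6; i++) w.add(i);
        System.out.println(w.buffered() + " buffered -> " + w.close()); // 4 buffered -> 18
    }
}
```

The memory cost grows with the number of elements per window, which is the trade-off this style of window function makes in exchange for seeing the whole window at once.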
The following shows the output of a sample run:
Let's dissect the output here.
- Lines #1 - #3: Two integers were produced before the current window closed. Notice that even though we asked for five seconds, the first window did not run for five seconds. This is because Flink aligns tumbling windows to the clock: by default, window boundaries fall on multiples of the window size counted from the epoch, and in our case the next boundary was "13:33:55". At that point the window's trigger fired, closing the current window and passing its contents to the next step (the window function).
- Line #4: The process() method was called with all the elements [1, 2], and the sum '3' was printed to the console.
- Lines #5 - #10: A new window starts and the next set of integers is collected. Five seconds later, at "13:34:00", the window closes. All the collected data is sent to process(), which logs the integers it received and computes the sum of the numbers in this window, '18'.
- Line #11: The current window's sum is printed to the console.
- The same logic applies from Line #12 onwards.
Note - The integers in the previous window are not present in this new window. The next window starts after the current window is closed (no overlapping and no duplicates).
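The short first window is a consequence of clock alignment. The plain-Java sketch below computes the start of a tumbling window with zero offset as the timestamp minus its remainder modulo the window size, the same idea Flink applies in TimeWindow.getWindowStartWithOffset (this sketch uses Math.floorMod rather than reproducing Flink's code). The date and the 13:33:53 start time are hypothetical, chosen to resemble the sample run, and the class and method names are illustrative.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative only: why the first processing-time tumbling window can be shorter than its size. */
class WindowAlignmentSketch {

    /** Start of the tumbling window containing the timestamp (zero offset, aligned to the epoch). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Exclusive end of the window that contains the given instant. */
    static Instant windowEnd(Instant t, Duration size) {
        long sizeMillis = size.toMillis();
        return Instant.ofEpochMilli(windowStart(t.toEpochMilli(), sizeMillis) + sizeMillis);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(5);
        // Hypothetical moment at which the job starts producing data
        Instant jobStart = Instant.parse("2020-01-01T13:33:53Z");
        Instant firstEnd = windowEnd(jobStart, size);
        System.out.println("first window closes at " + firstEnd);   // 2020-01-01T13:33:55Z
        System.out.println("open for " + Duration.between(jobStart, firstEnd).getSeconds() + "s, not 5s");
        // Every later window is a full 5 seconds: [13:33:55, 13:34:00), [13:34:00, 13:34:05), ...
        System.out.println("next window closes at " + windowEnd(firstEnd, size)); // 2020-01-01T13:34:00Z
    }
}
```

A job that starts at 13:33:53 therefore sees a first window of only two seconds, after which every window covers the full five seconds.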
Tumbling Count window
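```java
import org.apache.flink.api.common.functions.ReduceFunction;

intStream
    // Tumbling count window: fires after every 4 elements, then starts empty again
    .countWindowAll( 4 )
    .reduce( new ReduceFunction<Integer>()
    {
        @Override
        public Integer reduce( Integer value1, Integer value2 ) throws Exception
        {
            // value1: running sum of the current window; value2: newly arrived element
            logger.info( "Reducing {} and {}", value1, value2 );
            return value1 + value2;
        }
    })
    .print();
env.execute();
```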
Using the same IntegerGenerator as a source, the code above is an example of a count window.
- countWindowAll( 4 ) defines a tumbling window of four elements (fixed size by count).
- reduce() defines the computation to be done on the elements of a window using Flink's ReduceFunction API. The logic is the same (sum of numbers).
Note - ReduceFunction lets Flink perform incremental computation (a rolling update): only one running value per window is kept instead of every element, so the memory footprint is much smaller than with ProcessAllWindowFunction. The first parameter is the value computed so far for the current window (not a value from a previous window), and the second parameter is the element newly assigned to this window.
- print() prints the final result of each window to the console.
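The contract described in the note (the first argument is the value accumulated so far in the current window) can be checked with a small plain-Java fold that records each call. ReduceContractSketch and reduceWindow are hypothetical names; this models the call order for a single, non-empty window and is not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Illustrative only: the order in which a reduce function sees its arguments within one window. */
class ReduceContractSketch {

    /** Folds one non-empty window left to right, recording each (value1, value2) call. */
    static int reduceWindow(List<Integer> window, BinaryOperator<Integer> fn, List<String> calls) {
        int acc = window.get(0); // the first element is stored as-is; no reduce call is made for it
        for (int i = 1; i < window.size(); i++) {
            calls.add("(" + acc + ", " + window.get(i) + ")");
            acc = fn.apply(acc, window.get(i)); // value1 = running value, value2 = new element
        }
        return acc;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        int result = reduceWindow(List.of(1, 2, 3, 4), Integer::sum, calls);
        System.out.println(calls + " -> " + result); // [(1, 2), (3, 3), (6, 4)] -> 10
    }
}
```

A window of n elements triggers n - 1 reduce calls, and only the single running value has to be stored between them.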
The following shows the output of a sample run:
Let us walk through the output:
- Lines #1 - #3: The first two integers are collected. When the second one arrives, Flink calls reduce() with the first two elements (the first element is simply stored until then). The computed value '3' is kept in Flink's window state.
- Line #4: The next integer value, '3', is produced by the source.
- Line #5: reduce() is called; note that the first parameter is '3' from the previous calculation and the second parameter is the current integer produced by the source. The computed value '6' is kept in Flink's window state.
- Line #6: The next integer value, '4', is produced by the source.
- Line #7: reduce() is called with '6' as the first parameter and '4' as the second. The computed value is now 10. At this point, Flink has collected four integers from the source, so the count condition for this window is satisfied.
- Line #8: Because the window has reached its count size, its trigger fires and the value 10 (1+2+3+4) is emitted and printed.
- Lines #9 - #10: A new window starts and waits for the next two integers from the source.
- Line #11: reduce() is called with the new window's first two numbers, 5 and 6.
- The same logic applies to the next two numbers, with reduce() called for each one to update the running value incrementally.
- Line #16: When Flink has received four numbers for the current window, the window fires and print() outputs 26 (5+6+7+8).
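The whole trace can be reproduced with a plain-Java model of countWindowAll( 4 ).reduce(...): each element is folded into a single running value as it arrives, and when the fourth element lands the window emits its result and is purged, so the next window starts from nothing. The class below is a hypothetical sketch of that behavior, not Flink's operator, and a loop over 1..10 stands in for IntegerGenerator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/**
 * Illustrative only: a tumbling count window with incremental reduction,
 * modeled on countWindowAll(size).reduce(...): fire after `size` elements, then purge.
 */
class CountTumblingSketch {
    private final int size;
    private final BinaryOperator<Integer> reducer;
    private Integer accumulator; // running value of the current window only
    private int count;           // elements assigned to the current window so far

    CountTumblingSketch(int size, BinaryOperator<Integer> reducer) {
        this.size = size;
        this.reducer = reducer;
    }

    /** Adds one element; returns the window result when the count is reached, else null. */
    Integer add(int element) {
        accumulator = (accumulator == null) ? element : reducer.apply(accumulator, element);
        if (++count < size) {
            return null;
        }
        Integer result = accumulator;
        accumulator = null; // purge: the next window starts empty (no overlap, no carry-over)
        count = 0;
        return result;
    }

    public static void main(String[] args) {
        CountTumblingSketch window = new CountTumblingSketch(4, Integer::sum);
        List<Integer> emitted = new ArrayList<>();
        for (int i = 1; i <= 10; i++) { // stand-in for IntegerGenerator
            Integer result = window.add(i);
            if (result != null) emitted.add(result);
        }
        System.out.println(emitted); // [10, 26]
    }
}
```

Elements 9 and 10 sit in an incomplete window in this run, so they produce no output yet.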
Conclusion
In this article, we looked at both types of TumblingWindow (time and count) with their default behavior. We also saw two window functions, ProcessAllWindowFunction and ReduceFunction, for full-window (buffered) and incremental computation respectively.
In the next part, I will discuss how to override some of the default behavior and how to handle late-arriving data.
Opinions expressed by DZone contributors are their own.