Deep Dive Into Apache Flink's TumblingWindow — Part 2

In my previous post, I shared examples of how to define and use TumblingWindow with its default behavior. In this post, I am going to provide examples of overriding these defaults and handling late values.

Customizing Tumbling Window's Time Offset

Let us take an example as shown below (continuing from the previous post, where the Flink stream execution environment object has been created and a simple integer generator is our source).


The only change here is Line #2, which, instead of "timeWindowAll( Time.seconds( 5 ) )", now uses the more detailed "windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ), Time.seconds( 2 ) ) )".

timeWindowAll() is a wrapper method that defaults to windowAll(TumblingProcessingTimeWindows.of(size)), i.e., a window of fixed size by time (this time is the system time of the machine where the Flink job is running).

Note: Flink has more than one notion of time, which I will discuss later in this post.

As I shared in the previous post, by default Flink starts windows at clock boundaries, but by passing an offset as the second parameter to TumblingProcessingTimeWindows.of() we can shift that boundary.

The following shows a sample run of the above code.

Figure 1: Overriding default time offset

Lines #1 to #5: Flink starts a window and collects integers. At 19:26:37, the window closes, and the sum of [1,2,3,4] is printed at Line #6.

Note: If the offset had not been provided, Flink would have closed the window at "19:26:35". Since the offset is 2 seconds, the window boundary shifts 2 seconds past the clock boundary, so the window ends at "19:26:37".
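
To make the effect of the offset concrete, here is a minimal sketch in plain Java (not Flink code) of how a tumbling window's end can be derived from a timestamp, the window size, and the offset. The class name OffsetWindowSketch, the modulo formula, and the arrival time of 19:26:33 are assumptions for illustration. Times are expressed as milliseconds since midnight rather than epoch milliseconds, purely to keep the numbers readable.

```java
import java.time.LocalTime;

// Sketch of how a tumbling window's boundaries follow from its size and offset.
// This is an illustration under stated assumptions, not Flink's implementation.
class OffsetWindowSketch {

    // Start of the window that contains `timestamp` (all values in milliseconds).
    // Assumes a non-negative timestamp and 0 <= offset < size.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Exclusive end of the window that contains `timestamp`.
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }

    // Converts milliseconds since midnight back to a readable time of day.
    static LocalTime toTimeOfDay(long millisOfDay) {
        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second window
        // Hypothetical arrival time of an element: 19:26:33.
        long t = LocalTime.of(19, 26, 33).toNanoOfDay() / 1_000_000L;

        System.out.println("no offset: window ends at " + toTimeOfDay(windowEnd(t, 0L, size)));
        System.out.println("2s offset: window ends at " + toTimeOfDay(windowEnd(t, 2_000L, size)));
    }
}
```

With these values, the sketch reports a window end of 19:26:35 without an offset and 19:26:37 with the 2-second offset, which matches the sample run.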

TumblingWindow With EventTime

So far in our discussion, we have taken 'time' to be the default system time of the machine where Flink is executing the job. However, in many use cases, we want to use the actual time of the event, i.e., when the event was created at the event source. To handle such scenarios, Flink supports three notions of 'time': processing time, ingestion time, and event time. Let us look at event time and how it can be used in Flink.

With event time, elements are grouped into windows based on the timestamp of the element itself and not any system clock. Let us see an example.

First, I defined a simple POJO class named "Element" as follows. I have used Lombok to generate getters/setters for me through annotations.
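
For readers who do not use Lombok, the following is a rough plain-Java sketch of what such a POJO could look like. The field names value and timestamp and their types are assumptions; the original class may differ. The class is left package-private here only to keep the snippet self-contained, whereas in a real job it would normally be a public class in its own file.

```java
// Plain-Java sketch of an event POJO with hand-written accessors.
// Field names and types are assumptions made for illustration.
class Element {
    private Integer value;   // payload that the pipeline later sums
    private Long timestamp;  // event time in epoch milliseconds

    // No-argument constructor, so the class can be instantiated reflectively.
    public Element() {}

    public Element(Integer value, Long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    public Integer getValue() { return value; }
    public void setValue(Integer value) { this.value = value; }

    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "Element(value=" + value + ", timestamp=" + timestamp + ")";
    }

    public static void main(String[] args) {
        // Arbitrary sample values, only to show the accessors in use.
        Element e = new Element(1, 1_000L);
        e.setValue(2);
        System.out.println(e);
    }
}
```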


Next, I define a simple source class called "ElementGeneratorSource", which creates objects of type Element and assigns each a random, increasing timestamp. This is to ensure I do not produce an Element whose timestamp matches the system time. In practice, you would have the timestamp coming as part of the event itself.
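
The core idea of "random but increasing" timestamps can be sketched in plain Java, independent of Flink's source interfaces. The class name IncreasingTimestampSketch, the seed, the start value, and the step bound below are all hypothetical; the actual ElementGeneratorSource may generate its timestamps differently.

```java
import java.util.Random;

// Sketch: produce timestamps that jump by a random amount but never go backwards.
class IncreasingTimestampSketch {
    private final Random random;
    private long lastTimestamp;

    IncreasingTimestampSketch(long startMillis, long seed) {
        this.random = new Random(seed);
        this.lastTimestamp = startMillis;
    }

    // Advances by a random step between 1 and maxStepMillis (inclusive),
    // so every returned timestamp is strictly greater than the previous one.
    long next(int maxStepMillis) {
        lastTimestamp += 1 + random.nextInt(maxStepMillis);
        return lastTimestamp;
    }

    public static void main(String[] args) {
        // Hypothetical start value and seed, chosen only for the demo.
        IncreasingTimestampSketch generator = new IncreasingTimestampSketch(0L, 42L);
        for (int value = 1; value <= 5; value++) {
            System.out.println("value=" + value + " timestamp=" + generator.next(5_000));
        }
    }
}
```

Because each step is at least 1 millisecond, the sequence satisfies the ascending-timestamp assumption that the pipeline relies on later.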


Now, let us define a pipeline to process these elements using TumblingEventTimeWindows. (I have removed class and method declaration lines to focus on important code blocks.)


Line #1: Define a streaming execution environment to start with Flink streaming.

Line #4: The time characteristic needs to be set to EventTime; otherwise, Flink will ignore the timestamps inside elements and use the default system clock.

Line #6: Create a DataStream using ElementGeneratorSource as the source (discussed earlier in this article).

Line #9: Before defining a window, I need to inform Flink how to get the timestamp and watermark for each element it receives.

In this example, I am using a very handy class, "AscendingTimestampExtractor", which, as per the Flink documentation, is "A timestamp assigner and watermark generator for streams where timestamps are monotonously ascending. In this case, the local watermarks for the streams are easy to generate, because they strictly follow the timestamps." One more benefit of using this Flink-provided API is that it generates watermarks for me. A watermark tells Flink how far event time has progressed, and therefore when it can close the current window because the last element belonging to that window has arrived.

In short, assignTimestampsAndWatermarks() tells Flink how to read the timestamp from each event/element coming into Flink and, most importantly, how to compute watermarks.

Line #17: Define a window of type TumblingEventTimeWindows with a size of 10 seconds.

The rest of the code is similar to the previous example, where we sum the values and print the result.
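
The relationship between ascending timestamps, watermarks, and window closure can be sketched in plain Java. This is not Flink's AscendingTimestampExtractor, only a simplified model of it: the class name AscendingWatermarkSketch is hypothetical, and the rule that the watermark trails the latest timestamp by one millisecond is my understanding of how such a generator behaves. Real Flink also emits watermarks periodically rather than on every element.

```java
// Simplified model of a watermark generator for strictly ordered timestamps.
class AscendingWatermarkSketch {
    private long currentTimestamp = Long.MIN_VALUE;

    // Records an element's timestamp. Returns false (and ignores the value)
    // if the timestamp is lower than one already seen, i.e. it is not ascending.
    boolean onElement(long timestamp) {
        if (timestamp < currentTimestamp) {
            return false;
        }
        currentTimestamp = timestamp;
        return true;
    }

    // The watermark trails the highest timestamp seen so far by 1 ms (assumption).
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    // A window covering [start, endExclusive) can close once the watermark
    // reaches the last timestamp that window can contain, endExclusive - 1.
    static boolean windowCanClose(long watermark, long endExclusive) {
        return watermark >= endExclusive - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch generator = new AscendingWatermarkSketch();
        long windowEnd = 10_000L; // hypothetical 10-second window starting at 0

        for (long ts : new long[] {2_000L, 7_000L, 12_495L}) {
            generator.onElement(ts);
            long watermark = generator.currentWatermark();
            System.out.println("ts=" + ts + " watermark=" + watermark
                    + " canClose=" + windowCanClose(watermark, windowEnd));
        }
    }
}
```

In this model, the window stays open while elements keep arriving inside it and becomes closable only when an element with a timestamp at or beyond the window end pushes the watermark forward.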

Sample output of the above example

Figure 2: Output of TumblingEventWindow

Three elements are produced at Lines #1, #2, and #3, with timestamps that differ from the system clock (the system clock time is printed first, before the log level).

When the 3rd element is produced with timestamp "2020-02-22T22:22:02.495", it triggers closure of the current window because the watermark has passed the end of that window. With a 10-second window, the current window ends just before "2020-02-22T22:22:00.000" (its last included timestamp is "2020-02-22T22:21:59.999"). So the current window collects only the first two values.

The next window ends just before "2020-02-22T22:22:10.000", which means that value 3 and value 4 are collected in this new window, since the element at Line #7 has a timestamp that moves the watermark past that boundary.
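
The grouping described above can be replayed with a small plain-Java simulation. It is a sketch under simplifying assumptions, not Flink code: timestamps are strictly ascending, the watermark is taken to be the latest timestamp minus 1 ms and is updated on every element, and times are given in milliseconds relative to 22:21:00. Only the third timestamp (62 495 ms, i.e. 22:22:02.495) comes from the sample output; the others are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates summing values in 10-second tumbling event-time windows.
class TumblingEventTimeSketch {
    static final long SIZE = 10_000L; // window size in milliseconds

    // Feeds (value, timestamp) pairs in order and returns the sum of every
    // window that was closed by an advancing watermark. Assumes non-negative,
    // ascending timestamps, so at most one window is open at a time.
    static List<Integer> run(int[] values, long[] timestamps) {
        List<Integer> closedSums = new ArrayList<>();
        boolean open = false;
        long windowStart = 0L;
        int sum = 0;

        for (int i = 0; i < values.length; i++) {
            long ts = timestamps[i];
            long watermark = ts - 1; // assumed ascending-timestamp watermark

            // Close the open window once the watermark reaches its last timestamp.
            if (open && watermark >= windowStart + SIZE - 1) {
                closedSums.add(sum);
                open = false;
                sum = 0;
            }
            // Open the window this element belongs to, aligned to the window size.
            if (!open) {
                windowStart = ts - (ts % SIZE);
                open = true;
            }
            sum += values[i];
        }
        return closedSums; // the last window is still open and is not reported
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4, 5};
        // Milliseconds relative to 22:21:00; all but the third are hypothetical.
        long[] timestamps = {52_000L, 57_000L, 62_495L, 66_000L, 71_000L};
        System.out.println("sums of closed windows: " + run(values, timestamps));
    }
}
```

With these inputs, the first closed window holds values 1 and 2 (sum 3) and the second holds values 3 and 4 (sum 7), while value 5 sits in a window that has not closed yet.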


In this article, we discussed overriding the default clock boundary of a window and how to work with TumblingEventTimeWindows. We also saw an example of assigning timestamps to elements.

In the next part, I will share and discuss handling late elements that arrive after a watermark is generated for the current window.
