Deep Dive Into Apache Flink's TumblingWindow - Part 1

Take a look at this tutorial that demonstrates how to use Apache Flink's TumblingWindow function to get meaningful info from streaming data.

By Preetdeep Kumar · Feb. 20, 20 · Tutorial

In this article, I will share coding examples that cover some of the key aspects of TumblingWindow in Flink. Those not familiar with Flink streaming can get an introduction here.

Before we get into TumblingWindow, let us get a basic understanding of a "Window" in stream processing (streaming computation). In a data stream, a source continuously produces data, so there is never a point at which a final value can be computed over all of it.

In most use cases, one of two methods is preferred to get meaningful information:

  • Computation is done over a finite set of elements within a period of time (e.g., HTTP 401 errors per minute)
  • Computation is done as a rolling update (e.g., a scoreboard or trending topics)

A “Window” defines a finite set of elements on an unbounded stream over which we can apply computations. This set can be based on time, element counts, a combination of counts and time, or some custom logic that assigns elements to windows. For example:

  • Number of orders received every minute (fixed time)
  • Average time to complete the last 100 orders (fixed elements)
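The two examples above can be sketched in plain Java, outside of Flink, to make "fixed time" and "fixed elements" concrete. This is a minimal illustration on a finite sample, assuming epoch-millisecond timestamps; the class `WindowExamples` and its methods are hypothetical and do not use any Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not a Flink API) illustrating time-based and count-based windows.
class WindowExamples {

    // Fixed time: count orders per one-minute window, keyed by the window's start (epoch millis).
    static Map<Long, Integer> ordersPerMinute(List<Long> orderTimestampsMillis) {
        final long windowSizeMillis = 60_000L;
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : orderTimestampsMillis) {
            long windowStart = ts - Math.floorMod(ts, windowSizeMillis);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    // Fixed elements: average completion time of each full batch of batchSize orders.
    // An incomplete trailing batch is not emitted, like a count window that never fills up.
    static List<Double> averagePerBatch(List<Long> completionTimesMillis, int batchSize) {
        List<Double> averages = new ArrayList<>();
        for (int from = 0; from + batchSize <= completionTimesMillis.size(); from += batchSize) {
            long sum = 0;
            for (long t : completionTimesMillis.subList(from, from + batchSize)) {
                sum += t;
            }
            averages.add((double) sum / batchSize);
        }
        return averages;
    }
}
```

For instance, timestamps 0, 59,999, 60,000, and 125,000 fall into the windows starting at 0 (two orders), 60,000, and 120,000.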

Streaming frameworks implement more than one variation of how a “Window” can be defined. Flink's built-in window assigners include (a) tumbling, (b) sliding, and (c) session windows (plus a global window assigner); in this article I will focus on the first one.

TumblingWindow

This window is simple to understand and easy to get started with. It is a fixed-size window where the "size" is either a time (30 seconds, 5 minutes) or a count (100 elements).

Fig-1: An example of a window

A time window of five minutes collects all elements that arrive during the window and evaluates them after five minutes; a new window starts every five minutes. A count window of 100 collects 100 elements and evaluates the window when the 100th element has been added.

Most importantly, windows do not overlap and no element is duplicated: each element is assigned to exactly one window. If you have specified a key, Flink logically partitions the stream and runs the window operation in parallel for each key. Non-keyed windows, such as the timeWindowAll and countWindowAll examples in this article, are evaluated by a single task.
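The "one window per element" property and per-key partitioning can be illustrated with a small single-threaded simulation of a keyed tumbling count window. This is a sketch under simplifying assumptions (no Flink state, no parallelism); the class `KeyedCountWindowSketch` is hypothetical. In Flink itself, the corresponding behavior comes from calling keyBy(...) before countWindow(...).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded simulation of a keyed tumbling count window.
class KeyedCountWindowSketch<K, V> {
    private final int windowSize;
    // One open (partially filled) window per key.
    private final Map<K, List<V>> openWindows = new HashMap<>();
    // Every fired window, in firing order; each element ends up in exactly one of them.
    private final List<List<V>> firedWindows = new ArrayList<>();

    KeyedCountWindowSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    void add(K key, V value) {
        List<V> window = openWindows.computeIfAbsent(key, k -> new ArrayList<>());
        window.add(value);
        if (window.size() == windowSize) {
            // Window is full: emit it and let the next element for this key open a fresh window.
            firedWindows.add(window);
            openWindows.remove(key);
        }
    }

    List<List<V>> firedWindows() {
        return firedWindows;
    }
}
```

With a window size of 2, adding ("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5) fires [1, 3] for key "a" and [2, 4] for key "b", while 5 waits in a new, still-open window for "a".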

Let us look at an example to understand this better. A simple "IntegerGenerator" class acts as a source that produces an integer every second (starting from 1). The following lines initialize a local Flink environment and create a DataStream object.

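```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // local environment when run from an IDE
DataStream<Integer> intStream = env.addSource( new IntegerGenerator() ); // IntegerGenerator: the author's source emitting 1, 2, 3, ... once per second
```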


Tumbling Time window

```java
intStream
    .timeWindowAll( Time.seconds( 5 ) ) // deprecated in newer Flink releases; windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ) ) ) is the processing-time equivalent
    .process( new ProcessAllWindowFunction<Integer, Integer, TimeWindow>()
    {
      @Override
      public void process( Context context, Iterable<Integer> input, Collector<Integer> output ) throws Exception
      {
        logger.info( "Computing sum for {}", input ); // logger: an SLF4J-style logger assumed to be defined in the enclosing class
        int sum = 0;
        for( int i : input ) {
          sum += i;
        }
        output.collect( sum ); // emit one result per window
      }
    })
    .print();

env.execute();
```


Line 2 - Defines a tumbling window of five seconds (fixed size by time).

Line 3 - Defines the computation (business logic) using Flink's ProcessAllWindowFunction API. Here I am simply computing the sum of all integers collected during a given window.

Note - ProcessAllWindowFunction makes Flink buffer all the elements of a window and then pass all of them to the function at once when the window fires. That is why process() receives an Iterable<> as its input parameter.

Line 13 - Returns the result of this window to Flink for the next step, which here is just printing it to the console.
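The buffering behavior described in the note can be mimicked in plain Java to show where the memory goes. In this sketch, elements are appended to a list, and the user function receives the whole list as an Iterable only when the window fires, much like the `input` parameter of process(). The class `BufferedWindowSketch` is hypothetical and is not how Flink stores window state internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of a "buffer everything, evaluate at the end" window.
class BufferedWindowSketch<IN, OUT> {
    private final List<IN> buffer = new ArrayList<>();
    private final Function<Iterable<IN>, OUT> windowFunction;

    BufferedWindowSketch(Function<Iterable<IN>, OUT> windowFunction) {
        this.windowFunction = windowFunction;
    }

    // Every element is kept until the window fires, so memory grows with the window size.
    void add(IN element) {
        buffer.add(element);
    }

    int bufferedCount() {
        return buffer.size();
    }

    // Called when the window closes: hand the whole buffer to the function, then clear it.
    OUT fire() {
        OUT result = windowFunction.apply(buffer);
        buffer.clear();
        return result;
    }
}
```

With a summing function, adding 3, 4, 5, and 6 keeps four elements in the buffer until fire() returns 18 and empties it.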

The following shows the output of a sample run:

Fig-2: Output of TumblingTimeWindow example


Let's dissect the output here.

  • Line #1 - #3 = Two integers were produced before the current window closed. Notice that even though we asked for five seconds, the first window did not last five seconds. This is because, by default, Flink aligns time windows to the epoch, so window boundaries fall on multiples of the window size on the clock (every five seconds here) rather than five seconds after the job starts. In our run, the first boundary was at "13:33:55". At that point the window's trigger fired, closing the current window and passing its elements to the next step (the window function).
  • Line #4 = The process() method was called with all the elements [1, 2], and the sum '3' was printed to the console.
  • Line #5 - #10 = A new window starts and the next set of integers is collected. Five seconds later, at "13:34:00", the window closes. All the collected data is sent to process(), which logs the integers it received and computes their sum, '18'.
  • Line #11 = The sum of the current window is printed to the console.
  • The same logic applies from Line #12 onwards.

Note - The integers from the previous window are not present in the new window. The next window starts only after the current window is closed (no overlapping and no duplicates).
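The short first window follows from the epoch alignment: a wall-clock time belongs to the window that starts at the nearest earlier multiple of the window size. The plain-Java sketch below computes that window for processing time, assuming Flink's default offset of zero and a window size that divides a day evenly (true for five seconds, so aligning to midnight gives the same boundaries as aligning to the epoch). The class `WindowAlignmentSketch` is hypothetical, and the start time 13:33:53 in the usage note is an assumed value chosen to match the first boundary seen in Fig-2.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical sketch: which tumbling processing-time window does a wall-clock time fall into?
// Assumes offset 0 and a window size that divides 24 hours evenly.
class WindowAlignmentSketch {

    static LocalTime windowStart(LocalTime time, Duration size) {
        long nanosOfDay = time.toNanoOfDay();
        long sizeNanos = size.toNanos();
        // Round down to the previous multiple of the window size.
        return LocalTime.ofNanoOfDay(nanosOfDay - (nanosOfDay % sizeNanos));
    }

    static LocalTime windowEnd(LocalTime time, Duration size) {
        // The end is exclusive: the window covers [start, start + size).
        return windowStart(time, size).plus(size);
    }
}
```

A job whose first element arrives at 13:33:53 would land in the window [13:33:50, 13:33:55), so that window closes after only two seconds of data; an element at 13:33:57 lands in [13:33:55, 13:34:00).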

Tumbling Count window

```java
intStream
    .countWindowAll( 4 )
    .reduce( new ReduceFunction<Integer>()
    {
      @Override
      public Integer reduce( Integer value1, Integer value2 ) throws Exception
      {
        logger.info( "Reducing {} and {}", value1, value2 ); // value1: running sum of this window, value2: new element
        return value1 + value2;
      }
    })
    .print();

env.execute();
```

The example above uses the same IntegerGenerator source with a count window.

Line 2 = Defines a tumbling window of four elements (fixed size by count).

Line 3 = Defines the computation to be done on the elements of a window using Flink's ReduceFunction API. The logic is the same (sum of numbers).

Note - ReduceFunction lets Flink perform incremental computation (rolling update): each new element is combined with the value accumulated so far, so only one value per window has to be kept instead of every element. Its memory footprint is therefore much smaller than that of ProcessAllWindowFunction. The first parameter is the value computed so far for the current window, and the second parameter is the new element assigned to this window.

Line 12 = Prints the final result of this window to the console.
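The difference in state size can be seen in a plain-Java sketch of an incrementally reduced window: instead of a list, it keeps a single accumulated value, and the reduce function is applied as each element arrives, with the running value as the first argument and the new element as the second. The class `IncrementalWindowSketch` is hypothetical and only mirrors the idea, not Flink's internal state handling.

```java
import java.util.function.BinaryOperator;

// Hypothetical sketch of an incrementally reduced window: state is one value, not a list.
class IncrementalWindowSketch<T> {
    private final BinaryOperator<T> reduceFunction;
    private T accumulated; // null until the first element of the window arrives
    private int elementCount;

    IncrementalWindowSketch(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    void add(T element) {
        // The first element is stored as-is; later ones are folded in immediately.
        accumulated = (accumulated == null) ? element : reduceFunction.apply(accumulated, element);
        elementCount++;
    }

    int elementCount() {
        return elementCount;
    }

    // Emit the current value and reset for the next (non-overlapping) window.
    T fire() {
        T result = accumulated;
        accumulated = null;
        elementCount = 0;
        return result;
    }
}
```

Using `Integer::sum` as the reduce function, adding 1, 2, 3, and 4 keeps only the running total, and fire() returns 10.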

The following shows the output of a sample run:

Fig-3: Output of TumblingCountWindow example


Let us understand the output here:

  • Line #1 - #3 = The first two integers are collected, and as soon as the second one arrives, Flink calls the reduce() method with both of them. The computed value '3' is kept in Flink's window state.
  • Line #4 = The next integer, '3', is produced by the source.
  • Line #5 = reduce() is called. Note that the first parameter is '3' from the previous calculation and the second parameter is the integer just produced by the source. The computed value '6' is kept in Flink's window state.
  • Line #6 = The next integer, '4', is produced by the source.
  • Line #7 = reduce() is called with '6' as the first parameter and '4' as the second. The computed value is now 10. At this point, Flink has collected four integers from the source, so the count condition for this window is satisfied.
  • Line #8 = Since the window's count has been reached, Flink prints this window's value, 10 (1+2+3+4).
  • Line #9 - #10 = A new window starts and waits for the next two integers from the source.
  • Line #11 = reduce() is called with the first two numbers of the new window, 5 and 6.

The same logic applies to the next two numbers, and reduce() is called accordingly to perform incremental updates.

Line #16 = When Flink has received four numbers for the current window, the result is passed to print(), which outputs 26 (5+6+7+8).
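The sequence of reduce() calls walked through above can be reproduced with a plain-Java simulation of countWindowAll(4) followed by a summing reduce. It is single-threaded, has no Flink runtime, and the class `CountWindowTrace` is hypothetical; it records each reduce call and each emitted window result so the order of events can be compared with the log lines described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of countWindowAll(size).reduce(sum) over a finite input.
class CountWindowTrace {

    static List<String> run(List<Integer> input, int size) {
        List<String> trace = new ArrayList<>();
        Integer accumulated = null; // running value of the current window
        int count = 0;
        for (int value : input) {
            if (accumulated == null) {
                accumulated = value; // first element of a window: nothing to reduce yet
            } else {
                trace.add("Reducing " + accumulated + " and " + value);
                accumulated = accumulated + value;
            }
            count++;
            if (count == size) {
                // Count reached: emit the window result and start a new, empty window.
                trace.add("Result " + accumulated);
                accumulated = null;
                count = 0;
            }
        }
        return trace;
    }
}
```

Running it on 1 through 8 with a size of 4 yields "Reducing 1 and 2", "Reducing 3 and 3", "Reducing 6 and 4", "Result 10", then "Reducing 5 and 6", "Reducing 11 and 7", "Reducing 18 and 8", "Result 26".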

Conclusion

In this article, we looked at both types of TumblingWindow (time and count) with their default behavior. We also saw two window functions, ProcessAllWindowFunction and ReduceFunction, for buffered (full-window) and incremental computation respectively.

In the next part, I will discuss how to override some of the default behavior and how to handle late-arriving data.


Opinions expressed by DZone contributors are their own.
