Deep Dive Into Apache Flink's TumblingWindow — Part 2


By Preetdeep Kumar · Updated Mar. 27, 20
In my previous post, I shared examples of how to define and use TumblingWindow with its default behavior. In this post, I provide examples of overriding these defaults and of working with event time; handling late values is covered in the next part.


Customizing Tumbling Window's Time Offset

Consider the example below. It continues from the previous post, where the Flink stream execution environment object has already been created and a simple integer generator serves as the source.

```java
 1  intStream
 2  .windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ), Time.seconds( 2 ) ) )
 3  .process( new ProcessAllWindowFunction<Integer, Integer, TimeWindow>()
 4  {
 5    @Override
 6    public void process( Context context, Iterable<Integer> input, Collector<Integer> output ) throws Exception
 7    {
 8      logger.info( "Computing sum for {}", input );
 9      int sum = 0;
10      for( int i : input ) {
11        sum += i;
12      }
13      output.collect( sum );
14    }
15  })
16  .print();
```



The only change here is Line #2, which now uses the more explicit "windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ), Time.seconds( 2 ) ) )" instead of "timeWindowAll( Time.seconds( 5 ) )".

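timeWindowAll() is a convenience method. With processing time (Flink's default time characteristic), it is equivalent to windowAll(TumblingProcessingTimeWindows.of(size)), i.e., a window of fixed size measured by processing time (the system clock of the machine where the Flink job is running).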

Note: Flink has more than one notion of time, which I discuss later in this post.

As I shared in the previous post, by default Flink aligns windows to clock boundaries. The second parameter of TumblingProcessingTimeWindows.of() is an offset that shifts this boundary.

The following shows a sample run of the above code:

Figure 1: Overriding default time offset

Lines #1 to #5: Flink starts a window and collects integers. At 19:26:37, the window closes and the sum of [1,2,3,4] is printed at Line #6.

Note: Without the offset, Flink would have closed the window at "19:26:35". Because the offset is 2 seconds, the window ends 2 seconds after the clock boundary.
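To make the offset arithmetic concrete, here is a small Java sketch with no Flink dependency. It assumes epoch-aligned tumbling windows where the window containing a timestamp starts at `timestamp - ((timestamp - offset) mod size)`, which is how I understand Flink aligns tumbling windows; the class and method names are hypothetical. Times are written as milliseconds since the start of the minute 19:26 to keep the numbers readable, and the element time 19:26:33 is made up for illustration. Because a minute holds a whole number of 5-second windows, this gives the same alignment as full epoch milliseconds.

```java
/** Hypothetical sketch of tumbling-window alignment with an optional offset. */
public final class TumblingOffsetSketch {

    /** Start (inclusive) of the tumbling window that contains {@code timestamp}. */
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the remainder non-negative even when timestamp < offset
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    /** End (exclusive) of that window: the point at which the window closes. */
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }

    public static void main(String[] args) {
        long size = 5_000L;    // corresponds to Time.seconds( 5 )
        long offset = 2_000L;  // corresponds to Time.seconds( 2 )
        long ts = 33_000L;     // hypothetical element seen at 19:26:33

        // prints "no offset:  window ends at second 35"
        System.out.println("no offset:  window ends at second " + windowEnd(ts, 0L, size) / 1000);
        // prints "2 s offset: window ends at second 37"
        System.out.println("2 s offset: window ends at second " + windowEnd(ts, offset, size) / 1000);
    }
}
```

The default alignment yields the window [19:26:30, 19:26:35), while the 2-second offset shifts it to [19:26:32, 19:26:37), matching the closing times described above.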

TumblingWindow With EventTime

So far, we have treated 'time' as the system time of the machine where Flink executes the job. In many use cases, however, we want to use the actual time of the event, i.e., when the event was created at its source. To handle such scenarios, Flink supports three notions of time: processing time, event time, and ingestion time. Let us look at event time and how it can be used in Flink.

With event time, elements are grouped into windows based on their own timestamps rather than any system clock. Let us see an example.
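The following Flink-free sketch illustrates that idea under simplifying assumptions: each hypothetical `Event` carries its own timestamp (milliseconds since an arbitrary zero), windows are 10 seconds long and aligned to multiples of 10 seconds, and values falling in the same window are summed. Since the window is derived only from the element's timestamp, the result does not depend on the order in which elements arrive.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: event-time tumbling windows computed from element timestamps. */
public final class EventTimeGroupingSketch {

    /** A hypothetical element that carries its own event timestamp (ms). */
    record Event(int value, long timestamp) {}

    /** Sums values per window; keys are window start times, kept in ascending order. */
    static Map<Long, Integer> sumPerWindow(List<Event> events, long size) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (Event e : events) {
            // The window comes from the element's own timestamp, never from the system clock
            long start = e.timestamp() - Math.floorMod(e.timestamp(), size);
            sums.merge(start, e.value(), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Deliberately out of order: arrival order does not affect event-time grouping
        List<Event> events = List.of(
                new Event(3, 12_000L),
                new Event(1, 1_000L),
                new Event(4, 17_900L),
                new Event(2, 4_500L));
        System.out.println(sumPerWindow(events, 10_000L)); // prints {0=3, 10000=7}
    }
}
```

What this sketch leaves out is the question of *when* a window is complete; in Flink that is the job of watermarks, which the pipeline below configures.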

First, I define a simple POJO class named "Element". I use Lombok annotations to generate its getters and setters.

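```java
import lombok.Getter;
import lombok.Setter;

@Setter @Getter
public class Element
{
  Integer value;
  Long timestamp;

  // Flink's POJO serializer requires a public no-argument constructor;
  // without one, Flink falls back to generic (Kryo) serialization
  public Element() {}

  public Element( int counter, long currTime )
  {
    this.value = counter;
    this.timestamp = currTime;
  }

  @Override
  public String toString()
  {
    return String.valueOf( value );
  }
}
```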



Next, I define a simple source class called "ElementGeneratorSource", which creates objects of type Element and assigns them randomly increasing timestamps. This ensures that no Element carries a timestamp matching the system time. In practice, the timestamp would arrive as part of the event itself.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ElementGeneratorSource implements SourceFunction<Element>
{
  // cancel() is called from a different thread than run(), hence volatile
  private volatile boolean isRunning = true;

  // static, so the logger is not part of the serialized source function
  private static final Logger logger = LoggerFactory.getLogger( ElementGeneratorSource.class );

  @Override
  public void run( SourceContext<Element> ctx ) throws Exception
  {
    int counter = 1;

    // 20 seconds behind the Flink program's start time
    long eventStartTime = System.currentTimeMillis() - 20000;

    // create the first event using the above timestamp
    Element element = new Element( counter++, eventStartTime );

    while( isRunning )
    {
      logger.info( "Produced Element with value {} and timestamp {}", element.getValue(), printTime( element.getTimestamp() ) );

      // emit under the checkpoint lock, as recommended for SourceFunction implementations
      synchronized( ctx.getCheckpointLock() )
      {
        ctx.collect( element );
      }

      // next element: 1 to 6 seconds after the previous timestamp, so timestamps keep
      // ascending but do not match the current system clock time
      element = new Element( counter++, element.getTimestamp() + ThreadLocalRandom.current().nextLong( 1000, 6000 ) );

      Thread.sleep( 1000 );
    }
  }

  @Override
  public void cancel()
  {
    isRunning = false;
  }

  // helper function to print epoch time in readable format
  String printTime( long longValue )
  {
    return LocalDateTime.ofInstant( Instant.ofEpochMilli( longValue ), ZoneId.systemDefault() ).toString();
  }
}
```



Now, let us define a pipeline that processes these elements using TumblingEventTimeWindows. (I have removed the class and method declarations to focus on the important code.)

```java
 1  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 2
 3  // set to EventTime, otherwise it defaults to ProcessingTime
 4  env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );
 5
 6  DataStreamSource<Element> elementStream = env.addSource( new ElementGeneratorSource() );
 7
 8  elementStream
 9  .assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>()
10  {
11    @Override
12    public long extractAscendingTimestamp( Element element )
13    {
14      return element.getTimestamp();
15    }
16  })
17  .windowAll( TumblingEventTimeWindows.of( Time.seconds( 10 ) ) )
18  .process( new ProcessAllWindowFunction<Element, Integer, TimeWindow>()
19  {
20    @Override
21    public void process( Context context, Iterable<Element> input, Collector<Integer> output ) throws Exception
22    {
23      logger.info( "Computing sum for {}", input );
24
25      int sum = 0;
26      for( Element e : input ) {
27        sum += e.getValue();
28      }
29      output.collect( sum );
30    }
31  })
32  .print();
33
34  env.execute();
```



Line #1: Define a stream execution environment, the starting point of any Flink streaming program.

Line #4: Set the time characteristic to EventTime; otherwise Flink ignores the timestamps inside elements and uses the system clock.

Line #6: Create a DataStream with ElementGeneratorSource (discussed earlier in this article) as its source.

Line #9: Before defining a window, I need to tell Flink how to obtain the timestamp and the watermark for each element it receives.

In this example, I use the very handy class "AscendingTimestampExtractor", which, per the Flink docs, is "A timestamp assigner and watermark generator for streams where timestamps are monotonously ascending. In this case, the local watermarks for the streams are easy to generate, because they strictly follow the timestamps." Another benefit of this Flink-provided class is that it generates the watermarks for me. A watermark tells Flink how far event time has progressed: once the watermark passes the end of a window, Flink assumes that all elements belonging to that window have arrived and closes the window.
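The interplay between timestamps, watermarks, and window closing can be sketched without Flink. This hypothetical class assumes the behavior I understand AscendingTimestampExtractor and the default event-time trigger to have: the watermark trails the latest timestamp by 1 ms, and a window [start, end) fires once the watermark reaches end - 1. One simplification: Flink emits these watermarks periodically, whereas the sketch advances the watermark after every element. The timestamps in `main` are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: event-time tumbling windows closed by an ascending-timestamp watermark. */
public final class AscendingWatermarkSketch {

    private final long size;                                      // window size in ms
    private long watermark = Long.MIN_VALUE;                      // no event time seen yet
    private final TreeMap<Long, Integer> open = new TreeMap<>();  // window start -> running sum
    final List<String> fired = new ArrayList<>();                 // windows closed so far

    AscendingWatermarkSketch(long size) {
        this.size = size;
    }

    void onElement(int value, long timestamp) {
        // 1. assign the element to the window containing its own timestamp
        long start = timestamp - Math.floorMod(timestamp, size);

        // a window the watermark has already passed is closed; Flink drops such late
        // elements by default, so the sketch ignores them (cannot happen with ascending input)
        if (start + size - 1 <= watermark) {
            return;
        }
        open.merge(start, value, Integer::sum);

        // 2. advance the watermark to "latest timestamp - 1 ms"; it never moves backwards
        watermark = Math.max(watermark, timestamp - 1);

        // 3. fire every open window whose last millisecond (end - 1) the watermark has reached
        while (!open.isEmpty() && watermark >= open.firstKey() + size - 1) {
            Map.Entry<Long, Integer> w = open.pollFirstEntry();
            fired.add("[" + w.getKey() + ", " + (w.getKey() + size) + ") sum=" + w.getValue());
        }
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch s = new AscendingWatermarkSketch(10_000L);
        s.onElement(1, 1_000L);
        s.onElement(2, 4_500L);
        s.onElement(3, 12_000L);  // watermark 11_999 closes [0, 10000) holding 1 + 2
        s.onElement(4, 17_900L);
        s.onElement(5, 21_000L);  // watermark 20_999 closes [10000, 20000) holding 3 + 4
        s.fired.forEach(System.out::println);
        // prints:
        // [0, 10000) sum=3
        // [10000, 20000) sum=7
    }
}
```

With these hypothetical timestamps, the third element is what closes the first window, and that window holds only values 1 and 2, which mirrors the pattern in the sample output discussed below.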

In short, assignTimestampsAndWatermarks() tells Flink how to read the timestamp from each event/element it receives and, most importantly, how to compute watermarks.

Line #17: Define a window of type TumblingEventTimeWindows with a size of 10 seconds.

The rest of the code is similar to the previous example: we sum the values and print the result.

Sample output of the above example

Figure 2: Output of TumblingEventWindow

Three elements are produced at Lines #1, #2, and #3, with timestamps that differ from the system clock (the system clock time is printed first, before the log level).

When the third element is produced with timestamp "2020-02-22T22:22:02.495", it closes the current window, because the watermark derived from that timestamp has passed the window's end. With a 10-second window, the current window ends at "2020-02-22T22:22:00.000" (exclusive), so the last timestamp it accepts is "2020-02-22T22:21:59.999". The current window therefore collects only the first two values.

The next window ends at "2020-02-22T22:22:10.000" (exclusive; its last accepted timestamp is "2020-02-22T22:22:09.999"). Values 3 and 4 are collected in this new window, which closes once the element logged at Line #7 carries a timestamp at or beyond that end and so advances the watermark past it.

Conclusion

In this article, we discussed overriding the default clock boundary with a window offset and working with TumblingEventTimeWindows. We also saw an example of assigning timestamps to elements.

In the next part, I will discuss handling late elements, which arrive after the watermark has already passed the end of their window.

