Deep Dive Into Apache Flink's TumblingWindow — Part 3

In this article, see how to use allowedLateness to handle late elements that arrive after a watermark is generated for the current window and more.

By Preetdeep Kumar · Mar. 24, 20 · Tutorial

Part-1 | Part-2

To Handle Late Arrivals

In practice, elements (or events) do not always arrive in the order in which they were generated. This can be caused by external factors such as network delay or a user going off-network. It is a common problem in stream processing and must be handled either by the streaming framework or by the application itself.

Note - In this post, I use the words 'elements', 'events', and 'messages' interchangeably to mean a unit of immutable data that is generated at the source and is available as output from the stream.

So what is lateness here? As per the Flink documentation, "Late elements are elements that arrive after the system’s event time clock (as signaled by the watermarks) has already passed the time of the late element’s timestamp". In simple terms, if a window ends at 15:00:00, then an element whose timestamp falls inside that window but which reaches Flink after the watermark has passed 15:00:00 is late. By default, late elements are dropped by Flink.

To let users handle late elements, Flink provides the "allowedLateness" method on windowed streams. With it, you specify how much lateness a windowed transformation tolerates (the default is 0). Elements that arrive within the allowed lateness are still put into their window and are considered when computing the window result. Elements that arrive after the allowed lateness are dropped.
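
The rule above can be sketched in plain Java, without any Flink dependency. The names LatenessRule, Status, and classify are hypothetical and only for illustration. The comparisons reflect my understanding of how Flink's window operator decides lateness (the last timestamp that belongs to a window is its end minus one millisecond), so treat this as a sketch, not as Flink's API.

```java
// Hypothetical helper, not part of Flink: classifies an element against its
// tumbling event-time window, given the current watermark.
final class LatenessRule {

    enum Status { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    // Timestamps are in milliseconds; durations are in milliseconds too.
    static Status classify(long elementTimestamp, long watermark,
                           long windowSizeMs, long allowedLatenessMs) {
        // Start of the tumbling window that owns this timestamp (offset 0).
        long windowStart = elementTimestamp - Math.floorMod(elementTimestamp, windowSizeMs);
        // Last timestamp that still belongs to that window.
        long maxTimestamp = windowStart + windowSizeMs - 1;

        if (watermark < maxTimestamp) {
            return Status.ON_TIME;           // the watermark has not passed the window yet
        }
        if (watermark < maxTimestamp + allowedLatenessMs) {
            return Status.LATE_BUT_ACCEPTED; // late, but within the allowed lateness
        }
        return Status.DROPPED;               // late beyond the allowed lateness
    }

    public static void main(String[] args) {
        long size = 5_000;
        long lateness = 3_000;
        // Element stamped at 23s, so it belongs to the window [20s, 25s).
        System.out.println(classify(23_000, 21_000, size, lateness)); // ON_TIME
        System.out.println(classify(23_000, 25_999, size, lateness)); // LATE_BUT_ACCEPTED
        System.out.println(classify(23_000, 28_000, size, lateness)); // DROPPED
    }
}
```

With an allowed lateness of 0, the middle case disappears: every element that arrives after the watermark has passed its window is dropped.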

Note - Setting allowed lateness is only valid for event-time windows.

Example 1

I am using the "ElementGeneratorSource" class (from the previous post) as my source. I tweaked it to generate some late elements manually, as follows.

Assume the current system time is 2020-03-16 13:42:40.

Element   Timestamp             5 seconds window
1         2020-03-16T13:42:20
2         2020-03-16T13:42:21
3         2020-03-16T13:42:26   Triggers 1st watermark
4         2020-03-16T13:42:23   1st late element
5         2020-03-16T13:42:24   2nd late element
6         2020-03-16T13:42:28
7         2020-03-16T13:42:39   Triggers 2nd watermark
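
To see which 5-second window each element in the table belongs to, here is a small plain-Java sketch. The class WindowAssignment and its method windowStart are hypothetical names. I am assuming that tumbling windows are aligned to the epoch with no offset and that the timestamps can be read as UTC (for a 5-second window, the time zone does not change the alignment).

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Flink: finds the 5-second tumbling window
// that owns a given event timestamp.
final class WindowAssignment {

    static final long WINDOW_SIZE_SECONDS = 5;

    static LocalDateTime windowStart(LocalDateTime timestamp) {
        long epochSeconds = timestamp.toEpochSecond(ZoneOffset.UTC);
        // Round down to the nearest multiple of the window size.
        long start = epochSeconds - Math.floorMod(epochSeconds, WINDOW_SIZE_SECONDS);
        return LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        String[] timestamps = {
            "2020-03-16T13:42:20", "2020-03-16T13:42:21", "2020-03-16T13:42:26",
            "2020-03-16T13:42:23", "2020-03-16T13:42:24", "2020-03-16T13:42:28",
            "2020-03-16T13:42:39"
        };
        for (int i = 0; i < timestamps.length; i++) {
            LocalDateTime start = windowStart(LocalDateTime.parse(timestamps[i]));
            System.out.printf("element %d -> window [%s, %s)%n",
                    i + 1, start, start.plusSeconds(WINDOW_SIZE_SECONDS));
        }
    }
}
```

Elements 1, 2, 4, and 5 map to the window [13:42:20, 13:42:25), elements 3 and 6 map to [13:42:25, 13:42:30), and element 7 maps to [13:42:35, 13:42:40). Elements 4 and 5 are late because element 3 has already moved the event-time clock past 13:42:25 by the time they arrive.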


Now the Flink code to handle late elements

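Java

// Imports needed by this snippet (Flink DataStream API)
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// env (the StreamExecutionEnvironment) and logger are assumed to be set up already
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Element> elementStream = env.addSource( new ElementGeneratorSource() );

elementStream
// use the timestamp carried by each element as its event time
.assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>()
{
  @Override
  public long extractAscendingTimestamp( Element element )
  {
    return element.getTimestamp();
  }
})
.windowAll( TumblingEventTimeWindows.of( Time.seconds( 5 ) ) )
// accept elements that are up to 3 seconds late
.allowedLateness( Time.seconds( 3 ) )
.process( new ProcessAllWindowFunction<Element, Integer ,TimeWindow>()
{
  @Override
  public void process( Context context, Iterable<Element> input, Collector<Integer> output ) throws Exception
  {
    logger.info( "Computing sum for {}", input );
    // sum the values of all elements currently in the window
    int sum = 0;
    for(Element e : input) {
      sum += e.getValue();
    }
    output.collect( sum );
  }
})
.print();
env.execute();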


 

This is similar to the event-time example from the previous post, except for the call to .allowedLateness( Time.seconds( 3 ) ).

I have allowed a lateness of 3 seconds so that the two late elements from the table above are still accepted.

A sample run produces the following output


This was a bit of a surprise. I was expecting Flink to create one window for [1, 2, 4, 5] and another for [3, 6], but instead, I got multiple overlapping windows (somewhat like a sliding window that includes the late elements).

Searching for an explanation brought me to this Stack Overflow post, which helped me: "Because of the allowedLateness, any late events (within the period of allowed lateness) will cause late (or in other words, extra) firings of the relevant windows. With the default EventTimeTrigger (which you appear to be using), each late event causes an additional window firing, and an updated window result will be emitted."

In simple terms, allowedLateness() with the default EventTimeTrigger produces more than one result for the same window: the first firing contains the elements that arrived on time, and each later firing contains the previous contents plus the late element. Since I am using the default EventTimeTrigger (I didn't provide a custom trigger), this behavior is expected. Another option is to define a custom trigger that does not fire a window until the watermark (getCurrentWatermark) reaches window_time + allowed_lateness.
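
To make the extra firings concrete, here is a plain-Java simulation of the behavior described above, applied to the element table from Example 1. This is not Flink code and not the output of my run: the class LateFiringSimulation is hypothetical, timestamps are reduced to milliseconds within the minute (20_000 stands for 13:42:20), and I am assuming that the watermark advances to "highest timestamp seen minus 1 ms" right after every element and that the remaining windows are flushed when the input ends. Flink emits watermarks periodically, so the exact firings in a real job depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation, not Flink code: all names here are hypothetical.
final class LateFiringSimulation {

    static final long WINDOW_SIZE_MS = 5_000;
    static final long ALLOWED_LATENESS_MS = 3_000;

    // Returns one line per window firing, in the order the firings happen.
    static List<String> run(long[] timestampsMs) {
        Map<Long, List<Integer>> windows = new TreeMap<>(); // window start -> element numbers
        List<String> firings = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (int i = 0; i < timestampsMs.length; i++) {
            long ts = timestampsMs[i];
            long start = ts - Math.floorMod(ts, WINDOW_SIZE_MS);
            long maxTs = start + WINDOW_SIZE_MS - 1;

            // Beyond the allowed lateness: the element is dropped.
            if (maxTs + ALLOWED_LATENESS_MS <= watermark) {
                continue;
            }
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(i + 1);

            // Late but accepted: the window has already fired, so it fires again.
            if (maxTs <= watermark) {
                firings.add(describe(start, windows.get(start)));
            }

            // Assumption: the watermark advances right after every element.
            long next = Math.max(watermark, ts - 1);
            for (Map.Entry<Long, List<Integer>> w : windows.entrySet()) {
                long end = w.getKey() + WINDOW_SIZE_MS - 1;
                if (end > watermark && end <= next) {
                    firings.add(describe(w.getKey(), w.getValue())); // first, on-time firing
                }
            }
            watermark = next;
        }

        // Assumption: at the end of the input, windows that never fired are flushed.
        for (Map.Entry<Long, List<Integer>> w : windows.entrySet()) {
            if (w.getKey() + WINDOW_SIZE_MS - 1 > watermark) {
                firings.add(describe(w.getKey(), w.getValue()));
            }
        }
        return firings;
    }

    static String describe(long start, List<Integer> ids) {
        return "[" + start + ", " + (start + WINDOW_SIZE_MS) + ") -> " + ids;
    }

    public static void main(String[] args) {
        long[] table = {20_000, 21_000, 26_000, 23_000, 24_000, 28_000, 39_000};
        run(table).forEach(System.out::println);
    }
}
```

Under these assumptions, the window [20000, 25000) fires three times, with [1, 2], then [1, 2, 4], then [1, 2, 4, 5], followed by [25000, 30000) with [3, 6] and [35000, 40000) with [7]. The repeated, growing results for the first window are the "overlapping windows" effect.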

Example 2

Another recommended approach to handle late arrivals is to use the sideOutputLateData method. There are two benefits of using the "OutputTag" approach:

1. You can separate the normal stream from the late stream

2. You can send any data that was not processed correctly in the normal stream to another stream for debugging and triage purposes (a sort of dead letter queue)

Let's see an example here

Java

final OutputTag<Element> outputLateTag = new OutputTag<Element>("side-output-late") {};

final OutputTag<Element> outputErrTag = new OutputTag<Element>("side-output-error") {};

SingleOutputStreamOperator<Integer> stream = 
        elementStream
        .assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>()
        {
            @Override
            public long extractAscendingTimestamp( Element element )
            {
                return element.getTimestamp();
            }
        })
        .windowAll( TumblingEventTimeWindows.of( Time.seconds( 5 ) ) )
        .sideOutputLateData( outputLateTag )
        .process( new ProcessAllWindowFunction<Element, Integer ,TimeWindow>()
        {
            @Override
            public void process( Context ctx, Iterable<Element> input, Collector<Integer> output ) throws Exception
            {
                logger.info( "Computing sum for {}", input);
                int sum = 0;
                for(Element e : input) 
                {
                  sum += e.getValue();

                  // send to a side stream
                  ctx.output(outputErrTag, e);
                }
                output.collect( sum );
            }
        });

// get late and error streams as side output
DataStream<Element> lateStream = stream.getSideOutput( outputLateTag );
DataStream<Element> errStream = stream.getSideOutput( outputErrTag );

// print to console
stream.print();
lateStream.print();
errStream.print();



Line #1, #3 = Define two side output tags, one for handling late elements and the other one to handle any error cases

Line #16 = Declare that you want every late element in a side output tagged as "outputLateTag"

Line #29 = For the sake of this example, I am writing every element in an "outputErrTag"

Line #36, #37 = Get access to the side streams

Line #40 - #42 = Print the result to console
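
As a rough illustration of which elements would end up in the late side output, here is a plain-Java sketch that applies the lateness rule to the element table from Example 1. It is not Flink code: SideOutputSimulation and lateElementIds are hypothetical names, timestamps are reduced to milliseconds within the minute, and I am assuming the watermark advances to "highest timestamp seen minus 1 ms" after every element, which real Flink only does periodically.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, not Flink code: all names here are hypothetical.
final class SideOutputSimulation {

    static final long WINDOW_SIZE_MS = 5_000;

    // Returns the numbers (1-based positions) of the elements that would be
    // routed to the late side output for the given allowed lateness.
    static List<Integer> lateElementIds(long[] timestampsMs, long allowedLatenessMs) {
        List<Integer> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (int i = 0; i < timestampsMs.length; i++) {
            long ts = timestampsMs[i];
            // Last timestamp that belongs to the element's tumbling window.
            long maxTs = ts - Math.floorMod(ts, WINDOW_SIZE_MS) + WINDOW_SIZE_MS - 1;

            // Too late for its window: goes to the side output instead of being dropped.
            if (maxTs + allowedLatenessMs <= watermark) {
                late.add(i + 1);
            }
            // Assumption: the watermark advances right after every element.
            watermark = Math.max(watermark, ts - 1);
        }
        return late;
    }

    public static void main(String[] args) {
        long[] table = {20_000, 21_000, 26_000, 23_000, 24_000, 28_000, 39_000};
        System.out.println(lateElementIds(table, 0));     // [4, 5]
        System.out.println(lateElementIds(table, 3_000)); // []
    }
}
```

Example 2 does not call allowedLateness, so the allowed lateness is 0 and, under these assumptions, elements 4 and 5 go to the late stream. With the 3 seconds of allowed lateness from Example 1, both would still be accepted into their window and the late stream would stay empty.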

Conclusion

In this article, we discussed using allowedLateness to handle late elements that arrive after the watermark has already passed the end of their window. We also saw how to use the side output approach (sideOutputLateData) to redirect late elements to a separate stream.
