Deep Dive Into Apache Flink's TumblingWindow — Part 3

In this article, see how to use allowedLateness to handle late elements that arrive after a watermark is generated for the current window and more.

Part-1 | Part-2

To Handle Late Arrivals

In practice, elements or events do not always arrive in the order in which they were generated. This can be due to external factors such as network delay or a user going off-network. This is a common problem in streaming computation and must be handled either by the streaming framework or by the application itself.

Note: In this post, the words 'elements', 'events', and 'messages' all refer to a unit of immutable data that is generated at the source and made available as output from the stream.

So what is lateness here? As per Flink, "Late elements are elements that arrive after the system’s event time clock (as signaled by the watermarks) has already passed the time of the late element’s timestamp". In simple terms, if the window ends at 15:00:00, then any element belonging to that window that arrives after the watermark has passed 15:00:00 is late. By default, late elements are dropped by Flink.

To allow users to handle late elements, Flink provides a method called "allowedLateness". With it, you can specify how a windowed transformation should deal with late elements and how much lateness is allowed. Elements that arrive within the allowed lateness are still put into their windows and are considered when computing window results. Elements that arrive after the allowed lateness are dropped.

Note - Setting allowed lateness is only valid for event-time windows.
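
To make the rule above concrete, here is a minimal, framework-free sketch of how an element could be classified against the current watermark. It is not Flink's code: the class `LatenessCheck`, the `Verdict` enum, and the method names are hypothetical, and all times are plain epoch milliseconds. It assumes that a window's last valid timestamp is its end minus one millisecond, and that a window is kept until the watermark reaches that timestamp plus the allowed lateness.

```java
/** Framework-free sketch of classifying an element; hypothetical names, not Flink's code. */
final class LatenessCheck {

    enum Verdict { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    /** Start (in millis) of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Classifies an element given the watermark at the moment it arrives. */
    static Verdict classify(long timestampMs, long watermarkMs,
                            long windowSizeMs, long allowedLatenessMs) {
        // Assumption: the last timestamp that still belongs to the window is (end - 1 ms).
        long maxTimestamp = windowStart(timestampMs, windowSizeMs) + windowSizeMs - 1;
        if (watermarkMs < maxTimestamp) {
            return Verdict.ON_TIME;            // the window has not fired yet
        }
        if (watermarkMs < maxTimestamp + allowedLatenessMs) {
            return Verdict.LATE_BUT_ACCEPTED;  // late, but within the allowed lateness
        }
        return Verdict.DROPPED;                // the window state is already gone
    }

    public static void main(String[] args) {
        // 5-second windows, 3 seconds of allowed lateness, element stamped at 23 s.
        System.out.println(classify(23_000, 26_000, 5_000, 3_000));
        System.out.println(classify(23_000, 39_000, 5_000, 3_000));
    }
}
```

With an allowed lateness of 0, the middle branch can never be taken, so every late element is dropped, which matches the default behavior described earlier.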

Example 1

I am using the "ElementGeneratorSource" class (from the previous post) as my source. I tweaked it to manually generate some late elements, as follows:

Assume the current system time is 2020-03-16 13:42:40.

Element  Timestamp            5-second window
1        2020-03-16T13:42:20
2        2020-03-16T13:42:21
3        2020-03-16T13:42:26  Triggers 1st watermark
4        2020-03-16T13:42:23  1st late element
5        2020-03-16T13:42:24  2nd late element
6        2020-03-16T13:42:28
7        2020-03-16T13:42:39  Triggers 2nd watermark


Now the Flink code to handle late elements

The code is similar to the EventTimestamp example from the previous post, except for the added call .allowedLateness( Time.seconds( 3 ) )

I have allowed a lateness of 3 seconds so that the two late elements from the table above are still accepted.

A sample run produces the following output


A bit of a surprise here: I was expecting that Flink would create one window for [1, 2, 4, 5] and another for [3, 6], but instead I got multiple overlapping windows (somewhat like a sliding window that includes the late elements).

Searching for an answer brought me to this Stack Overflow post, which helped me: "Because of the allowedLateness, any late events (within the period of allowed lateness) will cause late (or in other words, extra) firings of the relevant windows. With the default EventTimeTrigger (which you appear to be using), each late event causes an additional window firing, and an updated window result will be emitted"

In simple terms, "allowedLateness() with the default EventTimeTrigger will create more than one window: the first with the elements of the normal window, and each later one = previous_window + late_element". Since I am using the default EventTimeTrigger (I did not provide any custom trigger), the above behavior is expected. Another option is to define a custom trigger that does not fire a window as long as getCurrentWatermark < window_time + allowed_lateness
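
The late firings can be reproduced with a small, framework-free simulation of the table from Example 1. This is a simplified model, not Flink's implementation, and every name in it is hypothetical. It assumes that the watermark simply follows the highest timestamp seen so far, that times are seconds within 13:42, and that the end of the stream advances the watermark far enough to fire whatever is still open.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of late firings with a default-style trigger; not Flink's code. */
final class LateFiringSimulation {

    static final long WINDOW_SIZE = 5;       // seconds
    static final long ALLOWED_LATENESS = 3;  // seconds

    /** {element id, timestamp in seconds within 13:42}, in arrival order. */
    static final long[][] TABLE = {
        {1, 20}, {2, 21}, {3, 26}, {4, 23}, {5, 24}, {6, 28}, {7, 39}
    };

    /** Feeds the elements in arrival order and returns one line per window firing. */
    static List<String> run(long[][] elements) {
        TreeMap<Long, List<Long>> windows = new TreeMap<>(); // window start -> element ids
        List<String> firings = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (long[] element : elements) {
            long id = element[0];
            long ts = element[1];
            long start = ts - Math.floorMod(ts, WINDOW_SIZE);
            long end = start + WINDOW_SIZE;

            if (watermark >= end + ALLOWED_LATENESS) {
                continue; // too late: the window has expired, so the element is dropped
            }
            List<Long> contents = windows.computeIfAbsent(start, k -> new ArrayList<>());
            contents.add(id);
            if (watermark >= end) {
                // The window has already fired once: each late element fires it again.
                firings.add(describe(start, contents));
            }
            if (ts > watermark) {
                // Assumption: the watermark tracks the highest timestamp seen so far.
                long previous = watermark;
                watermark = ts;
                fireAndExpire(windows, previous, watermark, firings);
            }
        }
        // Assumption: the end of the stream flushes every window that is still open.
        fireAndExpire(windows, watermark, Long.MAX_VALUE, firings);
        return firings;
    }

    /** Fires windows whose end was just passed and removes windows past their lateness. */
    private static void fireAndExpire(TreeMap<Long, List<Long>> windows, long previous,
                                      long watermark, List<String> firings) {
        Iterator<Map.Entry<Long, List<Long>>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> window = it.next();
            long end = window.getKey() + WINDOW_SIZE;
            if (previous < end && end <= watermark) {
                firings.add(describe(window.getKey(), window.getValue())); // on-time firing
            }
            if (end + ALLOWED_LATENESS <= watermark) {
                it.remove(); // allowed lateness is over: the window state is discarded
            }
        }
    }

    private static String describe(long start, List<Long> contents) {
        return "[" + start + ", " + (start + WINDOW_SIZE) + ") -> " + contents;
    }

    public static void main(String[] args) {
        run(TABLE).forEach(System.out::println);
    }
}
```

Under these assumptions, the window covering seconds 20 to 25 fires three times: once with elements [1, 2], then again after each of the two late elements, ending with [1, 2, 4, 5]. That is the "first_window + late_element" pattern described above.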

Example 2

Another recommended approach to handle late arrivals is to use the sideOutputLateData method. There are two benefits of using an "OutputTag" approach:

1. You can separate the normal stream from the late stream

2. You can put any data that was not processed correctly in the normal stream into another stream for debugging and triage purposes (a sort of dead-letter queue)

Let's see an example here

Line #1, #3 = Define two side output tags, one for handling late elements and the other one to handle any error cases

Line #16 = Declare that you want every late element in a side output tagged as "outputLateTag"

Line #29 = For the sake of this example, I am writing every element in an "outputErrTag"

Line #36, #37 = Get access to the side streams

Line #40 - #42 = Print the result to console
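
As a framework-free illustration of the idea behind the late side output, the sketch below routes elements that are too late into a separate list instead of dropping them. It does not use Flink's API: the class `SideOutputSketch` and its fields are hypothetical, timestamps are seconds within 13:42 taken from the table in Example 1, and the watermark is assumed to follow the highest timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.List;

/** Framework-free sketch of main/late routing; hypothetical names, not Flink's API. */
final class SideOutputSketch {

    final List<Long> mainOutput = new ArrayList<>(); // elements that reach a window
    final List<Long> lateOutput = new ArrayList<>(); // elements that would have been dropped

    private final long windowSize;
    private final long allowedLateness;
    private long watermark = Long.MIN_VALUE;

    SideOutputSketch(long windowSize, long allowedLateness) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
    }

    /** Routes one element by comparing its window's expiry with the current watermark. */
    void accept(long timestamp) {
        long windowEnd = timestamp - Math.floorMod(timestamp, windowSize) + windowSize;
        if (watermark >= windowEnd + allowedLateness) {
            lateOutput.add(timestamp);  // too late for its window: goes to the late list
        } else {
            mainOutput.add(timestamp);  // on time, or late within the allowed lateness
        }
        // Assumption: the watermark tracks the highest timestamp seen so far.
        watermark = Math.max(watermark, timestamp);
    }

    public static void main(String[] args) {
        SideOutputSketch sketch = new SideOutputSketch(5, 0);
        for (long ts : new long[] {20, 21, 26, 23, 24, 28, 39}) {
            sketch.accept(ts);
        }
        System.out.println("main: " + sketch.mainOutput);
        System.out.println("late: " + sketch.lateOutput);
    }
}
```

With no allowed lateness, the elements stamped 23 and 24 end up in the late list. With an allowed lateness of 3 seconds, the same elements are still accepted into the main output, so the late list stays empty.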

Conclusion

In this article, we discussed using allowedLateness to handle late elements that arrive after a watermark is generated for the current window. Plus, we saw how to use the "sideOutput" approach to redirect late elements to an entirely new stream.

Opinions expressed by DZone contributors are their own.
