Deep Dive Into Apache Flink's TumblingWindow — Part 3
In this article, see how to use allowedLateness to handle late elements that arrive after a watermark is generated for the current window and more.
To Handle Late Arrivals
In practice, elements or events do not always arrive in the order in which they were generated. This can be due to external factors such as network delay or a user going off-network. This is a common problem in stream processing and must be handled either by the streaming framework or by the application itself.
Note: In this post, the words 'elements', 'events', and 'messages' all refer to a unit of immutable data that is generated at a source and made available as output from the stream.
So what is lateness here? As per Flink, "Late elements are elements that arrive after the system’s event time clock (as signaled by the watermarks) has already passed the time of the late element’s timestamp". In simple terms, if a window ends at 15:00:00, then any element that belongs to that window but reaches Flink after the watermark has passed 15:00:00 is late. By default, Flink drops late elements.
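To make the definition concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a tumbling window can be derived from a timestamp and when an element counts as late. The class and method names are hypothetical and only for illustration. The arithmetic assumes windows aligned to the epoch, covering [start, end), and treated as complete once the watermark reaches end - 1 ms.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Flink: models the lateness check for tumbling windows.
final class LatenessCheck {

    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** An element is late if the watermark has already reached the last millisecond of its window. */
    static boolean isLate(long timestampMs, long sizeMs, long watermarkMs) {
        long windowEnd = windowStart(timestampMs, sizeMs) + sizeMs; // exclusive end
        return watermarkMs >= windowEnd - 1;
    }

    /** Converts an ISO local date-time (interpreted as UTC) to epoch milliseconds. */
    static long millis(String isoDateTime) {
        return LocalDateTime.parse(isoDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows
        long watermark = millis("2020-03-16T15:00:00"); // event-time clock has reached 15:00:00

        // Belongs to window [14:59:55, 15:00:00), which the watermark has already passed
        System.out.println(isLate(millis("2020-03-16T14:59:58"), size, watermark)); // true
        // Belongs to window [15:00:00, 15:00:05), which is still open
        System.out.println(isLate(millis("2020-03-16T15:00:01"), size, watermark)); // false
    }
}
```

Note that lateness depends on the watermark, not on the wall clock: the same element would be on time if the watermark had not yet reached the end of its window.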
To let users handle late elements, Flink provides a method called "allowedLateness". With it, you can specify how much lateness a windowed transformation tolerates. Elements that arrive within the allowed lateness are still put into windows and are considered when computing window results. Elements that arrive after the allowed lateness are dropped.
Note - Setting allowed lateness is only valid for event-time windows.
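The three possible outcomes for an element can be sketched in plain Java as follows. This is an illustrative model rather than Flink code: the class, enum, and method names are hypothetical, and it assumes that a window is kept until the watermark reaches its last millisecond plus the allowed lateness.

```java
// Hypothetical model, not part of Flink: classifies an element against the current watermark.
final class AllowedLatenessDemo {

    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    static Outcome classify(long timestampMs, long windowSizeMs, long allowedLatenessMs, long watermarkMs) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long lastMs = windowStart + windowSizeMs - 1; // last millisecond that belongs to the window

        if (watermarkMs < lastMs) {
            return Outcome.ON_TIME;            // window has not fired yet
        }
        if (watermarkMs < lastMs + allowedLatenessMs) {
            return Outcome.LATE_BUT_ACCEPTED;  // window fired, but its state is still kept
        }
        return Outcome.DROPPED;                // allowed lateness has expired
    }

    public static void main(String[] args) {
        long size = 5_000L;     // 5-second window
        long lateness = 3_000L; // 3 seconds of allowed lateness
        long ts = 23_000L;      // belongs to window [20_000, 25_000)

        System.out.println(classify(ts, size, lateness, 22_000L)); // ON_TIME
        System.out.println(classify(ts, size, lateness, 25_999L)); // LATE_BUT_ACCEPTED
        System.out.println(classify(ts, size, lateness, 28_000L)); // DROPPED
    }
}
```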
Example 1
I am using the "ElementGeneratorSource" class (from the previous post) as my source. I tweaked it to manually generate some late elements, as follows.
Assuming the current system time is 2020-03-16 13:42:40:
Element | Timestamp | Effect on the 5-second window |
1 | 2020-03-16T13:42:20 | |
2 | 2020-03-16T13:42:21 | |
3 | 2020-03-16T13:42:26 | Triggers 1st watermark |
4 | 2020-03-16T13:42:23 | 1st late element |
5 | 2020-03-16T13:42:24 | 2nd late element |
6 | 2020-03-16T13:42:28 | |
7 | 2020-03-16T13:42:39 | Triggers 2nd watermark |
Here is the Flink code to handle late elements:
// Use event time so that windows are driven by element timestamps and watermarks
env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );
DataStreamSource<Element> elementStream = env.addSource( new ElementGeneratorSource() );
elementStream
    .assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>()
    {
        @Override
        public long extractAscendingTimestamp( Element element )
        {
            return element.getTimestamp();
        }
    })
    .windowAll( TumblingEventTimeWindows.of( Time.seconds( 5 ) ) )
    // keep each window around for 3 more seconds so that late elements are still accepted
    .allowedLateness( Time.seconds( 3 ) )
    .process( new ProcessAllWindowFunction<Element, Integer, TimeWindow>()
    {
        @Override
        public void process( Context ctx, Iterable<Element> input, Collector<Integer> output ) throws Exception
        {
            logger.info( "Computing sum for {}", input );
            int sum = 0;
            for( Element e : input )
            {
                sum += e.getValue();
            }
            output.collect( sum );
        }
    })
    .print();
env.execute();
This is similar to the event-time example from the previous post, except for .allowedLateness( Time.seconds( 3 ) ).
I have allowed 3 seconds of lateness so that the two late elements from the table above are still accepted.
A sample run held a bit of a surprise: I was expecting that Flink would create one window for [1, 2, 4, 5] and another for [3, 6], but instead I got multiple overlapping windows (something like a sliding window that includes the late elements).
Searching for an explanation brought me to a Stack Overflow post, which helped: "Because of the allowedLateness, any late events (within the period of allowed lateness) will cause late (or in other words, extra) firings of the relevant windows. With the default EventTimeTrigger (which you appear to be using), each late event causes an additional window firing, and an updated window result will be emitted."
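The late firings described in that answer can be reproduced with a small plain-Java simulation of the seven elements from the table. This is a simplified model, not Flink itself: the class name is hypothetical, timestamps are given as millisecond offsets from 13:42:00, and the watermark is assumed to advance to (timestamp - 1 ms) immediately after every element, whereas Flink emits watermarks periodically, so the timing of a real run may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "fire at window end, fire again for every accepted late element".
final class LateFiringSimulation {
    static final long SIZE = 5_000L;     // 5-second tumbling windows
    static final long LATENESS = 3_000L; // 3 seconds of allowed lateness

    /** Feeds timestamps (ms) in arrival order; returns the element numbers seen by each firing. */
    static List<List<Integer>> run(long[] timestamps) {
        Map<Long, List<Integer>> windows = new TreeMap<>(); // window start -> element numbers
        List<List<Integer>> firings = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            long start = ts - Math.floorMod(ts, SIZE);
            long lastMs = start + SIZE - 1;
            if (watermark >= lastMs + LATENESS) {
                continue; // beyond the allowed lateness: dropped
            }
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(i + 1);
            if (watermark >= lastMs) {
                // the window has already fired once: the late element causes an extra firing
                firings.add(new ArrayList<>(windows.get(start)));
            }
            long newWatermark = Math.max(watermark, ts - 1);
            // regular firing for every window whose end the watermark has just passed
            for (Map.Entry<Long, List<Integer>> w : windows.entrySet()) {
                long wLast = w.getKey() + SIZE - 1;
                if (watermark < wLast && newWatermark >= wLast) {
                    firings.add(new ArrayList<>(w.getValue()));
                }
            }
            watermark = newWatermark;
        }
        return firings;
    }

    public static void main(String[] args) {
        // Elements 1..7 from the table, as offsets from 13:42:00
        long[] timestamps = {20_000, 21_000, 26_000, 23_000, 24_000, 28_000, 39_000};
        // The window holding element 7 never fires here because no later watermark arrives.
        System.out.println(run(timestamps)); // [[1, 2], [1, 2, 4], [1, 2, 4, 5], [3, 6]]
    }
}
```

In this model the window for 13:42:20 to 13:42:25 fires three times: once on time with [1, 2], and once more for each of the two late elements.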
In simple terms, allowedLateness() with the default EventTimeTrigger produces more than one firing of the same window: the first with the elements that arrived on time, and each subsequent one with the previous contents plus a late element. Since I am using the default EventTimeTrigger (I didn't provide a custom trigger), this behavior is expected. Another option is to define a custom trigger that does not fire a window until the watermark reaches window_end + allowed_lateness, that is, one that holds back while getCurrentWatermark() < window_end + allowed_lateness.
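The custom-trigger idea can be sketched as a plain-Java model that fires each window exactly once, when the watermark reaches the window's last millisecond plus the allowed lateness. The class name and the per-element watermark are assumptions made for illustration. In Flink itself, one possible shape would be a Trigger subclass whose onElement registers an event-time timer at window.maxTimestamp() plus the allowed lateness and whose onEventTime returns TriggerResult.FIRE for that timer; that variant is not shown or tested here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a trigger that waits for the allowed lateness before firing.
final class DeferredFiringSimulation {

    /** Feeds timestamps (ms) in arrival order; returns the element numbers seen by each firing. */
    static List<List<Integer>> run(long[] timestamps, long sizeMs, long latenessMs) {
        Map<Long, List<Integer>> windows = new TreeMap<>(); // window start -> element numbers
        List<List<Integer>> firings = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            long start = ts - Math.floorMod(ts, sizeMs);
            long fireAt = start + sizeMs - 1 + latenessMs;
            if (watermark >= fireAt) {
                continue; // window already fired and was discarded: element is dropped
            }
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(i + 1);
            watermark = Math.max(watermark, ts - 1);

            // fire (and discard) every window whose deferred firing time has been reached
            Iterator<Map.Entry<Long, List<Integer>>> it = windows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<Integer>> w = it.next();
                if (watermark >= w.getKey() + sizeMs - 1 + latenessMs) {
                    firings.add(w.getValue());
                    it.remove();
                }
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // Elements 1..7 from the table, as offsets from 13:42:00
        long[] timestamps = {20_000, 21_000, 26_000, 23_000, 24_000, 28_000, 39_000};
        System.out.println(run(timestamps, 5_000L, 3_000L)); // [[1, 2, 4, 5], [3, 6]]
    }
}
```

The trade-off is latency: every result is delayed by the allowed lateness, even for windows that receive no late elements.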
Example 2
Another recommended approach to handle late arrivals is to use the sideOutputLateData method. There are two benefits of using an "OutputTag" approach:
1. You can separate the normal stream from the late stream
2. You can route any data that was not processed correctly in the normal stream to another stream for debugging and triage purposes (a sort of dead letter queue)
Let's see an example here
// side output tags: one for late elements, one for elements routed to the "error" stream
final OutputTag<Element> outputLateTag = new OutputTag<Element>("side-output-late") {};
final OutputTag<Element> outputErrTag = new OutputTag<Element>("side-output-error") {};
SingleOutputStreamOperator<Integer> stream =
    elementStream
        .assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>()
        {
            @Override
            public long extractAscendingTimestamp( Element element )
            {
                return element.getTimestamp();
            }
        })
        .windowAll( TumblingEventTimeWindows.of( Time.seconds( 5 ) ) )
        // late elements are not dropped but sent to the side output
        .sideOutputLateData( outputLateTag )
        .process( new ProcessAllWindowFunction<Element, Integer, TimeWindow>()
        {
            @Override
            public void process( Context ctx, Iterable<Element> input, Collector<Integer> output ) throws Exception
            {
                logger.info( "Computing sum for {}", input );
                int sum = 0;
                for( Element e : input )
                {
                    sum += e.getValue();
                    // send to a side stream
                    ctx.output( outputErrTag, e );
                }
                output.collect( sum );
            }
        });
// get late and error streams as side output
DataStream<Element> lateStream = stream.getSideOutput( outputLateTag );
DataStream<Element> errStream = stream.getSideOutput( outputErrTag );
// print to console
stream.print();
lateStream.print();
errStream.print();
The two OutputTag declarations (outputLateTag, outputErrTag) = Define two side output tags, one for handling late elements and the other for handling any error cases
.sideOutputLateData( outputLateTag ) = Declare that every late element should go to the side output tagged "outputLateTag"
ctx.output( outputErrTag, e ) = For the sake of this example, write every element to "outputErrTag"
The two stream.getSideOutput( ... ) calls = Get access to the side streams
The three print() calls = Print the results to the console
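To see which elements would end up in the late stream, here is a plain-Java model of the routing decision applied to the seven elements from Example 1. It is a simplified sketch with hypothetical names: it assumes no allowed lateness (as in this example), timestamps given as millisecond offsets from 13:42:00, and a watermark that advances to (timestamp - 1 ms) after every element rather than periodically as in Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: splits elements into the normal stream and the late side output.
final class SideOutputRouting {
    final List<Integer> mainStream = new ArrayList<>();
    final List<Integer> lateStream = new ArrayList<>();

    static SideOutputRouting route(long[] timestamps, long sizeMs, long allowedLatenessMs) {
        SideOutputRouting out = new SideOutputRouting();
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            long lastMs = ts - Math.floorMod(ts, sizeMs) + sizeMs - 1; // last ms of the element's window
            if (watermark >= lastMs + allowedLatenessMs) {
                out.lateStream.add(i + 1); // window is gone: goes to the late side output
            } else {
                out.mainStream.add(i + 1); // still accepted into its window
            }
            watermark = Math.max(watermark, ts - 1);
        }
        return out;
    }

    public static void main(String[] args) {
        // Elements 1..7 from the table in Example 1, as offsets from 13:42:00
        long[] timestamps = {20_000, 21_000, 26_000, 23_000, 24_000, 28_000, 39_000};
        SideOutputRouting result = route(timestamps, 5_000L, 0L);
        System.out.println("main: " + result.mainStream); // main: [1, 2, 3, 6, 7]
        System.out.println("late: " + result.lateStream); // late: [4, 5]
    }
}
```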
Conclusion
In this article, we discussed using allowedLateness to handle late elements, that is, elements that arrive after the watermark has already passed the end of their window. We also saw how to use the side output approach (sideOutputLateData) to redirect late elements to an entirely separate stream.