
How to Build and Debug a Flink Pipeline Based in Event Time

In this article, we’ll take a look at event time-based pipelines and at some common problems and misunderstandings when working with this type of pipeline.

By Anton Rodriguez · Mar. 25, 21 · Tutorial

One of the most important concepts in stream-processing frameworks is time. There are different notions of time:

  • Processing time: the time according to the clock of the machine where the event is processed. It’s easy to use, but because that time changes every time the job is executed, the results aren’t consistent: each run may produce different results. This isn’t an acceptable trade-off for many use cases.
  • Event time: the time taken from a field in the event, typically a timestamp. Each time you execute the pipeline with the same input, you obtain the same result, which is a good thing. But it also tends to be harder to work with, for several reasons that we’ll cover later in the article.
  • Ingestion time: the timestamp at which the event was ingested into the streaming platform (Kafka), usually carried in the metadata. From a Flink perspective, we can consider it a particular mix of event time and processing time, with the disadvantages of both.
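To make the determinism argument concrete, here is a minimal plain-Java sketch (no Flink involved, and the class name is hypothetical) that assigns readings to 100 ms tumbling windows using only each event’s own timestamp. Because the window depends only on the data, running it any number of times over the same input yields the same counts; a processing-time version would instead depend on when each reading happened to be handled.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Flink API: counts readings per event-time window.
class EventTimeWindows {

    // Start of the tumbling window that contains the given event timestamp (ms).
    // floorMod keeps the result correct for negative timestamps as well.
    static long windowStart(long eventTimestamp, long windowSizeMs) {
        return eventTimestamp - Math.floorMod(eventTimestamp, windowSizeMs);
    }

    // Counts events per window, keyed by window start. Only the event timestamps
    // are used, so the same input always produces the same result.
    static Map<Long, Integer> countPerWindow(List<Long> eventTimestamps, long windowSizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimestamps) {
            counts.merge(windowStart(ts, windowSizeMs), 1, Integer::sum);
        }
        return counts;
    }
}
```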

Apache Flink has excellent support for event time processing, probably the best among the available stream-processing frameworks. For more information, you can read "Notions of Time: Event Time and Processing Time" in the official documentation. If you prefer videos, "Streaming Concepts and Introduction to Flink - Event Time and Watermarks" is a good explanation.


Timestamps and Watermarks

When we speak about timestamps in Flink, we are referring to a particular field in the event. We can extract it and make it available to Flink, so it knows what the actual time is from the pipeline’s perspective. The format Flink expects is Unix time, specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z, so we may need to do some type of conversion. To extract that timestamp from each event, Flink expects an implementation of TimestampAssigner. We’ll see an example later.
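The conversion usually takes a line or two of java.time. The sketch below assumes, hypothetically, that a sensor sends either Unix time in seconds or an ISO-8601 string rather than epoch milliseconds; in a real job, this logic would sit inside the lambda passed to withTimestampAssigner.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical conversions to the format Flink expects: epoch milliseconds (UTC).
class TimestampConversions {

    // For a field holding Unix time in seconds (assumed input format).
    static long fromEpochSeconds(long epochSeconds) {
        return TimeUnit.SECONDS.toMillis(epochSeconds);
    }

    // For a field holding an ISO-8601 instant such as "2021-03-25T10:15:30.100Z"
    // (assumed input format).
    static long fromIsoInstant(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }
}
```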

Once Flink knows what time it is, it’s time to generate a watermark. This is one of the most surprising and ingenious things about working with Flink. A watermark is a special type of event: it flows through your job alongside the records and is processed under the hood by each task. This is a clever way to propagate a change through the entire pipeline, and Flink uses the same idea for other things, such as savepoints.

Generating watermarks is how we tell the system about progress in event time. To do so, we use a WatermarkGenerator. We’ll see an example later.

Together, the TimestampAssigner and the WatermarkGenerator form a WatermarkStrategy, which defines how watermarks are generated in the stream sources.
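The division of labor can be illustrated with a deliberately simplified, hypothetical model in plain Java; it is not Flink’s API. One function plays the TimestampAssigner role, another turns the largest timestamp seen so far into a watermark, and the resulting watermarks are written to the same output as the records, much like the in-band special events described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;
import java.util.function.ToLongFunction;

// Simplified, hypothetical model of a watermark strategy. Flink's real
// WatermarkStrategy, TimestampAssigner and WatermarkGenerator APIs are richer.
class MiniWatermarkStrategy<T> {
    private final ToLongFunction<T> timestampAssigner; // "TimestampAssigner" role
    private final LongUnaryOperator watermarkFromMax;  // "WatermarkGenerator" role

    MiniWatermarkStrategy(ToLongFunction<T> timestampAssigner, LongUnaryOperator watermarkFromMax) {
        this.timestampAssigner = timestampAssigner;
        this.watermarkFromMax = watermarkFromMax;
    }

    // Simulates a source: every record is followed, in the same output,
    // by the watermark computed from the largest timestamp seen so far.
    List<String> run(List<T> records) {
        List<String> out = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (T record : records) {
            long ts = timestampAssigner.applyAsLong(record);
            maxTimestamp = Math.max(maxTimestamp, ts);
            out.add(record + " @" + ts);
            out.add("Watermark: " + watermarkFromMax.applyAsLong(maxTimestamp));
        }
        return out;
    }
}
```

For example, with records such as "sensor0:100" and a generator `max -> max - 100 - 1`, an out-of-order record never moves the watermark backwards, because it is derived from the maximum timestamp.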

Use Case Example

Let’s illustrate this with an example. Our Flink job will receive readings from different sensors. Every sensor sends a measure every 100 ms. We want to detect when a measure from a particular sensor is missing, for example, because the sensor was offline.

Sensors send a JSON document like this one:

```json
{
  "id": "sensor0",
  "timestamp": 0,
  "measure": 0.1
}
```

For each reading that the sensor doesn’t generate, or that is lost in the network, the job emits a regular event whose measure is -1.
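Before involving Flink, the expected behavior can be pinned down with a batch-style, plain-Java sketch over one sensor’s readings sorted by time: wherever the gap to the next reading exceeds the 100 ms period, it emits a reading with measure -1 for each missing slot. SensorReading is a hypothetical stand-in for the article’s SensorData, and the sketch assumes readings are aligned to multiples of 100 ms. The streaming job has to reach the same result without ever seeing the whole input, which is where timers and watermarks come in.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the article's SensorData class.
class SensorReading {
    final String id;
    final long timestamp;
    final double measure;

    SensorReading(String id, long timestamp, double measure) {
        this.id = id;
        this.timestamp = timestamp;
        this.measure = measure;
    }

    @Override
    public String toString() {
        return id + "@" + timestamp + "=" + measure;
    }
}

class GapFiller {
    static final long PERIOD_MS = 100; // sensor period from the use case

    // Expects one sensor's readings sorted by timestamp (assumed aligned to
    // PERIOD_MS) and inserts a -1 reading for every expected slot with no measure.
    static List<SensorReading> fillGaps(List<SensorReading> sorted) {
        List<SensorReading> out = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            SensorReading current = sorted.get(i);
            out.add(current);
            if (i + 1 < sorted.size()) {
                long next = sorted.get(i + 1).timestamp;
                for (long t = current.timestamp + PERIOD_MS; t < next; t += PERIOD_MS) {
                    out.add(new SensorReading(current.id, t, -1));
                }
            }
        }
        return out;
    }
}
```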

Initial Implementation With Periodic Watermark Generators

We have to choose a WatermarkStrategy. There are several options; let’s start with the periodic WatermarkGenerators:

  • WatermarkStrategy.forBoundedOutOfOrderness: a periodic generator that lets us handle out-of-order records as long as they arrive within a defined range.
  • WatermarkStrategy.forMonotonousTimestamps: the same as forBoundedOutOfOrderness, but with an out-of-order tolerance of zero.
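The arithmetic behind these two generators is small. The following hypothetical plain-Java model mirrors, as far as I understand it, what Flink’s BoundedOutOfOrdernessWatermarks does: onEvent only records the largest timestamp seen, and each periodic emit produces maxTimestamp - outOfOrderness - 1. A monotonous generator is the same model with an out-of-orderness of zero. The framework’s periodic call is simulated here by calling onPeriodicEmit() explicitly.

```java
// Hypothetical model of a periodic, bounded-out-of-orderness watermark generator.
class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMs;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        // Chosen so that the first watermark is exactly Long.MIN_VALUE (no overflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    // Called for every event: only remembers the largest timestamp so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called periodically (in Flink, every auto-watermark interval): the watermark
    // states that no more events with a timestamp <= this value are expected.
    long onPeriodicEmit() {
        return maxTimestamp - outOfOrdernessMs - 1;
    }
}
```

With a 100 ms bound, an event stamped 250 that arrives after one stamped 300 is still ahead of the watermark (199), so it isn’t considered late.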

In both cases, the framework periodically invokes the generator, which emits the watermark. setAutoWatermarkInterval lets us define that period:

```java
env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(100).toMillis());
```

The problem with this approach is that we are mixing processing time and event time, so the result won’t be deterministic, or even correct, depending on the circumstances.

For example, in BoundedOutOfOrdernessStrategyJob, we start by setting the watermark interval to 100 ms:

```java
env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(100).toMillis());
```

Then we create the DataStream with the watermarks:

```java
DataStream<SensorData> sensorStream =
    env.addSource(source)
        .returns(TypeInformation.of(SensorData.class));

var sensorEventTimeStream =
    sensorStream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<SensorData>forBoundedOutOfOrderness(
            Duration.ofMillis(100)
        ).withTimestampAssigner(
            (event, timestamp) -> event.getTimestamp()
        )
    );
```

To detect missing events, we use a timer, so we need a keyed stream and a KeyedProcessFunction:

```java
sensorEventTimeStream
    .keyBy((event) -> event.getId())
    .process(new TimeoutFunction())
    .addSink(sink);
```

The TimeoutFunction stores each event in state and creates a timer for it. If the next event arrives on time, it cancels the timer. If not, onTimer is invoked and emits an event, built from the one stored in state, that identifies the missing sensor.
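The timer mechanics can be simulated without Flink. The class below is a hypothetical model of the behavior described for TimeoutFunction, not the author’s code: per key, it keeps one pending event-time timer at the last event’s timestamp plus 100 ms (an assumed timeout, matching the 100 ms sensor period and the 500 ms timers in the output that follows), replaces it when the next event arrives, and fires timers only when a watermark reaches them. Flink likewise fires an event-time timer once the current watermark is at or past the timer’s timestamp.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a keyed timeout function driven by event-time timers.
// Not Flink code: the "timer service" is just a map from key to timer time.
class TimeoutSimulation {
    static final long TIMEOUT_MS = 100; // assumed: one sensor period

    // One pending timer per key, registered for the last event seen for that key.
    private final Map<String, Long> pendingTimers = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Models processElement: forward the event and replace the key's timer.
    void onElement(String key, long timestamp) {
        output.add(key + "@" + timestamp);
        pendingTimers.put(key, timestamp + TIMEOUT_MS);
    }

    // Models a watermark reaching the operator: every timer <= watermark fires.
    void onWatermark(long watermark) {
        List<Map.Entry<String, Long>> due = new ArrayList<>();
        for (Map.Entry<String, Long> timer : pendingTimers.entrySet()) {
            if (timer.getValue() <= watermark) {
                due.add(new AbstractMap.SimpleImmutableEntry<>(timer.getKey(), timer.getValue()));
            }
        }
        // Fire in timestamp order, ties broken by key, so the output is deterministic.
        due.sort(Map.Entry.<String, Long>comparingByValue()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        for (Map.Entry<String, Long> timer : due) {
            pendingTimers.remove(timer.getKey());
            output.add(timer.getKey() + "@" + timer.getValue() + " MISSING"); // measure = -1
        }
    }

    List<String> output() {
        return output;
    }
}
```

Feeding it the two sensors’ readings followed by only a final Long.MAX_VALUE watermark reports nothing but the two timers at 500, which is what the first test run below shows. Feeding it a watermark of maxTimestamp - 1 after every event reports sensor1’s gap at 200 as soon as sensor0’s reading at 300 pushes event time past 200.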

Testing and Debugging the First Implementation

Let’s create a simple test: two sensors, one of which misses one of its measures. When we launch the test testBoundedOutOfOrdernessStrategyJob, we obtain the following result:

Timer: 500 -> sensor0
Timer: 500 -> sensor1
SensorData{id='sensor0', timestamp=0, measure=0.1}
SensorData{id='sensor1', timestamp=0, measure=0.2}
SensorData{id='sensor0', timestamp=100, measure=0.3}
SensorData{id='sensor1', timestamp=100, measure=0.4}
SensorData{id='sensor0', timestamp=200, measure=0.5}
SensorData{id='sensor0', timestamp=300, measure=0.7}
SensorData{id='sensor1', timestamp=300, measure=0.8}
SensorData{id='sensor0', timestamp=400, measure=0.9}
SensorData{id='sensor1', timestamp=400, measure=1.0}
SensorData{id='sensor0', timestamp=500, measure=-1.0}
SensorData{id='sensor1', timestamp=500, measure=-1.0}

The job doesn’t detect the missing event; it only detects the end of the stream. Why? It’s time to do some debugging. Debugging watermark issues isn’t easy. There are three options:

  • Check the current watermark metrics. This is ideal for real jobs, but more complicated with tests because they finish almost immediately.
  • Check the current watermark in the Flink UI. As with the previous option, this doesn’t work with tests that finish too quickly.
  • Introduce a custom operator that has access to the current watermark. I chose this option, which also lets us play with some more advanced operators.

(Figure: Flink UI)

StreamWatermarkDebugFilter is Flink’s internal StreamFilter class with some minor modifications:

  • We don’t want to filter out any events. This could be improved by removing the filtering logic altogether, but because the class is only for debugging, I didn’t worry too much about it.
  • In the processWatermark method, it emits the watermark so the next operator can consume it, and prints it for debugging purposes.
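The idea can be modeled without Flink’s internal operator classes. In this plain-Java sketch, every name is hypothetical (the article’s real operator adapts Flink’s StreamFilter): records and watermarks travel through the same channel, and the debug stage forwards both unchanged while logging each watermark it sees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stream element: either a record or a watermark.
class DebugElement {
    final String record;  // null for watermarks
    final long watermark; // only meaningful when record == null

    private DebugElement(String record, long watermark) {
        this.record = record;
        this.watermark = watermark;
    }

    static DebugElement ofRecord(String value) { return new DebugElement(value, 0); }
    static DebugElement ofWatermark(long timestamp) { return new DebugElement(null, timestamp); }
    boolean isWatermark() { return record == null; }
}

// Pass-through stage in the spirit of StreamWatermarkDebugFilter:
// never drops a record, forwards every watermark, and logs the watermarks it sees.
class WatermarkDebugStage implements Consumer<DebugElement> {
    private final Consumer<DebugElement> next;
    final List<String> log = new ArrayList<>();

    WatermarkDebugStage(Consumer<DebugElement> next) {
        this.next = next;
    }

    @Override
    public void accept(DebugElement element) {
        if (element.isWatermark()) {
            log.add("Watermark: " + element.watermark);
        }
        next.accept(element); // forward records and watermarks unchanged
    }
}
```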

We apply the new operator to the job:

```java
sensorEventTimeStream
    .transform("debugFilter", sensorEventTimeStream.getType(), new StreamWatermarkDebugFilter<>())
    .keyBy((event) -> event.getId())
    .process(new TimeoutFunction())
    .addSink(sink);
```

Running the test again, we can see the generated watermarks:

Watermark: 9223372036854775807

Only one watermark is generated: Long.MAX_VALUE. This watermark seems to be emitted at the end of the job, since it’s the largest possible watermark. This is consistent with the last two timers we saw: they fire with timestamp 500, but there is no watermark with that value; it’s just the end of the job.

So the watermark generator isn’t generating any watermark. The only explanation I see is that the job ends before the first periodic watermark is emitted. We could set a smaller interval, but the problem remains: we are mixing processing and event time, so it’s really hard to know how the pipeline is going to behave under certain conditions.
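The nondeterminism can be reproduced with a toy model in plain Java. It is a hypothetical simulation, not Flink’s scheduler: each event is assumed to cost a fixed amount of processing time, a periodic emit produces maxTimestamp - outOfOrderness - 1 whenever the interval has elapsed, and a final Long.MAX_VALUE watermark marks the end of the bounded input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of periodic watermark emission (not Flink's scheduler).
class PeriodicEmissionModel {

    // eventTimestamps: event times in arrival order.
    // processingCostMs: assumed processing (wall-clock) time spent per event.
    // autoWatermarkMs: processing-time interval between periodic emits.
    // outOfOrdernessMs: bound of the bounded-out-of-orderness generator.
    static List<Long> run(long[] eventTimestamps, long processingCostMs,
                          long autoWatermarkMs, long outOfOrdernessMs) {
        List<Long> watermarks = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
        long clock = 0;                  // simulated processing time
        long nextEmit = autoWatermarkMs; // when the periodic emit is next due
        for (long ts : eventTimestamps) {
            clock += processingCostMs;
            maxTimestamp = Math.max(maxTimestamp, ts);
            if (clock >= nextEmit) {     // an emit is due; model it right after this event
                watermarks.add(maxTimestamp - outOfOrdernessMs - 1);
                while (nextEmit <= clock) {
                    nextEmit += autoWatermarkMs;
                }
            }
        }
        watermarks.add(Long.MAX_VALUE);  // end of the bounded input
        return watermarks;
    }
}
```

With zero out-of-orderness, a 100 ms interval, and events stamped 0, 100, 200, 300, 400, a cost of 1 ms per event yields only the final Long.MAX_VALUE watermark, as in the test, while a cost of 100 ms per event yields -1, 99, 199, 299, 399 before it. Same data, different watermarks.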

Final Implementation With a Punctuated WatermarkGenerator

We are going to create a generator that emits watermarks based on the elements of the stream: a punctuated WatermarkGenerator. So we create a new job, CustomStrategyJob:

```java
var sensorEventTimeStream =
    sensorStream
        .assignTimestampsAndWatermarks(
            new WatermarkStrategy<SensorData>() {
                @Override
                public WatermarkGenerator<SensorData> createWatermarkGenerator(
                    WatermarkGeneratorSupplier.Context context) {
                    return new BoundedOutOfOrdernessWatermarks<>(
                        Duration.ofMillis(0)
                    ) {
                        @Override
                        public void onEvent(
                            SensorData event,
                            long eventTimestamp,
                            WatermarkOutput output) {
                            super.onEvent(event, eventTimestamp, output);
                            super.onPeriodicEmit(output);
                        }
                    };
                }
            }
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
        );
```

It’s similar to BoundedOutOfOrdernessWatermarks, but we override onEvent to also invoke onPeriodicEmit, which emits the watermark. So, instead of emitting only when the framework invokes it periodically, the generator now emits a new watermark every time it receives an event.
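Compared with a periodic generator, the only change is where the emit happens. This hypothetical plain-Java sketch applies the same bounded-out-of-orderness arithmetic from onEvent, so the watermarks depend only on the data, never on wall-clock time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the punctuated generator: bounded-out-of-orderness
// arithmetic, but a watermark is emitted for every event (from onEvent)
// instead of on a processing-time schedule.
class PunctuatedModel {
    static List<Long> watermarksFor(long[] eventTimestamps, long outOfOrdernessMs) {
        List<Long> watermarks = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
        for (long ts : eventTimestamps) {
            maxTimestamp = Math.max(maxTimestamp, ts);           // what onEvent does
            watermarks.add(maxTimestamp - outOfOrdernessMs - 1); // what onPeriodicEmit does
        }
        return watermarks;
    }
}
```

With zero out-of-orderness and timestamps 0, 0, 100, 100, 200, 300, 300, 400, 400, it yields -1, -1, 99, 99, 199, 299, 299, 399, 399 on every run, which lines up with the first nine watermarks printed below.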

(Figure: Flink pipeline with the punctuated generator)

The test now produces the following output:

Watermark: -1
Watermark: -1
Watermark: 99
Watermark: 99
Watermark: 199
Watermark: 299
Watermark: 299
Watermark: 399
Watermark: 399
Watermark: 9223372036854775807
Watermark: 9223372036854775807

This looks a lot better, but there is a problem: we don’t get a watermark generated for sensor0 at 199. The reason is that our stream is keyed, so it’s being processed by two different tasks. Watermarks are generated per task, so they don’t advance at the same time. The easiest way to solve this is to set the parallelism to 1. Unfortunately, this approach isn’t very efficient.
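The per-task effect follows from how Flink combines watermarks: an operator that receives input from several upstream parallel tasks advances its event time only to the minimum of the watermarks received on its inputs. The plain-Java model below uses hypothetical names and is not Flink’s implementation; it shows why a lagging channel holds back the timers of every key handled by that operator.

```java
import java.util.Arrays;

// Hypothetical model of an operator with several upstream channels
// (for example, one per parallel source task).
class MultiInputWatermark {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    MultiInputWatermark(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the operator's event time:
    // the minimum over all channels, which never moves backwards.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        current = Math.max(current, min);
        return current;
    }
}
```

For example, if one channel has reached 199 while the other has not emitted anything yet, the operator’s event time stays at Long.MIN_VALUE, so no timer can fire there.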

Relaunching the test, we obtain the expected result:

SensorData{id='sensor0', timestamp=0, measure=0.1}
SensorData{id='sensor1', timestamp=0, measure=0.2}
SensorData{id='sensor0', timestamp=100, measure=0.3}
SensorData{id='sensor1', timestamp=100, measure=0.4}
SensorData{id='sensor0', timestamp=200, measure=0.5}
SensorData{id='sensor0', timestamp=300, measure=0.7}
SensorData{id='sensor1', timestamp=200, measure=-1.0}
SensorData{id='sensor1', timestamp=300, measure=0.8}
SensorData{id='sensor0', timestamp=400, measure=0.9}
SensorData{id='sensor1', timestamp=400, measure=1.0}
SensorData{id='sensor0', timestamp=500, measure=-1.0}
SensorData{id='sensor1', timestamp=500, measure=-1.0}

Summary and Next Steps

Apache Flink is a great framework, and it supports event time nicely. The concept of watermarks as events in the pipeline is superb and has many advantages over other frameworks. But it’s also quite complex to understand because:

  1. The official documentation is scarce.
  2. The APIs have changed a lot between versions, so it’s hard to find up-to-date examples, even on GitHub.
  3. Debugging event time pipelines is hard.

I wrote this article to help with these points. But I still have doubts about some aspects of event time, so take my conclusions with skepticism and draw your own. If yours are different or you would like to share your thoughts, I would appreciate hearing about them.

Resources and Further Reading

There are some resources that helped me a lot and they may help you too:

  • Book Stream Processing with Apache Flink: Fundamentals, Implementation, and Operation of Streaming Applications: Chapter 6 provides the best explanation of watermarks I’ve found, with some nice diagrams. It’s a bit outdated now, but the general concepts still apply.
  • The Dataflow Model paper.
  • Flink mailing list: there are some very interesting discussions about this particular topic, and people tend to help. I recommend two threads that I found very illustrative:
    • Timers not firing until stream end.
    • assignTimestampsAndWatermarks not work after Keyed.

The source code is available in this GitHub repository.


Published at DZone with permission of Anton Rodriguez. See the original article here.
