Streaming ETL With Apache Flink — Part 4

By Preetdeep Kumar · Dec. 09, 19 · Tutorial


Previous posts - Part 1  |  Part 2  |  Part 3.

Introduction

In this post, I am going to solve the event correlation problem from part three of this series using Flink's CEP API. After going through their awesome CEP documentation, here is what I did.

Note: All the code from this example is available on GitHub.

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

// The source emits SensorEvent objects; the stream is keyed (partitioned) by deviceId
DataStream<SensorEvent> inputStream = senv
        .addSource(new EventSourceCEP())
        .keyBy((KeySelector<SensorEvent, String>) SensorEvent::getDeviceId);


In the code block above, I created an EventSourceCEP class, copied from EventSource, that emits SensorEvent objects rather than tuples. The input stream is then partitioned using deviceId as its key.
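
For reference, here is a minimal sketch of what the SensorEvent POJO and the EventSourceCEP source might look like. The field names, the CONNECTED/DISCONNECTED status values, and the random emission logic are assumptions for illustration only; the actual classes are in the GitHub repository linked above.

import java.util.Random;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Sketch of the event POJO; the real class is in the GitHub repository.
public class SensorEvent {
    public String deviceId;
    public String status; // assumed values: "CONNECTED" or "DISCONNECTED"

    public SensorEvent() { } // Flink POJOs need a public no-arg constructor

    public SensorEvent(String deviceId, String status) {
        this.deviceId = deviceId;
        this.status = status;
    }

    public String getDeviceId()     { return deviceId; }
    public boolean isConnected()    { return "CONNECTED".equals(status); }
    public boolean isDisconnected() { return "DISCONNECTED".equals(status); }
}

// Sketch of the source: emits SensorEvent objects directly, not tuples.
class EventSourceCEP implements SourceFunction<SensorEvent> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<SensorEvent> ctx) throws Exception {
        Random rnd = new Random();
        while (running) {
            String deviceId = "device-" + rnd.nextInt(5);
            String status = rnd.nextBoolean() ? "CONNECTED" : "DISCONNECTED";
            ctx.collect(new SensorEvent(deviceId, status));
            Thread.sleep(1000); // emit roughly one event per second
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}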

SkipPastLastStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();


Here, I select a skip strategy called SKIP_PAST_LAST_EVENT, which, as per Flink, "Discards every partial match that started after the match started but before it ended." This allows me to get only the matched sequences and not any partial matches.
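
For context, these are the other after-match skip strategies Flink's CEP library offers. I am not using them here, so the snippet below is purely illustrative.

// Other after-match skip strategies (illustrative only; this post uses skipPastLastEvent()):
AfterMatchSkipStrategy noSkip      = AfterMatchSkipStrategy.noSkip();             // emit every possible match
AfterMatchSkipStrategy skipToNext  = AfterMatchSkipStrategy.skipToNext();         // discard other partial matches starting with the same event
AfterMatchSkipStrategy skipToFirst = AfterMatchSkipStrategy.skipToFirst("start"); // resume at the first event mapped to the named pattern
AfterMatchSkipStrategy skipToLast  = AfterMatchSkipStrategy.skipToLast("start");  // resume at the last event mapped to the named pattern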

Pattern<SensorEvent, ?> pattern = Pattern
    // first step: a device reports it is connected
    .<SensorEvent>begin("start", skipStrategy)
    .where(new SimpleCondition<SensorEvent>() {
        @Override
        public boolean filter(SensorEvent event) {
            return event.isConnected();
        }
    })
    // second step: the same device reports it is disconnected, strictly next
    .next("end")
    .where(new SimpleCondition<SensorEvent>() {
        @Override
        public boolean filter(SensorEvent event) {
            return event.isDisconnected();
        }
    })
    .within(Time.seconds(5));


Here, I define a pattern. This is the core of CEP: a pattern sequence starts with begin() and a unique name, followed by a where() that provides the condition an event has to satisfy in order to be considered a match. I have defined two such patterns: one for a connected event and one for a disconnected event.

I am using next() to tell Flink that I expect all matching events to appear strictly one after the other, without any non-matching events in between. The other contiguity options are followedBy(), for relaxed contiguity, and followedByAny(), for non-deterministic relaxed contiguity.
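
As an illustration of the difference, here is a sketch of the same two-step pattern with relaxed contiguity; the rest of this post keeps the strict next() version.

// Same pattern, but with relaxed contiguity: unrelated events may occur between
// the connected and the disconnected event and the sequence still matches.
Pattern<SensorEvent, ?> relaxedPattern = Pattern
    .<SensorEvent>begin("start", skipStrategy)
    .where(new SimpleCondition<SensorEvent>() {
        @Override
        public boolean filter(SensorEvent event) {
            return event.isConnected();
        }
    })
    .followedBy("end") // followedByAny("end") would give non-deterministic relaxed contiguity
    .where(new SimpleCondition<SensorEvent>() {
        @Override
        public boolean filter(SensorEvent event) {
            return event.isDisconnected();
        }
    })
    .within(Time.seconds(5));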

With within(), I am also telling Flink that a matching sequence must complete within five seconds to be considered valid. (My duration is 10 seconds and my threshold is two, so I chose a five-second window.)

PatternStream<SensorEvent> patternStream = CEP.pattern(inputStream, pattern);

patternStream.process(new PatternProcessFunction<SensorEvent, String>() {
    // THRESHOLD is defined elsewhere (two in this example)
    int count = 0;

    @Override
    public void processMatch(Map<String, List<SensorEvent>> match,
                             Context ctx,
                             Collector<String> out) throws Exception {
        count++;

        // emit an alert only once the number of matches crosses the threshold
        if (count > THRESHOLD) {
            String message = "Pattern found for " + match.get("start").get(0).getDeviceId();
            out.collect(message);
            count = 0;
        }
    }
}).print();


Once I have the pattern, I pass it to CEP.pattern() to get an instance of PatternStream. The process() method is called for every successful match, and I use a counter to ensure that out.collect() is invoked only once the number of matches has crossed the threshold.

I didn't find a way to tell Flink that I need the entire pattern sequence matched n times. You can provide this information to an individual pattern using times(), but not to the whole sequence. I hope this handy feature is added in an upcoming version.
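
For completeness, this is how times() applies to an individual pattern; it quantifies that single step only, not the whole sequence, which is exactly the limitation described above.

// times() quantifies one step of the pattern, not the entire sequence.
// Here, the "start" step would have to match three connected events.
Pattern<SensorEvent, ?> repeatedStart = Pattern
    .<SensorEvent>begin("start")
    .where(new SimpleCondition<SensorEvent>() {
        @Override
        public boolean filter(SensorEvent event) {
            return event.isConnected();
        }
    })
    .times(3);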

Finally, the result is printed to the console. In the real world, it would be pushed to a database, from which an alert management system would display it to the user.
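
As a sketch of that idea, print() can be replaced with a sink. The anonymous SinkFunction below is a hypothetical stand-in for a real JDBC or other database sink, and the simplified PatternProcessFunction omits the threshold counter shown earlier; the job also needs a final senv.execute() call to actually run.

// Hypothetical sink instead of print(): push each alert to a database table
// that an alert management system reads and displays to the user.
patternStream
    .process(new PatternProcessFunction<SensorEvent, String>() {
        @Override
        public void processMatch(Map<String, List<SensorEvent>> match,
                                 Context ctx,
                                 Collector<String> out) {
            out.collect("Pattern found for " + match.get("start").get(0).getDeviceId());
        }
    })
    .addSink(new SinkFunction<String>() {
        @Override
        public void invoke(String alert, Context context) {
            // e.g. INSERT the alert into an alerts table via JDBC (sketch only)
        }
    });

senv.execute("Streaming ETL with Flink CEP");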


Further Reading

  • Top 5 Enterprise ETL Tools.
  • Things to Understand Before Implementing ETL Tools.
  • The State of ETL: Traditional to Cloud.
