Streaming ETL With Apache Flink — Part 4

In this article, we finish our series on streaming ETL with Apache Flink and describe how to solve a common pitfall with Flink's CEP API.

Previous posts: Part 1 | Part 2 | Part 3.

Introduction

In this post, I am going to solve the event correlation problem from part three of this series using Flink's CEP API. After going through Flink's excellent CEP documentation, here is what I did.

Note: All the code from this example is available on GitHub.

// Source of SensorEvent objects, partitioned by device ID so that each device is matched independently.
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorEvent> inputStream = senv
        .addSource(new EventSourceCEP())
        .keyBy((KeySelector<SensorEvent, String>) SensorEvent::getDeviceId);


In the code block above, I created an EventSourceCEP class as a copy of EventSource, except that it emits plain SensorEvent objects instead of tuples. The input stream is then partitioned using deviceId as its key.
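For readers who have not seen the earlier parts, the event type could look roughly like the sketch below. Only getDeviceId(), isConnected(), isDisconnected(), and the deviceId field appear in this article's code. The class name SensorEventSketch, the connected flag, the timestamp field, and the constructor are assumptions made for illustration, and the real class in the GitHub repository may differ.

```java
import java.util.Objects;

/** Hypothetical stand-in for the article's SensorEvent; the field layout is assumed. */
class SensorEventSketch {
    // Public field mirrors the direct deviceId access used later in the article.
    public final String deviceId;
    private final boolean connected;     // assumed: true = connected, false = disconnected
    private final long timestampMillis;  // assumed field, not shown in the article

    SensorEventSketch(String deviceId, boolean connected, long timestampMillis) {
        this.deviceId = Objects.requireNonNull(deviceId, "deviceId");
        this.connected = connected;
        this.timestampMillis = timestampMillis;
    }

    public String getDeviceId() { return deviceId; }
    public boolean isConnected() { return connected; }
    public boolean isDisconnected() { return !connected; }
    public long getTimestampMillis() { return timestampMillis; }
}
```

Keeping the event as a small immutable object with a getter for the key is what makes the SensorEvent::getDeviceId method reference usable as a key selector.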

SkipPastLastStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();


Here, I select a skip strategy called SKIP_PAST_LAST_EVENT, which, according to the Flink documentation, "Discards every partial match that started after the match started but before it ended." This way, once a match completes, I get only that matched sequence and not any partial matches that began inside it.
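To make the quoted rule concrete, here is a plain-Java sketch that does not use Flink at all. It treats candidate matches as [start, end] positions in the stream and keeps a match only if it does not start inside a previously kept match. The class and method names are hypothetical, and this is a simplification for illustration, not how Flink implements the strategy internally.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java illustration only; Flink applies the strategy inside its own matching engine. */
class SkipPastLastEventSketch {

    /** A candidate match covering stream positions start..end (inclusive). */
    static class Span {
        final int start;
        final int end;
        Span(int start, int end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + "," + end + "]"; }
    }

    /** Keeps the earliest match, then drops candidates that start at or before the end of a kept match. */
    static List<Span> skipPastLastEvent(List<Span> candidates) {
        List<Span> sorted = new ArrayList<>(candidates);
        sorted.sort(Comparator.comparingInt((Span s) -> s.start));
        List<Span> kept = new ArrayList<>();
        int lastEnd = -1; // position of the last event of the most recently kept match
        for (Span s : sorted) {
            if (s.start > lastEnd) {
                kept.add(s);
                lastEnd = s.end;
            }
        }
        return kept;
    }
}
```

For example, a looping pattern over four events at positions 0 to 3 could produce the candidates [0,3], [1,3], and [2,3]. The sketch keeps only [0,3], because the other two start inside it.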

Pattern<SensorEvent, ?> pattern =
    Pattern
    .<SensorEvent>begin("start", skipStrategy)
    .where(new SimpleCondition<SensorEvent>()
    {
        @Override
        public boolean filter(SensorEvent event)
        {
            return event.isConnected();
        }
    })
    .next("end")
    .where(new SimpleCondition<SensorEvent>()
    {
        @Override
        public boolean filter(SensorEvent event)
        {
            return event.isDisconnected();
        }
    })
    .within(Time.seconds(5));


Here, I define a pattern. This is the important part of CEP: a pattern starts with begin() and a unique name, followed by a where() that provides the condition an event must satisfy in order to be considered a match. I have defined two such patterns, one for a connected event and a second one for a disconnected event.

I am using next() to tell Flink that I expect all matching events to appear strictly one after the other, without any non-matching events in between. The other options are followedBy(), for relaxed contiguity, and followedByAny(), for non-deterministic relaxed contiguity.
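The difference between strict and relaxed contiguity can be illustrated without Flink. The sketch below scans a list of event labels for a CONNECTED event followed by a DISCONNECTED one, first the way next() would (the very next event must match) and then the way followedBy() would (non-matching events in between are skipped). The labels, class name, and method names are assumptions for illustration, and the logic is a simplification of what Flink's CEP engine does.

```java
import java.util.List;

/** Plain-Java illustration of contiguity; not Flink code. */
class ContiguitySketch {

    /** Strict contiguity, like next(): DISCONNECTED must directly follow CONNECTED. */
    static int countStrict(List<String> events) {
        int matches = 0;
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals("CONNECTED") && events.get(i + 1).equals("DISCONNECTED")) {
                matches++;
                i++; // continue after the matched pair so matches do not overlap
            }
        }
        return matches;
    }

    /** Relaxed contiguity, like followedBy(): other events may sit in between. */
    static int countRelaxed(List<String> events) {
        int matches = 0;
        boolean started = false; // true once a CONNECTED event is waiting for its partner
        for (String e : events) {
            if (!started && e.equals("CONNECTED")) {
                started = true;
            } else if (started && e.equals("DISCONNECTED")) {
                matches++;
                started = false;
            }
        }
        return matches;
    }
}
```

For the sequence CONNECTED, HEARTBEAT, DISCONNECTED, CONNECTED, DISCONNECTED (HEARTBEAT being a made-up non-matching event), the strict version finds one match and the relaxed version finds two, because only the relaxed version tolerates the HEARTBEAT in between.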

I am also telling Flink, through within(), that the whole pattern must be completed within five seconds in order to be considered valid. (My duration is 10 seconds and the threshold is two, so I have chosen a five-second window.)
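As a rough mental model of the time bound, the plain-Java sketch below checks whether the last event of a candidate match arrived soon enough after the first one. The class and method names are hypothetical, and treating the bound as exclusive is an assumption of this sketch rather than something stated in this article.

```java
import java.time.Duration;

/** Plain-Java illustration of a within()-style time bound; not Flink code. */
class WithinSketch {

    /**
     * Returns true when the last event of a match arrives less than `window`
     * after the first one. The exclusive upper bound is an assumption here.
     */
    static boolean completesWithin(long firstEventMillis, long lastEventMillis, Duration window) {
        long elapsed = lastEventMillis - firstEventMillis;
        return elapsed >= 0 && elapsed < window.toMillis();
    }
}
```

With a five-second window, a disconnect three seconds after the connect would count, while one arriving seven seconds later would not.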

PatternStream<SensorEvent> patternStream = CEP.pattern(inputStream, pattern);

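patternStream.process(new PatternProcessFunction<SensorEvent, String>()
{
    // Plain instance field: not Flink-managed keyed state, so it is shared by
    // every key handled by this operator instance and is not checkpointed.
    int count = 0;

    @Override
    public void processMatch(Map<String, List<SensorEvent>> match,
                             Context ctx,
                             Collector<String> out) throws Exception
    {
        count++;

        // Emit only after the number of matches has exceeded the threshold.
        if (count > THRESHOLD)
        {
            String message = "Pattern found for " + match.get("start").get(0).deviceId;
            out.collect(message);
            count = 0;
        }
    }
}).print();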


Once I have the pattern, I pass it to CEP.pattern() together with the input stream to get an instance of PatternStream. The processMatch() method of the function given to process() is called for every successful match, and I use a counter there to ensure that Collector::collect() is invoked only when pattern matches have crossed the threshold.
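One thing to keep in mind is that a plain field such as count is shared by all devices handled by the same operator instance. If the threshold is meant per device, the counting could be kept per key. The plain-Java sketch below shows the idea with a HashMap. The class name, method name, and constructor parameter are assumptions for illustration, and wiring this into a Flink job with fault-tolerant state is outside the scope of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java illustration of per-device match counting; not Flink code. */
class PerDeviceThresholdSketch {
    private final int threshold;
    private final Map<String, Integer> countsByDevice = new HashMap<>();

    PerDeviceThresholdSketch(int threshold) {
        this.threshold = threshold;
    }

    /**
     * Records one match for the device. Returns an alert message once the
     * device's count exceeds the threshold (and resets that device's count),
     * otherwise an empty Optional.
     */
    Optional<String> onMatch(String deviceId) {
        int count = countsByDevice.merge(deviceId, 1, Integer::sum);
        if (count > threshold) {
            countsByDevice.remove(deviceId);
            return Optional.of("Pattern found for " + deviceId);
        }
        return Optional.empty();
    }
}
```

With a threshold of two, the third match for a given device produces the message, and matches for other devices in between do not affect that device's count.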

I didn't find a way to tell Flink that I need an entire pattern sequence matched n times. You can provide this information to individual patterns using times(), but I could not see how to apply it to the entire sequence. I hope an upcoming version addresses this handy feature.

Finally, I print the result to the console. In the real world, this would be pushed to a database, from where an alert management system would display it to the user.

