Streaming ETL With Apache Flink — Part 4
Previous posts: Part 1 | Part 2 | Part 3.
Introduction
In this post, I am going to solve the event correlation problem from Part 3 of this series using Flink's CEP API. After going through Flink's excellent CEP documentation, here is what I did.
Note: All the code from this example is available on GitHub.
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorEvent> inputStream = senv
.addSource( new EventSourceCEP() )
.keyBy( (KeySelector<SensorEvent, String>) SensorEvent::getDeviceId);
In the above code block, I created an EventSourceCEP class, copied from EventSource, except that it emits a plain event object rather than a tuple. We've partitioned this input stream using deviceId as its key.
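The SensorEvent class itself isn't shown in this post; here is a minimal sketch of what such a POJO might look like, inferred from the accessors used in the snippets. The field names, and the assumption that isDisconnected() is simply the negation of isConnected(), are mine, not from the original code.

```java
// Hypothetical sketch of the SensorEvent POJO used in this post.
// Field names are assumptions inferred from getDeviceId(),
// isConnected(), and isDisconnected().
class SensorEvent {
    // Public, because the processMatch() snippet later in the post
    // accesses .deviceId directly.
    public String deviceId;
    private boolean connected;

    // Flink treats a class as a POJO only if it has a no-arg constructor.
    public SensorEvent() {}

    public SensorEvent(String deviceId, boolean connected) {
        this.deviceId = deviceId;
        this.connected = connected;
    }

    public String getDeviceId() { return deviceId; }
    public boolean isConnected() { return connected; }
    public boolean isDisconnected() { return !connected; }
}
```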
SkipPastLastStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
Here, I select a skip strategy called SKIP_PAST_LAST_EVENT, which, as per the Flink documentation, "discards every partial match that started after the match started but before it ended." This lets me keep only fully matched sequences and discard any partial matches.
Pattern<SensorEvent, ?> pattern =
Pattern
.<SensorEvent>begin("start", skipStrategy)
.where( new SimpleCondition<SensorEvent>()
{
@Override
public boolean filter(SensorEvent event)
{
return event.isConnected();
}
})
.next("end")
.where( new SimpleCondition<SensorEvent>()
{
@Override
public boolean filter(SensorEvent event)
{
return event.isDisconnected();
}
})
.within( Time.seconds( 5 ) );
Here, I define a pattern. This is the important part of CEP: we start defining a pattern using begin() and a unique name, followed by where(), which provides a condition an event has to satisfy in order to be considered a match. I have defined two such conditions, one for a connected event and a second for a disconnected event.
I am using next() to tell Flink that I expect all matching events to appear strictly one after the other, without any non-matching events in between. The alternatives are followedBy(), for relaxed contiguity, and followedByAny(), for non-deterministic relaxed contiguity.
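For comparison, here is a sketch of the same two-step sequence with relaxed contiguity. Only the contiguity call changes; everything else mirrors the pattern above.

```java
// Same sequence as above, but followedBy() allows non-matching
// events to occur between the "start" and "end" steps.
Pattern<SensorEvent, ?> relaxed =
    Pattern
        .<SensorEvent>begin("start", skipStrategy)
        .where(new SimpleCondition<SensorEvent>() {
            @Override
            public boolean filter(SensorEvent event) {
                return event.isConnected();
            }
        })
        .followedBy("end")   // relaxed contiguity instead of next()
        .where(new SimpleCondition<SensorEvent>() {
            @Override
            public boolean filter(SensorEvent event) {
                return event.isDisconnected();
            }
        })
        .within(Time.seconds(5));
```

This fragment assumes the same imports and surrounding code as the original pattern and is not runnable on its own.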
I am also telling Flink, through within(), that a matching pattern must complete within five seconds in order to be considered valid. (My duration is 10 seconds and my threshold is two, so I have settled on a five-second window.)
PatternStream<SensorEvent> patternStream = CEP.pattern(inputStream, pattern);
patternStream.process(new PatternProcessFunction<SensorEvent, String>()
{
int count = 0;
@Override
public void processMatch(Map<String, List<SensorEvent>> match,
Context ctx,
Collector<String> out) throws Exception
{
count++;
if(count > THRESHOLD)
{
String message = "Pattern found for " + match.get( "start" ).get( 0 ).deviceId;
out.collect(message);
count = 0;
}
}
}).print();
Once I have the pattern, I need to pass it to CEP.pattern() to get an instance of PatternStream. The processMatch() method is called for every successful match, where I am using a counter to ensure that out.collect() is invoked only once the number of matches has crossed the threshold (THRESHOLD is a constant defined elsewhere in the class). Note that count is a plain instance field rather than Flink keyed state, so it is shared across all devices handled by the same parallel instance.
I didn't find a way to tell Flink that I need the entire pattern sequence matched n times. You can provide this information to an individual pattern step using times(), but not to the whole sequence. I hope this handy feature is addressed in an upcoming version.
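For reference, here is a sketch of what quantifying a single step looks like, in the same SimpleCondition style as the patterns above. times(3) makes the "start" step match exactly three connected events, but there is no equivalent call for the sequence as a whole.

```java
// times(n) applies to one pattern step only, not the whole sequence:
// here the "start" step must match exactly three connected events.
Pattern<SensorEvent, ?> repeated =
    Pattern
        .<SensorEvent>begin("start")
        .where(new SimpleCondition<SensorEvent>() {
            @Override
            public boolean filter(SensorEvent event) {
                return event.isConnected();
            }
        })
        .times(3);
```

Like the earlier snippets, this fragment assumes the Flink CEP imports from the original code and is not runnable standalone.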
Finally, I print the result to the console. In the real world, this would be pushed to a database, from which an alert management system would display it to the user.
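One detail the snippets above leave implicit: a Flink pipeline is built lazily and nothing actually runs until execute() is called on the environment, so the program ends with something like the following (the job name string is arbitrary and my own choice):

```java
// Trigger execution of the lazily built pipeline.
senv.execute("CEP alerting job");
```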
Further Reading
- Top 5 Enterprise ETL Tools.
- Things to Understand Before Implementing ETL Tools.
- The State of ETL: Traditional to Cloud.