
Streaming ETL With Apache Flink — Part 3


In this article, we discuss how to solve a real-world issue while streaming ETL with Apache Flink in Java.


Previous posts - Part 1  |  Part 2.

Introduction

In this post, let us start exploring Flink to solve a real-world problem. This post from zalando.com shows how they use Flink to perform complex event correlation. I will take a simplified, practical event correlation problem and solve it using Flink.

Problem

Many IoT devices (e.g. sensors) send status information to a central cloud-based IoT management system, which in turn publishes these state changes as an event stream for further processing and analytics.

In this example, I am assuming two categories of state change events in the following format:

{
  "event_name" : "CONNECTED",
  "timestamp" : "yyyy-MM-dd'T'HH:mm:ss:SSS",
  "device_id" : "UNIQUE-DEVICE-ID"
}

{
  "event_name" : "DISCONNECTED",
  "timestamp" : "yyyy-MM-dd hh:mm:ss:milli",
  "device_id" : "UNIQUE-DEVICE-ID"
}
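
Note that the timestamp format uses a colon (rather than the usual dot) before the milliseconds, so parsing it needs an explicit pattern. A minimal sketch with `java.time` (the class name is mine, purely illustrative):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampDemo {
    // Pattern matching the event format above; note the literal ':' before SSS (milliseconds).
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss:SSS");

    public static LocalDateTime parse(String ts) {
        return LocalDateTime.parse(ts, FMT);
    }
}
```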


So, the problem I am going to solve using Flink is: for every device, identify a pattern sequence of a CONNECTED event followed by a DISCONNECTED event. If such a pattern sequence occurs more than two times during a 10-second time interval, log it as an anomaly (alert condition).

Note: All code examples are available on GitHub, so I'm going to share important code blocks only.

You may also like: Apache Flink Basic Transformation Example.

Assumption

  • The event source may send events individually as they arrive, or in bulk.
  • Events are not globally sorted, but for each device they arrive in the order they were generated.
  • Events cannot be resent from the source (no replay option).

Solution One

To start with, I implemented an EventSource to generate 1,000 pseudo-random events per second, each wrapped as a Tuple2<SensorEvent, Integer>. The first field is the SensorEvent object itself, and the second is the match count (initialized to 0). SensorEvent is a simple POJO representing an event as a Java object.
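
SensorEvent might look like the following minimal sketch (field and accessor names are assumptions, inferred from the JSON format above and the isConnected()/isDisconnected() calls used later):

```java
// Minimal sketch of the SensorEvent POJO (field names assumed from the JSON format).
public class SensorEvent {
    private String eventName; // "CONNECTED" or "DISCONNECTED"
    private String timestamp; // "yyyy-MM-dd'T'HH:mm:ss:SSS"
    private String deviceId;  // e.g. "id-3"

    public SensorEvent() { } // no-arg constructor, needed for Flink POJO serialization

    public SensorEvent(String eventName, String timestamp, String deviceId) {
        this.eventName = eventName;
        this.timestamp = timestamp;
        this.deviceId = deviceId;
    }

    public String getDeviceId()  { return deviceId; }
    public String getEventName() { return eventName; }

    public boolean isConnected()    { return "CONNECTED".equals(eventName); }
    public boolean isDisconnected() { return "DISCONNECTED".equals(eventName); }
}
```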

This EventSource is then fed into my StreamExecutionEnvironment, as follows:

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<SensorEvent, Integer>> stream = senv.addSource(new EventSource());


Next, I will build a data pipeline as follows:

stream
.keyBy( new KeySelector<Tuple2<SensorEvent,Integer>, String>()
 {
   @Override
   public String getKey( Tuple2<SensorEvent,Integer> value ) throws Exception
   {
     return value.f0.getDeviceId();
   }
 })
.timeWindow( Time.seconds( 10 ) ) // 10 seconds window
.reduce(new ReduceFunction<Tuple2<SensorEvent,Integer>>()
 {   
   @Override
   public Tuple2<SensorEvent,Integer> reduce( Tuple2<SensorEvent, Integer> prev,
                                              Tuple2<SensorEvent, Integer> curr )
      throws Exception
   {  
     // find pattern
     if( prev.f0.isConnected() && curr.f0.isDisconnected())
     {
       // pattern found, increment the match count on the current event
       curr.setField( prev.f1 + 1, 1 );
     }
     // return current event, which will be prevEvent in the next computation
     return curr;
   }
 })
 .addSink( new SinkFunction<Tuple2<SensorEvent,Integer>>()
  {    
    @Override
    public void invoke(Tuple2<SensorEvent,Integer> result) 
    {
      if( result.f1 > THRESHOLD )
      {
        System.out.println( "================================================================" );
        System.out.println("Device ID = " + result.f0.getDeviceId() + " Total pattern count = " + result.f1);
        System.out.println( "================================================================" );
      }
    }
  });


In the reduce() method above, once I find a pattern match, I increment the match count on the curr event object and return it, on the assumption that the same object will become the previous event object in the next computation.

After running this code, I didn't see anything printed to the console. So my assumption that Flink preserves the current event object and reuses it as the previous object in the next computation turned out to be false.

To confirm, I added the following System.out.println() just before the pattern check in reduce() to print the object hash codes along with the device ID:

System.out.println( "Device ID = " + prev.f0.getDeviceId() + " PREV = " + prev.hashCode() + " CURR = " + curr.hashCode() );


Here is what I got (for Device ID = 3):

Device ID = id-3 PREV = 949826765 CURR = -860377082
Device ID = id-3 PREV = -860377081 CURR = 1597592773
Device ID = id-3 RESULT = -29665316
Device ID = id-3 PREV = -1332999334 CURR = -1731677695
Device ID = id-3 PREV = -1731677694 CURR = -824321195
Device ID = id-3 PREV = -824321195 CURR = -1756705545
Device ID = id-3 RESULT = -1965323446

Well, it is clear now that I can’t rely on internal Flink objects to hold information and pass it through to the next event in the stream.

Out of curiosity, I decided to check whether the ReduceFunction instance itself is reused or not. I added another System.out.println() as the first statement in the reduce() method. Here is what I got:

Device ID = id-2 reduce() = 1183697316
Device ID = id-1 reduce() = 1320344024
Device ID = id-2 reduce() = 1183697316
Device ID = id-2 reduce() = 1183697316
Device ID = id-2 reduce() = 1183697316
Device ID = id-3 reduce() = 1732401171
Device ID = id-1 reduce() = 1320344024
Device ID = id-1 RESULT = -620487983
Device ID = id-2 RESULT = -766823468
Device ID = id-3 RESULT = 944562943
Device ID = id-1 reduce() = 1320344024
Device ID = id-3 reduce() = 1732401171
Device ID = id-2 reduce() = 1183697316


Findings

  • Each keyed data pipeline gets its own copy of the ReduceFunction implementation.
  • This copy persists throughout the stream's lifetime, across time window boundaries.
  • All keyed computations are independent and can’t pass information through Flink.

Given these findings, I edited the ReduceFunction to hold the pattern count in an instance variable, as follows:

// showing only new reduce method code snippet
.reduce(new ReduceFunction<Tuple2<SensorEvent,Integer>>()
 {
   Tuple2<SensorEvent,Integer> tempResult = new Tuple2<SensorEvent,Integer>(new SensorEvent(), 0);

   @Override
   public Tuple2<SensorEvent,Integer> reduce( Tuple2<SensorEvent, Integer> prev,
                                              Tuple2<SensorEvent, Integer> curr )
           throws Exception
   {
     // always carry the latest event forward
     tempResult.f0 = curr.f0;

     // find pattern
     if( prev.f0.isConnected() && curr.f0.isDisconnected())
     {
       // pattern found, increment the match count
       tempResult.f1 = tempResult.f1 + 1;
     }
     return tempResult;
  }
 })


This resulted in the following being printed to the console:

===============================
Device ID = id-1 Total pattern count = 3
===============================

It worked, BUT there was a problem: the pattern count didn't reset to 0 when one tumbling window ended and a new one began. Because I reuse the same tempResult object to store the match count, it keeps incrementing across windows. This produces false positives rather than the actual pattern I am looking for.
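
One general way to avoid this stale-counter problem is to keep the state in an accumulator that is created fresh for every window, which is the contract of Flink's AggregateFunction. A plain-Java sketch of that idea (a stand-in illustrating the contract, not the actual Flink interface):

```java
import java.util.List;

// Per-window pattern counting: a fresh accumulator per window means the
// match count automatically resets when a new window starts.
public class PatternAggregate {

    public static class Acc {
        String lastEvent = null; // event_name of the previous event in this window
        int matches = 0;         // CONNECTED -> DISCONNECTED pairs seen so far
    }

    // Called once at the start of each window in the AggregateFunction contract.
    public static Acc createAccumulator() {
        return new Acc();
    }

    // Called once per incoming event.
    public static Acc add(String eventName, Acc acc) {
        if ("CONNECTED".equals(acc.lastEvent) && "DISCONNECTED".equals(eventName)) {
            acc.matches++;
        }
        acc.lastEvent = eventName;
        return acc;
    }

    // Called when the window fires.
    public static int getResult(Acc acc) {
        return acc.matches;
    }

    // Convenience driver simulating one window's worth of events.
    public static int countMatches(List<String> events) {
        Acc acc = createAccumulator();
        for (String e : events) {
            acc = add(e, acc);
        }
        return getResult(acc);
    }
}
```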

After all this do-it-yourself, I decided to check whether Flink already has an answer to this common streaming use case, and I found CEP (Complex Event Processing).

In the next post, I will share how I solved this problem using CEP.



