
Streaming ETL With Apache Flink — Part 2


In this tutorial, we look at tumbling windows, incremental computation, keyed data, and more.


[Figure: Streaming ETL With Apache Flink]

You can read part 1 here.

Introduction

In this post, I will improve upon the example shared in the previous post, and we will also look at a few details of Flink's tumbling time window.


Tumbling Window

The example from the previous post uses a tumbling window. It is an intuitive way to start: all the data arriving within a given window is collected together, and windows do not overlap.

As per the Flink doc, "Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes."

Carrying on from the example used in the previous post, I updated the IntegerSum class to use a 10-second tumbling window interval and added a few log statements that print the current local time, so we know when ctx.collect(), apply(), and addSink() are called. Inside the apply() method, I am using an AllWindowFunction, which, as per the Flink doc, is called once all the data has been collected for the time window. Finally, I adjusted the sleepTimer variable so that IntegerSourceFunction generates only 2 integers per second.

Note: All examples used here are available on GitHub, hence showing important code blocks only.
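For reference, here is a minimal sketch of the updated pipeline. The class and logger wiring is an assumption on my part (the complete, runnable version is on GitHub), but it matches the pieces described above: a 10-second tumbling window, an AllWindowFunction that sums the buffered integers, and a sink that logs the result.

import java.time.LocalTime;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IntegerSum
{
   private static final Logger logger = LoggerFactory.getLogger(IntegerSum.class);

   public void init() throws Exception
   {
      StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

      // sleepTimer of 500 ms means the source emits 2 integers per second
      senv.addSource(new IntegerSourceFunction(500))
          .timeWindowAll(Time.seconds(10)) // 10-second tumbling window
          .apply(new AllWindowFunction<Integer, Integer, TimeWindow>()
          {
             private static final long serialVersionUID = 1L;

             @Override
             public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out)
             {
                // called once all data for this window has been collected
                logger.info("apply() called at {}", LocalTime.now());
                int sum = 0;
                for (Integer value : values)
                {
                   sum += value;
                }
                out.collect(sum);
             }
          })
          .addSink(new SinkFunction<Integer>()
          {
             private static final long serialVersionUID = 1L;

             @Override
             public void invoke(Integer value) throws Exception
             {
                logger.info("Sink called at {} with sum {}", LocalTime.now(), value);
             }
          });

      senv.execute(this.getClass().getSimpleName());
   }
}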

The following figure shows the Eclipse console output with the log statements.

[Figure: Eclipse console output with the log statements]

Points to note from the above output:

  1. A tumbling window started at 20:22:40.
  2. The ctx.collect() statements were executed ("Emitted at xx:yy:zz").
  3. These values are buffered by Flink in memory.
  4. The tumbling window trigger fires and the apply() method is called at 20:22:50.003 (10 seconds later, as expected).
  5. Finally, addSink() is called with the aggregated result almost immediately after apply(), at 20:22:50.049.

Incremental Computation

In many cases, it is not a good idea to hold all of a window's data in memory. Flink provides an incremental way of computing through a handy interface called ReduceFunction, which can be used in place of AllWindowFunction. Instead of using the apply() method with an AllWindowFunction, we can use the reduce() method with a ReduceFunction.

As per the Flink Javadoc:

The reduce() method: "…This window will try and incrementally aggregate data as much as the window policies permit. For example, tumbling time windows can aggregate the data, meaning that only one element per key is stored."

The ReduceFunction interface: "…combine groups of elements to a single value, by taking always two elements and combining them into one."

Fair enough, let's try this out. I created a new class, IntegerSumWithReduce, and replaced the apply() method with reduce() as follows:

.reduce(new ReduceFunction<Integer>()
{
   private static final long serialVersionUID = -6449994291304410741L;

   @Override
   public Integer reduce(Integer value1, Integer value2) throws Exception
   {
      logger.info("Reduce called at {} with value1 {} and value2 {}", LocalTime.now(), value1, value2);
      return value1 + value2;
   }
})


The logic is still the same (sum of integers), but the good part is that Flink can now compute incrementally and doesn't need to buffer all the data first.

The following figure shows the output from the Eclipse console.

[Figure: Eclipse console output showing the incremental reduce() calls]

So, after every integer that is passed to ctx.collect(), Flink calls the reduce() method, where the first parameter holds the aggregate computed so far in the current window (the result of the previous reduce() call) and the second parameter is the newly arrived value in the stream. Finally, the sink is called with the final result.
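For example, if a window receives the values 1, 2, 3, and 4, Flink computes reduce(1, 2) = 3, then reduce(3, 3) = 6, then reduce(6, 4) = 10, and the sink receives 10 when the window fires.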

Note: As expected, both approaches take the same time for the tumbling window (the sink is called after 10 seconds), but the computation is incremental, and far less data is buffered compared to apply(AllWindowFunction).

Keyed Data

Stepping up a little now: in many real-world use cases, some sort of key is used to partition the data into multiple data streams. This pattern not only helps with faster computation but also distributes the data according to its properties. As expected, Flink supports keyed data streams, and it is super easy to get started.

Assume we have a key (or any ID) per stream of integers and now want to compute the sum for each key. For this example, I am using a new IntegerGeneratorSourceWithKey class that emits both the key and the integer. We can either send this key and value as a Flink TupleXX class, where XX is any number between 2 and 25 (Tuple2, Tuple3 … Tuple25), or we can use a POJO and a getter method to tell Flink where to look for the key.

First, let's see an example with TupleXX.

Below is IntegerGeneratorSourceWithKey::run(), which implements SourceFunction<Tuple2<String, Integer>>. I also defined 3 IDs, which are used in round-robin fashion along with each generated integer. Each record sent to the stream is an instance of Tuple2, with the ID as the first member and the data as the second.

private String[] id = new String[] {"id-1", "id-2", "id-3"};

@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception
{
   int counter = 1;
   while (isRunning)
   {
      Thread.sleep(sleepTimer);
      // generate integers, cycling through the 3 ids in round-robin fashion
      ctx.collect(new Tuple2<String, Integer>(id[counter % 3], counter++));
   }
}


IntegerSumWithKey is the class that computes the sum of the integers generated by IntegerGeneratorSourceWithKey.

public class IntegerSumWithKey
{
   private StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
   private IntegerGeneratorSourceWithKey source = new IntegerGeneratorSourceWithKey(500);

   public void init() throws Exception
   {
     // add a simple integer generator as source 
     DataStream<Tuple2<String, Integer>> dataStream = senv.addSource(source);

     // build the pipeline using tumbling window size of 10 seconds
     dataStream
     .keyBy(0)
      .timeWindow(Time.seconds(10))
     .sum(1)
     .print();

     senv.execute(this.getClass().getSimpleName());
   }
}


Note: There are no reduce(), apply(), or addSink() methods in this class. Hats off to the Flink team for providing a simple API for developers to run locally and easily test the result.

Here, keyBy(0) tells the data stream that the partitioning key is found at index 0 of each Tuple2. Because the stream is keyed, we use timeWindow() rather than timeWindowAll(), so each key gets its own 10-second windows; after a window fires, sum(1) adds up the values at index 1 of the data, and finally print() writes the result to stdout.

Behind the scenes, sum() is actually a predefined reduce() method using a Flink-provided aggregator, and print() is a predefined addSink() method using Flink's PrintSinkFunction class.
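Conceptually, sum(1) on this keyed Tuple2 stream behaves like the reduce() below. This is a sketch for intuition only, not Flink's actual aggregator code: it keeps the key field (f0) and adds the value fields (f1).

.reduce(new ReduceFunction<Tuple2<String, Integer>>()
{
   private static final long serialVersionUID = 1L;

   @Override
   public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception
   {
      // within a keyed stream the key (f0) is identical; sum the values (f1)
      return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
   }
})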

The following example uses Java POJO instead of Flink Tuple to achieve the same result.
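The MyData POJO itself is not shown in the post; a minimal sketch could look like the following (the field names are assumed to match the keyBy and sum calls below, and Flink requires a public no-argument constructor plus public getters/setters to treat the class as a POJO type).

public class MyData
{
   private String id;
   private int value;

   // Flink's POJO serializer requires a public no-arg constructor
   public MyData() {}

   public MyData(String id, int value)
   {
      this.id = id;
      this.value = value;
   }

   public String getId() { return id; }
   public void setId(String id) { this.id = id; }

   public int getValue() { return value; }
   public void setValue(int value) { this.value = value; }
}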

public class IntegerSumWithKeyFromPojo
{
   private StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

   public void init() throws Exception
   {
      // add a source using a simple POJO
      DataStream<MyData> dataStream = senv.addSource(new SourceFunction<MyData>()
      {
         private static final long serialVersionUID = -1356281802334002703L;
         private String[] id = new String[] {"id-1", "id-2", "id-3"};

         @Override
         public void run(SourceContext<MyData> ctx) throws Exception
         {
            int counter = 1;
            while (true)
            {
               Thread.sleep(500);
               ctx.collect(new MyData(id[counter % 3], counter++));
            }
         }

         @Override
         public void cancel()
         {
            // production code must handle cancellation properly.
         }
      });

      // build the pipeline using a tumbling window size of 10 seconds
      dataStream
      .keyBy((KeySelector<MyData, String>) MyData::getId)
      .timeWindow(Time.seconds(10))
      .sum("value")
      .print();

      senv.execute(this.getClass().getSimpleName());
   }
}
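Either class can then be run locally from a simple main() method (a hypothetical entry point; the GitHub project may wire this up differently).

public static void main(String[] args) throws Exception
{
   // runs the pipeline on the local execution environment
   new IntegerSumWithKeyFromPojo().init();
}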


Conclusion

I believe we now have good examples to get started with Flink. In the next few posts, I will take a real-world use case and walk you through various steps to analyze streaming data.

Further Reading

Apache Kafka vs Integration Middleware (MQ, ETL, ESB)

Example of ETL Application Using Apache Spark and Hive


