Implementing Flink Batch Data Connector

A code walkthrough for connecting to external data sources, including a Twitter social graph, and reading them as a batch in Java 8.

Apache Flink has a versatile set of connectors for external data sources. It can read and write data from databases and from local and distributed file systems. However, sometimes what Flink provides is not enough, and we need to read some uncommon data format.

In this article, I will show you how to implement a custom connector for reading a dataset in Flink.

Data Format

Before we start writing code, let’s take a look at what we are going to read. As an example, I selected the Twitter social graph from the Stanford Network Analysis Project. The format is pretty straightforward and describes “follows” relationships between Twitter users. If we unpack the archive with the social graph from the Stanford website, we will find a number of files with the .edges extension. Every file contains a long list of follower pairs like this:

$ cat 100318079.edges
214328887 34428380
17116707 28465635
380580781 18996905
221036078 153460275
107830991 17868918
151338729 222261763

Every line in these files represents a “follows” relationship. The first number is a follower’s Twitter user id and the second value is a user who is followed on Twitter.
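
To make the record layout concrete, here is a minimal sketch of parsing one such line in plain Java, without any Flink types. The EdgeLineParser class and its Edge holder are hypothetical names used only for illustration, and the sketch assumes that the two ids are separated by whitespace and fit into an int, as they do in the sample above.

```java
final class EdgeLineParser {

    /** Simple holder for one "follows" relationship (hypothetical type). */
    static final class Edge {
        final int follower;
        final int followed;

        Edge(int follower, int followed) {
            this.follower = follower;
            this.followed = followed;
        }
    }

    /** Parses a line such as "214328887 34428380" into an Edge. */
    static Edge parse(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected two ids, got: " + line);
        }
        // The first value is the follower, the second is the user being followed
        return new Edge(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
    }

    public static void main(String[] args) {
        Edge edge = parse("214328887 34428380");
        System.out.println(edge.follower + " follows " + edge.followed);
    }
}
```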

If you want to find a Twitter page by ids from these files just use the following URL:


End Goal

Let’s start with the end goal in mind. We would like to point Flink to a directory with all the edges files in it and let it create a dataset of edges for us. This should look like this:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
DataSet<TwitterFollower> twitterFollowers = env.createInput(
  new StanfordTweetsDataSetInputFormat("/path/to/twitter/dataset")); 

To read a dataset, we use the createInput method of the ExecutionEnvironment class. It takes a single argument, an InputFormat, which defines how to read a dataset for data processing. We will implement this interface in this post.

TwitterFollower is a POJO type that I defined specifically for this InputFormat. It has two fields: a user id and a follower id. Our InputFormat will return a dataset of these objects.

public class TwitterFollower extends Tuple2<Integer, Integer> {

    // Flink needs a public no-argument constructor to instantiate the type
    public TwitterFollower() {
        super(null, null);
    }

    public TwitterFollower(int user, int follower) {
        super(user, follower);
    }

    public Integer getUser() {
        return f0;
    }

    public Integer getFollower() {
        return f1;
    }

    public void setUser(int user) {
        this.f0 = user;
    }

    public void setFollower(int follower) {
        this.f1 = follower;
    }
}

As you may have noticed, it extends Flink's Tuple2 class. This is not mandatory, but it lets users of this class use the more efficient versions of the group-by and join operations in Flink, where we only need to specify a field index in a dataset.

This is a common pattern in the Flink sources. For example, the graph edge class in Flink’s Gelly library is implemented the same way.

InputFormat Interface

To understand how to implement the InputFormat interface, we first need to understand how it is used. When Flink reads data from a data source, it first calls the InputFormat implementation to divide the input data into chunks of work called splits. Then Flink reads these splits in parallel.

These two steps are represented by two groups of methods in the InputFormat interface, and we will implement them one by one.

Splitting Input Data

The first group of methods is used to split input data into separate chunks that can be read in parallel:

  • configure – Method that is called to configure an InputFormat
  • createInputSplits – This method defines how to split reading of the input data into independent chunks
  • getStatistics – Get statistics about the input data

Let’s start with the configure method.

public class StanfordTweetsDataSetInputFormat
    extends RichInputFormat<TwitterFollower, TweetFileInputSplit> {

    // Not serializable, so it is created lazily where it is needed
    private transient FileSystem fileSystem;
    private transient BufferedReader reader;
    private final String path;
    private String nextLine;

    public StanfordTweetsDataSetInputFormat(String path) {
        this.path = path;
    }

    @Override
    public void configure(Configuration parameters) {
        // No task-specific configuration is needed
    }
    ...

Our implementation does not need it, but other InputFormat implementations use it to read task-specific configuration. For example, FileInputFormat uses this method to read the configuration value that enables recursive file enumeration:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
// Pass the configuration to the input format of this data source
DataSet<String> logs = env.readTextFile("file:///directory/path")
    .withParameters(parameters);

The next method that we will implement is createInputSplits, which, as the name suggests, creates an array of input splits that can be read in parallel.

@Override
public TweetFileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    FileSystem fileSystem = getFileSystem();
    // Get all files in the input directory
    FileStatus[] statuses = fileSystem.listStatus(new Path(path));
    List<TweetFileInputSplit> splits = new ArrayList<>();
    for (int i = 0; i < statuses.length; i++) {
        FileStatus status = statuses[i];
        String fileName = status.getPath().getName();
        // Ignore other auxiliary files
        if (fileName.endsWith("edges")) {
            // Create an input split to read one file
            splits.add(new TweetFileInputSplit(i, status.getPath()));
        }
    }
    return splits.toArray(new TweetFileInputSplit[splits.size()]);
}

private FileSystem getFileSystem() throws IOException {
    // Lazy initialization since FileSystem is not serializable
    if (fileSystem == null) {
        try {
            fileSystem = FileSystem.get(new URI(path));
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }
    return fileSystem;
}

// Input split to read one input file
class TweetFileInputSplit implements InputSplit {
    private final int splitNumber;
    private final Path path;

    public TweetFileInputSplit(int splitNumber, Path path) {
        this.splitNumber = splitNumber;
        this.path = path;
    }

    @Override
    public int getSplitNumber() {
        return splitNumber;
    }

    public Path getPath() {
        return path;
    }
}

Let’s go through this code step by step. First, we create an instance of Flink’s FileSystem class. This class allows us to perform operations on local or remote file systems. For example, if we pass a URL like “hdfs://dir/”, Flink will create an HDFS-specific implementation. Then we list all files in the input directory and create one input split for every file whose name ends with “edges”.

Since we control how each split is read, we have a lot of leeway regarding how to split the input data. In this code, every split contains the path of a single file, so that files are read in parallel, but we could also split files into blocks or group several files together in a single split.
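
As an illustration of the last alternative, grouping several files into a single split could be sketched in plain Java as follows. The SplitGrouping class is a hypothetical helper that is not part of Flink or of the connector in this article; it only shows one possible way to distribute file names round-robin over a fixed number of groups, each of which could then back one input split.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitGrouping {

    /** Distributes file names round-robin over at most numSplits groups. */
    static List<List<String>> group(List<String> files, int numSplits) {
        // Never create more groups than files, and always at least one group
        int groups = Math.max(1, Math.min(numSplits, files.size()));
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            result.get(i % groups).add(files.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> files =
            Arrays.asList("a.edges", "b.edges", "c.edges", "d.edges", "e.edges");
        System.out.println(group(files, 2));
    }
}
```

Round-robin assignment ignores file sizes, so it balances splits only when the files are of similar size.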

The last method that we need to implement is getStatistics, which returns information about the data that we are about to read. Statistics contain three fields: the total size of all files in bytes, the number of records, and the average record width. In this case, we can only compute the total size of the files:

@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    FileSystem fileSystem = getFileSystem();
    FileStatus[] statuses = fileSystem.listStatus(new Path(path));
    // Sum up the sizes of all files in the input directory
    long totalSize = 0;
    for (FileStatus status : statuses) {
        totalSize += status.getLen();
    }
    return new GraphStatistics(totalSize);
}

private class GraphStatistics implements BaseStatistics {
    private final long totalInputSize;

    public GraphStatistics(long totalInputSize) {
        this.totalInputSize = totalInputSize;
    }

    @Override
    public long getTotalInputSize() { return totalInputSize; }

    @Override
    public long getNumberOfRecords() { return BaseStatistics.NUM_RECORDS_UNKNOWN; }

    @Override
    public float getAverageRecordWidth() { return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN; }
}

If we cannot provide any useful information about data to read we can simply return null from the getStatistics method.

One optimization that we could make is to use the cached statistics that are passed to the getStatistics method. To make use of them, we need to track the time when the previous statistics were calculated and recalculate them only if the file system was updated since then.
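
The caching idea can be sketched independently of Flink. In the example below, SizeCache and CachedSize are hypothetical names, and the last modification time and the size computation are passed in as plain arguments instead of being read from Flink's FileSystem, so this is only an outline of the check and not the connector's actual code.

```java
import java.util.function.LongSupplier;

final class SizeCache {

    /** Cached result together with the time it was computed (hypothetical type). */
    static final class CachedSize {
        final long totalSize;
        final long computedAtMillis;

        CachedSize(long totalSize, long computedAtMillis) {
            this.totalSize = totalSize;
            this.computedAtMillis = computedAtMillis;
        }
    }

    /**
     * Returns the cached value if the input was not modified after it was
     * computed; otherwise recomputes the total size.
     */
    static CachedSize getOrRecompute(CachedSize cached, long lastModifiedMillis,
                                     long nowMillis, LongSupplier computeSize) {
        if (cached != null && cached.computedAtMillis >= lastModifiedMillis) {
            return cached; // nothing changed, reuse the previous statistics
        }
        return new CachedSize(computeSize.getAsLong(), nowMillis);
    }

    public static void main(String[] args) {
        CachedSize first = getOrRecompute(null, 100L, 200L, () -> 42L);
        // The input was last modified before the cache was filled, so it is reused
        CachedSize second = getOrRecompute(first, 100L, 300L, () -> 99L);
        System.out.println(first == second);
    }
}
```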

Reading Data

The second group of methods in the InputFormat interface is used for reading a single input split. In our case, it should read a single file with edges and produce an instance of TwitterFollower for every line.

To do this, we need to implement four methods:

  • open – called to open a single input split for reading
  • reachedEnd – called to check if there are more records to read in a single input split
  • nextRecord – called to read the next record (TwitterFollower in our case)
  • close – called to close all resources allocated for reading an input split
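
The order in which these four methods are called for one split can be sketched with a simplified stand-in. The SplitReader interface and the ListReader class below are hypothetical and much smaller than Flink's real InputFormat; they read from an in-memory list instead of a file and only illustrate the open, reachedEnd, nextRecord, close sequence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReadLifecycleSketch {

    /** Simplified stand-in for the reading half of an input format. */
    interface SplitReader<T> {
        void open(List<T> split);
        boolean reachedEnd();
        T nextRecord();
        void close();
    }

    /** Reads records from an in-memory list instead of a file. */
    static final class ListReader implements SplitReader<String> {
        private List<String> records;
        private int position;

        @Override public void open(List<String> split) { records = split; position = 0; }
        @Override public boolean reachedEnd() { return position >= records.size(); }
        @Override public String nextRecord() { return records.get(position++); }
        @Override public void close() { records = null; }
    }

    /** Drives a reader through one split in the order described above. */
    static <T> List<T> readSplit(SplitReader<T> reader, List<T> split) {
        List<T> out = new ArrayList<>();
        reader.open(split);
        try {
            while (!reader.reachedEnd()) {
                out.add(reader.nextRecord());
            }
        } finally {
            reader.close(); // always release resources, even on failure
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readSplit(new ListReader(), Arrays.asList("1 2", "3 4")));
    }
}
```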

The open method in this implementation is quite simple. We just need to create a reader for a text file:

@Override
public void open(TweetFileInputSplit split) throws IOException {
    FileSystem fileSystem = getFileSystem();
    // Open the file that belongs to this split
    this.reader = new BufferedReader(
        new InputStreamReader(fileSystem.open(split.getPath())));
    // Pre-read next line to easily check if we've reached the end of an input
    this.nextLine = reader.readLine();
}

As before, we use Flink’s FileSystem type to work with files. Its open method returns an InputStream for a file on a file system. Then we pre-read the first line from the opened file. We do this so that we can use it in the reachedEnd implementation, which Flink calls to check whether any more records can be read from the current split. All we need to do is check whether we have reached the end of the file:

@Override
public boolean reachedEnd() throws IOException {
    return nextLine == null;
}
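
The pre-read technique can be tried out in isolation. The following sketch uses a hypothetical LookaheadLineReader class over an in-memory string instead of a file on a Flink file system; it always keeps one line buffered, so the end-of-input check never has to touch the underlying reader.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

final class LookaheadLineReader {
    private final BufferedReader reader;
    private String nextLine;

    LookaheadLineReader(BufferedReader reader) throws IOException {
        this.reader = reader;
        this.nextLine = reader.readLine(); // pre-read the first line
    }

    /** True once the buffered line is null, i.e. the input is exhausted. */
    boolean reachedEnd() {
        return nextLine == null;
    }

    /** Returns the buffered line and reads one line ahead. */
    String next() throws IOException {
        String current = nextLine;
        nextLine = reader.readLine();
        return current;
    }

    public static void main(String[] args) throws IOException {
        LookaheadLineReader lines =
            new LookaheadLineReader(new BufferedReader(new StringReader("1 2\n3 4\n")));
        while (!lines.reachedEnd()) {
            System.out.println(lines.next());
        }
    }
}
```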

The close method simply closes the file reader:

@Override
public void close() throws IOException {
    if (reader != null) { reader.close(); }
}

Now the only thing that is left is to read records from the file. To do this, we need to implement the nextRecord method that reads a single line and converts it into a single TwitterFollower instance:

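@Override
public TwitterFollower nextRecord(TwitterFollower reuse) throws IOException {
    // Each line has the form "<follower id> <user id>"
    String[] split = nextLine.split(" ");
    int userId = Integer.parseInt(split[1]);
    int followerId = Integer.parseInt(split[0]);
    // Store the parsed values in the reused object
    reuse.setUser(userId);
    reuse.setFollower(followerId);
    // Pre-read the next line for the following reachedEnd call
    nextLine = reader.readLine();
    return reuse;
}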

Using Built-In Flink Classes

If you need to read data from a file system in Flink, I would not suggest implementing it yourself from scratch. Of course, it is interesting to understand how it can be implemented and useful to be able to implement one yourself, but Flink provides an abstract class called FileInputFormat that can be a good starting point. It can be extended to read data from file systems, and it supports more advanced features such as reading data from folders recursively and more advanced splitting that tries to balance the sizes of input splits.

If FileInputFormat is too generic for you, you can use more specific implementations, such as BinaryInputFormat for reading formats that use binary blocks of fixed size, or DelimitedInputFormat, which you can extend if you need to implement a format with delimiter-separated records.


In this post, you have learned how to read data from a custom source in Flink, and now you can implement one yourself.

If you want to read the final version of the InputFormat from this article, you can find it in my GitHub repo.

Also, please check out my Understanding Apache Flink course available through Pluralsight. If you'd like a preview of what's going to be covered, take a look at this video. Thanks!

big data ,flink ,hdfs ,java

Published at DZone with permission of
