Implementing Flink Batch Data Connector
A code walk-through for connecting to external data sources, including a Twitter social graph, and reading it as a batch in Java 8.
Apache Flink has a versatile set of connectors for external data sources. It can read and write data from databases and from local and distributed file systems. However, sometimes what Flink provides is not enough, and we need to read some uncommon data format.
In this article, I will show you how to implement a custom connector for reading a dataset in Flink.
Data Format
Before we start writing code, let’s take a look at what we are going to read. As an example, I selected the Twitter social graph from the Stanford Network Analysis Project. The format is pretty straightforward and describes “follows” relationships between Twitter users. If we unpack the archive with the social graph from the Stanford website, we will find a number of files with the .edges extension. Every file contains a long list of id pairs like this:
$ cat 100318079.edges
214328887 34428380
17116707 28465635
380580781 18996905
221036078 153460275
107830991 17868918
151338729 222261763
Every line in these files represents a single “follows” relationship: the first number is the follower’s Twitter user id, and the second is the id of the user who is followed on Twitter.
If you want to find the Twitter page for an id from these files, just use the following URL:
https://twitter.com/intent/user?user_id=[user-id]
End Goal
Let’s start with the end goal in mind. We would like to point Flink at a directory containing all the edges files and let it create a dataset of edges for us. The result should look like this:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<TwitterFollower> twitterFollowers = env.createInput(
new StanfordTweetsDataSetInputFormat("/path/to/twitter/dataset"));
To read a dataset, we need to use the createInput method from the ExecutionEnvironment class. It takes a single argument: an implementation of the InputFormat interface that defines how to read a dataset for data processing. This is the interface we will implement in this post.
TwitterFollower is a POJO type that I defined specifically for this input format. It has two fields, a user id and a follower id, and our input format will return a dataset of these objects.
public class TwitterFollower extends Tuple2<Integer, Integer> {

    public TwitterFollower() {
        super(null, null);
    }

    public TwitterFollower(int user, int follower) {
        super(user, follower);
    }

    public Integer getUser() {
        return f0;
    }

    public Integer getFollower() {
        return f1;
    }

    public void setUser(int user) {
        this.f0 = user;
    }

    public void setFollower(int follower) {
        this.f1 = follower;
    }
}
As you may notice, it extends Flink’s Tuple2 class. This is not mandatory, but it lets users of this class use the more efficient versions of Flink’s groupBy and join operations that only require specifying a field index in a dataset.
This is a common pattern that you can find in the Flink sources; for example, the graph edge class in Flink’s Gelly library is implemented in the same way.
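As an illustration (not from the article’s code), here is a minimal sketch of a positional groupBy with this type, assuming the twitterFollowers dataset from the End Goal section; the followerCounts name and the counting logic are my own:
// Count how many followers each user has. groupBy(0) works because
// TwitterFollower is a Tuple2, so Flink can address its fields by position
// instead of requiring a KeySelector.
DataSet<Tuple2<Integer, Integer>> followerCounts = twitterFollowers
        .map(f -> Tuple2.of(f.getUser(), 1))
        .returns(Types.TUPLE(Types.INT, Types.INT))
        .groupBy(0)
        .sum(1);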
InputFormat Interface
To understand how to implement the InputFormat interface, we first need to understand how it is used. When Flink reads data from a data source, it first calls the InputFormat implementation to split the input data into chunks of work called splits. Flink then reads these splits in parallel.
These two steps are represented by two groups of methods in the InputFormat interface, and we will implement them one by one.
Splitting Input Data
The first group of methods is used to split input data into separate chunks that can be read in parallel:
- configure – called to configure the InputFormat
- createInputSplits – defines how to split the reading of the input data into independent chunks
- getStatistics – returns statistics about the input data
Let’s start with the configure method.
public class StanfordTweetsDataSetInputFormat
        extends RichInputFormat<TwitterFollower, TweetFileInputSplit> {

    private transient FileSystem fileSystem;
    private transient BufferedReader reader;
    private final String path;
    private String nextLine;

    public StanfordTweetsDataSetInputFormat(String path) {
        this.path = path;
    }

    @Override
    public void configure(Configuration parameters) {
        // Nothing to configure for this format
    }
    ...
}
Our implementation does not need it, but other InputFormat implementations use it to read task-specific configuration. For example, FileInputFormat uses this method to read the configuration value that enables recursive file reading:
Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<String> logs = env.readTextFile("file:///directory/path")
.withParameters(parameters);
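To illustrate how such a value would be consumed, here is a rough sketch of a configure implementation; the recursive field is a hypothetical example of storing the value, not part of our format:
// Illustration only: read a boolean flag from the task configuration.
// "recursive.file.enumeration" is the key used in the snippet above;
// the 'recursive' field is hypothetical and exists only for this sketch.
@Override
public void configure(Configuration parameters) {
    this.recursive = parameters.getBoolean("recursive.file.enumeration", false);
}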
The next method that we will implement is createInputSplits, which, as the name suggests, creates an array of input splits that can be read in parallel.
@Override
public TweetFileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    FileSystem fileSystem = getFileSystem();
    // Get all files in the input directory
    FileStatus[] statuses = fileSystem.listStatus(new Path(path));

    List<TweetFileInputSplit> splits = new ArrayList<>();
    for (int i = 0; i < statuses.length; i++) {
        FileStatus status = statuses[i];
        String fileName = status.getPath().getName();
        // Ignore other auxiliary files
        if (fileName.endsWith("edges")) {
            // Create an input split to read one file
            splits.add(new TweetFileInputSplit(i, status.getPath()));
        }
    }
    return splits.toArray(new TweetFileInputSplit[splits.size()]);
}

private FileSystem getFileSystem() throws IOException {
    // Lazy initialization, since FileSystem is not serializable
    if (fileSystem == null) {
        try {
            fileSystem = FileSystem.get(new URI(path));
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }
    return fileSystem;
}

// Input split to read one input file
class TweetFileInputSplit implements InputSplit {

    private final int splitNumber;
    private final Path path;

    public TweetFileInputSplit(int splitNumber, Path path) {
        this.splitNumber = splitNumber;
        this.path = path;
    }

    @Override
    public int getSplitNumber() {
        return splitNumber;
    }

    public Path getPath() {
        return path;
    }
}
Let’s go through this code. First, we get an instance of Flink’s FileSystem class, which allows us to perform operations on local or remote file systems. For example, if we pass a URI like “hdfs://dir/”, Flink will create an HDFS-specific implementation. Since we control how each split is read, we have a lot of leeway in how to split the input data. In this code, every split contains the name of a single file to read in parallel, but we could also split files into blocks or group several files together in a single split, as sketched below.
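Here is a rough sketch of the grouping variant, assuming a hypothetical MultiFileInputSplit class that carries a list of paths instead of a single one:
// Sketch: pack up to four .edges files into each split so that many tiny
// files don't each become a separate unit of work. MultiFileInputSplit is
// a hypothetical variant of TweetFileInputSplit that holds several paths.
int filesPerSplit = 4;
List<MultiFileInputSplit> splits = new ArrayList<>();
List<Path> batch = new ArrayList<>();
for (FileStatus status : statuses) {
    if (status.getPath().getName().endsWith("edges")) {
        batch.add(status.getPath());
    }
    if (batch.size() == filesPerSplit) {
        splits.add(new MultiFileInputSplit(splits.size(), new ArrayList<>(batch)));
        batch.clear();
    }
}
if (!batch.isEmpty()) {
    splits.add(new MultiFileInputSplit(splits.size(), batch));
}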
The last method that we need to implement is getStatistics, which returns information about the data we are about to read. The statistics contain three fields: the total size of all files in bytes, the number of records, and the average record width. In this case, we can only estimate the total size of the input files:
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    FileSystem fileSystem = getFileSystem();
    FileStatus[] statuses = fileSystem.listStatus(new Path(path));

    // Sum up the sizes of all input files in bytes
    long totalInputSize = 0;
    for (FileStatus status : statuses) {
        totalInputSize += status.getLen();
    }
    return new GraphStatistics(totalInputSize);
}

private class GraphStatistics implements BaseStatistics {

    private final long totalInputSize;

    public GraphStatistics(long totalInputSize) {
        this.totalInputSize = totalInputSize;
    }

    @Override
    public long getTotalInputSize() {
        return totalInputSize;
    }

    @Override
    public long getNumberOfRecords() {
        return BaseStatistics.NUM_RECORDS_UNKNOWN;
    }

    @Override
    public float getAverageRecordWidth() {
        return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
    }
}
If we cannot provide any useful information about the data to read, we can simply return null from the getStatistics method.
One optimization that we could make is to use the cached statistics that are passed to getStatistics. To make use of them, we would need to track when the previous statistics were calculated and recalculate them only if the file system has been updated since then.
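A hedged sketch of that idea follows. It assumes GraphStatistics gains a computedAt timestamp and a getComputedAt() accessor, neither of which exists in the version above:
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    FileStatus[] statuses = getFileSystem().listStatus(new Path(path));

    long latestModification = 0;
    long totalInputSize = 0;
    for (FileStatus status : statuses) {
        latestModification = Math.max(latestModification, status.getModificationTime());
        totalInputSize += status.getLen();
    }

    // Reuse the cached statistics if no file changed since they were computed.
    // getComputedAt() is a hypothetical addition to GraphStatistics.
    if (cachedStatistics instanceof GraphStatistics
            && ((GraphStatistics) cachedStatistics).getComputedAt() >= latestModification) {
        return cachedStatistics;
    }
    return new GraphStatistics(totalInputSize, System.currentTimeMillis());
}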
Reading Data
The second group of methods in the InputFormat interface is used for reading a single data split. In our case, it should read a single file with edges and produce an instance of TwitterFollower for every line.
To do this, we need to implement four methods:
- open – called to start reading records from a single input split
- reachedEnd – called to check if there are more records to read in the current input split
- nextRecord – called to read the next record (a TwitterFollower in our case)
- close – called to release all resources allocated for reading an input split
The open method in this implementation is quite simple: we just need to create a reader for a text file:
@Override
public void open(TweetFileInputSplit split) throws IOException {
    FileSystem fileSystem = getFileSystem();
    this.reader = new BufferedReader(new InputStreamReader(fileSystem.open(split.getPath())));
    // Pre-read the next line to easily check if we've reached the end of the input split
    this.nextLine = reader.readLine();
}
As before, we use Flink’s FileSystem type to work with files. Its open method returns an InputStream for a file on a file system. We then pre-read the first line from the opened file so that we can use it in the reachedEnd implementation, which Flink calls to check whether any more items can be read from the current split. There, all we need to do is check whether we have reached the end of the file:
@Override
public boolean reachedEnd() throws IOException {
    return nextLine == null;
}
The close method simply closes the file reader:
@Override public void close() throws IOException {
if (reader != null) {
reader.close();
}
}
Now the only thing left is to read records from the file. To do this, we need to implement the nextRecord method, which reads a single line and converts it into a TwitterFollower instance:
@Override
public TwitterFollower nextRecord(TwitterFollower reuse) throws IOException {
    String[] split = nextLine.split(" ");
    int followerId = Integer.parseInt(split[0]);
    int userId = Integer.parseInt(split[1]);

    reuse.setUser(userId);
    reuse.setFollower(followerId);
    nextLine = reader.readLine();
    return reuse;
}
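With all the pieces in place, a minimal end-to-end job could look like this (the input path is a placeholder):
// Read the edges dataset with our custom input format and print a sample.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<TwitterFollower> followers = env.createInput(
        new StanfordTweetsDataSetInputFormat("/path/to/twitter/dataset"));
followers.first(10).print(); // print() triggers execution of the batch job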
Using Built-In Flink Classes
If you need to read data from a file system in Flink, I would not suggest implementing it yourself from scratch. Of course, it is interesting to understand how it can be implemented, and useful to be able to implement one yourself, but Flink provides an abstract class called FileInputFormat that is a good starting point. It can be extended to read data from file systems and supports more advanced features, such as reading data from folders recursively and more sophisticated splitting that tries to balance the sizes of input splits.
If FileInputFormat is too generic for you, you can use other, more specific implementations, such as BinaryInputFormat for reading formats that use binary blocks of fixed size, or DelimitedInputFormat, which can be inherited if you need to implement a format with delimiter-separated records.
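For a taste of the DelimitedInputFormat route, here is a hedged sketch for our edges format; the TweetEdgesFormat name is hypothetical, and error handling is omitted:
// DelimitedInputFormat splits the input and frames records on the delimiter
// (a newline by default), so we only need to convert each line.
public class TweetEdgesFormat extends DelimitedInputFormat<TwitterFollower> {

    @Override
    public TwitterFollower readRecord(TwitterFollower reuse, byte[] bytes, int offset, int numBytes)
            throws IOException {
        String line = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        String[] parts = line.trim().split(" ");
        reuse.setFollower(Integer.parseInt(parts[0]));
        reuse.setUser(Integer.parseInt(parts[1]));
        return reuse;
    }
}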
Conclusions
In this post, you have learned how to read data from a custom source in Flink, and now you can implement one yourself.
If you want to read the final version of the InputFormat from this article, you can find it in my GitHub repo.
Also, please check out my Understanding Apache Flink course available through Pluralsight. If you'd like a preview of what's going to be covered, take a look at this video. Thanks!