Getting Started With Batch Processing Using Apache Flink


By Ivan Mushketyk · Oct. 13, 17 · Tutorial

If you've been following software development news recently, you probably heard about a new project called Apache Flink. I've already written about it a bit here and here, but if you are not familiar with it, Apache Flink is a new-generation big data processing tool that can process either finite sets of data (this is also called batch processing) or potentially infinite streams of data (stream processing). In terms of new features, many believe Apache Flink is a game changer and can even replace Apache Spark in the future.

In this article, I'll show you how to use Apache Flink to implement simple batch processing algorithms. We will start by setting up our development environment, and then see how to load data, process a dataset, and write data back to an external system.

Why Batch Processing?

You might have heard that stream processing is "the new hot thing right now" and that Apache Flink is a tool for stream processing. This raises a question: why do we need to learn how to implement batch processing applications?

While it is true that stream processing has become more and more widespread, many tasks still require batch processing. Also, if you are just getting started with Apache Flink, it is, in my opinion, better to start with batch processing, since it is simpler and in some ways resembles working with a database. Once you've covered batch processing, you can move on to stream processing, where Apache Flink really shines!

How to Follow Examples

If you want to implement some Apache Flink applications yourself, first you need to create a Flink project. In this article, we are going to write applications in Java, but you can also write Flink applications in Scala, Python, or R.

To create a Flink Java project, execute the following command:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.3.2

After you enter a group id, an artifact id, and a project version, this command will create the following project structure:

.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── flinkProject
        │       ├── BatchJob.java
        │       ├── SocketTextStreamWordCount.java
        │       ├── StreamingJob.java
        │       └── WordCount.java
        └── resources
            └── log4j.properties

The most important file here is the massive pom.xml, which specifies all the necessary dependencies. The automatically created Java classes are examples of simple Flink applications that you can take a look at, but we don't need them for our purposes.

To start developing your first Flink application, create a class with the main method like this:

public class FilterMovies {

    public static void main(String[] args) throws Exception {
        // Create a Flink execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // We will write our code here

        // Start the Flink application
        env.execute();
    }
}

There is nothing special about this main method. All we have to do is add some boilerplate code.

First, we need to create a Flink execution environment, which will behave differently depending on whether you run it on a local machine or in a Flink cluster:

  • On a local machine, it will create a full-fledged Flink cluster with multiple local nodes. This is a good way to test how your application will work in a realistic environment.
  • On a Flink cluster, it won't create anything new, but will use existing cluster resources instead.
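
If you want more control when running locally, you can also create a local environment explicitly. Here is a minimal sketch, assuming you want to set the parallelism yourself (the value 4 is arbitrary):

// Explicitly create a local execution environment with a parallelism of 4
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(4);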

Alternatively, you could create a collection environment like this:

 ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); 

This will create a Flink execution environment that, instead of running a Flink application on a local cluster, will emulate all operations using in-memory collections in a single Java process. Your application will run faster, but this environment has some subtle differences from a local cluster with multiple nodes.

Where Do We Start?

Before we can do anything, we need to read data into Apache Flink. We can read data from numerous systems, including the local filesystem, S3, HDFS, HBase, Cassandra, etc. No matter where we read a dataset from, Apache Flink allows us to work with data in a uniform way using the DataSet class:

DataSet<Integer> numbers = ... 

All items in a dataset must have the same type. The single generic parameter specifies the type of the data stored in the dataset.

To read data from a file, we can use the readTextFile method, which will read a file line by line and return a dataset of type String:

DataSet<String> lines = env.readTextFile("path/to/file.txt"); 

If you specify a file path like this, Flink will attempt to read a local file. If you want to read a file from HDFS, you need to specify the hdfs:// protocol:

env.readTextFile("hdfs:///path/to/file.txt")

Flink also has support for CSV files, but in this case, it won't return a dataset of strings. It will try to parse every line and return a dataset of Tuple instances:

DataSet<Tuple2<Long, String>> lines = env.readCsvFile("data.csv")
    .types(Long.class, String.class);

Tuple2 is a class that stores a pair of typed fields; there are other classes, from Tuple0 through Tuple25, that store from zero to twenty-five fields. Later, we will see how to work with these classes.
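
As a quick illustration of how tuple fields are accessed (the values here are made up), each field is available as a public member f0, f1, and so on, or via getField:

// A (movieId, title) pair; the values are only for illustration
Tuple2<Long, String> pair = new Tuple2<>(1L, "Toy Story (1995)");

Long movieId = pair.f0;              // access by public field
String title = pair.f1;
String sameTitle = pair.getField(1); // or by field index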

The types method specifies the types and number of columns in the CSV file, so that Flink can read and parse them.

We can also create small datasets that are very good for small experiments and unit tests:

// Create from a list
DataSet<String> letters = env.fromCollection(Arrays.asList("a", "b", "c"));

// Create from an array
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

A question you may ask is: what data can we store in a DataSet? Not every Java type can be used in a dataset; there are four categories of types that you can use (a short POJO sketch follows the list):

  • Built-in Java types and POJO classes
  • Flink tuples and Scala case classes
  • Values, which are special mutable wrappers for Java primitive types that you can use to increase performance (I'll write about this in one of the upcoming articles)
  • Implementations of the Hadoop Writable interface
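
For example, a simple POJO with a public no-argument constructor and public fields (or getters and setters) can be used as a dataset element type. This is only a sketch; the Person class below is made up for illustration:

// A POJO that Flink can use as a DataSet element type:
// public class, public no-arg constructor, public fields
public class Person {
    public String name;
    public int age;

    public Person() {}

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

// Somewhere in your job:
DataSet<Person> people = env.fromElements(
    new Person("Alice", 42),
    new Person("Bob", 27));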

Processing Data With Apache Flink

Now to the data processing part! How do you implement an algorithm for processing your data? To do this, you can use a number of operations that resemble Java 8 Stream operations, such as:

  • map: Converts items in a dataset using a user-defined function. Every input element is converted into exactly one output element.
  • filter: Filters items in a dataset according to a user-defined function.
  • flatMap: Similar to the map operator, but allows returning zero, one, or many elements.
  • groupBy: Groups elements by a key. Similar to the GROUP BY operator in SQL (a short word count sketch using groupBy follows the example below).
  • project: Selects specified fields in a dataset of tuples, similar to the SELECT operator in SQL.
  • reduce: Combines elements in a dataset into a single value using a user-defined function.

Keep in mind that the biggest difference between Java streams and these operations is that Java 8 works with data in memory and can access local data, while Flink works with data on a cluster in a distributed environment.

Let's take a look at a simple example that uses these operations. The following example is very straightforward: it creates a dataset of numbers, squares every number, and filters out all odd numbers.

// Create a dataset of numbers
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7);

// Square every number
DataSet<Integer> result = numbers.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer integer) throws Exception {
        return integer * integer;
    }
})
// Leave only even numbers
.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer integer) throws Exception {
        return integer % 2 == 0;
    }
});

If you have any experience with Java 8, you are probably wondering why I don't use lambdas here. We could use lambdas here, but they can cause some complications, as I've written here.
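
The example above only exercises map and filter. To give the grouping operations from the list a concrete shape, here is a minimal word count sketch using flatMap, groupBy, and sum on tuple field indices (the input string is made up, and imports for FlatMapFunction and Collector are assumed):

// Turn lines into (word, 1) pairs, group them by the word (field 0),
// and sum the counts (field 1)
DataSet<String> text = env.fromElements("to be or not to be");

DataSet<Tuple2<String, Integer>> counts = text
    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    })
    .groupBy(0)
    .sum(1);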

Saving Data Back

After we've finished processing our data, it would make sense to save the result of our hard work. Flink can store data in a number of third-party systems such as HDFS, S3, Cassandra, etc.

For example, to write data to a file, we need to use the writeAsText method from the DataSet class:

DataSet<Integer> ds = ...
ds.writeAsText("path/to/file");

For debugging/testing purposes, Flink can write data to standard output or to standard error:

DataSet<Integer> ds = ...

// Output dataset to standard output
ds.print();

// Output dataset to standard error
ds.printToErr();

A More Complicated Example

To implement a more meaningful algorithm, we first need to download the GroupLens movies dataset. It contains several CSV files with information about movies and movie ratings. We are going to work with the movies.csv file from this dataset, which contains a list of all movies and looks like this:

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller

It has three columns:

  • movieId: A unique id for each movie in this dataset.
  • title: The title of the movie.
  • genres: A |-separated list of genres for each movie.

We can now load this CSV file in Apache Flink and perform some meaningful processing. Here, we will load a file from the local filesystem, while in a realistic environment you would read a much bigger dataset that would probably reside in a distributed system such as S3 or HDFS.

In this demo, let's find all movies of the "Action" genre. Here is a code snippet that does this:

// Load dataset of movies
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
    .ignoreFirstLine()
    .parseQuotedStrings('"')
    .ignoreInvalidLines()
    .types(Long.class, String.class, String.class);

DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long, String, String>, Movie>() {
    @Override
    public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
        String movieName = csvLine.f1;
        String[] genres = csvLine.f2.split("\\|");
        return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
    }
});

DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
    @Override
    public boolean filter(Movie movie) throws Exception {
        return movie.getGenres().contains("Action");
    }
});

filteredMovies.writeAsText("output.txt");

Let's break it down. First, we read a CSV file using the readCsvFile method:

DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
    // Ignore the CSV header
    .ignoreFirstLine()
    // Set the string quote character
    .parseQuotedStrings('"')
    // Ignore invalid lines in the CSV file
    .ignoreInvalidLines()
    // Specify the types of columns in the CSV file
    .types(Long.class, String.class, String.class);

Using helper methods, we specify how to parse strings in the CSV file and that we need to skip the first line. In the last line, we specify the type of each column in the CSV file, and Flink will parse the data for us.

Now that we have a dataset loaded into a Flink cluster, we can do some data processing. First, we parse the list of genres for every movie using the map method:

DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long, String, String>, Movie>() {
    @Override
    public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
        String movieName = csvLine.f1;
        String[] genres = csvLine.f2.split("\\|");
        return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
    }
});

To transform every movie, we implement a MapFunction that receives every CSV record as a Tuple3 instance and converts it into the Movie POJO class:

class Movie {
    private String name;
    private Set<String> genres;

    public Movie(String name, Set<String> genres) {
        this.name = name;
        this.genres = genres;
    }

    public String getName() {
        return name;
    }

    public Set<String> getGenres() {
        return genres;
    }
}

If you recall the structure of the CSV file, the second column contains the name of a movie and the third column contains its list of genres. Hence, we access these columns using the fields f1 and f2, respectively.

Now that we have a dataset of movies, we can implement the core part of our algorithm and keep only the action movies:

DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
    @Override
    public boolean filter(Movie movie) throws Exception {
        return movie.getGenres().contains("Action");
    }
});

This will only return movies that contain "Action" in the set of genres.

The last step is very straightforward: we store the result data in a file:

 filteredMovies.writeAsText("output.txt"); 

This simply stores the result data into a local text file, but as with the readTextFile method, we could write this file into HDFS or S3 by specifying a protocol like hdfs://.
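
For example, writing the same result to HDFS is just a matter of changing the path (the path below is only illustrative):

// Write the result to HDFS instead of the local filesystem
filteredMovies.writeAsText("hdfs:///path/to/output.txt");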

More Information

This was an introductory article, and there is much more to Apache Flink. I will write more articles about Flink in the near future, so stay tuned! You can read my other articles here, or you can take a look at my Pluralsight course, where I cover Apache Flink in more detail: Understanding Apache Flink.


Opinions expressed by DZone contributors are their own.
