Monitoring Wikipedia Edit Streams with Apache Flink and Packaging the Application with Dependencies


Now that you know what Apache Flink is, let's start using it! Learn how to perform real-time stream processing with Flink and more!


In my previous article, I discussed how Apache Flink introduced a new wave in real-time stream processing. In this article, we will see how to perform real-time stream processing with Apache Flink. A step-by-step example will be shown that collects real-time Wikipedia article changes/edits as data streams. More specifically, we will see which articles are being edited at the moment and how they differ from their previous versions.

Moreover, we will see how to package Flink streaming applications as executable Java Archive (jar) files with all the required dependencies, achieving portability so that we can execute the streaming job somewhere else in a platform-independent manner.

Apache Flink Ecosystem for Stream Processing

The core of Apache Flink, and of the Flink ecosystem, is shown in the architecture diagram in Figure 1. It can be considered the kernel of Flink: a distributed streaming dataflow engine that provides fault-tolerant data distribution and communication. Unlike Apache Spark's Directed Acyclic Graph (DAG)-based computation, the streaming dataflow engine in Flink interprets every program as a dataflow graph, which is computationally different from Spark's model.

Figure 1: Apache Flink Ecosystem (image credit: https://flink.apache.org/)

In the next section, we will see an example of using the DataStream API to collect Wikipedia content-edit operations through the Internet Relay Chat (IRC) channel.

Note that IRC is an application-layer protocol that facilitates the transfer of messages in the form of text; the chat process works on a client/server networking model.

Monitoring the Wikipedia Edit Stream

Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to read this channel in Flink and count the number of bytes that each user edits within a given window of time. This is easy enough to implement in a few minutes using Flink, but it will give you a good foundation from which to start building more complex analysis programs on your own.

Viewing Wikipedia Edit Stream: A Flink-based Solution

For this demo, I assume that the environment for Flink computation is already set up in your IDE, such as Eclipse. If not, refer to the next section for how to set up the working environment in Eclipse. The demo, which is adapted from [1] and modified to fulfill our requirements, consists of the steps shown below:

Step-1: Loading required packages and APIs

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

Step-2: Get the stream execution environment for Flink-based streaming

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Step-3: Creating the data stream by adding a source to the execution environment

DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

As you can see, the above statement creates a DataStream object/variable of type WikipediaEditEvent, which is basically a class that exposes the following fields and produces the stream of Wikipedia edit events:

1. timestamp 
2. channel
3. title
4. diffUrl
5. user
6. byteDiff
7. summary
8. flags

Step-4: Creating the KeyedStream

    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        public String getKey(WikipediaEditEvent event) {
          return event.getUser();
        }
      });

A KeyedStream represents a DataStream on which operator state is partitioned by key using a provided KeySelector. Typical operations supported by a DataStream are also possible on a KeyedStream, with the exception of partitioning methods such as shuffle, forward, and keyBy. In addition, it offers reduce-style operations that work on elements sharing the same key, such as:

1. reduce
2. sum
3. fold

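To build intuition for these reduce-style keyed operations, here is a plain-Java sketch (no Flink required; the users and byte diffs are made up for illustration) of what keying the edits by user and summing the byte diffs computes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyedSumSketch {

    // Groups (user, byteDiff) pairs by user and sums the diffs per key,
    // mimicking what a keyBy-on-user followed by sum-on-byteDiff produces.
    static Map<String, Long> sumByUser(String[] users, long[] byteDiffs) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (int i = 0; i < users.length; i++) {
            totals.merge(users[i], byteDiffs[i], Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Long> totals = sumByUser(
            new String[] {"alice", "bob", "alice"},
            new long[] {120, -30, 15});
        System.out.println(totals); // {alice=135, bob=-30}
    }
}
```

Flink performs the same per-key aggregation, but continuously and distributed across the cluster, with the key deciding which parallel operator instance handles each element.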
Step-5: Creating DataStream of the byte difference for the users

Let's collect the data stream as tuples of the article title and the size of the latest changes.

    DataStream<Tuple2<String, Long>> result = keyedEdits
      .timeWindow(Time.seconds(5))
      .fold(new Tuple2<String, Long>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
          acc.f0 = event.getTitle();
          acc.f1 += event.getByteDiff();
          return acc;
        }
      });

The DataStream class creates a new data stream in the given execution environment with partitioning set to forward by default.
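As a plain-Java mental model of the fold above (again, no Flink; the user and title values are hypothetical), the runtime keeps one accumulator per key, overwriting the title on every element and adding up the byte diffs:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

public class FoldSketch {

    // Per-key accumulator: (latest title, running byteDiff sum),
    // playing the role of the Tuple2<String, Long> in the Flink fold.
    private final Map<String, SimpleEntry<String, Long>> state = new HashMap<>();

    SimpleEntry<String, Long> fold(String user, String title, long byteDiff) {
        SimpleEntry<String, Long> acc = state.getOrDefault(user, new SimpleEntry<>("", 0L));
        // acc.f0 = title; acc.f1 += byteDiff;
        acc = new SimpleEntry<>(title, acc.getValue() + byteDiff);
        state.put(user, acc);
        return acc;
    }

    public static void main(String[] args) {
        FoldSketch fs = new FoldSketch();
        System.out.println(fs.fold("alice", "Apache Flink", 120));      // Apache Flink=120
        System.out.println(fs.fold("alice", "Stream processing", -20)); // Stream processing=100
        System.out.println(fs.fold("bob", "IRC", 5));                   // IRC=5
    }
}
```

In Flink the accumulator additionally lives inside a time window, so the running sum is reset when the window closes rather than growing forever.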

Step-6: Printing the edit streams

result.print();

Step-7: Execute the streaming environment

env.execute();

You should observe output similar to the following upon executing the above steps:

6> (Rashaan Salaam,-1)
2> (Thomas William Salmon,2041)
2> (Talk:Constitution of the United Kingdom,898)
7> (Oregano (software),2)
5> (Bač, Serbia,-32)
6> (17 in sports,43)
5> (16 Milwaukee riots,806)
1> (Mike Slyziuk,-210)
8> (Veeraalluri,830)
1> ( Your Eyez Only,3)
5> (Talk:Elpida Astromeriti,1230)
3> (Draft:The Sisters of Mary School Philippines,4)
8> (Yuddham,4)
3> (Case Cookus,631)
6> (User talk:2602:306:370F:18B0:9CC3:9C81:E5E1:A4C7,915)
7> (Anna Vissi (1981 album),8)
5> (Richard Di Natale,-57)
1> (Morten Søgård,-127)
5> (Water wheel,1)
1> (Talk:Saraiki dialect,-5696)
7> (Nova Bus LF Series,176)
7> (Karnıyarık,287)

The execute() method here works a bit like the start() and awaitTermination() methods in Spark Streaming. In Spark, however, the streaming abstraction is the discretized stream (DStream), whereas here we get a direct data stream, which means true real-time streaming!

Packaging the Streaming Application With Dependencies

In this section, we will see how to package an application with all the required dependencies so that the Flink application can be run somewhere else, achieving portability.

Step-1: Create a Maven project on Eclipse

Create a Maven project in Eclipse. For this demo, I used Eclipse Mars with the Maven nature (m2e) installed. If you are not familiar with this step, any tutorial on creating Maven projects in Eclipse will do.

Step-2: Adding required dependencies via Maven project file (pom.xml)


For this demo, I used Flink version 1.0.0. However, you could try using the latest release as well. 
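As a sketch, the dependencies section for Flink 1.0.0 pulls in the quickstart artifacts used by this demo (the `_2.10` Scala suffixes are my assumption based on the Flink 1.0 artifact naming convention; adjust them to your setup):

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```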

Step-3: Adding project properties

Now add Java version (JDK version) and Flink version as follows:
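A minimal properties block, assuming JDK 1.8 (the JDK used later in this article) and Flink 1.0.0, would be:

```xml
<properties>
    <flink.version>1.0.0</flink.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>
```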


Step-4: Adding build properties

In this step you need to add the following in the pom.xml file:

1. Download the source code in Eclipse
2. Set a compiler level
3. Maven Assembly Plugin
     - get all project dependencies
     - MainClass in manifest to make an executable jar
     - bind to the packaging phase

For convenience, here is the core of the build section I used to prepare the demo (downloading the source code in Eclipse and setting the compiler level are configured in their own plugins; the essential part is the Maven Assembly Plugin):

            <!-- Maven Assembly Plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <!-- MainClass in manifest makes an executable jar -->
                        <manifest>
                            <mainClass>com.example.ApacheFlinkDemo.WikipediaAnalysis</mainClass>
                        </manifest>
                    </archive>
                    <!-- get all project dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <!-- bind to the packaging phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>


Most importantly, I have pointed the main class with its full package declaration (i.e., com.example.ApacheFlinkDemo.WikipediaAnalysis).
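With the main class set this way, the assembly plugin writes an entry along these lines into the jar's META-INF/MANIFEST.MF, which is what allows the jar to be launched with java -jar:

```text
Main-Class: com.example.ApacheFlinkDemo.WikipediaAnalysis
```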

Step-5: Maven update

Once you have added all the dependencies in Steps 2, 3, and 4, you need to update the Maven project. Right-click on the project name, go to Maven, and then Update Project (Alt+F5), as shown in Figure 2 below:

Figure 2: Maven project update


However, once you update the project, the JRE System Library usually changes to a plain JRE, which will prevent the required packages and jar files from being resolved as dependencies. To get rid of this problem, right-click on the 'JRE System Library' folder => Properties => select 'Alternate JRE:' and point it to the JDK installation directory (e.g., C:\Program Files\Java\jdk1.8.0_102).

Step-6: Maven install

This will install all the required jar files under 'Maven Dependencies' in your project path. To do so, right-click on your project => 'Run as' => 'Maven install', as shown in Figure 3 below:

Figure 3: Installing dependencies using Maven install


If the build is successful,  you should see all the required jar files under the Maven Dependencies directory.

Step-7: Maven build (generating jar with dependencies)

This will build your project and generate a jar file including all the dependencies. To do so, right-click on your project name => 'Run as' => 'Maven build' (shown in Figure 4) => write 'clean package' in the goals box => 'Apply' => 'Run' (as shown in Figure 5):

Figure 4: Building and generating jar with all the dependencies

Figure 5: Project cleaning and generating jar


Step-8: Get the jar

If the above step completes successfully, the jar should be generated under the 'target' folder under your project tree as shown in Figure 6 below:

Figure 6: Jar with required dependencies


Note that the name of the jar file here is 'ApacheFlinkDemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar', where 'ApacheFlinkDemo' is the Maven project name and the latter part is the jar descriptor specified in the pom.xml file.

Step-9: Portability/ Executing the jar somewhere else (or locally)

Now let's see how to reuse the jar file that we created in the previous step. Suppose you want to perform the computation on a local cluster or on remote (e.g., cloud computing) nodes. In this scenario, you can ship the jar file to your desired node(s) and perform the computation there. It's pretty straightforward: the same jar file can be executed on Windows, Linux, or macOS in a platform-independent manner using the command below:

$ java -jar <jar_file_name>

For our case, it would be as follows as shown in Figure 7:

Figure 7: Running the jar file on a Windows 7 environment

$ java -jar ApacheFlinkDemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar

You should continuously receive the edit streams, as shown in Figure 8 (with different values, since each time you will get new streams):

Figure 8: Wikipedia edit streams



This way, we can package any Flink streaming project/application. You can apply the same steps even if your application is written in Scala in Eclipse; however, you will need to have the Scala nature installed in Eclipse.

Source Code Availability

The source code, including the Maven-friendly pom.xml file, can be downloaded from my GitHub repository here.


References

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/quickstart/run_example_quickstart.html


Opinions expressed by DZone contributors are their own.
