
Monitoring Wikipedia Edit Streams with Apache Flink and Packaging the Application with Dependencies

Now that you know what Apache Flink is, let's start using it! Learn how to perform real-time stream processing with Flink and more!

By Md. Rezaul Karim · Jan. 03, 17 · Opinion


In my previous article, I discussed how Apache Flink introduced a new wave in real-time stream processing. In this article, we will see how to perform real-time stream processing with Apache Flink through a step-by-step example that collects Wikipedia article edits as a data stream. More specifically, we will see which articles are currently being edited and how much they differ from their previous versions.

Moreover, we will see how to package Flink streaming applications as executable Java Archive (jar) files with all the required dependencies, so that we can execute the streaming job elsewhere in a platform-independent manner.

Apache Flink Ecosystem for Stream Processing

The core of Apache Flink, or the Flink ecosystem, is shown in the architecture diagram in Figure 1. This kernel of Flink is a distributed streaming dataflow engine that provides fault-tolerant data distribution and communication. Unlike Apache Spark's Directed Acyclic Graph (DAG)-based computation, Flink's streaming dataflow engine interprets every program as a dataflow graph, which is a computationally different model from Spark's.

Figure 1: Apache Flink Ecosystem (image credit: https://flink.apache.org/)

In the next section, we will see an example of using the DataStream API that collects Wikipedia edit operations through the Internet Relay Chat (IRC) channel.

Note that IRC is an application-layer protocol that facilitates the transfer of messages in the form of text; the chat process works on a client/server networking model.

Monitoring the Wikipedia Edit Stream

Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to read this channel in Flink and count the number of bytes that each user edits within a given window of time. This is easy enough to implement in a few minutes using Flink but will give you a good foundation from which to start building more complex analysis programs on your own. 

Viewing Wikipedia Edit Stream: A Flink-based Solution

For this demo, I am assuming that the environment for Flink computation is already set up in your IDE, such as Eclipse. If not, refer to the next section for how to set up the working environment in Eclipse. The demo, which is adapted from [1] and modified to fulfill our requirements, consists of the following steps:

Step-1: Loading required packages and APIs

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

Step-2: Get the stream execution environment for Flink-based streaming

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Step-3: Creating the data stream by adding a source to the execution environment

DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

As you can see, the above statement creates a DataStream of type WikipediaEditEvent, which is basically a class that carries the following fields for each Wikipedia edit event, each exposed through a getter (a quick inspection sketch follows the list):

1. timestamp 
2. channel
3. title
4. diffUrl
5. user
6. byteDiff
7. summary
8. flags
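
As a quick sanity check, you could print a few of these fields before any keying or windowing. This is a minimal sketch rather than part of the original demo; it uses only the getters that appear elsewhere in this article (getUser(), getTitle(), and getByteDiff()) and additionally requires importing org.apache.flink.api.common.functions.MapFunction.

// Summarize each raw edit event as a single line of text and print it.
edits.map(new MapFunction<WikipediaEditEvent, String>() {
    public String map(WikipediaEditEvent event) {
        return event.getUser() + " edited '" + event.getTitle()
            + "' (" + event.getByteDiff() + " bytes)";
    }
}).print();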

Step-4: Creating the KeyedStream

    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        public String getKey(WikipediaEditEvent event) {
          return event.getUser();
        }
      });

A KeyedStream represents a DataStream on which operator state is partitioned by key using a provided KeySelector. Typical operations supported by a DataStream are also possible on a KeyedStream, with the exception of partitioning methods such as shuffle, forward, and keyBy. In addition, it provides reduce-style operations, such as:
1. reduce
2. sum
3. fold
that work on elements that have the same key; a minimal example using sum is sketched below.
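
Instead of the windowed fold used in the next step, for example, a running per-user byte count could be computed with the built-in sum aggregation. This is a minimal sketch, not part of the original demo; it additionally requires importing org.apache.flink.api.common.functions.MapFunction.

// Map each event to a (user, byteDiff) tuple, re-key on the user field,
// and keep a running sum of the byte differences (no windowing here).
DataStream<Tuple2<String, Long>> bytesPerUser = edits
    .map(new MapFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        public Tuple2<String, Long> map(WikipediaEditEvent event) {
            return new Tuple2<String, Long>(event.getUser(), (long) event.getByteDiff());
        }
    })
    .keyBy(0)  // key by the user name (field 0 of the tuple)
    .sum(1);   // running sum of the byte differences (field 1)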
Step-5: Creating a DataStream of the byte differences per user

Let's collect the data stream as tuples of the last-edited article title and the total byte difference accumulated within each window.

    DataStream<Tuple2<String, Long>> result = keyedEdits
      .timeWindow(Time.seconds(5))
      .fold(new Tuple2<String, Long>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
          acc.f0 = event.getTitle(); 
          acc.f1 += event.getByteDiff();
          return acc;
        }
      });

The DataStream class creates a new data stream in the given execution environment with partitioning set to forward by default.

Step-6: Printing the edit streams

result.print();

Step-7: Execute the streaming environment

env.execute();

You should observe output similar to the following upon executing the above steps:

6> (Rashaan Salaam,-1)
2> (Thomas William Salmon,2041)
2> (Talk:Constitution of the United Kingdom,898)
7> (Oregano (software),2)
5> (Bač, Serbia,-32)
6> (17 in sports,43)
5> (16 Milwaukee riots,806)
1> (Mike Slyziuk,-210)
8> (Veeraalluri,830)
1> ( Your Eyez Only,3)
5> (Talk:Elpida Astromeriti,1230)
3> (Draft:The Sisters of Mary School Philippines,4)
8> (Yuddham,4)
3> (Case Cookus,631)
6> (User talk:2602:306:370F:18B0:9CC3:9C81:E5E1:A4C7,915)
7> (Anna Vissi (1981 album),8)
5> (Richard Di Natale,-57)
1> (Morten Søgård,-127)
5> (Water wheel,1)
1> (Talk:Saraiki dialect,-5696)
7> (Nova Bus LF Series,176)
7> (Karnıyarık,287)
...

The execute() method here plays a role similar to the start() and awaitTermination() methods in Spark Streaming. In Spark, however, streaming is built on the discretized stream (DStream) abstraction, whereas here we are getting a direct data stream, which means real-time streaming! The complete program, assembled from the steps above, is shown below.
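
For reference, here is the whole demo in a single class, placed in the package and class that the jar manifest in the next section points to (com.example.ApacheFlinkDemo.WikipediaAnalysis); it is simply Steps 1 through 7 put together.

package com.example.ApacheFlinkDemo;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class WikipediaAnalysis {

    public static void main(String[] args) throws Exception {
        // Step-2: obtain the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step-3: attach the Wikipedia edits source
        DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

        // Step-4: partition the stream by user name
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
            .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                public String getKey(WikipediaEditEvent event) {
                    return event.getUser();
                }
            });

        // Step-5: fold each 5-second window into (title, accumulated byte diff)
        DataStream<Tuple2<String, Long>> result = keyedEdits
            .timeWindow(Time.seconds(5))
            .fold(new Tuple2<String, Long>("", 0L),
                  new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
                public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
                    acc.f0 = event.getTitle();
                    acc.f1 += event.getByteDiff();
                    return acc;
                }
            });

        // Steps 6 and 7: print the windowed results and start the job
        result.print();
        env.execute();
    }
}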

Packaging the Streaming Application With Dependencies

In this section, we will see how to package an application with all the required dependencies so that the Flink application can be run somewhere else, achieving portability.

Step-1: Create a Maven project on Eclipse

Create a Maven project in Eclipse. For this demo, I used Eclipse Mars and downloaded the Maven nature. For this step, you can refer to this tutorial on how to do it; a command-line alternative is sketched below.
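
Alternatively, the same project skeleton can be generated from the command line with the Maven quickstart archetype. This is a sketch; the groupId and artifactId here are assumptions chosen to match the package and jar names used later in this article.

$ mvn archetype:generate -DgroupId=com.example.ApacheFlinkDemo \
      -DartifactId=ApacheFlinkDemo \
      -DarchetypeArtifactId=maven-archetype-quickstart \
      -DinteractiveMode=false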

Step-2: Adding required dependencies via Maven project file (pom.xml)

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

For this demo, I used Flink version 1.0.0. However, you could try using the latest release as well. 

Step-3: Adding project properties

Now add Java version (JDK version) and Flink version as follows:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>
        <flink.version>1.0.0</flink.version>
    </properties>

Step-4: Adding build properties

In this step, you need to add the following to the pom.xml file:

1. Download source code in Eclipse
2. Set a compiler level
3. Maven Assembly Plugin
     - get all project dependencies
     - MainClass in manifest to make an executable jar
     - bind to the packaging phase

For ease of use, I have provided the sample below that I used to prepare the demo:

<build>
        <plugins>
            <!-- download source code in Eclipse, best practice -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.9</version>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <downloadJavadocs>false</downloadJavadocs>
                </configuration>
            </plugin>
            <!-- Set a compiler level -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <shadeTestJar>true</shadeTestJar>
                </configuration>
            </plugin>
            <!-- Maven Assembly Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <!-- get all project dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <!-- MainClass in manifest makes an executable jar -->
                    <archive>
                        <manifest>
                            <mainClass>com.example.ApacheFlinkDemo.WikipediaAnalysis</mainClass>
                        </manifest>
                    </archive>


                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <!-- bind to the packaging phase -->
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Most importantly, I have pointed the manifest at the main class with its full package declaration (i.e., com.example.ApacheFlinkDemo.WikipediaAnalysis).

Step-5: Maven update

Once you have added all the dependencies in Steps 2, 3, and 4, you need to update the Maven project. Right-click on the project name and go to Maven => Update Project (Alt+F5), as shown in Figure 2 below:

Figure 2: Maven project update

However, once you update the project, the JRE System Library usually changes to a plain JRE, which will prevent Eclipse from resolving all the required packages and jar files as dependencies. To get rid of this problem, right-click on the 'JRE System Library' folder => Properties => select 'Alternate JRE:' and point it to a JDK (i.e., the JDK installation directory, such as C:\Program Files\Java\jdk1.8.0_102).

Step-6: Maven install

This will install all the required jar files under 'Maven Dependencies' in your project path. To do so, right-click on your project => 'Run as' => 'Maven install', as shown in Figure 3 below:

Figure 3: Installing dependencies using Maven install
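
The command-line equivalent, run from the project root, is:

$ mvn install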

If the build is successful, you should see all the required jar files under the Maven Dependencies directory.

Step-7: Maven build (generating jar with dependencies)

This will build your project and generate a jar file including all the dependencies. To do so, right-click on your project name => 'Run as' => 'Maven build' (as shown in Figure 4) => write 'clean package' in the Goals box => 'Apply' => 'Run' (as shown in Figure 5):

Figure 4: Building and generating jar with all the dependencies 

Figure 5: Project cleaning and generating jar
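
Equivalently, the same goals can be invoked from a terminal in the project root:

$ mvn clean package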

Step-8: Get the jar

If the above step completes successfully, the jar should be generated under the 'target' folder of your project tree, as shown in Figure 6 below:

Figure 6: Jar with required dependencies

It is to be noted that the name of the jar file here is 'ApacheFlinkDemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar', where 'ApacheFlinkDemo' indicates the Maven project name and the latter part is the jar descriptor as specified in the pom.xml file.

Step-9: Portability/executing the jar somewhere else (or locally)

Now let's see how to reuse the jar file that we created in the previous step. Suppose you want to perform the computation on a local cluster or on remote (e.g., cloud) nodes. In this scenario, you can ship the jar file to your desired node(s) and perform the computation there. It's pretty straightforward: the same jar file can be executed on Windows, Linux, or macOS in a platform-independent manner using the following command:

$ java -jar <jar_file_name>

For our case, it would be as follows, as shown in Figure 7:

 Figure 7: Running the jar file on Windows 7 environment

$ java -jar ApacheFlinkDemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar

You should continuously receive the edit streams as shown in Figure 8 (with different values, since each run picks up new streams):

Figure 8: Wikipedia edit streams
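
If you already have a Flink distribution available, the same jar can alternatively be submitted to a running cluster with the Flink command-line client. This is a sketch under the assumption of a standard Flink installation whose cluster is already started:

$ ./bin/flink run ApacheFlinkDemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar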

Conclusion

This way, we can package any Flink streaming project/application. You can apply the same steps even if your application is written in Scala in Eclipse; however, you will need to have the Scala nature installed in Eclipse.

Source Code Availability

The source code, including the Maven-friendly pom.xml file, can be downloaded from my GitHub repositories here.

References:

1. https://ci.apache.org/projects/flink/flink-docs-release-1.0/quickstart/run_example_quickstart.html


