The Foundations for Building an Apache Flink Application

By Lior Shalom · Apr. 16, 2020 · Tutorial

In this article, we'll work to give you a better understanding of stream processing with Flink from the bottom up. Cloud services and other platforms offer stream processing solutions (for some, Flink is integrated under the hood), but if you're missing the basics, this guide is for you.

Stream processing in Flink

Our monolithic solution cannot cope with the increased load of incoming data, so it has to evolve. This is the time for the next generation of our product. Stream processing is the new data ingestion paradigm, as compared to the batch processing we’ve implemented so far.

So, my team embarked on processing information with Flink. There is an abundance of articles about Flink’s features and benefits, and Cloudera has shared an excellent slide deck about it; this article is a practical, hands-on guide on how to build a simple stream processing application, starting from the basics.

Apache Flink in Short

Apache Flink is a scalable, distributed stream-processing framework, meaning it is able to process continuous streams of data. This framework provides a variety of functionalities: sources, stream transformations, parallel processing, scheduling, resource assignments, and a wide range of destinations. Some of its connectors are HDFS, Kafka, Amazon Kinesis, RabbitMQ, and Cassandra.

Flink is known for its high throughput and low latency, supports exactly-once consistency (all data is processed once, without duplication), and also supports high availability. Like any other successful open-source product, it has a broad community that cultivates and extends its features.

Flink can process unbounded data streams or bounded data sets. This blog will focus on the former (working with DataStream objects).

Stream Processing: The Challenges

Nowadays, with IoT devices and other sensors ubiquitous, data flows endlessly from many sources. This endless flow of data forces traditional batch computation to adapt:

  • The data is unbounded; there is no start or end.
  • New data arrives at unpredictable and inconsistent intervals.
  • Data can arrive out of order, with varying timestamps.

Due to these unique characteristics, processing and querying data are intricate tasks. Results change rapidly, and it is almost impossible to reach definite conclusions; at times, the computation may stall while trying to produce valid results. Moreover, the results are not repeatable, since the data keeps changing. Lastly, latency is a factor, as it impacts the accuracy of the results.

Apache Flink copes with these problems by processing events based on timestamps assigned at the source of the incoming data. It has a mechanism for accumulating events according to their timestamps before applying the processing. This eliminates the use of micro-batches and, with that, improves the accuracy of the results.

Flink implements exactly-once consistency, which ensures the correctness of computations without the developer having to program for it.

The Foundations: Flink's Building Blocks

Flink predominantly ingests streams from various sources. The basic object is DataStream<T>, which represents a stream of elements of the same type; the elements’ type is defined at compile time by setting the generic type T (read here about the DataStream object).

The DataStream object contains many useful methods to transform, split, and filter its data[1]. Familiarity with the methods map, reduce, and filter is a good start; these are the main transformation methods:

Map: receives an object of type T and returns an object of type R; the MapFunction is applied exactly once on each element of the DataStream object.

SingleOutputStreamOperator<R> map(MapFunction<T,R> mapper)


Reduce: receives two consecutive values and returns one object after combining them into the same object type; this method runs on all values in the group until only a single value remains.

T reduce(T value1, T value2)


Filter: receives an object of type T and returns a stream of T objects; this method runs on each element of the DataStream but keeps only those for which the function returns true.

SingleOutputStreamOperator<T> filter(FilterFunction<T> filter)
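
To see how these transformations compose, here is a minimal sketch with hypothetical names and scores (it assumes the standard Flink DataStream API imports and mirrors the Tuple2-based style used later in this post):

Java

// A hedged sketch with hypothetical data, showing how map, filter, and reduce compose
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> scores = env.fromElements(
        Tuple2.of("singerA", 3), Tuple2.of("playerB", 7), Tuple2.of("singerA", 12));

scores
    .filter(item -> item.f1 > 5)                        // keep only scores above 5
    .map(item -> Tuple2.of(item.f0, item.f1 * 2))       // double each surviving score
    .returns(Types.TUPLE(Types.STRING, Types.INT))      // lambdas need an explicit Tuple type hint
    .keyBy(0)                                           // group by the name field
    .reduce((x, y) -> Tuple2.of(x.f0, x.f1 + y.f1))     // running sum per key
    .print();

env.execute("map-filter-reduce sketch");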


Data Sink

Besides transforming the data, Flink’s other main purpose is to steer streams, after processing them, into different destinations. These destinations are called “sinks”. Flink has built-in sinks (text, CSV, socket), as well as out-of-the-box connectors to other systems (such as Apache Kafka)[2].
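
As a hedged illustration, the snippet below steers a stream into a built-in text sink and into a Kafka connector sink (the topic name and broker address are hypothetical, and the Kafka sink assumes the flink-connector-kafka dependency is on the classpath):

Java

// A sketch only: hypothetical topic/broker; streamEnv is the StreamExecutionEnvironment
DataStream<String> results = streamEnv.fromElements("a", "b", "c");

// Built-in text sink
results.writeAsText("/tmp/flink-output");

// Connector sink to Apache Kafka
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
results.addSink(new FlinkKafkaProducer<>("results-topic", new SimpleStringSchema(), props));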

Flink Event Timestamps

The notion of time is paramount for processing data streams. There are three options to define a timestamp:

Processing time (the default option): refers to the system time of the machine that executes the stream processing operation, and thus it is the simplest notion of time; it does not require any coordination between streams and machines. Since it is based on the machine’s time, it provides the best performance and the lowest latency.

The drawback of using processing time is significant in distributed and asynchronous environments since it is not a deterministic method. The timestamp of the stream’s events can go out of sync if there’s a gap between machines’ clocks; network latency can also create a gap between the time an event left one machine and arrived at the other.

Java

// Setting the Processing Time attribute of StreamExecutionEnvironment object
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);


Event time: refers to the time at which each individual event occurred at its producing source, before entering Flink. The event time is embedded in the event itself and can be extracted so that Flink can process it properly.
Since the timestamp is not set by Flink, there must be a mechanism to signal whether an event should be processed; this mechanism is called a watermark.

This topic is beyond the scope of this blog-post (since I wanted to keep it concise); you can find more information in the Flink documentation.

Java

// Defining the Event Time as the timestamp method
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> dataStream
   = streamEnv.readFile(auditFormat,
          dataDir,    // the folder containing the source events
          FileProcessingMode.PROCESS_CONTINUOUSLY,
          1000).
          assignTimestampsAndWatermarks(
                  new TimestampExtractor());

// ... more code ...

// Defining a class to extract the timestamp from the stream events
public class TimestampExtractor implements
                AssignerWithPeriodicWatermarks<String> {

    @Override
    public Watermark getCurrentWatermark() {
        // maxTimeFrame (defined elsewhere) is the maximum allowed lateness, in milliseconds
        return new Watermark(System.currentTimeMillis() - maxTimeFrame);
    }

    @Override
    public long extractTimestamp(String str, long l) {
        return InputData.getDataObject(str).timestamp;
    }
}


Ingestion time: refers to the time the event enters Flink; it is assigned once at the source, and thus is considered more stable than processing time, which is assigned when processing commences.
Ingestion time cannot handle out-of-order events or late data, since the timestamp is set once ingestion starts, as opposed to event time, which can identify delayed events and handle them based on the watermarking mechanism.

Java

// Setting the Ingestion Time attribute of StreamExecutionEnvironment object
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);


You can read more about timestamps and how they affect stream processing in the Flink documentation.

Windowing

By definition, a stream is endless; therefore, processing is done by defining frames (a time-based window, for example). With that, the stream is divided into buckets for aggregation and analysis. The window definition is an operation on a DataStream object or one of its inheritors.

There are several time-based windows:

Tumbling Window (the Default Configuration)
The stream is divided into equal-sized windows, without any overlap. As long as the stream flows, Flink continuously computes the data over this fixed time frame.

Tumbling Window


Code implementation:

Java

// To be used for a non-keyed stream
public AllWindowedStream<T,TimeWindow> timeWindowAll(Time size)

// Tumbling window for a key-based stream
public WindowedStream<T,KEY,TimeWindow> timeWindow(Time size)

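For instance, a one-minute tumbling window on a key-based stream (a sketch, assuming dataStreamObject is a keyed stream):

Java

// Tumbling window of 1 minute; each event belongs to exactly one window
dataStreamObject.timeWindow(Time.minutes(1));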

Sliding Window
An overlapping window, defined by a window size and an offset (when to start the next window). With that, an event can be processed in more than one window at a given time.

Sliding Window


And this is how it looks in code:

Java

// sliding time window of 1 minute length and 30 secs trigger interval
dataStreamObject.timeWindow(Time.minutes(1), Time.seconds(30));


Session Window
Includes all events within the session’s boundary. A session ends when there is no activity, i.e., no new events, after a defined time frame. This time frame can be fixed or dynamic, based on the processed events. Theoretically, as long as the gap between events remains smaller than the defined session gap, the session never ends.

Session Window

The first code snippet below exemplifies a fixed time-based session window (2 seconds). The second implements a dynamic session window, based on the stream’s events.

Java

// Defining a fixed session window of 2 seconds
dataStreamObject.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)));

// Defining a dynamic session window, whose gap can be set by the stream elements
dataStreamObject.window(EventTimeSessionWindows.withDynamicGap((elem) -> {
        // return the session gap, which can be based on the stream's events
        return 2000;  // for illustration only: a fixed 2-second gap; in practice, derive it from elem
    }));


Global Window
Treats the entire stream as one single window.

Global Window

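A hedged usage sketch (a global window never fires on its own, so in practice it is paired with a trigger; the count-based trigger and the Tuple2 stream here are assumptions for illustration):

Java

// Global window over a keyed stream; a trigger is needed for it to ever emit results
dataStreamObject
    .keyBy(0)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(5))   // for example, fire after every 5 elements per key
    .reduce((x, y) -> new Tuple2<String, Integer>(x.f0, x.f1 + y.f1));
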
Flink also enables the implementation of custom windows with user-defined logic, which will be a topic for another blog-post.

Other than time-based windows, there are other windows, such as the Count Window, which bounds a window by the number of incoming events; once a threshold of X events has been reached, Flink processes those X events. The illustration below describes a count window of three elements:

Count Window

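A minimal count-window sketch (again assuming a keyed Tuple2<String, Integer> stream like the ones used in the examples below):

Java

// Count window of three elements per key: Flink fires once the third element for a key arrives
dataStreamObject
    .keyBy(0)
    .countWindow(3)
    .reduce((x, y) -> new Tuple2<String, Integer>(x.f0, x.f1 + y.f1));

// For a non-keyed stream, use countWindowAll(3) instead
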
After the theoretical introduction, let’s dive into a practical data flow. You can find more information about Apache Flink and stream processing on the official website.

Streaming Flow Description

To recap the theoretical part, the diagram below portrays the main data flow of the code samples in this blog-post. The flow starts from a source (files written into a folder) and continues with processing the events into objects.

The implementation depicted below is composed of two processing tracks. The one at the top splits a stream into two side streams and then merges them to form a third type of stream. The track at the bottom processes a stream and then transfers the results into a sink.

Stream Processing Workflow

The next part aims to convert the theoretical stream processing into tangible practice; you can find the full source code on GitHub.

Basic Stream Handling (Example #1)

Starting with a basic application makes it easier to grasp Flink's concepts. In this application, a producer writes files into a folder, which simulates a flowing stream. Flink reads files from this folder, processes them, and writes a summary into a destination folder; this is the sink.
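
Before the processing steps, here is a hedged sketch of how the streaming environment and the file source behind the snippets below might be set up (the variable names mirror the ones used throughout this post; the complete version is in the GitHub repository):

Java

// Assumed setup for the snippets below (a sketch; see the repository for the full version)
final StreamExecutionEnvironment streamEnv
        = StreamExecutionEnvironment.getExecutionEnvironment();

// Monitor the input folder continuously; every new file simulates new stream events
TextInputFormat auditFormat = new TextInputFormat(new Path(dataDir));
DataStream<String> dataStream
        = streamEnv.readFile(auditFormat,
                dataDir,                                   // the folder the producer writes into
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                1000);                                     // polling interval in milliseconds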

Now, let’s focus on the process part:

1. Converting the raw data into an object:
Java

// Convert each record to an InputData object; each new line is considered a new record
DataStream<InputData> inputDataObjectStream
          = dataStream
           .map((MapFunction<String, InputData>) inputStr -> {
                // print the raw data before converting it to an object
                System.out.println("--- Received event : " + inputStr);
                return InputData.getDataObject(inputStr);
          });

2. The code sample below converts the stream object (InputData) into a Tuple of string and integer. It extracts only certain fields from the stream of objects, grouping them by one field within time windows of two seconds.

Java

// Convert each record to a Tuple with name and score
DataStream<Tuple2<String, Integer>> userCounts
        = inputDataObjectStream
        .map(new MapFunction<InputData, Tuple2<String, Integer>>() {

            @Override
            public Tuple2<String, Integer> map(InputData item) {
                return new Tuple2<String, Integer>(item.getName(), item.getScore());
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(0)  // returns KeyedStream<T, Tuple> based on the first item (the 'name' field)
        //.timeWindowAll(Time.seconds(windowInterval)) // DO NOT use timeWindowAll for a key-based stream
        .timeWindow(Time.seconds(2)) // returns WindowedStream<T, KEY, TimeWindow>
        .reduce((x, y) -> new Tuple2<String, Integer>(x.f0 + "-" + y.f0, x.f1 + y.f1));

3. Creating a destination for the stream (implementing a data sink):
Java

// Define a time window and count the number of records
DataStream<Tuple2<String, Integer>> inputCountSummary
        = inputDataObjectStream
        .map(item
                -> new Tuple2<String, Integer>
                (String.valueOf(System.currentTimeMillis()), 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .timeWindowAll(Time.seconds(windowInterval)) // a tumbling window
        .reduce((x, y) -> // sum the numbers until reaching a single result
                (new Tuple2<String, Integer>(x.f0, x.f1 + y.f1)));

// Set up a streaming file sink to the output directory
final StreamingFileSink<Tuple2<String, Integer>> countSink
        = StreamingFileSink
            .forRowFormat(new Path(outputDir),
                    new SimpleStringEncoder<Tuple2<String, Integer>>("UTF-8"))
            .build();

// Add the sink to the DataStream; with that, inputCountSummary will be written into the countSink path
inputCountSummary.addSink(countSink);


Splitting Streams (Example #2)

In this example, we demonstrate how to split the main stream by using side output streams. Flink enables producing multiple side streams from the main DataStream. The type of data residing in each side stream can differ from that of the main stream, as well as from the other side streams.

So, using side output streams kills two birds with one stone: it splits the stream and converts the stream's type into multiple data types (which can be unique for each side output stream).

The code sample below calls a ProcessFunction that splits a stream into two side streams based on a property of the input. To obtain the same result otherwise, we would have had to use the filter function more than once.

The ProcessFunction collects certain objects (based on criteria) into the main output collector (captured in the SingleOutputStreamOperator), while adding other events to side outputs. The DataStream is split vertically and publishes different formats.

Notice that each side output stream definition is based on a unique output tag (an OutputTag object).

Java

// Define a separate stream for Players
final OutputTag<Tuple2<String, String>> playerTag
        = new OutputTag<Tuple2<String, String>>("player"){};

// Define a separate stream for Singers
final OutputTag<Tuple2<String, Integer>> singerTag
        = new OutputTag<Tuple2<String, Integer>>("singer"){};

// Convert each record to an InputData object and split the main stream into two side streams
SingleOutputStreamOperator<InputData> inputDataMain
        = inputStream
        .process(new ProcessFunction<String, InputData>() {

            @Override
            public void processElement(
                    String inputStr,
                    Context ctx,
                    Collector<InputData> collInputData) {

                Utils.print(Utils.COLOR_CYAN, "Received record : " + inputStr);

                // Convert a String to an InputData object
                InputData inputData = InputData.getDataObject(inputStr);

                switch (inputData.getType()) {
                    case "Singer":
                        // Create output tuple with name and count
                        ctx.output(singerTag,
                                new Tuple2<String, Integer>
                                        (inputData.getName(), inputData.getScore()));
                        break;
                    case "Player":
                        // Create output tuple with name and type;
                        // if the newly created tuple doesn't match the playerTag type, a compilation error is raised ("method output cannot be applied to given types")
                        ctx.output(playerTag,
                                new Tuple2<String, String>
                                        (inputData.getName(), inputData.getType()));
                        break;
                    default:
                        // Collect the main output as InputData objects
                        collInputData.collect(inputData);
                        break;
                }
            }
        });

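The side streams are then obtained from the main operator by their tags; these are the streams the merging example in the next section works with (a short sketch):

Java

// Retrieve each side stream by its output tag
DataStream<Tuple2<String, Integer>> singerStream = inputDataMain.getSideOutput(singerTag);
DataStream<Tuple2<String, String>> playerStream = inputDataMain.getSideOutput(playerTag);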

Merging Streams (Example #3)

The last operation in this blog-post demonstrates merging streams. The idea is to combine two different streams, which can differ in their data format, and produce one stream with a unified data structure. As opposed to an SQL merge operation, which merges data horizontally, merging streams is a vertical operation, since the events continue to flow without any bounded time frame.

Merging streams is done by calling the connect method and then defining a map operation on the elements of each individual stream. The result is a merged stream.

Java

// The returned stream definition includes both streams' data types
ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, String>> mergedStream
        = singerStream
        .connect(playerStream);

DataStream<Tuple4<String, String, String, Integer>> combinedStream
        = mergedStream.map(new CoMapFunction<
        Tuple2<String, Integer>,                // Stream 1
        Tuple2<String, String>,                 // Stream 2
        Tuple4<String, String, String, Integer> // Output
        >() {

    @Override
    public Tuple4<String, String, String, Integer>  // Process Stream 1
    map1(Tuple2<String, Integer> singer) throws Exception {
        return new Tuple4<String, String, String, Integer>
                ("Source: singer stream", singer.f0, "", singer.f1);
    }

    @Override
    public Tuple4<String, String, String, Integer> // Process Stream 2
    map2(Tuple2<String, String> player) throws Exception {
        return new Tuple4<String, String, String, Integer>
                ("Source: player stream", player.f0, player.f1, 0);
    }
});


Building a Workable Project

Bringing it all together: I uploaded a demo project to GitHub. You can follow the instructions on how to build and compile it. It is a good starting point for playing with Flink.

I hope you find this repo useful. Do not hesitate to contact me if you have any issues.

The Takeaways

This article focused on the essential foundations to build a working stream processing application based on Flink. Its purpose is to provide a basic understanding of stream processing challenges and set the foundations for building a stand-alone Flink application.

Since stream processing has many aspects and complexities, many topics were not covered here: for example, Flink execution and task management, using watermarks to set event time on stream events, maintaining state within streams, running stream iterations, and executing SQL-like queries on streams, among others. I hope to cover some of these topics in subsequent articles.

Nevertheless, I hope this blog equipped you with essential information to start using Flink.

Keep on coding!

— Lior


Resources

[1] DataStream operations: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/index.html

[2] Data sinks: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#data-sinks
