DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Because the DevOps movement has redefined engineering responsibilities, SREs now have to become stewards of observability strategy.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Related

  • Power BI Embedded Analytics — Part 3: Power BI Embedded Demo
  • DGS GraphQL and Spring Boot
  • Auto-Instrumentation in Azure Application Insights With AKS
  • Deploying a Scala Play Application to Heroku: A Step-by-Step Guide

Trending

  • How to Format Articles for DZone
  • Simplifying Multi-LLM Integration With KubeMQ
  • Optimizing Serverless Computing with AWS Lambda Layers and CloudFormation
  • Endpoint Security Controls: Designing a Secure Endpoint Architecture, Part 2

4 Ways to Optimize Your Flink Applications

Apache Flink is a streaming data processing framework. Read article to learn how to make your Flink applications a little bit faster!

By 
Ivan Mushketyk user avatar
Ivan Mushketyk
·
Oct. 18, 17 · Tutorial
Likes (6)
Comment
Save
Tweet
Share
16.5K Views

Join the DZone community and get the full member experience.

Join For Free

Flink is a complicated framework and provides many ways to tweak its execution. In this article, I'll show four different ways to improve the performance of your Flink applications.

If you are not familiar with Flink, you can read other introductory articles like this, this, and this. But if you are already familiar with Apache Flink, this article will help you to make your applications a little bit faster.

Use Flink Tuples

When you use operations like groupBy, join, or keyBy, Flink provides you a number of options to select a key in your dataset. You can use a key selector function:

// Join movies and ratings datasets
movies.join(ratings)
        // Use movie id as a key in both cases
        .where(new KeySelector<Movie, String>() {
            @Override
            public String getKey(Movie m) throws Exception {
                return m.getId();
            }
        })
        .equalTo(new KeySelector<Rating, String>() {
            @Override
            public String getKey(Rating r) throws Exception {
                return r.getMovieId();
            }
        })

Or you can specify a field names in POJO types:

movies.join(ratings)
    // Use same fields as in the previous example
    .where("id")
    .equalTo("movieId")

But if you are working with Flink tuple types, you can simply specify a position of a field tuple that will be used as key:

DataSet<Tuple2<String, String>> movies = ...
DataSet<Tuple3<String, String, Double>> ratings = ...

movies.join(ratings)
    // Specify fields positions in tuples
    .where(0)
    .equalTo(1)

The last option will give you the best performance, but what about readability? Does it mean that your code will look like this now?

DataSet<Tuple3<Integer, String, Double>> result = movies.join(ratings)
    .where(0)
    .equalTo(0)
    .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,Double>, Tuple3<Integer, String, Double>>() {
        // What is happening here?
        @Override
        public Tuple3<Integer, String, Double> join(Tuple2<Integer, String> first, Tuple2<Integer, Double> second) throws Exception {
            // Some tuples are joined with some other tuples and some fields are returned???
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    });

A common idiom to improve readability, in this case, is to create a class that inherits from one of the TupleX classes and implements getters and setters for these fields. Here's an Edge class from the Flink Gelly library that has three classes and extends the Tuple3 class:

public class Edge<K, V> extends Tuple3<K, K, V> {

    public Edge(K source, K target, V value) {
        this.f0 = source;
        this.f1 = target;
        this.f2 = value;
    }
    
    // Getters and setters for readability
    public void setSource(K source) {
        this.f0 = source;
    }

    public K getSource() {
        return this.f0;
    }
    
    // Also has getters and setters for other fields
    ...
}

Reuse Flink Objects

Another option that you can use to improve the performance of your Flink application is to use mutable objects when you return data from a user-defined function. Take a look at this example:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            // A new Tuple instance is created on every execution
            collector.collect(new Tuple2<>(userName, changesCount));
        }
    }

As you can see on every execution of the apply function, we create a new instance of the Tuple2 class, which increases pressure on a garbage collector. One way to fix this problem would be to reuse the same instance again and again:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Create an instance that we will reuse on every call
        private Tuple2<String, Long> result = new Tuple<>();
    
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Auto-boxing!! A new Long value may be created
            result.f1 = changesCount;
            
            // Reuse the same Tuple2 object
            collector.collect(result);
        }
    }

It's a bit better. We create a new Tuple2 instance on every call, but we still indirectly create an instance of the Long class. To solve this problem, Flink has a number of so-called value classes: IntValue, LongValue, StringValue, FloatValue, etc. The point of this classes is to provide mutable versions of built-in types so we can reuse them in our user-defined functions. Here is how we can use them:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Create a mutable count instance
        private LongValue count = new IntValue();
        // Assign mutable count to the tuple
        private Tuple2<String, LongValue> result = new Tuple<>("", count);
    
        @Override
        // Notice that now we have a different return type
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, LongValue>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Update mutable count value
            count.setValue(changesCount);
            
            // Reuse the same tuple and the same LongValue instance
            collector.collect(result);
        }
    }

This idiom is commonly used in Flink libraries like Flink Gelly.

Use Function Annotations

One more way to optimize your Flink application is to provide some information about what your user-defined functions are doing with input data. Since Flink can't parse and understand code, you can provide crucial information that will help to build a more efficient execution plan. There are three annotations that we can use:

  1. @ForwardedFields: Specifies what fields in an input value were left unchanged and are used in an output value.
  2. @NotForwardedFields: Specifies fields that were not preserved in the same positions in the output.
  3. @ReadFields: Specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.

Let's take a look at how we can use ForwardedFields annotation:

// Specify that the first element is copied without any changes
@ForwardedFields("0")
class MyFunction implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
       // Copy first field without change
        return new Tuple2<>(value.f0, value.f1 + 123);
    }
}

This means that the first element in an input tuple is not being changed and it is returned in the same position.

If you don't change a field but simply move it into a different position, you can specify this with the ForwardedFields annotation, as well. In the next example, we swap fields in an input tuple and warn Flink about this:

// 1st element goes into the 2nd position, and 2nd element goes into the 1st position
@ForwardedFields("0->1; 1->0")
class SwapArguments implements MapFunction<Tuple2<Long, Double>, Tuple2<Double, Long>> {
    @Override
    public Tuple2<Double, Long> map(Tuple2<Long, Double> value) {
       // Swap elements in a tuple
        return new Tuple2<>(value.f1, value.f0);
    }
}

The annotations mentioned above can only be applied to functions that have one input parameter, such as map or flatMap. If you have two input parameters, you can use the ForwardedFieldsFirst and the ForwardedFieldsSecond annotations that provide information about the first and the second parameters respectively.

Here's how we can use these annotations in an implementation of the JoinFunction interface:

// Two fields from the input tuple are copied to the first and second positions of the output tuple
@ForwardedFieldsFirst("0; 1")
// The third field from the input tuple is copied to the third position of the output tuple
@ForwardedFieldsSecond("2")
class MyJoin implements JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,Double>, Tuple3<Integer, String, Double>>() {
    @Override
    public Tuple3<Integer, String, Double> join(Tuple2<Integer, String> first, Tuple2<Integer, Double> second) throws Exception {
        return new Tuple3<>(first.f0, first.f1, second.f1);
    }
})

Flink also provides NotForwardedFieldsFirst, NotForwardedFieldsSecond, ReadFieldsFirst, and ReadFirldsSecond annotations for similar purposes.

Select Join Type

You can make your joins faster if you give Flink another hint, but before we discuss why it works, let's talk about how Flink executes joins.

When Flink is processing batch data, each machine in a cluster stores part of data. To perform a join, Apache Flink needs to find all pairs of two datasets where a join condition is satisfied. To do this, Flink first has to put items from both datasets that have the same key on the same machine in the cluster. There are two strategies for this:

  1. Repartition-repartition strategy: In this case, both datasets are partitioned by their keys and sent across the network. This means that if datasets are big, it may take a significant amount of time to copy them across the network.
  2. Broadcast-forward strategy: In this case, one dataset is left untouched, but the second dataset is copied to every machine in the cluster that has part of the first dataset.

If you are joining a small dataset with a much bigger dataset, you can use the broadcast-forward strategy and avoid costly partition of the first dataset. This is really easy to do:

ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST) 

This hints that the first dataset is a much smaller than the second one.

You can also use other join hints:

  • BROADCAST_HASH_SECOND: The second dataset is much smaller
  • REPARTITION_HASH_FIRST: The first dataset it a bit smaller
  • REPARTITION_HASH_SECOND: The second dataset is a bit smaller
  • REPARTITION_SORT_MERGE: Repartition both datasets and use sorting and merging strategy
  • OPTIMIZER_CHOOSES: Flink optimizer will decide how to join datasets

You can read more about how Flink performs joins in this article.

More Information

I hope you liked this article and found it useful.

I will write more articles about Flink in the near future, so stay tuned! You can read my other articles here, or you can you can take a look at my Pluralsight course where I cover Apache Flink in more details: Understanding Apache Flink. Here is a short preview of this course.

application

Published at DZone with permission of Ivan Mushketyk, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Power BI Embedded Analytics — Part 3: Power BI Embedded Demo
  • DGS GraphQL and Spring Boot
  • Auto-Instrumentation in Azure Application Insights With AKS
  • Deploying a Scala Play Application to Heroku: A Step-by-Step Guide

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!