
Using Apache Flink With Java 8


Apache Flink is useful for stream processing, and now that Java supports lambda functions, you can interact with Flink in a host of new ways.


JDK 8 introduced a number of long-anticipated features to the Java language. Among them, the most notable was lambda expressions. Lambdas made possible entirely new APIs, such as Java 8 Streams, as well as new features in existing frameworks, like JUnit 5.
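As a quick refresher, here is the style of code that lambdas enable with the Streams API (a minimal, self-contained example; the class name is my own):

import java.util.Arrays;
import java.util.List;

public class StreamsExample {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("apache", "flink", "java");
        // A lambda replaces what would otherwise be an anonymous Predicate class
        long longWords = words.stream()
                .filter(word -> word.length() > 4)
                .count();
        System.out.println(longWords); // prints 2
    }
}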

Apache Flink also supports lambda functions, and in this post, I’ll show how to enable them and how to use them in your applications.

Let’s say we need to implement a Flink application that counts how often every word appears in our text. Using Java 7, we would implement it like this:

// env is an ExecutionEnvironment, e.g. ExecutionEnvironment.getExecutionEnvironment()
DataSource<String> lines = env.fromElements(
    "Apache Flink is a community-driven open source framework for distributed big data analytics,",
    "like Hadoop and Spark. The core of Apache Flink is a distributed streaming dataflow engine written",
    ...
);

lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        // Split the line into words and emit a (word, 1) pair for each one
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
})
.groupBy(0) // group by the word (field 0 of the tuple)
.sum(1)     // sum the counts (field 1 of the tuple)
.print();


The code is pretty straightforward. First, it splits every line of the input dataset into separate words and emits a tuple containing the word and the number one. Then the application groups all emitted tuples by word (the 0th position in each tuple) and sums up the ones (the 1st position in each tuple).

If you use Java 8, the first thing you'll want to change is to replace the FlatMapFunction in the previous example with a modern lambda expression, like this:

lines.flatMap((line, out) -> {
    String[] words = line.split("\\W+");
    for (String word : words) {
        out.collect(new Tuple2<>(word, 1));
    }
})
.groupBy(0)
.sum(1)
.print();


Unfortunately, if we try to compile this class with javac and execute it on a Flink cluster, we will get the following error:

$ bin/flink run wordCount.jar
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: 
...
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. 
It seems that your compiler has not stored them into the .class file. 
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. 


It turns out that Flink uses the generic types of user-defined functions to generate serializers. When we use anonymous classes, this information is preserved in the class file, but lambda expressions are not anonymous classes: javac treats them differently and does not store their generic types in the class file.
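To see the difference outside of Flink, here is a minimal, self-contained sketch (the class name is my own) comparing what reflection can recover from an anonymous class versus a lambda:

import java.lang.reflect.Type;
import java.util.function.Function;

public class LambdaGenericsDemo {
    public static void main(String[] args) {
        // Anonymous class: the parameterized interface is stored in the
        // class file, so reflection can recover String -> Integer.
        Function<String, Integer> anon = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) { return s.length(); }
        };
        for (Type type : anon.getClass().getGenericInterfaces()) {
            System.out.println(type); // Function<java.lang.String, java.lang.Integer>
        }

        // Lambda: the synthetic class implements only the raw interface,
        // so the type arguments are gone at runtime.
        Function<String, Integer> lambda = s -> s.length();
        for (Type type : lambda.getClass().getGenericInterfaces()) {
            System.out.println(type); // Function, without type arguments
        }
    }
}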

In the following sections of this post, I’ll demonstrate several different ways you can overcome this issue.

Use the Eclipse JDT Compiler

To solve this problem, one of the Flink committers contributed patches to the Eclipse JDT and OpenJDK compilers. Eclipse JDT has accepted the patch, and, as a result, it now preserves generics information for lambdas and can be used to implement Flink applications with lambda expressions.

To set Eclipse JDT as the compiler in your Maven project, add the following plugin to your pom.xml:

<plugins>
    <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <compilerId>jdt</compilerId>
        </configuration>
        <dependencies>
            <dependency>
                <groupId>org.eclipse.tycho</groupId>
                <artifactId>tycho-compiler-jdt</artifactId>
                <version>0.21.0</version>
            </dependency>
        </dependencies>
    </plugin>
</plugins>


After this, you can compile your application with lambda expressions and execute it on Flink without any further changes:

$ mvn clean package -Pbuild-jar
$ bin/flink run target/codes.brewing.flinkexamples-1.0-SNAPSHOT.jar
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
...
(systems,2)
(the,3)
(to,1)
(written,1)


Provide Type Hints

If you don’t want to use the Eclipse JDT compiler for some reason and would rather stick with javac, you can use a different, but less pretty, approach. Apache Flink has a notion of type hints that you can use to, as the name suggests, hint to the framework what type is being used. To set the return type of the flatMap operator, we can use the returns method:

lines.flatMap((line, out) -> {
    String[] words = line.split("\\W+");
    for (String word : words) {
        out.collect(new Tuple2<>(word, 1));
    }
})
// Tell Flink that the lambda emits Tuple2<String, Integer> elements
.returns(new TupleTypeInfo<Tuple2<String, Integer>>(
    TypeInformation.of(String.class), TypeInformation.of(Integer.class)))
.groupBy(0)
.sum(1)
.print();


The returns method accepts an instance of the TypeInformation class, which Flink uses to represent information about Java types. It has multiple implementations, such as BasicTypeInfo to represent primitive types, EnumTypeInfo for enums, CompositeType to represent composite types like POJOs, and several others. In this case, we use TupleTypeInfo, which represents the type information of Flink tuples. To create a TupleTypeInfo, we pass the type information for each field, which together specify the tuple’s structure.

The last code snippet will work just fine, even when compiled with javac.
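A possibly more convenient alternative, assuming your Flink version provides the TypeHint class (org.apache.flink.api.common.typeinfo.TypeHint), is to describe the whole tuple type with an anonymous TypeHint subclass; being an anonymous class, it preserves its generic parameters the same way the anonymous functions above do:

lines.flatMap((line, out) -> {
    String[] words = line.split("\\W+");
    for (String word : words) {
        out.collect(new Tuple2<>(word, 1));
    }
})
// The anonymous TypeHint subclass keeps Tuple2<String, Integer> visible to reflection
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
.groupBy(0)
.sum(1)
.print();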

Use Lambdas for a Subset of Operators

If you don’t want to bother with either the Eclipse compiler or type hints, you still have one more option: use lambdas only to define a subset of operators. It turns out that Flink only needs type information if your function has an Iterable or a Collector as one of its arguments. If you only use lambdas that don’t have such arguments, and these functions return types without generics, you don’t need to change anything in your code.

As an example, the following snippet can be compiled with javac:

lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String line, Collector<String> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(word);
        }
    }
})
.map(word -> new WordCount(word, 1))
.groupBy(wordCount -> wordCount.getWord())
.reduceGroup(new GroupReduceFunction<WordCount, WordCount>() {
    @Override
    public void reduce(Iterable<WordCount> values, Collector<WordCount> out) throws Exception {
        String word = null;
        int count = 0;

        for (WordCount wordCount : values) {
            word = wordCount.getWord();
            count += wordCount.getCount();
        }
        out.collect(new WordCount(word, count));
    }
})
.print();


Notice that in this code snippet, we can use lambda expressions in the map and groupBy operators. We also cannot use tuples here (as in the previous examples), since tuples have generic parameters, so they were replaced with a WordCount class.
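The WordCount class itself is not shown in the snippet; here is a minimal sketch of what it might look like (Flink treats such a class as a POJO because it has a public no-argument constructor and getters/setters for its fields):

public class WordCount {
    private String word;
    private int count;

    // Flink's POJO serializer requires a public no-argument constructor
    public WordCount() {}

    public WordCount(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    @Override
    public String toString() { return "(" + word + "," + count + ")"; }
}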

Source Code

You can find the full version of the source code from this post in my GitHub repository.



Published at DZone with permission of Ivan Mushketyk, DZone MVB.
