
Apache Flink With Kafka - Consumer and Producer

See an example of consuming records from Kafka through FlinkKafkaConsumer and producing records to Kafka using FlinkKafkaProducer.


Overview

Apache Flink provides various connectors to integrate with other systems. In this article, I will share an example of consuming records from Kafka through FlinkKafkaConsumer and producing records to Kafka using FlinkKafkaProducer.

Setup

I installed Kafka locally and created two topics, TOPIC-IN and TOPIC-OUT, along these lines (the exact commands depend on your Kafka version):

Shell
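
# create the input and output topics (Kafka 2.2+ syntax; adjust for your installation)
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
    --replication-factor 1 --partitions 1 --topic TOPIC-IN
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
    --replication-factor 1 --partitions 1 --topic TOPIC-OUT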


I wrote a very simple NumberGenerator, which generates a number every second and sends it to TOPIC-IN using a KafkaProducer object. The code for both is available on GitHub; a condensed sketch of the generator looks like this:
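
Java

// a stripped-down sketch of the generator; the real one is in the linked repository
public class NumberGenerator extends Thread {

  private int counter = 0;

  @Override
  public void run() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      while (!isInterrupted()) {
        // send one number per second to TOPIC-IN ("myKey" is an assumed key)
        producer.send(new ProducerRecord<>("TOPIC-IN", "myKey", String.valueOf(++counter)));
        Thread.sleep(1000);
      }
    } catch (InterruptedException e) {
      // stop generating on interrupt
    }
  }
}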

A sample run sends a new number to TOPIC-IN every second.

FlinkKafkaConnector Example

First, define a FlinkKafkaConsumer, as shown below (a condensed sketch with imports omitted, laid out so the line references that follow match up):

Java
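
public class Main {

  public static void main(String[] args) throws Exception {
    // get a local Flink StreamExecutionEnvironment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // use the timestamps carried in the Kafka messages (event time)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "flink-group"); // group id assumed

    // source: read key/value pairs from TOPIC-IN using the custom schema
    FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>("TOPIC-IN", new MySchema(), props);

    // tell Flink which field of each record carries the event timestamp
    kafkaConsumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<KafkaRecord>(Time.seconds(1)) {
          @Override
          public long extractTimestamp(KafkaRecord record) {
            return record.timestamp;
          }
        }
    );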


Line #5: Get a local Flink StreamExecutionEnvironment.

Line #8: Required in order to use the timestamps carried in the Kafka messages (event time). Otherwise, Flink will use the system clock (processing time).

Line #15: Create a FlinkKafkaConsumer<> object, which will act as a source for us. The class KafkaRecord is a wrapper for the key and value coming from Kafka, and the MySchema class implements KafkaDeserializationSchema<KafkaRecord> to provide the deserialization logic Flink uses to convert the byte[] from Kafka into Strings.

The code for both is available here. This is required because I want to read both the key and value of the Kafka messages.
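
For reference, here is a condensed sketch of those two classes (field names assumed from the description above; the full versions are at the link):

Java

public class KafkaRecord {
  public String key;
  public String value;
  public long timestamp; // event time used for windowing
}

public class MySchema implements KafkaDeserializationSchema<KafkaRecord> {

  @Override
  public boolean isEndOfStream(KafkaRecord nextElement) {
    return false; // the Kafka stream is unbounded
  }

  @Override
  public KafkaRecord deserialize(ConsumerRecord<byte[], byte[]> record) {
    // convert the raw byte[] key and value from Kafka into Strings
    KafkaRecord data = new KafkaRecord();
    data.key = record.key() == null ? null : new String(record.key());
    data.value = record.value() == null ? null : new String(record.value());
    data.timestamp = record.timestamp();
    return data;
  }

  @Override
  public TypeInformation<KafkaRecord> getProducedType() {
    return TypeInformation.of(KafkaRecord.class);
  }
}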

Line #18 to #25: Required to inform Flink where it should read the timestamp from. This is used to decide the start and end of a tumbling time window.

After this, we need to define a FlinkKafkaProducer, as shown below (again a condensed sketch; the serialization schema here simply writes each record's key and value to TOPIC-OUT):

Java
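
    // sink: write each window's result back to Kafka
    FlinkKafkaProducer<KafkaRecord> kafkaProducer = new FlinkKafkaProducer<>(
        "TOPIC-OUT",
        new KafkaSerializationSchema<KafkaRecord>() {
          @Override
          public ProducerRecord<byte[], byte[]> serialize(KafkaRecord record, Long timestamp) {
            // keep both the key and the value when writing to TOPIC-OUT
            return new ProducerRecord<>(
                "TOPIC-OUT", record.key.getBytes(), record.value.getBytes());
          }
        },
        props,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);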


Now, we can define a simple pipeline, as shown below (a condensed sketch continuing the same main method, laid out so the line references that follow match up):

Java
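
    DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);

    stream.filter(record -> record.value != null && !record.value.isEmpty())

        .keyBy(record -> record.key)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(1))
        .reduce(new ReduceFunction<KafkaRecord>() {
          @Override
          public KafkaRecord reduce(KafkaRecord r1, KafkaRecord r2) {
            // append all the numbers collected in this window
            KafkaRecord result = new KafkaRecord();
            result.key = "outKey"; // emit the result under a new key
            result.value = r1.value + " " + r2.value;
            result.timestamp = r2.timestamp;
            return result;
          }

        })
        .addSink(kafkaProducer);

    // start producing one number per second into TOPIC-IN
    new NumberGenerator().start();

    // start the Flink execution environment
    env.execute();
  }
}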


Line #1: Create a DataStream from the FlinkKafkaConsumer object as the source.

Line #3: Filter out null and empty values coming from Kafka.

Line #5: Key the Flink stream based on the key present in Kafka messages. This will logically partition the stream and allow parallel execution on a per-key basis.

Line #6 to #7: Define a time window of five seconds and allow one extra second of lateness.

Line #8 to #19: Simple reduction logic that appends all the numbers collected in a window and sends the result using a new key "outKey".

Line #20: Send the output of each window to the FlinkKafkaProducer object created above.

Line #23: Start the NumberGenerator.

Line #26: Start the Flink execution environment.

A sample run of this code writes the aggregated output of each five-second window to TOPIC-OUT.

Conclusion

The above example shows how to use Flink's Kafka connector API to consume messages from Kafka, produce messages to Kafka, and customize deserialization when reading data from Kafka.

Topics:
big data, consumer, flink, flink api, kafka, producer, streaming data, tutorial

Opinions expressed by DZone contributors are their own.
