Apache Flink With Kafka - Consumer and Producer

By Preetdeep Kumar · Apr. 02, 20 · Tutorial · 25.4K Views


Overview

Apache Flink provides various connectors to integrate with other systems. In this article, I will share an example of consuming records from Kafka through FlinkKafkaConsumer and producing records to Kafka using FlinkKafkaProducer.

Setup

I installed Kafka locally and created two topics, TOPIC-IN and TOPIC-OUT.

Shell

# Create two topics
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TOPIC-IN
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TOPIC-OUT

# List all topics
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
TOPIC-IN
TOPIC-OUT
__consumer_offsets

I wrote a very simple NumberGenerator, which will generate a number every second and send it to TOPIC_IN using a KafkaProducer object. The code for both is available on GitHub.
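
The exact generator is in the linked repository; the sketch below is only an illustration of what such a class might look like, assuming it is handed a ready-made KafkaProducer<String, String> and runs on its own thread. The class name and constructor mirror how it is used later in the pipeline, but the body is an assumption, not the author's code.

Java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative sketch only: emits "[1]", "[2]", ... to the given topic once per second under a fixed key.
public class NumberGenerator extends Thread
{
    private final KafkaProducer<String, String> producer;
    private final String topic;

    public NumberGenerator(KafkaProducer<String, String> producer, String topic)
    {
        this.producer = producer;
        this.topic = topic;
    }

    @Override
    public void run()
    {
        int counter = 1;
        try
        {
            while (!isInterrupted())
            {
                // constant key so that keyBy() groups all numbers together downstream
                producer.send(new ProducerRecord<>(topic, "myKey", "[" + counter++ + "]"));
                Thread.sleep(1000);
            }
        }
        catch (InterruptedException e)
        {
            // stop generating on interruption
        }
    }
}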

A sample run produces the following output:

Shell

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC-IN --property print.key=true --from-beginning
myKey   [1]
myKey   [2]
myKey   [3]
myKey   [4]
myKey   [5]
myKey   [6]
myKey   [7]
myKey   [8]
myKey   [9]


Flink Kafka Connector Example

First, define a FlinkKafkaConsumer, as shown below:

Java

1  String TOPIC_IN = "TOPIC-IN";
2  String TOPIC_OUT = "TOPIC-OUT";
3  String BOOTSTRAP_SERVER = "localhost:9092";
4
5  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
6
7  // to use allowed lateness and timestamp from kafka message
8  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
9
10 Properties props = new Properties();
11 props.put("bootstrap.servers", BOOTSTRAP_SERVER);
12 props.put("client.id", "flink-kafka-example");
13
14 // consumer to get both key/values per Topic
15 FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);
16
17 // for allowing Flink to handle late elements
18 kafkaConsumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<KafkaRecord>()
19 {
20     @Override
21     public long extractAscendingTimestamp(KafkaRecord record)
22     {
23         return record.timestamp;
24     }
25 });
26
27 kafkaConsumer.setStartFromLatest();


Line #5: Get a local Flink StreamExecutionEnvironment.

Line #8: Required to use the timestamp carried in the messages from Kafka. Otherwise, Flink will use the system clock (processing time).

Line #15: Create a FlinkKafkaConsumer<> object, which will act as our source. The class "KafkaRecord" is a wrapper for the key and value coming from Kafka, and the MySchema class implements KafkaDeserializationSchema<KafkaRecord> to provide the deserialization logic Flink uses to convert the raw byte[] from Kafka to String.

The code for both classes is available here, and a minimal sketch follows below. A custom deserialization schema is required because I want to read both the key and the value of each Kafka message.

Line #18 to #25: Required to inform Flink where it should read the event timestamp from each record. This timestamp is used to decide the start and end of each tumbling time window.
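
For reference, here is a minimal sketch of what the KafkaRecord wrapper and the MySchema deserialization schema mentioned above could look like. The real classes are in the linked repository; the field names and method bodies here are assumptions. The two classes are shown together for brevity, but in a project they would live in separate files.

Java

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Simple wrapper for the key, value, and timestamp of a Kafka message (illustrative sketch).
public class KafkaRecord
{
    public String key;
    public String value;
    public long timestamp;
}

// Deserialization schema that keeps both the key and the value of each message (illustrative sketch).
public class MySchema implements KafkaDeserializationSchema<KafkaRecord>
{
    @Override
    public boolean isEndOfStream(KafkaRecord nextElement)
    {
        return false;
    }

    @Override
    public KafkaRecord deserialize(ConsumerRecord<byte[], byte[]> record)
    {
        KafkaRecord data = new KafkaRecord();
        data.key = record.key() == null ? null : new String(record.key());
        data.value = record.value() == null ? null : new String(record.value());
        data.timestamp = record.timestamp();
        return data;
    }

    @Override
    public TypeInformation<KafkaRecord> getProducedType()
    {
        return TypeInformation.of(KafkaRecord.class);
    }
}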

After this, we need to define a FlinkKafkaProducer, as shown below:

Java

Properties prodProps = new Properties();
prodProps.put("bootstrap.servers", BOOTSTRAP_SERVER);

FlinkKafkaProducer<KafkaRecord> kafkaProducer =
        new FlinkKafkaProducer<KafkaRecord>(TOPIC_OUT,
                ((record, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT, record.key.getBytes(), record.value.getBytes())),
                prodProps,
                Semantic.EXACTLY_ONCE);


Now, we can define a simple pipeline, as shown below:

Java

1  DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);
2
3  stream
4  .filter((record) -> record.value != null && !record.value.isEmpty())
5  .keyBy(record -> record.key)
6  .timeWindow(Time.seconds(5))
7  .allowedLateness(Time.seconds(1))
8  .reduce(new ReduceFunction<KafkaRecord>()
9  {
10     KafkaRecord result = new KafkaRecord();
11
12     @Override
13     public KafkaRecord reduce(KafkaRecord record1, KafkaRecord record2) throws Exception
14     {
15         result.key = "outKey";
16         result.value = record1.value + record2.value;
17         return result;
18     }
19 })
20 .addSink(kafkaProducer);
21
22 // produce a number as string every second
23 new NumberGenerator(p, TOPIC_IN).start();
24
25 // start flink
26 env.execute();


Line #1: Create a DataStream from the FlinkKafkaConsumer object as the source.

Line #4: Filter out null and empty values coming from Kafka.

Line #5: Key the Flink stream based on the key present in Kafka messages. This will logically partition the stream and allow parallel execution on a per-key basis.

Line #6 to #7: Define a time window of five seconds and allow one extra second of lateness for late-arriving records.

Line #8 to #19: Simple reduction logic that appends all the numbers collected in a window and sends the result using a new key "outKey".

Line #20: Sends the output of each window to the FlinkKafkaProducer object created above.

Line #23: Start the NumberGenerator.
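
The variable p passed to NumberGenerator on line #23 is a plain Kafka producer created earlier in the program but not shown in the article. Assuming String keys and values (matching the console output above), it might be constructed roughly like this; the variable name producerProps is illustrative only.

Java

// Hypothetical construction of the producer handed to NumberGenerator; not part of the original listing.
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", BOOTSTRAP_SERVER);
// plain String serializers, since NumberGenerator sends String keys and values
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> p = new KafkaProducer<>(producerProps);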

Line #26: Start the Flink execution environment.

A sample run of this code produces the following output:

Shell

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC-OUT --property print.key=true --from-beginning
outKey  [5][6]
outKey  [7][8][9][10][11]
outKey  [12][13][14][15][16]
outKey  [17][18][19][20][21]
outKey  [22][23][24][25][26]


Conclusion

The above example shows how to use Flink's Kafka connector API to both consume messages from and produce messages to Kafka, and how to plug in custom deserialization when reading data from Kafka.

