
Getting Started With Kafka and Rust (Part 2)

A hands-on guide to teach you how to interact with Kafka using the Rust programming language.

By Abhishek Gupta, DZone Core · Apr. 14, 21 · Tutorial

This is a two-part series to help you get started with Rust and Kafka. We will be using the rust-rdkafka crate, which is itself based on librdkafka (the C library).

In this post, we will cover the Kafka Consumer API.

The first part is available here.

Initial Setup

Make sure you have a Kafka broker available - a local setup should suffice. You will also need Rust installed (version 1.45 or above).
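If you want your topic's layout to match the sample output later in the post (partitions 0 through 5), you can pre-create it with Kafka's bundled CLI. The partition count and broker address here are assumptions based on that output - adjust them for your setup:

```shell
# create the "rust" topic with 6 partitions on a local, single-node broker
kafka-topics.sh --create \
  --topic rust \
  --partitions 6 \
  --replication-factor 1 \
  --bootstrap-server localhost:9092
```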

Before you begin, clone the GitHub repo:

Shell

git clone https://github.com/abhirockzz/rust-kafka-101
cd part2


Simple Consumer

Creating a low-level consumer (BaseConsumer) is strikingly similar to how you'd create its counterpart, BaseProducer. The only difference is that you will have to cast the output to the right type (which in this case is BaseConsumer).

Rust

    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "my_consumer_group")
        .create()
        .expect("invalid consumer config");

    consumer
        .subscribe(&["rust"])
        .expect("topic subscribe failed");



Notice that the group.id config has also been included via .set("group.id", "my_consumer_group") - it's mandatory.

Once a BaseConsumer is created, you can subscribe to one or more topics (in this case, it's just one topic with the name rust).

To fetch messages from the topic, we start (spawn) a new thread:

Rust

    thread::spawn(move || loop {
        for msg_result in consumer.iter() {
            let msg = msg_result.unwrap();
            let key: &str = msg.key_view().unwrap().unwrap();
            let value = msg.payload().unwrap();
            let user: User = serde_json::from_slice(value).expect("failed to deser JSON to User");
            println!(
                "received key {} with value {:?} in offset {:?} from partition {}",
                key,
                user,
                msg.offset(),
                msg.partition()
            )
        }
    });



It accepts a closure, which in this case happens to be an infinite loop that:

  • Receives messages, and,
  • Prints out the key and value, along with the offset and partition info

Calling iter on the consumer is just a shortcut for invoking poll without any timeout.
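To make the iter/poll relationship concrete, here is a stdlib-only sketch of the pattern. MockConsumer and its queue are invented for illustration and are not rdkafka's actual types:

```rust
use std::collections::VecDeque;

// A stand-in for a consumer: poll() hands back the next message, if any.
struct MockConsumer {
    queue: VecDeque<String>,
}

impl MockConsumer {
    fn poll(&mut self) -> Option<String> {
        self.queue.pop_front()
    }

    // iter() is just poll() repackaged as an Iterator.
    fn iter(&mut self) -> impl Iterator<Item = String> + '_ {
        std::iter::from_fn(move || self.poll())
    }
}

fn main() {
    let mut consumer = MockConsumer {
        queue: VecDeque::from(vec!["a".to_string(), "b".to_string()]),
    };
    // draining via iter() is equivalent to calling poll() in a loop
    let received: Vec<String> = consumer.iter().collect();
    println!("{:?}", received); // ["a", "b"]
}
```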

Other variations are also possible. You can use poll directly:

Rust

loop {
    let message = consumer.poll(Duration::from_secs(2));
    ...
}



Or, use this format:

Rust

for message in &consumer {
    ...
}



Run the Program

  • Rename the file src/1_consumer_simple.rs to main.rs, and
  • execute cargo run

Output:

Plain Text

sending message
sending message
produced message with key user-1 in offset 25 of partition 2
produced message with key user-2 in offset 12 of partition 4
sending message
produced message with key user-3 in offset 20 of partition 0
received key user-3 with value User { id: 3, email: "user-3@foobar.com" } in offset 20 from partition 0
sending message
produced message with key user-4 in offset 24 of partition 3
received key user-4 with value User { id: 4, email: "user-4@foobar.com" } in offset 24 from partition 3
sending message
produced message with key user-5 in offset 25 of partition 3
received key user-5 with value User { id: 5, email: "user-5@foobar.com" } in offset 25 from partition 3
sending message
produced message with key user-6 in offset 26 of partition 3
received key user-6 with value User { id: 6, email: "user-6@foobar.com" } in offset 26 from partition 3
sending message
produced message with key user-7 in offset 27 of partition 3
received key user-7 with value User { id: 7, email: "user-7@foobar.com" } in offset 27 from partition 3



As expected:

  • You see the producer callbacks - confirming that each message was sent to Kafka
  • The consumer received the messages as well - as confirmed by the log

What About Consumer Callbacks?

Yes, just like the producer, the consumer API also has callbacks for:

  • Re-balancing
  • Offset commit

To do this, we need to implement the ConsumerContext trait. We will:

  • Define a struct
  • Provide an empty implementation for ClientContext
  • Override the following methods from the ConsumerContext trait - pre_rebalance, post_rebalance, and commit_callback
Rust

struct ConsumerCallbackLogger;
impl ClientContext for ConsumerCallbackLogger {}
impl ConsumerContext for ConsumerCallbackLogger {
...
}



We will skip the pre_rebalance method and focus on post_rebalance in this example:

Rust

    fn post_rebalance<'a>(&self, rebalance: &rdkafka::consumer::Rebalance<'a>) {
        println!("post_rebalance callback");

        match rebalance {
            Rebalance::Assign(tpl) => {
                for e in tpl.elements() {
                    println!("rebalanced partition {}", e.partition())
                }
            }
            Rebalance::Revoke => {
                println!("ALL partitions are REVOKED")
            }
            Rebalance::Error(err_info) => {
                println!("Post Rebalance error {}", err_info)
            }
        }
    }



Rebalance is an enum. As a part of the implementation, we match it against all the possible options (partitions assigned, partitions revoked, rebalance error) and simply log it.

Rust

    fn commit_callback(
        &self,
        result: rdkafka::error::KafkaResult<()>,
        offsets: &rdkafka::TopicPartitionList,
    ) {
        match result {
            Ok(_) => {
                for e in offsets.elements() {
                    println!(
                        "committed offset {:?} in partition {}",
                        e.offset(),
                        e.partition()
                    )
                }
            }
            Err(err) => {
                println!("error committing offset - {}", err)
            }
        }
    }



For commit callback events, we match on the KafkaResult (available in the commit_callback parameter) to check whether the commit was successful. If it was, we print the committed offset for each partition; otherwise, we log the error that occurred during the commit.

Once this is done, we simply need to plug in our new implementation:

Rust

    let consumer: BaseConsumer<ConsumerCallbackLogger> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        ....
        .create_with_context(ConsumerCallbackLogger {})
        .expect("invalid consumer config");



To do this, we made a couple of changes:

  • Use create_with_context (instead of create)
  • Use BaseConsumer<ConsumerCallbackLogger>

Run the Program

  • Rename the file src/2_consumer_callback.rs to main.rs, and
  • execute cargo run
Plain Text

sending message
sending message
produced message with key user-1 in offset 0 of partition 2
post_rebalance callback
rebalanced partition 0
rebalanced partition 1
rebalanced partition 2
rebalanced partition 3
rebalanced partition 4
rebalanced partition 5
produced message with key user-2 in offset 0 of partition 4
sending message
produced message with key user-3 in offset 0 of partition 0
received key user-3 with value User { id: 3, email: "user-3@foobar.com" } in offset 0 from partition 0
sending message
committed offset Offset(1) in partition 0
committed offset Offset(1) in partition 4
produced message with key user-4 in offset 0 of partition 3
received key user-4 with value User { id: 4, email: "user-4@foobar.com" } in offset 0 from partition 3



As expected, the re-balance events were logged along with the successful commits.

Trigger a Re-Balance

Partition assignment happens the first time you start the application, and you're able to witness this thanks to our ConsumerContext implementation. You can also trigger a rebalance by starting a new instance of the application. Since there are now two instances in the same consumer group, the topic partitions will be rebalanced. For example, if the topic has 6 partitions, they will be split equally between the two instances.

You should see log messages similar to this:

Plain Text

....
# instance 1
post_rebalance callback
rebalanced partition 0
rebalanced partition 1
rebalanced partition 2
...

# instance 2
post_rebalance callback
rebalanced partition 3
rebalanced partition 4
rebalanced partition 5
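The even split can be modeled with a few lines of plain Rust. This is a simplified range-style assignment for illustration only - not rdkafka's real assignor logic:

```rust
// Split `partitions` contiguously across `consumers` instances,
// a simplified model of how a rebalance divides up a topic.
fn assign(partitions: u32, consumers: u32) -> Vec<Vec<u32>> {
    let per = (partitions + consumers - 1) / consumers; // ceiling division
    (0..consumers)
        .map(|c| (c * per..((c + 1) * per).min(partitions)).collect())
        .collect()
}

fn main() {
    // 6 partitions, 2 instances: 3 contiguous partitions each
    let assignment = assign(6, 2);
    println!("{:?}", assignment); // [[0, 1, 2], [3, 4, 5]]
}
```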



Switching to Manual Commit

By default, the offset commit process is taken care of by the library itself. But we can exercise more control over it by switching to manual mode.

The first thing to do is set enable.auto.commit to false - .set("enable.auto.commit", "false");

At-Least Once Delivery

To achieve this, we need to make sure we have indeed processed the message successfully before committing the offset. To simulate this, let's write a function (named process) that can fail randomly. We will then use this in our consumer loop and commit only when this function returns successfully.

Rust

fn process(u: User) -> Result<(), ()> {
    let mut rnd = rand::thread_rng();
    let ok = rnd.gen_bool(1.0 / 2.0); //50% probability of returning true
    match ok {
        true => {
            println!("SUCCESSFULLY processed User info {:?}", u);
            Ok(())
        }
        false => {
            println!("FAILED to process User info {:?}", u);
            Err(())
        }
    }
}



We will need to modify our consumer loop:

  • Add a manual offset commit based on the response from the process function
  • Add a label ('consumer_thread) to our thread loop
Rust

    thread::spawn(move || 'consumer_thread: loop {
        for msg_result in consumer.iter() {
            //..... omitted

            println!(
                "received key {} with value {:?} in offset {:?} from partition {}",
                key,
                user,
                msg.offset(),
                msg.partition()
            );

            let processed = process(user);
            match processed {
                Ok(_) => {
                    consumer.commit_message(&msg, CommitMode::Sync);
                }
                Err(_) => {
                    println!("loop encountered processing error");
                    break 'consumer_thread;
                }
            }
        }
    });




We call process to simulate the handling of each record received by the consumer. If processing succeeds (returns Ok), we commit the record using commit_message.

Note that the commit itself may fail. This should ideally be handled in the commit_callback implementation of ConsumerContext.

Run the Program

  • Rename the file src/3_manual_commit.rs to main.rs, and
  • Execute cargo run

The program output is lengthy, but bear with me.

Output:

Plain Text

produced message with key user-1 in offset 22 of partition 2
produced message with key user-2 in offset 28 of partition 4
post_rebalance callback
rebalanced partition 0
rebalanced partition 1
rebalanced partition 2
rebalanced partition 3
rebalanced partition 4
rebalanced partition 5
received key user-5 with value User { id: 5, email: "user-5@foobar.com" } in offset 52 from partition 3
SUCCESSFULLY processed User info User { id: 5, email: "user-5@foobar.com" }
committed offset Offset(53) in partition 3
received key user-2 with value User { id: 2, email: "user-2@foobar.com" } in offset 28 from partition 4
SUCCESSFULLY processed User info User { id: 2, email: "user-2@foobar.com" }
produced message with key user-3 in offset 35 of partition 0
committed offset Offset(29) in partition 4
received key user-1 with value User { id: 1, email: "user-1@foobar.com" } in offset 22 from partition 2
FAILED to process User info User { id: 1, email: "user-1@foobar.com" }
loop encountered processing error. closing consumer...
post_rebalance callback
ALL partitions have been REVOKED



Notice these log messages when process returns successfully:

  1. received key user-5 with value User { id: 5, email: "user-5@foobar.com" } in offset 52 from partition 3
  2. SUCCESSFULLY processed User info User { id: 5, email: "user-5@foobar.com" }
  3. committed offset Offset(53) in partition 3

For a failure scenario:

  1. received key user-1 with value User { id: 1, email: "user-1@foobar.com" } in offset 22 from partition 2
  2. FAILED to process User info User { id: 1, email: "user-1@foobar.com" }
  3. loop encountered a processing error. closing consumer...

We ended up stopping the consumer when processing failed. The question here is:

How to Handle Messages That Did Not Get Processed?

Note that failure could happen due to many reasons. A couple of them are:

  • Processing failed (this is what we simulated in this example), or,
  • Processing was successful, but the commit failed

If we continue with our consumer loop after a failed message, we could end up losing messages (data loss). Why? It's because the commit_message method also marks smaller offsets (less than the one being handled) as committed. For example, suppose offset 20 from partition 5 failed to get processed (and committed), but you continue consuming, and offset 21 from partition 5 is processed and committed successfully. You will end up missing the data from offset 20, because committing offset 21 also commits offsets 20 and below. Even after you restart the application, this will not be detected.
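A stdlib-only simulation of that scenario (the offsets are hypothetical and there are no real Kafka calls here): offset 20 fails, later offsets are processed and committed, and a restart resumes from the committed position, so offset 20 is never seen again:

```rust
use std::collections::HashSet;

// Simulate a consumer that keeps going after a processing failure and
// commits later offsets: committing offset N marks everything below N
// as done, so the failed-but-skipped offset is never redelivered.
fn run(from: u64, to: u64, failing: u64, processed: &mut HashSet<u64>) -> u64 {
    let mut committed = from;
    for offset in from..to {
        if offset == failing {
            continue; // processing failed, but we keep consuming anyway
        }
        processed.insert(offset);
        committed = offset + 1; // like commit_message: this offset and below are done
    }
    committed
}

fn main() {
    let mut processed = HashSet::new();
    // first run: offset 20 fails, offsets 21..=24 succeed and get committed
    let committed = run(20, 25, 20, &mut processed);
    // "restart": resume from the committed position - offset 20 is gone for good
    run(committed, 25, u64::MAX, &mut processed);
    println!("resumed at {}, offset 20 processed: {}", committed, processed.contains(&20));
    // prints: resumed at 25, offset 20 processed: false
}
```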

To Prevent This...

You can either:

  • Halt the consumer process after detecting the first failure. In this example, we do this by exiting the consumer thread itself (this is not acceptable for real-world applications though). When you restart the application, processing will begin from the last committed offset, and the failed message will be picked up and re-processed.
  • Even better - you can handle this in commit_callback by sending the data to another Kafka topic (also known as a "dead letter topic"), which can be processed separately.

Other Considerations

This is by no means an exhaustive list or coverage of all the delivery semantics:

  • We did not cover at-most-once and exactly-once semantics.
  • You may want to choose the async commit mode - this has its own set of caveats.
  • Committing each and every message (even asynchronously) carries overhead. You may want to commit messages/offsets in batches. As always, you need to take care of a lot of corner cases here as well.
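The batching idea can be sketched with plain Rust. BatchCommitter is an invented helper for illustration (not part of rdkafka): track the latest offset, only "commit" every N messages, and remember to flush the remainder on shutdown:

```rust
// Commit once every `batch_size` messages instead of once per message.
struct BatchCommitter {
    batch_size: u64,
    seen: u64,
    latest: Option<u64>,  // highest offset seen since the last commit
    committed: Vec<u64>,  // record of commits; stands in for the real commit call
}

impl BatchCommitter {
    fn new(batch_size: u64) -> Self {
        BatchCommitter { batch_size, seen: 0, latest: None, committed: Vec::new() }
    }

    fn record(&mut self, offset: u64) {
        self.seen += 1;
        self.latest = Some(offset);
        if self.seen % self.batch_size == 0 {
            self.flush();
        }
    }

    // On shutdown (or rebalance), flush whatever is still pending.
    fn flush(&mut self) {
        if let Some(offset) = self.latest.take() {
            self.committed.push(offset + 1); // commit the *next* offset to read
        }
    }
}

fn main() {
    let mut committer = BatchCommitter::new(3);
    for offset in 0..7 {
        committer.record(offset);
    }
    committer.flush(); // don't lose the trailing partial batch
    println!("{:?}", committer.committed); // [3, 6, 7]
}
```

One of the corner cases mentioned above is visible even in this sketch: without the final flush, the last partial batch would never be committed.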

That's all for this two-part series on getting started with Rust and Kafka using the rust-rdkafka library. We covered:

  • A simple producer
  • Producer with delivery callback
  • How to send JSON payloads
  • A basic consumer
  • Handle re-balance and offset commit callbacks
  • Explore manual commit and at-least once delivery semantics

Published at DZone with permission of Abhishek Gupta, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
