How to Use Protobuf With Apache Kafka and Schema Registry

A full guide on working with Protobuf in Apache Kafka.

By Dejan Maric · Aug. 03, 2020 · Tutorial

Since Confluent Platform version 5.5, Avro is no longer the only schema in town. Protobuf and JSON schemas are now supported as first-class citizens in the Confluent universe. But before I go on explaining how to use Protobuf with Kafka, let's answer one often-asked question:

Why Do We Need Schemas?

When applications communicate through a pub-sub system, they exchange messages and those messages need to be understood and agreed upon by all the participants in the communication. Additionally, you would like to detect and prevent changes to the message format that would make messages unreadable for some of the participants. 

That's where a schema comes in: it represents a contract between the participants in communication, just like an API represents a contract between a service and its consumers. And just as REST APIs can be described using OpenAPI (Swagger), the messages in Kafka can be described using Avro, Protobuf, or JSON schemas.

Schemas describe the structure of the data by:

  • specifying which fields are in the message
  • specifying the data type for each field and whether the field is mandatory or not

In addition, together with Schema Registry, schemas prevent a producer from sending poison messages: malformed data that consumers cannot interpret. Schema Registry will detect if breaking changes are about to be introduced by the producer and can be configured to reject such changes. An example of a breaking change would be deleting a mandatory field from the schema.

Introduction to Protobuf

Similar to Apache Avro, Protobuf is a method of serializing structured data. A message format is defined in a .proto file and you can generate code from it in many languages, including Java, Python, C++, C#, Go, and Ruby. Unlike Avro, Protobuf does not serialize the schema with the message, so in order to deserialize the message, the consumer needs the schema.

Here's an example of a Protobuf schema containing one message type:

ProtoBuf

syntax = "proto3";

package com.codingharbour.protobuf;

message SimpleMessage {
    string content = 1;
    string date_time = 2;
}

In the first line, we define that we're using Protobuf version 3. Our message type called SimpleMessage defines two string fields: content and date_time. Each field is assigned a so-called field number, which has to be unique in a message type. These numbers identify the fields when the message is serialized to the Protobuf binary format. Google suggests using numbers 1 through 15 for most frequently used fields because it takes one byte to encode them.
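
To see the field numbers at work, you can serialize a message and look at its first byte. Below is a small sketch, assuming the SimpleMessage class that we'll generate from this schema later in the post: the tag of an encoded field is (field_number << 3) | wire_type, so field 1 with the length-delimited wire type 2 encodes as 0x0A.

Java

// Tag of an encoded field = (field_number << 3) | wire_type.
// 'content' is field 1 and strings use wire type 2 (length-delimited) -> 0x0A
int expectedTag = (1 << 3) | 2;

byte[] bytes = SimpleMessage.newBuilder()
        .setContent("Hello")
        .build()
        .toByteArray();

System.out.printf("expected tag: 0x%02X, first serialized byte: 0x%02X%n", expectedTag, bytes[0]);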

Protobuf supports common scalar types like string, int32, int64 (long), double, bool, etc. For the full list of scalar types, check the Protobuf documentation.

Besides scalar types, it is possible to use complex data types. Below we see two message types, Order and Product, where an Order can contain zero, one, or more Products:

ProtoBuf

message Order {
    int64 order_id = 1;
    int64 date_time = 2;
    repeated Product product = 3;
}

message Product {
    int32 product_id = 1;
    string name = 2;
    string description = 3;
}

Now, let's see how these schemas end up in the Schema Registry.

Schema Registry and Protobuf

Schema Registry is a service for storing a versioned history of schemas used in Kafka. It also supports the evolution of schemas in a way that doesn't break producers or consumers. Until recently Schema Registry supported only Avro schemas, but since Confluent Platform 5.5 the support has been extended to Protobuf and JSON schemas.
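
Because Schema Registry keeps every registered version, you can ask it whether a candidate schema is compatible before deploying a producer that uses it. Below is a minimal sketch using Java 11's HttpClient against the compatibility REST endpoint; the subject name protobuf-topic-value (the default topic-name strategy for record values, using the topic from later in this post) and the trimmed-down schema with a field removed are assumptions for illustration.

Java

// Hypothetical subject following the default <topic>-value naming strategy
String subject = "protobuf-topic-value";
// Candidate schema with the date_time field removed, escaped as a JSON string
String body = "{\"schemaType\": \"PROTOBUF\", \"schema\": "
        + "\"syntax = \\\"proto3\\\"; message SimpleMessage { string content = 1; }\"}";

HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/compatibility/subjects/" + subject + "/versions/latest"))
        .header("Content-Type", "application/vnd.schemaregistry.v1+json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();

HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());

// Expecting a JSON answer such as {"is_compatible":true}
System.out.println(response.body());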

If you've worked with Avro and Kafka before, this section will not contain any surprises. Like with Avro, Schema Registry provides a serializer and deserializer for Protobuf, called KafkaProtobufSerializer and KafkaProtobufDeserializer.

The job of this serializer is to convert the Java object to a Protobuf binary format before the producer writes the message to Kafka.

The serializer's additional job is to check whether the Protobuf schema exists in Schema Registry. If it doesn't, the serializer registers the schema there; either way, it writes the schema id at the beginning of the message. Then, when the Kafka record reaches the consumer, the consumer will use KafkaProtobufDeserializer to fetch the schema from Schema Registry based on the schema id from the message. Once the schema is fetched, KafkaProtobufDeserializer will use it to deserialize the message. This way, the consumer doesn't need to know the schema in advance to be able to consume messages from Kafka.
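
For the curious, the framing the serializer prepends is small: a magic byte (currently 0) followed by the 4-byte schema id, and for Protobuf additionally the message indexes before the actual payload. Here's a minimal sketch, using java.nio.ByteBuffer, that peeks at the schema id of a raw record value; the method name is just for illustration.

Java

// Reads the schema id from a value written by KafkaProtobufSerializer.
// Layout: 1 magic byte (0), a 4-byte big-endian schema id, then message indexes + payload.
static int schemaIdOf(byte[] value) {
    ByteBuffer buffer = ByteBuffer.wrap(value);
    byte magicByte = buffer.get();
    if (magicByte != 0) {
        throw new IllegalArgumentException("Unknown magic byte: " + magicByte);
    }
    return buffer.getInt();
}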

Diagram: Registering and using a Protobuf schema

This is why, when using KafkaProtobuf(De)Serializer in a producer or a consumer, we need to provide the URL of the Schema Registry.

Code Generation in Java

Ok, now we know how a Protobuf schema looks and we know how it ends up in Schema Registry. Let's see now how we use Protobuf schemas from Java.

The first thing you need is the protobuf-java library. In these examples, I'm using Maven, so let's add the Maven dependency:

XML

<dependencies>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.12.2</version>
    </dependency>
</dependencies>

The next thing you want to do is use the protoc compiler to generate Java code from the .proto files. But we're not going to invoke the compiler manually; we'll use a Maven plugin called protoc-jar-maven-plugin:

XML

<plugin>
    <groupId>com.github.os72</groupId>
    <artifactId>protoc-jar-maven-plugin</artifactId>
    <version>3.11.4</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>run</goal>
            </goals>
            <configuration>
                <inputDirectories>
                    <include>${project.basedir}/src/main/protobuf</include>
                </inputDirectories>
                <outputTargets>
                    <outputTarget>
                        <type>java</type>
                        <addSources>main</addSources>
                        <outputDirectory>
                            ${project.basedir}/target/generated-sources/protobuf
                        </outputDirectory>
                    </outputTarget>
                </outputTargets>
            </configuration>
        </execution>
    </executions>
</plugin>


The Protobuf classes will be generated during the generate-sources phase. The plugin will look for proto files in the src/main/protobuf folder and the generated code will be created in the target/generated-sources/protobuf folder.

To generate the classes in the target folder, run mvn clean generate-sources.
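
The generated SimpleMessage class comes with a builder plus the usual Protobuf plumbing such as toByteArray() and parseFrom(). A quick sketch of round-tripping a message outside Kafka, just to verify the generated code works (note that parseFrom throws a checked InvalidProtocolBufferException):

Java

SimpleMessage original = SimpleMessage.newBuilder()
        .setContent("Hello world")
        .setDateTime(Instant.now().toString())
        .build();

// Serialize to the Protobuf binary format and parse it back
byte[] bytes = original.toByteArray();
SimpleMessage copy = SimpleMessage.parseFrom(bytes);
System.out.println(copy.getContent());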

Note: All code examples from this blog post are available on Coding Harbour's GitHub.

Ok, now that we have our class generated, let's send it to Kafka using the new Protobuf serializer.

Running a Local Kafka Cluster

Before we get started, let's boot up a local Kafka cluster with the Schema Registry, so we can try out our code right away. We will run our cluster using docker-compose.

Don’t have docker-compose? Check: how to install docker-compose

I've prepared a docker-compose file with one ZooKeeper node, one Kafka broker, and the Schema Registry. You can grab it from https://github.com/codingharbour/kafka-docker-compose

Navigate to the single-node-avro-kafka folder and run docker-compose up -d

Your local Kafka cluster is now ready to be used. By running docker-compose ps, we can see that the Kafka broker is available on port 9092, while the Schema Registry runs on port 8081. Make a note of that, because we’ll need it soon.

Writing a Protobuf Producer

With the Kafka cluster up and running, it's now time to create a Java producer that will send our SimpleMessage to Kafka. First, let's prepare the configuration for the producer:

Java

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

Producer<String, SimpleMessage> producer = new KafkaProducer<>(properties);

Notice that we are using KafkaProtobufSerializer as the value serializer class. This is the new serializer available in Confluent Platform since version 5.5. It works similarly to KafkaAvroSerializer: when publishing messages, it will check with Schema Registry whether the schema is available there. If the schema is not yet registered, it will write it to Schema Registry and then publish the message to Kafka. For this to work, the serializer needs the URL of the Schema Registry, and in our case that's http://localhost:8081.
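
By default, the schema is registered under the subject protobuf-topic-value (the topic name plus a -value suffix for record values). If you'd rather register schemas out-of-band, for example from a build pipeline, and have the producer fail fast when a schema is missing, you can turn auto-registration off. A small sketch, assuming the same properties object as above:

Java

// Assumption: schemas are registered out-of-band (e.g. by a CI pipeline),
// so the producer should not register them automatically
properties.put("auto.register.schemas", false);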

Next, we prepare the ProducerRecord, using the SimpleMessage class generated from the Protobuf schema:

Java

SimpleMessage simpleMessage = SimpleMessage.newBuilder()
        .setContent("Hello world")
        .setDateTime(Instant.now().toString())
        .build();

ProducerRecord<String, SimpleMessage> record
        = new ProducerRecord<>("protobuf-topic", null, simpleMessage);

This record will be written to the topic called protobuf-topic. The last thing to do is to write the record to Kafka:

Java

producer.send(record);
producer.flush();
producer.close();

Usually, you wouldn't call the flush() method, but since our application will be stopped right after this, we need to ensure the message is written to Kafka before that happens.
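
If you want a stronger per-record guarantee, send() returns a Future<RecordMetadata> that you can block on (get() throws checked exceptions, so wrap it accordingly). A small sketch, assuming the same producer and record as above:

Java

// Blocks until the broker acknowledges the record (or throws if the send failed)
RecordMetadata metadata = producer.send(record).get();
System.out.println("Record written to partition " + metadata.partition()
        + " at offset " + metadata.offset());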

Writing a Protobuf Consumer

We said that the consumer doesn't need to know the schema in advance to be able to deserialize the message, thanks to Schema Registry. But having the schema available in advance allows us to generate the Java class from it and use the class in our code. This helps with code readability and makes the code strongly typed. 

Here's how to do it. First, generate the Java class(es) as explained in the Code Generation in Java section. Next, we prepare the configuration for the Kafka consumer:

Java

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Here we're defining the broker URL and the consumer group of our consumer, and telling the consumer that we'll handle offset commits ourselves.

Next, we define the deserializers for the messages:

Java

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, SimpleMessage.class.getName());

We use string deserializer for the key, but for the value, we're using the new KafkaProtobufDeserializer. For the Protobuf deserializer, we need to provide the Schema Registry URL, as we did for the serializer above.

The last line is the most important. It tells the deserializer which class to deserialize the record values into. In our case, it's the SimpleMessage class (the one we generated from the Protobuf schema using the Protobuf Maven plugin).

Now we're ready to create our consumer and subscribe it to protobuf-topic:

Java

KafkaConsumer<String, SimpleMessage> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("protobuf-topic"));

And then we poll Kafka for records and print them to the console:

Java

while (true) {
    ConsumerRecords<String, SimpleMessage> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, SimpleMessage> record : records) {
        System.out.println("Message content: " + record.value().getContent());
        System.out.println("Message time: " + record.value().getDateTime());
    }
    consumer.commitAsync();
}

Here we're consuming a batch of records and just printing the content to the console. 

Remember when we configured the consumer to let us handle committing offsets by setting ENABLE_AUTO_COMMIT_CONFIG to false? That's what we're doing in the last line: only after we've fully processed the current group of records will we commit the consumer offset.
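
If you want visibility into whether those asynchronous commits actually succeeded, commitAsync() also accepts a callback, and it's common to finish with a blocking commit when the consumer shuts down. A small sketch, assuming the same consumer as above:

Java

// Log failed commits instead of silently ignoring them
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Offset commit failed for " + offsets + ": " + exception);
    }
});

// On shutdown, a final synchronous commit makes sure the last processed offsets are stored
// consumer.commitSync();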

That's all there is to writing a simple Protobuf consumer. Let's now check one more variant.

Generic Protobuf Consumer

What if you want to handle messages in a generic way in your consumer, without generating a Java class from a Protobuf schema? Well, you can use an instance of the DynamicMessage class from the Protobuf library. DynamicMessage has a reflective API, so you can navigate through message fields and read their values. Here's how you can do it.

First, let's configure the consumer. Its configuration is very similar to the previous example:

Java

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "generic-protobuf-consumer-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

The only thing missing is the SPECIFIC_PROTOBUF_VALUE_TYPE configuration. Since we want to handle messages in a generic way, we don't need this configuration.

Now we're ready to create our consumer and subscribe it to the protobuf-topic topic, as in the previous example:

Java

KafkaConsumer<String, DynamicMessage> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("protobuf-topic"));

And then we poll Kafka for records and print them to the console:

Java

while (true) {
    ConsumerRecords<String, DynamicMessage> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, DynamicMessage> record : records) {
        for (FieldDescriptor field : record.value().getAllFields().keySet()) {
            System.out.println(field.getName() + ": " + record.value().getField(field));
        }
    }
    consumer.commitAsync();
}


Without SPECIFIC_PROTOBUF_VALUE_TYPE configured in our consumer, the consumer will always return an instance of DynamicMessage as the record's value. Then we use the DynamicMessage.getAllFields() method, whose key set gives us the FieldDescriptors of all populated fields. Once we have the descriptors, we can simply iterate through them and print the value of each field.
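
If you only need a specific field rather than iterating over all of them, you can look it up by name through the message's descriptor. A small sketch, assuming the record value is the DynamicMessage from the loop above and that the field is called content:

Java

DynamicMessage message = record.value();
// Look up the field descriptor by name, then read its value reflectively
FieldDescriptor contentField = message.getDescriptorForType().findFieldByName("content");
if (contentField != null) {
    System.out.println("content: " + message.getField(contentField));
}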

Check out the JavaDoc to find out more about DynamicMessage.

That wraps up our Kafka Protobuf guide. Now you're ready to start writing producers and consumers that send Protobuf messages to Apache Kafka with the help of Schema Registry.

All the code from this blog post is available on Coding Harbour's GitHub.

Would you like to learn more about Kafka?

I have created a Kafka mini-course that you can get absolutely free. Sign up for it over at Coding Harbour.


Published at DZone with permission of Dejan Maric. See the original article here.

Opinions expressed by DZone contributors are their own.
