Producing and Consuming Avro Messages With Redpanda Schema Registry

In this article, learn how to use Redpanda’s built-in schema registry to produce and consume Apache Avro messages in five simple steps.

By Dunith Dhanushka · Apr. 21, 23 · Tutorial

If you’re familiar with Apache Kafka®, then you might have encountered a Kafka-compatible schema registry—a separate component that you deploy outside of your Kafka cluster, since Kafka itself doesn’t have one built-in. 

Essentially, a schema is a logical description of how your data is organized, and a schema registry provides a central repository for those schemas, allowing producers and consumers to agree on the format of the data they exchange. In event-driven architectures, managing schemas can become complex as you scale, since data schemas change and evolve over time (potentially breaking things down the line).

This is where Redpanda steps in. Redpanda is a Kafka API-compatible streaming data platform that’s designed from the ground up to be fast, simple, and cost-effective. In keeping with that mission, Redpanda comes with a schema registry already built in so you can store, version control, and validate schemas for your real-time applications without deploying or managing anything other than the Redpanda cluster.  

To give you a taste of how it works, this post walks you through building a simple clickstream processor using the Redpanda schema registry to produce and consume Apache Avro™ messages. We chose Avro since it's a popular choice for serializing data in a compact binary format, and it supports schema evolution. This tutorial is just five steps long and everything you need is in this GitHub repository.

Ready? Let’s get started.

Before that, you may need a little refresher on basic Kafka API concepts, like topics, partitions, consumers, schemas, etc.

How To Build a Clickstream Processing Application

It's time to get down to the code. In just five steps, we'll spin up a single-node Redpanda cluster and Redpanda Console with Docker Compose, and then use a schema-registry-aware SDK client (confluent-kafka) to produce Avro messages to Redpanda and consume them back. The SDK will do most of the heavy lifting, including schema registration and schema compatibility checking.

In brief, the Python producer application collects user interaction events from a web application, serializes them in Avro, and publishes them into the clickstream topic in Redpanda. Another Python application consumes them from Redpanda, deserializes, and uses them to analyze user behavior.

Producer to Consumer

The format of a sample ClickStream event would look like this:

JSON
 
{
   "user_id":2323,
   "event_type":"BUTTON_CLICK",
   "ts":"2018-11-12 01:02:03.123456789"
}


To mimic the above use case, we'll write a simple producer and consumer in Python that produce and consume messages from the clickstream Redpanda topic. They'll communicate with the schema registry over its REST API, which this setup exposes at http://localhost:8081.

Before we start, make sure you have Docker Desktop and Python 3 (with pip) installed on your local machine.

1. Clone the GitHub Repository

This GitHub repository contains a few artifacts to bootstrap the tutorial. It will help you get started immediately. We will create additional code artifacts as we progress through the tutorial.  

Execute the following commands to clone the repository to your local machine.

Shell
 
git clone https://github.com/redpanda-data-blog/2023-schema-registry-kafka-avro-tutorial.git code
cd code


2. Install Python Dependencies

We will install several Python libraries used by the Python clients you will see later in this tutorial. You will find them in the requirements.txt file at the root level of the repo. To prevent them from clashing with your local dependencies, let’s create a new virtual environment for them with venv.

Execute the following commands in a terminal window.

Shell
 
python3 -m venv env
source env/bin/activate
pip install --upgrade pip
pip install -r requirements.txt


Also, note that we're using the confluent-kafka Python SDK for all the API communications with the Redpanda schema registry. It's a schema-registry-aware SDK that's also compatible with the Confluent schema registry. Because of that, confluent-kafka will do a lot of heavy lifting for us under the hood, such as prefixing each message with a short header consisting of the magic byte and the schema ID. It can also automatically register the schemas with the registry.

Another advantage is that you can use the Redpanda schema registry with your Confluent SDK clients without needing any code changes.
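
To make the magic byte and schema ID framing more concrete, here is a minimal sketch of the Confluent-style wire format: one zero byte, then the schema ID as a 4-byte big-endian integer, then the Avro-encoded body. The helper names frame and unframe are hypothetical and only illustrate what the SDK does internally; you don't need to write this yourself.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every schema-registry framed message


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro-encoded payload with the magic byte and a 4-byte big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, avro_payload)."""
    if len(message) < 5:
        raise ValueError("message too short to carry a schema ID header")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


# Hypothetical example: schema ID 1 and a one-byte Avro body.
framed = frame(1, b"\x04")
print(framed.hex())     # 000000000104
print(unframe(framed))  # (1, b'\x04')
```

The 5-byte header is the only overhead added per message; the schema itself stays in the registry.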

3. Start a Redpanda Cluster

Next, we will use Docker Compose to create a Redpanda cluster.

Locate the docker-compose.yml file at the root level of the cloned repository and run the following command in a terminal.

Shell
 
docker compose up -d


That will spin up a single-node Redpanda cluster along with Redpanda Console. The schema registry is built into the Redpanda node, and you can visually explore the schema definitions it stores with Redpanda Console.

Access the Console by opening http://localhost:8080/brokers in a browser. Click Schema Registry in the sidebar to see the schema definitions.

Redpanda Schema Registry

You should see an empty screen since we started the cluster from scratch.

4. Write the Producer Code

Now that we have a functioning Redpanda cluster, the next step is to produce Avro-formatted messages. 

The producer.py in the cloned repository has the Python code for event production. Its content would look like this:

Python
 
import json
from uuid import uuid4
from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroProducer 
from confluent_kafka import avro

def delivery_callback(error, message):
    if error:
        print("Failed to send the message: %s" % error)
    else:
        print(f"Message with the key {message.key()} has been produced to the topic {message.topic()}")

def load_avro_schema_from_file():
    key_schema_string = """
    {"type": "string"}
    """

    key_schema = avro.loads(key_schema_string)
    value_schema = avro.load('./schemas/click_event.avsc')

    return key_schema, value_schema

def produce():
    config = {
        'bootstrap.servers' : "localhost:9092",
        'schema.registry.url' : "http://localhost:8081"
    }

    key_schema, value_schema = load_avro_schema_from_file()
    
    producer = AvroProducer(
        config,
        default_key_schema = key_schema,
        default_value_schema = value_schema
    )

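    try:
        # Use a random UUID as the message key.
        key = str(uuid4())
        value_str = '{"user_id":2,"event_type":"CLICK","ts":"2021-12-12"}'
        # The value must be a dict whose fields match the ClickEvent schema.
        value = json.loads(value_str)

        # Serialize the key and value with Avro and enqueue the message.
        producer.produce(
            topic = "clickstream",
            key = key,
            value = value,
            on_delivery = delivery_callback
        )

        # Block until the message is delivered; this also invokes delivery_callback.
        producer.flush()

    except KafkaException as e:
        print("Error occurred during message production:", e)

    print("Done!")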

def main():
    produce()

if __name__ == "__main__":
    main()


Most of the methods are boilerplate and self-explanatory, so let's walk through the parts that matter most for serialization.

First, we pass the schema registry URL to the SDK client by setting the configuration property schema.registry.url.

Next, the load_avro_schema_from_file() method returns two schemas for a ClickStream event—the schema for the key and the schema for the value. 

Python
 
def load_avro_schema_from_file():
    key_schema_string = """
    {"type": "string"}
    """

    key_schema = avro.loads(key_schema_string)
    value_schema = avro.load('./schemas/click_event.avsc')

    return key_schema, value_schema


Note that the value schema is loaded from the schemas/click_event.avsc file in the repository. That file contains the following Avro schema definition which defines the structure of a ClickStream event.

JSON
 
{
    "type" : "record",
    "namespace" : "com.redpanda.examples.avro",
    "name" : "ClickEvent",
    "fields" : [
       { "name": "user_id", "type" : "int" },
       { "name": "event_type", "type" : "string" },
       { "name": "ts", "type": "string" }
    ]
 }
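
To illustrate why Avro is described as a compact binary format, the following sketch encodes the sample event by hand according to the Avro binary encoding rules: an int is written as a zigzag varint, a string as a length (also a zigzag varint) followed by its UTF-8 bytes, and a record as its fields in schema order with no field names. The function names are hypothetical; in the tutorial the confluent-kafka SDK performs this step for you.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an integer as Avro does for int and long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps small negative numbers to small positive ones
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, with the continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Encode a string as its byte length followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_click_event(event: dict) -> bytes:
    """Encode a ClickEvent record: fields are concatenated in schema order."""
    return (
        zigzag_varint(event["user_id"])
        + encode_string(event["event_type"])
        + encode_string(event["ts"])
    )


event = {"user_id": 2, "event_type": "CLICK", "ts": "2021-12-12"}
encoded = encode_click_event(event)
print(len(encoded))  # 18
```

The same event takes more than 50 bytes as JSON text, because JSON repeats the field names in every message while Avro relies on the schema to supply them.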


Once both schemas are loaded, they are passed as arguments to the SDK's serializing producer, AvroProducer. When it serializes a message, the AvroProducer asks the schema registry for the schema ID to embed in that message. Note that confluent-kafka names the registry subjects after the topic rather than the record: the key and value schemas are registered under the subjects clickstream-key and clickstream-value, while ClickEvent is the record name inside the value schema.

Python
 
producer = AvroProducer(
    config,
    default_key_schema = key_schema,
    default_value_schema = value_schema
)


When the producer code runs for the first time, the AvroProducer automatically registers the ClickEvent schema in the schema registry and fetches the schemaID, which happens to be 1. This schema ID is unique across the Redpanda cluster, and you can use it to retrieve the schema later. 

The subsequent invocations will read the schemaID from the cache.

Next, run the file on a terminal to produce Avro messages.

Shell
 
python producer.py


Open the Redpanda Console's Topics page to check that the clickstream topic has been populated with a single event.

Screenshot of the Topics page in Redpanda Console

Note that the Redpanda Console can deserialize Avro messages for you, showing you the content of the clickstream event's payload (value), since we used a schema-registry-aware Python SDK. As both the producer client and the Console use the same schema registry, the Console can determine which schema to use for deserialization by looking at the schema ID carried in each message.

Next, open the Redpanda Console's Schema Registry page to verify the schema registration. You will see that the clickstream-key and clickstream-value schema definitions have been registered with the schema registry.

Screenshot of the Schema Registry page in Redpanda Console
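
If you prefer to check the registration without the Console, the sketch below builds the registry URLs for the two subjects. It assumes the Confluent-compatible endpoints GET /subjects and GET /subjects/{subject}/versions/latest, and the registry address from the producer configuration; the helper names are hypothetical.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

REGISTRY_URL = "http://localhost:8081"  # same value as schema.registry.url in the producer


def subject_name(topic: str, is_key: bool) -> str:
    """Build the subject name used for a topic's key or value schema."""
    return f"{topic}-{'key' if is_key else 'value'}"


def latest_version_url(subject: str, base_url: str = REGISTRY_URL) -> str:
    """URL of the latest registered schema version for a subject."""
    return f"{base_url}/subjects/{quote(subject, safe='')}/versions/latest"


def fetch_json(url: str):
    """GET a registry endpoint and parse the JSON response (requires a running cluster)."""
    with urlopen(url, timeout=5) as response:
        return json.load(response)


print(latest_version_url(subject_name("clickstream", is_key=False)))
# http://localhost:8081/subjects/clickstream-value/versions/latest
```

With the cluster running, calling fetch_json(REGISTRY_URL + "/subjects") should list both subjects, and fetch_json on the URL printed above should return the registered ClickEvent schema along with its ID and version.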

5. Write the Consumer Code

Now that we have Avro-formatted messages in the clickstream topic, let's deserialize them with a Python consumer.

The consumer.py file in the repository contains the code to consume the clickstream topic, deserialize the messages, and print their content on the terminal. The file content would look like this:

Python
 
import json
from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroConsumer

def consume():
    config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        "group.id": "my-connsumer1",
        "auto.offset.reset": "earliest"
    }

    # AvroConsumer fetches schemas from the registry and deserializes keys and values.
    consumer = AvroConsumer(config)
    consumer.subscribe(["clickstream"])

    try:
        while True:
            try:
                # Wait up to one second for the next message.
                msg = consumer.poll(1)
            except KafkaException as e:
                print(f"Kafka failure: {e}")
                continue

            if msg is None:
                continue

            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue

            print("Key is :" + json.dumps(msg.key()))
            print("Value is :" + json.dumps(msg.value()))
            print("-------------------------")
    except KeyboardInterrupt:
        # Stop consuming on Ctrl+C.
        pass
    finally:
        # Leave the consumer group cleanly.
        consumer.close()

def main():
    consume()

if __name__ == '__main__':
    main()


The code is pretty self-explanatory. As we did with the producer, the schema registry URL is set in the config object and passed into the Avro deserializer, AvroConsumer, which subscribes to the clickstream topic.

That’s all you need to know. The rest, including the schemaID discovery, schema retrieval, and finally, the deserialization will be taken care of by the AvroConsumer.
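
For the curious, the three steps the AvroConsumer takes (schema ID discovery, schema retrieval, deserialization) can be sketched in plain Python. This is only an illustration under stated assumptions: SCHEMA_CACHE is a hypothetical in-memory stand-in for the registry lookup, the schema ID 1 is assumed, and the decoder handles only the int and string types used by ClickEvent.

```python
import struct
from typing import Dict, List, Tuple

# Hypothetical stand-in for the registry: schema ID -> ordered (field name, Avro type) pairs.
SCHEMA_CACHE: Dict[int, List[Tuple[str, str]]] = {
    1: [("user_id", "int"), ("event_type", "string"), ("ts", "string")],
}


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one zigzag varint starting at pos; return (value, next position)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # no continuation bit: this was the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def deserialize(message: bytes) -> dict:
    # 1. Schema ID discovery: read the 5-byte header.
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("not a schema-registry framed message")
    # 2. Schema retrieval: a dict lookup here, a registry call (then a cache hit) in the SDK.
    fields = SCHEMA_CACHE[schema_id]
    # 3. Deserialization: read each field in schema order.
    pos = 5
    record = {}
    for name, avro_type in fields:
        value, pos = read_varint(message, pos)
        if avro_type == "string":
            length = value
            value = message[pos:pos + length].decode("utf-8")
            pos += length
        record[name] = value
    return record


message = bytes.fromhex("0000000001") + b"\x04" + b"\x0a" + b"CLICK" + b"\x14" + b"2021-12-12"
print(deserialize(message))
# {'user_id': 2, 'event_type': 'CLICK', 'ts': '2021-12-12'}
```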

Run the file in a terminal.

Shell
 
python consumer.py


You should see a single event in return, with its deserialized content as follows.

Shell
 
Key is :"39950858-1cfd-4d56-a3ac-2bde1c806f6f"
Value is :{"user_id": 2, "event_type": "CLICK", "ts": "2021-12-12"}


Simplifying Schema Registry in Kafka With Redpanda

If you made it this far, give yourself a pat on the back because you just used Redpanda’s built-in schema registry to produce and consume Avro messages! This is just one way Redpanda works to make streaming data in Kafka faster and simpler. There are no new binaries to install, no new services to deploy and maintain, and the default configuration just works.

Feel free to customize this clickstream processing example further and play around with other data schema formats, like Protobuf. To learn more about Redpanda, check out our documentation.

Published at DZone with permission of Dunith Dhanushka. See the original article here.

Opinions expressed by DZone contributors are their own.
