DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • Event-Driven Pipelines With Apache Pulsar and Go
  • Contract-First Integration: Building Scalable Systems With Flyway, OpenAPI, and Kafka
  • Kafka and Spark Structured Streaming in Enterprise: The Patterns That Hold Up Under Pressure
  • Exactly-Once Processing: Myth vs Reality

Trending

  • Bringing Intelligence Closer to the Source: Why Real-Time Processing is the Heart of Edge AI
  • AI Agents in Java: Architecting Intelligent Health Data Systems
  • Feature Flag Debt: Performance Impact in Enterprise Applications
  • RAG Is Not Enough: Advanced Retrieval Architectures Using Vertex AI Search on GCP
  1. DZone
  2. Data Engineering
  3. Big Data
  4. Kafka Topics Naming

Kafka Topics Naming

In this post, learn how you can easily enforce a naming convention on Apache Kafka Topics.

By 
Saurabh Sharma user avatar
Saurabh Sharma
·
Jul. 14, 22 · Code Snippet
Likes (6)
Comment
Save
Tweet
Share
10.0K Views

Join the DZone community and get the full member experience.

Join For Free

Creating a Topic in a Kafka cluster is easy and is well documented for kafka-topics.sh or even the official API documentation.

 
 bin/kafka-topics.sh --help


The complexity arises when you are trying to enforce a standard way of defining topic naming. There are many ways to identify the right convention based on your need, but to enforce such a topic convention while you are creating one is explained in this 5-step blog.

There is no right convention: it is always determined based on what your business needs.

For my example, I wish to define a topic convention that follows the semantics:

 
<organizationname>.<productname>


It is simple enough to get started and can be easily extended, as you will observe as you follow along.

From the official documentation, if you wish to define a custom topic policy creation you will have to define the property:

Properties files
 
create.topic.policy.class.name=mypackage.className


The className should implement the interface:

 org.apache.kafka.server.policy.CreateTopicPolicy

Step 1: Building the Project

With these two building blocks, let's define a Maven project:

Building Project

Step 2: Define the Dependency

Let's define a package "me.samarthya" and also add the dependency of the Kafka clients in the "pom.xml."

XML
 
 <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>3.2.0</version>
   <scope>compile</scope>
</dependency>


Step 3: Implementation

Let's define the main class TopicPolicy:

Java
 
public class TopicPolicy implements CreateTopicPolicy {
    private final Logger logger = Logger.getLogger(TopicPolicy.class.toString());

    private final static String TopicPattern = "\\w+\\.{1}\\w+";

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        StringBuilder bd = new StringBuilder().append(" Topic Name=").append(requestMetadata.topic());
        logger.info(bd.toString());
        if ( requestMetadata.topic().isEmpty() || !Pattern.matches(TopicPattern, requestMetadata.topic())) {
            throw new PolicyViolationException("Topic name " + requestMetadata.topic() + " should match the pattern " + TopicPattern);
        }
    }

    @Override
    public void close() throws Exception {
        logger.info(" Close & release.");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        if (configs != null) {
            for( String k: configs.keySet()) {
                logger.info(configs.get(k).toString());
            }
        }
    }
}


With the class defined, the main thing to observe is that the TopicPattern that has been defined as the format will be matched for the name. If it is not found, a PolicyViolationException will be thrown.

Step 4: Repeat for Each Broker in the Cluster

Package the jar. It has to be placed under the "lib" folder of the Kafka (classpath).  

PowerShell
 
  4 -rw-r--r--. 1 vagrant vagrant     3881 Jul 12 06:28 topic-policy-1.0-SNAPSHOT.jar


Also, in the "server.properties," you can define two properties:

Properties files
 
create.topic.policy.class.name=me.samarthya.TopicPolicy
auto.create.topics.enable=false


Restart your cluster.

Step 5: Test Your "Topics"

Let's go back to the Kafka binary folder (local machine) and issue the topic creation command again.

PowerShell
 
 bin/kafka-topics.sh --bootstrap-server mybroker.test:9092  --topic invalid_topic --create


If the jar has been loaded successfully, you should see an error reported as below:

PowerShell
 
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Error while executing topic command : Topic name invalid_topic should match the pattern \w+\.{1}\w+
[2022-07-13 09:49:21,805] ERROR org.apache.kafka.common.errors.PolicyViolationException: Topic name invalid_topic should match the pattern \w+\.{1}\w+
 (kafka.admin.TopicCommand$)


You can modify the pattern now as per your convenience and re-deploy the jar to check the new custom topic policies.

Example

PowerShell
 
bin/kafka-topics.sh --bootstrap-server mybroker.test.test:9092  --topic invalid.valid --create
PowerShell
 
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic invalid.valid


Note

Since the auto-topic creation has been disabled, if you try and create an invalid topic through producer, it will not work (see below).

PowerShell
 
 bin/kafka-console-producer.sh --bootstrap-server mybroker.test:9092 --topic test


This will result in the following error:

PowerShell
 
[2022-07-13 09:54:21,196] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)


For an existing topic invalid.valid, it should work as follows:

PowerShell
 
bin/kafka-console-producer.sh --bootstrap-server mybrokers.test:9092 --topic invalid.valid


kafka

Opinions expressed by DZone contributors are their own.

Related

  • Event-Driven Pipelines With Apache Pulsar and Go
  • Contract-First Integration: Building Scalable Systems With Flyway, OpenAPI, and Kafka
  • Kafka and Spark Structured Streaming in Enterprise: The Patterns That Hold Up Under Pressure
  • Exactly-Once Processing: Myth vs Reality

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook