Testing Schema Registry: Spring Boot and Apache Kafka With JSON Schema

Learn how to write tests for the Schema Registry for Apache Kafka using Spring Boot, MockSchemaRegistryClient, and EmbeddedKafka with JSON Schema.

By Mukut Bhattacharjee · May. 04, 22 · Tutorial

Introduction

Apache Kafka is one of the most widely used event streaming platforms, valued for its reliability and scalability. It provides the foundation for fast, scalable, fault-tolerant, event-driven microservice architectures. Any type of data can be published to a Kafka topic and read back from it. However, Kafka does not provide schema validation out of the box, so a topic is prone to being fed junk data.

Confluent addresses this with Schema Registry, which can be used to add type checking of messages to any Kafka installation. The message schema is registered with Schema Registry against a topic, and schema-aware clients then ensure that only messages conforming to the schema are sent to that topic.

Integrating Schema Registry with Spring Boot Kafka producer and consumer applications is not much of a challenge, thanks to the extensive support in Spring Boot and Apache Kafka. Testing that integration, however, often poses some challenges for the developer.

Since JSON is such a widespread data format, this post describes how to write tests for the Schema Registry for Apache Kafka using Spring Boot, MockSchemaRegistryClient, and EmbeddedKafka with JSON Schema.

Install Dependencies

Add Confluent's kafka-json-schema-serializer dependency to pom.xml, together with the Confluent Maven repository that hosts it and the Spring test dependencies.

 
<project>
    ...

    <repositories>
        <repository>
            <id>confluent</id>
            <name>confluent</name>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-json-schema-serializer</artifactId>
            <version>6.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    ...
</project>


Setting Up the Test Infrastructure

To test the schema registry, we won't require a full-fledged production instance of Kafka. Rather, we will use an embedded version of Kafka. To set up Embedded Kafka, we need to annotate any @SpringBootTest class or any @TestConfiguration class with @EmbeddedKafka providing required parameters as shown below:

 
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:8092", "port=8092"})
@ActiveProfiles("test")
@DirtiesContext
public class KafkaTestSupport {


}


io.confluent:kafka-json-schema-serializer provides a mock implementation of Schema Registry client called MockSchemaRegistryClient that can be used to register and test out JSON schema. The io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer should be used as a value serializer for JSON schema-based events.

To set up a mock schema-registry server, the producer properties should be set as follows in application.yaml in your test resources.

 
spring:
  kafka:
    bootstrap-servers: localhost:8092 # should match with embedded broker host port
    producer:
      value-serializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
    properties:
      schema.registry.url: mock://not-used
      auto.register.schemas: false
      use.latest.version: true
      json:
        fail.invalid.schema: true
        oneof.for.nullables: false
        write.dates.iso8601: true


A couple of points to note here:

  1. The mock Schema Registry URL must start with mock://.
  2. We set auto.register.schemas to false because we populate and register our JSON schema with the mock registry beforehand. Setting it to false prevents the serializer from automatically creating and registering a schema from the event class using Jackson.
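
For reference, the nested YAML entries under spring.kafka.properties flatten into dotted keys when they reach the producer. The sketch below lists those flattened keys in a plain Java map. The MockRegistryProps class and its mockRegistryProps method are hypothetical names used only for this illustration; the keys and values are the ones from the YAML above.

```java
import java.util.HashMap;
import java.util.Map;

class MockRegistryProps {

    // Hypothetical helper: returns the flattened form of the YAML entries
    // under spring.kafka.properties shown above.
    static Map<String, Object> mockRegistryProps() {
        Map<String, Object> props = new HashMap<>();
        // The mock:// prefix selects the in-memory mock registry.
        props.put("schema.registry.url", "mock://not-used");
        // Schemas are registered by the test itself, not by the serializer.
        props.put("auto.register.schemas", false);
        props.put("use.latest.version", true);
        // The nested "json:" block becomes a "json." key prefix.
        props.put("json.fail.invalid.schema", true);
        props.put("json.oneof.for.nullables", false);
        props.put("json.write.dates.iso8601", true);
        return props;
    }

    public static void main(String[] args) {
        mockRegistryProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A map like this could be merged into the producer configuration when the properties are built in code rather than in application.yaml.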

Let's consider our event model SampleEvent as follows:

 
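import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.OffsetDateTime;
import java.util.List;

// SampleEvent.java
public class SampleEvent {

    // required = true marks the property as required in a generated schema
    @JsonProperty(required = true)
    private Long id;

    private String name;

    @JsonProperty(required = true)
    private OffsetDateTime date;

    @JsonProperty(required = true)
    private List<SampleSubEvent> subEvents;

    // accessors are not shown in this listing
}

// SampleSubEvent.java
public class SampleSubEvent {

    @JsonProperty(required = true)
    private Long id;

    private String name;

}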


Writing the Test Cases

As part of the configuration, we need to provide a SchemaRegistryClient bean of type MockSchemaRegistryClient and register the serializer to use this mock schema registry instead of an actual one. The ProducerFactory should also be registered to use this serializer. We will do these using a test configuration static inner class.

 
class SchemaRegistryTest extends KafkaTestSupport {


    @TestConfiguration
    public static class TestConfig<K, V> {

        @Autowired
        KafkaProperties properties;

        @Bean
        public SchemaRegistryClient schemaRegistryClient() throws RestClientException, IOException {
            SchemaProvider provider = new JsonSchemaProvider();
            return new MockSchemaRegistryClient(Collections.singletonList(provider));
        }

        @Bean
        public KafkaJsonSchemaSerializer<V> kafkaJsonSchemaSerializer(SchemaRegistryClient schemaRegistryClient)
                throws RestClientException, IOException {
            return new KafkaJsonSchemaSerializer<V>(schemaRegistryClient);
        }

        @Bean
        public ProducerFactory<K, V> producerFactory(Serializer<V> vSerializer) {
            return new DefaultKafkaProducerFactory<K, V>(properties.buildProducerProperties(), (Serializer<K>) new StringSerializer(),
                    vSerializer);
        }

    }

}


Once our mock Schema Registry has been set up, the next step is to generate a valid schema for the event model and register it with the mock Schema Registry. Following the default subject naming strategy, for a topic called test, the corresponding subject name for the message value will be test-value. For more details on topics, subjects, and how the subject naming strategies work, refer to Confluent's documentation on subject name strategies.
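
As a rough illustration of the default naming rule described above, the sketch below derives subject names from a topic name. The SubjectNames class and its subjectFor method are hypothetical helpers written for this example and are not part of the Confluent client; the -key suffix for key schemas follows the same default strategy (TopicNameStrategy).

```java
class SubjectNames {

    // Hypothetical helper mirroring the default topic-based rule:
    // the subject is the topic name plus "-key" or "-value".
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        // Subject under which the value schema for topic "test" is registered.
        System.out.println(subjectFor("test", false));
        // Subject that would hold a key schema for the same topic.
        System.out.println(subjectFor("test", true));
    }
}
```

This is why the test registers its schema under test-value: the serializer looks up the schema by the subject derived from the topic it is publishing to.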

 
class SchemaRegistryTest extends KafkaTestSupport {

    // topic name; with the default strategy its value subject is "test-value"
    private static final String TEST_TOPIC = "test";

    private static final String SUBJECT = "test-value";

    private static final String SCHEMA_STR = "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Sample Event\",\"type\":\"object\",\"additionalProperties\":true,\"properties\":{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"},\"date\":{\"type\":\"number\"},\"subEvents\":{\"type\":\"array\",\"items\":{\"$ref\":\"#/definitions/SampleSubEvent\"}}},\"required\":[\"id\",\"date\",\"subEvents\"],\"definitions\":{\"SampleSubEvent\":{\"type\":\"object\",\"additionalProperties\":true,\"properties\":{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"}},\"required\":[\"id\"]}}}";

    private static final String SOME_INVALID_SCHEMA_STRING = "some_junk";


    @Autowired
    private KafkaTemplate<String, SampleEvent> template;

    @Autowired
    private SchemaRegistryClient schemaRegistryClient;


    @Test
    void should_publish_successfully_on_valid_schema() throws RestClientException, IOException {

        // parse and register the schema
        Optional<ParsedSchema> parsedSchema = schemaRegistryClient.parseSchema("JSON", SCHEMA_STR,
                Collections.emptyList());

        if(parsedSchema.isPresent()){
            ParsedSchema schema = parsedSchema.get();
            schemaRegistryClient.register(SUBJECT, schema,1,1);
        }

        SampleEvent e = new SampleEvent();

        // send and assert that no exceptions are thrown
        Assertions.assertDoesNotThrow(() -> template.send(TEST_TOPIC, e));
    }


    @TestConfiguration
    public static class TestConfig<K, V> {

        ...

    }

}
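
The single-line SCHEMA_STR above is hard to read. As a sketch, assuming Java 15 or later is available, the same schema can be written as a text block so that its structure is visible. The SampleEventSchema holder class is a hypothetical name used only for this illustration; the schema content is unchanged.

```java
class SampleEventSchema {

    // Same JSON schema as SCHEMA_STR, laid out as a Java 15+ text block.
    static final String SCHEMA_STR = """
        {
          "$schema": "http://json-schema.org/draft-07/schema#",
          "title": "Sample Event",
          "type": "object",
          "additionalProperties": true,
          "properties": {
            "id": {"type": "integer"},
            "name": {"type": "string"},
            "date": {"type": "number"},
            "subEvents": {
              "type": "array",
              "items": {"$ref": "#/definitions/SampleSubEvent"}
            }
          },
          "required": ["id", "date", "subEvents"],
          "definitions": {
            "SampleSubEvent": {
              "type": "object",
              "additionalProperties": true,
              "properties": {
                "id": {"type": "integer"},
                "name": {"type": "string"}
              },
              "required": ["id"]
            }
          }
        }
        """;

    public static void main(String[] args) {
        System.out.println(SCHEMA_STR);
    }
}
```

Laid out this way, it is easier to see that id, date, and subEvents are required on the event, and that each sub-event requires only id.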


Similarly, to test that an exception is thrown, register a wrong schema or try publishing a different event.

 
    @Test
    void should_throw_error_on_invalid_schema() throws RestClientException, IOException {

        Optional<ParsedSchema> parsedSchema = schemaRegistryClient.parseSchema("JSON", SOME_INVALID_SCHEMA_STRING,
                Collections.emptyList());

        if(parsedSchema.isPresent()){
            ParsedSchema schema = parsedSchema.get();
            schemaRegistryClient.register(SUBJECT, schema,1,1);
        }

        SampleEvent e = new SampleEvent();


        // send and assert that exceptions are thrown
        Assertions.assertThrows(Exception.class,
                () -> template.send(TEST_TOPIC, e));
    }


And voila!!!

I hope this article serves as a helpful resource for testing a Kafka Schema Registry implementation that uses JSON Schema.
