DZone

Cloud-Events Kafka Binding With Spring

Utilize Kafka Protocol Binding for CloudEvents along with Spring-Kafka to easily produce and consume events in a common format.

By Will Kennedy · Nov. 13, 20 · Tutorial · 12.6K Views


This article shows how to use the spring-kafka library with the CloudEvents spec to quickly get you producing and consuming events with a common structure by way of Kafka. The spring-kafka project provides utilities and templates for interacting with Kafka with minimal effort, while the CloudEvents specification describes event data in a common way and provides both a Java SDK and a library for Kafka protocol binding.

The CloudEvents Kafka protocol binding provides two content modes for transferring data.

  1. Structured: event metadata attributes and event data are placed into the Kafka message value section using an event format.

  2. Binary: the value of the event data MUST be placed into the Kafka message's value section as-is, with the content-type header value declaring its media type; all other event attributes MUST be mapped to the Kafka message's header section.

For more details see: https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#13-content-modes
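To make the two content modes concrete, here is a minimal, dependency-free sketch. Plain Java maps and strings stand in for Kafka record headers and values; the attribute values (id, source, type) are made up for illustration, while the `ce_` header prefix and the placement rules come from the binding spec linked above:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrates where event attributes end up in each content mode.
public class ContentModesSketch {

    // Binary mode: attributes travel as ce_-prefixed Kafka headers,
    // and the raw event data goes into the message value as-is.
    public static Map<String, String> binaryHeaders() {
        Map<String, String> headers = new HashMap<>();
        headers.put("ce_id", "1234");                        // example event id
        headers.put("ce_source", "/example/source");         // example source
        headers.put("ce_type", "com.example.person.created");
        headers.put("content-type", "application/json");     // media type of the data
        return headers;
    }

    // Structured mode: the whole event (attributes plus data) is a single
    // JSON envelope in the message value; the content-type header then
    // declares the event format instead of the data's media type.
    public static String structuredValue() {
        return "{\"specversion\":\"1.0\",\"id\":\"1234\","
             + "\"source\":\"/example/source\","
             + "\"type\":\"com.example.person.created\","
             + "\"data\":{\"name\":\"John\"}}";
    }

    public static void main(String[] args) {
        System.out.println("binary headers: " + binaryHeaders());
        System.out.println("structured value: " + structuredValue());
    }
}
```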

Dependencies

In this tutorial, we'll be using spring-kafka 2.5.5.RELEASE and cloudevents-kafka 2.0.0-milestone3. As of this writing, version 2 of the CloudEvents Java SDK hasn't had a final release, so the latest milestone is used.

XML

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.cloudevents</groupId>
    <artifactId>cloudevents-kafka</artifactId>
    <version>2.0.0-milestone3</version>
</dependency>
<dependency>
    <groupId>io.cloudevents</groupId>
    <artifactId>cloudevents-json-jackson</artifactId>
    <version>2.0.0-milestone3</version>
</dependency>

Binary Producer and Consumer

In order to publish and consume messages from Kafka, we'll need to create a producer and a consumer that can serialize and deserialize CloudEvents.

Producer

The first thing we need is a producer. We can create a helper class that builds a CloudEvent-aware producer factory:

Java

import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import java.util.HashMap;
import java.util.Map;

import static io.cloudevents.kafka.CloudEventSerializer.ENCODING_CONFIG;
import static io.cloudevents.kafka.CloudEventSerializer.EVENT_FORMAT_CONFIG;

public class CloudEventKafkaProducerFactory {

    public static <K, V> DefaultKafkaProducerFactory<K, V> cloudEventKafkaProducerFactory(
            Map<String, Object> configs,
            Serializer<K> keySerializer,
            Encoding encoding) {
        return cloudEventKafkaProducerFactory(configs, keySerializer, encoding, JsonFormat.CONTENT_TYPE);
    }

    @SuppressWarnings("unchecked")
    public static <K, V> DefaultKafkaProducerFactory<K, V> cloudEventKafkaProducerFactory(
            Map<String, Object> configs,
            Serializer<K> keySerializer,
            Encoding encoding,
            String eventFormat) {
        // If present, the Kafka message header property content-type MUST be set to the media type of an event format.
        if (Encoding.STRUCTURED.equals(encoding)) {
            EventFormat resolvedFormat = EventFormatProvider
                    .getInstance()
                    .resolveFormat(eventFormat);
            if (resolvedFormat == null) {
                eventFormat = JsonFormat.CONTENT_TYPE;
            }
        }
        Map<String, Object> ceSerializerConfigs = new HashMap<>();
        ceSerializerConfigs.put(ENCODING_CONFIG, encoding);
        ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, eventFormat);
        CloudEventSerializer cloudEventSerializer = new CloudEventSerializer();
        // isKey is always false: the CloudEvent is the record value, never the key.
        cloudEventSerializer.configure(ceSerializerConfigs, false);
        return new DefaultKafkaProducerFactory<>(configs, keySerializer, (Serializer<V>) cloudEventSerializer);
    }
}


This class will allow us to create a DefaultKafkaProducerFactory that supports either binary or structured encoding. You can also specify the event format depending on your needs.

You can instantiate a ProducerFactory that serializes your data as a binary cloud event using the CloudEventKafkaProducerFactory. Simply provide your producer configs, the key serializer, and the encoding type, in this case Encoding.BINARY.

Java

DefaultKafkaProducerFactory<String, CloudEvent> binaryProducerFactory =
    CloudEventKafkaProducerFactory.cloudEventKafkaProducerFactory(
        producerConfigs,
        new StringSerializer(),
        Encoding.BINARY);
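The producerConfigs map passed in above isn't shown in this article. A minimal sketch of what it might contain is below; the property names are the standard Kafka client keys, the broker address is a placeholder, and the key/value serializers are deliberately absent because the factory method supplies them:

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerConfigsExample {

    // Minimal producer configuration; only bootstrap.servers is required.
    public static Map<String, Object> producerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        configs.put("acks", "all");                         // wait for full acknowledgement
        configs.put("retries", 3);                          // retry transient send failures
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(producerConfigs());
    }
}
```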


Consumer

We'll also need to create a class to help us construct a consumer that supports Cloud Events:

Java

import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

import static io.cloudevents.kafka.CloudEventSerializer.ENCODING_CONFIG;
import static io.cloudevents.kafka.CloudEventSerializer.EVENT_FORMAT_CONFIG;

public class CloudEventKafkaConsumerFactory {

    public static <K, V> DefaultKafkaConsumerFactory<K, V> consumerFactory(
            Map<String, Object> configs,
            Deserializer<K> keyDeserializer,
            Encoding encoding) {
        return consumerFactory(configs, keyDeserializer, encoding, JsonFormat.CONTENT_TYPE);
    }

    @SuppressWarnings("unchecked")
    public static <K, V> DefaultKafkaConsumerFactory<K, V> consumerFactory(
            Map<String, Object> configs,
            Deserializer<K> keyDeserializer,
            Encoding encoding,
            String eventFormat) {
        // If present, the Kafka message header property content-type MUST be set to the media type of an event format.
        if (Encoding.STRUCTURED.equals(encoding)) {
            EventFormat resolvedFormat = EventFormatProvider
                    .getInstance()
                    .resolveFormat(eventFormat);
            if (resolvedFormat == null) {
                eventFormat = JsonFormat.CONTENT_TYPE;
            }
        }
        Map<String, Object> ceDeserializerConfigs = new HashMap<>();
        ceDeserializerConfigs.put(ENCODING_CONFIG, encoding);
        ceDeserializerConfigs.put(EVENT_FORMAT_CONFIG, eventFormat);

        CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();
        // isKey is always false: the CloudEvent is the record value, never the key.
        cloudEventDeserializer.configure(ceDeserializerConfigs, false);
        return new DefaultKafkaConsumerFactory<>(configs, keyDeserializer, (Deserializer<V>) cloudEventDeserializer);
    }
}


This class will allow us to create a DefaultKafkaConsumerFactory that supports either binary or structured encoding, which will properly deserialize your Kafka messages into CloudEvent objects.

The process of creating a consumer is similar to that of the producer.

Java

ConsumerFactory<String, CloudEvent> binaryConsumerFactory =
    CloudEventKafkaConsumerFactory.consumerFactory(
        consumerConfigs,
        new StringDeserializer(),
        Encoding.BINARY);
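As with the producer, the consumerConfigs map is assumed rather than shown. A minimal sketch of it might look like this; the property names are the standard Kafka client keys, and the broker address and group id are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerConfigsExample {

    // Minimal consumer configuration; the key/value deserializers are
    // deliberately absent because the factory method supplies them.
    public static Map<String, Object> consumerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        configs.put("group.id", "cloud-events-example");    // placeholder consumer group
        configs.put("auto.offset.reset", "earliest");       // read from the beginning in demos/tests
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfigs());
    }
}
```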

Structured Producer and Consumer

Producer

You can create a ProducerFactory that serializes your data as a structured cloud event using the CloudEventKafkaProducerFactory. Simply provide your producer configs, the key serializer, and the encoding type, in this case Encoding.STRUCTURED.

Java

DefaultKafkaProducerFactory<String, CloudEvent> structuredProducerFactory =
    CloudEventKafkaProducerFactory.cloudEventKafkaProducerFactory(
        producerConfigs,
        new StringSerializer(),
        Encoding.STRUCTURED);

Consumer

The process of creating a consumer is similar to that of the producer.

Java

ConsumerFactory<String, CloudEvent> structuredConsumerFactory =
    CloudEventKafkaConsumerFactory.consumerFactory(
        consumerConfigs,
        new StringDeserializer(),
        Encoding.STRUCTURED);

Example Usage

Java

DefaultKafkaProducerFactory<String, CloudEvent> binaryProducerFactory =
    CloudEventKafkaProducerFactory.cloudEventKafkaProducerFactory(
        producerProps, new StringSerializer(), Encoding.BINARY);

KafkaTemplate<String, CloudEvent> binaryKafkaTemplate =
    new KafkaTemplate<>(binaryProducerFactory);

ConsumerFactory<String, CloudEvent> binaryConsumerFactory =
    CloudEventKafkaConsumerFactory.consumerFactory(
        consumerProps, new StringDeserializer(), Encoding.BINARY);

Consumer<String, CloudEvent> binaryConsumer = binaryConsumerFactory.createConsumer();

With this KafkaTemplate and Consumer, you'll be able to publish and consume CloudEvents with binary encoding. Here is a unit test illustrating this usage:

Java

@Test
public void testCloudEventsBinaryMessage() throws Exception {
    binaryKafkaTemplate.send(TOPIC, getCloudEvent(person));
    ConsumerRecord<String, CloudEvent> consumerRecord = KafkaTestUtils
        .getSingleRecord(binaryConsumer, TOPIC);
    CloudEvent consumedEvent = consumerRecord.value();
    Person payload = objectMapper
        .readValue(consumedEvent.getData(), Person.class);

    assertThat(consumedEvent.getId()).isEqualTo(cloudEventID);
    assertThat(consumedEvent.getType()).isEqualTo(cloudEventType);
    assertThat(consumedEvent.getSource()).isEqualTo(cloudEventSource);

    assertThat(payload).isEqualTo(person);

    KafkaUtils.clearConsumerGroupId();
}


To view the source code for these examples, please see my GitHub repository here:

https://github.com/wkennedy/spring-kafka-cloud-events

For additional information regarding these libraries, see:

https://cloudevents.io/

https://github.com/cloudevents

https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md

https://github.com/spring-projects/spring-kafka


Opinions expressed by DZone contributors are their own.
