Cloud-Events Kafka Binding With Spring

Utilize Kafka Protocol Binding for CloudEvents along with Spring-Kafka to easily produce and consume events in a common format.

By Will Kennedy · Nov. 13, 20 · Tutorial

This article shows how to use the spring-kafka library with the CloudEvents specification so that you can quickly start producing and consuming events with a common structure over Kafka. The spring-kafka project provides utilities and templates for interacting with Kafka with minimal effort, while the CloudEvents specification describes event data in a common way and provides both a Java SDK and a library for the Kafka protocol binding.

The CloudEvents Kafka protocol binding provides two content modes for transferring data.

  1. Structured: event metadata attributes and event data are placed into the Kafka message value section using an event format.

  2. Binary: the value of the event data MUST be placed into the Kafka message's value section as-is, with the content-type header value declaring its media type; all other event attributes MUST be mapped to the Kafka message's header section.

For more details see: https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#13-content-modes
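To make the difference between the two modes concrete, here is a minimal JDK-only sketch of where the same event ends up in a Kafka record. It does not use the CloudEvents SDK; the class and method names (ContentModeSketch, binaryHeaders, structuredValue) and the sample attribute values are hypothetical. The `ce_` header prefix and the `content-type` header follow the Kafka protocol binding spec linked above. The JSON is assembled by naive string concatenation and assumes the values need no escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ContentModeSketch {

    /**
     * Binary mode: every context attribute becomes a "ce_"-prefixed header,
     * and the event data is placed in the record value unchanged.
     */
    public static Map<String, String> binaryHeaders(String id, String type,
                                                    String source, String dataContentType) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("ce_specversion", "1.0");
        headers.put("ce_id", id);
        headers.put("ce_type", type);
        headers.put("ce_source", source);
        // The data's media type travels in the plain content-type header,
        // not in a ce_-prefixed header.
        headers.put("content-type", dataContentType);
        return headers;
    }

    /**
     * Structured mode: attributes and data are wrapped together in one
     * envelope (JSON here) that becomes the record value.
     * Assumption: inputs contain no characters that need JSON escaping.
     */
    public static String structuredValue(String id, String type,
                                         String source, String jsonData) {
        return "{\"specversion\":\"1.0\""
            + ",\"id\":\"" + id + "\""
            + ",\"type\":\"" + type + "\""
            + ",\"source\":\"" + source + "\""
            + ",\"datacontenttype\":\"application/json\""
            + ",\"data\":" + jsonData + "}";
    }

    public static void main(String[] args) {
        String data = "{\"name\":\"Ada\"}"; // hypothetical payload

        // Binary: headers carry the metadata, the value is just the payload.
        System.out.println("binary headers: "
            + binaryHeaders("1234", "com.example.person.created", "/people", "application/json"));
        System.out.println("binary value:   " + data);

        // Structured: the value carries everything; the record's content-type
        // header would be application/cloudevents+json.
        System.out.println("structured value: "
            + structuredValue("1234", "com.example.person.created", "/people", data));
    }
}
```

In practice the CloudEventSerializer shown later in this article performs this mapping for you; the sketch only illustrates the resulting record layout.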

Dependencies

In this tutorial, we'll be using spring-kafka 2.5.5.RELEASE and cloudevents-kafka 2.0.0-milestone3, along with cloudevents-json-jackson at the same version for the JSON event format. As of this writing, version 2 of the CloudEvents library hasn't been released, so the latest milestone is used.

XML
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.cloudevents</groupId>
    <artifactId>cloudevents-kafka</artifactId>
    <version>2.0.0-milestone3</version>
</dependency>
<dependency>
    <groupId>io.cloudevents</groupId>
    <artifactId>cloudevents-json-jackson</artifactId>
    <version>2.0.0-milestone3</version>
</dependency>

Binary Producer and Consumer

To publish and consume messages from Kafka, we'll need to create a Producer and a Consumer that support serializing and deserializing CloudEvents.

Producer

The first thing you need is a producer. We can do this by creating a class to help build our Cloud Event supporting producer:

Java
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import java.util.HashMap;
import java.util.Map;

import static io.cloudevents.kafka.CloudEventSerializer.ENCODING_CONFIG;
import static io.cloudevents.kafka.CloudEventSerializer.EVENT_FORMAT_CONFIG;

public class CloudEventKafkaProducerFactory {

    public static <K, V> DefaultKafkaProducerFactory<K, V> cloudEventKafkaProducerFactory(
            Map<String, Object> configs,
            Serializer<K> keySerializer,
            Encoding encoding) {

        return cloudEventKafkaProducerFactory(configs, keySerializer, encoding, JsonFormat.CONTENT_TYPE);
    }

    @SuppressWarnings("unchecked")
    public static <K, V> DefaultKafkaProducerFactory<K, V> cloudEventKafkaProducerFactory(
            Map<String, Object> configs,
            Serializer<K> keySerializer,
            Encoding encoding,
            String eventFormat) {

        // If present, the Kafka message header property content-type MUST be set to the media type of an event format.
        if (Encoding.STRUCTURED.equals(encoding)) {
            EventFormat resolveFormat = EventFormatProvider
                    .getInstance()
                    .resolveFormat(eventFormat);
            if (resolveFormat == null) {
                eventFormat = JsonFormat.CONTENT_TYPE;
            }
        }
        Map<String, Object> ceSerializerConfigs = new HashMap<>();
        ceSerializerConfigs.put(ENCODING_CONFIG, encoding);
        ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, eventFormat);
        CloudEventSerializer cloudEventSerializer = new CloudEventSerializer();
        // isKey always false
        cloudEventSerializer.configure(ceSerializerConfigs, false);

        return new DefaultKafkaProducerFactory<>(configs, keySerializer, (Serializer<V>) cloudEventSerializer);
    }
}


This class will allow us to create a DefaultKafkaProducerFactory that supports either binary or structured encoding. You can also specify the event format depending on your needs.

You can instantiate a ProducerFactory that serializes your data into a binary cloud event using the CloudEventKafkaProducerFactory. You simply need to provide it with your producer configs, the key serializer, and the encoding type, in this case, Encoding.BINARY.

Java
DefaultKafkaProducerFactory<String, CloudEvent> binaryProducerFactory =
    CloudEventKafkaProducerFactory.cloudEventKafkaProducerFactory(
        producerConfigs, new StringSerializer(), Encoding.BINARY);


Consumer

We'll also need to create a class to help us construct a consumer that supports Cloud Events:

Java
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

import static io.cloudevents.kafka.CloudEventSerializer.ENCODING_CONFIG;
import static io.cloudevents.kafka.CloudEventSerializer.EVENT_FORMAT_CONFIG;

public class CloudEventKafkaConsumerFactory {

    public static <K, V> DefaultKafkaConsumerFactory<K, V> consumerFactory(
            Map<String, Object> configs,
            Deserializer<K> keyDeserializer,
            Encoding encoding) {
        return consumerFactory(configs, keyDeserializer, encoding, JsonFormat.CONTENT_TYPE);
    }

    @SuppressWarnings("unchecked")
    public static <K, V> DefaultKafkaConsumerFactory<K, V> consumerFactory(
            Map<String, Object> configs,
            Deserializer<K> keyDeserializer,
            Encoding encoding,
            String eventFormat) {
        // If present, the Kafka message header property content-type MUST be set to the media type of an event format.
        if (Encoding.STRUCTURED.equals(encoding)) {
            EventFormat resolveFormat = EventFormatProvider
                    .getInstance()
                    .resolveFormat(eventFormat);
            if (resolveFormat == null) {
                eventFormat = JsonFormat.CONTENT_TYPE;
            }
        }
        Map<String, Object> ceDeserializerConfigs = new HashMap<>();
        ceDeserializerConfigs.put(ENCODING_CONFIG, encoding);
        ceDeserializerConfigs.put(EVENT_FORMAT_CONFIG, eventFormat);

        CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();
        // isKey always false
        cloudEventDeserializer.configure(ceDeserializerConfigs, false);
        return new DefaultKafkaConsumerFactory<>(configs,
                keyDeserializer,
                (Deserializer<V>) cloudEventDeserializer);
    }
}


This class will allow us to create a DefaultKafkaConsumerFactory that supports either binary or structured encoding and properly deserializes your Kafka message to a CloudEvent object.
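On the receiving side, the Kafka protocol binding spec lets a consumer tell the two content modes apart by inspecting the content-type header: a value prefixed with the CloudEvents media type `application/cloudevents` indicates structured mode, and anything else defaults to binary mode. The following is a minimal JDK-only sketch of that rule. The class, enum, and method names are hypothetical, and this is an illustration of the spec's rule rather than the library's actual implementation.

```java
import java.util.Locale;

public class ContentModeDetection {

    public enum Mode { STRUCTURED, BINARY }

    /**
     * Applies the spec's rule: a content-type starting with the CloudEvents
     * media type means structured mode; otherwise fall back to binary mode.
     * A missing header is treated as binary as well.
     */
    public static Mode detect(String contentTypeHeader) {
        if (contentTypeHeader != null
                && contentTypeHeader.toLowerCase(Locale.ROOT).startsWith("application/cloudevents")) {
            return Mode.STRUCTURED;
        }
        return Mode.BINARY;
    }

    public static void main(String[] args) {
        System.out.println(detect("application/cloudevents+json; charset=UTF-8")); // STRUCTURED
        System.out.println(detect("application/json"));                            // BINARY
        System.out.println(detect(null));                                          // BINARY
    }
}
```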

The process of creating a consumer is similar to that of the producer.

Java
ConsumerFactory<String, CloudEvent> binaryConsumerFactory =
    CloudEventKafkaConsumerFactory.consumerFactory(
        consumerConfigs,
        new StringDeserializer(),
        Encoding.BINARY);
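The producerConfigs and consumerConfigs maps passed to the two factories are not shown in this article. A minimal sketch of what they might contain follows, using plain Kafka property names as keys (the same strings that the ProducerConfig and ConsumerConfig constants resolve to). The broker address, group id, and the particular settings chosen are assumptions for a local setup, and the class and method names are hypothetical. Because the serializer and deserializer instances are handed to the factories directly, this sketch leaves the serializer class properties out of the maps.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaConfigSketch {

    /** Producer settings; "acks=all" is an assumed durability choice, not a requirement. */
    public static Map<String, Object> producerConfigs(String bootstrapServers) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", bootstrapServers);
        configs.put("acks", "all");
        return configs;
    }

    /** Consumer settings; starting from the earliest offset is convenient for tests. */
    public static Map<String, Object> consumerConfigs(String bootstrapServers, String groupId) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", bootstrapServers);
        configs.put("group.id", groupId);
        configs.put("auto.offset.reset", "earliest");
        return configs;
    }

    public static void main(String[] args) {
        // Assumed local broker address and group id, for illustration only.
        System.out.println(producerConfigs("localhost:9092"));
        System.out.println(consumerConfigs("localhost:9092", "cloud-events-demo"));
    }
}
```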

Structured Producer and Consumer

Producer

You can create a ProducerFactory that serializes your data into a structured cloud event using the CloudEventKafkaProducerFactory. You simply need to provide it with your producer configs, the key serializer, and the encoding type, in this case, Encoding.STRUCTURED.

Java
DefaultKafkaProducerFactory<String, CloudEvent> structuredProducerFactory =
    CloudEventKafkaProducerFactory.cloudEventKafkaProducerFactory(
        producerConfigs,
        new StringSerializer(),
        Encoding.STRUCTURED);

Consumer

The process of creating a consumer is similar to that of the producer.

Java
ConsumerFactory<String, CloudEvent> structuredConsumerFactory =
    CloudEventKafkaConsumerFactory.consumerFactory(
        consumerConfigs,
        new StringDeserializer(),
        Encoding.STRUCTURED);

Example Usage

Java
DefaultKafkaProducerFactory<String, CloudEvent> binaryProducerFactory =
    CloudEventKafkaProducerFactory.cloudEventKafkaProducerFactory(
        producerProps, new StringSerializer(), Encoding.BINARY);

KafkaTemplate<String, CloudEvent> binaryKafkaTemplate =
    new KafkaTemplate<>(binaryProducerFactory);

ConsumerFactory<String, CloudEvent> binaryConsumerFactory =
    CloudEventKafkaConsumerFactory.consumerFactory(
        consumerProps, new StringDeserializer(), Encoding.BINARY);

Consumer<String, CloudEvent> binaryConsumer = binaryConsumerFactory.createConsumer();

With this KafkaTemplate and Consumer, you'll be able to publish and consume CloudEvents with binary encoding. Here is a unit test illustrating this usage:

Java
    @Test
    public void testCloudEventsBinaryMessage() throws Exception {
        binaryKafkaTemplate.send(TOPIC, getCloudEvent(person));
        ConsumerRecord<String, CloudEvent> consumerRecord = KafkaTestUtils
            .getSingleRecord(binaryConsumer, TOPIC);
        CloudEvent consumedEvent = consumerRecord.value();
        Person payload = objectMapper
            .readValue(consumedEvent.getData(), Person.class);

        assertThat(consumedEvent.getId()).isEqualTo(cloudEventID);
        assertThat(consumedEvent.getType()).isEqualTo(cloudEventType);
        assertThat(consumedEvent.getSource()).isEqualTo(cloudEventSource);

        assertThat(payload).isEqualTo(person);

        KafkaUtils.clearConsumerGroupId();
    }


To view the source code for these examples, please see my GitHub repository here:

https://github.com/wkennedy/spring-kafka-cloud-events

For additional information regarding these libraries, see:

https://cloudevents.io/

https://github.com/cloudevents

https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md

https://github.com/spring-projects/spring-kafka
