Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

How to Create Serializers With Kafka

DZone's Guide to

How to Create Serializers With Kafka

Kafka provides the ability to publish and subscribe to streams of records. You can create your own custom serializer and deserializer.

· Integration Zone
Free Resource

Learn how API management supports better integration in Achieving Enterprise Agility with Microservices and API Management, brought to you in partnership with 3scale

Kafka lets us publish and subscribe to streams of records and the records can be of any type (JSON, String, POJO, etc.) Kafka gives users the ability to creates our own serializer and deserializer so that we can transmit different data type using it.

In this article, I will demonstrate how to create a custom serializer and deserializer — but first, let’s understand what serialization is and why we would serialize.

Serialization and Deserialization

Serialization is the process of converting an object into a stream of bytes that are used for transmission. Kafka stores and transmit these bytes of arrays in its queue.

Deserialization, as the name suggests, does the opposite of serialization, in which we convert bytes of arrays into the desired data type. Kafka provides serializers and deserializers for a few data types (String, Long, Double, Integer, Bytes, etc).

All of the pre-build (de)serializers can be seen here.

Now, I hope you understand what serialization is and why we would serialize an object. Let’s begin with its implementation.

Implementation

To create a serializer class, we need to implement the org.apache.kafka.common.serialization.Serializer interface. Similarly, to create a deserializer class, we need to implement the org.apache.kafka.common.serialization.Deserializer interface.

Both serializer and deserializer interfaces consist of three methods:

  1. Configure. This method is called at startup with configuration.
  2. Serialize/deserialize. This method is used for serialization and deserialization.
  3. Close. This method is called when the Kafka session is to be closed.

Serializer Interface

public interface Serializer extends Closeable {
  void configure(Map<String, ?> var1, boolean var2);

  byte[] serialize(String var1, T var2);

  void close();
}

Deserializer Interface

public interface Deserializer extends Closeable {
  void configure(Map<String, ?> var1, boolean var2);

  T deserialize(String var1, byte[] var2);

  void close();
}

Let’s start with an example.

Dependencies I’ve used:

  • Kafka (0.10.1.1).
  • FasterXML Jackson (2.8.6).

user.java:

public class User {

  private String name;
  private int age;

  public User() {
  }

  public User(String name, int age) {
    this.name = name;
    this.age = age;
  }

  public String getName() {
    return this.name;
  }

  public int getAge() {
    return this.age;
  }

  @Override public String toString() {
    return "User(" + name + ", " + age + ")";
  }
}

userserializer.java:

public class UserSerializer implements Serializer {

  @Override public void configure(Map<String, ?> map, boolean b) {

  }

  @Override public byte[] serialize(String arg0, User arg1) {
    byte[] retVal = null;
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      retVal = objectMapper.writeValueAsString(arg1).getBytes();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return retVal;
  }

  @Override public void close() {

  }

}

Userdeserializer.java:

public class UserDeserializer implements Deserializer {

  @Override public void close() {

  }

  @Override public void configure(Map<String, ?> arg0, boolean arg1) {

  }

  @Override
  public User deserialize(String arg0, byte[] arg1) {
    ObjectMapper mapper = new ObjectMapper();
    User user = null;
    try {
      user = mapper.readValue(arg1, User.class);
    } catch (Exception e) {

      e.printStackTrace();
    }
    return user;
  }

}

Now, what’s left is to use this serializer and deserializer.

To use the above serializer, we need to register this property:

props.put("value.serializer", "com.knoldus.serializers.UserSerializer");

Using this property, the producer will be:

try (Producer<String, User> producer = new KafkaProducer<>(props)) {
   producer.send(new ProducerRecord<String, User>("MyTopic", user));
   System.out.println("Message " + user.toString() + " sent !!");
} catch (Exception e) {
   e.printStackTrace();
}

Similarly, for the deserializer, we need to register this property:

props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");

The consumer will be:

try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(topic));
    while (true) {
        ConsumerRecords<String, User> messages = consumer.poll(100);
        for (ConsumerRecord<String, User> message : messages) {
          System.out.println("Message received " + message.value().toString());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}

And that's it! Now you know how to create a custom serializer and deserializer

Unleash the power of your APIs with future-proof API management - Create your account and start your free trial today, brought to you in partnership with 3scale.

Topics:
kafka ,serialization ,deserialization ,integration ,tutorial

Published at DZone with permission of Prabhat Kashyap, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

THE DZONE NEWSLETTER

Dev Resources & Solutions Straight to Your Inbox

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

X

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}