How to Create Serializers With Kafka

Kafka provides the ability to publish and subscribe to streams of records. You can create your own custom serializer and deserializer.

Kafka lets us publish and subscribe to streams of records, and the records can be of any type (JSON, String, POJO, etc.). Kafka also lets us create our own serializers and deserializers so that we can transmit different data types.

In this article, I will demonstrate how to create a custom serializer and deserializer — but first, let’s understand what serialization is and why we would serialize.

Serialization and Deserialization

Serialization is the process of converting an object into a stream of bytes for transmission. Kafka stores and transmits these byte arrays in its queue.

Deserialization, as the name suggests, does the opposite of serialization: it converts a byte array back into the desired data type. Kafka provides serializers and deserializers for a few data types (String, Long, Double, Integer, Bytes, etc.).
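
To make the round trip concrete, here is a minimal sketch that uses only the JDK and no Kafka dependency. The class and method names are my own and are not Kafka APIs. It turns a String and a long into byte arrays and back again.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: class and method names are assumptions, not Kafka APIs.
final class RoundTripSketch {

  // Serialization: object -> byte array.
  static byte[] serializeString(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  // Deserialization: byte array -> object.
  static String deserializeString(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }

  static byte[] serializeLong(long value) {
    // ByteBuffer uses big-endian byte order by default.
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  static long deserializeLong(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  public static void main(String[] args) {
    byte[] nameBytes = serializeString("alice");
    byte[] ageBytes = serializeLong(42L);
    System.out.println(nameBytes.length + " bytes -> " + deserializeString(nameBytes));
    System.out.println(ageBytes.length + " bytes -> " + deserializeLong(ageBytes));
  }
}
```

Both sides have to agree on the encoding (charset, byte order), which is why a serializer and its deserializer are normally written as a pair.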

All of the pre-built (de)serializers live in the org.apache.kafka.common.serialization package.

Now, I hope you understand what serialization is and why we would serialize an object. Let’s begin with its implementation.

Implementation

To create a serializer class, we need to implement the org.apache.kafka.common.serialization.Serializer interface. Similarly, to create a deserializer class, we need to implement the org.apache.kafka.common.serialization.Deserializer interface.

Both serializer and deserializer interfaces consist of three methods:

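  1. Configure. This method is called at startup with the configuration map and a flag that says whether the instance handles keys or values.
  2. Serialize/deserialize. This method converts an object to a byte array (serialize) or a byte array back to an object (deserialize).
  3. Close. This method is called when the Kafka session is to be closed.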

Serializer Interface

public interface Serializer<T> extends Closeable {
  void configure(Map<String, ?> configs, boolean isKey);

  byte[] serialize(String topic, T data);

  void close();
}

Deserializer Interface

public interface Deserializer<T> extends Closeable {
  void configure(Map<String, ?> configs, boolean isKey);

  T deserialize(String topic, byte[] data);

  void close();
}
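
The order in which the three methods are called can be sketched as follows. So that the example compiles without the Kafka client on the classpath, it declares a local stand-in interface with the same shape as the serializer interface. The interface, the classes, and the "sketch.*.encoding" config names are all hypothetical.

```java
import java.io.Closeable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Local stand-in with the same shape as Kafka's serializer interface.
interface SketchSerializer<T> extends Closeable {
  void configure(Map<String, ?> configs, boolean isKey);
  byte[] serialize(String topic, T data);
  @Override void close();
}

// Hypothetical serializer that picks its charset in configure().
final class EncodingSerializer implements SketchSerializer<String> {
  private Charset charset = StandardCharsets.UTF_8;

  @Override public void configure(Map<String, ?> configs, boolean isKey) {
    // Made-up config names, chosen by whether this instance handles keys or values.
    String property = isKey ? "sketch.key.encoding" : "sketch.value.encoding";
    Object encoding = configs.get(property);
    if (encoding instanceof String) {
      charset = Charset.forName((String) encoding);
    }
  }

  @Override public byte[] serialize(String topic, String data) {
    return data == null ? null : data.getBytes(charset);
  }

  @Override public void close() {
    // Nothing to release in this sketch.
  }
}

final class LifecycleSketch {
  static int encodedLength(String encoding, String text) {
    Map<String, Object> configs = new HashMap<>();
    configs.put("sketch.value.encoding", encoding);
    EncodingSerializer serializer = new EncodingSerializer();
    serializer.configure(configs, false);                  // 1. configure once at startup
    byte[] bytes = serializer.serialize("MyTopic", text);  // 2. serialize each record
    serializer.close();                                    // 3. close on shutdown
    return bytes.length;
  }

  public static void main(String[] args) {
    System.out.println(encodedLength("UTF-8", "hello"));
    System.out.println(encodedLength("UTF-16BE", "hello"));
  }
}
```

In a real application the Kafka client makes these calls for us; we only supply the class name in the configuration.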

Let’s start with an example.

Dependencies I’ve used:

  • Kafka (0.10.1.1).
  • FasterXML Jackson (2.8.6).

User.java:

public class User {

  private String name;
  private int age;

  public User() {
  }

  public User(String name, int age) {
    this.name = name;
    this.age = age;
  }

  public String getName() {
    return this.name;
  }

  public int getAge() {
    return this.age;
  }

  @Override public String toString() {
    return "User(" + name + ", " + age + ")";
  }
}

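UserSerializer.java:

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class UserSerializer implements Serializer<User> {

  // ObjectMapper is thread-safe once configured, so one instance is reused.
  private final ObjectMapper objectMapper = new ObjectMapper();

  @Override public void configure(Map<String, ?> configs, boolean isKey) {
    // No configuration needed.
  }

  @Override public byte[] serialize(String topic, User user) {
    byte[] retVal = null;
    try {
      // Jackson writes the object as UTF-8 encoded JSON bytes.
      retVal = objectMapper.writeValueAsBytes(user);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return retVal;
  }

  @Override public void close() {
    // Nothing to release.
  }

}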

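UserDeserializer.java:

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class UserDeserializer implements Deserializer<User> {

  // ObjectMapper is thread-safe once configured, so one instance is reused.
  private final ObjectMapper mapper = new ObjectMapper();

  @Override public void close() {
    // Nothing to release.
  }

  @Override public void configure(Map<String, ?> configs, boolean isKey) {
    // No configuration needed.
  }

  @Override
  public User deserialize(String topic, byte[] data) {
    User user = null;
    try {
      // Parse the JSON bytes back into a User.
      user = mapper.readValue(data, User.class);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return user;
  }

}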
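
If you want to check the round trip without pulling in any libraries, the same idea can be sketched with the JDK alone. Note that this swaps JSON/Jackson for a hand-rolled binary layout written with DataOutputStream. SketchUser and UserCodecSketch are stand-ins I made up so that the example is self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in for the article's User class so the sketch is self-contained.
final class SketchUser {
  final String name;
  final int age;

  SketchUser(String name, int age) {
    this.name = name;
    this.age = age;
  }
}

// Hand-rolled binary codec: a swapped-in technique, not the Jackson approach above.
final class UserCodecSketch {

  static byte[] serialize(SketchUser user) {
    if (user == null) {
      return null; // pass null values through unchanged
    }
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      out.writeUTF(user.name); // 2-byte length prefix + modified UTF-8
      out.writeInt(user.age);  // 4 bytes, big-endian
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  static SketchUser deserialize(byte[] bytes) {
    if (bytes == null) {
      return null;
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      // Fields must be read in the same order they were written.
      String name = in.readUTF();
      int age = in.readInt();
      return new SketchUser(name, age);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    SketchUser copy = deserialize(serialize(new SketchUser("alice", 30)));
    System.out.println(copy.name + ", " + copy.age);
  }
}
```

A round-trip check like this is a cheap unit test for any serializer/deserializer pair, whatever the wire format.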

Now, what’s left is to use this serializer and deserializer.

To use the above serializer, we need to register this property:

props.put("value.serializer", "com.knoldus.serializers.UserSerializer");

Using this property, the producer will be:

try (Producer<String, User> producer = new KafkaProducer<>(props)) {
   producer.send(new ProducerRecord<String, User>("MyTopic", user));
   System.out.println("Message " + user.toString() + " sent !!");
} catch (Exception e) {
   e.printStackTrace();
}
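
The producer snippet assumes a fully populated props object. A minimal sketch of what it might contain is below. The broker address localhost:9092 and the class name ProducerPropsSketch are assumptions for illustration; the serializer class name is the one registered above.

```java
import java.util.Properties;

// Illustrative helper; the class name is an assumption.
final class ProducerPropsSketch {

  static Properties producerProps() {
    Properties props = new Properties();
    // Assumption: a broker is reachable at localhost:9092.
    props.put("bootstrap.servers", "localhost:9092");
    // Keys are plain strings, so Kafka's built-in StringSerializer is enough.
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // The custom serializer from this article handles the User values.
    props.put("value.serializer", "com.knoldus.serializers.UserSerializer");
    return props;
  }

  public static void main(String[] args) {
    producerProps().forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
```

The result of producerProps() can be passed straight to the KafkaProducer constructor.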

Similarly, for the deserializer, we need to register this property:

props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");

The consumer will be:

try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(topic));
    while (true) {
        ConsumerRecords<String, User> messages = consumer.poll(100);
        for (ConsumerRecord<String, User> message : messages) {
          System.out.println("Message received " + message.value().toString());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
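
On the consumer side, subscribing to a topic also requires a group.id. Here is a hedged sketch of the consumer props. The broker address, the group name user-consumer-group, and the class name ConsumerPropsSketch are assumptions; the deserializer class name is the one registered above.

```java
import java.util.Properties;

// Illustrative helper; the class name is an assumption.
final class ConsumerPropsSketch {

  static Properties consumerProps() {
    Properties props = new Properties();
    // Assumption: a broker is reachable at localhost:9092.
    props.put("bootstrap.servers", "localhost:9092");
    // Hypothetical consumer group name; needed for subscribe().
    props.put("group.id", "user-consumer-group");
    // Start from the oldest record when the group has no committed offset.
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // The custom deserializer from this article rebuilds the User values.
    props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");
    return props;
  }

  public static void main(String[] args) {
    consumerProps().forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
```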

And that's it! Now you know how to create a custom serializer and deserializer.

Topics:
kafka, serialization, deserialization, integration, tutorial

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.
