Using Jakarta EE/MicroProfile to Connect to Apache Kafka: Part Two
No existentialism here — get grounded with Kafka and Jakarta EE.
CDI extensions are mechanisms by which we can implement additional functionalities on top of the CDI container. These extensions allow Jakarta EE/MicroProfile to continue growing their ecosystem with more frameworks and integrations. This post focuses on a second framework to connect with Apache Kafka.
This post goes straight to another option for integrating Jakarta EE/MicroProfile with Apache Kafka. The first post in this series can be found here.
The first step in this tutorial is to install Apache Kafka. To make the process easier, this post uses docker-compose. If you don't have docker-compose yet, follow these instructions:
- Install docker-compose.
Then create a docker-compose.yml file with the configuration below:
version: '3.2'
services:
  zookeeper:
    image: "confluent/zookeeper"
    networks:
      - jakarta_nosql
    ports:
      - 2181:2181
  kafka:
    image: "confluent/kafka"
    networks:
      - jakarta_nosql
    ports:
      - 9092:9092
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_PORT=9092
    depends_on:
      - zookeeper
networks:
  jakarta_nosql:
Done. Once the containers are up (docker-compose up), Apache Kafka is running. The next step is to create a plain Java SE project with Maven, so we'll set up the dependencies in the pom.xml file. The project needs a CDI 2.0 implementation that supports Java SE, Eclipse Yasson, and the integration library itself.
The minimal configuration is a class annotated with @KafkaConfig. You can either hardcode the host and port or use placeholders such as #{kafka_host}, whose values are replaced from properties at startup.
import org.aerogear.kafka.cdi.annotation.KafkaConfig;

//@KafkaConfig(bootstrapServers = "localhost:9092")
@KafkaConfig(bootstrapServers = "#{kafka_host}:#{kafka_port}")
public class Config {

    static final String TOPIC_ENTITY = "movie";
    static final String GROUP_ID_ENTITY = "movie_group";

    static final String TOPIC_TEXT = "text";
    static final String GROUP_ID_TEXT = "text_group";
    static final String GROUP_ID_TEXT_2 = "text_group_2";
}
If you use the placeholder option, don't forget to supply the kafka_host and kafka_port values when you run your Java application; otherwise, the project won't be able to connect to Kafka.
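To make the placeholder idea concrete, here is a minimal sketch of how an expression like #{kafka_host}:#{kafka_port} could be resolved. It is not the extension's actual code: PlaceholderResolver is a hypothetical helper, and the lookup order (JVM system properties first, then environment variables) is an assumption made for this illustration.

```java
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: PlaceholderResolver is a hypothetical helper,
// not part of the Kafka CDI extension.
final class PlaceholderResolver {

    // Matches expressions of the form #{name}
    private static final Pattern EXPRESSION = Pattern.compile("#\\{([^}]+)\\}");

    private PlaceholderResolver() {
    }

    // Replaces every #{name} with the value returned by lookup;
    // fails fast when a value is missing.
    static String resolve(String template, Function<String, String> lookup) {
        Matcher matcher = EXPRESSION.matcher(template);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = lookup.apply(name);
            if (value == null) {
                throw new IllegalStateException("No value provided for " + name);
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    // Assumption: system properties are checked first, then environment variables.
    static String resolve(String template) {
        return resolve(template, name -> {
            String value = System.getProperty(name);
            return value != null ? value : System.getenv(name);
        });
    }

    public static void main(String[] args) {
        System.setProperty("kafka_host", "localhost");
        System.setProperty("kafka_port", "9092");
        System.out.println(resolve("#{kafka_host}:#{kafka_port}")); // localhost:9092
    }
}
```

Failing fast on a missing value is a design choice for the sketch: a half-resolved address such as "#{kafka_host}:9092" would otherwise only surface later as a connection error.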
In this first hello world, we'll send a text message and log it. Let's start with the consumer: the @Consumer annotation marks a method that receives every message from the given topic, and here it just logs the text:
import org.aerogear.kafka.cdi.annotation.Consumer;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class TextListenerService {

    @Consumer(topics = Config.TOPIC_TEXT, groupId = Config.GROUP_ID_TEXT)
    public void receiver(final String message) {
        System.out.println("That's what I got from the text message 1: " + message);
    }
}
Then, let's create the publisher. The @Producer annotation is used to configure and inject an instance of the SimpleKafkaProducer class:
import org.aerogear.kafka.SimpleKafkaProducer;
import org.aerogear.kafka.cdi.annotation.Producer;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class TextPublisherService {

    @Producer
    private SimpleKafkaProducer<Integer, String> producer;

    // Publishes the text to the "text" topic
    public void sendMessage(String message) {
        producer.send(Config.TOPIC_TEXT, message);
    }
}
Done, let's see it working.
import javax.enterprise.inject.se.SeContainer;
import javax.enterprise.inject.se.SeContainerInitializer;
import java.util.UUID;

public class TextApp {

    public static void main(String[] args) {
        SeContainer container = SeContainerInitializer.newInstance().initialize();
        final TextPublisherService service = container.select(TextPublisherService.class).get();
        for (int index = 0; index < 10; index++) {
            service.sendMessage(UUID.randomUUID().toString());
        }
    }

    private TextApp() {
    }
}
The output:
That's what I got from the text message 1: 204cd899-76f0-48a2-bfa0-2d6d6b1dcf4b
That's what I got from the text message 1: 9292258c-4391-43b9-89a6-a91a713f4294
That's what I got from the text message 1: 3593f681-0d9e-4e76-9289-c28783b551f4
That's what I got from the text message 1: 86208c2a-b4b5-4467-b16f-fef8934ddb73
That's what I got from the text message 1: 2f145b0a-3e40-4d78-b920-acf5226ea793
That's what I got from the text message 1: 4062bdae-384e-4ea7-835a-a9410bd01c06
That's what I got from the text message 1: de74a890-b537-4ab1-b80b-ffa374058f0d
That's what I got from the text message 1: 9452cd38-802c-4cda-a916-acf0d690946a
That's what I got from the text message 1: 2b34fa98-9cce-4216-b510-2f6727c35a5f
That's what I got from the text message 1: 354e6986-086c-481b-bb7c-b91251896fc1
If you wish, you can add a second consumer. For it to receive its own copy of every message, it must use a different group id:
import org.aerogear.kafka.cdi.annotation.Consumer;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class TextListenerService2 {

    @Consumer(topics = Config.TOPIC_TEXT, groupId = Config.GROUP_ID_TEXT_2)
    public void receiver(final String message) {
        System.out.println("That's what I got from the text message2: " + message);
    }
}
The output:
That's what I got from the text message 1: dbe1a0f1-f644-4df1-b7fc-292ed09626c2
That's what I got from the text message 1: f7240d1e-2373-4170-a849-ab9710f59fb3
That's what I got from the text message 1: 7d4d8ebd-f64e-43c5-9b77-df48bfe9dd1e
That's what I got from the text message 1: 955a22c0-1831-4acc-89aa-17abb74e1f43
That's what I got from the text message 1: fc66edde-887f-4d34-b686-0491a03b358d
That's what I got from the text message 1: b086f93b-0186-46e7-90cb-a2b35f7116af
That's what I got from the text message 1: c91e04bf-6265-40f2-b54e-f3107c38e638
That's what I got from the text message 1: 6d2f655c-c546-4f4f-9060-63b108d631ba
That's what I got from the text message 1: 32388bec-a81a-4220-af9b-513bba8f7e96
That's what I got from the text message 1: a5b6d30d-a223-4b5c-9b24-4a48b80d9a01
That's what I got from the text message2: 204cd899-76f0-48a2-bfa0-2d6d6b1dcf4b
That's what I got from the text message2: 9292258c-4391-43b9-89a6-a91a713f4294
That's what I got from the text message2: 3593f681-0d9e-4e76-9289-c28783b551f4
That's what I got from the text message2: 86208c2a-b4b5-4467-b16f-fef8934ddb73
That's what I got from the text message2: 2f145b0a-3e40-4d78-b920-acf5226ea793
That's what I got from the text message2: 4062bdae-384e-4ea7-835a-a9410bd01c06
That's what I got from the text message2: de74a890-b537-4ab1-b80b-ffa374058f0d
That's what I got from the text message2: 9452cd38-802c-4cda-a916-acf0d690946a
That's what I got from the text message2: 2b34fa98-9cce-4216-b510-2f6727c35a5f
That's what I got from the text message2: 354e6986-086c-481b-bb7c-b91251896fc1
That's what I got from the text message2: dbe1a0f1-f644-4df1-b7fc-292ed09626c2
That's what I got from the text message2: f7240d1e-2373-4170-a849-ab9710f59fb3
That's what I got from the text message2: 7d4d8ebd-f64e-43c5-9b77-df48bfe9dd1e
That's what I got from the text message2: 955a22c0-1831-4acc-89aa-17abb74e1f43
That's what I got from the text message2: fc66edde-887f-4d34-b686-0491a03b358d
That's what I got from the text message2: b086f93b-0186-46e7-90cb-a2b35f7116af
That's what I got from the text message2: c91e04bf-6265-40f2-b54e-f3107c38e638
That's what I got from the text message2: 6d2f655c-c546-4f4f-9060-63b108d631ba
That's what I got from the text message2: 32388bec-a81a-4220-af9b-513bba8f7e96
That's what I got from the text message2: a5b6d30d-a223-4b5c-9b24-4a48b80d9a01
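The output above shows each group id getting its own copy of the messages. A toy in-memory model can make that behavior easier to picture. ToyTopic is a hypothetical class, not a Kafka API, and it assumes a new group starts reading from the beginning of the log; in real Kafka that depends on the consumer's offset reset setting. Under that assumption, it may also explain why the second listener printed the messages from the earlier run.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a single-partition topic; ToyTopic is hypothetical and not a Kafka API.
final class ToyTopic {

    private final List<String> log = new ArrayList<>();
    // Each group id tracks its own position in the log.
    private final Map<String, Integer> offsets = new HashMap<>();

    void publish(String message) {
        log.add(message);
    }

    // Returns the messages this group has not seen yet and advances only that group's offset.
    List<String> poll(String groupId) {
        int from = offsets.getOrDefault(groupId, 0); // assumption: new groups start at offset 0
        List<String> unseen = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(groupId, log.size());
        return unseen;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.publish("a");
        topic.publish("b");
        System.out.println(topic.poll("text_group"));   // [a, b]
        System.out.println(topic.poll("text_group_2")); // [a, b]
        System.out.println(topic.poll("text_group"));   // []
    }
}
```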
Transfer an Entity
Apache Kafka uses a binary message format and comes with a handful of handy Serializers and Deserializers. The Kafka CDI extension adds support for JsonObject, which is useful for transferring entities. The next step in this tutorial is to transport entities using JsonObject.
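To illustrate what a serializer/deserializer pair does, here is a small sketch that turns text into bytes and back. The ToBytes and FromBytes interfaces are local stand-ins shaped like Kafka's Serializer and Deserializer; they are assumptions made so the example runs without any Kafka dependency.

```java
import java.nio.charset.StandardCharsets;

// Local stand-ins shaped like Kafka's Serializer/Deserializer; not the Kafka interfaces.
interface ToBytes<T> {
    byte[] serialize(String topic, T data);
}

interface FromBytes<T> {
    T deserialize(String topic, byte[] data);
}

// Converts text to UTF-8 bytes for the wire and back again.
final class Utf8TextSerde implements ToBytes<String>, FromBytes<String> {

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Utf8TextSerde serde = new Utf8TextSerde();
        byte[] wire = serde.serialize("text", "hello");
        System.out.println(wire.length);                     // 5
        System.out.println(serde.deserialize("text", wire)); // hello
    }
}
```

A JsonObject pair follows the same pattern, only with JSON text in the middle instead of plain text.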
The first step in this section is to create a utility class that converts an entity to and from JsonObject.
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonWriter;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import javax.json.bind.JsonbConfig;
import javax.json.bind.config.PropertyVisibilityStrategy;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Objects;

public final class JsonUtils {

    private static final Jsonb JSONB;

    static {
        // Let JSON-B see private fields, so entities don't need getters and setters
        JsonbConfig config = new JsonbConfig().withPropertyVisibilityStrategy(new PrivateVisibilityStrategy());
        JSONB = JsonbBuilder.newBuilder().withConfig(config).build();
    }

    private JsonUtils() {
    }

    // Entity -> JSON bytes (JSON-B) -> JsonObject (JSON-P)
    public static <T> JsonObject toJson(T entity) {
        Objects.requireNonNull(entity, "entity is required");
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        JSONB.toJson(entity, output);
        InputStream stream = new ByteArrayInputStream(output.toByteArray());
        try (JsonReader reader = Json.createReader(stream)) {
            return reader.readObject();
        }
    }

    // JsonObject (JSON-P) -> JSON bytes -> entity (JSON-B)
    public static <T> T fromJson(JsonObject jsonObject, Class<T> type) {
        Objects.requireNonNull(jsonObject, "jsonObject is required");
        Objects.requireNonNull(type, "type is required");
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        try (JsonWriter writer = Json.createWriter(output)) {
            writer.write(jsonObject);
        }
        InputStream stream = new ByteArrayInputStream(output.toByteArray());
        return JSONB.fromJson(stream, type);
    }

    static class PrivateVisibilityStrategy implements PropertyVisibilityStrategy {

        @Override
        public boolean isVisible(Field field) {
            return true;
        }

        @Override
        public boolean isVisible(Method method) {
            return true;
        }
    }
}
The example uses a Movie POJO that has two attributes: a title and a genre.
public class Movie {

    private String title;

    private String genre;

    // No-argument constructor for JSON-B deserialization; not meant for application code
    @Deprecated
    public Movie() {
    }

    public Movie(String title, String genre) {
        this.title = title;
        this.genre = genre;
    }

    @Override
    public String toString() {
        return "Movie{" +
                "title='" + title + '\'' +
                ", genre='" + genre + '\'' +
                '}';
    }
}
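Movie has no getters or setters, which is why JsonUtils registers a visibility strategy that returns true for every field and method. The sketch below shows the underlying idea of reading private fields through reflection, using only the JDK. FieldReader and Film are hypothetical names for this illustration, and this is not how Yasson is implemented internally.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical entity with private state and no accessors, similar in shape to Movie.
final class Film {
    private final String title;
    private final String genre;

    Film(String title, String genre) {
        this.title = title;
        this.genre = genre;
    }
}

// Hypothetical helper: copies instance fields, private ones included, into a sorted map.
final class FieldReader {

    private FieldReader() {
    }

    static Map<String, Object> read(Object entity) {
        Map<String, Object> values = new TreeMap<>();
        for (Field field : entity.getClass().getDeclaredFields()) {
            if (field.isSynthetic() || Modifier.isStatic(field.getModifiers())) {
                continue; // skip compiler-generated and static fields
            }
            field.setAccessible(true); // bypass the private modifier
            try {
                values.put(field.getName(), field.get(entity));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read " + field.getName(), e);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(read(new Film("Osama", "Drama"))); // {genre=Drama, title=Osama}
    }
}
```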
The sender/receiver classes are similar to the text ones; however, instead of String, they operate with JsonObject:
import org.aerogear.kafka.SimpleKafkaProducer;
import org.aerogear.kafka.cdi.annotation.Producer;
import javax.enterprise.context.ApplicationScoped;
import javax.json.JsonObject;

@ApplicationScoped
public class MoviePublisherService {

    @Producer
    private SimpleKafkaProducer<Integer, JsonObject> producer;

    public <T> void sendMessage(T entity) {
        producer.send(Config.TOPIC_ENTITY, JsonUtils.toJson(entity));
    }
}

import org.aerogear.kafka.cdi.annotation.Consumer;
import javax.enterprise.context.ApplicationScoped;
import javax.json.JsonObject;

@ApplicationScoped
public class MovieListenerService {

    @Consumer(topics = Config.TOPIC_ENTITY, groupId = Config.GROUP_ID_ENTITY)
    public void receiver(final JsonObject message) {
        final Movie entity = JsonUtils.fromJson(message, Movie.class);
        System.out.println("That's what I got from the entity: " + entity);
    }
}
The last step is to execute and check the output:
import javax.enterprise.inject.se.SeContainer;
import javax.enterprise.inject.se.SeContainerInitializer;

public class MovieApp {

    public static void main(String[] args) {
        SeContainer container = SeContainerInitializer.newInstance().initialize();
        final MoviePublisherService service = container.select(MoviePublisherService.class).get();
        service.sendMessage(new Movie("The Big Flame", "Drama"));
        service.sendMessage(new Movie("Beautiful People", "Comedy"));
        service.sendMessage(new Movie("Barking Dogs Never Bite (Flandersui gae)", "Comedy|Horror"));
        service.sendMessage(new Movie("Snow Creature, The", "Horror"));
        service.sendMessage(new Movie("Osama", "Drama"));
    }

    private MovieApp() {
    }
}
The output:
That's what I got from the entity: Movie{title='The Big Flame', genre='Drama'}
That's what I got from the entity: Movie{title='Beautiful People', genre='Comedy'}
That's what I got from the entity: Movie{title='Barking Dogs Never Bite (Flandersui gae)', genre='Comedy|Horror'}
That's what I got from the entity: Movie{title='Snow Creature, The', genre='Horror'}
That's what I got from the entity: Movie{title='Osama', genre='Drama'}
In this tutorial, we saw one more option for connecting Jakarta EE/MicroProfile to Kafka through a CDI extension. You can get the resulting code here.