
Using Jakarta EE/MicroProfile to Connect to Apache Kafka: Part 1 — Hello World

Learn how to securely integrate Apache Kafka with Eclipse MicroProfile.

Apache Kafka is a community-developed distributed streaming platform with three key capabilities: publishing and subscribing to streams of records, storing streams of records in a fault-tolerant, durable way, and processing streams of records as they occur. Apache Kafka has several success cases in the Java world. This post covers how to benefit from this powerful tool in the Jakarta EE/MicroProfile universe.

Apache Kafka Core Concepts

Kafka runs as a cluster on one or more servers that can span multiple data centers. The Kafka cluster stores streams of records in categories called topics, and each record consists of a key, a value, and a timestamp.
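
To make the vocabulary concrete, here is a minimal in-memory sketch of a topic as an append-only log of records. It is only a toy model, not Kafka's API: the class names TopicSketch, Record, and Topic are hypothetical, and the notion of an offset (a record's position in the log) is added for illustration.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TopicSketch {

    // A record as described above: a key, a value, and a timestamp.
    static final class Record {
        final String key;
        final String value;
        final long timestamp;

        Record(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // A topic modelled as a named, append-only list of records (hypothetical type).
    static final class Topic {
        final String name;
        private final List<Record> log = new ArrayList<>();

        Topic(String name) {
            this.name = name;
        }

        // Appends a record and returns its position (offset) in the log.
        long append(String key, String value) {
            log.add(new Record(key, value, Instant.now().toEpochMilli()));
            return log.size() - 1;
        }

        // Returns every record from the given offset onward; reading removes nothing.
        List<Record> readFrom(long offset) {
            return Collections.unmodifiableList(log.subList((int) offset, log.size()));
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic("greetings");
        topic.append("user-1", "Hello");
        topic.append("user-2", "World");
        for (Record r : topic.readFrom(0)) {
            System.out.println(r.key + " -> " + r.value + " @ " + r.timestamp);
        }
    }
}
```

Because reading does not remove records, several readers can go through the same topic independently, which is the property the publish/subscribe capability relies on.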

From the documentation, Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.

  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.

  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.

  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. 
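
The stream-processor role described in the list above can be pictured with a small in-memory sketch. It does not use the Kafka Streams API; the class StreamProcessorSketch and its process method are hypothetical, and plain lists stand in for the input and output topics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class StreamProcessorSketch {

    // Consumes every value of the input "topic", transforms it, and
    // produces the result to a new output "topic".
    static List<String> process(List<String> inputTopic, Function<String, String> transformation) {
        List<String> outputTopic = new ArrayList<>();
        for (String value : inputTopic) {
            outputTopic.add(transformation.apply(value));
        }
        return outputTopic;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("hello", "world");
        System.out.println(process(input, String::toUpperCase)); // prints [HELLO, WORLD]
    }
}
```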

Install Apache Kafka

The official documentation has a nice getting-started guide for Apache Kafka that teaches you how to install it with Zookeeper. Briefly, Kafka uses Zookeeper for cluster membership, topic configuration, and so on.

Download the 2.1.0 release and un-tar it.

tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0


First, start a Zookeeper instance:

bin/zookeeper-server-start.sh config/zookeeper.properties


Finally, start the Apache Kafka broker:

bin/kafka-server-start.sh config/server.properties


Using Docker


Docker is another option. Because the setup requires two images, one for Zookeeper and one for Apache Kafka, this tutorial uses docker-compose. Create a docker-compose.yml file with the following content:

version:  '3.2'

services:

  zookeeper:
    image: "confluent/zookeeper"
    networks:
      - microprofile
    ports:
      - 2181:2181

  kafka:
    image: "confluent/kafka"
    networks:
      - microprofile
    ports:
      - 9092:9092
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_PORT=9092
    depends_on:
      - zookeeper

networks:
    microprofile:


Then, run the command:

docker-compose -f docker-compose.yml up -d


To connect from the host machine, the name kafka must also resolve to localhost. On Linux, add the entry below to /etc/hosts:

127.0.0.1       localhost kafka
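
Before moving on, it can help to confirm that the broker is reachable under that name. The snippet below is a minimal sketch using only the JDK; the class BrokerReachability and the two-second timeout are assumptions for illustration, and a successful TCP connection only shows that the port is open, not that Kafka is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    // Returns true when a TCP connection to host:port succeeds within the timeout.
    // An unknown host name or a refused connection both end up as false.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "kafka" and 9092 are the host name and port used in this tutorial.
        System.out.println("kafka:9092 reachable: " + isReachable("kafka", 9092, 2000));
    }
}
```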


Application With Eclipse MicroProfile

Eclipse MicroProfile is an open-source project whose goal is to define a microservices application platform that is portable across multiple runtimes. It includes several specifications that make it easier to create enterprise applications in the age of microservices and the cloud. To connect to Apache Kafka, there is Eclipse MicroProfile Reactive, which includes Reactive Messaging.

The sample code creates a message sender and a message receiver for Apache Kafka using CDI 2.0 and Java SE. The project uses the Maven structure, so the first step of this demo is to define the dependencies. The project needs a MicroProfile Config implementation, a CDI 2.0 implementation, and the MicroProfile Reactive dependencies.

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <weld-core.version>3.1.0.Final</weld-core.version>
        <slf4j-api.version>1.7.26</slf4j-api.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j-api.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>io.smallrye</groupId>
            <artifactId>smallrye-config</artifactId>
            <version>1.3.5</version>
        </dependency>

        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-provider</artifactId>
            <version>0.0.4</version>
        </dependency>
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-kafka</artifactId>
            <version>0.0.4</version>
            <exclusions>
                <exclusion>
                    <!-- this binding prevents logging from working, so exclude it -->
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-streams-operators</artifactId>
            <version>0.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.jboss.weld.se</groupId>
            <artifactId>weld-se-core</artifactId>
            <version>${weld-core.version}</version>
        </dependency>
        <dependency>
            <groupId>javax.enterprise</groupId>
            <artifactId>cdi-api</artifactId>
            <version>2.0.SP1</version>
        </dependency>

    </dependencies>


The next step is to create the configuration files in src/main/resources/META-INF.

The first one, beans.xml, enables CDI.

<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="all">
</beans>


The second and last one is microprofile-config.properties, which holds the settings Eclipse MicroProfile uses to connect to Apache Kafka, such as the classes that serialize and deserialize the records, each composed of a key and its respective value.

# Kafka Sink
smallrye.messaging.sink.data.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.sink.data.bootstrap.servers=localhost:9092
smallrye.messaging.sink.data.key.serializer=org.apache.kafka.common.serialization.StringSerializer
smallrye.messaging.sink.data.value.serializer=org.apache.kafka.common.serialization.StringSerializer
smallrye.messaging.sink.data.acks=1
# Kafka Source
smallrye.messaging.source.kafka.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.kafka.bootstrap.servers=localhost:9092
smallrye.messaging.source.kafka.topic=kafka
smallrye.messaging.source.kafka.group.id=demo
smallrye.messaging.source.kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer


With all the configuration in place, the last step is to create the classes that send messages to and receive messages from Apache Kafka. The API is self-descriptive: it provides the Incoming and Outgoing annotations, whose values match the source and sink names used in the configuration. The Sender class holds a BlockingQueue and sends a message only when there is text in the queue.

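import io.smallrye.reactive.messaging.kafka.KafkaMessage;
import io.smallrye.reactive.messaging.kafka.MessageHeaders;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;
import java.util.logging.Logger;

@ApplicationScoped
public class Receiver {

    private static final Logger LOGGER = Logger.getLogger(Receiver.class.getName());

    // "kafka" matches the source name in microprofile-config.properties.
    @Incoming("kafka")
    public CompletionStage<Void> consume(KafkaMessage<String, String> message) {
        String payload = message.getPayload();
        // The values below are not used here; they show which metadata a message carries.
        String key = message.getKey();
        MessageHeaders headers = message.getHeaders();
        Integer partition = message.getPartition();
        Long timestamp = message.getTimestamp();
        LOGGER.info("received: " + payload + " from topic " + message.getTopic());
        // Acknowledge the message once it has been processed.
        return message.ack();
    }

}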


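import io.smallrye.reactive.messaging.kafka.KafkaMessage;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;

@ApplicationScoped
public class Sender {

    private static final Logger LOGGER = Logger.getLogger(Sender.class.getName());

    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    public void add(String message) {
        messages.add(message);
    }

    // "data" matches the sink name in microprofile-config.properties.
    @Outgoing("data")
    public CompletionStage<KafkaMessage<String, String>> send() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // take() blocks until a message is available in the queue.
                String message = messages.take();
                LOGGER.info("Sending message to kafka with the message: " + message);
                // Arguments: topic, key, value.
                return KafkaMessage.of("kafka", "key", message);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating the failure.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });
    }

}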
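
The blocking behaviour the Sender relies on can be observed in isolation with only the JDK. The sketch below, with the hypothetical class BlockingSupplierSketch, shows that a CompletableFuture built around take() stays incomplete while the queue is empty and completes as soon as a message is added.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingSupplierSketch {

    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    void add(String message) {
        messages.add(message);
    }

    // Completes only once a message is available, because take() blocks
    // the supplying thread while the queue is empty.
    CompletableFuture<String> next() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return messages.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        BlockingSupplierSketch sketch = new BlockingSupplierSketch();
        CompletableFuture<String> pending = sketch.next();
        System.out.println("done before add: " + pending.isDone()); // prints false
        sketch.add("Hello world");
        System.out.println("received: " + pending.get()); // prints Hello world
    }
}
```

One consequence of this design is that a waiting call occupies a thread until a message arrives, which is acceptable for a demo but worth keeping in mind for anything larger.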


Finally, the last class starts the CDI container and then sends some messages to Kafka, so you can see the result in the log.

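import javax.enterprise.inject.se.SeContainer;
import javax.enterprise.inject.se.SeContainerInitializer;

public class App {
    public static void main(String[] args) {
        // Boot the CDI container in Java SE; it stays open so the messages can flow.
        SeContainer container = SeContainerInitializer.newInstance().initialize();
        Sender sender = container.select(Sender.class).get();
        sender.add("Hello world");
        sender.add("Otávio");
        sender.add("Poliana");
        sender.add("Clement");
    }
}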


To conclude, we have seen the potential of Apache Kafka and why this project has become so accessible to Big Data players. This is a simple example of how smoothly it integrates with Eclipse MicroProfile. A special thanks to Clement Escoffier, who helped me with this sample code, which can also be found on GitHub.

