{{announcement.body}}
{{announcement.title}}

Develop Camel-Quarkus Applications Using Red Hat

DZone 's Guide to

Develop Camel-Quarkus Applications Using Red Hat

In this article, we demonstrate how to publish and consume messages from Kafka Topic using camel-quarkus-Kafka.

· Integration Zone ·
Free Resource

Red Hat Open Shift

Red Hat Open Shift offers a consistent hybrid-cloud foundation for building and scaling containerized applications. Open Shift provides an enterprise-grade, container-based platform with no vendor lock-in. Red Hat was one of the first companies to work with Google on Kubernetes, even before launch, and has become the second leading contributor to the Kubernetes upstream project. Open Shift also provides a common development platform no matter what infrastructure we use to host the application.

Quarkus

Quarkus provides a container-first approach to building Java applications. This approach makes it much easier to build microservices-based applications written in Java as well as enabling those applications to invoke functions running on serverless computing frameworks. For this reason, Quarkus applications have small memory footprints and fast start-up times.

For this demonstration, we chose to run our camel Quarkus apps on open shift. Running on Open Shift Container Platform means that our demo applications can run anywhere that Open Shift runs, which includes bare metal, Amazon Web Services (AWS), Azure, Google Cloud, IBM Cloud and more

Red Hat AMQ Streams

Red Hat AMQ Streams is a massively-scalable, distributed, and high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.

The main components comprise:

Kafka Broker

Messaging broker responsible for delivering records from producing clients to consuming clients.

Apache ZooKeeper is a core dependency for Kafka, providing a cluster coordination service for highly reliable distributed coordination.

AMQ Streams architecture

kafka bridge

Apache Camel

Apache Camel is an open-source framework for message-oriented middleware with a rule-based routing and mediation engine that provides a Java object-based implementation of the Enterprise Integration Patterns using an application programming interface to configure routing and mediation rules

Prerequisites

For this demonstration, you will need the following technologies set up in your development environment:

  1. An Open Shift 4.3+ environment with Cluster Admin access
  2. Open shift CLI (oc)
  3. Apache Maven 3.6.3+
  4. JDK 11 Installed
  5. Kafka cluster installed on openshift 4

In this article, we demonstrate how to publish and consume messages from Kafka Topic using camel-quarkus-Kafka.

Getting Started With Red Hat Quarkus Build

From Red, Hat Quarkus configure your application.

select extensions camel-core,camel-Kafka,camel-timer and generate your application

config your application

The pom file 

Shell
 




x


1
<?xml version="1.0"?>
2
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
3
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
4
  <modelVersion>4.0.0</modelVersion>
5
  <groupId>org.acme</groupId>
6
  <artifactId>camel-quarkus-kafka</artifactId>
7
  <version>1.0.0-SNAPSHOT</version>
8
  <properties>
9
    <compiler-plugin.version>3.8.1</compiler-plugin.version>
10
    <maven.compiler.parameters>true</maven.compiler.parameters>
11
    <maven.compiler.source>11</maven.compiler.source>
12
    <maven.compiler.target>11</maven.compiler.target>
13
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
14
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
15
    <quarkus-plugin.version>1.3.4.Final-redhat-00001</quarkus-plugin.version>
16
    <quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
17
    <quarkus.platform.group-id>com.redhat.quarkus</quarkus.platform.group-id>
18
    <quarkus.platform.version>1.3.4.Final-redhat-00001</quarkus.platform.version>
19
    <surefire-plugin.version>2.22.1</surefire-plugin.version>
20
  </properties>
21
  <dependencyManagement>
22
    <dependencies>
23
      <dependency>
24
        <groupId>${quarkus.platform.group-id}</groupId>
25
        <artifactId>${quarkus.platform.artifact-id}</artifactId>
26
        <version>${quarkus.platform.version}</version>
27
        <type>pom</type>
28
        <scope>import</scope>
29
      </dependency>
30
    </dependencies>
31
  </dependencyManagement>
32
  <dependencies>
33
    <dependency>
34
      <groupId>io.quarkus</groupId>
35
      <artifactId>quarkus-resteasy</artifactId>
36
    </dependency>
37
    <dependency>
38
      <groupId>io.quarkus</groupId>
39
      <artifactId>quarkus-junit5</artifactId>
40
      <scope>test</scope>
41
    </dependency>
42
    <dependency>
43
      <groupId>io.rest-assured</groupId>
44
      <artifactId>rest-assured</artifactId>
45
      <scope>test</scope>
46
    </dependency>
47
    <dependency>
48
      <groupId>org.apache.camel.quarkus</groupId>
49
      <artifactId>camel-quarkus-log</artifactId>
50
    </dependency>
51
    <dependency>
52
      <groupId>org.apache.camel.quarkus</groupId>
53
      <artifactId>camel-quarkus-core</artifactId>
54
    </dependency>
55
    <dependency>
56
      <groupId>org.apache.camel.quarkus</groupId>
57
      <artifactId>camel-quarkus-kafka</artifactId>
58
    </dependency>
59
    <dependency>
60
      <groupId>org.apache.camel.quarkus</groupId>
61
      <artifactId>camel-quarkus-timer</artifactId>
62
    </dependency>
63
  </dependencies>
64
  <build>
65
    <plugins>
66
      <plugin>
67
        <groupId>io.quarkus</groupId>
68
        <artifactId>quarkus-maven-plugin</artifactId>
69
        <version>${quarkus-plugin.version}</version>
70
        <executions>
71
          <execution>
72
            <goals>
73
              <goal>build</goal>
74
            </goals>
75
          </execution>
76
        </executions>
77
      </plugin>
78
      <plugin>
79
        <artifactId>maven-compiler-plugin</artifactId>
80
        <version>${compiler-plugin.version}</version>
81
      </plugin>
82
      <plugin>
83
        <artifactId>maven-surefire-plugin</artifactId>
84
        <version>${surefire-plugin.version}</version>
85
        <configuration>
86
          <systemProperties>
87
            <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
88
          </systemProperties>
89
        </configuration>
90
      </plugin>
91
    </plugins>
92
  </build>
93
  <profiles>
94
    <profile>
95
      <id>native</id>
96
      <activation>
97
        <property>
98
          <name>native</name>
99
        </property>
100
      </activation>
101
      <build>
102
        <plugins>
103
          <plugin>
104
            <artifactId>maven-failsafe-plugin</artifactId>
105
            <version>${surefire-plugin.version}</version>
106
            <executions>
107
              <execution>
108
                <goals>
109
                  <goal>integration-test</goal>
110
                  <goal>verify</goal>
111
                </goals>
112
                <configuration>
113
                  <systemProperties>
114
                    <native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
115
                  </systemProperties>
116
                </configuration>
117
              </execution>
118
            </executions>
119
          </plugin>
120
        </plugins>
121
      </build>
122
      <properties>
123
        <quarkus.package.type>native</quarkus.package.type>
124
      </properties>
125
    </profile>
126
  </profiles>
127
</project>
128
 
          



Single Node Kafka Cluster Set up in Openshift 4

Shell
 




xxxxxxxxxx
1
16


1
pods
2
my-cluster-entity-operator-cb446ff8f-5ncfj   3/3     Running     1          5d22h
3
my-cluster-kafka-0                           2/2     Running     0          19h
4
my-cluster-zookeeper-0                       1/1     Running     0          5d22h
5
strimzi-cluster-operator-86bb9f6ccd-m5nsk    1/1     Running     0          5d22h
6
 
          
7
services
8
my-cluster-kafka-0                    ClusterIP   xxx.yyy.149   <none>        9094/TCP                     19h
9
my-cluster-kafka-bootstrap            ClusterIP   xxx.yy.216.1     <none>        9091/TCP                     5d22h
10
my-cluster-kafka-brokers              ClusterIP   None             <none>        9091/TCP                     5d22h
11
my-cluster-kafka-external-bootstrap   ClusterIP   xxx.yy.42.219    <none>        9094/TCP                     19h
12
my-cluster-zookeeper-client           ClusterIP   xxx.yy.160.224   <none>        2181/TCP                     5d22h
13
my-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   5d22h
14
 
          
15
route
16
my-cluster-kafka-0           my-cluster-kafka-0-kafka.apps.kakarlatest2.lab.xxx.yyy.com                  my-cluster-kafka-0                    9094   passthrough   None
17
my-cluster-kafka-bootstrap   my-cluster-kafka-bootstrap-kafka.apps.kakarlatest2.lab.xxx.yyy.com          my-cluster-kafka-external-bootstrap   9094   passthrough   None
18
 
          
19
 
          



Follow below steps to extract openshift Kafka cluster certs, as the app needs to connect Kafka cluster set up in openshift 4

To Run the example as an external client, first extract and import the openshift Kafka cluster cert

Shell
 




xxxxxxxxxx
1


 
1
oc extract secret/my-cluster-cluster-ca-cert --keys=ca.crt --to=- > src/main/resources/ca.crt
2
keytool -keystore src/main/resources/client.truststore.jks -alias CARoot -import -file src/main/resources/ca.crt
3
 keytool -import -trustcacerts -alias root -file src/main/resources/ca.crt -keystore src/main/resources/client.truststore.jks -storepass  xxxxxx -noprompt
4
 
          



Camel Kafka Route Builder Class  

Write camel Kafka Producer Route to publish messages to Kafka's topic.

Java
 




xxxxxxxxxx
1
18


1
from("timer://foo?period={{period}}").setBody(constant("post message to kafka cluster topic"))
2
                .to("kafka:{{kafka.topic}}?brokers={{kafka.external.bootstrap.url}}"
3
                        + "&keySerializerClass={{kafka.key.serializer}}&serializerClass={{kafka.value.serializer}}"
4
                        + "&securityProtocol=SSL&sslTruststoreLocation={{truststore}}"
5
                        + "&sslTruststorePassword={{truststore.password}}")
6
                .process(new Processor() {
7
                    @Override
8
                    public void process(Exchange exchange) throws Exception {
9
                        List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) 
10
                          exchange.getIn()                       .getHeader(KafkaConstants.KAFKA_RECORDMETA);
11
                        for (RecordMetadata rd : recordMetaData1) {
12
                            LOG.info("producer partition is:" +            rd.partition());
13
                            LOG.info("producer partition message is:" + rd.toString());
14
                        }
15
                    }
16
 
          
17
                });
18
 
          



Place holders used in Camel Route is defined in Application.properties file

Java
 




x


 
1
kafka.topic=my-topic
2
kafka.external.bootstrap.url=my-cluster-kafka-bootstrap-kafka.apps.kakarlatest2.lab.xxx.yyy.com:443
3
period=10000&repeatCount=5
4
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
5
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
6
truststore=/home/kkakarla/Downloads/camel-quarkus-kafka/src/main/resources/client.truststore.jks
7
truststore.password=xxxxxx



Tree structure of camel-quarkus-Kafka

Java
 




xxxxxxxxxx
1
33


1
[kkakarla@kkakarla camel-quarkus-kafka]$ tree
2
.
3
├── mvnw
4
├── mvnw.cmd
5
├── pom.xml
6
├── README.md
7
├── src
8
   ├── main
9
      ├── docker
10
         ├── Dockerfile.jvm
11
         └── Dockerfile.native
12
      ├── java
13
         └── org
14
             └── acme
15
                 ├── ExampleResource.java
16
                 └── KafkaRoute.java
17
      └── resources
18
          ├── application.properties
19
          ├── ca.crt
20
          ├── client.truststore.jks
21
          └── META-INF
22
              └── resources
23
                  └── index.html
24
   └── test
25
       └── java
26
           └── org
27
               └── acme
28
                   ├── ExampleResourceTest.java
29
                   └── NativeExampleResourceIT.java
30
└── target
31
 
          
32
14 directories, 14 files
33
 
          



Run the application in dev mode

Java
 




xxxxxxxxxx
1
47


 
1
[kkakarla@kkakarla camel-quarkus-kafka]$ ./mvnw quarkus:dev
2
[INFO] Scanning for projects...
3
[INFO] 
4
[INFO] --------------------< org.acme:camel-quarkus-kafka >--------------------
5
[INFO] Building camel-quarkus-kafka 1.0.0-SNAPSHOT
6
[INFO] --------------------------------[ jar ]---------------------------------
7
[INFO] 
8
[INFO] --- quarkus-maven-plugin:1.3.4.Final-redhat-00001:dev (default-cli) @ camel-quarkus-kafka ---
9
[INFO] Changes detected - recompiling the module!
10
[INFO] Compiling 2 source files to /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka/target/classes
11
[INFO] /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka/src/main/java/org/acme/KafkaRoute.java: /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka/src/main/java/org/acme/KafkaRoute.java uses unchecked or unsafe operations.
12
[INFO] /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka/src/main/java/org/acme/KafkaRoute.java: Recompile with -Xlint:unchecked for details.
13
Listening for transport dt_socket at address: 5005
14
__  ____  __  _____   ___  __ ____  ______ 
15
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
16
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
17
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
18
2020-07-10 11:39:13,455 INFO  [org.apa.cam.sup.LRUCacheFactory] (main) Detected and using LURCacheFactory: camel-caffeine-lrucache
19
2020-07-10 11:39:13,685 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.1.0 (CamelContext: camel-1) is starting
20
-----------------------------------------------------------------------------
21
-----------------------------------------------------------------------------
22
-----------------------------------------------------------------------------
23
 
          
24
sasl.mechanism = GSSAPI
25
    security.protocol = SSL
26
    security.providers = null
27
    send.buffer.bytes = 131072
28
    ssl.cipher.suites = null
29
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
30
    ssl.endpoint.identification.algorithm = https
31
    ssl.key.password = null
32
    ssl.keymanager.algorithm = SunX509
33
    ssl.keystore.location = null
34
    ssl.keystore.password = null
35
    ssl.keystore.type = JKS
36
    ssl.protocol = TLS
37
    ssl.provider = null
38
    ssl.secure.random.implementation = null
39
    ssl.trustmanager.algorithm = PKIX
40
    ssl.truststore.location = /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka/src/main/resources/client.truststore.jks
41
    ssl.truststore.password = [hidden]
42
    ssl.truststore.type = JKS
43
    transaction.timeout.ms = 60000
44
    transactional.id = null
45
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
46
 
          
47
2020-07-10 11:40:32,475 WARN  [org.apa.kaf.cli.pro.KafkaProducer] (Timer-0) [Producer clientId=producer-2] delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms. Setting it to 305000.
48
2020-07-10 11:40:32,946 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka version: 2.4.0.redhat-00005
49
2020-07-10 11:40:32,946 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka commitId: bc61f1c575849a1e
50
2020-07-10 11:40:32,946 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka startTimeMs: 1594361432945
51
2020-07-10 11:40:32,950 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Route: route2 started and consuming from: timer://foo
52
2020-07-10 11:40:32,953 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Total 1 routes, of which 1 are started
53
2020-07-10 11:40:32,953 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Apache Camel 3.1.0 (CamelContext: camel-2) started in 0.488 seconds
54
2020-07-10 11:40:32,954 INFO  [io.quarkus] (Timer-0) camel-quarkus-kafka 1.0.0-SNAPSHOT (powered by Quarkus 1.3.0.Final) started in 81.008s. Listening on: http://0.0.0.0:8080
55
2020-07-10 11:40:32,955 INFO  [io.quarkus] (Timer-0) Profile dev activated. Live Coding activated.
56
2020-07-10 11:40:32,955 INFO  [io.quarkus] (Timer-0) Installed features: [camel-core, camel-file, camel-kafka, camel-log, camel-support-common, camel-timer, cdi, kubernetes, resteasy]
57
2020-07-10 11:40:32,956 INFO  [io.qua.dev] (Timer-0) Hot replace total time: 0.952s 
58
2020-07-10 11:40:35,481 INFO  [org.apa.kaf.cli.Metadata] (kafka-producer-network-thread | producer-2) [Producer clientId=producer-2] Cluster ID: P_Ka2jxLRd6xlrVD2jHFjg
59
2020-07-10 11:40:38,114 INFO  [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #2 - KafkaProducer[my-topic]) producer partition is:0
60
2020-07-10 11:40:38,115 INFO  [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #2 - KafkaProducer[my-topic]) producer partition message is:my-topic-0@20
61
2020-07-10 11:40:44,345 INFO  [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #4 - KafkaProducer[my-topic]) producer partition is:0
62
2020-07-10 11:40:44,346 INFO  [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #4 - KafkaProducer[my-topic]) producer partition message is:my-topic-0@21
63
2020-07-10 11:40:54,350 INFO  [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #6 - KafkaProducer[my-topic]) producer partition is:0
64
2020-07-10 11:40:54,351 INFO  [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #6 - KafkaProducer[my-topic]) producer partition message is:my-topic-0@22
65
2020-07-10 11:41:04,342 INFO  [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #8 - KafkaProducer[my-topic]) producer partition is:0
66
2020-07-10 11:41:04,343 INFO  [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #8 - KafkaProducer[my-topic]) producer partition message is:my-topic-0@23
67
2020-07-10 11:41:14,567 INFO  [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #10 - KafkaProducer[my-topic]) producer partition is:0
68
2020-07-10 11:41:14,567 INFO  [org.acm.tim.KafkaRoute] (Camel (camel-2) thread #10 - KafkaProducer[my-topic]) producer partition message is:my-topic-0@24
69
 
          
70
 
          



Published 5 messages to partition 0

Write camel Kafka Consumer Route to consume messages from Kafka topic.

Java
 




xxxxxxxxxx
1


1
from("timer://foo?period={{period}}")
2
                .from("kafka:{{kafka.topic}}?brokers={{kafka.external.bootstrap.url}}&securityProtocol=SSL"
3
                        + "&sslTruststoreLocation={{truststore}}&"
4
                        + "sslTruststorePassword={{truststore.password}}&groupId=cameltest")
5
                .log("Message received from Kafka : ${body}")
6
                .log("    on the topic ${headers[kafka.TOPIC]}")
7
                .log("    on the partition ${headers[kafka.PARTITION]}")
8
                .log("    with the offset ${headers[kafka.OFFSET]}")
9
                .log("    with the key ${headers[kafka.KEY]}");



Run the consumer application in dev mode

Java
 




x


 
1
    ssl.provider = null
2
    ssl.secure.random.implementation = null
3
    ssl.trustmanager.algorithm = PKIX
4
    ssl.truststore.location = /home/kkakarla/development/git/ramu-dev/quarkus/camel-quarkus-kafka-consumer/src/main/resources/client.truststore.jks
5
    ssl.truststore.password = [hidden]
6
    ssl.truststore.type = JKS
7
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
8
 
          
9
2020-07-10 11:52:07,840 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Timer-0) The configuration 'specific.avro.reader' was supplied but isn't a known config.
10
2020-07-10 11:52:07,841 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka version: 2.4.0.redhat-00005
11
2020-07-10 11:52:07,842 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka commitId: bc61f1c575849a1e
12
2020-07-10 11:52:07,842 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Timer-0) Kafka startTimeMs: 1594362127841
13
2020-07-10 11:52:07,843 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Route: route2 started and consuming from: kafka://my-topic
14
2020-07-10 11:52:07,844 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Subscribing my-topic-Thread 0 to topic my-topic
15
2020-07-10 11:52:07,845 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Total 1 routes, of which 1 are started
16
2020-07-10 11:52:07,845 INFO  [org.apa.kaf.cli.con.KafkaConsumer] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Subscribed to topic(s): my-topic
17
2020-07-10 11:52:07,845 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Timer-0) Apache Camel 3.1.0 (CamelContext: camel-2) started in 0.454 seconds
18
2020-07-10 11:52:07,846 INFO  [io.quarkus] (Timer-0) camel-quarkus-kafka-consumer 1.0.0-SNAPSHOT (powered by Quarkus 1.3.0.Final) started in 74.773s. Listening on: http://0.0.0.0:8080
19
2020-07-10 11:52:07,846 INFO  [io.quarkus] (Timer-0) Profile dev activated. Live Coding activated.
20
2020-07-10 11:52:07,846 INFO  [io.quarkus] (Timer-0) Installed features: [camel-core, camel-file, camel-kafka, camel-log, camel-support-common, camel-timer, cdi, kubernetes, resteasy]
21
2020-07-10 11:52:07,847 INFO  [io.qua.dev] (Timer-0) Hot replace total time: 0.732s 
22
2020-07-10 11:52:10,540 INFO  [org.apa.kaf.cli.Metadata] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Cluster ID: P_Ka2jxLRd6xlrVD2jHFjg
23
2020-07-10 11:52:10,630 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Discovered group coordinator my-cluster-kafka-0-kafka.apps.kakarlatest2.lab.xxx.yyy.com:443 (id: 2147483647 rack: null)
24
2020-07-10 11:52:11,153 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] (Re-)joining group
25
2020-07-10 11:52:13,918 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] (Re-)joining group
26
2020-07-10 11:52:17,500 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Finished assignment for group at generation 17: {consumer-cameltest-2-386c3314-a3ff-44b3-9612-6378ffc4d64a=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@3d00beea}
27
2020-07-10 11:52:17,916 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Successfully joined group with generation 17
28
2020-07-10 11:52:17,919 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Adding newly assigned partitions: my-topic-0
29
2020-07-10 11:52:18,526 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) [Consumer clientId=consumer-cameltest-2, groupId=cameltest] Setting offset for partition my-topic-0 to the committed offset FetchPosition{offset=20, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=my-cluster-kafka-0-kafka.apps.kakarlatest2.lab.xxx.yyy.com:443 (id: 0 rack: null), epoch=0}}
30
2020-07-10 11:52:20,929 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Message received from Kafka : post message to kafka cluster topic
31
2020-07-10 11:52:20,930 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     on the topic my-topic
32
2020-07-10 11:52:20,930 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     on the partition 0
33
2020-07-10 11:52:20,930 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     with the offset 20
34
2020-07-10 11:52:20,930 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     with the key 
35
2020-07-10 11:52:20,931 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Message received from Kafka : post message to kafka cluster topic
36
2020-07-10 11:52:20,931 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     on the topic my-topic
37
2020-07-10 11:52:20,931 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     on the partition 0
38
2020-07-10 11:52:20,932 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     with the offset 21
39
2020-07-10 11:52:20,932 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     with the key 
40
2020-07-10 11:52:20,932 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Message received from Kafka : post message to kafka cluster topic
41
2020-07-10 11:52:20,932 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     on the topic my-topic
42
2020-07-10 11:52:20,932 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     on the partition 0
43
2020-07-10 11:52:20,932 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     with the offset 22
44
2020-07-10 11:52:20,932 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     with the key 
45
2020-07-10 11:52:20,932 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Message received from Kafka : post message to kafka cluster topic
46
2020-07-10 11:52:20,933 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     on the topic my-topic
47
2020-07-10 11:52:20,933 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     on the partition 0
48
2020-07-10 11:52:20,933 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     with the offset 23
49
2020-07-10 11:52:20,933 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     with the key 
50
2020-07-10 11:52:20,933 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic]) Message received from Kafka : post message to kafka cluster topic
51
2020-07-10 11:52:20,933 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     on the topic my-topic
52
2020-07-10 11:52:20,933 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     on the partition 0
53
2020-07-10 11:52:20,933 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     with the offset 24
54
2020-07-10 11:52:20,933 INFO  [route2] (Camel (camel-2) thread #1 - KafkaConsumer[my-topic])     with the key 
55
 
          



Deploy Application to Openshift 4.x Cluster

1. Change to the directory that contains your Quarkus Maven project.

2. To add the Open Shift extension to an existing project, enter the following command:

Java
 




xxxxxxxxxx
1


1
./mvnw quarkus:add-extension -Dextensions="openshift"



3. When you add the OpenShift extension, the script adds the following dependency to the pom.xml file:

Java
 




xxxxxxxxxx
1


1
<dependency>
2
     <groupId>io.quarkus</groupId>
3
     <artifactId>quarkus-openshift</artifactId>
4
   </dependency>



4. If you are using an untrusted certificate, add the following line to the src/main/resources/application.properties file:

Java
 




xxxxxxxxxx
1


 
1
quarkus.s2i.base-jvm-image=registry.access.redhat.com/openjdk/openjdk-11-rhel7



5. Log in to the Open Shift CLI (oc) and create a new Open Shift project

6. To deploy your project to Open Shift, enter the following command in your Quarkus Maven project directory

Java
 




xxxxxxxxxx
1


 
1
./mvnw clean package -Dquarkus.kubernetes.deploy=true



Check the pods

Java
 




xxxxxxxxxx
1


1
NAME                                         READY   STATUS      RESTARTS   AGE
2
camel-quarkus-kafka-1-build                  0/1     Completed   0          4m21s
3
camel-quarkus-kafka-1-deploy                 0/1     Completed   0          56s
4
camel-quarkus-kafka-1-z9r65                  1/1     Running     0          82s
5
 
          



Check the camel-quarkus-Kafka-1-z9r65 logs  using command oc logs -f camel-quarkus-kafka-1-z9r65

Here Kafka cluster service bootstrap URL  should be used in camel route if app is deployed in same openshift where Kafka cluster is set up

Hope this article helps others who want to try camel-quarkus-Kafka 

Topics:
camel, integration, kafka, openshift, quarkus, tutorial

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}