Mule 4 Integration With Kafka
Hello Muleys,
Nowadays, there is a lot of buzz around the Apache Kafka messaging platform. Enterprises are showing interest in moving towards a more reliable, stable, and open-source solution for their messaging needs. This article walks through integrating Apache Kafka with a sample Mule 4 application.
So let's start with the Kafka setup first!
- Download the latest binaries from Apache Kafka and extract them to the local drive C:\
- Download the latest binaries for Apache ZooKeeper and extract them to the local drive C:\
- Configure the environment variables:
  - JAVA_HOME: point this variable to a suitable installed version of Java, e.g. "C:\Program Files\Java\jre1.8.0_212", and add %JAVA_HOME%\bin to the Path variable. To identify the installed Java version, run the following in a command prompt:
    java -version
  - ZOOKEEPER_HOME: point this variable to C:\zookeeper-3.5.5 and add %ZOOKEEPER_HOME%\bin to the Path variable.
- Start the ZooKeeper server: open a command prompt as administrator and run "zkserver" inside C:\zookeeper-3.5.5\bin. ZooKeeper reads its settings from conf\zoo.cfg, so if that file is missing, copy conf\zoo_sample.cfg to conf\zoo.cfg first.
- Start the Kafka server:
  - Open C:\kafka_2.12-2.2.0\config\server.properties and uncomment line 30 so that it reads: listeners=PLAINTEXT://localhost:9092
  - Open a command prompt, change to C:\kafka_2.12-2.2.0\bin\windows, and execute the following:
    C:\kafka_2.12-2.2.0\bin\windows>kafka-server-start.bat C:\kafka_2.12-2.2.0\config\server.properties
- Set up a Kafka producer: start a console producer on the topic "test" (with the broker's default settings the topic is created on first use) and publish some messages at the ">" prompt. Open a command prompt, change to C:\kafka_2.12-2.2.0\bin\windows, and execute the following (192.168.0.184 is the broker address used in this setup; substitute your own):
    C:\kafka_2.12-2.2.0\bin\windows>kafka-console-producer.bat --broker-list 192.168.0.184:9092 --topic test
- Set up a Kafka consumer: open a command prompt, change to C:\kafka_2.12-2.2.0\bin\windows, and execute the following:
    C:\kafka_2.12-2.2.0\bin\windows>kafka-console-consumer.bat --bootstrap-server 192.168.0.184:9092 --topic test --from-beginning
So, we have covered the basic Kafka setup on a local Windows machine and tried the publish/consume mechanism on a sample topic "test".
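The listeners edit in server.properties described in the steps above can also be scripted. The following is a minimal Python sketch, not part of the original setup: the helper name enable_listener is hypothetical, and it assumes the file carries a commented-out entry of the form #listeners=... (as the stock Kafka configuration normally does). It works on the file's text, so it can be tried safely before touching the real file.

```python
import re


def enable_listener(properties_text: str,
                    listener: str = "PLAINTEXT://localhost:9092") -> str:
    """Return the properties text with the `listeners=` entry active.

    Hypothetical helper: it replaces the first line that is exactly
    `listeners=...` or `#listeners=...`; indented explanatory comments
    and `advertised.listeners` are left untouched.
    """
    pattern = re.compile(r"^#?listeners=.*$", flags=re.MULTILINE)
    new_line = f"listeners={listener}"
    if pattern.search(properties_text):
        # A function replacement avoids any backslash interpretation.
        return pattern.sub(lambda _match: new_line, properties_text, count=1)
    # No listeners entry found: append one at the end of the file.
    return properties_text.rstrip("\n") + "\n" + new_line + "\n"
```

To apply it, read C:\kafka_2.12-2.2.0\config\server.properties, pass its contents through enable_listener, and write the result back. Keeping a backup copy of the file first is sensible.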
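Logs from the command prompt sessions for this message, producer first and then consumer (captured with a kafka_2.13-2.4.0 installation, so the paths differ from the steps above). The LEADER_NOT_AVAILABLE warnings are transient; they typically appear while a newly created topic is still electing a partition leader: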
C:\WINDOWS\system32>cd C:\kafka_2.13-2.4.0\bin\windows
C:\kafka_2.13-2.4.0\bin\windows>kafka-console-producer.bat --broker-list 192.168.0.184:9092 --topic test
>hello
C:\WINDOWS\system32>cd C:\kafka_2.13-2.4.0\bin\windows
C:\kafka_2.13-2.4.0\bin\windows>kafka-console-consumer.bat --bootstrap-server 192.168.0.184:9092 --topic test --from-beginning
[2020-02-14 15:55:20,671] WARN [Consumer clientId=consumer-console-consumer-38663-1, groupId=console-consumer-38663] Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2020-02-14 15:55:20,858] WARN [Consumer clientId=consumer-console-consumer-38663-1, groupId=console-consumer-38663] Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2020-02-14 15:55:21,004] WARN [Consumer clientId=consumer-console-consumer-38663-1, groupId=console-consumer-38663] Error while fetching metadata with correlation id 5 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2020-02-14 15:55:21,358] WARN [Consumer clientId=consumer-console-consumer-38663-1, groupId=console-consumer-38663] Error while fetching metadata with correlation id 7 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
hello
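Before moving on to the Mule side, it can help to confirm that both servers are still accepting connections. The following is a minimal Python sketch rather than part of the original walkthrough: the helper name is_port_open is hypothetical, port 2181 is assumed because it is ZooKeeper's default client port, and port 9092 is the Kafka listener configured earlier.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


if __name__ == "__main__":
    # Assumed ports: 2181 (ZooKeeper default) and 9092 (Kafka listener).
    for name, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
        state = "reachable" if is_port_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port} is {state}")
```

A successful TCP connection only shows that something is listening on the port; it does not prove the broker is healthy, so the console producer and consumer remain the real test.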
Let's move on to the Anypoint Studio (7.4.2) and Mule Runtime (4.2.2) setup:
- Open Anypoint Studio and create a sample project named "KafkaDemo".
- Rename the default flow to "KafkaPublisher". It starts with an HTTP listener on port 8099 that accepts POST requests on the path "/kafkademo", logs the request payload, and publishes the message to the Kafka topic "test".
- Create a "Consumer" flow that starts with a Kafka "Message Consumer" source, followed by a logger that logs each message received.
- Kafka Publisher configuration: under Global Elements, create an Apache Kafka Producer configuration whose connection points to the bootstrap server localhost:9092.
- Similarly, for the Kafka Consumer, create an Apache Kafka Consumer configuration with the same bootstrap server and the group ID "test-consumer".
- Code snippet:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">

    <!-- HTTP listener on port 8099 that receives the messages to publish -->
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="03316fb4-b7aa-4b59-8fc1-be7e67cb81e7" >
        <http:listener-connection host="0.0.0.0" port="8099" />
    </http:listener-config>

    <!-- Kafka producer connection to the local broker -->
    <kafka:kafka-producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="34378022-e968-4207-84e4-695a51681d74" >
        <kafka:basic-kafka-producer-connection bootstrapServers="localhost:9092" />
    </kafka:kafka-producer-config>

    <!-- Kafka consumer connection to the local broker, consumer group "test-consumer" -->
    <kafka:kafka-consumer-config name="Apache_Kafka_Consumer_configuration" doc:name="Apache Kafka Consumer configuration" doc:id="561bce8d-0ca4-4c37-be5c-6c1e928e6147" >
        <kafka:basic-kafka-consumer-connection groupId="test-consumer" bootstrapServers="localhost:9092" />
    </kafka:kafka-consumer-config>

    <!-- Publisher flow: POST /kafkademo, log the payload, publish it to topic "test" -->
    <flow name="KafkaPublisher" doc:id="95036497-ffb3-45db-9860-5a6883cdaa57">
        <http:listener doc:name="Listener" doc:id="bb867536-18a4-4d2c-bc69-3e4bd39bf277" config-ref="HTTP_Listener_config" path="/kafkademo" allowedMethods="POST" />
        <logger level="INFO" doc:name="Logger" doc:id="b3524fdb-0b55-4adb-a659-493254a4ec09" message="Payload received: #[payload]" />
        <kafka:producer doc:name="Publish Message" doc:id="1143950e-1758-4c83-9871-e33bcba76599" config-ref="Apache_Kafka_Producer_configuration" topic="test" key="2" />
        <logger level="INFO" doc:name="Logger" doc:id="c1ce9762-7ac2-4d80-bd36-e159af85c9ff" message="Message published to Kafka topic -test" />
    </flow>

    <!-- Consumer flow: read from topic "test" and log each message -->
    <flow name="Consumer" doc:id="877cced3-516d-4b45-a98e-6dcf37f3b89b" >
        <kafka:consumer doc:name="Message Consumer" doc:id="33a0aee2-d33e-4284-a217-9537946cb631" config-ref="Apache_Kafka_Consumer_configuration" topic="test"/>
        <logger level="INFO" doc:name="Logger" doc:id="42f3a992-3e72-4e2e-b96d-20ae62ba0ea2" message="Message consumed: #[payload]"/>
    </flow>
</mule>
Let's test it!
- Use any REST client, such as Postman, SoapUI, or Advanced REST Client, to prepare a POST request with a sample string message to "http://localhost:8099/kafkademo" (port 8099 and the path /kafkademo match the HTTP listener configuration above).
- Run the application in Anypoint Studio and trigger the request from Postman.
- The Studio console will show the message being published and consumed:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-test-consumer-1, groupId=test-consumer] (Re-)joining group
INFO 2020-02-14 16:14:03,393 [WrapperListener_start_runner] org.eclipse.jetty.server.AbstractConnector: Started ServerConnector@74e5e474{HTTP/1.1,[http/1.1]}{0.0.0.0:59319}
INFO 2020-02-14 16:14:03,399 [WrapperListener_start_runner] org.mule.runtime.core.internal.logging.LogUtil:
**********************************************************************
* - - + DOMAIN + - - * - - + STATUS + - - *
**********************************************************************
* default * DEPLOYED *
**********************************************************************
*******************************************************************************************************
* - - + APPLICATION + - - * - - + DOMAIN + - - * - - + STATUS + - - *
*******************************************************************************************************
* kafkademo * default * DEPLOYED *
*******************************************************************************************************
INFO 2020-02-14 16:14:03,416 [[MuleRuntime].cpuLight.02: cpuLight@com.mulesoft.connectors.kafka.internal.service.MuleConsumer.configureAndLaunchSchedulerService:58 @10309b16] [event: ] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-test-consumer-1, groupId=test-consumer] Finished assignment for group at generation 1: {consumer-test-consumer-1-3e5ad9b9-f625-4dcf-9f02-19859d0fc73f=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@6e304ac6}
INFO 2020-02-14 16:14:03,426 [[MuleRuntime].cpuLight.02: cpuLight@com.mulesoft.connectors.kafka.internal.service.MuleConsumer.configureAndLaunchSchedulerService:58 @10309b16] [event: ] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-test-consumer-1, groupId=test-consumer] Successfully joined group with generation 1
INFO 2020-02-14 16:14:03,432 [[MuleRuntime].cpuLight.02: cpuLight@com.mulesoft.connectors.kafka.internal.service.MuleConsumer.configureAndLaunchSchedulerService:58 @10309b16] [event: ] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-test-consumer-1, groupId=test-consumer] Adding newly assigned partitions: test-0
INFO 2020-02-14 16:14:03,448 [[MuleRuntime].cpuLight.02: cpuLight@com.mulesoft.connectors.kafka.internal.service.MuleConsumer.configureAndLaunchSchedulerService:58 @10309b16] [event: ] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-test-consumer-1, groupId=test-consumer] Found no committed offset for partition test-0
INFO 2020-02-14 16:14:03,475 [[MuleRuntime].cpuLight.02: cpuLight@com.mulesoft.connectors.kafka.internal.service.MuleConsumer.configureAndLaunchSchedulerService:58 @10309b16] [event: ] org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-test-consumer-1, groupId=test-consumer] Resetting offset for partition test-0 to offset 3.
INFO 2020-02-14 16:15:09,303 [[MuleRuntime].cpuLight.03: [kafkademo].KafkaPublisher.CPU_LITE @70b68726] [event: 085965a0-4f17-11ea-b1d1-6236dd9be254] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Payload received: hello kafka topic. This is first message. Hope you are ready to consume it.
INFO 2020-02-14 16:15:09,458 [[MuleRuntime].cpuLight.04: [kafkademo].KafkaPublisher.CPU_LITE @70b68726] [event: 085965a0-4f17-11ea-b1d1-6236dd9be254] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Message published to Kafka topic -test
INFO 2020-02-14 16:15:09,471 [[MuleRuntime].cpuLight.03: [kafkademo].Consumer.CPU_LITE @7d5f0087] [event: 1216d5f1-4f17-11ea-b1d1-6236dd9be254] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Message consumed: hello kafka topic. This is first message. Hope you are ready to consume it.
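The POST request from the test steps can also be sent from a script instead of a REST client. The following is a minimal Python sketch using only the standard library; the helper name post_message is hypothetical, and the URL assumes the listener settings from the snippet above (port 8099, path /kafkademo).

```python
import urllib.error
import urllib.request


def post_message(url: str, message: str, timeout: float = 10.0) -> int:
    """POST message as plain text to url and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=message.encode("utf-8"),
        headers={"Content-Type": "text/plain"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


if __name__ == "__main__":
    # Assumed endpoint: the HTTP listener of the KafkaPublisher flow.
    try:
        status = post_message("http://localhost:8099/kafkademo", "hello kafka topic")
        print("HTTP status:", status)
    except urllib.error.URLError as error:
        print("Request failed, is the Mule application running?", error)
```

If the flow completes, the published text should then appear in the Studio console in the same "Payload received" and "Message consumed" log lines as shown above.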
Hence, we were able to publish and consume a message over a Kafka topic.
I hope this tutorial serves as a first step towards using Kafka in a Mule 4 application for anyone new to these concepts. Do reach out to me in case of queries. Happy learning!
Opinions expressed by DZone contributors are their own.