DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Last call! Secure your stack and shape the future! Help dev teams across the globe navigate their software supply chain security challenges.

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workloads.

Releasing software shouldn't be stressful or risky. Learn how to leverage progressive delivery techniques to ensure safer deployments.

Avoid machine learning mistakes and boost model performance! Discover key ML patterns, anti-patterns, data strategies, and more.

Related

  • Spring Cloud Stream: A Brief Guide
  • How To Get Closer to Consistency in Microservice Architecture
  • 7 Microservices Best Practices for Developers
  • Spring Microservice Tip: Abstracting the Database Hostname With Environment Variable

Trending

  • Java's Quiet Revolution: Thriving in the Serverless Kubernetes Era
  • Fixing Common Oracle Database Problems
  • Virtual Threads: A Game-Changer for Concurrency
  • Enhancing Avro With Semantic Metadata Using Logical Types
  1. DZone
  2. Software Design and Architecture
  3. Cloud Architecture
  4. How Kafka Can Make Microservice Planet a Better Place

How Kafka Can Make Microservice Planet a Better Place

In this article, we want to focus on using Kafka in the microservice architecture, and we need an important concept named Kafka Topic for that.

By 
Reza Ganji user avatar
Reza Ganji
DZone Core CORE ·
Aug. 28, 21 · Tutorial
Likes (13)
Comment
Save
Tweet
Share
18.5K Views

Join the DZone community and get the full member experience.

Join For Free

Once upon a time but not so long ago, most software architectures were monolithic and everything was placed in hardly mutable and large software packages, but nowadays Microservice Architecture has become a flagship flower and you hear it everywhere. It divides software into mini software with specific responsibilities, actually, instead of huge software, we have a bunch of smaller software named microservices with minimum interaction with each other.

Designing microservices needs specific knowledge like DDD design and of course a good understanding of the monolithic world. Personally, I believe that if you are not a good monolithic thinker, you can not design a fantastic microservice-based architecture. Anyway, I don't want to teach microservice architecture in this article, there are lots of resources on the internet for that.

So what is the problem? "Interaction." Microservices although very few, but interconnected, and has interaction with each other, but sometimes these interactions cause problems. In this article, I try to explain a kind of interaction with the asynchronous approach.

Maslow's Hammer

Before we start, I want to have a brief explanation of an interesting theorem, Maslow's hammer, it is so easy to understand and very funny, but absolutely important, all is that: when all you have is a hammer, everything looks like a nail!

According to the theorem, when we acquire a new skill, we tend to see opportunities to use it everywhere, it is dangerous. Every skill, every tool, and every approach works on a special condition, not everywhere.

Microservices Asynchronous Communication

Generally, there are two ways of microservice inter-service communication: synchronous and asynchronous. In the synch approach services are waiting for each other's behavior until they are called from other sides they have no action, this approach sometimes may cause problems like performance issues. Consider an example we have an online order system, that customers order things from our site and after shopping completion, we notify them with SMS, if all these processes been have done synchronous customer show wait for registering the order, payment as well as notification SMS process.

Let's think different and also asynchronous, after customers successfully registered his order he received successful order from the system and has not forced to wait until receiving the SMS from System, on the other hand, we have a notification service that acts async and has a queue of orders that should be notified to their owners.

Simply:

We have two microservices: OrderService, NotificationService.

  1. The order has been registered by the customer.
  2. The customer received a "successful order" from the system and goes to drink his tea.
  3. The order has been put in the notification server queue.
  4. Notification server starts processing of queued order and sends SMS for them.

So briefly according to microservices:

  • Synchronous communication results in tight runtime coupling, both the client and service must be available for the duration of the request.
  • Asynchronous messaging for inter-service communication. Services communicating by exchanging messages over messaging channels.

There are several different styles of asynchronous communication:

  • Request/response — a service sends a request message to a recipient and expects to receive a reply message promptly.
  • Notifications — a sender sends a message to a recipient but does not expect a reply. Nor is one sent.
  • Request/asynchronous response — a service sends a request message to a recipient and expects to receive a reply message eventually.
  • Publish/subscribe — a service publishes a message to zero or more recipients.
  • Publish/asynchronous response — a service publishes a request to one or recipients, some of whom send back a reply.

Synchronous > Asynchronous Diagram

There are several tools for asynchronous communication, like Apache Kafka or RabbitMQ. In this article we use Kafka.

What is Kafka?

Apache Kafka is a distributed open-source streaming and event handling platform that is used by many companies as well as software architectures. Kafka was initially conceived as a messaging queue and is based on an abstraction of a distributed commit log. Since being created and open-sourced by LinkedIn in 2011, Kafka has quickly evolved from messaging queue to a full-fledged event streaming platform.

Originally Kafka was developed under the Apache license but later Confluent forked on it and delivered a robust version of it. Actually Confluent delivers the most complete distribution of Kafka with Confluent Platform. Confluent Platform improves Kafka with additional community and commercial features designed to enhance the streaming experience of both operators and developers in production, at a massive scale.

You can find thousands of documents about learning Kafka. In this article, we want to focus on using it in the microservice architecture, and we need an important concept named Kafka Topic for that.

A Topic is a category/feed name to which records are stored and published. all Kafka records are organized into topics. Producer applications write data to topics and consumer applications read from topics. Records published to the cluster stay in the cluster until a configurable retention period has passed by.

Apache Kafka Broker Diagram With Topics

Installing Kafka

By using Docker and docker-compose, installing Kafka is so easy you need just install confluent, just follow these instructions:

1. Download or copy the contents of the Confluent Platform all-in-one Docker Compose file, for example:

Shell
 
curl --silent --output docker-compose.yml \
  https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.2.0-post/cp-all-in-one/docker-compose.yml


2. Start Confluent Platform with the -d option to run in detached mode:

Java
 
docker-compose up -d


The above command starts Confluent Platform with a separate container for each Confluent Platform component. Your output should resemble the following:

Java
 
Creating network "cp-all-in-one_default" with the default driver
Creating zookeeper ... done
Creating broker    ... done
Creating schema-registry ... done
Creating rest-proxy      ... done
Creating connect         ... done
Creating ksql-datagen    ... done
Creating ksqldb-server   ... done
Creating control-center  ... done
Creating ksqldb-cli      ... done


After 1 hour, the installation will be complete! And you reach the Kafka console at: http://localhost:9021.

Create Topic(s)

Now time to create an important part; topic creation is the first part of our job as developers. Simply open the console in the above-mentioned address and create a topic. We should create two topics with the names: first, second.

http://localhost:9021

Creating a Topic on Confluent

Case Study

For better understanding I will try to explain a topic with an example and some considerations:

  1. We have two microservices: OrderService, NotificationService.
  2. Actually, we should have another microservice named PaymentService, but for simplicity, I considered it in OrderService, so both ordering and payment operations are done in OrderService.
  3. After an order is completed, NotificationService is called.
  4. When NotificationService is called, in the case of a successful payment, it sends a "successful order" to the customer, else it sends a "failed order " message to the customer.

Let's Translate the Case Study to A Producer, Consumer Tool

The above case study represents two actors: First, OrderService as producer, which produces an event named order and puts it in the Kafka topic. The second is the NotificationService which is a consumer with consumes the event that generated in the topic.

One Producer and One Consumer Project

this is the simplest implementation, there is one producer project and one consumer project. Let's start them using Spring Boot with Kafka dependency.

Here is the producer project:

First of all, we should add a Kafka dependency to our project (pom.xml):

XML
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.5.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.cmp.kafka.producer</groupId>
	<artifactId>producer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>producer</name>


	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>


The next job is configuring the project to connect Kafka, and you can see the application.properties file below:

Shell
 
spring.kafka.bootstrap-servers= localhost:9092
spring.kafka.consumer.topic= first

spring.kafka.consumer.key-serializer= org.apache.kafka.common.serialization.StringSerializer

spring.kafka.consumer.value-serializer= org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.properties.spring.json.trusted.packages=*


Remember that the default listener for the Kafka port is 9092.

We have two message classes named Order and PaymentStatus which indicate whether the payment is successful or not.

Java
 
public class Order {
    private Long id;
    private  String Name;
    private PaymentStatus status;
    public Order() {

    }

    public Order(Long id, String name, PaymentStatus status) {
        this.id = id;
        Name = name;
        this.status=status;
    }

    public Long getId() {
        return id;
    }

    public String getName() {
        return Name;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public void setName(String name) {
        Name = name;
    }

    @Override
    public String toString() {
        return "Payment{" +
                "id=" + id +
                ", Name='" + Name + '\'' +
                ", status=" + status +
                '}';
    }

    public PaymentStatus getStatus() {
        return status;
    }

    public void setStatus(PaymentStatus status) {
        this.status = status;
    }
}


Java
 
public enum PaymentStatus {
    SUCCESS,FAIL
}


Now, we should add the OrderService class which is responsible for sending messages to Kafka:

Java
 
@Service
public class OrderService {

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    public void sendMessage(Order payment) {
        ListenableFuture<SendResult<String, Order>> future =
                kafkaTemplate.send("first_topic", payment);
             System.out.println("sent Message to kafka topic : " + payment);


    }
    }


There is a magical keyword named kafkaTemplate which interacts with Kafka using configurations.

Finally, we add a rest class that calls OrderService from browser and sends them to Kafka:

Java
 
@RestController
@RequestMapping(value = "/kafka")
public class OrderController {
    @Autowired
    OrderService producerService;


    @GetMapping(value = "/publish")
    public String sendMessageToKafkaTopic(@RequestParam("message") String message,
                                          @RequestParam("paymentStatus") PaymentStatus paymentStatus) {
            producerService.sendMessage(new Order( 1l,message, paymentStatus));

        return "Message sent to the Kafka Topic java_in_use_topic Successfully" + message
                + "with status " +paymentStatus ;


    }


}


Now it's time to enjoy it! Start the project using mvn spring-boot:run and launch the browser with the following URL:

http://localhost:8080/kafka/publish?message=test2&paymentStatus=FAIL

http://localhost:8080/kafka/publish?message=reza&paymentStatus=SUCCESS

So the result is:

Shell
 
2021-08-19 19:03:41.563  INFO 16104 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1
2021-08-19 19:03:41.564  INFO 16104 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457
2021-08-19 19:03:41.566  INFO 16104 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1629383621562
2021-08-19 19:03:41.786  INFO 16104 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: 3Ifr9ZA2SXmVVwojL8kIHg
sent Message to kafka topic : Payment{id=1, Name='test2', status=FAIL}
sent Message to kafka topic : Payment{id=1, Name='reza', status=SUCCESS}


After all of this, we have to message a topic named 'first_topic' and then time to consume it.

Here is the Consumer project:

The consumer or NotificationService has the same pom.xml and application.properties and also has the classes Order and PaymentStatus, same as OrderService.

Here is the main application class with the @EnableKafka annotation:

Java
 
@SpringBootApplication
@EnableKafka
public class NotificationApplication {

	public static void main(String[] args) {
		SpringApplication.run(NotificationApplication.class, args);



	}

}


The final class is NotificationService with a method that listens 'first' :

Java
 
@KafkaListener(topics = "first", groupId = "default" ,containerFactory = "kafkaListenerContainerFactory" )
    public void listenGroupFoo( Order payment) {
        if(payment.getStatus()== PaymentStatus.SUCCESS)
            System.out.println("we have successful ordered and going to send success message: " + payment);
        else
            System.out.println("we have unsuccessful ordered and going to send failed message: " + payment);

    }


This code snippet is a part of NotificationServce, you can see the whole class code below:

Java
 
@EnableKafka
@Configuration
public class NotificationService {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;


    @KafkaListener(topics = "first", groupId = "default" ,containerFactory = "kafkaListenerContainerFactory" )
    public void listenGroupFoo( Order payment) {
        if(payment.getStatus()== PaymentStatus.SUCCESS)
            System.out.println("we have successful ordered and going to send success message: " + payment);
        else
            System.out.println("we have unsuccessful ordered and going to send failed message: " + payment);

    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        JsonDeserializer<HeaderEnricher.Container> deserializer = new JsonDeserializer<>(HeaderEnricher.Container.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);


        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        props.put(JsonDeserializer.TRUSTED_PACKAGES, Order.class.getPackage().getName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Order.class);
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");





        return props;
    }

    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Order>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}


I am sure you can write this code cleaner than me!

After executing the code, the result is:

Shell
 
2021-08-19 19:03:41.563  INFO 16104 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1
2021-08-19 19:03:41.564  INFO 16104 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457
2021-08-19 19:03:41.566  INFO 16104 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1629383621562
2021-08-19 19:03:41.786  INFO 16104 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: 3Ifr9ZA2SXmVVwojL8kIHg
sent Message to kafka topic : Payment{id=1, Name='test2', status=FAIL}
sent Message to kafka topic : Payment{id=1, Name='reza', status=SUCCESS}


As you can see everything that OrderService had previously put in the 'first' consumed by NotificationService. But don't forget that, a data structure that's put in the topic should have exactly the same structure as the data structure that has consumed.

Let's Do It With The Spring Cloud Streaming Approach

Spring Cloud Stream is a framework for building highly scalable, event-driven microservices connected with shared messaging systems. Spring Cloud Stream provides components that abstract the communication with many message brokers away from the code.

The core building blocks of Spring Cloud Stream are:

Destination Binders: Components responsible to provide integration with the external messaging systems.

Destination Bindings: Bridge between the external messaging systems and application code (producer/consumer) provided by the end-user.

Message: The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).
The Spring Cloud Streaming Approach

In addition to this in this part we should know some functional concepts which had exposed in Java 8:

Consumer: A Consumer is a functional interface that accepts a single input and returns no output. In layman’s language, as the name suggests the implementation of this interface consumes the input supplied to it. Consumer interface has two methods:

Java
 
void accept(T t);

default Consumer<T> andThen(Consumer<? super T> after);


Supplier: A Supplier is a simple interface that indicates that this implementation is a supplier of results. This interface, however, does not enforce any restrictions that supplier implementation needs to return a different result on each invocation. The supplier has only one method, get(), and does not have any other default and static methods.

Predicate: A Predicate interface represents a boolean-valued function of an argument. This is mainly used to filter data from a Java Stream. The filter method of a stream accepts a predicate to filter the data and return a new stream satisfying the predicate. A predicate has a test() method which accepts an argument and returns a boolean value.

Function: A Function interface is more of a generic one that takes one argument and produces a result. This has a Single Abstract Method (SAM) apply which accepts an argument of a type T and produces a result of type R.

Java
 
Function<T,R>


This is the most useful functional interface used in stream processing.

The final topic that we should learn about is before starting our stream processing project is Kstream.

KStream is an abstraction of a record stream of KeyValue pairs, i.e., each record is an independent entity/event. In the real world, Kafka Streams greatly simplifies the stream processing from topics. Built on top of Kafka client libraries, it provides data parallelism, distributed coordination, fault tolerance, and scalability. It deals with messages as an unbounded, continuous, and real-time flow of records.

Kafka Cluster Flow

Remember the previous project: we just have one producer project that had put the messages into the topic with the name 'first' and one consumer project, which consumes the messages.

Now we want to use the Spring Cloud Streams capabilities, in addition to that project's, so now we have a project named Stream Processor, which consumes the messages in topic 'first' then processes it and puts just success message into the already created topic with name 'second.'

Flow Through Stream Processor

OK, let's do it: we need a Stream Processor. Before everything we should add pom.xml and it's dependencies:

XML
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.5.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.tosan</groupId>
	<artifactId>stream-processor</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>stream-processor</name>
	<properties>
		<java.version>8</java.version>
		<spring-cloud.version>2020.0.3</spring-cloud.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
			<scope>test</scope>
			<classifier>test-binder</classifier>
			<type>test-jar</type>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>


The next step is configuration. Stream processor needs to now target and destination or 'IN' and 'OUT' topics, which is easily done in application.properties files:

Shell
 
spring.kafka.bootstrap-servers= localhost:9092

spring.cloud.stream.bindings.process-in-0.destination=first
spring.cloud.stream.bindings.process-out-0.destination=second


The final step is writing our simple processor which just puts success messages in topic 'second'.

Java
 
@Configuration
public class Streamprocesor {


    @Bean
    public Function<KStream<String, Payment>,KStream<String, Payment>> process() {
        return input ->
                input.filter((key,payment) -> 
                             payment.getStatus()==PaymentStatus.SUCCESS);
    }

 

Don't forget that all three projects should have 2 Order and PaymentStatus classes with the same structure.

In addition to all this, the NotificationService classes should change a little, so we should change the listener topic to topic with the name 'second':

Java
 
@EnableKafka
@Configuration
public class NotificationService {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;


    @KafkaListener(topics = "second", groupId = "stream" ,containerFactory = "kafkaListenerContainerFactory" )
    public void listenGroupFoo( Order payment) {
        if(payment.getStatus()== PaymentStatus.SUCCESS)
            System.out.println("we have successful ordered and going to send success message: " + payment);
        else
            System.out.println("we have unsuccessful ordered and going to send failed message: " + payment);

    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        JsonDeserializer<HeaderEnricher.Container> deserializer = new JsonDeserializer<>(HeaderEnricher.Container.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);


        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        props.put(JsonDeserializer.TRUSTED_PACKAGES, Order.class.getPackage().getName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Order.class);
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");





        return props;
    }

    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Order>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}



If you run all three projects and execute the following URLs in the browser:

http://localhost:8080/kafka/publish?message=reza&paymentStatus=SUCCESS

http://localhost:8080/kafka/publish?message=test&paymentStatus=SUCCESS

http://localhost:8080/kafka/publish?message=test1&paymentStatus=FAIL

http://localhost:8080/kafka/publish?message=test2&paymentStatus=FAIL

The NotificationService should display (producer just sent the success messages):

Shell
 
ccessfully synced group in generation Generation{generationId=11, memberId='consumer-stream-1-417cf00b-ec05-4a9d-ada3-48b927354b00', protocol='range'}
2021-08-20 12:49:15.129  INFO 11624 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-stream-1, groupId=stream] Notifying assignor about the new Assignment(partitions=[second-0])
2021-08-20 12:49:15.132  INFO 11624 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-stream-1, groupId=stream] Adding newly assigned partitions: second-0
2021-08-20 12:49:15.143  INFO 11624 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-stream-1, groupId=stream] Setting offset for partition second-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.7.61:9092 (id: 1 rack: null)], epoch=0}}
2021-08-20 12:49:15.143  INFO 11624 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : stream: partitions assigned: [second-0]
we have successful ordered and going to send success message: Payment{id=1, Name='reza', status=SUCCESS}
we have successful ordered and going to send success message: Payment{id=1, Name='test', status=SUCCESS}


What Is Next?

In the next article will try to write something about transaction management with Kafka, with separate databases.

Conclusion

As you can see we developed not blocking microservices with the use of Kafka, but don't forget Maslow's Hammer: DO NOT USE EVERY SOLUTION EVERYWHERE.

kafka microservice Database Spring Cloud Spring Framework Stream processing Java (programming language) Docker (software) Data structure

Opinions expressed by DZone contributors are their own.

Related

  • Spring Cloud Stream: A Brief Guide
  • How To Get Closer to Consistency in Microservice Architecture
  • 7 Microservices Best Practices for Developers
  • Spring Microservice Tip: Abstracting the Database Hostname With Environment Variable

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!