
Spring Cloud Stream + Apache Kafka


Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.


The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

Binder Implementations

Spring Cloud Stream supports a variety of binder implementations; the reference documentation links to the GitHub project for each of them.

The core building blocks of Spring Cloud Stream are:

  • Destination Binders: Components responsible for providing integration with the external messaging systems.

  • Destination Bindings: Bridge between the external messaging systems and the application-provided producers and consumers of messages (created by the Destination Binders).

  • Message: The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).
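
To make these roles concrete, here is a deliberately simplified, framework-free Java sketch of the flow a binder mediates. All names here are illustrative stand-ins, not Spring Cloud Stream API:

```java
import java.util.*;
import java.util.function.Consumer;

// Toy stand-in for the canonical Message: a payload plus headers.
record Message<T>(T payload, Map<String, String> headers) { }

// Toy "destination binder": routes messages published to a named
// destination to every subscriber bound to that destination.
class InMemoryBinder {
    private final Map<String, List<Consumer<Message<String>>>> subs = new HashMap<>();

    // A "destination binding": connects a consumer to a destination.
    void subscribe(String destination, Consumer<Message<String>> handler) {
        subs.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }

    // Producer side: publish a message through the binder.
    void send(String destination, Message<String> msg) {
        subs.getOrDefault(destination, List.of()).forEach(h -> h.accept(msg));
    }
}

public class BinderDemo {
    public static void main(String[] args) {
        InMemoryBinder binder = new InMemoryBinder();
        List<String> received = new ArrayList<>();

        binder.subscribe("javatechie-stream", m -> received.add(m.payload()));
        binder.send("javatechie-stream", new Message<>("hello", Map.of()));

        System.out.println(received); // prints [hello]
    }
}
```

In the real framework the binder additionally handles connection management, serialization, partitioning, and delivery guarantees; this sketch only shows the routing role the three building blocks play.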

# Spring-cloud-stream-example

Apache Kafka binary distribution — http://apachemirror.wuchna.com/kafka/2.3.1/kafka_2.11-2.3.1.tgz

This video may help you set up Apache Zookeeper and Apache Kafka — https://www.youtube.com/watch?v=TTsOoQ6_QB0

# Apache Zookeeper

Create a `zookeeper_data` folder (you can choose any name) inside `C:\kafka_2.11-2.3.1`, where the Kafka distribution is extracted.

Then go to `C:\kafka_2.11-2.3.1\config`, edit the `zookeeper.properties` file, and set:

```properties
dataDir=C:\kafka_2.11-2.3.1\zookeeper_data
```

# Step to Start Zookeeper

```
zookeeper-server-start.bat C:\kafka_2.11-2.3.1\config\zookeeper.properties
```

# Apache Kafka

Create a `kafka-logs` folder under `C:\kafka_2.11-2.3.1`, then go to `C:\kafka_2.11-2.3.1\config`, edit the `server.properties` file, and use the settings below.

```properties
log.dirs=C:\kafka_2.11-2.3.1\kafka-logs
offsets.topic.num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
min.insync.replicas=1
default.replication.factor=1
```



# Step to Start Apache Kafka

```
kafka-server-start.bat C:\kafka_2.11-2.3.1\config\server.properties
```

Publisher Code

Book.java

```java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Book {
    private int id;
    private String name;
}
```



SpringCloudStreamPublisherApplication.java

```java
@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class SpringCloudStreamPublisherApplication {

    @Autowired
    private MessageChannel output;

    @PostMapping("/publish")
    public Book publishEvent(@RequestBody Book book) {
        output.send(MessageBuilder.withPayload(book).build());
        return book;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamPublisherApplication.class, args);
    }
}
```



The annotation @EnableBinding configures the application to bind the channels defined within the given interface; the Processor interface, for example, defines both an INPUT and an OUTPUT channel. These channels are bindings that can be configured to use a concrete messaging middleware or binder.
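
For reference, the Source, Sink, and Processor binding interfaces used with @EnableBinding look essentially like the sketch below. The @Input/@Output annotations and channel types are declared locally here as simplified stand-ins so the snippet compiles without Spring on the classpath; the real definitions live in spring-cloud-stream and spring-messaging:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Simplified stand-ins for Spring's annotations and channel types
// (assumption: the real ones are in org.springframework.cloud.stream.annotation
// and org.springframework.messaging).
@Retention(RetentionPolicy.RUNTIME) @interface Input { String value(); }
@Retention(RetentionPolicy.RUNTIME) @interface Output { String value(); }
interface MessageChannel { }
interface SubscribableChannel extends MessageChannel { }

// The binding interfaces, mirroring Spring Cloud Stream's own definitions.
interface Source {
    String OUTPUT = "output";
    @Output(Source.OUTPUT) MessageChannel output();
}

interface Sink {
    String INPUT = "input";
    @Input(Sink.INPUT) SubscribableChannel input();
}

interface Processor extends Source, Sink { }

public class BindingSketch {
    public static void main(String[] args) throws Exception {
        // The framework discovers channel names by reading these annotations.
        Output out = Processor.class.getMethod("output").getAnnotation(Output.class);
        System.out.println(out.value()); // prints "output"
    }
}
```

This is why the publisher's `@Autowired MessageChannel output` field resolves: with @EnableBinding(Source.class), Spring registers a MessageChannel bean named "output" for that binding.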

Let's take a look at the definition of all these concepts:

  • Bindings — a collection of interfaces that identify the input and output channels declaratively
  • Binder — messaging-middleware implementation such as Kafka or RabbitMQ
  • Channel — represents the communication pipe between messaging-middleware and the application
  • StreamListeners — message-handling methods in beans that will be automatically invoked on a message from the channel after the MessageConverter does the serialization/deserialization between middleware-specific events and domain object types / POJOs
  • Message Schemas — used for serialization and deserialization of messages, these schemas can be statically read from a location or loaded dynamically, supporting the evolution of domain object types

Messages sent to destinations are delivered through the publish-subscribe messaging pattern. Publishers categorize messages into topics, each identified by a name. Subscribers express interest in one or more topics, and the middleware filters the messages, delivering those of the subscribed topics to the subscribers.

Subscribers can also be grouped. A consumer group is a set of subscribers or consumers, identified by a group ID, within which messages from a topic or a topic's partition are delivered in a load-balanced manner.
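
In Spring Cloud Stream you opt into this behavior by adding a consumer group to a binding, e.g. `spring.cloud.stream.bindings.input.group: javatechie-group` (the group name here is made up). The delivery rules themselves can be sketched in plain Java; this is a simulation of the semantics only, not of Kafka's partition-based implementation:

```java
import java.util.*;

// Simulates topic delivery across consumer groups: every group receives
// every message; within a group, members share the load (round-robin here).
class TopicSimulation {
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    private final Map<String, Integer> next = new HashMap<>();

    // Register a member of the given group; returns its inbox.
    List<String> join(String groupId) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(groupId, g -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    void publish(String message) {
        for (var e : groups.entrySet()) {
            List<List<String>> members = e.getValue();
            int i = next.merge(e.getKey(), 1, Integer::sum) % members.size();
            members.get(i).add(message); // exactly one member per group gets it
        }
    }
}

public class GroupDemo {
    public static void main(String[] args) {
        TopicSimulation topic = new TopicSimulation();
        List<String> a1 = topic.join("group-a");
        List<String> a2 = topic.join("group-a");
        List<String> b1 = topic.join("group-b");

        for (int n = 1; n <= 4; n++) topic.publish("msg-" + n);

        // group-a load-balances the 4 messages across its two members...
        System.out.println(a1.size() + a2.size()); // prints 4
        // ...while group-b, with a single member, still sees all 4.
        System.out.println(b1.size()); // prints 4
    }
}
```

Without a group, every instance of the consumer application gets its own anonymous group and therefore receives every message; with a shared group, scaling out the consumer spreads the messages across instances instead of duplicating them.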

application.yml

```yaml
server:
  port: 9192

# Create the topic using the command from the README
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: javatechie-stream
```



Consumer Code

Book.java

```java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Book {
    private int id;
    private String name;
}
```



SpringCloudStreamConsumerApplication.java

```java
@SpringBootApplication
@EnableBinding(Sink.class)
public class SpringCloudStreamConsumerApplication {

    private Logger logger = LoggerFactory.getLogger(SpringCloudStreamConsumerApplication.class);

    @StreamListener("input")
    public void consumeMessage(Book book) {
        logger.info("Consume payload : " + book);
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamConsumerApplication.class, args);
    }
}
```


application.yml

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: javatechie-stream
```



Now create the topic (its name must match the `destination` configured in the bindings) using the commands below.

Create topic:

```
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javatechie-stream
```

List all available topics:

```
kafka-topics.bat --list --zookeeper localhost:2181
```

Now start the publisher application and the consumer application, and make the POST request:

```shell
curl -X POST http://localhost:9192/publish -H 'content-type: application/json' -d '{"id" : 1, "name" : "Java 8 In Action" }'
```



Execute the request a few times and the consumer logs the payload:

```
2020-04-16 20:56:27.160  INFO 2940 --- [container-0-C-1] s.a.SpringCloudStreamConsumerApplication : Consume payload : Book(id=1, name=Java 8 In Action)
2020-04-16 21:13:11.890  INFO 2940 --- [container-0-C-1] s.a.SpringCloudStreamConsumerApplication : Consume payload : Book(id=1, name=Java 8 In Action)
2020-04-16 21:13:12.200  INFO 2940 --- [container-0-C-1] s.a.SpringCloudStreamConsumerApplication : Consume payload : Book(id=1, name=Java 8 In Action)
2020-04-16 21:13:12.381  INFO 2940 --- [container-0-C-1] s.a.SpringCloudStreamConsumerApplication : Consume payload : Book(id=1, name=Java 8 In Action)
2020-04-16 21:13:12.573  INFO 2940 --- [container-0-C-1] s.a.SpringCloudStreamConsumerApplication : Consume payload : Book(id=1, name=Java 8 In Action)
```




Opinions expressed by DZone contributors are their own.
