
Building Microservices With Netflix OSS, Apache Kafka, and Spring Boot – Part 2: Message Broker and User Service

Learn how to set up Zookeeper and the user service in Part 2 of this tutorial on building microservices with Kafka and Spring Boot.


After Part 1 of this series, here is what comes next:

Although we are not going to use the distributed features of Kafka for the test, it is still a distributed system and is built to use Zookeeper to track the status of its cluster nodes, topics, partitions, etc. So before using Kafka, it is necessary to have Zookeeper installed. The following commands are for installing Zookeeper and Kafka on Ubuntu 16.04.

Install Zookeeper:
$ sudo apt-get install zookeeperd

When installed, Zookeeper will be automatically started as a daemon, and by default will be listening on port 2181.

Ask Zookeeper if it is OK:
$ telnet localhost 2181

Type ruok and press Enter. Zookeeper should answer imok, after which telnet reports: Connection closed by foreign host.
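The same check can be scripted instead of typed into telnet. The following is a minimal sketch, assuming Zookeeper listens on localhost:2181 as described above; the class name ZkHealthCheck and its methods are illustrative and not part of the tutorial's code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Illustrative helper: sends a Zookeeper four-letter command over a plain
// TCP socket and returns whatever the server replies.
class ZkHealthCheck {

    // Sends the command, then reads until the server closes the connection.
    static String ask(String host, int port, String command) {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write(command.getBytes(StandardCharsets.US_ASCII));
            out.flush();
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] buffer = new byte[256];
            int read;
            while ((read = in.read(buffer)) != -1) {
                reply.write(buffer, 0, read);
            }
            return new String(reply.toByteArray(), StandardCharsets.US_ASCII);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Zookeeper answers "imok" to "ruok" when it is running without errors.
    static boolean isHealthy(String reply) {
        return reply != null && reply.trim().equals("imok");
    }

    public static void main(String[] args) {
        try {
            String reply = ask("localhost", 2181, "ruok");
            System.out.println(isHealthy(reply) ? "Zookeeper is OK" : "Unexpected reply: " + reply);
        } catch (UncheckedIOException e) {
            System.out.println("Could not reach Zookeeper: " + e.getMessage());
        }
    }
}
```

Keeping the reply check separate from the socket code makes it possible to test the check without a running server.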

Download the latest Kafka:

Go to https://kafka.apache.org/downloads and look for the link to the latest binary release (at the time of this writing, it is kafka_2.12-0.11.0.1.tgz). Following the link takes you to a page that suggests a mirror site for your download.

$ mkdir ~/kafka
$ wget http://apache.cbox.biz/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
$ tar -xvzf kafka_2.11-0.11.0.1.tgz -C ~/kafka --strip-components=1

Configure the Kafka server:

You need to update the server.properties file. Deleting topics is disabled by default, so it is a good idea to enable it by adding delete.topic.enable at the end of the file.

$ nano ~/kafka/config/server.properties
delete.topic.enable = true

Run the Kafka Server as a background process:

$ nohup ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties > ~/kafka/kafka.log 2>&1 &

Verify Kafka is running:

$ echo dump | nc localhost 2181 | grep brokers
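If the broker has registered itself, the filtered output should contain a path under /brokers/ids/ (with the default broker.id of 0 in server.properties, that would be /brokers/ids/0). As a rough sketch, the same filtering could be done in Java on the text returned by the dump command. The class name BrokerDumpParser is hypothetical, and the sample text below only approximates the shape of a real reply.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative parser: extracts broker ids from the text returned by
// Zookeeper's "dump" command by looking for /brokers/ids/<id> paths.
class BrokerDumpParser {

    private static final String PREFIX = "/brokers/ids/";

    static List<String> brokerIds(String dump) {
        List<String> ids = new ArrayList<>();
        for (String line : dump.split("\\R")) {
            String path = line.trim();
            if (path.startsWith(PREFIX) && path.length() > PREFIX.length()) {
                ids.add(path.substring(PREFIX.length()));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Sample text shaped like a dump reply; real output differs in detail.
        String sample = "Sessions with Ephemerals (1):\n0x15f0:\n\t/controller\n\t/brokers/ids/0\n";
        System.out.println(brokerIds(sample)); // prints [0]
    }
}
```

An empty list would suggest that no broker has registered with Zookeeper yet.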


User Service

Now that we have Kafka running, we can continue with building the user microservice. As mentioned in Part 1, it will:

1. Register itself to the Service registry (Eureka).

2. Take its configuration from the Config server (Spring Cloud Config).

3. Have two endpoints:
  • POST /member - registers a new user.
  • GET /member - returns all registered users.

4. Send a "USER_REGISTERED" message to the message broker (Kafka) on every new registration.

5. Store the registered users in an in-memory H2 database for later reference.

Let's first create a new Spring Boot project (ms-user) with Spring Initializr.

The following dependencies will be needed: Eureka Discovery, JPA, H2, Kafka, and Config Client.

/pom.xml

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-config</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>

As with the config server, enable the discovery client by adding the @EnableEurekaClient annotation to the main Application class, and set the name and the port of the microservice in the application configuration file. What is new here is cloud config discovery: the microservice looks up the config server through the Service registry, knowing only the Config server's service id, so no hardcoded URLs or ports are needed.

/bootstrap.yml

server:
  port: 8081
spring:
  application:
    name: ms-user
  cloud:
    config:
      discovery:
        enabled: true
        service-id: ms-config-server

The configuration for H2, the datasource, and Kafka is read from the config server, so it goes in the ms-config-properties folder under ms-user.

/ms-user.yml

spring:
  h2:
    console:
      enabled: true
      path: /h2-console
  datasource:
    url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    username: sa
    password:
  kafka:
    bootstrap-servers: localhost:9092
    topic:
      userCreated: USER_CREATED_TOPIC
security:
  basic:
    enabled: false
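Nested YAML keys like these are addressed in code by dotted property names, which is how a placeholder such as ${spring.kafka.topic.userCreated} finds its value. The sketch below illustrates that flattening in plain Java; it is not Spring's implementation, and the class name PropertyFlattener is made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: flattens a nested map, shaped like the YAML above,
// into dotted property names.
class PropertyFlattener {

    static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        collect("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void collect(String prefix, Map<String, Object> node, Map<String, Object> flat) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                // Descend into nested sections, extending the dotted prefix.
                collect(key, (Map<String, Object>) entry.getValue(), flat);
            } else {
                flat.put(key, entry.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> topic = new LinkedHashMap<>();
        topic.put("userCreated", "USER_CREATED_TOPIC");
        Map<String, Object> kafka = new LinkedHashMap<>();
        kafka.put("bootstrap-servers", "localhost:9092");
        kafka.put("topic", topic);
        Map<String, Object> spring = new LinkedHashMap<>();
        spring.put("kafka", kafka);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("spring", spring);

        // Prints {spring.kafka.bootstrap-servers=localhost:9092, spring.kafka.topic.userCreated=USER_CREATED_TOPIC}
        System.out.println(flatten(root));
    }
}
```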

First, we will create a simple Spring web project structure with a User entity, UserRepository, UserService, and UserController. We will not discuss them in depth, as this structure is common in Spring projects.

The User entity carries the data. It has a simple structure, just a username and a password. The username will hold the email address to which the confirmation message will be sent.

/User.java

@Entity
public class User {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    @NotNull
    private String username;
    @NotNull
    private String password;

    // Getters and setters omitted for brevity.
}

We will use Spring data to handle the CRUD operations on the User entity, so UserRepository will be simple too.

/UserRepository.java

public interface UserRepository extends CrudRepository<User, Long> {
}

In the UserService, we will have the methods for registering a user and getting all users.

/UserService.java

public interface UserService {
    User registerUser(User input);

    Iterable<User> findAll();
}

Finally, in UserController, create the GET /member and POST /member REST endpoints. Each one calls the UserService and returns its result:

@RequestMapping(method = RequestMethod.GET, path = "/member")
public ResponseEntity<Iterable<User>> getAll() {
    Iterable<User> all = userService.findAll();
    return new ResponseEntity<>(all, HttpStatus.OK);
}

@RequestMapping(method = RequestMethod.POST, path = "/member")
public ResponseEntity<User> register(@RequestBody User input) {
    User result = userService.registerUser(input);
    return new ResponseEntity<>(result, HttpStatus.OK);
}
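Once the service is running, the two endpoints can be exercised from any HTTP client. The following sketch uses the JDK's java.net.http client (Java 11 or later) and assumes the service is reachable on localhost:8081, the port set in bootstrap.yml. The class name, the sample email address, and the password are illustrative.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative client for the /member endpoints (requires Java 11+).
class MemberClientSketch {

    static final URI MEMBER_URI = URI.create("http://localhost:8081/member");

    // Builds the POST request that registers a user. The JSON field names
    // follow the User entity. No escaping is done, so this is suitable for
    // simple sample values only.
    static HttpRequest registerRequest(String username, String password) {
        String json = "{\"username\":\"" + username + "\",\"password\":\"" + password + "\"}";
        return HttpRequest.newBuilder(MEMBER_URI)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Builds the GET request that lists all registered users.
    static HttpRequest listRequest() {
        return HttpRequest.newBuilder(MEMBER_URI).GET().build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> created = client.send(
                    registerRequest("user@example.com", "secret"),
                    HttpResponse.BodyHandlers.ofString());
            System.out.println(created.statusCode() + " " + created.body());

            HttpResponse<String> all = client.send(listRequest(), HttpResponse.BodyHandlers.ofString());
            System.out.println(all.statusCode() + " " + all.body());
        } catch (IOException e) {
            System.out.println("Service not reachable: " + e.getMessage());
        }
    }
}
```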

Let's take a closer look at the sender configuration. To produce messages for Kafka topics we need a KafkaTemplate, the class that executes the high-level send operations. The KafkaTemplate needs a ProducerFactory, which sets the strategy for creating Producer instances. The ProducerFactory in turn needs a Map of configuration properties, the most important of which are BOOTSTRAP_SERVERS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG, and VALUE_SERIALIZER_CLASS_CONFIG.

/SenderConfig.java

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

// Producer settings: broker address plus key and value serializers.
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return props;
}

@Bean
public ProducerFactory<String, User> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public Sender sender() {
    return new Sender();
}
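The ProducerConfig constants used above are names for plain Kafka property keys. As a dependency-free illustration, the same three settings can be written with their string keys and fully qualified serializer class names. The class name ProducerPropsSketch is made up for this example, and the map is only printed here, not handed to Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration without a Kafka dependency: the three producer
// settings expressed with their underlying property keys.
class ProducerPropsSketch {

    static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        producerProps("localhost:9092").forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Using the constants, as SenderConfig does, avoids typos in the key names; the string form is mainly useful for recognizing the same settings in properties files and logs.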

SenderConfig sets the producer up to send messages to the Kafka server on localhost:9092 (bootstrapServers, taken from the cloud config server). JsonSerializer converts the User object into the JSON message payload. Finish with the implementation of the Sender bean, which uses the KafkaTemplate configured above.

/Sender.java

public class Sender {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    // Publishes the user as the message payload on the given topic.
    public void send(String topic, User payload) {
        kafkaTemplate.send(topic, payload);
    }
}

The business logic to send a message to Kafka when a new user is saved in the database goes in the UserService implementation.

/UserServiceImpl.java

// Not static: Spring does not inject @Value into static fields.
@Value("${spring.kafka.topic.userCreated}")
private String userCreatedTopic;

@Autowired
private UserRepository userRepository;
@Autowired
private Sender sender;

@Override
public User registerUser(User input) {
    User createdUser = userRepository.save(input);
    sender.send(userCreatedTopic, createdUser);
    return createdUser;
}
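The order of operations in registerUser matters: the user is saved first, so the message sent to Kafka carries the generated id. The sketch below replays that flow with plain-Java stand-ins so it can run without Spring, H2, or Kafka. Every type in it (InMemoryRepository, RecordingSender, the simplified User) is hypothetical and exists only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-ins for the Spring beans, showing the save-then-publish
// order of registerUser.
class RegistrationFlowSketch {

    static class User {
        Long id;
        final String username;

        User(String username) {
            this.username = username;
        }
    }

    // Stand-in for UserRepository: assigns ids the way an IDENTITY column would.
    static class InMemoryRepository {
        private final Map<Long, User> rows = new LinkedHashMap<>();
        private long nextId = 1;

        User save(User user) {
            user.id = nextId++;
            rows.put(user.id, user);
            return user;
        }
    }

    // Stand-in for Sender: records messages instead of calling Kafka.
    static class RecordingSender {
        final List<String> sent = new ArrayList<>();

        void send(String topic, User payload) {
            sent.add(topic + ":" + payload.id + ":" + payload.username);
        }
    }

    static User registerUser(InMemoryRepository repository, RecordingSender sender,
                             String topic, User input) {
        User created = repository.save(input); // 1. persist, which assigns the id
        sender.send(topic, created);           // 2. publish the saved user
        return created;
    }

    public static void main(String[] args) {
        RecordingSender sender = new RecordingSender();
        registerUser(new InMemoryRepository(), sender, "USER_CREATED_TOPIC", new User("user@example.com"));
        System.out.println(sender.sent); // prints [USER_CREATED_TOPIC:1:user@example.com]
    }
}
```

Note that in this arrangement the save and the send are two separate steps: if the send fails after the save succeeds, the user exists without a corresponding message.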

In Part 3 of this blog, we will build the email service and the gateway.

Don't forget to share your opinion in the comments section below.


Published at DZone.