Using Apache Kafka to Communicate Between Microservices

Learn how to decouple microservices and simplify their design by having them communicate through Apache Kafka.

By Jimena Garbarino · May. 22, 20 · Tutorial

Traditionally, microservices communicate through their REST APIs. As systems evolve, however, the number of microservices grows, making communication more complex. Services begin to depend on each other in a tightly coupled manner, which slows down dev teams’ work. Although this model may exhibit low latency, it only works if services are highly available.

To solve this, newer architectures decouple senders and receivers with asynchronous messaging. With a Kafka-centric approach, you preserve low latency while gaining message load balancing and centralized management.

When you have a legacy platform, it’s recommended that you implement asynchronous messaging to decouple it and prepare it for a move to microservices.

This tutorial will teach you:

  • How to create a microservices architecture with JHipster
  • How to enable Kafka integration for communicating microservices
  • How to set up Okta as the authentication provider

What is Kafka?

Apache Kafka is a distributed streaming platform. It was initially conceived as a message queue and open-sourced by LinkedIn in 2011. Its community evolved Kafka to provide key capabilities:

  • Publish and Subscribe to streams of records, like a message queue.
  • Storage system so messages can be consumed asynchronously. Kafka writes data to a scalable disk structure and replicates for fault-tolerance. Producers can wait for write acknowledgments.
  • Stream processing with the Kafka Streams API, which enables complex aggregations or joins of input streams onto an output stream of processed data.
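To make the storage capability more concrete, here is a minimal sketch of an append-only log in plain Java. It is a toy model, not the Kafka API: the class name ToyLog and its methods are invented for illustration, and the returned offset only stands in for a write acknowledgment. The point is that records stay in the log after being read, so a reader that starts late can still catch up.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy append-only log (hypothetical, not Kafka): records are kept, not handed off. */
final class ToyLog {
    private final List<String> records = new ArrayList<>();

    /** Appends a record and returns its offset, standing in for a write acknowledgment. */
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    /** Returns a copy of every record at or after the given offset; nothing is removed. */
    List<String> readFrom(long offset) {
        int start = (int) Math.min(Math.max(offset, 0), records.size());
        return new ArrayList<>(records.subList(start, records.size()));
    }

    public static void main(String[] args) {
        ToyLog log = new ToyLog();
        log.append("store-1 OPEN");
        long ack = log.append("store-1 CLOSED");
        System.out.println("acknowledged offset: " + ack);
        // A reader that starts after both writes still sees them.
        System.out.println("late reader sees: " + log.readFrom(0));
    }
}
```

Each reader tracks its own offset, which is why a slow consumer does not hold up the producer or other consumers in this model.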

Traditional messaging models are queue and publish-subscribe. In a queue, each record goes to one consumer. In publish-subscribe, the record is received by all consumers.

The Consumer Group in Kafka is an abstraction that combines both models. Record processing can be load balanced among the members of a consumer group and Kafka allows you to broadcast messages to multiple consumer groups. It is the same publish-subscribe semantic where the subscriber is a cluster of consumers instead of a single process.
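The combination of the two models can be sketched in a few lines of plain Java. This is a simplified illustration, not the Kafka client: the class ToyConsumerGroups, the group name audit, and the member names are made up, and Kafka actually balances load by assigning partitions to group members rather than dealing out individual records as this sketch does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consumer-group delivery (hypothetical, not the Kafka client API). */
final class ToyConsumerGroups {

    /**
     * Delivers every record once per group (publish-subscribe across groups)
     * and spreads records round-robin over the members of each group
     * (queue semantics inside a group). Result keys are "group/member".
     */
    static Map<String, List<String>> deliver(List<String> records, Map<String, List<String>> groups) {
        Map<String, List<String>> inbox = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            List<String> members = group.getValue();
            if (members.isEmpty()) {
                continue; // a group without members receives nothing
            }
            for (String member : members) {
                inbox.put(group.getKey() + "/" + member, new ArrayList<>());
            }
            for (int i = 0; i < records.size(); i++) {
                String member = members.get(i % members.size());
                inbox.get(group.getKey() + "/" + member).add(records.get(i));
            }
        }
        return inbox;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        groups.put("alert", Arrays.asList("alert-1", "alert-2")); // two instances share the load
        groups.put("audit", Arrays.asList("audit-1"));            // a second group gets its own copy
        System.out.println(deliver(Arrays.asList("r0", "r1", "r2"), groups));
    }
}
```

In the sketch, the two alert members split the three records between them, while the single audit member receives all three.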

Popular use cases of Kafka include:

  • Traditional messaging, to decouple data producers from processors with better latency and scalability.
  • Site activity tracking with real-time publish-subscribe feeds
  • As a replacement for file-based log aggregation, where event data becomes a stream of messages
  • Data Pipelines where data consumed from topics is transformed and fed to new topics
  • As an external commit log for a distributed system
  • As a backend log storage for event sourcing applications, where each state change is logged in time order.

Microservices Communication With Kafka

Let’s build a microservices architecture with JHipster and Kafka support. In this tutorial, you’ll create a store microservice and an alert microservice. The store microservice will create and update store records. The alert microservice will receive update events from store and send an email alert.

Prerequisites:

  • Java 8+
  • Docker
  • Docker Compose
  • Node.js

Install JHipster.

Shell

npm install -g generator-jhipster@6.6.0

Running jhipster --version should output something like this:

Shell

$ jhipster --version
INFO! Using JHipster version installed globally
6.6.0

Create a directory for the project.

Shell

mkdir jhipster-kafka
cd jhipster-kafka

Create an apps.jh file that defines the store, alert, and gateway applications in JHipster Domain Language (JDL). Kafka integration is enabled by adding messageBroker kafka to the store and alert app definitions.

JDL

application {
  config {
    baseName gateway,
    packageName com.okta.developer.gateway,
    applicationType gateway,
    authenticationType oauth2,
    prodDatabaseType postgresql,
    serviceDiscoveryType eureka,
    testFrameworks [protractor]
  }
  entities Store, StoreAlert
}

application {
  config {
    baseName store,
    packageName com.okta.developer.store,
    applicationType microservice,
    authenticationType oauth2,
    databaseType mongodb,
    devDatabaseType mongodb,
    prodDatabaseType mongodb,
    enableHibernateCache false,
    serverPort 8082,
    serviceDiscoveryType eureka
    messageBroker kafka
  }
  entities Store
}

application {
  config {
    baseName alert,
    packageName com.okta.developer.alert,
    applicationType microservice,
    authenticationType oauth2,
    serverPort 8082,
    serviceDiscoveryType eureka
    messageBroker kafka
  }
  entities StoreAlert
}

enum StoreStatus {
  OPEN,
  CLOSED
}

entity Store {
  name String required,
  address String required,
  status StoreStatus,
  createTimestamp Instant required,
  updateTimestamp Instant
}

entity StoreAlert {
  storeName String required,
  storeStatus String required,
  timestamp Instant required
}

microservice Store with store
microservice StoreAlert with alert

Now, in your jhipster-kafka folder, import this file using import-jdl.

Shell

jhipster import-jdl apps.jh

Configure Microservices Deployment with Docker Compose

In the project folder, create a sub-folder for Docker Compose and run JHipster’s docker-compose sub-generator.

Shell

mkdir docker-compose
cd docker-compose
jhipster docker-compose

The generator will ask you to define the following things:

  1. Type of application: Microservice application
  2. Type of gateway: JHipster gateway based on Netflix Zuul
  3. Leave the root directory for services as default: ../
  4. Which applications to include: gateway, store, alert
  5. If the database is clustered: No
  6. If monitoring should be enabled: No
  7. Password for JHipster Registry: <default>

When the generator is almost finished, a warning appears in the output:

Plain Text

WARNING! Docker Compose configuration generated, but no Jib cache found
If you forgot to generate the Docker image for this application, please run:
To generate the missing Docker image(s), please run:
 ./mvnw -ntp -Pprod verify jib:dockerBuild in /home/indiepopart/jhipster-kafka/alert
 ./mvnw -ntp -Pprod verify jib:dockerBuild in /home/indiepopart/jhipster-kafka/gateway
 ./mvnw -ntp -Pprod verify jib:dockerBuild in /home/indiepopart/jhipster-kafka/store

You will generate the images later, but first, let’s add some security and Kafka integration to your microservices.

Add OpenID Connect (OIDC) Authentication

This microservices architecture is set up to authenticate against Keycloak. Let’s update the settings to use Okta as the authentication provider.

First of all, go to Okta and get a free developer account.

After you’ve activated your account, log in and go to Applications > New Application. Click Web and Next. Set the following application settings:

  • Name: JHipster Kafka
  • Login redirect URIs:
    • http://localhost:8080/login/oauth2/code/oidc
    • http://localhost:8761/login/oauth2/code/oidc
  • Logout redirect URIs:
    • http://localhost:8080
    • http://localhost:8761
  • Grant Type Allowed: Authorization Code and Refresh Token

Click Done to continue. Copy the Client ID and Client secret, as you will need them to configure your JHipster application. You can find the Org URL at the top right corner of your Okta Dashboard.

JHipster applications require the specific user roles ROLE_USER and ROLE_ADMIN to be included as claims in the ID Token. In the Okta Developer Console, go to Users > Groups and create a group for each JHipster role, then add users to each group.

Now go to API > Authorization Servers, select the default server, and Add Claim with the following settings:

  1. Name: groups
  2. Include in token type: ID Token, Always
  3. Value type: Groups
  4. Filter: Matches regex, set the Regex to be .*

In the project, create a docker-compose/.env file and add the following variables. For the values, use the settings from the Okta web application you created:

Properties

OIDC_CLIENT_ID={yourClientId}
OIDC_CLIENT_SECRET={yourClientSecret}
OIDC_ISSUER_URI={yourOrgUrl}/oauth2/default

Edit docker-compose/docker-compose.yml and update the SPRING_SECURITY_* settings for the services store-app, alert-app and gateway-app:

YAML

- SPRING_SECURITY_OAUTH2_CLIENT_REGISTRATION_OIDC_CLIENT_ID=${OIDC_CLIENT_ID}
- SPRING_SECURITY_OAUTH2_CLIENT_REGISTRATION_OIDC_CLIENT_SECRET=${OIDC_CLIENT_SECRET}
- SPRING_SECURITY_OAUTH2_CLIENT_PROVIDER_OIDC_ISSUER_URI=${OIDC_ISSUER_URI}

The same authentication must be set up for the JHipster Registry. Edit docker-compose/jhipster-registry.yml and set the same values.
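The ${OIDC_CLIENT_ID}-style placeholders in the compose files are filled in from docker-compose/.env when Docker Compose reads the configuration. The plain-Java sketch below mimics that substitution so you can see what each service ends up with. It is a simplified model under stated assumptions, not Docker Compose's parser: the class EnvSubstitution is invented, the value abc123 is a placeholder, and treating an unset variable as an empty string is an assumption about Compose's default behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy model of .env loading and ${NAME} substitution (hypothetical, not Docker Compose). */
final class EnvSubstitution {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

    /** Parses KEY=VALUE lines, skipping blank lines and lines starting with '#'. */
    static Map<String, String> parseEnv(String content) {
        Map<String, String> env = new LinkedHashMap<>();
        for (String line : content.split("\\R")) {
            String trimmed = line.trim();
            int eq = trimmed.indexOf('=');
            if (trimmed.isEmpty() || trimmed.startsWith("#") || eq <= 0) {
                continue;
            }
            env.put(trimmed.substring(0, eq), trimmed.substring(eq + 1));
        }
        return env;
    }

    /** Replaces each ${NAME} with its value; unknown names become empty strings (assumption). */
    static String substitute(String line, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(line);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = env.getOrDefault(matcher.group(1), "");
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> env = parseEnv("OIDC_CLIENT_ID=abc123\n");
        System.out.println(substitute(
            "- SPRING_SECURITY_OAUTH2_CLIENT_REGISTRATION_OIDC_CLIENT_ID=${OIDC_CLIENT_ID}", env));
    }
}
```

Keeping the secrets in .env means the compose files can be committed without credentials, provided .env itself stays out of version control.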

Use Spring Cloud Config to Override OpenID Connect Settings

An alternative to setting environment variables for each application in docker-compose.yml is to use Spring Cloud Config. JHipster Registry includes Spring Cloud Config, so it’s pretty easy to do.

Open docker-compose/central-server-config/application.yml and add your Okta settings there.

YAML

spring:
  security:
    oauth2:
      client:
        provider:
          oidc:
            issuer-uri: https://{yourOktaDomain}/oauth2/default
        registration:
          oidc:
            client-id: {yourClientId}
            client-secret: {yourClientSecret}

The registry, gateway, store, and alert applications are all configured to read this configuration on startup.

Communicate Between Store and Alert Microservices

The JHipster generator adds a kafka-clients dependency to applications that declare messageBroker kafka (in JDL), enabling the Kafka Consumer and Producer Core APIs.

For the sake of this example, update the store microservice to send a message to the alert microservice through Kafka whenever a store entity is updated.

In the store project, create an AlertService for sending the event details. This service will build the payload and serialize it into a JSON String, and use the default Kafka StringSerializer and StringDeserializer already defined in application.yml.


Java

package com.okta.developer.store.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.okta.developer.store.config.KafkaProperties;
import com.okta.developer.store.domain.Store;
import com.okta.developer.store.service.dto.StoreAlertDTO;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Service
public class AlertService {

    private final Logger log = LoggerFactory.getLogger(AlertService.class);

    private static final String TOPIC = "topic_alert";

    private final KafkaProperties kafkaProperties;

    private KafkaProducer<String, String> producer;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public AlertService(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    // Create the producer once the bean has been constructed.
    @PostConstruct
    public void initialize(){
        log.info("Kafka producer initializing...");
        this.producer = new KafkaProducer<>(kafkaProperties.getProducerProps());
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
        log.info("Kafka producer initialized");
    }

    // Serialize the store status as JSON and publish it to the alert topic.
    public void alertStoreStatus(Store store) {
        try {
            StoreAlertDTO storeAlertDTO = new StoreAlertDTO(store);
            String message = objectMapper.writeValueAsString(storeAlertDTO);
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
            producer.send(record);
        } catch (JsonProcessingException e) {
            log.error("Could not send store alert", e);
            throw new AlertServiceException(e);
        }
    }

    @PreDestroy
    public void shutdown() {
        log.info("Shutdown Kafka producer");
        producer.close();
    }
}

Create the referenced AlertServiceException class.

Java

package com.okta.developer.store.service;

public class AlertServiceException extends RuntimeException {

    public AlertServiceException(Throwable e) {
        super(e);
    }
}

And add a StoreAlertDTO class in the ...service.dto package.

Java

package com.okta.developer.store.service.dto;

import com.okta.developer.store.domain.Store;

public class StoreAlertDTO {

    private String storeName;
    private String storeStatus;

    public StoreAlertDTO(Store store){
        this.storeName = store.getName();
        this.storeStatus = store.getStatus().name();
    }

    public String getStoreName() {
        return storeName;
    }

    public void setStoreName(String storeName) {
        this.storeName = storeName;
    }

    public String getStoreStatus() {
        return storeStatus;
    }

    public void setStoreStatus(String storeStatus) {
        this.storeStatus = storeStatus;
    }

}

Inject the AlertService into the StoreResource API implementation, modifying its constructor. Also modify the updateStore call to publish a StoreAlertDTO for the alert service:

Java

@RestController
@RequestMapping("/api")
public class StoreResource {

    ...
    private final StoreRepository storeRepository;
    private final AlertService alertService;

    public StoreResource(StoreRepository storeRepository, AlertService alertService) {
        this.storeRepository = storeRepository;
        this.alertService = alertService;
    }

    ...

    @PutMapping("/stores")
    public ResponseEntity<Store> updateStore(@Valid @RequestBody Store store) throws URISyntaxException {
        log.debug("REST request to update Store : {}", store);
        if (store.getId() == null) {
            throw new BadRequestAlertException("Invalid id", ENTITY_NAME, "idnull");
        }
        Store result = storeRepository.save(store);

        log.debug("SEND store alert for Store: {}", store);
        alertService.alertStoreStatus(result);

        return ResponseEntity.ok()
            .headers(HeaderUtil.createEntityUpdateAlert(applicationName, true, ENTITY_NAME, store.getId().toString()))
            .body(result);
    }

    ...
}

Fix Integration Tests

Update the StoreResourceIT integration test to initialize the StoreResource correctly:

Java

@SpringBootTest(classes = {StoreApp.class, TestSecurityConfiguration.class})
public class StoreResourceIT {

    ...
    @Autowired
    private StoreRepository storeRepository;

    @Autowired
    private AlertService alertService;

    ...

    @BeforeEach
    public void setup() {
        MockitoAnnotations.initMocks(this);
        final StoreResource storeResource = new StoreResource(storeRepository, alertService);
        ...
    }
    ...
}

Enable Debug Logging in Production

Since you are going to deploy the prod profile, let’s enable logging in production. Modify the store/src/main/java/com/okta/.../config/LoggingAspectConfiguration class:

Java

@Configuration
@EnableAspectJAutoProxy
public class LoggingAspectConfiguration {

    @Bean
    @Profile({JHipsterConstants.SPRING_PROFILE_DEVELOPMENT, JHipsterConstants.SPRING_PROFILE_PRODUCTION})
    public LoggingAspect loggingAspect(Environment env) {
        return new LoggingAspect(env);
    }
}

Edit store/src/main/resources/config/application-prod.yml and change the log level to DEBUG for the store application:

YAML

logging:
  level:
    ROOT: INFO
    io.github.jhipster: INFO
    com.okta.developer.store: DEBUG

Add Email Service to Alert Microservice

Now let’s customize the alert microservice. First, create an EmailService to send the store update notification, using the Spring Framework’s JavaMailSender.

Java

package com.okta.developer.alert.service;

import com.okta.developer.alert.service.dto.StoreAlertDTO;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;

@Service
public class EmailService {

    private JavaMailSender emailSender;

    @Value("${alert.distribution-list}")
    private String distributionList;

    public EmailService(JavaMailSender emailSender){
        this.emailSender = emailSender;
    }

    public void sendSimpleMessage(StoreAlertDTO alertDTO){
        try {
            SimpleMailMessage message = new SimpleMailMessage();
            message.setTo(distributionList);
            message.setSubject("Store Alert: " + alertDTO.getStoreName());
            message.setText(alertDTO.getStoreStatus());
            message.setFrom("StoreAlert");
            emailSender.send(message);
        } catch (Exception exception) {
            throw new EmailServiceException(exception);
        }
    }
}

Create the referenced EmailServiceException.

Java

package com.okta.developer.alert.service;

public class EmailServiceException extends RuntimeException {

    public EmailServiceException(Exception exception) {
        super(exception);
    }
}

Add a StoreAlertDTO class in the ...service.dto package.


Java

package com.okta.developer.alert.service.dto;

public class StoreAlertDTO {

    private String storeName;
    private String storeStatus;

    public String getStoreName() {
        return storeName;
    }

    public void setStoreName(String storeName) {
        this.storeName = storeName;
    }

    public String getStoreStatus() {
        return storeStatus;
    }

    public void setStoreStatus(String storeStatus) {
        this.storeStatus = storeStatus;
    }

}

Add a new property to alert/src/main/resources/config/application.yml and to alert/src/test/resources/config/application.yml for the destination email of the store alert.

YAML

alert:
  distribution-list: {distributionListAddress}

NOTE: You’ll need to set a value for the email (e.g., list@email.com will work) in src/test/.../application.yml for tests to pass. For Docker, you’ll override the {distributionListAddress} and {username} + {password} placeholder values with environment variables below.

Update spring.mail.* properties in application-prod.yml to set Gmail as the email service:

YAML

spring:
  ...
  mail:
    host: smtp.gmail.com
    port: 587
    username: {username}
    password: {password}
    protocol: smtp
    tls: true
    properties.mail.smtp:
      auth: true
      starttls.enable: true
      ssl.trust: smtp.gmail.com

Add a Kafka Consumer to Persist Alert and Send Email

Create an AlertConsumer service to persist a StoreAlert and send the email notification when receiving an alert message through Kafka. Add KafkaProperties, StoreAlertRepository and EmailService as constructor arguments. Then add a start() method to initialize the consumer and enter the processing loop.

Java

package com.okta.developer.alert.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.okta.developer.alert.config.KafkaProperties;
import com.okta.developer.alert.domain.StoreAlert;
import com.okta.developer.alert.repository.StoreAlertRepository;
import com.okta.developer.alert.service.dto.StoreAlertDTO;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

@Service
public class AlertConsumer {

    private final Logger log = LoggerFactory.getLogger(AlertConsumer.class);

    private final AtomicBoolean closed = new AtomicBoolean(false);

    public static final String TOPIC = "topic_alert";

    private final KafkaProperties kafkaProperties;

    private KafkaConsumer<String, String> kafkaConsumer;

    private StoreAlertRepository storeAlertRepository;

    private EmailService emailService;

    private ExecutorService executorService = Executors.newCachedThreadPool();

    public AlertConsumer(KafkaProperties kafkaProperties, StoreAlertRepository storeAlertRepository, EmailService emailService) {
        this.kafkaProperties = kafkaProperties;
        this.storeAlertRepository = storeAlertRepository;
        this.emailService = emailService;
    }

    @PostConstruct
    public void start() {

        log.info("Kafka consumer starting...");
        this.kafkaConsumer = new KafkaConsumer<>(kafkaProperties.getConsumerProps());
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
        kafkaConsumer.subscribe(Collections.singletonList(TOPIC));
        log.info("Kafka consumer started");

        executorService.execute(() -> {
            try {
                while (!closed.get()) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(3));
                    for (ConsumerRecord<String, String> record : records) {
                        log.info("Consumed message in {} : {}", TOPIC, record.value());

                        ObjectMapper objectMapper = new ObjectMapper();
                        StoreAlertDTO storeAlertDTO = objectMapper.readValue(record.value(), StoreAlertDTO.class);
                        StoreAlert storeAlert = new StoreAlert();
                        storeAlert.setStoreName(storeAlertDTO.getStoreName());
                        storeAlert.setStoreStatus(storeAlertDTO.getStoreStatus());
                        storeAlert.setTimestamp(Instant.now());
                        storeAlertRepository.save(storeAlert);

                        emailService.sendSimpleMessage(storeAlertDTO);
                    }
                }
                kafkaConsumer.commitSync();
            } catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) throw e;
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                log.info("Kafka consumer close");
                kafkaConsumer.close();
            }
        });
    }

    public KafkaConsumer<String, String> getKafkaConsumer() {
        return kafkaConsumer;
    }

    public void shutdown() {
        log.info("Shutdown Kafka consumer");
        closed.set(true);
        kafkaConsumer.wakeup();
    }
}

NOTE: Any unhandled exception during message processing will make the service leave the consumer group. That’s why there’s code above that catches Exception.
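In the AlertConsumer above, the catch block sits outside the while loop, so the first failing record is logged but still ends the loop and closes the consumer. One possible alternative, shown here only as a sketch and not as part of the tutorial's code, is to catch failures per record so the rest of the batch is still processed. The class ResilientLoop and its "collect the failures" policy are hypothetical, and a plain list stands in for the records returned by one poll.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy processing loop (hypothetical): a list stands in for one polled batch. */
final class ResilientLoop {

    /**
     * Runs the handler on every record and returns the records whose handler threw,
     * so one bad record does not stop the rest of the batch.
     */
    static List<String> processAll(List<String> records, Consumer<String> handler) {
        List<String> failed = new ArrayList<>();
        for (String record : records) {
            try {
                handler.accept(record);
            } catch (RuntimeException e) {
                // Assumed policy: remember the record and keep going.
                failed.add(record);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> failed = processAll(Arrays.asList("ok-1", "bad", "ok-2"), record -> {
            if (record.equals("bad")) {
                throw new IllegalStateException("cannot parse " + record);
            }
            System.out.println("processed " + record);
        });
        System.out.println("failed: " + failed);
    }
}
```

What to do with the failed records (retry, log, or forward to a separate topic) is a design decision this sketch leaves open.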

As a last customization step, update the logging configuration the same way you did for the store microservice.

Microservices + Kafka Container Deployment

Modify docker-compose/docker-compose.yml and add the following environment variables for the alert-app application:

YAML

- SPRING_MAIL_USERNAME=${MAIL_USERNAME}
- SPRING_MAIL_PASSWORD=${MAIL_PASSWORD}
- ALERT_DISTRIBUTION_LIST=${DISTRIBUTION_LIST}

Edit docker-compose/.env and add values for the new environment variables:

Properties

MAIL_USERNAME={yourGmailAccount}
MAIL_PASSWORD={yourPassword}
DISTRIBUTION_LIST={anotherEmailAccount}
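The container variables SPRING_MAIL_USERNAME, SPRING_MAIL_PASSWORD, and ALERT_DISTRIBUTION_LIST line up with the properties spring.mail.username, spring.mail.password, and alert.distribution-list. The sketch below shows the naming pattern those examples follow. It is only an illustration of the pattern, not Spring's lookup code: the class EnvVarNames is invented, and the claim that Spring treats a variable named this way as equivalent to the dotted property is an assumption you should confirm in the Spring Boot documentation on externalized configuration.

```java
import java.util.Locale;

/** Illustrates the property-to-variable naming pattern used in this tutorial (hypothetical helper). */
final class EnvVarNames {

    /** Upper-cases the property name and turns dots and hyphens into underscores. */
    static String toEnvVarName(String propertyName) {
        return propertyName.replace('.', '_').replace('-', '_').toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toEnvVarName("alert.distribution-list")); // ALERT_DISTRIBUTION_LIST
        System.out.println(toEnvVarName("spring.mail.username"));    // SPRING_MAIL_USERNAME
    }
}
```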


Make sure Docker Desktop is running, then generate the Docker image for the store microservice. Run the following command from the store directory.


Shell

./mvnw -ntp -Pprod verify jib:dockerBuild

Repeat for the alert and gateway apps.

Before you run your microservices architecture, make sure you have enough RAM allocated. Docker Desktop’s default is 2GB; I recommend 8GB. This setting is under Docker > Resources > Advanced.

Then, run everything using Docker Compose:

Shell

cd docker-compose
docker-compose up

You will see a huge amount of logging while each service starts. Wait a minute or two, then open http://localhost:8761 and log in with your Okta account. This is the JHipster Registry, which you can use to monitor your apps’ statuses. Wait for all the services to be up.

Open a new terminal window and tail the alert microservice logs to verify it’s processing StoreAlert records:

Shell

docker exec -it docker-compose_alert-app_1 /bin/bash
tail -f /tmp/spring.log

You should see log entries showing the consumer group that the alert microservice joined on startup:

Plain Text

2020-01-22 03:12:08.186  INFO 1 --- [Thread-7] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=alert] (Re-)joining group
2020-01-22 03:12:08.215  INFO 1 --- [Thread-7] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=alert] (Re-)joining group
2020-01-22 03:12:11.237  INFO 1 --- [Thread-7] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=alert] Successfully joined group with generation 1

Once everything is up, go to the gateway at http://localhost:8080 and log in. Create a store entity and then update it. The alert microservice should log entries when processing the received message from the store service.

Plain Text

2020-01-22 03:18:26.528  INFO 1 --- [Thread-7] c.o.d.alert.service.AlertConsumer   : Consumed message in topic_alert : {"storeName":"Zara","storeStatus":"CLOSED"}
2020-01-22 03:18:26.664 DEBUG 1 --- [Thread-7] c.o.d.alert.aop.logging.LoggingAspect    : Enter: com.okta.developer.alert.service.EmailService.sendSimpleMessage() with argument[s] = [com.okta.developer.alert.service.dto.StoreAlertDTO@13038372]

If you see a MailAuthenticationException in the alert microservice’s log when attempting to send the notification, the cause might be your Gmail security configuration.

Plain Text

alert-app_1           | org.springframework.mail.MailAuthenticationException: Authentication failed; nested exception is javax.mail.AuthenticationFailedException: 535-5.7.8 Username and Password not accepted. Learn more at
alert-app_1           | 535 5.7.8  https://support.google.com/mail/?p=BadCredentials *** - gsmtp
alert-app_1           |
alert-app_1           |     at org.springframework.mail.javamail.JavaMailSenderImpl.doSend(JavaMailSenderImpl.java:440)

To enable the login from the alert application, go to https://myaccount.google.com/lesssecureapps and allow less secure applications. This is required because the alert application is unknown to Google and sign-on is blocked for third-party applications that don’t meet Google security standards.

IMPORTANT: Don’t forget to turn off Less secure app access once you finish the test.

Restart the alert microservice:

Shell

docker restart docker-compose_alert-app_1

Update a store again and you should receive an email with the store’s status this time.

In this tutorial, authentication (of producers and consumers), authorization (of read/write operations), and encryption (of data) were not covered, as security in Kafka is optional. See Kafka’s documentation on security to learn how to enable these features.

Learn More About Kafka and Microservices

This tutorial showed how a Kafka-centric architecture allows decoupling microservices to simplify the design and development of distributed systems. To continue learning about these topics, check out the following links:

  • JHipster: Using Kafka
  • JHipster: OAuth2 and OpenID Connect
  • Apache Kafka Introduction

There are also a few tutorials on Kafka and microservices that you might enjoy on this blog:

  • Kafka with Java: Build a Secure, Scalable Messaging App
  • Java Microservices with Spring Cloud Config and JHipster
  • Secure Reactive Microservices with Spring Cloud Gateway

You can find all the code for this tutorial on GitHub.

Please follow us @oktadev on Twitter for more tutorials like this one. We also have a YouTube channel where we frequently publish videos.


Published at DZone with permission of Jimena Garbarino, DZone MVB. See the original article here.
