{{announcement.body}}
{{announcement.title}}

Reactive Messaging Examples for Quarkus

DZone 's Guide to

Reactive Messaging Examples for Quarkus

The articles service stores data in a Postgres database. Messages are sent between the microservices via Kafka.

· Big Data Zone ·
Free Resource

Quarkus provides several different reactive messaging capabilities. The open-source project cloud-native-starter uses some of these capabilities in a sample application which is described in this article.

There a several easy to follow Quarkus guides about the topic ‘reactive messaging’. Tutorials are great, but the best way to learn new technologies is for me is to use them in simple applications. That’s why I’ve come up with a simple scenario.

Sample Application

The sample comes with a web application that displays links to articles with author information in a simple web application. The web application invokes the web-API service, which implements a backend-for-frontend pattern and invokes the articles' and authors' microservices. The articles service stores data in a Postgres database. Messages are sent between the microservices via Kafka. This diagram describes the high-level architecture:

clients and kubernetes

One benefit of reactive models is the ability to update web applications by sending messages, rather than pulling for updates. This is more efficient and improves the user experience.

The following video shows how articles can be created via the REST API. The web application receives notifications and adds new articles to the page.

The next diagram explains the flow which I’ll go through in more detail in this article.

clients and kubernetes

  1. The ‘submission’ API client invokes a REST endpoint of the ‘articles’ service to create a new article.
  2. After the article has been created, a message is sent to Kafka.
  3. The ‘web-API’ service has subscribed to Kafka messages so that a listener is invoked.
  4. When new articles are created, events are streamed to the web-app.

Let’s take a closer look at the used technologies.

Sending In-Memory Messages via Vert.X Event Bus

The ‘articles’ and the ‘web-API’ service have been implemented in Java with Quarkus. In both cases, I have used a clean architecture approach where the code of the microservice is organized into three packages. These packages are rather independent of each other and could be exchanged with other implementations.

  1. API: Contains the REST endpoints and handles incoming and outgoing messages.
  2. Business: Contains the business logic of the microservice and business entities.
  3. Data: Contains the code to access databases or other microservices.

After a new article has been stored in the Postgres database, a message is sent to Kafka. This is triggered by the business logic, but the actual code resides in the API layer. That’s why the business layer needs to send the message to the API layer first.

Quarkus provides a mechanism for beans to interact via asynchronous messages by enforcing loose-coupling. Check out the guide Asynchronous messaging between beans. This functionality is provided via Eclipse Vert.x which comes with Quarkus.

Here is the code that sends the event in memory to the API layer:

Java
 




x
15


 
1
import io.vertx.axle.core.eventbus.EventBus;
2
 
          
3
...
4
 
          
5
@Inject
6
 
          
7
EventBus bus;
8
 
          
9
...
10
 
          
11
private void sendMessageToKafka(Article article) {
12
 
          
13
  bus.publish("com.ibm.articles.apis.NewArticleCreatedListener", article.id);
14
 
          
15
}



In the API layer the event can be consumed (see code):

Java
 




xxxxxxxxxx
1
11


 
1
import io.quarkus.vertx.ConsumeEvent;
2
 
          
3
...
4
 
          
5
@ConsumeEvent
6
 
          
7
public void sendMessageToKafka(String articleId) {
8
 
          
9
...
10
 
          
11
}



Eclipse MicroProfile supports another mechanism for in-memory messages. The reason why I didn’t use it, in this case, was that I didn’t get it to work. For me, the @Outgoing annotation only worked on methods that are either triggered by an incoming event or by the platform (e.g. @PostConstruct).

In my case, I have to trigger this functionality from business logic. I’m not sure whether this is a missing feature in MircoProfile, a defect or user error. I’m trying to find this out.

The documentation mentions another reason when you should use the Vert.x event bus: “The asynchronous message passing feature allows replying to messages which are not supported by MicroProfile Reactive Messaging. However, it is limited to single-event behavior (no stream) and local messages.”

Sending Kafka Messages via Kafka API

Next, the API layer of the ‘web-API’ service needs to send the message to Kafka. To set up Kafka in Kubernetes, follow the instructions from my previous article Accessing Apache Kafka from Quarkus.

Eclipse MicroProfile Reactive Messaging provides the same @Outgoing annotation to do this, but I couldn’t get it to work since I have to trigger this functionality manually. I want to find out the reason for this as well.

As a workaround, I used the Kafka API instead. The usage is pretty straight forward. Unfortunately, it looks like the configuration is not read from the same ‘application.properties’ file which MicroProfile uses. Instead, I had to do this in the code:

Java
 




xxxxxxxxxx
1
57


 
1
import io.vertx.core.Vertx;
2
 
          
3
import io.vertx.kafka.client.producer.KafkaProducer;
4
 
          
5
...
6
 
          
7
@Inject
8
 
          
9
Vertx vertx;
10
 
          
11
 
12
 
          
13
@ConfigProperty(name = "kafka.bootstrap.servers")
14
 
          
15
String kafkaBootstrapServer;
16
 
          
17
 
18
 
          
19
KafkaProducer<String, String> producer;
20
 
          
21
...
22
 
          
23
@PostConstruct
24
 
          
25
void initKafkaClient() {
26
 
          
27
  Map<String, String> config = new HashMap<>();
28
 
          
29
  config.put("bootstrap.servers", kafkaBootstrapServer);
30
 
          
31
  config.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
32
 
          
33
  config.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
34
 
          
35
  producer = KafkaProducer.create(vertx, config);
36
 
          
37
}
38
 
          
39
...
40
 
          
41
@ConsumeEvent
42
 
          
43
public void sendMessageToKafka(String articleId) {
44
 
          
45
  try {
46
 
          
47
    KafkaProducerRecord<String, String> record = KafkaProducerRecord.create("new-article-created", articleId);
48
 
          
49
    producer.write(record, done -> System.out.println("Kafka message sent: new-article-created - " + articleId));
50
 
          
51
  } catch (Exception e) {
52
 
          
53
    // allow to run this functionality if Kafka hasn't been set up
54
 
          
55
  }
56
 
          
57
}



Sending and Receiving Messages via MicroProfile

Next, the ‘web-API’ service needs to receive this message from Kafka. This part can be implemented very easily with MicroProfile Reactive Messaging via the annotation @Incoming. Here is the code:

Java
 




x
21


 
1
import org.eclipse.microprofile.reactive.messaging.Incoming;
2
 
          
3
import org.eclipse.microprofile.reactive.messaging.Outgoing;
4
 
          
5
import io.smallrye.reactive.messaging.annotations.Broadcast;
6
 
          
7
...
8
 
          
9
@Incoming("new-article-created")
10
 
          
11
@Outgoing("stream-new-article")
12
 
          
13
@Broadcast
14
 
          
15
public String process(String articleId) {
16
 
          
17
   System.out.println("Kafka message received: new-article-created - " + articleId);
18
 
          
19
   return articleId;
20
 
          
21
}



In the next step notifications about new articles need to be streamed to the web application. To do this, the event is forwarded to the streaming endpoint via the MicroProfile annotations @Outgoing and @Broadcast.

Check out the guide Using Apache Kafka with reactive messaging for more information about these annotations. From a developer experience, this is as easy as it can get. I like especially that the same annotations can be used for Kafka channels as well as in-memory messaging.

Sending Events to Web Applications via Server-Sent Events

The last step is to stream the events to web applications. This is done via Server-Sent Events and with Quarkus very easy to implement. The streaming endpoint receives the messages via @Channel and forwards them via @Produces(MediaType.SERVER_SENT_EVENTS) and @SseElementType (see code):

Java
 




xxxxxxxxxx
1
31


 
1
import org.reactivestreams.Publisher;
2
 
          
3
import io.smallrye.reactive.messaging.annotations.Channel;
4
 
          
5
import org.jboss.resteasy.annotations.SseElementType;
6
 
          
7
...
8
 
          
9
public class NewArticlesStream
10
 
          
11
   @Inject
12
 
          
13
   @Channel("stream-new-article") Publisher<String> newArticles;
14
 
          
15
 
16
 
          
17
   @GET
18
 
          
19
   @Path("/server-sent-events")
20
 
          
21
   @Produces(MediaType.SERVER_SENT_EVENTS
22
 
          
23
   @SseElementType("text/plain"
24
 
          
25
   public Publisher<String> stream() { 
26
 
          
27
      return newArticles;
28
 
          
29
   }
30
 
          
31
}



In the web application, the events can be consumed via EventSource. In my case, I only send the id of the article and refresh the list of displayed articles (see code). Alternatively, I also could send the complete article information in the event.

Java
 




xxxxxxxxxx
1


 
1
let source = new EventSource(this.$store.state.endpoints.api + "server-sent-events");
2
 
          
3
let that = this;
4
 
          
5
source.onmessage = function (event) {
6
 
          
7
   that.readArticles();
8
 
          
9
};



Next Steps

If you want to learn more about reactive programming and reactive messaging, try out the code yourself.

Topics:
big data, cloud, java, kafka, microservices, quarkus, reactive messaging

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}