{{announcement.body}}
{{announcement.title}}

Reactive Messaging Examples for Quarkus

DZone 's Guide to

Reactive Messaging Examples for Quarkus

Quarkus provides different reactive messaging capabilities. Cloud-native-starter uses some of these capabilities in this sample application!

· Microservices Zone ·
Free Resource

Quarkus provides several different reactive messaging capabilities. The open-source project cloud-native-starter uses some of these capabilities in a sample application which is described in this article.

There a several easy to follow Quarkus guides about the topic 'reactive messaging'. Tutorials are great, but the best way to learn new technologies is for me is to use them in simple applications. That's why I've come up with a simple scenario.

You may also like: How to Write Reactive Applications With MicroProfile

Sample Application

The sample comes with a web application that displays links to articles with author information in a simple web application. The web application invokes the web-API service which implements a backend-for-frontend pattern and invokes the articles and authors microservices. The articles service store data in a Postgres database. Messages are sent between the microservices via Kafka. This diagram describes the high-level architecture:

kubernetes chart

One benefit of reactive models is the ability to update web applications by sending messages, rather than pulling for updates. This is more efficient and improves the user experience.

The following video shows how articles can be created via the REST API. The web application receives notifications and adds new articles to the page.

cloud native starter

The next diagram explains the flow which I'll go through in more detail in this article.

kubernetes clients

  1. The 'submission' API client invokes a REST endpoint of the 'articles' service to create a new article.
  2. After the article has been created, a message is sent to Kafka.
  3. The 'web-API' service has subscribed to Kafka messages so that a listener is invoked.
  4. When new articles are created, events are streamed to the web-app.

Let's take a closer look at the used technologies.

Sending In-Memory Messages via Vert.X Event Bus

The 'articles' and the 'web-API' service have been implemented in Java with Quarkus. In both cases, I have used a clean architecture approach where the code of the microservice is organized into three packages. These packages are rather independent of each other and could be exchanged with other implementations.

  1. API: Contains the REST endpoints and handles incoming and outgoing messages.
  2. Business: Contains the business logic of the microservice and business entities.
  3. Data: Contains the code to access databases or other microservices.

After a new article has been stored in the Postgres database, a message is sent to Kafka. This is triggered by the business logic, but the actual code resides in the API layer. That's why the business layer needs to send the message to the API layer first.

Quarkus provides a mechanism for beans to interact via asynchronous messages by enforcing loose-coupling. Check out the guide Asynchronous messaging between beans. This functionality is provided via Eclipse Vert.x which comes with Quarkus.

Here is the code that sends the event in memory to the API layer:

Java




x


1
import io.vertx.axle.core.eventbus.EventBus; ... @Inject EventBus bus; ... 
2
  private void sendMessageToKafka(Article article) { 
3
  bus.publish("com.ibm.articles.apis.NewArticleCreatedListener", article.id); }



In the API layer the event can be consumed (see code):

Java




xxxxxxxxxx
1


1
import io.quarkus.vertx.ConsumeEvent; ... @ConsumeEvent public void 
2
sendMessageToKafka(String articleId) { ... }



Eclipse MicroProfile supports another mechanism for memory messages. The reason why I didn't use it, in this case, was that I didn't get it to work. For me, the @Outgoing annotation only worked on methods that are either triggered by an incoming event or by the platform (e.g. @PostConstruct). In my case, I have to trigger this functionality from business logic. I'm not sure whether this is a missing feature in MircoProfile, a defect or user error. I'm trying to find this out.

The documentation mentions another reason when you should use the Vert.x event bus: "The asynchronous message passing feature allows replying to messages which are not supported by MicroProfile Reactive Messaging. However, it is limited to single-event behavior (no stream) and local messages."

Sending Kafka Messages via Kafka API

Next, the API layer of the 'web-API' service needs to send the message to Kafka. To set up Kafka in Kubernetes, follow the instructions from my previous article Accessing Apache Kafka from Quarkus.

Eclipse MicroProfile Reactive Messaging provides the same @Outgoing annotation to do this, but I couldn't get it to work since I have to trigger this functionality manually. I want to find out the reason for this as well.

As a workaround, I used the Kafka API instead. The usage is pretty straight forward. Unfortunately, it looks like the configuration is not read from the same 'application.properties' file which MicroProfile uses. Instead, I had to do this in the code:

Java




xxxxxxxxxx
1
10


 
1
import io.vertx.core.Vertx; import 
2
  io.vertx.kafka.client.producer.KafkaProducer; ... @Inject Vertx vertx; 
3
@ConfigProperty(name = "kafka.bootstrap.servers") String kafkaBootstrapServer; 
4
KafkaProducer<String, String> producer; ... @PostConstruct void 
5
initKafkaClient() { Map<String, String> config = new HashMap<>(); 
6
config.put("bootstrap.servers", kafkaBootstrapServer); 
7
config.put("key.serializer", 
8
"org.apache.kafka.common.serialization.StringSerializer"); 
9
config.put("value.serializer", 
10
"org.apache.kafka.common.serialization.StringSerializer"); producer = 
11
KafkaProducer.create(vertx, config); } ... @ConsumeEvent public void 
12
sendMessageToKafka(String articleId) { try { KafkaProducerRecord<String, 
13
String> record = KafkaProducerRecord.create("new-article-created", articleId); 
14
producer.write(record, done -> System.out.println("Kafka message sent: new-
15
article-created - " + articleId)); } catch (Exception e) { // allow to run this 
16
functionality if Kafka hasn't been set up } } Sending and receiving messages 
17
via MicroProfile
16
functionality if Kafka hasn't been set up } } Sending and receiving messages via MicroProfile



Next, the 'web-API' service needs to receive this message from Kafka. This part can be implemented very easily with MicroProfile Reactive Messaging via the annotation @Incoming. Here is the code:

Java







In the next step notifications about new articles need to be streamed to the web application. To do this, the event is forwarded to the streaming endpoint via the MicroProfile annotations @Outgoing and @Broadcast.

Check out the guide Using Apache Kafka with reactive messaging for more information about these annotations. From a developer experience, this is as easy as it can get. I like especially that the same annotations can be used for Kafka channels as well as in-memory messaging.

Sending Events to Web Applications via Server-Sent Events

The last step is to stream the events to web applications. This is done via Server-Sent Events and with Quarkus very easy to implement. The streaming endpoint receives the messages via @Channel and forwards them via @Produces(MediaType.SERVER_SENT_EVENTS) and @SseElementType (see code):

Java




x


 
1
import org.reactivestreams.Publisher; import 
2
  io.smallrye.reactive.messaging.annotations.Channel; import 
3
    org.jboss.resteasy.annotations.SseElementType; ... public class 
4
      NewArticlesStream { @Inject @Channel("stream-new-article") 
5
                         Publisher<String> newArticles; @GET 
6
                         @Path("/server-sent-events") 
7
                         @Produces(MediaType.SERVER_SENT_EVENTS) 
8
                         @SseElementType("text/plain") public Publisher<String> 
9
                          stream() { return newArticles; } }



In the web application, the events can be consumed via EventSource. In my case, I only send the id of the article and refresh the list of displayed articles (see code). Alternatively, I also could send the complete article information in the event.

Java




xxxxxxxxxx
1


1
let source = new EventSource(this.$store.state.endpoints.api + "server-sent-events"); let that = this; source.onmessage = function (event) { that.readArticles(); }; Next Steps



If you want to learn more about reactive programming and reactive messaging, try out the code yourself.


Further Reading

Reactive Programming

Spring Reactive Programming in Java

The Reactive Universe for Java Devs

Topics:
microservices ,quarkus ,sample application ,reactive ,reactive messaging ,reactive applications

Published at DZone with permission of Niklas Heidloff , DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}