DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Related

  • Containerization and Helm Templatization Best Practices for Microservices in Kubernetes
  • Manage Microservices With Docker Compose
  • Distributed Tracing System (Spring Cloud Sleuth + OpenZipkin)
  • Using Kong Ingress Controller with Spring Boot Services

Trending

  • Architecting an Embedded Efficiency Layer: A Platform Deep Dive into Day-Two Operational Tuning
  • A Walk-Through of the DZone Article Editor
  • How to Write for DZone Publications: Trend Reports and Refcards
  • A Deep Dive into Tracing Agentic Workflows (Part 1)
  1. DZone
  2. Testing, Deployment, and Maintenance
  3. Deployment
  4. Messaging With Spring Boot and Azure Service Bus

Messaging With Spring Boot and Azure Service Bus

Moving to the cloud? Working on a project using event-based messaging? Using Kotlin to do it? Time to dig into some beans on the Azure Service Bus.

By 
Thomas Sørensen user avatar
Thomas Sørensen
·
Updated Dec. 18, 20 · Tutorial
Likes (3)
Comment
Save
Tweet
Share
20.1K Views

Join the DZone community and get the full member experience.

Join For Free

Like many other companies, my employer is currently on a journey towards a cloud-based infrastructure. We have decided to go with Azure as our cloud provider, and my team and I are the lucky ones to be in the front row to deploy our project to the cloud. Our project consists primarily of Spring Boot based microservices written in Kotlin that are hosted on a Kubernetes cluster.  

We have recently had to implement features that would benefit from event-based messaging using topics, which brought the opportunity to dig into Azure Service Bus. Luckily, Microsoft has created a Spring Boot Starter package that can communicate with Azure Service Bus using JMS and the AMQP messaging protocol. How difficult can that be? Going from zero to something that connects and sends a couple of messages is quite easy. Still, I have discovered some pitfalls that should be avoided before the application hits the production environment that isn’t covered by the Azure Service Bus JMS Spring Boot Starter client library documentation.

Transactions

When an application is consuming messages from a topic, it is most likely because it is important to that application. The application might need to react somehow – like saving something to a database, call another application, or something else. If the application fails to process the message, maybe because the database or the other application is unable to respond, the use of transactions often comes in handy.

Unfortunately, when using the topicJmsListenerContainerFactory bean provided by the azure-spring-boot package (the package providing autoconfiguration of Spring JMS for Azure Service Bus), you are relying on the default configuration of the JmsAccessor interface provided by Spring Framework, where the default transaction mode is false.

This, combined with the default acknowledge mode AUTO_ACKNOWLEDGE, leaves you without support for message re-delivery when delivery of a message or message group fails, despite having a “max delivery count” greater than 1 configured on the service bus subscription.

Kotlin code snippet

The default JmsListenerContainerFactory implementation, provided by the azure-spring-boot package.

With a transacted session, the application completely controls the message delivery by either committing to (when the receiver function returns successfully) or rolling back (if the receiver function throws an unhandled exception) the session.

Enabling local transactions is not enough, though, if you need message re-delivery in case of an exception. The topicJmsListenerContainerFactory bean provided by the azure-spring-boot package creates listener container instances of the type DefaultMessageListenerContainer, which applies message acknowledgment before listener execution when the default AUTO_ACKNOWLEDGE mode is used. This means that if the message listener throws an exception during message receipt or message processing, the message is not handed back to the broker because the message has already been acknowledged. 

Therefore, session acknowledge mode CLIENT_ACKNOWLEDGE should be used on the container factory, which provides re-delivery in case of an exception.

Instead of using the topicJmsListenerContainerFactory bean provided by the azure-spring-boot package, we can define our own:

Kotlin
 




xxxxxxxxxx
1
13


 
1
@Bean
2
@Primary
3
fun transactedListenerContainerFactory(
4
      connectionFactory: ConnectionFactory
5
): JmsListenerContainerFactory<DefaultMessageListenerContainer> {
6
    val listenerContainerFactory = DefaultJmsListenerContainerFactory()
7
    listenerContainerFactory.setConnectionFactory(connectionFactory)
8
    listenerContainerFactory.setSubscriptionDurable(true)
9
    listenerContainerFactory.setSessionTransacted(true)
10
    listenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
11

          
12
    return listenerContainerFactory
13
}



We will then need to change the value of the containerFactory attribute of the @JmsListener, to the name of the JmsListenerContainerFactory bean we have just created.

Kotlin
 




xxxxxxxxxx
1


 
1
@JmsListener(
2
      destination = "message-topic",
3
      subscription = "consumer-service",
4
      containerFactory = "transactedListenerContainerFactory"
5
)
6
fun receiveMessage(msg: TextMessage) {
7
    log.info("Message received: ${msg.text}")
8
}



Slow Consumers and the Default Prefetch Policy

After a lot of testing and bugfixing, we thought that we had fixed why many of the messages on the topic ended up in the DeadLetter Queue. Unfortunately, many messages were still ending up in the DeadLetter Queue, but we didn’t see any signs of errors or re-deliveries in the logs.

Suppose we take a closer look at the javax.jms.ConnectionFactory bean provided by the azure-spring-boot package, we’ll find out that the bean definition relies heavily on the default JmsConnectionFactory implementation from the Apache QPID library, which is completely fine in most cases. Still, because we had a slow consumer, the default prefetch policy was causing some trouble.

ConnectionFactory bean, provided by the azure-spring-boot package

ConnectionFactory bean, provided by the azure-spring-boot package

The default prefetch policy defines a prefetch size of 1000. This means that our application will always ask for chunks of 1000 messages to reduce the overhead of message delivery by avoiding unnecessary round trips to the Service Bus. Chunks of 1000 messages will then get processed by the application, but the messages are not acknowledged before they have been processed. 

The late acknowledgment combined with the default message lock duration of 30 seconds (a property of the service bus subscription on Azure) can cause the messages to end up in the DLQ if the application fails to process and acknowledge all messages before the lock duration has expired.

Instead of using the default prefetch policy defined in the Apache QPID library, we can define our own.

Kotlin
 




xxxxxxxxxx


 
1
@Bean
2
@Primary
3
fun transactedListenerContainerFactory(
4
      connectionFactory: ConnectionFactory
5
): JmsListenerContainerFactory<DefaultMessageListenerContainer> {
6
    ((connectionFactory as CachingConnectionFactory).targetConnectionFactory as JmsConnectionFactory)
7
        .prefetchPolicy = prefetchPolicy()
8

          
9
    val listenerContainerFactory = DefaultJmsListenerContainerFactory()
10
    listenerContainerFactory.setConnectionFactory(connectionFactory)
11
    listenerContainerFactory.setSubscriptionDurable(true)
12
    listenerContainerFactory.setSessionTransacted(true)
13
    listenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
14

          
15
    return listenerContainerFactory
16
}
17
    
18
private fun prefetchPolicy(): JmsPrefetchPolicy {
19
    val prefetchPolicy = JmsDefaultPrefetchPolicy()
20
    prefetchPolicy.setAll(100)
21

          
22
    return prefetchPolicy
23
}



Notice how the prefetchPolicy property is overwritten on the JmsConnectionFactory. When doing it this way, we’re still using the ConnectionFactory bean provided by the azure-spring-boot package, thus avoiding having to manually provide the connection settings that otherwise are provided by the azure-spring-boot package.

Conclusion

In this article, I described some of the pitfalls one can encounter when using the default JMS settings provided by the azure-spring-boot Java package, along with a possible solution to each of those.

The complete code examples used in this article are available on GitHub.

Spring Framework Spring Boot microservice azure application Kubernetes

Opinions expressed by DZone contributors are their own.

Related

  • Containerization and Helm Templatization Best Practices for Microservices in Kubernetes
  • Manage Microservices With Docker Compose
  • Distributed Tracing System (Spring Cloud Sleuth + OpenZipkin)
  • Using Kong Ingress Controller with Spring Boot Services

Partner Resources

×

Comments

The likes didn't load as expected. Please refresh the page and try again.

  • RSS
  • X
  • Facebook

ABOUT US

  • About DZone
  • Support and feedback
  • Community research

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 215
  • Nashville, TN 37211
  • [email protected]

Let's be friends:

  • RSS
  • X
  • Facebook