Messaging With Spring Boot and Azure Service Bus

Moving to the cloud? Working on a project using event-based messaging? Using Kotlin to do it? Time to dig into some beans on the Azure Service Bus.

By Thomas Sørensen · Updated Dec. 18, 20 · Tutorial
Like many other companies, my employer is currently on a journey towards a cloud-based infrastructure. We have decided to go with Azure as our cloud provider, and my team and I are the lucky ones in the front row, deploying our project to the cloud. Our project consists primarily of Spring Boot-based microservices written in Kotlin and hosted on a Kubernetes cluster.

We recently had to implement features that benefit from event-based messaging using topics, which gave us the opportunity to dig into Azure Service Bus. Luckily, Microsoft has created a Spring Boot Starter package that communicates with Azure Service Bus using JMS over the AMQP messaging protocol. How difficult can that be? Going from zero to something that connects and sends a couple of messages is quite easy. Still, I have discovered some pitfalls that are not covered by the Azure Service Bus JMS Spring Boot Starter client library documentation and that should be avoided before the application hits the production environment.

Transactions

When an application consumes messages from a topic, it is most likely because those messages are important to that application. The application might need to react somehow, for example by saving something to a database or calling another application. If the application fails to process the message, perhaps because the database or the other application is unable to respond, transactions come in handy.

Unfortunately, when using the topicJmsListenerContainerFactory bean provided by the azure-spring-boot package (the package providing autoconfiguration of Spring JMS for Azure Service Bus), you are relying on the default configuration of the JmsAccessor class provided by Spring Framework, where sessions are not transacted (sessionTransacted defaults to false).

This, combined with the default acknowledge mode AUTO_ACKNOWLEDGE, leaves you without support for message re-delivery when delivery of a message or message group fails, despite having a “max delivery count” greater than 1 configured on the service bus subscription.

[Code screenshot: The default JmsListenerContainerFactory implementation, provided by the azure-spring-boot package]

With a transacted session, the application completely controls message delivery by either committing the session (when the receiver function returns successfully) or rolling it back (when the receiver function throws an unhandled exception).
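To make the commit and rollback behavior concrete, here is a minimal sketch written against the plain javax.jms API rather than Spring. The function name receiveTransacted and its parameters are hypothetical, and the sketch assumes a connection that has already been started and has a client ID set. A Spring listener container does the equivalent work for you.

```kotlin
import javax.jms.Connection
import javax.jms.Message
import javax.jms.Session
import javax.jms.Topic

// Hypothetical helper: receives at most one message in a locally transacted
// session. Returns true if a message was received and committed.
// Assumes `connection` has already been started.
fun receiveTransacted(
    connection: Connection,
    topic: Topic,
    subscriptionName: String,
    handle: (Message) -> Unit
): Boolean {
    // The first argument (true) requests a locally transacted session.
    val session = connection.createSession(true, Session.SESSION_TRANSACTED)
    try {
        val consumer = session.createDurableSubscriber(topic, subscriptionName)
        // Wait up to one second; receive() returns null on timeout.
        val message = consumer.receive(1_000) ?: return false
        try {
            handle(message)
            session.commit()   // success: the receipt is confirmed to the broker
            return true
        } catch (e: Exception) {
            session.rollback() // failure: the message becomes eligible for re-delivery
            throw e
        }
    } finally {
        session.close()
    }
}
```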

Enabling local transactions is not enough, though, if you need message re-delivery in case of an exception. The topicJmsListenerContainerFactory bean provided by the azure-spring-boot package creates listener container instances of the type DefaultMessageListenerContainer, which applies message acknowledgment before listener execution when the default AUTO_ACKNOWLEDGE mode is used. This means that if the message listener throws an exception during message receipt or message processing, the message is not handed back to the broker because the message has already been acknowledged. 

Therefore, session acknowledge mode CLIENT_ACKNOWLEDGE should be used on the container factory, which provides re-delivery in case of an exception.
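The difference between acknowledging before and after listener execution can be illustrated with a small in-memory model. This is a toy simulation, not the JMS or Service Bus API: ToySubscription, AckTiming, and simulate are made-up names, and the "broker" is just a queue that re-delivers unacknowledged messages until a max delivery count is reached.

```kotlin
// Toy model only: none of these types exist in JMS, Spring, or Azure.
enum class AckTiming { BEFORE_LISTENER, AFTER_LISTENER }

class ToySubscription(messages: List<String>, private val maxDeliveryCount: Int) {
    // Each entry is a message body paired with its number of earlier deliveries.
    private val pending = ArrayDeque(messages.map { it to 0 })
    val processed = mutableListOf<String>()
    val lost = mutableListOf<String>()
    val deadLettered = mutableListOf<String>()

    fun drain(timing: AckTiming, listener: (String) -> Unit) {
        while (pending.isNotEmpty()) {
            val (body, earlierDeliveries) = pending.removeFirst()
            val deliveryCount = earlierDeliveries + 1
            try {
                listener(body)
                processed.add(body)
            } catch (e: Exception) {
                when {
                    // Acknowledged up front: the broker has already dropped the message.
                    timing == AckTiming.BEFORE_LISTENER -> lost.add(body)
                    // Not yet acknowledged: the broker re-delivers until the limit is hit.
                    deliveryCount < maxDeliveryCount -> pending.addLast(body to deliveryCount)
                    else -> deadLettered.add(body)
                }
            }
        }
    }
}

// Runs three messages through a listener that fails once, on the first delivery of "b".
fun simulate(timing: AckTiming): ToySubscription {
    var alreadyFailed = false
    val subscription = ToySubscription(listOf("a", "b", "c"), maxDeliveryCount = 3)
    subscription.drain(timing) { body ->
        if (body == "b" && !alreadyFailed) {
            alreadyFailed = true
            error("database unavailable")
        }
    }
    return subscription
}

fun main() {
    val early = simulate(AckTiming.BEFORE_LISTENER)
    println("ack before listener: processed=${early.processed}, lost=${early.lost}")
    val late = simulate(AckTiming.AFTER_LISTENER)
    println("ack after listener:  processed=${late.processed}, lost=${late.lost}")
}
```

In this model, acknowledging before the listener runs loses message "b" on the first failure, while acknowledging afterwards lets the second delivery succeed.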

Instead of using the topicJmsListenerContainerFactory bean provided by the azure-spring-boot package, we can define our own:

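```kotlin
import javax.jms.ConnectionFactory
import javax.jms.Session
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Primary
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import org.springframework.jms.config.JmsListenerContainerFactory
import org.springframework.jms.listener.DefaultMessageListenerContainer

// Declared inside a @Configuration class.
@Bean
@Primary
fun transactedListenerContainerFactory(
    connectionFactory: ConnectionFactory
): JmsListenerContainerFactory<DefaultMessageListenerContainer> {
    val listenerContainerFactory = DefaultJmsListenerContainerFactory()
    listenerContainerFactory.setConnectionFactory(connectionFactory)
    listenerContainerFactory.setSubscriptionDurable(true)
    // Use a transacted session and acknowledge only after the listener has run.
    listenerContainerFactory.setSessionTransacted(true)
    listenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)

    return listenerContainerFactory
}
```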



We then need to change the value of the containerFactory attribute of the @JmsListener annotation to the name of the JmsListenerContainerFactory bean we have just created.

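```kotlin
import javax.jms.TextMessage
import org.springframework.jms.annotation.JmsListener

// Declared inside a Spring-managed component; `log` is that class's logger.
@JmsListener(
    destination = "message-topic",
    subscription = "consumer-service",
    containerFactory = "transactedListenerContainerFactory"
)
fun receiveMessage(msg: TextMessage) {
    log.info("Message received: ${msg.text}")
}
```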



Slow Consumers and the Default Prefetch Policy

After a lot of testing and bug fixing, we thought we had fixed the reason why many of the messages on the topic ended up in the dead-letter queue (DLQ). Unfortunately, many messages were still ending up there, and we didn't see any signs of errors or re-deliveries in the logs.

If we take a closer look at the javax.jms.ConnectionFactory bean provided by the azure-spring-boot package, we find that the bean definition relies heavily on the default JmsConnectionFactory implementation from the Apache Qpid library, which is completely fine in most cases. Because we had a slow consumer, however, the default prefetch policy was causing trouble.

[Code screenshot: ConnectionFactory bean, provided by the azure-spring-boot package]

The default prefetch policy defines a prefetch size of 1000. This means that our application always asks for chunks of up to 1000 messages, which reduces the overhead of message delivery by avoiding unnecessary round trips to the Service Bus. Each chunk is then processed by the application, but the messages are not acknowledged until they have been processed.

The late acknowledgment combined with the default message lock duration of 30 seconds (a property of the service bus subscription on Azure) can cause trouble. If the application fails to process and acknowledge all prefetched messages before their lock expires, each expired lock counts as a failed delivery, and the messages end up in the dead-letter queue once the max delivery count is reached.
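A quick back-of-the-envelope calculation shows how tight this budget is. The sketch below assumes a single consumer that processes a prefetched chunk sequentially and that the lock on every message in the chunk starts when the chunk is delivered. The function name is made up for illustration.

```kotlin
import java.time.Duration

// Longest average processing time per message that still allows a full
// prefetched chunk to be acknowledged before the lock expires.
// Assumes sequential processing by a single consumer.
fun maxProcessingTimePerMessage(lockDuration: Duration, prefetchSize: Int): Duration {
    require(prefetchSize > 0) { "prefetchSize must be positive" }
    return lockDuration.dividedBy(prefetchSize.toLong())
}

fun main() {
    val lockDuration = Duration.ofSeconds(30)
    // Default prefetch size of 1000: 30 ms per message.
    println(maxProcessingTimePerMessage(lockDuration, 1000).toMillis())
    // Prefetch size of 100: 300 ms per message.
    println(maxProcessingTimePerMessage(lockDuration, 100).toMillis())
}
```

Under these assumptions, a consumer that needs more than about 30 ms per message cannot keep up with a full chunk of 1000 messages, whereas a prefetch size of 100 raises the budget to about 300 ms per message.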

Instead of using the default prefetch policy defined in the Apache QPID library, we can define our own.

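```kotlin
import javax.jms.ConnectionFactory
import javax.jms.Session
import org.apache.qpid.jms.JmsConnectionFactory
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy
import org.apache.qpid.jms.policy.JmsPrefetchPolicy
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Primary
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import org.springframework.jms.config.JmsListenerContainerFactory
import org.springframework.jms.connection.CachingConnectionFactory
import org.springframework.jms.listener.DefaultMessageListenerContainer

// Declared inside a @Configuration class.
@Bean
@Primary
fun transactedListenerContainerFactory(
    connectionFactory: ConnectionFactory
): JmsListenerContainerFactory<DefaultMessageListenerContainer> {
    // Unwrap the Qpid factory behind Spring's caching wrapper and replace its prefetch policy.
    ((connectionFactory as CachingConnectionFactory).targetConnectionFactory as JmsConnectionFactory)
        .prefetchPolicy = prefetchPolicy()

    val listenerContainerFactory = DefaultJmsListenerContainerFactory()
    listenerContainerFactory.setConnectionFactory(connectionFactory)
    listenerContainerFactory.setSubscriptionDurable(true)
    listenerContainerFactory.setSessionTransacted(true)
    listenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)

    return listenerContainerFactory
}

// Prefetch 100 messages at a time instead of the default 1000.
private fun prefetchPolicy(): JmsPrefetchPolicy {
    val prefetchPolicy = JmsDefaultPrefetchPolicy()
    prefetchPolicy.setAll(100)

    return prefetchPolicy
}
```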



Notice how the prefetchPolicy property is overridden on the JmsConnectionFactory. This way, we still use the ConnectionFactory bean provided by the azure-spring-boot package and avoid having to manually provide the connection settings that the package otherwise supplies.
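The hard casts in the bean definition throw a ClassCastException if the injected ConnectionFactory is ever wrapped differently. A more defensive variant could look like the sketch below. The helper name applyPrefetch is hypothetical, and the sketch assumes that the factory is either a Qpid JmsConnectionFactory or a Spring CachingConnectionFactory wrapping one.

```kotlin
import javax.jms.ConnectionFactory
import org.apache.qpid.jms.JmsConnectionFactory
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy
import org.springframework.jms.connection.CachingConnectionFactory

// Hypothetical helper: applies the prefetch size if a Qpid factory can be found.
// Returns false, leaving the factory untouched, when it cannot.
fun applyPrefetch(connectionFactory: ConnectionFactory, prefetchSize: Int): Boolean {
    // Look behind Spring's caching wrapper if there is one.
    val target = (connectionFactory as? CachingConnectionFactory)?.targetConnectionFactory
        ?: connectionFactory
    val qpidFactory = target as? JmsConnectionFactory ?: return false
    qpidFactory.prefetchPolicy = JmsDefaultPrefetchPolicy().apply { setAll(prefetchSize) }
    return true
}
```

Calling applyPrefetch(connectionFactory, 100) at the top of the bean method and logging a warning when it returns false keeps the application starting even if the autoconfiguration changes, at the cost of silently falling back to the default prefetch size.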

Conclusion

In this article, I described some of the pitfalls one can encounter when using the default JMS settings provided by the azure-spring-boot Java package, along with a possible solution to each of them.

The complete code examples used in this article are available on GitHub.
