DZone
Integration Zone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
  • Refcardz
  • Trend Reports
  • Webinars
  • Zones
  • |
    • Agile
    • AI
    • Big Data
    • Cloud
    • Database
    • DevOps
    • Integration
    • IoT
    • Java
    • Microservices
    • Open Source
    • Performance
    • Security
    • Web Dev
DZone > Integration Zone > How to Consume Large SQS Messages With JMS and Spring Boot

How to Consume Large SQS Messages With JMS and Spring Boot

Learn about using Spring Boot and JMS to consume large messages and the nuances of working with SQS through this working example code.

Laszlo Csontos user avatar by
Laszlo Csontos
·
Oct. 23, 17 · Integration Zone · Tutorial
Like (3)
Save
Tweet
20.25K Views

Join the DZone community and get the full member experience.

Join For Free

Spring Boot became ubiquitous in recent years and provided an opinionated way of integrating various pieces of technology. Working with JMS is no exception to that. Although Amazon has its own Java API for interacting with SQS, using it through JMS ensures that we’ll be able to use the same piece of code with another messaging infrastructure. After taking a look at a basic message consumer and producer setup, we dive into a more advanced use case: consuming large messages.

Working Example

If you’re like most people, perhaps you’d like to see the big picture first and go into the details after that. I prepared a fully working example, which is available on GitHub, and here:

% git clone git@github.com:springuni/springuni-examples.git
% cd springuni-examples

Create a local configuration file .env with the following contents in the root of the project (springuni-examples).

AMAZON_SQS_ENDPOINT=https://sqs.us-east-1.amazonaws.com/XXXXXXX/queue-name
AMAZON_SQS_ACCESS_KEY=XXXXXXXX
AMAZON_SQS_SECRET_KEY=XXXXXXXX
AMAZON_SQS_EXT_S3_ACCESS_KEY=XXXXXXXX
AMAZON_SQS_EXT_S3_SECRET_KEY=XXXXXXXX
AMAZON_SQS_EXT_S3_BUCKET_NAME=queue-name-large-payloads

After starting the demo app, it will listen on port 5000. I would have liked to keep it simple and it just handles bare text messages. Spring’s automated message converter infrastructure doesn’t play a role here, as the article’s focus is how to deal with large SQS messages.

% mvn spring-boot:run -pl springuni-jms-sqs
% curl -H 'Content-Type: text/plain' http://localhost:5000/message -XPOST -d 'test'

If everything went well, you should see the following messages:

2017-08-09 19:13:01.443 INFO 29525 --- [nio-5000-exec-1] c.s.examples.jms.MessageProducer : Sending message test.
2017-08-09 19:13:02.069 INFO 29525 --- [nio-5000-exec-1] c.a.s.javamessaging.SQSMessageProducer : Message sent to SQS with SQS-assigned messageId: 1001f7ba-55fc-4bdb-8732-8f4d40343068
2017-08-09 19:13:02.069 INFO 29525 --- [nio-5000-exec-1] com.amazon.sqs.javamessaging.SQSSession : Shutting down SessionCallBackScheduler executor
2017-08-09 19:13:02.243 INFO 29525 --- [enerContainer-1] c.s.examples.jms.MessageConsumer : Received message test

I’m hoping you’re still with me and interested in seeing the details.

Nuances of Working With SQS

SQS is an odd one out from the point of view of how message brokers operate in general.

SQS Maximizes the Size of Messages at 256K

Exchanging data in larger chunks than that can be achieved in various ways. When I first faced this limitation, I applied gzip compression and encoded the compressed binary data with base64. That solution worked just fine for textual (JSON) data, however, it required customization on both the producer’s and the consumer’s side. Furthermore, compression itself doesn’t guarantee that the size of messages to be sent will never exceed the 256K limit. Amazon SDK provides an extended SQS client, which enables end users to exchange messages larger than 256K transparently without having to apply customization themselves. The extended SQS client leverages S3 for storing messages and only a reference to the stored object is being sent to queues.

SQS Isn’t Transactional

However, Spring Boot tries to set up JmsListenerContainerFactory as transactional. When JMS autoconfiguration is enabled, JmsAnnotationDrivenConfiguration delegates configuring a DefaultJmsListenerContainerFactory to DefaultJmsListenerContainerFactoryConfigurer and that expects either a JtaTransactionManager be present or is set the container factory’s sessionTransacted property to true.

Annotation Driven Message Listener Configuration Requires the Queue Name to Be Defined Up Front

Arbitrary methods of a managed bean can be annotated with @JmsListener(destination = "queueName") or alternatively javax.jms.MessageListener can be implemented instead. Nevertheless, going for the first option is much more convenient as Spring intelligently converts the received message to various user-defined data type through its MessageConverter infrastructure.

In SQS, endpoint URLs identify queues and they also contain the queue’s name.
Such a URL looks like this: https://sqs.<region>.amazonaws.com/<acctount- id>/<queue-name>.

Obviously, we can extract the queue name from an URL like this, however, the way JMS can be set up with Spring Boot requires you to define the queue’s name directly.

In order to be able to leverage Spring’s messaging infrastructure without having to hard code a JMS destination in the message consumer or having to repeat the queue’s name in the application’s configuration, we need to implement a custom DestinationResolver. That DestinationResolver will eventually parse the endpoints URL of an SQS queue and we’ll have to fiddle with only a single application property.

Basic Setup With SQS

We continue from that point where Messaging with JMS left off. That guide doesn’t configure ConnectionFactory explicitly, in which case Spring Boot configures an embedded ActiveMQ broker.

Maven Dependencies

To showcase how Amazon’s JMS messaging library plays with Spring, we need to set up the following dependencies.

Basically, amazon-sqs-java-extended-client-lib is only required if you would like to send large messages and 256K might not be enough. For basic use cases, however, you can omit that.

The example uses spring-boot-starter-web, because we produce and consume messages in the same application. Real world solutions, however, have these functionalities separated.

Producing JMS Messages

For the sake of simplicity, we’ll be exchanging simple text messages. The aforementioned official Spring tutorial covers sending structured messages and it also explains how JSON messages are getting converted to/from simple Java POJOs.

We’re however focusing on the details of integrating SQS as our message broker instead.

For producing messages a simple REST controller (MessageProducer) is used which in turn puts the HTTP request’s body to an SQS queue. It’s fairly trivial to do just that, after all, there’s nothing specific to SQS in that piece of code.

@RestController
@Slf4j
public class MessageProducer {

  private final JmsOperations jmsTemplate;

  @Autowired
  public MessageProducer(JmsOperations jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
  }

  @PostMapping("/message")
  public ResponseEntity sendMessage(@RequestBody String message) {
    log.info("Sending message {}.", message);
    jmsTemplate.convertAndSend(message);
    return ResponseEntity.ok().build();
  }

}

Receiving JMS Messages

Although receiving messages (MessageConsumer) looks equally trivial at a first sight, the challenge here was to eliminate the requirement of having to define a hard-coded message destination.

@Component
@Slf4j
public class MessageConsumer {

  @JmsListener(destination = "")
  public void receive(@Payload String message) {
    log.info("Received message {}.", message);
  }

}

Spring gives support for resolving destinations based on that destination name which is supplied by the destination attribute of the @JmsListener annotation (DynamicDestinationResolver). As the application is listening on messages coming from a single queue, we don’t want to do that. Instead, the queue’s name is to be determined at that time when the application boots.

StaticDestinationResolver gets initialized with a fixed queue name and it resolves destinations against that same queue name every time.

public class StaticDestinationResolver implements DestinationResolver {

  private final String queueName;

  public StaticDestinationResolver(String queueName) {
    this.queueName = queueName;
  }

  @Override
  public Destination resolveDestinationName(
      Session session, String destinationName, boolean pubSubDomain) throws JMSException {

    return session.createQueue(queueName);
  }

}

SQS Configuration

SqsProperties encapsulate all of the required properties required for creating an SQS client. Basically, we need to define an AWS region, queue endpoint URL, and AWS access/secret keys, thought the two latter can be omitted if the underlying EC2 container has the necessary IAM roles.

@Data
@ConfigurationProperties(prefix = "amazon.sqs")
public class SqsProperties {

  private String region;
  private String endpoint;
  private String accessKey;
  private String secretKey;

  private Integer numberOfMessagesToPrefetch;

  private Extended extended = new Extended();

  public Optional<Integer> getNumberOfMessagesToPrefetch() {
    return Optional.ofNullable(numberOfMessagesToPrefetch);
  }

  public String getQueueName() {
    URI endpointUri = URI.create(endpoint);
    String path = endpointUri.getPath();
    int pos = path.lastIndexOf('/');
    return path.substring(pos + 1);
  }

  @Data
  public static class Extended {

    private String s3Region;
    private String s3BucketName;
    private String s3AccessKey;
    private String s3SecretKey;

  }

}

In application.yml, these properties are mapped to individual, uppercased application configuration options. Most of the time, these kinds of applications are deployed on EB (Elasticbeantalk) or ECS (Elastic Container Services) which supply configuration data as environment variables.

All of the configurations steps below are taken from AbstractSqsConfiguration. They were implemented as reusable building blocks and we would be inspecting them one-by-one in what follows.

Creating an AWS Credentials Provider

Connecting to AWS usually starts with authenticating to one of its services. As I mentioned above, the steps of having to supply access and secret keys can be omitted in case of IAM roles, yet creating an AWSCredentialsProvider (one way or another) is necessary.

private AWSCredentialsProvider createAwsCredentialsProvider(
    String localAccessKey, String localSecretKey) {

  AWSCredentialsProvider ec2ContainerCredentialsProvider =
      new EC2ContainerCredentialsProviderWrapper();

  if (StringUtils.isEmpty(localAccessKey) || StringUtils.isEmpty(localSecretKey)) {
    return ec2ContainerCredentialsProvider;
  }

  AWSCredentialsProvider localAwsCredentialsProvider =
      new AWSStaticCredentialsProvider(
          new BasicAWSCredentials(localAccessKey, localSecretKey));

  return new AWSCredentialsProviderChain(
      localAwsCredentialsProvider, ec2ContainerCredentialsProvider);
}

When access and secret keys are supplied we try to authenticate with those static credentials first with a fallback to fetching credentials from ECS directly.

Creating an SQS Client

Once we have an AWSCredentialsProvider at hand, using the AmazonSQSClientBuilder for making a client instance is straightforward.

Creating an SQS Connection Factory

SQSConnectionFactory is the concrete implementation of javax.jms.ConnectionFactory and as such it’s the gateway between the standard JMS API and native access to SQS through its Java SDK.

protected SQSConnectionFactory createStandardSQSConnectionFactory(SqsProperties sqsProperties) {
  AmazonSQS sqsClient = createAmazonSQSClient(sqsProperties);

  ProviderConfiguration providerConfiguration = new ProviderConfiguration();
  sqsProperties.getNumberOfMessagesToPrefetch()
      .ifPresent(providerConfiguration::setNumberOfMessagesToPrefetch);

  return new SQSConnectionFactory(providerConfiguration, sqsClient);
}

After we’ve had an AmazonSQSClient instance created in the former step, we need a ProviderConfiguration object as well in order to set the number of messages to be pre-fetched.

JMS Configuration

Spring Boot makes working with JMS very easy. Under normal circumstances, it’s enough to create register a single javax.jms.ConnectionFactory bean and then it takes care of creating a message listener container and a JmsTemplate.

@Bean
@Override
public ConnectionFactory connectionFactory(SqsProperties sqsProperties) {
  return createStandardSQSConnectionFactory(sqsProperties);
}
In case of SQS, however, there some nuances we should take care of ourselves. I mentioned that SQS wasn’t transactional, but Spring Boot’s autoconfiguration mechanism tried to create a listener container factory that way.  JmsTemplate needs a destination and that can be extracted from the given SQS endpoint URL and also it’s more convenient to have the queue name resolved automatically in contrast to having to hard-code it with  @JmsListner.

Using the SQS Extended Client

We’ve covered how to set up a basic JMS config. If you want to produce and consume message larger than 256K, keep reading.

Creating an S3 Client

Amazon SQS client relies on S3 as a means of persisting large messages and only a reference is sent over SQS. In order to be able to leverage S3, a bucket and credentials for accessing S3 are required. The following piece of code demonstrates how an AmazonS3Client can be built.

Creating the bucket automatically upon the first initialization of the app is also taken care of, although that’s optional.

Creating an Extended SQS Client

This is the key step for enabling large message handling. Basically, we need an AmazonS3Client and an AmazonSQSClient and we covered that already how to create them.

Thereafter these two are linked through AmazonSQSExtendedClient and ExtendedClientConfiguration provides a way to customize how large messages should be handled. By default, only messages larger than 256K will be sent over S3, but a user-defined message size threshold can also be specified. It’s also possible to configure it in a way that all of the messages go through S3 regardless of their size.

Conclusion

I read somewhere that Spring makes simple things easy and complex things possible. This assertion proved to be true on many occasions (including this one), when I was dealing with configuring a sophisticated application infrastructure with it.

It took me approximately a half day of tweaking to get every aspect SQS large message handling with JMS and Spring Boot right. Eventually, difficulties stemmed from that fact that SQS is a bit different from other message queuing solutions. For example, as I mentioned earlier, it isn’t transactional, but Spring Boot – being an opinionated framework – tries to configure it that way, as most messages brokers are transactional.

So, in conclusion, I can say that this setup has been working just fine for a couple of month in three applications using SQS with large messages.

Spring Framework Spring Boot

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • Ultra-Fast Microservices: When Microstream Meets Payara
  • Portfolio Architecture Examples: Retail Collection
  • Why I'm Choosing Pulumi Over Terraform
  • Deployment of Low-Latency Solutions in the Cloud

Comments

Integration Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • MVB Program
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends:

DZone.com is powered by 

AnswerHub logo