DZone

Rolling a Message Specification Update on a Message Broker (AWS SQS)

Learn how to roll a message specification update on a message broker (AWS SQS).

By Murat Balkan · Dec. 21, 18 · Tutorial · 13.67K Views

Message brokers facilitate communication between message publishers and message consumers. The publisher structures the instructions (fields) in a message in a way the consumer can understand and parse without any problems. This mutual agreement is known to the developers, but not all projects externalize it as a contract.

Just as we have API specifications (in formats such as Swagger), we should ideally have message specifications as well. (These are called event specifications in reactive solutions, but the concept is the same.) Message specifications should be externalized from the code and tracked in an SCM (Source Code Management) system such as Git. This way, the specification can be owned by the owner of that source code repository, and feedback and change requests can be collected and handled through the native functions of the SCM (pull requests, merge requests, etc.). I have seen companies use MS Excel for the same purpose, but it is not easily manageable after a certain point.
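
As a sketch of how such an externalized specification might look (the message name, fields, and file layout here are hypothetical, not from this article), a specs repository could hold one small schema per message version, along with a minimal structural check:

```javascript
// Hypothetical contents of specs/order-created/v1.js in a specs repository.
// A real setup would likely store JSON Schema files and use a validator library.
const orderCreatedV1 = {
  $id: "order-created/v1",
  type: "object",
  required: ["name"],
  properties: {
    name: { type: "string" }
  }
};

// Minimal structural check: every required field exists with the declared type.
function conformsTo(spec, message) {
  if (typeof message !== "object" || message === null) return false;
  return spec.required.every(
    (field) => typeof message[field] === spec.properties[field].type
  );
}
```

Changes to such a file then arrive as pull requests, giving the publisher team a review point for every specification change.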

Who will own the repository? In the case of event/message specifications, the repository is owned by the owner of the event: the publisher. The publisher software will be owned by a team in our organization, and that team will create the specifications, place them in repositories organized by domain or microservice, and manage their lifecycle as the business, and therefore the technical capabilities, evolve.

Changes to the specifications should also be published to the developer community by some mechanism, such as an internal portal or even a simple changelog file. (We don't notify everyone about every update, though; more on that later.)

Messages/events are generally expressed as JSON structures with name-value pairs in them. Sometimes, where throughput or message size is a concern, binary message formats are preferred, and binary messages mandate a specification, so if we are dealing with binary formats we are already managing specifications by design. For JSON-based messages, specification management is often omitted by developers because of the size of the project or the flexibility provided by JSON-to-object mapping parsers. Additive changes to a JSON payload are not breaking changes, for example, and knowing this, the publisher's developers sometimes will not even take the action to notify their consumers.

Regardless of whether a change is breaking, we should design and implement our message/event repositories to keep track of our specifications. Event change management also includes knowing and tracking your consumers. Brokers expose reports that can be leveraged to keep track of consumer applications (via their respective client IDs). A consumer impact analysis should be run on this inventory whenever we execute a change in the metadata (queue names, TTLs, cluster updates) or a specification change. This way, only the necessary teams are notified. New consumers (teams) do not need these updates; they are developing from scratch and can go to the portal or the repository's Markdown file to find the latest specification. Managing notification audience and frequency wisely is very important: poorly managed notifications lead to notification creep and negligence, which in turn lead to processes not being followed.

In this article, I will demonstrate a strategy for rolling an update out to a consumer fleet. In this scenario, we have a backlog in a queue that resides on an AWS SQS broker. This backlog consists of old messages with a message version of v1.

We will then update the publisher on the fly to start emitting a new message format, v2.

The consumers acting upon this publisher's messages will no longer be able to parse the v2 message format. (We will assume this is a breaking change.) The challenge is to process the old messages and the new messages from the same queue via different consumers, each of which understands one message type.

To solve this challenge I will be applying the following steps:

  1. The old publisher is decommissioned and the new publisher is commissioned at the same time. The new publisher starts publishing messages conforming to the new specification (v2).

  2. The new consumer, which understands the newly structured messages, is commissioned and starts listening to the already existing queue (it joins the fleet).

  3. The broker detects the new consumer and starts sending some of the already existing v1 messages to it as well (in a round-robin fashion).

  4. The old consumer does not complain about the v1 messages, but the new consumer has no idea how to handle them. Therefore, the new consumer rejects the messages that do not conform to its expected specification (specification IDs are passed to the consumers as environment variables). The rejection makes the same message available to the old consumer again.

  5. When all of the v1 messages have been consumed from the queue, the situation explained in step 4 is reversed: the old consumer starts complaining about the v2 messages, while the new consumer processes them.

  6. System admins keep an eye on the old consumer's logs, and as soon as they are satisfied that there are no v1 messages left on the broker, they decommission the old consumer and finalize the rollout.

As you may have noticed, the consumers keep track of a version attribute, which is passed to them through environment variables. They check whether this set version matches the received message's version and take the appropriate action. The message version attribute can travel in the message header/attributes or in the payload. I prefer the message header approach, as it allows me to completely change the message payload, and this demonstration follows the same approach.
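
The header-first lookup can be made concrete with a small helper. The fallback to a `version` field in the payload is a hypothetical variant for illustration; the demo in this article carries the version only in the EventVersion message attribute:

```javascript
// Extract the message version, preferring the SQS message attribute (header)
// over a "version" field in the JSON payload. The payload fallback is a
// hypothetical variant, not used by the demo in this article.
function getMessageVersion(message) {
  const attrs = message.MessageAttributes || {};
  if (attrs.EventVersion && attrs.EventVersion.StringValue) {
    return attrs.EventVersion.StringValue;
  }
  try {
    const body = JSON.parse(message.Body);
    return body.version || null;  // hypothetical payload-based version field
  } catch (e) {
    return null;                  // unparseable body: treat as unknown version
  }
}
```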

I will create a demo.fifo queue for this demo in AWS SQS with the following settings (the delivery delay is added for demo purposes only):

[Screenshot: demo.fifo queue settings in the AWS SQS console]

I will be using the NodeJS SDK for AWS SQS for both the publisher and the consumer code. The publisher code looks like the following:

var AWS = require('aws-sdk');
AWS.config.update({region: 'us-east-1'});
var sqs = new AWS.SQS({apiVersion: '2012-11-05'});

// Message count and version come from the command line, e.g. node publisher.js 10 v1
const args = process.argv.slice(2);
const message_count = args[0];
const version = args[1];

for (var i = 0; i < message_count; i++) {
  var params = {
    MessageAttributes: {
      "EventVersion": {          // the version travels in the message attributes (header)
        DataType: "String",
        StringValue: version
      }
    },
    MessageBody: "{\"name\":\"" + version + " message\"}",
    QueueUrl: "https://sqs.us-east-1.amazonaws.com/444/demo.fifo",
    MessageGroupId: "demo",
    // FIFO queues require a deduplication id; timestamp + index keeps it unique
    MessageDeduplicationId: (new Date).getTime().toString() + i
  };

  sqs.sendMessage(params, function(err, data) {
    if (err) {
      console.log("Error", err);
    } else {
      console.log("Message published with Id: ", data.MessageId);
    }
  });
}

Using this publisher, I will first publish 10 messages to the queue named "demo" with the old version identifier (v1), via node publisher.js 10 v1. The payload is a simple JSON object: {"name":"v1 message"}.

[Screenshot: publisher output after sending the v1 messages]

I now want to be sure that I can see the backlog in my queue via the AWS CLI:

[Screenshot: AWS CLI output showing the queue backlog]
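
The same backlog check can also be done with the SDK instead of the CLI. This is a sketch under the same assumptions as the rest of the code (aws-sdk package, region, and the article's queue URL); the parsing helper is separated out so it can be checked on its own:

```javascript
// Pull the approximate backlog size out of a GetQueueAttributes response.
function backlogSize(data) {
  return parseInt(data.Attributes.ApproximateNumberOfMessages, 10);
}

// SDK call sketch (not executed here): requires the aws-sdk package and
// AWS credentials, as in the publisher and consumer code.
function checkBacklog(queueUrl) {
  var AWS = require('aws-sdk');
  AWS.config.update({region: 'us-east-1'});
  var sqs = new AWS.SQS({apiVersion: '2012-11-05'});
  sqs.getQueueAttributes({
    QueueUrl: queueUrl,
    AttributeNames: ["ApproximateNumberOfMessages"]
  }, function(err, data) {
    if (err) console.log("Error", err);
    else console.log("Backlog:", backlogSize(data));
  });
}

// checkBacklog("https://sqs.us-east-1.amazonaws.com/444/demo.fifo");
```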

Everything looks good. Now, the consumers. I will write the consumer code as follows:

var AWS = require('aws-sdk');
AWS.config.update({region: 'us-east-1'});
var sqs = new AWS.SQS({apiVersion: '2012-11-05'});

// The expected message version comes from the command line, e.g. node consumer.js v1
const args = process.argv.slice(2);
const current_version = args[0];
const queue = "https://sqs.us-east-1.amazonaws.com/444/demo.fifo";

setInterval(function() {
  sqs.receiveMessage({
    QueueUrl: queue,
    MaxNumberOfMessages: 1,      // get 1 message per poll
    WaitTimeSeconds: 4,          // long polling seconds
    MessageAttributeNames: ["All"]
  }, function(err, data) {
    if (err) {
      console.log("Receive Error", err);
      return;
    }
    if (data.Messages) {
      for (var i = 0; i < data.Messages.length; i++) {
        var message = data.Messages[i];
        var rh = message.ReceiptHandle;
        var version = message.MessageAttributes.EventVersion;
        if (version.StringValue == current_version) {
          var body = JSON.parse(message.Body);
          console.log("Versions match. (" + version.StringValue + ") Processing message: " + JSON.stringify(body));
          // delete the processed message from the queue
          var params = {
            QueueUrl: queue,
            ReceiptHandle: rh
          };
          sqs.deleteMessage(params, function(err, data) {
            if (err) console.log(err, err.stack); // an error occurred
          });
        } else {
          // reject the message by setting its visibility timeout to 0,
          // which makes it immediately available to the other consumer
          var visibilityParams = {
            QueueUrl: queue,
            ReceiptHandle: rh,
            VisibilityTimeout: 0
          };
          sqs.changeMessageVisibility(visibilityParams, function(err, data) {
            if (err) {
              console.log("Reject Error", err);
            } else {
              console.log("Message rejected due to version mismatch!", data);
            }
          });
        }
      }
    }
  });
}, 5000);

I am getting the expected message version from the command-line arguments. In real life, this could be injected into the OS as an environment variable.

As you can see, the code checks the message attribute named EventVersion to see if it matches the consumer's set version. If there is a match, it logs the message payload to the console; if not, the message is rejected and returned to the queue.

The new consumer uses the same codebase for this example (in the real world there will probably be different parsing and/or business logic), but I am passing a different expected message version, "v2", via the command-line arguments.
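
As a hypothetical alternative to running two separate processes, the per-version logic could also live in one codebase behind a handler map keyed by version; the handlers below are illustrative, not from the article:

```javascript
// Hypothetical per-version handlers; in this article's demo, the two
// consumers are separate processes with the version passed as an argument.
const handlers = {
  v1: function(body) { return "v1 handled: " + body.name; },
  v2: function(body) { return "v2 handled: " + body.name; }
};

// Dispatch a parsed message body to the handler for its version;
// null signals a rejection (return the message to the queue).
function dispatch(version, body) {
  const handler = handlers[version];
  return handler ? handler(body) : null;
}
```

The trade-off is that a single fleet must then be redeployed for every new version, whereas the article's approach lets old and new consumers drain the queue side by side.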

Before starting the consumers, I will run the publisher one more time to emit some new (v2) messages behind the old ones. I will send an additional 5 via:

 node publisher.js 5 v2 

I will start the consumers in different terminals to examine the results.

Consumer 1 (Old Consumer)

It starts by consuming the first messages, which are all v1, and processes them without complaint. As soon as it reaches the v2 messages, it starts rejecting them, which makes them eligible for Consumer 2.

[Screenshot: Consumer 1 terminal output]

Consumer 2 (New Consumer)

Consumer 2 starts emitting errors at first because of the v1 messages and returns them to the queue. After all v1 messages have been consumed from the queue by Consumer 1, Consumer 2 can happily consume the v2 messages, while Consumer 1 starts emitting errors. These errors can now be collected by production support tools, and we can safely decommission Consumer 1.

[Screenshot: Consumer 2 terminal output]

This rollout strategy can also be used for other interesting scenarios, such as business logic changes effective at a specific date (old messages published before that date are subject to business/regulatory logic that differs from the logic applied to messages with a later publish date).
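
The date-based variant of the same strategy can be sketched with a cutoff check on a publish-timestamp message attribute. The attribute name (PublishedAt) and the cutoff date here are hypothetical, chosen only to illustrate the routing decision:

```javascript
// Hypothetical: route messages to old or new business logic based on a
// regulatory cutoff date carried in a PublishedAt message attribute.
const CUTOFF = Date.parse("2019-01-01T00:00:00Z");

function selectLogic(message) {
  const attrs = message.MessageAttributes || {};
  const publishedAt = attrs.PublishedAt
    ? Date.parse(attrs.PublishedAt.StringValue)
    : NaN;
  if (isNaN(publishedAt)) return "reject";  // no timestamp: return to queue
  return publishedAt < CUTOFF ? "legacy-rules" : "current-rules";
}
```

As with the version rollout, each consumer fleet handles only the messages matching its rules and rejects the rest back to the queue.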

Happy queueing!


Opinions expressed by DZone contributors are their own.
