
Lambda for Asynchronous Message Processing Through SQS


This article puts the concept of serverless architecture into practice with a demonstration of using AWS Lambda for asynchronous messages.


Many application developers are moving towards the serverless paradigm, which is set to become one of the key paradigms for enterprise application development in the next few years. Function as a Service (FaaS) is the processing component of a serverless architecture. In this blog post, I’m going to explain how to use FaaS and serverless architecture for a real-world use case. There are a few leading service providers in this space, and AWS is at the top of the list, so I’m going to use AWS services for this exercise.

Use Case Description

ReserveNow.LK is a hotel booking service provider that manages a booking application purchased by its client hotels. They currently have a requirement to process booking requests asynchronously, so that they can analyze request patterns and failures and enhance their service in the future. Since they don’t want this requirement to introduce any delay into the booking request handling process, they have decided to push the booking messages to a message queue.

Our exercise here is to implement this asynchronous booking request processing application using AWS services. To keep it simple, let’s focus on only two main requirements.

  1. The application should identify booking request anomalies based on the booking dates, and detected anomalies should be persisted to a NoSQL database for further analysis with a data analytics tool.
  2. It should detect booking failures and notify the Ops team of the failure via an SMS message.

Selected AWS Services

  • Since we need to decouple booking handling from asynchronous booking request processing, we need a queueing system in between. Amazon Simple Queue Service (SQS) fits this well. The booking engine will publish messages to an SQS queue, and our application will poll messages from that queue and do the required post-processing.
  • Detected anomalies should be persisted to a NoSQL database, and within AWS the natural choice for that is DynamoDB.
  • For notifying the Ops team via SMS about booking failures, we can use Amazon Simple Notification Service (SNS).

Those are the services that cover each capability we require. However, one big part is still missing: the processing part, or the FaaS component, which detects anomalies and failures in the booking requests. For serverless and FaaS on AWS, the option for that is AWS Lambda.

Now all the components are selected, but there is one more thing. At the time of writing, SQS cannot be used as a trigger for AWS Lambda (Lambda has since added native SQS event source support, but this article follows the polling approach). If we want to process SQS messages via Lambda, we need to trigger the Lambda function periodically via a CloudWatch rule and then let the function fetch messages from the configured SQS queue.

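So the overall architecture of the application looks like this: the booking engine publishes booking messages to the SQS queue; a CloudWatch rule triggers the Lambda function on a schedule; the function fetches messages from the queue, persists detected anomalies to DynamoDB, and notifies the Ops team about booking failures via SNS.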



Setting Up the Resources

Enough of explanations and boring stuff. Let’s go to some fun stuff with the implementation of our application using AWS services.

The first step is to create an SQS queue that the booking engine publishes to and that our function fetches messages from for anomaly detection. Assuming that you have an AWS account (which is needed to try this out), let’s go to the SQS service using Services -> Simple Queue Service. Then click on the Create New Queue button and give a proper name for your queue. Since we are not very worried about ordering, a Standard Queue is enough and there is no need for a FIFO queue. In my case, I’m creating a queue named BookingInfoQueue.
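If you prefer to script this step instead of clicking through the console, the same queue can be created with the AWS SDK. The following is a minimal sketch, not part of the original walkthrough: buildQueueParams and createBookingQueue are hypothetical helper names, and the SQS client is passed in so that the snippet makes no assumption about how your credentials and region are configured.

```javascript
// Hypothetical helper: builds the request for a Standard (non-FIFO) queue.
function buildQueueParams(queueName) {
    if (queueName.endsWith('.fifo')) {
        throw new Error('Names ending in .fifo are reserved for FIFO queues');
    }
    return { QueueName: queueName };
}

// Hypothetical helper: `sqs` is expected to be an AWS.SQS client from the
// aws-sdk module. Resolves with the URL of the created queue.
function createBookingQueue(sqs, queueName) {
    return sqs.createQueue(buildQueueParams(queueName)).promise()
        .then(data => data.QueueUrl);    // keep this URL for the Lambda code
}

// Possible usage (requires the aws-sdk module and valid credentials):
// const AWS = require('aws-sdk');
// createBookingQueue(new AWS.SQS(), 'BookingInfoQueue').then(url => console.log(url));
```

The returned queue URL is the value that later replaces the <<queue_url>> placeholder in the Lambda code.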


Next we should create our Lambda function to fetch messages from the SQS queue, but first we need to set up the required permissions and role for our application. For that, let’s go to the IAM service, Services -> IAM, and open the Roles section from the left menu. Click on the Create Role button to create a new role. You will be taken to a page listing the AWS services that the role can be configured for. Select Lambda from that list and go to the next section. Then, from the policy list, search for the following policies and add them to your role.

  • AWSLambdaExecute
  • AmazonSQSFullAccess
  • CloudWatchEventsFullAccess
  • AmazonDynamoDBFullAccess
  • AmazonSNSFullAccess

After selecting the above policies, click on the Review button to go to the next page. On that page, make sure all of the above policies are listed and then give a proper name for your role. In my case it’s “lambda_sqs_tutorial”. If everything is correct, click on the Create Role button to complete role creation.
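The managed FullAccess policies keep the tutorial short, but they grant far more than the function needs. As a sketch of a narrower alternative (my suggestion, not something the setup above uses), the role could carry an inline policy limited to the calls the function actually makes. The region, account ID, and resulting ARNs below are placeholders, and buildLambdaPolicy is a hypothetical helper name.

```javascript
// Hypothetical helper: returns an IAM policy document covering only the API
// calls made by the Lambda code in this article.
function buildLambdaPolicy(region, accountId) {
    const arn = (service, resource) => `arn:aws:${service}:${region}:${accountId}:${resource}`;
    return {
        Version: '2012-10-17',
        Statement: [
            {   // Read and delete booking messages
                Effect: 'Allow',
                Action: ['sqs:ReceiveMessage', 'sqs:DeleteMessage'],
                Resource: arn('sqs', 'BookingInfoQueue')
            },
            {   // Persist detected anomalies
                Effect: 'Allow',
                Action: ['dynamodb:PutItem'],
                Resource: arn('dynamodb', 'table/BookingInfoAnomalies')
            },
            {   // Direct SMS publishing has no topic ARN to scope to
                Effect: 'Allow',
                Action: ['sns:Publish'],
                Resource: '*'
            },
            {   // Write execution logs to CloudWatch Logs
                Effect: 'Allow',
                Action: ['logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutLogEvents'],
                Resource: arn('logs', '*')
            }
        ]
    };
}

// Print the document so it can be pasted into the IAM console as an inline policy
console.log(JSON.stringify(buildLambdaPolicy('us-east-1', '123456789012'), null, 2));
```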



Then let’s create the Lambda function to fetch messages from our SQS queue. For that, let’s go to the Lambda section, Services -> Lambda, and click on the Create Function button. There you can give a proper name for your Lambda function under the Author from Scratch section (I’m using BookingInfoProcessor). For the runtime you can use whatever you prefer, but for this exercise I’m going to use Node.js 6.10. For the Role, select the “Choose an existing role” option and select our newly created role from the “Existing role” drop-down menu. Then click on the Create function button to create the Lambda function.



You will then be taken to the Lambda Designer view, which can be used to configure triggers for your Lambda function. On the left side of the designer view there is a list of services that can be added as a trigger. In our scenario, we need a CloudWatch event to trigger our Lambda function periodically based on a cron schedule. To add that, click on the CloudWatch Events item in the left-side list. It should be added to the designer view and linked to our Lambda function. To configure the event, double-click on the CloudWatch Events item that is now linked to the function, then scroll down to the trigger configuration section. In that section, select the “Create a new rule” option from the Rule drop-down. Give a proper name for the rule, and set the schedule to “cron(0/5 * ? * MON-FRI *)”, which fires every 5 minutes from Monday to Friday. Keep “Schedule expression” selected as the Rule type and make sure the enable trigger option is also selected. Then click on the Add button to complete the trigger creation.
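The schedule expression follows the CloudWatch Events cron format, which has six fields: minutes, hours, day of month, month, day of week, and year. As a rough illustration, the helper below (describeSchedule is a hypothetical name, not an AWS API) splits an expression into those fields so that each part can be read by name.

```javascript
// Field order used by CloudWatch Events cron expressions
const CRON_FIELDS = ['minutes', 'hours', 'dayOfMonth', 'month', 'dayOfWeek', 'year'];

// Hypothetical helper: turns "cron(0/5 * ? * MON-FRI *)" into a labelled object.
function describeSchedule(expression) {
    const match = /^cron\((.+)\)$/.exec(expression.trim());
    if (!match) {
        throw new Error('Expected an expression of the form cron(...)');
    }
    const parts = match[1].trim().split(/\s+/);
    if (parts.length !== CRON_FIELDS.length) {
        throw new Error('A CloudWatch Events cron expression needs exactly 6 fields');
    }
    const described = {};
    CRON_FIELDS.forEach((name, i) => { described[name] = parts[i]; });
    return described;
}

console.log(describeSchedule('cron(0/5 * ? * MON-FRI *)'));
```

Read this way, the rule fires every 5 minutes (starting at minute 0) of every hour, Monday to Friday. The “?” in the day-of-month field is required because the day of week is specified, and the schedule is evaluated in UTC.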



Now the triggering part is complete. The remaining part is to fetch messages from the SQS queue and process them for anomaly detection. Before we start coding, we need to create one more resource, which is the DynamoDB table.

For that, let’s go to the DynamoDB service section, Services -> DynamoDB, and click on the Create table button to create a new table. Fill in the table name and primary key (the Lambda code later in this article writes to a table named BookingInfoAnomalies, so keep the table and key names consistent with that code if you want to reuse it as-is), then click on the Create button to create the table.



You may be thinking, "Don’t we need to create anything for the SNS to send SMS messages for booking failures?" Fortunately, SNS provides a straightforward way to send SMS messages without doing any pre-configuration.

Implementing Lambda Function

Now that we have set up all the required resources, the only remaining part is coding the Lambda function, which is the most interesting part for a developer. The logic that we need to implement is as follows:

  • Fetch messages from the queue
  • Go through each message and check for booking anomalies. A booking is treated as an anomaly under either of two rules: the booking start date is more than 6 months (180 days) after the current date, or the booking period (the difference between the booking end date and start date) is greater than 20 days. See the following sample booking request to understand the structure of the message.
{  
  "resellerId": "81023I",  
  "bookingRef": "SL-booking-NR-1284404",  
  "bookingReqProcessingState": "Success",  
  "bookingRequest": {    
    "startDate": "2018-11-19",    
    "endDate": "2018-12-02",    
    "pax": "2",    
    "city": "PAR",    
    "grade": "NR"  
  }
}
  • Detected booking anomalies should be persisted to the DynamoDB database.
  • If there are booking failures (based on the value of the “bookingReqProcessingState” field), the Ops team should be notified about them via an SMS message (to keep it simple, let’s send the SMS message to only one number).
  • Delete all processed messages from the SQS queue (SQS won’t delete messages automatically once you read them; you have to explicitly delete each message once you complete its processing).
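The two anomaly rules in the list above can be sketched as a small pure function. This is only an illustration that uses plain Date arithmetic instead of the date-and-time module used later in the article; isBookingAnomaly and parseDay are hypothetical helper names, and the current time is passed in so that the rule is easy to test.

```javascript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Parses a 'YYYY-MM-DD' string as midnight UTC to avoid time zone surprises
function parseDay(text) {
    const parts = text.split('-').map(Number);
    return Date.UTC(parts[0], parts[1] - 1, parts[2]);
}

// Hypothetical helper: true when the stay is longer than 20 days or the
// booking starts more than 180 days after `now`.
function isBookingAnomaly(bookingRequest, now) {
    const start = parseDay(bookingRequest.startDate);
    const end = parseDay(bookingRequest.endDate);
    const daysUntilStart = (start - now.getTime()) / MS_PER_DAY;
    const stayLength = (end - start) / MS_PER_DAY;
    return stayLength > 20 || daysUntilStart > 180;
}

// The sample request above is a 13-day stay; seen from 1 October 2018 it
// starts 49 days ahead, so neither rule applies.
const sampleRequest = { startDate: '2018-11-19', endDate: '2018-12-02' };
console.log(isBookingAnomaly(sampleRequest, new Date(Date.UTC(2018, 9, 1))));
```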

Create Lambda Deployment Artifact

It’s quite straightforward. Create a folder named BookingInfoProcessor, go inside that folder, and create a package.json file with the following content.

{  
  "name": "BookingInfoProcessor",  
  "version": "1.0.0",  
  "dependencies": {}
}

Then execute the npm install date-and-time command within your directory to install the utility module for date manipulation (this expects npm to be installed locally on your machine). Then create a lambda.js file inside your directory and start coding within that file. To meet our requirements, the Lambda implementation should look like this:

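const AWS = require('aws-sdk');            // AWS SDK, bundled with the Lambda Node.js runtime used here
const date = require('date-and-time');     // Date utility module installed via npm
const sqs = new AWS.SQS();
const ddb = new AWS.DynamoDB.DocumentClient();
const sns = new AWS.SNS();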

exports.handler = (event, context, callback) => {

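    sqs.receiveMessage({
        QueueUrl: '<<queue_url>>',  // URL of your queue
        AttributeNames: ['All'],
        MaxNumberOfMessages: 10,    // SQS returns at most 10 messages per call
        VisibilityTimeout: 30,      // Seconds the fetched messages stay hidden from other consumers
        WaitTimeSeconds: 20         // Long polling: wait up to 20 seconds for messages to arrive
    }).promise()
        .then(data => {
            // data.Messages is undefined when the queue is empty, so fall back to an empty list
            (data.Messages || []).forEach(message => {      // Going through all the fetched messages in this attempt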
                console.log("Received message with payload", message.Body);

                let messageBody = JSON.parse(message.Body);

                let bookingDateObj = new Date();
                let startingDateObj = date.parse(messageBody.bookingRequest.startDate, 'YYYY-MM-DD');
                let endingDateObj = date.parse(messageBody.bookingRequest.endDate, 'YYYY-MM-DD');

                let failure = messageBody.bookingReqProcessingState === "Failed";       // Check whether it's a booking failure
                if (failure) {
                    let notificationMsg = "Notifying about booking failure for booking reference :" + messageBody.bookingRef;

                    sns.publish({                                   // Notifying booking failure via an SMS message using SNS
                        Message: notificationMsg,
                        MessageAttributes: {
                            'AWS.SNS.SMS.SMSType': {
                                DataType: 'String',
                                StringValue: 'Promotional'
                            },
                            'AWS.SNS.SMS.SenderID': {
                                DataType: 'String',
                                StringValue: 'BkFailures'
                            }
                        },
                        PhoneNumber: '<<phone_number>>'                 // Your phone number goes here to get an SMS notification
                    }).promise()
                        .then(data => {
                            console.log("Successfully sent notification to the operator with response :" + JSON.stringify(data));
                        })
                        .catch(err => {
                            console.log("Error while sending notification SMS", err);
                        });
                }

                let gapForBookingStartDate = date.subtract(startingDateObj, bookingDateObj).toDays();
                let gapBetweenBookingDates = date.subtract(endingDateObj, startingDateObj).toDays();

                // Check whether it is a booking anomaly. In this example a booking is an anomaly if its start date is
                // more than 6 months (180 days) after the current date or its date range is greater than 20 days
                if (gapBetweenBookingDates > 20 || gapForBookingStartDate > 180) {
                    let insertTimeStr = date.format(new Date(), 'YYYY-MM-DD HH:mm:ss');
                    ddb.put({
                        TableName: 'BookingInfoAnomalies',
                        Item: {
                            'ResellerID': messageBody.resellerId,
                            'BookingRef': messageBody.bookingRef,
                            'BookingState': !failure,
                            'StartDate': messageBody.bookingRequest.startDate,
                            'EndDate': messageBody.bookingRequest.endDate,
                            'Pax': messageBody.bookingRequest.pax,
                            'City': messageBody.bookingRequest.city,
                            'Grade': messageBody.bookingRequest.grade,
                            'InsertTime': insertTimeStr
                        }
                    }).promise()
                        .then(data => {
                            console.log("Successfully inserted booking ref : " + messageBody.bookingRef +
                                " to DynamoDB with response : " + JSON.stringify(data));
                        })
                        .catch(err => {
                            console.log("Error while inserting data to DynamoDB due to : ", err);
                        });
                }

                sqs.deleteMessage({                         // Deleting the processed message to make sure it's not processed again
                    QueueUrl: "<<queue_url>>",  // URL of your queue
                    ReceiptHandle: message.ReceiptHandle

                }).promise()
                    .then(data => {
                        console.log("Successfully deleted message with ReceiptHandle : " + message.ReceiptHandle +
                            " and booking reference : " + messageBody.bookingRef + " with response :" + JSON.stringify(data));
                    })
                    .catch(err => {
                        console.log("Error while deleting the fetched message with ReceiptHandle : " + message.ReceiptHandle +
                            " and booking reference : " + messageBody.bookingRef, err);
                    });

            });
        })
        .catch(err => {
            console.log("Error while fetching messages from the sqs queue", err);
        });


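    // The promise chain above is still running at this point. With the default
    // callbackWaitsForEmptyEventLoop setting, Lambda waits for those pending
    // calls to finish before ending the invocation.
    callback(null, 'Lambda execution completed');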
};
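One design point worth noting: the handler above deletes each message without waiting for the SNS and DynamoDB calls to finish, so a failed write is logged but the message is already gone. A possible variation, sketched below under the assumption that reprocessing a message (and possibly re-sending an SMS) is acceptable, deletes a message only after its side effects succeed. processThenDelete and its actions argument are hypothetical names, and the sqs client is passed in.

```javascript
// Hypothetical helper: runs the side effects for one message and deletes the
// message only if all of them succeed. Resolves with true on success and
// false otherwise.
// `actions` is an array of functions, each returning a promise (for example a
// wrapper around sns.publish(...).promise() or ddb.put(...).promise()).
function processThenDelete(sqs, queueUrl, message, actions) {
    return Promise.all(actions.map(action => action(message)))
        .then(() => sqs.deleteMessage({
            QueueUrl: queueUrl,
            ReceiptHandle: message.ReceiptHandle
        }).promise())
        .then(() => true)
        .catch(err => {
            // The message stays in the queue; it becomes visible again after
            // the visibility timeout and is picked up on a later run.
            console.log('Processing failed, message kept for retry', err);
            return false;
        });
}
```

With this shape, the handler would collect the promises returned for each fetched message and invoke the callback only after all of them have settled.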


If you are quite new to Node.js, I recommend that you clone this GitHub repository to make sure you have the correct source code and artifact.

Once you have the project, you just have to create a zip file from the complete project (including the node_modules folder) to deploy it as a Lambda function. Then go to the AWS management console and open the Lambda Designer view again. Click on the Lambda function icon (with the name BookingInfoProcessor) and scroll down to the Function Code section. Change the Code entry type to the “Upload a .zip file” option and click on the upload button to upload your zip file. Since the code lives in lambda.js, set the Handler field to lambda.handler. Also make sure the function timeout is longer than the 20-second WaitTimeSeconds used in the code, because the default timeout of 3 seconds is too short for it. After uploading the file, click on the Save button to save your Lambda function updates. With that, we have implemented our complete application using AWS services.

Testing

To test our application, we need sample messages in the SQS queue. For that, go to the SQS service section and find your queue in the queue listing view. From that view, you can publish messages to your queue by selecting the Send a Message option in the Queue Actions drop-down list. Use the sample message above as the payload and change its fields as needed.
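To get payloads that exercise both paths, it can help to derive them from the sample message rather than editing JSON by hand. The snippet below is a small sketch: buildTestMessage is a hypothetical helper, and the overridden values are just examples that should trip the failure check and the 20-day rule respectively.

```javascript
// Sample payload from the article
const sampleBooking = {
    resellerId: '81023I',
    bookingRef: 'SL-booking-NR-1284404',
    bookingReqProcessingState: 'Success',
    bookingRequest: { startDate: '2018-11-19', endDate: '2018-12-02', pax: '2', city: 'PAR', grade: 'NR' }
};

// Hypothetical helper: copies the sample, overrides selected top-level and
// bookingRequest fields, and returns the JSON text to use as the message body.
function buildTestMessage(overrides, requestOverrides) {
    const bookingRequest = Object.assign({}, sampleBooking.bookingRequest, requestOverrides);
    const body = Object.assign({}, sampleBooking, overrides, { bookingRequest });
    return JSON.stringify(body);
}

// A failed booking, which should result in an SMS notification
const failedBooking = buildTestMessage({ bookingReqProcessingState: 'Failed' });
// A 31-day stay, which should be stored as an anomaly
const longStay = buildTestMessage({}, { endDate: '2018-12-20' });

console.log(failedBooking);
console.log(longStay);
```

The printed JSON can be pasted into the Send a Message dialog, or passed as the MessageBody of an sqs.sendMessage call together with your QueueUrl.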

Now you should see our Lambda function consuming messages from your queue at 5-minute intervals (on weekdays, as set by the cron schedule). If there are any booking anomalies or booking failures, you should see a DynamoDB update or an SMS message respectively. If you want to further analyze or monitor your Lambda function’s execution behavior, go to the Services -> CloudWatch -> Logs section and find the log group for your Lambda function. Within that log group, there should be one or more log streams for your function’s executions.

AWS provides a cool set of services that cover most of the requirements of modern application development with the serverless architecture pattern. Once you are familiar with the AWS services, you will likely pick them as your first preference for most of your application development. Yes, you may have to do a bit of configuration and wiring in a few places under different services in the AWS management console, but it’s worth the effort compared to the features and advantages of the whole process.


Topics:
aws ,lambda ,sqs ,serverless ,asynchronous ,sns ,anomaly detection ,cloud
