Integrate SQS and Lambda: Serverless Architecture for Asynchronous Workloads

Author Andreas Wittig shows you how to process asynchronous tasks serverlessly. An SQS queue will be used to decouple your microservice from other parts of your system. You'll learn how to implement the microservice with AWS Lambda.

The gold standard for modern cloud-native applications is a serverless architecture. AWS Lambda allows you to implement scalable and fault-tolerant applications without the need for a single virtual machine.

A serverless infrastructure based on AWS Lambda has two main advantages:

  1. You don’t need to manage a fleet of virtual machines anymore.
  2. Deploying new versions of your code can be fully controlled by API calls.

This article shows you how to process asynchronous tasks serverlessly. Possible use cases include sending out large numbers of emails, transcoding video files after upload, and analyzing user behavior. An SQS queue is used to decouple your microservice from other parts of your system, and you'll learn how to implement the microservice with AWS Lambda.

What is SQS?

A best practice when building scalable and highly available systems on AWS is to decouple your microservice by using one of the following options:

  • ELB (Elastic Load Balancing) for synchronous decoupling, e.g. a web application answering HTTPS requests from browsers.
  • SQS queue for asynchronous decoupling, e.g. sending out large numbers of emails, transcoding video files after upload, or analyzing user behavior.

Amazon Simple Queue Service (SQS) is a managed message queuing service. SQS offers fault-tolerant and scalable distributed message queues that are simple to use but very powerful.

The following figure shows a typical architecture based on SQS. A producer sends tasks (also called messages) to an SQS queue. A fleet of consumers reads tasks from the SQS queue and does the actual computing.

The number of consumers can easily be scaled based on the number of tasks in the queue. Because tasks are stored durably and managed within SQS, it is also easy to build a fault-tolerant system that automatically recovers from failed consumers without losing any tasks.
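
Scaling consumers on queue depth can be made concrete with a small scaling rule. This is a hypothetical sketch: it assumes you read the backlog from the CloudWatch metric `ApproximateNumberOfMessagesVisible` (or the queue attribute `ApproximateNumberOfMessages`), and the function name, the per-consumer throughput and the bounds are made up for illustration.

```javascript
// Hypothetical scaling rule: how many consumers should run for a given backlog?
// backlog          - visible messages in the queue (assumed to come from the
//                    ApproximateNumberOfMessagesVisible metric)
// tasksPerConsumer - tasks one consumer handles per scaling interval (assumed)
// minConsumers, maxConsumers - bounds that cap capacity and cost
function desiredConsumerCount(backlog, tasksPerConsumer, minConsumers, maxConsumers) {
    if (tasksPerConsumer <= 0) {
        throw new RangeError("tasksPerConsumer must be positive");
    }
    // One consumer per started block of tasksPerConsumer tasks.
    var needed = Math.ceil(backlog / tasksPerConsumer);
    // Clamp the result into [minConsumers, maxConsumers].
    return Math.min(maxConsumers, Math.max(minConsumers, needed));
}

console.log(desiredConsumerCount(0, 100, 1, 20));     // prints: 1 (never below the minimum)
console.log(desiredConsumerCount(950, 100, 1, 20));   // prints: 10
console.log(desiredConsumerCount(50000, 100, 1, 20)); // prints: 20 (capped at the maximum)
```

Clamping keeps a sudden burst of tasks from launching an unbounded number of consumers, while the minimum keeps some capacity ready after an idle period.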

SQS offers a REST API, so there are various ways to integrate it into producers and consumers: mobile or client-side web applications, applications running on EC2 instances, and AWS Lambda.
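
On the producer side, sending a task boils down to one `sendMessage` call with a `QueueUrl` and a string `MessageBody`. The following is a minimal sketch assuming the aws-sdk (v2) for JavaScript; to keep it runnable without AWS credentials, the send function is injected. `enqueueTask`, the task shape and the queue URL are hypothetical.

```javascript
// Minimal producer sketch. sendMessage is injected so the function can be
// exercised without AWS; in production you could pass
//   function (params, cb) { sqs.sendMessage(params, cb); }
// where sqs is an AWS.SQS client from the aws-sdk (v2) package.
// The task shape ({type, payload}) is hypothetical.
function enqueueTask(sendMessage, queueUrl, task, callback) {
    var params = {
        QueueUrl: queueUrl,
        // SQS message bodies are strings, so serialize the task as JSON.
        MessageBody: JSON.stringify(task)
    };
    sendMessage(params, callback);
}

// Usage with a stub standing in for SQS.
var sent = [];
function fakeSendMessage(params, cb) {
    sent.push(params);
    cb(null, { MessageId: "fake-id-" + sent.length });
}

enqueueTask(fakeSendMessage, "https://example.com/queue",
    { type: "send-email", payload: { to: "user@example.com" } },
    function (err, data) {
        console.log(err, data.MessageId); // prints: null fake-id-1
    });
```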

What is Lambda?

Amazon announced AWS Lambda in November 2014. Since then, serverless computing has become one of the hottest topics within the public cloud market.

You can use AWS Lambda to execute small computing tasks. At the time of writing, a function has to finish its work within 300 seconds.

AWS Lambda offers:

  • Scalable and highly available computing capacity.
  • Automated deployments allowing you to set up a continuous delivery pipeline.
  • A fully managed and maintenance-free service.

At the time of writing, Lambda supports Node.js, Java, and Python. You upload your source code to S3; afterwards, you can execute your functions by calling the REST API or by using one of the integrations: Kinesis, S3, DynamoDB, CloudTrail, and API Gateway.

At the time of writing, there is no out-of-the-box integration for SQS. But you'll learn next how to integrate SQS with Lambda easily.

Integrate Lambda and SQS

The following figure shows all components needed to read and process messages from an SQS queue serverlessly:

  1. The SQS queue receives and stores tasks from other parts of the system.
  2. The CloudWatch Event Rule triggers the Lambda Consumer based on a schedule (e.g. every minute).
  3. The Lambda Consumer reads as many messages as possible from the SQS queue and invokes a Lambda Worker for each message.
  4. The Lambda Worker performs the actual computing task and deletes the task from the SQS queue after the task has been completed successfully.
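
The interplay of the four components can be sketched as a toy, in-memory model. Everything here is hypothetical and deliberately simplified: it is not the SQS or Lambda API, Workers run synchronously instead of being invoked asynchronously, and the visibility timeout is simulated by an explicit `expireInFlight` call. It only shows who receives a task, who processes it, and who deletes it.

```javascript
// Toy in-memory model of the architecture (not the real SQS API).
// Step 1: a queue that stores tasks and hides them while they are in flight.
function createQueue() {
    var visible = [];
    var inFlight = new Map(); // receipt handle -> message
    var counter = 0;
    return {
        send: function (body) {
            visible.push({ body: body });
        },
        // Hand out up to max messages and hide them while they are processed.
        receive: function (max) {
            return visible.splice(0, max).map(function (msg) {
                var handle = "rh-" + (++counter);
                inFlight.set(handle, msg);
                return { receiptHandle: handle, body: msg.body };
            });
        },
        deleteMessage: function (handle) {
            inFlight.delete(handle);
        },
        // Stand-in for the visibility timeout expiring: undeleted tasks reappear.
        expireInFlight: function () {
            inFlight.forEach(function (msg) { visible.push(msg); });
            inFlight.clear();
        },
        size: function () { return visible.length + inFlight.size; }
    };
}

// Step 4: the Worker does the work and deletes the task only on success.
function worker(queue, message, processTask) {
    try {
        processTask(message.body);
        queue.deleteMessage(message.receiptHandle);
    } catch (err) {
        // Not deleted: the task becomes visible again after the timeout.
    }
}

// Steps 2 and 3: one scheduled tick lets the Consumer drain the visible tasks.
function consumerTick(queue, processTask) {
    var batch;
    while ((batch = queue.receive(10)).length > 0) {
        batch.forEach(function (message) { worker(queue, message, processTask); });
    }
    queue.expireInFlight();
}

var queue = createQueue();
for (var i = 1; i <= 25; i++) {
    queue.send({ taskId: i });
}
consumerTick(queue, function (task) {
    if (task.taskId === 7) {
        throw new Error("broken task");
    }
});
console.log(queue.size()); // prints: 1 (only the failing task is left)
```

The failing task is never deleted, so it survives the tick and is picked up again by the next scheduled run; this is the behavior the dead letter queue in the next section builds on.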

Let's dive a little bit deeper into how to implement and set up this infrastructure with the help of CloudFormation, my favourite Infrastructure as Code tool.

Set Up SQS Queue

First, you need to set up an SQS queue. The following CloudFormation snippet contains:

  • An SQS queue for your tasks.
  • An SQS queue acting as dead letter queue.

What is a dead letter queue? A dead letter queue collects failed tasks. In this example, a task is moved to the dead letter queue after it has been received 10 times without being deleted, i.e. after the Lambda Consumer or Worker has failed 10 times to complete it within the 60-second visibility timeout. This mechanism allows your system to get rid of broken tasks without blocking the whole task queue.

"TaskQueue": {
    "Type": "AWS::SQS::Queue",
    "Properties": {
        "QueueName": "task-queue",
        "MessageRetentionPeriod": "1209600",
        "VisibilityTimeout": "60",
        "RedrivePolicy": {
            "deadLetterTargetArn": {"Fn::GetAtt": ["DeadLetterQueue", "Arn"]},
            "maxReceiveCount": "10"
        }
    }
},
"DeadLetterQueue": {
    "Type": "AWS::SQS::Queue",
    "Properties": {
        "QueueName": "dead-letter-queue",
        "MessageRetentionPeriod": "1209600"
    }
}
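
The numbers in the TaskQueue definition can be checked with a little arithmetic, and the redrive policy can be illustrated with a toy model. The sketch only mirrors the documented behavior (a message whose receive count exceeds `maxReceiveCount` is moved to the dead letter queue); `receiveWithRedrive` is a hypothetical helper, not part of the SQS API.

```javascript
// Values taken from the TaskQueue definition above.
var MESSAGE_RETENTION_PERIOD = 1209600; // seconds
var VISIBILITY_TIMEOUT = 60;            // seconds
var MAX_RECEIVE_COUNT = 10;

// 1209600 seconds is exactly 14 days, the longest retention SQS allows.
var retentionDays = MESSAGE_RETENTION_PERIOD / (24 * 60 * 60);

// Toy model of the redrive policy (hypothetical helper, not the SQS API):
// every delivery increments the receive count; once another delivery would
// exceed maxReceiveCount, the message goes to the dead letter queue instead.
function receiveWithRedrive(message, deadLetterQueue) {
    if (message.receiveCount >= MAX_RECEIVE_COUNT) {
        deadLetterQueue.push(message);
        return null; // not delivered to a consumer
    }
    message.receiveCount += 1;
    return message;
}

// A task that always fails is delivered 10 times, then dead-lettered.
var poisonTask = { body: "corrupt video file", receiveCount: 0 };
var deadLetterQueue = [];
var deliveries = 0;
while (receiveWithRedrive(poisonTask, deadLetterQueue) !== null) {
    deliveries += 1; // processing fails, so the task is never deleted
}

// Each failed delivery hides the task for one visibility timeout, so it takes
// at least 10 x 60 s before a poisoned task reaches the dead letter queue.
var minSecondsBeforeDeadLetter = MAX_RECEIVE_COUNT * VISIBILITY_TIMEOUT;

console.log(retentionDays, deliveries, deadLetterQueue.length, minSecondsBeforeDeadLetter);
// prints: 14 10 1 600
```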

Let's proceed with the Lambda Consumer.

Set Up and Implement Lambda Consumer

The following snippet contains all resources needed for the Lambda Consumer:

  • An Event Rule named ScheduleRuleForConsumerLambda triggers the Lambda Consumer every minute.
  • A Lambda Permission named PermissionToInvokeConsumerLambda allows the Event Rule to invoke the Lambda Consumer.
  • An IAM Role named ConsumerLambdaRole contains an IAM policy allowing the Lambda Consumer function to read messages from SQS and invoke the Lambda Worker.
  • A Lambda function named ConsumerLambda creates and configures the Lambda Consumer.

"ScheduleRuleForConsumerLambda": {
    "Type": "AWS::Events::Rule",
    "Properties": {
        "Description": "Schedule Rule for Consumer Lambda",
        "ScheduleExpression": "rate(1 minute)",
        "State": "ENABLED",
        "Targets": [{
            "Arn": {"Fn::GetAtt": ["ConsumerLambda", "Arn"]},
            "Id": "DailyScheduleTarget"
        }]
    }
},
"PermissionToInvokeConsumerLambda": {
    "Type": "AWS::Lambda::Permission",
    "Properties": {
        "FunctionName": {"Ref": "ConsumerLambda"},
        "Action": "lambda:InvokeFunction",
        "Principal": "events.amazonaws.com",
        "SourceArn": {"Fn::GetAtt": ["ScheduleRuleForConsumerLambda", "Arn"]}
    }
},
"ConsumerLambdaRole": {
    "Type": "AWS::IAM::Role",
    "Properties": {
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": ["sts:AssumeRole"]
            }]
        },
        "Path": "/",
        "Policies": [{
            "PolicyName": "logs",
            "PolicyDocument": {
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents"
                    ],
                    "Resource": "arn:aws:logs:*:*:*"
                }]
            }
        },
        {
            "PolicyName": "sqs",
            "PolicyDocument": {
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "sqs:ReceiveMessage"
                    ],
                    "Resource": {"Fn::GetAtt": ["TaskQueue", "Arn"]}
                }]
            }
        },
        {
            "PolicyName": "lambda",
            "PolicyDocument": {
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "lambda:InvokeFunction"
                    ],
                    "Resource": {"Fn::GetAtt": ["WorkerLambda", "Arn"]}
                }]
            }
        }]
    }
},
"ConsumerLambda": {
    "Type": "AWS::Lambda::Function",
    "Properties": {
        "Code": {
            "S3Bucket" : {"Ref": "LambdaCodeBucket"},
            "S3Key" : {"Ref": "ConsumerLambdaCodeKey"}
        },
        "Handler": "index.handler",
        "MemorySize": 128,
        "Role": {"Fn::GetAtt": ["ConsumerLambdaRole", "Arn"]},
        "Runtime": "nodejs4.3",
        "Timeout": 60
    }
}

The previous snippet can be used to set up the resources needed with the help of CloudFormation. What's still missing is the source code for the Lambda Consumer executed on AWS Lambda. The Lambda function needs to:

  • Read as many tasks as possible from the task queue.
  • Invoke a Lambda Worker for each task.

The following snippet contains an example of the Lambda Consumer written in JavaScript:

// aws-sdk is provided by the Lambda runtime; the async package
// has to be bundled with the deployment package.
var AWS = require("aws-sdk");
var async = require("async");

// Replace these placeholders with your own values.
var QUEUE_URL = "SQS_QUEUE_URL";
var WORKER_LAMBDA_FUNCTION_NAME = "LAMBDA_FUNCTION_NAME";
var REGION = "AWS_REGION";

var sqs = new AWS.SQS({region: REGION});
var lambda = new AWS.Lambda({region: REGION});

// Fetch up to 10 messages (the SQS maximum per request) from the task queue.
function receiveMessages(callback) {
    var params = {
        QueueUrl: QUEUE_URL,
        MaxNumberOfMessages: 10
    };
    sqs.receiveMessage(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);
            callback(err);
        } else {
            callback(null, data.Messages);
        }
    });
}

// Invoke the Worker asynchronously ('Event') and pass the whole SQS message,
// including the ReceiptHandle the Worker needs to delete it.
function invokeWorkerLambda(task, callback) {
    var params = {
        FunctionName: WORKER_LAMBDA_FUNCTION_NAME,
        InvocationType: "Event",
        Payload: JSON.stringify(task)
    };
    lambda.invoke(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);
            callback(err);
        } else {
            callback(null, data);
        }
    });
}

// Keep polling until the queue is empty or the function is about to time out.
// The Lambda context is passed in explicitly; it is not a global.
function handleSQSMessages(context, callback) {
    receiveMessages(function(err, messages) {
        if (err) {
            callback(err);
        } else if (messages && messages.length > 0) {
            var invocations = [];
            messages.forEach(function(message) {
                invocations.push(function(done) {
                    invokeWorkerLambda(message, done);
                });
            });
            async.parallel(invocations, function(err) {
                if (err) {
                    console.error(err, err.stack);
                    callback(err);
                } else if (context.getRemainingTimeInMillis() > 20000) {
                    // More than 20 seconds left: fetch the next batch.
                    handleSQSMessages(context, callback);
                } else {
                    callback(null, "PAUSE");
                }
            });
        } else {
            callback(null, "DONE");
        }
    });
}

exports.handler = function(event, context, callback) {
    handleSQSMessages(context, callback);
};
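
The control flow of `handleSQSMessages` becomes easier to test when its dependencies are injected. The following is a hypothetical Promise-based variant: `drainQueue`, `receive`, `invoke` and `remainingTime` are illustrative names, and in production the first two would wrap `sqs.receiveMessage` and `lambda.invoke` (for example with the `.promise()` helper available on requests in recent aws-sdk v2 releases). It uses only native Promises, which Node.js 4.3 already supports; `Promise.all` takes the role of `async.parallel`, so one rejected invocation rejects the whole run.

```javascript
// Hypothetical, dependency-injected version of the Consumer loop.
//   receive():         resolves to an array of messages (possibly empty)
//   invoke(message):   resolves once the Worker has been invoked
//   remainingTime():   milliseconds left, e.g. context.getRemainingTimeInMillis()
//   safetyMarginMs:    stop fetching new batches below this budget
function drainQueue(receive, invoke, remainingTime, safetyMarginMs) {
    return receive().then(function (messages) {
        if (!messages || messages.length === 0) {
            return "DONE";
        }
        // One Worker invocation per message, all in parallel.
        var invocations = messages.map(function (message) {
            return invoke(message);
        });
        return Promise.all(invocations).then(function () {
            if (remainingTime() > safetyMarginMs) {
                return drainQueue(receive, invoke, remainingTime, safetyMarginMs);
            }
            return "PAUSE";
        });
    });
}

// Usage with fakes: 25 pending messages, delivered at most 10 at a time.
var pending = [];
for (var i = 1; i <= 25; i++) {
    pending.push({ MessageId: String(i) });
}
var invoked = [];
function fakeReceive() {
    return Promise.resolve(pending.splice(0, 10));
}
function fakeInvoke(message) {
    invoked.push(message.MessageId);
    return Promise.resolve();
}

drainQueue(fakeReceive, fakeInvoke, function () { return 60000; }, 20000)
    .then(function (result) {
        console.log(result, invoked.length); // prints: DONE 25
    });
```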

Only one part is still missing: the Lambda Worker.

Set Up Worker Lambda

The following snippet contains all resources needed to set up the Worker Lambda with the help of CloudFormation:

  • An IAM Role called WorkerLambdaRole contains an IAM policy allowing the Lambda Worker function to delete messages from SQS.
  • A Lambda function named WorkerLambda creates and configures the Lambda Worker.

"WorkerLambdaRole": {
    "Type": "AWS::IAM::Role",
    "Properties": {
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": ["sts:AssumeRole"]
            }]
        },
        "Path": "/",
        "Policies": [{
            "PolicyName": "logs",
            "PolicyDocument": {
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents"
                    ],
                    "Resource": "arn:aws:logs:*:*:*"
                }]
            }
        },
        {
            "PolicyName": "sqs",
            "PolicyDocument": {
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "sqs:DeleteMessage"
                    ],
                    "Resource": {"Fn::GetAtt": ["TaskQueue", "Arn"]}
                }]
            }
        }]
    }
},
"WorkerLambda": {
    "Type": "AWS::Lambda::Function",
    "Properties": {
        "Code": {
            "S3Bucket" : {"Ref": "LambdaCodeBucket"},
            "S3Key" : {"Ref": "WorkerLambdaCodeKey"}
        },
        "Handler": "index.handler",
        "MemorySize": 128,
        "Role": {"Fn::GetAtt": ["WorkerLambdaRole", "Arn"]},
        "Runtime": "nodejs4.3",
        "Timeout": 60
    }
}
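
Because the visibility timeout starts when the Consumer receives a message, not when the Worker starts, it is worth checking that the Worker's `Timeout` fits into the queue's `VisibilityTimeout`. A minimal sketch of such a pre-deployment check, assuming the template's `Resources` object is available as plain JavaScript (e.g. parsed with `JSON.parse`); `checkTimeouts` and the safety margin are hypothetical.

```javascript
// Hypothetical check: can the Worker finish before its task reappears?
// resources:     the "Resources" section of the CloudFormation template
// marginSeconds: extra slack for invocation delays (assumed, default 0)
function checkTimeouts(resources, marginSeconds) {
    var margin = marginSeconds || 0;
    // The templates above use both the string "60" and the number 60,
    // so normalize both values to numbers.
    var visibility = Number(resources.TaskQueue.Properties.VisibilityTimeout);
    var workerTimeout = Number(resources.WorkerLambda.Properties.Timeout);
    var problems = [];
    if (workerTimeout + margin > visibility) {
        problems.push(
            "WorkerLambda Timeout (" + workerTimeout + " s) plus a " + margin +
            " s margin exceeds TaskQueue VisibilityTimeout (" + visibility +
            " s); a slow task may be processed twice."
        );
    }
    return problems;
}

// The relevant values from the templates above.
var resources = {
    TaskQueue: { Properties: { VisibilityTimeout: "60" } },
    WorkerLambda: { Properties: { Timeout: 60 } }
};

console.log(checkTimeouts(resources, 0).length);  // prints: 0
console.log(checkTimeouts(resources, 10).length); // prints: 1
```

With the template values and no margin the check passes; how much margin you want depends on how quickly asynchronous invocations start, so treat the numbers as placeholders.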

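Of course, you need to implement your own source code for the Lambda Worker. The Worker can do anything that fits into its configured timeout of 60 seconds (Lambda itself allows up to 300 seconds at the time of writing). If you raise the Worker's Timeout, raise the task queue's VisibilityTimeout as well; otherwise a task can become visible again while it is still being processed and end up being handled twice.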
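
A possible skeleton for the Lambda Worker, assuming the Consumer forwards the raw SQS message as shown above (so the event carries `Body` and `ReceiptHandle`) and that producers put JSON into the message body. `createWorkerHandler`, `processTask` and the injected `deleteMessage` are hypothetical names; in production `deleteMessage` could wrap `sqs.deleteMessage(params, cb)` from the aws-sdk (v2). The key point is that the message is deleted only after the task has succeeded.

```javascript
// Hypothetical Worker skeleton with injected dependencies.
//   processTask(task, done): your business logic, calls done(err) when finished
//   deleteMessage(params, cb): e.g. sqs.deleteMessage(params, cb)
function createWorkerHandler(queueUrl, processTask, deleteMessage) {
    return function (event, context, callback) {
        var task;
        try {
            task = JSON.parse(event.Body);
        } catch (err) {
            // Leave the message in the queue; the redrive policy will
            // eventually move it to the dead letter queue.
            return callback(err);
        }
        processTask(task, function (err) {
            if (err) {
                return callback(err); // not deleted: SQS will redeliver it
            }
            // Delete only after the task completed successfully.
            var params = { QueueUrl: queueUrl, ReceiptHandle: event.ReceiptHandle };
            deleteMessage(params, function (err) {
                if (err) {
                    return callback(err);
                }
                callback(null, "DELETED");
            });
        });
    };
}

// Usage with stubs instead of SQS.
var deleted = [];
var handler = createWorkerHandler(
    "https://example.com/queue",
    function (task, done) { done(task.ok ? null : new Error("task failed")); },
    function (params, cb) { deleted.push(params.ReceiptHandle); cb(null); }
);
handler({ Body: JSON.stringify({ ok: true }), ReceiptHandle: "rh-1" }, {}, function (err, result) {
    console.log(err, result); // prints: null DELETED
});
handler({ Body: JSON.stringify({ ok: false }), ReceiptHandle: "rh-2" }, {}, function (err) {
    console.log(err.message); // prints: task failed
});
// Only rh-1 has been deleted; rh-2 stays in the queue for a retry.
```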

Asynchronous decoupling can be achieved with the help of SQS. It is possible to integrate the managed queuing system with Lambda. This allows you to build a serverless microservice consuming tasks from a queue (e.g. sending out massive amounts of emails, transcoding video files after upload, or analyzing user behavior).

Feedback

Discovered a mistake? Anything missing? Looking forward to your feedback! @andreaswittig or andreas@widdix.de.

Published at DZone with permission of Andreas Wittig, DZone MVB. See the original article here.
