Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Using AWS IoT (Minus the IoT) for Serverless, Async Event Systems

DZone's Guide to

Using AWS IoT (Minus the IoT) for Serverless, Async Event Systems

Even if you're not into the Internet of Things, AWS' IoT service can help. Added to the Serverless Application Model, you can receive and process events asynchronously.

· Cloud Zone
Free Resource

Site24x7 - Full stack It Infrastructure Monitoring from the cloud. Sign up for free trial.

An event system receives and processes events by following rules that are defined inside the system. All processing happens asynchronously. When an event is sent to the system, it is processed at some point in time, but you will not get an immediate response. Asynchronous processing has advantages if you want to build a scalable solution because it frees you from the burden of an immediate response. Instead, you can queue them and process them as fast as you can. In this article, I will demonstrate how the AWS IoT service can be used to process events that have nothing to do with IoT. I will also use the new Serverless Application Model (SAM) to deploy the solution. Needless to mention that the solution will be serverless and highly cost effective for workloads up to 1 mio events per day.

How AWS IoT Works

AWS IoT can do many things, but here I focus on messages, topics, rules, and actions.

message is sent to a topic. In this case, a message is an event. AWS IoT can deal with JSON so I choose JSON as my data representation.

topic has a name like event/buy and as you can see you can add a hierarchy by using up to 7 forward slashes (/).

rule subscribes to a topic and triggers actions when a message is received. As simple rule can subscribe to the topic event/buy. But you can also use wildcards like event/+ or event/*+ is used for exactly one hierarchy while * matches to any number of hierarchies.

AWS IoT comes with many built-in actions. To mention just a few:

  • Write the message to DynamoDB.
  • Save the message as a file to S3.
  • Send the message to SNS.
  • Invoke a Lambda function to process the message.

So a message is sent to a topic. If a rule matches the topic’s name, it triggers the defined actions. That’s an event system, isn’t it?

Architecture

The event system I design in this article can handle events that are generated at exchanges like buy and sell events. It is important to store all events on durable storage for archival. For some reasons buy events need to be checked for fraud. If a fraud event is detected, external systems must be notified. The following figure shows the architecture of the system.

AWS IoT Architecture

Event Flow

  1. It all starts with a HTTP API (provided by API Gateway) that triggers a Lambda function for every HTTP POST call /event.
  2. The Lambda event-api does some input validation. Depending on the payload the event is published on a topic like event/buyevent/sell, …
  3. One rule subscribes to all event/+ topics with an action to write the message to DynamoDB.
  4. Another rule subscribes only to the event/buy topic and triggers the buy-event Lambda for every message.
  5. The buy-event Lambda decides if the event is fraud or not. If it is fraud, it publishes the event to the alert/fraud topic.
  6. A rule subscribes to the alert/fraud topic and triggers two actions: Save message to S3 and send event to SNS.

Implementation

I use the brand new Serverless Application Model (SAM) for this example. SAM builds upon CloudFormation, so most of the interesting pieces happen inside file that I will name template.yml.

You can find the full source code on GitHub.

The first file defines Node.js dependencies that are needed in the Lambda functions.

Package.json

{
  "name": "sam-iot-example",
  "version": "1.0.0",
  "author": "Michael Wittig",
  "license": "MIT",
  "dependencies": {
    "uuid": "3.0.1"
  },
  "devDependencies": {
    "aws-sdk": "2.6.9"
  }
}


REST API

This is how you define a APi Gateway with SAM.

Template.yml

---
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Resources:
  EventApiLambda:
    Type: 'AWS::Serverless::Function'
    Properties:
      Handler: 'event-api-handler.create'
      Runtime: 'nodejs4.3'
      Policies:
      - Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Action: 'iot:Publish'
          Resource: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:topic/event/*'
        - Effect: Allow
          Action: 'iot:DescribeEndpoint'
          Resource: '*'
      Events:
        Http:
          Type: Api
          Properties:
            Path: /event
            Method: post


And here comes the implementation that will run inside Lambda. The name of the file must match with the Handler from above.

Event-API-Handler.js

'use strict';

const uuid = require('uuid');
const cache = require('./cache.js');

function publish(iotdata, payload) {
  return iotdata.publish({
    topic: `event/${payload.type}`,
    qos: 0,
    payload: JSON.stringify(payload),
  }).promise();
}

module.exports.create = (event, context, cb) => {
  console.log(JSON.stringify(event));
  try {
    var payload = JSON.parse(event.body); // safely parse body JSON
  } catch(err) {
    cb(null, {statusCode: 400});
    return;
  }
  if (payload.id === undefined || payload.id === null) {
    payload.id = uuid.v4(); // assign id if no id was passed in
  }
  if (payload.type === undefined || payload.type === null) {
    cb(null, {statusCode: 400});
  } else {
    cache.iotdata
      .then((iotdata) => publish(iotdata, payload))
      .then(() => cb(null, {statusCode: 204}))
      .catch((err) => cb(err));
  }
};


Event Archival

Now it’s time to create a rule that inserts the events into a DynamoDB table.

Template.yml

[...]
Resources:
  [...]
  EventTable:
    Type: 'AWS::DynamoDB::Table'
    Properties:
      AttributeDefinitions:
      - AttributeName: id
        AttributeType: S
      - AttributeName: timestamp
        AttributeType: S
      KeySchema:
      - AttributeName: id
        KeyType: HASH
      - AttributeName: timestamp
        KeyType: RANGE
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
  EventTableRole:
    Type: 'AWS::IAM::Role'
    Properties: 
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: 'Allow'
          Principal:
            Service: 'iot.amazonaws.com'
          Action: 'sts:AssumeRole'
      Policies:
      - PolicyName: 'dynamodb'
        PolicyDocument: 
          Version: '2012-10-17'
          Statement: 
          - Effect: Allow
            Action: 'dynamodb:PutItem'
            Resource: !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${EventTable}'
  EventTableRule:
    Type: 'AWS::IoT::TopicRule'
    Properties:
      TopicRulePayload:
        Actions:
        - DynamoDB:
            HashKeyField: id
            HashKeyValue: '${id}'
            RangeKeyField: timestamp # unfortunately this field is required by CloudFormation
            RangeKeyValue: '${timestamp()}' # unfortunately this field is required by CloudFormation
            RoleArn: !GetAtt 'EventTableRole.Arn'
            TableName: !Ref EventTable
        RuleDisabled: false
        Sql: "SELECT * FROM 'event/+'"


Buy Event Lambda

This is a rule that subscribes to the event/buy topic and triggers a Lambda function for every message.

Template.yml

[...]
Resources:
  [...]
  BuyEventLambda:
    Type: 'AWS::Serverless::Function'
    Properties:
      Handler: 'buy-event-handler.analyze'
      Runtime: 'nodejs4.3'
      Policies:
      - Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Action: 'iot:Publish'
          Resource: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:topic/alert/fraud'
        - Effect: Allow
          Action: 'iot:DescribeEndpoint'
          Resource: '*'
      Events:
        IoT:
          Type: IoTRule
          Properties:
            Sql: "SELECT * FROM 'event/buy'"
            AwsIotSqlVersion: '2016-03-23'


The is the implementation of the fraud detection that runs inside Lambda.

Buy-Event-Handler.js

'use strict';

const cache = require('./cache.js');

function publish(iotdata, payload) {
  return iotdata.publish({
    topic: `alert/fraud`,
    qos: 0,
    payload: JSON.stringify(payload),
  }).promise();
}

function isFraud(payload) {
  return Promise.resolve(Math.random() < 0.5); // very simple implementation :)
}

module.exports.analyze = (payload, context, cb) => {
  console.log(JSON.stringify(payload));
  isFraud()
    .then(fraud => {
      if (fraud === true) {
        return cache.iotdata
          .then((iotdata) => publish(iotdata, payload));
      } else {
        return fraud;
      }
    })
    .then(() => cb(null, {statusCode: 204}))
    .catch((err) => cb(err));
};


Fraud Archival

And here we define what happens whit messages that are published to the alert/fraud topic.

Template.yml

[...]
Resources:
  [...]
 ArchiveFraudBucket:
    Type: 'AWS::S3::Bucket'
    Properties: {}
  ArchiveFraudS3Role:
    Type: 'AWS::IAM::Role'
    Properties: 
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: 'Allow'
          Principal:
            Service: 'iot.amazonaws.com'
          Action: 'sts:AssumeRole'
      Policies:
      - PolicyName: 's3'
        PolicyDocument: 
          Version: '2012-10-17'
          Statement: 
          - Effect: Allow
            Action: 's3:PutObject'
            Resource: !Sub 'arn:aws:s3:::${ArchiveFraudBucket}/*'
  ArchiveFraudTopic:
    Type: 'AWS::SNS::Topic'
    Properties: {}
  ArchiveFraudTopicRole:
    Type: 'AWS::IAM::Role'
    Properties: 
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: 'Allow'
          Principal:
            Service: 'iot.amazonaws.com'
          Action: 'sts:AssumeRole'
      Policies:
      - PolicyName: 's3'
        PolicyDocument: 
          Version: '2012-10-17'
          Statement: 
          - Effect: Allow
            Action: 'sns:Publish'
            Resource: !Ref ArchiveFraudTopic
  ArchiveFraudRule:
    Type: 'AWS::IoT::TopicRule'
    Properties:
      TopicRulePayload:
        Actions:
        - S3:
            BucketName: !Ref ArchiveFraudBucket
            Key: '${id}.json'
            RoleArn: !GetAtt 'ArchiveFraudS3Role.Arn'
        - Sns:
            MessageFormat: RAW
            RoleArn: !GetAtt 'ArchiveFraudTopicRole.Arn'
            TargetArn: !Ref ArchiveFraudTopic
        AwsIotSqlVersion: '2016-03-23'
        RuleDisabled: false
        Sql: "SELECT * FROM 'alert/fraud'"


And finally, we have some shared code that is needed by both Lambdas. It creates a singleton of an AWS.IotData client with the needed variable endpointAddress.

Cache.js

'use strict';

const AWS = require('aws-sdk');

const iotApiVersion = '2015-05-28';
const iot = new AWS.Iot({
  apiVersion: iotApiVersion
});

module.exports.iotdata = new Promise((resolve, reject) => {
  iot.describeEndpoint({}, (err, data) => {
    if (err) {
      reject(err);
    } else {
      resolve(new AWS.IotData({
        apiVersion: iotApiVersion,
        endpoint: data.endpointAddress
      }));
    }
  });
});


You may be surprised how little code was needed. Most of the stuff functionality is provided by AWS IoT and we only need to configure it trough the CloudFormation template and the SAM extensions.

Deploy

Make sure to update the AWS CLI. Otherwise, you may not have support for SAM:

sudo pip install --upgrade awscli

Select a region that supports the IoT service

export AWS_DEFAULT_REGION=us-east-1

Then create a artifacts bucket:

aws s3 mb s3://$USER-artifacts

Clone the repository:

git clone git@github.com:michaelwittig/sam-iot-example.git
cd sam-iot-example/

Then install the Node.js dependencies:

npm install --production

Then deploy the template using SAM:

aws cloudformation package --template-file template.yml --output-template-file template.tmp.yml --s3-bucket "$USER-artifacts"
aws cloudformation deploy --template-file template.tmp.yml --stack-name sam-iot-example --capabilities CAPABILITY_IAM

Done. You now have a running event system.

Usage

  1. Go to https://console.aws.amazon.com/apigateway/
  2. Select sam-iot-example
  3. Select /event -> POST and click Test
  4. Fill the Request Body with: {"type":"buy","price":123.45} and submit a few times
  5. Go to https://console.aws.amazon.com/dynamodb/
  6. Select Tables
  7. Select the table that starts with sam-iot-example-EventTable-
  8. Click on Items, and you would see a few events
  9. Go to https://console.aws.amazon.com/s3/
  10. Select the bucket that starts with sam-iot-example-archivefraudbucket-
  11. You should see a few fraud events

Cleanup

Remove archived events from S3 Bucket by using the AWS Management Console. The name of the bucket starts with sam-iot-example-archivefraudbucket-

Then remove the stack:

aws cloudformation delete-stack --stack-name sam-iot-example

Then remove the artifacts bucket:

aws s3 rb s3://$USER-artifacts --force
  • The Serverless Application Model (SAM) makes deploying CloudFormation templates very easy. I like it.
  • Using topics to decouple your event system is very powerful. You can always add topic rules and actions if the business process changes or just someone else is interested in an event in the system. Keep the Lambda small. Better have more small Lambdas than few big ones.
  • The event system is very cost effective for workloads up to 1 mio messages per day. If you want to process more than that, I recommend that you first calculate the costs and compare them to an architecture using Kinesis.

Site24x7 - Full stack It Infrastructure Monitoring from the cloud. Sign up for free trial.

Topics:
cloud ,aws iot ,serverless application model ,asynchronous events

Published at DZone with permission of Michael Wittig, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

THE DZONE NEWSLETTER

Dev Resources & Solutions Straight to Your Inbox

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

X

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}