Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Creating Real-Time Anomaly Detection Pipelines With AWS and Talend Data Streams

DZone's Guide to

Creating Real-Time Anomaly Detection Pipelines With AWS and Talend Data Streams

Want to learn more about using Data Streams? Check out this tutorial where we learn how to create a real-time detection pipeline with AWS.

· IoT Zone ·
Free Resource

Introduction

Thanks for continuing to read all of our streaming data use cases during my exploration of Talend Data Streams. For the last article of this series, I wanted to walk you through a complete IoT integration scenario using a low consumption device and leveraging only cloud services.

In my previous posts, I’ve used a Raspberry Pi and some sensors as my main devices. This single board computer is pretty powerful and you can install a light version of Linux as well. But in real life, enterprises will probably use System On Chip things, such as Arduino, PLC, ESP8266. Those SOC are less powerful, consume less energy, and are mostly programmed in C, C++, or Python. I’ll be using an ESP8266 that has embedded Wi-Fi and some GPIO to attach sensors. If you want to know more about IoT hardware, have a look at my last article "Everything You Need to Know About IoT: Hardware."

Our use case is straightforward. First, the IoT device will send sensor values to Amazon Web Services (AWS) IoT using MQTT. Then, we will create a rule in AWS IoT to redirect device payload to a Kinesis Stream. Next, from Talend Data Streams, we will connect to the Kinesis stream and transform our raw data using standard components. Finally, with the Python processor, we will create an anomaly detection model using Z-Score and all anomalies will be stored in HDFS. You can download Talend Data Streams for AWS here.

Pre-Requisites

If you want to build your pipelines along with me, here’s what you’ll need:

  • An Amazon Web Services (AWS) account
  • AWS IoT service
  • AWS Kinesis streaming service
  • AWS EMR cluster (version 5.11.1 and Hadoop 2.7.X) on the same VPC and Subnet as your Data Streams AMI.
  • Talend Data Streams from Amazon AMI Marketplace. (If you don't have one follow this tutorial: Access Data Streams through the AWS Marketplace)
  • An IoT device (can be replaced by any IoT data simulator)

High-Level Architecture

Currently, Talend Data Streams doesn’t feature an MQTT connector. In order to get around this, you’ll find an architecture sample below to leverage Talend Data Streams to ingest IoT data in real-time and storing it to a Hadoop Cluster.

Preparing Your IoT Device

As mentioned previously, I'm using an ESP8266, also called Node MCU; it has been programmed to:

  • Connect to a Wi-Fi hotspot
  • Connect securely to AWS IoT broker using the MQTT protocol
  • Read every second distance, temperature, and humidity sensor values
  • Publish over MQTT sensor values to the topic IoT

If you are interested in how to develop an MQTT client on the ESP8266, take a look at this link. However, you could use any device simulator.

IoT Infrastructure: AWS IoT and Kinesis

AWS IoT

The AWS IoT service is a secure and managed MQTT broker. In this first step, I'll walk you through registering your device, generating a public/private key, and implementing C.A. to connect securely.

First, login to your Amazon Web Services account and look for IoT. Then, select IoT Core in the list.

Register your connected thing. From the left-hand side menu, click on “Manage," select “Things,” and click on “Create."

Now, select “Create a single thing” from your list of options (alternatively, you can select "Create many things "for bulk registration of things).

Now, give your thing a name. You can also create device types, groups, and other searchable attributes. For this example, let’s keep default settings and click on next.

Now, to secure the device authentication using the "One-click certification" creation, click on "Create a Certificate."

Download all the files. Those have to be stored on the edge device and used with MQTT client to securely connect to AWS IoT. Click on "Activate," then press “Done."

In order to allow our device to publish messages and subscribe to topics, we need to attach a policy from the menu. Click on "Secure" and select "Policies," then click on "Create."

Give a name to the policy, in the action, start typing IoT and select IoT. NOTE: To allow all actions, tick the box “Allow” below and click on “Create”.

Let’s attach this policy to a certificate. From the left menu, click on “Secure” and select certificate and click on the certificate of your thing.

If you have multiple certificates, click on "Things" to make sure that the right certificate. Next, click on "Actions" and select "Attach Policy."

Select the policy we’ve just created and click on "Attach."

Your thing is now registered and can connect, publish messages and subscribe to topics securely! Let’s test it (it’s now time to turn on the ESP).

Testing Your IoT Connection in AWS

From the menu click on Test, select Subscribe to a topic, type IoT for a topic, and click on "Subscribe to Topic."

You can see that the sensor data is being sent to the IoT topic.

Setting Up AWS Kinesis

On your AWS console, search for "Kinesis" and select it.

Click on "Create data stream."

Give your stream a name and select 1 shards to start out. Later on, if you add more devices, you’ll need to increase the number of shards. Next, click on "create Kinesis stream."

Now, we are all set on the Kinesis part. Let's return back to AWS IoT, on the left menu click on "Act" and press "Create."

Name your rule, select all the attributes by typing "*," and filter on the topic IoT.

Scroll down and click on "Add Action" and select "Sends messages to an Amazon Kinesis Stream." Then, click "Configure action" at the bottom of the page.

Select the stream you’ve previously created, use an existing role, or create a new one that can access to AWS IoT. Click on "Add action" and then "Create Rule."

We are all set at this point, the sensor data collected from the device through MQTT will be redirected to the Kinesis Stream that will be the input source for our Talend Data Streams pipeline.

Cloud Data Lake: AWS EMR

Currently, with the Talend Data Streams free version, you can use HDFS, but only with an EMR cluster. In this part, I’ll describe how to provision a cluster and how to set up Data Streams to use HDFS in our pipeline.

Provision Your EMR cluster

Continuing on your AWS Console, look for EMR:

Click on "Create cluster."

Next, go to the advanced options.

Let’s choose a release that is fully compatible with Talend Data Streams. The 5.11.1 and below will do, then select the components of your choice (Hadoop, Spark, Livy and Zeppelin and Hue in my case). We are almost there, but don't click on next just yet.

In the Edit software settings, we are going to edit the core-site.xml when the cluster is provisioned, in order to use specific compression codecs required for Data Streams and to allow root impersonation.

Paste the following code into the config:

[

  {

    "Classification": "core-site",

    "Properties": {

      "io.compression.codecs": "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec",

"hadoop.proxyuser.root.hosts": "*",

"hadoop.proxyuser.root.groups": "*"


    }

  }

]

On the next step, select the same VPC, subnet as your Data Streams AMI, and click "Next." Then, name your cluster and click "Next."

Select an EC2 key pair and go with default settings for the rest and click on "Create Cluster." After a few minutes, your cluster should be up and running.

Talend Data Streams and EMR Set-Up

Still on your AWS Console, look for EC2.

You will find three new instances with blank names that we need to rename. Then, by looking at the security groups, you can identify which one is the master node.

Now, we need to connect to the master node through SSH (check that your client computer can access port 22, if not add an inbound security rule to allow your IP). Because we need to retrieve Hadoop config files, I’m using Cyberduck. Alternatively, use FileZilla or any tool that features SFTP, and use the EC2 DNS for the server, Hadoop as a user, and the related EC2 key pair to connect.

Now, using your favorite SFTP tool, connect to your Data Streams EC2 machine, then do this using the ec2-user and allow your client to access port 22. If you don't have the Data Streams free AMI yet, follow this tutorial to provision one: Access Data Streams through the AWS Marketplace.

Navigate to /opt/data-streams/extras/etc/hadoop. NOTE: The folders /etc/hadoop might not exist in /opt/data-streams/extras/ so you need to create them.

Restart your Data Streams EC2 machine so that it will start to pick up the Hadoop config files.

The last step is to allow all traffic from Data Streams to your EMR cluster and vice versa. To do so, create security rules to allow all traffic inbound on both sides for Data Streams and EMR security groups ID.

Talend Data Streams: IoT Streaming Pipeline

Now, it's time to finalize our real-time anomaly detection pipeline that uses Zscore. This pipeline is based on my previous article, so if you want to understand the math behind the scenes you should read this article.

All the infrastructure is in place and the required set up is done; we can now start building some pipelines. Now, log on to your Data Streams Free AMI using the public IP and the instance ID.

Create Your Data Sources and Add the Data Set

In this part, we will create two data sources:

  1. Our Kinesis Input Stream
  2. HDFS using our EMR cluster

From the landing page, select Connection on the left-hand side menu and click on "ADD CONNECTION."

Give a name to your connection, and for the Type select "Amazon Kinesis" in the drop-down box, complete the following:

Now, use an IAM user that has access to Kinesis with an Access key. Fill in the connection field with Access key and Secret, click on "Check connection," and click on "Validate." Now, from the left-hand side menu, select Datasets and click on "ADD DATASET."

Give your dataset a name and select the Kinesis connection that we’ve created before from the drop-down box. Select the region of your Kinesis stream, your Stream, CSV, for the format, and Semicolon for the delimiter. Once that is done, click on "View Sample" then "Validate."

Our input data source is set up and our samples are ready to be used in our pipeline. Let’s create our output data source connection — on the left-hand-side menu, select "CONNECTIONS," click on "ADD CONNECTION," and give a name to your connection. Then, select "HDFS" for the type, use the "Hadoop as User" name, and click on “Check Connection." If it says it has connected, then click on "Validate."

That should do it for now; we will create the dataset within the pipeline, but before going further, make sure that the Data Streams AMI has access to all inbound traffic to EMR Master and Slave nodes (add an inbound network security rule for EMR ec2 machine to allow all traffic from Data Streams Security group) or you will not be able to read and write to the EMR cluster.

Build Your Pipeline

From the left-hand side menu, select Pipelines and click on "Add Pipeline."

In the pipeline, on the canvas click "Create" source, select "Kinesis Stream," and click on "Select Dataset."

Back to the pipeline canvas, you can see the sample data at the bottom. As you’ve noticed incoming IoT messages are really raw at this point, let’s convert current value types (string) to number; click on the green + sign next to Kinesis component and select the "Type Converter" processor.

Let’s convert all our fields to “Integer." To do that, select the first field (.field0) and change the output type to Integer. To change the field type on the next fields, click on NEW ELEMENT. Once you have done this for all fields, click on SAVE.

Next to the Type Converter processor on your canvas, click on the green + sign and add a Windows processor; in order to calculate a Z-Score, we need to define a processing window.

Now, let’s set up our window. My ESP8266 sends sensor values every second, and I want to create a Fixed Time window that contains more or less 20 values, so I’ll choose Window duration = Window slide length = 20000 ms— don’t forget to click Save.

Since I’m only interested about Humidity, which I know is in field1, I’ll make things easier for myself later by converting the humidity row values in my window into a list of values (or array in Python) by aggregating by the field1 (humidity) data. To do this, add an aggregation processor next to the Window Z-Score component. Within the aggregation processor, choose .field1 as your Field and List as the Operation (since you will be aggregating field1 into a list).

The next step is to calculate Z-score for humidity values. In order to create a more advanced transformation, we need to use the Python processor, so next to the Aggregate processor add a Python Row processor.

Change the Map type from FLATMAP to MAP, click on the 4 arrows to open up the Python editor, and paste the code below and click SAVE. In the Data Preview, you can see what we’ve calculated in the Python processor: the average humidity, standard deviation and Z-Score array, and humidity values for the current window.

Even if the code below is simple and self-explanatory, let me sum up the different steps:

  • Calculate the average humidity within the window
  • Find the number of sensor values within the window
  • Calculate the variance
  • Calculate the standard deviation
  • Calculate Z-Score
  • Output Humidity Average, Standard Deviation, Zscore and Humidity values.
#Import Standard python libraries

import math


#average function

def mean(numbers):

    return float(sum(numbers)) / max(len(numbers), 1)


#initialize variables

std=0



#Load input list




#average value for window

avg=mean(input['humidity'])




##lenth window

mylist=input['humidity']

lon=len(mylist)


# x100 in order to workaround Python limitation

lon100=100/lon


#Calculate Variance

for i in range(len(mylist)):

    std= std + math.pow(mylist[i]-avg,2)


#Calculate Standard deviation   

stdev= math.sqrt(lon100*std/100)


#Re-import all sensor values within the window

myZscore=(input['humidity'])

#Calculate Z-Score for all sensor value within the window

for j in range(len(myZscore)):

    myZscore[j]= (myZscore[j]-avg)/stdev


#Ouput results

output['HumidityAvg']=avg

output['stdev']=stdev

output['Zscore']=myZscore


If you open up the Z-Score array, you’ll see Z-score for each sensor value.

Next to the Python processor, add a Normalize processor to flatten the Python array into records, in the column to normalize type Zscore, and select is list option then save.

Let’s now recalculate the initial humidity value from the sensor, to do that we will a python processor and write the below code :

#Ouput results

output['HumidityAvg']=input['HumidityAvg']

output['stdev']=input['stdev']

output['Zscore']=input['Zscore']

output['humidity']=round(input['Zscore']*input['stdev']+input['HumidityAvg'])


Don’t forget to change the Map type to MAP and click save. Let’s go one step further and select only the anomalies; if you had a look at my previous article, anomalies are Zscores that are outside the -2 Standard Deviation and +2 Standard deviation range, in our case the range is around -1.29 and +1.29. And now, add a FilterRow processor, the product doesn’t allow us yet to filter on range of values, so we will filter the Absolute value of the Zscore that are superior to 1.29, we test on absolute value because Zscore can be negative.

The last output shows that five records that are anomalies out of the 50 sample records. Let’s now store those anomalies to HDFS, click on “Create a Sink” on the canvas, and click on “Add Dataset." Set it up as per below and click on Validate.

You will end up with an error message, don’t worry it’s just a warning Data Streams cannot fetch sample of a file that has not been created yet. We are now all set, let’s run the pipeline by clicking on the play button on the top.

To sum up, we’ve read data from Kinesis, used Type Convertor, Aggregation, and Window processors to transform our raw data and then Python row to calculate Standard Deviation, Average, and Z-Score for each individual humidity sensor readings. Then, we’ve filtered out normal values and stored anomalies in HDFS of an EMR cluster.

That was my last article on the Data Streams for the year. Stay tuned, I’ll write the next episodes when it becomes GA in the beginning of 2019. Again, happy streaming!

Topics:
iot ,tutorial ,aws ,talend ,data streams ,connected devices ,mqtt ,sensors ,raspberry pi

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}