Building Ingestion Pipelines for IoT Data With Talend Data Streams

How to make streaming data faster: a look at using IoT data and Talend Data Streams to make streaming data integration more accessible.



This month, Talend released a new product called Talend Data Streams. Talend Data Streams was designed for data scientists, analysts, and engineers to make streaming data integration faster, easier, and more accessible. I was incredibly excited when it was finally released on the AWS marketplace and have been testing out a few use cases.

Today, I want to walk you through a simple use case of building ingestion pipelines for IoT data. I’m going to show you how to connect to your Kafka queue from Talend Data Streams, collect data from an IoT device, transform that raw data, and then store it in an S3 bucket. Let’s dive in!


If you want to build your pipelines along with me, here’s what you’ll need:

Let's Get Started: IoT With Talend Data Streams


Network Setup

I’m going to start with the network setup. Here, I have an Amazon Web Services EC2 Windows instance and I've installed Apache Zookeeper and Kafka, using the default settings and ports (Zookeeper: 2181; Kafka: 9092) as mentioned in this article: Install & setup Kafka on Windows.

A couple of things to remember as you are setting up your Kafka network:

  • On the Kafka machine, make sure that all firewalls are turned off.
  • In the AWS Management Console, make sure that you've created inbound rules that allow all TCP and UDP traffic from your Data Streams AMI (using the security group ID of your Data Streams AMI).
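
If you want to confirm that the firewall and security group rules above actually let traffic through, a quick TCP check from another machine can help. This is only a minimal sketch: the function names are mine, and the host you pass in is whatever public DNS name your Kafka instance has. The ports are the defaults mentioned above.

```python
import socket

def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False

def check_kafka_host(host: str) -> dict:
    """Check the default Zookeeper (2181) and Kafka (9092) ports on one host."""
    default_ports = {"Zookeeper": 2181, "Kafka": 9092}
    return {name: port_is_open(host, port) for name, port in default_ports.items()}

# Usage (replace the placeholder with your own instance's public DNS):
# print(check_kafka_host("<your-kafka-instance-public-dns>"))
```

A `False` for either port usually points to a firewall or inbound rule that still blocks the connection.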

Launch Kafka

Run Zookeeper, then Kafka as described in the article:

  • Launch Zookeeper
  • Launch Kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties

Zookeeper and Kafka up and running.

Create Kafka Topic

Your Kafka broker is now up and running!
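
For reference, here is a sketch of how the topic could be created. It assumes the topic name mytopic (the one used for the dataset later in this article), a single partition, a replication factor of 1, and a Kafka release whose kafka-topics tool still accepts the --zookeeper flag; newer releases expect --bootstrap-server with the broker address instead. The helper function name is hypothetical.

```python
def build_create_topic_command(topic, zookeeper="localhost:2181",
                               partitions=1, replication_factor=1):
    """Build the kafka-topics command line for creating a topic on Windows."""
    return [
        r".\bin\windows\kafka-topics.bat",
        "--create",
        "--zookeeper", zookeeper,          # assumption: Zookeeper-based Kafka release
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]

command = build_create_topic_command("mytopic")
print(" ".join(command))

# To actually run it from the Kafka install directory:
# import subprocess
# subprocess.run(command, check=True)
```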

Setting Up Your S3 Bucket

Create Your S3 Bucket

Now, we need somewhere to put the incoming data. Let’s create that place using Amazon S3. To get started, log in to your AWS account and look for S3 service in the Storage category.

AWS Management Console

Create a new bucket from the user interface.

Next, fill in your bucket name and select a region.

Keep the default settings by clicking Next twice, then Create Bucket.

Create IAM Role for an S3 Bucket

In order to access the bucket from a third-party application, we need to create an IAM user that has access to S3 and generate an Access Key ID and Secret Access Key for it. This is a critical setup step in this use case. Here’s how you get that done:

From the AWS management console, look for IAM.

Select the Users section and click on Add user.

Choose a username and tick the box Programmatic Access then click on Next.

To keep this easy, we will use an existing S3 policy with full access. To do this, select Attach existing policies and check AmazonS3FullAccess (you can change the policy setting afterward).

Make sure your setup is correct and click on Create User.

Now, write down your access key and click on Show to see your secret key (as you will see it just once).
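
If you later want to tighten the policy, a narrower one scoped to a single bucket might look like the sketch below. The bucket name is a placeholder, and the list of actions is an assumption on my part: I have not verified which S3 actions Data Streams needs, so treat it as a starting point to test rather than a known-good policy.

```python
import json

def bucket_policy(bucket_name: str) -> dict:
    """Build an IAM policy document limited to one S3 bucket."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing applies to the bucket itself.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [bucket_arn],
            },
            {
                # Reading and writing apply to the objects inside the bucket.
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": [f"{bucket_arn}/*"],
            },
        ],
    }

# "my-iot-bucket" is a hypothetical bucket name.
print(json.dumps(bucket_policy("my-iot-bucket"), indent=2))
```

The printed JSON can be pasted into the IAM policy editor in place of AmazonS3FullAccess.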

IoT Device: Raspberry Pi and Arduino Uno

Set up Your Device

I have a Raspberry Pi with internet connectivity over wifi. I've set it up using this tutorial: Install Raspbian using NOOBS. An Arduino Uno is connected to the Raspberry Pi over serial. It has one RGB LED, a temperature and humidity sensor, and a distance sensor.

The Arduino Uno reads temperature, humidity, and distance values from the sensors, and the RGB LED changes color based on the distance measured.

Send Sensor Data to Kafka

The Raspberry Pi acts as a cloud gateway and a hub that collects sensor values from the Arduino. I'm using Node-RED (embedded with Raspbian) on the Pi to read the sensor values from serial and send them to the Kafka broker.

Node-RED Flow on the Raspberry Pi
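
To make the message shape concrete, here is a small Python illustration of what each Kafka message is expected to look like. It is not the Node-RED flow itself. The field order (temperature, humidity, distance) and the ";" delimiter are taken from the dataset settings and the Python processor used later in this article; the function name and sample values are made up.

```python
def format_reading(temperature: int, humidity: int, distance: int,
                   delimiter: str = ";") -> str:
    """Join one set of sensor values into a single delimited line."""
    return delimiter.join(str(value) for value in (temperature, humidity, distance))

# Sample values, invented for illustration only.
message = format_reading(23, 41, 57)
print(message)  # 23;41;57
```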

Talend Data Streams

As a reminder, if you don't have your own Data Streams AMI, follow this tutorial: Getting Started on Talend Data Streams.

Talend Data Streams is free of charge in this open source version, and you only pay for AWS storage and compute consumption. 

Create Connections

When your Data Streams AMI is up and running, you can access it using the public DNS.

On the login screen use admin for user and your AMI ID (found in the EC2 console) for the password.

Now, select the Connections section and click on Add Connection.

First, let's create our Kafka connection. Give it a name, select the type as Kafka, and fill in your broker DNS with port and click on Check Connection then Validate.

Now, create the S3 bucket connection in the same way. Then, click on Check Connection and Validate.

Create Your Dataset

Click on Datasets. Then, select Add Dataset. From there, select the Kafka connection that we've just created, write the topic where the IoT device is sending over data (mytopic), choose the value format (CSV) and the field delimiter (;) and, finally, click on View Sample.

Do the same for your Amazon S3 bucket.
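
To picture what the Kafka dataset settings do, the sketch below mimics them in plain Python: one line is split on ";" and the columns get the names field0, field1, and so on, which are the names the Python processor refers to later. This is only an approximation of the behavior, not Talend's parser, and I am assuming the values arrive as strings.

```python
import csv
import io

def parse_record(line: str, delimiter: str = ";") -> dict:
    """Split one delimited line into a dict keyed field0, field1, ..."""
    row = next(csv.reader(io.StringIO(line), delimiter=delimiter))
    return {f"field{index}": value for index, value in enumerate(row)}

# Sample line, invented for illustration only.
record = parse_record("23;41;57")
print(record)  # {'field0': '23', 'field1': '41', 'field2': '57'}
```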

Create a Pipeline

Now that our source and target are set, it's time to create a Data Streams pipeline. Here's how to do that:

  • Click on Pipeline and press Add Pipeline.

  • Now, edit the name of your Pipeline.

  • Now, on the canvas, click on Create Source.

  • Select the dataset we created on top of our Kafka broker and press Select Dataset.

Now, you have your first source defined on your canvas, and you can press the refresh button to retrieve new data from Kafka.

Now, click on the Green + button next to your Kafka component and select a Python Row processor.

The Python processor is used to rename the columns, change their data types, and create an extra field based on the value of the distance sensor. Copy and paste the Python code below and click on Save.

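import json  # harmless if the processor already provides json

output = json.loads("{}")

# The distance reading drives the LED color (thresholds 20 and 80)
distance = int(input['field2'])

if distance <= 20:
    led = "Red"
elif distance > 80:
    led = "Green"  # assumed color; not specified in the article
else:
    led = "Blue"   # assumed color for distances from 21 to 80

output['Temperature'] = int(input['field0'])
output['Humidity'] = int(input['field1'])
output['Distance'] = distance
output['LedColor'] = led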
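
If you'd like to try the same logic outside of Data Streams first, here is a standalone sketch you can run locally. The 20 and 80 thresholds and the "Red" value come from the processor code; "Green" and "Blue" for the other two ranges are placeholders I chose, and the function names and sample records are hypothetical.

```python
def led_color(distance: int) -> str:
    """Map a distance reading to an LED color name."""
    if distance <= 20:
        return "Red"
    if distance > 80:
        return "Green"  # assumed color for the far range
    return "Blue"       # assumed color for the middle range

def transform(record: dict) -> dict:
    """Rename the raw fields, cast them to int, and add the LedColor field."""
    distance = int(record["field2"])
    return {
        "Temperature": int(record["field0"]),
        "Humidity": int(record["field1"]),
        "Distance": distance,
        "LedColor": led_color(distance),
    }

# Sample records, invented for illustration only.
for sample in (
    {"field0": "23", "field1": "41", "field2": "15"},
    {"field0": "23", "field1": "41", "field2": "50"},
    {"field0": "23", "field1": "41", "field2": "95"},
):
    print(transform(sample))
```

Keeping the color rule in its own function makes it easy to check the boundary values (20 and 80) before pasting anything into the processor.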


Let's now add a sink. I'm going to use the S3 bucket connection I created earlier. Click on Create Sink in the canvas.

Select the IoT dataset from Amazon S3.

Now that we are all set, press the run button on the top to run your pipeline.

Just like that, we've built our first Talend Data Streams pipeline that reads from Kafka and uses Python Row to process the data that is then stored on Amazon S3.

In my next blog post, we will dig deeper into Talend Data Streams components and capabilities by extending this pipeline into a real-time anomaly detection model on the humidity sensor, using the Window component and calculating the Z-score for each sensor value in a Python processor.
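
As a quick preview of the idea, the Z-score of a value is its distance from the window mean, measured in standard deviations: z = (x - mean) / std. The sketch below is plain Python, not the pipeline from the next post; the window contents, the threshold of 2.0, and the function names are all assumptions chosen for illustration.

```python
from statistics import mean, pstdev

def z_scores(window):
    """Return the Z-score of every value in the window."""
    mu = mean(window)
    sigma = pstdev(window)  # population standard deviation of the window
    if sigma == 0:
        # All values are identical, so nothing deviates from the mean.
        return [0.0 for _ in window]
    return [(x - mu) / sigma for x in window]

def anomalies(window, threshold=2.0):
    """Return the values whose absolute Z-score exceeds the threshold."""
    return [x for x, z in zip(window, z_scores(window)) if abs(z) > threshold]

# Made-up humidity readings with one obvious outlier.
humidity_window = [40, 41, 39, 40, 42, 41, 40, 39, 41, 90]
print(anomalies(humidity_window))  # [90]
```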

Happy Streaming!


Published at DZone with permission of Benoit Barranco, DZone MVB. See the original article here.
