Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Data Flow Pipeline Using StreamSets

DZone's Guide to

Data Flow Pipeline Using StreamSets

Learn about configuring JDBC Query Consumer, performing JDBC lookup with multiple tables, creating a data flow pipeline, and monitoring the stage and pipeline stats.

· Database Zone ·
Free Resource

Slow SQL Server? These SentryOne resources share tips and tricks for not only troubleshooting SQL Server performance issues, but also preventing them before they hit your production environment.

StreamSets Data Collector — an open-source, lightweight, powerful engine — is used to stream data in real time. It is a continuous big data ingest and enterprise-grade infrastructure used to route and process data in your data streams. It accelerates time to analysis by bringing unique transparency and processing to data in motion.

In this blog, let's discuss generating a data flow pipeline using StreamSets.

Prerequisites

  • Install Java 1.8
  • Install streamsets-datacollector-2.5.1.1

Use Case 

Generating a data flow pipeline using StreamSets via JDBC connections.

What we need to do:

  1. Install StreamSets data collector.
  2. Create JDBC origin.
  3. Create JDBC lookup.
  4. Create a data flow pipeline.
  5. View pipeline and stage statistics.

Installing StreamSets Data Collector

As core software is developed in Java, the web interface is developed in JavaScript/Angular JS, D3.js, HTML, and CSS.

To install StreamSets, perform the following:

Installing Java

To install Java, use the below command:

sudo apt-add-repository ppa:webupd8team/java
 sudo apt-get update
 sudo apt-get install oracle-java8-installer

Use command whereis java to check Java location.

Ensure that the JAVA_HOME variable is set to:

/usr/lib/jvm/java-8-oracle

To set JAVA_HOME, use the below command:

export JAVA_HOME=/usr/lib/jvm/java-8-oracle

Installing StreamSets (From Tarball)

To install StreamSets, perform the following:

Create a directory as follows:

mkdir /home/streamsets

Download Data Collector RPM package from the StreamSets website. Then, extract the TAR file using the below command:

tar -xzf streamsets-datacollector-core-2.5.1.1.tgz.

Create a system user and group named sdc using the below commands:

sudo addgroup sdc
sudo adduser --ingroup sdc

Create the /etc/init.d directory (in root) using the below command:

# mkdir /etc/init.d

Copy /home/streamsets/streamsets-datacollector-2.5.1.1/initd/_sdcinitd_prototype to /etc/init.d directory and change ownership of the file to sdc using the below commands:

cp /home/streamsets/streamsets-datacollector-2.5.1.1/initd/_sdcinitd_prototype /etc/init.d/sdc
chown sdc:sdc /etc/init.d/sdc

Edit /etc/init.d/sdc file and set $SDC_DIST and $SDC_HOME environment variables to the location from where tarball is extracted using the below commands:

export SDC_DIST="/home/ubuntu/streamsets/streamsets-datacollector-2.5.1.1/"
export SDC_HOME="/home/ubuntu/streamsets/streamsets-datacollector-2.5.1.1/"

Make the sdc file executable using the below command:

chmod 755 /etc/init.d/sdc

Create the Data Collector configuration directory at /etc/sdc (in root) using the below command:

# mkdir /etc/sdc

Copy all files from etc into the Data Collector configuration directory that you just created and extracted the tarball using the below command:

cp -R etc/ /etc/sdc

Change the ownership of the /etc/sdc directory and all files in the directory to sdc:sdc using the below command:

chown -R sdc:sdc /etc/sdc

Provide ownership only permission for the form-realm.properties file in the /etc/sdc directory using the below command:

chmod go-rwx /etc/sdc/form-realm.properties

Create the Data Collector log directory at /var/log/sdc and change the ownership to sdc:sdc using the below commands:

mkdir /var/log/sdc
chown sdc:sdc /var/log/sdc

Create a Data Collector data directory at the path /var/lib/sdc and change the ownership to sdc:sdc using the below commands (in root):

mkdir /var/lib/sdc
chown sdc:sdc /var/lib/sdc

Create the Data Collector resources directory at /var/lib/sdc-resources and change the ownership tosdc:sdc using the below commands:

mkdir /var/lib/sdc-resources
chown sdc:sdc /var/lib/sdc-resources

Start Data Collector as a service using the below command:

service sdc start

Note: Upon getting the error called “sdc is dead” check the configured limit for the current user using the below command:

ulimit -n

Set the session limit using the below command:

ulimit -u unlimited

Access the Data Collector console by entering the following URL in the address bar of the browser: http://<system-ip>:18630/.

Note: The default username is “admin” and password is “admin”.

Creating JDBC Origin

To create a JDBC origin, perform the following steps:

  • Click Create New Pipeline to create a pipeline.
  • Add Title for the pipeline.

Note: In this analysis, the origin “JDBC Query consumer” is used.

Download the JDBC origin Package Manager as shown in the below diagram:

2Note:You can also import the package manually using the below command:

/home/streamsets/streamsets-datacollector-2.5.1.1/bin/streamsets stagelibs -install=streamsets-datacollector-jdbc-lib

Add configurations to the JDBC Query Consumer origin.

Uncheck Incremental mode in Configuration to avoid the default Query consumer search for the where and order by clauses to execute the query as shown in the below diagram:

3

Add where and order by clauses using the Offset value.

4











Click Validate to check the connection.

Note: If you are unable to connect to JDBC Query Consumer, move mysql-connector-java-5.1.27-bin.jar to the below path:

/home/streamsets/streamsets-datacollector-2.5.1.1/streamsets-libs/streamsets-datacollector-jdbc-lib/lib

Creating JDBC Lookup

To create JDBC lookup, lookup columns are required from source and lookup table. For example, use the applicantId field in the applicant (source) table to look up the applicantId column in the applicant (lookup) table using the below query:

SELECT * 
FROM   application 
WHERE  applicantid = '${record:value('/applicantid')}'

The query uses the value of an applicantId column from the applicant (source) table. In this example, three tables are used for lookup.5

The result of the above JDBC lookup is given as an input to next lookup table loan_raw by using the below query:

SELECT * 
FROM   loan_raw 
WHERE  applicationid = '${record:value('/applicationid')}'

Creating Dataflow Pipeline

Different processors” are used for creating the data flow pipeline.

Field Remover

It discards unnecessary fields in the pipeline.8

Expression Evaluator

It performs calculations and writes the results to new or existing fields. It is also used to add or modify record header attributes and field attributes.9

Stream Selector

It passes data to streams based on conditions and uses a default stream to pass records unmatched with user-defined conditions. You can also define a condition for each stream of data.7

Local FS is used to store the resultant data.10

11

 The full data flow pipeline is as follows:12

Viewing Pipeline and Stage Statistics

A pipeline can be monitored while running it. Real-time summary and error statistics can be viewed for the pipeline and for the stages in the pipeline. By default, the Data Collector console displays pipeline monitoring information while running the pipeline. Any stage can be selected to view its statistics. Similarly, error information for the pipeline and its stages can be viewed.

Previewing Dataflow Pipeline

In the Data Collector pipeline, upon clicking Preview, input and output data can be seen in each level.13

Viewing Pipeline States

Pipeline state is defined as the current condition such as "running" or "stopped" of the pipeline. The pipeline state is displayed in the All Pipelines list and Data Collector log.viewing_pipeline_states

Viewing Pipeline Statistics

Record count, record and batch throughput, batch processing statistics, and heap memory usage are displayed for the pipeline as shown in the below diagram:14

Values of the parameters currently used by the pipeline are displayed for a pipeline started with runtime parameters as shown in the below diagram:15

Viewing Stage Statistics

Record and batch throughput, batch processing statistics, and heap memory usage are displayed for a stage as shown in the below diagram:16 17

Conclusion

In this blog, we discussed configuring JDBC Query Consumer, performing JDBC lookup with more than one table, creating a data flow pipeline, and monitoring the stage statistics and pipeline statistics.

Database monitoring tools letting you down? See how SentryOne empowers Enterprises to go faster.

Topics:
data pipeline ,jdbc ,tutorial ,database ,streamsets ,data flow ,pipeline

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}