
Data Flow Pipeline Using StreamSets

Learn about configuring JDBC Query Consumer, performing JDBC lookup with multiple tables, creating a data flow pipeline, and monitoring the stage and pipeline stats.

By Rathnadevi Manivannan · Jul. 18, 17 · Tutorial

StreamSets Data Collector — an open-source, lightweight, powerful engine — is used to stream data in real time. It provides continuous, enterprise-grade big data ingest infrastructure for routing and processing data in your data streams, and it accelerates time to analysis by bringing transparency and processing to data in motion.

In this blog, let's discuss generating a data flow pipeline using StreamSets.

Prerequisites

  • Install Java 1.8
  • Install streamsets-datacollector-2.5.1.1

Use Case 

Generating a data flow pipeline using StreamSets via JDBC connections.

What we need to do:

  1. Install StreamSets data collector.
  2. Create JDBC origin.
  3. Create JDBC lookup.
  4. Create a data flow pipeline.
  5. View pipeline and stage statistics.

Installing StreamSets Data Collector

The core software is developed in Java, while the web interface is built with JavaScript/AngularJS, D3.js, HTML, and CSS.

To install StreamSets, perform the following:

Installing Java

To install Java, use the below command:

sudo apt-add-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer

Use the whereis java command to check the Java location.

Ensure that the JAVA_HOME variable is set to:

/usr/lib/jvm/java-8-oracle

To set JAVA_HOME, use the below command:

export JAVA_HOME=/usr/lib/jvm/java-8-oracle
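This export applies only to the current shell session. A minimal way to make it persistent, assuming a Bash shell, is to append it to your profile:

echo 'export JAVA_HOME=/usr/lib/jvm/java-8-oracle' >> ~/.bashrc
source ~/.bashrc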

Installing StreamSets (From Tarball)

To install StreamSets, perform the following:

Create a directory as follows:

mkdir /home/streamsets

Download the Data Collector core tarball from the StreamSets website into this directory. Then, extract it using the below command:

tar -xzf streamsets-datacollector-core-2.5.1.1.tgz
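Note: The tarball can also be fetched directly from the command line. The archive URL below is an assumption and may differ for your Data Collector version:

cd /home/streamsets
wget https://archives.streamsets.com/datacollector/2.5.1.1/tarball/streamsets-datacollector-core-2.5.1.1.tgz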

Create a system user and group named sdc using the below commands:

sudo addgroup sdc
sudo adduser --ingroup sdc sdc

Create the /etc/init.d directory, if it does not already exist, as root using the below command:

# mkdir /etc/init.d

Copy /home/streamsets/streamsets-datacollector-2.5.1.1/initd/_sdcinitd_prototype to the /etc/init.d directory as sdc, and change the ownership of the file to sdc using the below commands:

cp /home/streamsets/streamsets-datacollector-2.5.1.1/initd/_sdcinitd_prototype /etc/init.d/sdc
chown sdc:sdc /etc/init.d/sdc

Edit the /etc/init.d/sdc file and set the $SDC_DIST and $SDC_HOME environment variables to the location where the tarball was extracted, as shown below:

export SDC_DIST="/home/streamsets/streamsets-datacollector-2.5.1.1/"
export SDC_HOME="/home/streamsets/streamsets-datacollector-2.5.1.1/"

Make the sdc file executable using the below command:

chmod 755 /etc/init.d/sdc

Create the Data Collector configuration directory at /etc/sdc, as root, using the below command:

# mkdir /etc/sdc

Copy all files from the etc directory of the extracted tarball into the Data Collector configuration directory that you just created, using the below command (run from the extracted tarball directory):

cp -R etc/* /etc/sdc

Change the ownership of the /etc/sdc directory and all files in the directory to sdc:sdc using the below command:

chown -R sdc:sdc /etc/sdc

Restrict the form-realm.properties file in the /etc/sdc directory so that only the owner has access, using the below command:

chmod go-rwx /etc/sdc/form-realm.properties

Create the Data Collector log directory at /var/log/sdc and change the ownership to sdc:sdc using the below commands:

mkdir /var/log/sdc
chown sdc:sdc /var/log/sdc

Create the Data Collector data directory at /var/lib/sdc and change the ownership to sdc:sdc, as root, using the below commands:

mkdir /var/lib/sdc
chown sdc:sdc /var/lib/sdc

Create the Data Collector resources directory at /var/lib/sdc-resources and change the ownership to sdc:sdc using the below commands:

mkdir /var/lib/sdc-resources
chown sdc:sdc /var/lib/sdc-resources

Start Data Collector as a service using the below command:

service sdc start
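To verify that the service came up, you can check its status and tail the log; this assumes the init script supports a status action and the default sdc.log file name:

service sdc status
tail -f /var/log/sdc/sdc.log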

Note: If you get the error “sdc is dead”, check the limit configured for the current user using the below command:

ulimit -n

Set the session limit using the below command:

ulimit -u unlimited
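These ulimit settings apply only to the current session. To make a higher open file limit permanent for the sdc user, one option is to add entries to /etc/security/limits.conf; the 32768 value reflects the commonly recommended minimum for Data Collector and is an assumption for your environment:

sdc soft nofile 32768
sdc hard nofile 32768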

Access the Data Collector console by entering the following URL in the address bar of the browser: http://<system-ip>:18630/.

Note: The default username is “admin” and password is “admin”.

Creating JDBC Origin

To create a JDBC origin, perform the following steps:

  • Click Create New Pipeline to create a pipeline.
  • Add a title for the pipeline.

Note: In this analysis, the origin "JDBC Query Consumer" is used.

Download the JDBC origin through the Package Manager as shown in the below diagram:

Note: You can also install the package manually using the below command:

/home/streamsets/streamsets-datacollector-2.5.1.1/bin/streamsets stagelibs -install=streamsets-datacollector-jdbc-lib

Add configurations to the JDBC Query Consumer origin.
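For instance, a MySQL-backed origin might use configuration values like the following; the host, database, and credentials are placeholders for your own environment:

JDBC Connection String: jdbc:mysql://<host>:3306/<database>
Use Credentials: checked
Username: <db-user>
Password: <db-password>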

Uncheck Incremental Mode in the configuration so that the Query Consumer does not expect the default WHERE and ORDER BY clauses built on the offset column when executing the query, as shown in the below diagram:


Add WHERE and ORDER BY clauses using the offset value.
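A sketch of such a query is shown below; the applicant table and applicantid column follow the lookup example used later in this blog, and ${OFFSET} resolves to the configured offset value:

SELECT *
FROM   applicant
WHERE  applicantid > ${OFFSET}
ORDER  BY applicantid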

Click Validate to check the connection.

Note: If you are unable to connect to JDBC Query Consumer, move mysql-connector-java-5.1.27-bin.jar to the below path:

/home/streamsets/streamsets-datacollector-2.5.1.1/streamsets-libs/streamsets-datacollector-jdbc-lib/lib
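Assuming the connector JAR has been downloaded to the current directory, the copy, followed by a restart so that the driver is picked up, would look like this:

cp mysql-connector-java-5.1.27-bin.jar /home/streamsets/streamsets-datacollector-2.5.1.1/streamsets-libs/streamsets-datacollector-jdbc-lib/lib/
service sdc restart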

Creating JDBC Lookup

To create a JDBC lookup, lookup columns are required from both the source and the lookup table. For example, use the applicantid field in the applicant (source) table to look up the applicantid column in the application (lookup) table using the below query:

SELECT * 
FROM   application 
WHERE  applicantid = '${record:value('/applicantid')}'

The query uses the value of the applicantid column from the applicant (source) table. In this example, three tables are used for lookup.

The result of the above JDBC lookup is given as input to the next lookup table, loan_raw, using the below query:

SELECT * 
FROM   loan_raw 
WHERE  applicationid = '${record:value('/applicationid')}'

Creating the Data Flow Pipeline

Different processors are used for creating the data flow pipeline.

Field Remover

It discards unnecessary fields in the pipeline.

Expression Evaluator

It performs calculations and writes the results to new or existing fields. It is also used to add or modify record header attributes and field attributes.
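For example, a hypothetical Expression Evaluator configuration that derives a monthly installment field from two existing loan fields might look like this; the field names are assumptions and not part of the original pipeline:

Output Field: /monthlyInstallment
Field Expression: ${record:value('/loanAmount') / record:value('/termInMonths')}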

Stream Selector

It passes data to streams based on conditions and uses a default stream to pass records that match no user-defined condition. You can also define a condition for each stream of data.
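A stream condition might look like the following, with records matching no condition routed to the default stream; the field name and value are illustrative:

${record:value('/loanStatus') == 'APPROVED'}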

The Local FS destination is used to store the resultant data.
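As an illustration, a minimal Local FS configuration writing JSON output to a date-partitioned directory could look like this (the directory path and data format are assumptions):

Directory Template: /tmp/loan-output/${YYYY()}-${MM()}-${DD()}
Data Format: JSON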


The full data flow pipeline is as follows:

Viewing Pipeline and Stage Statistics

A pipeline can be monitored while it is running. Real-time summary and error statistics can be viewed for the pipeline and for the stages in the pipeline. By default, the Data Collector console displays pipeline monitoring information while the pipeline is running. Any stage can be selected to view its statistics. Similarly, error information can be viewed for the pipeline and its stages.

Previewing the Data Flow Pipeline

In the Data Collector pipeline, click Preview to see the input and output data at each stage.

Viewing Pipeline States

The pipeline state is the current condition of the pipeline, such as "running" or "stopped". It is displayed in the All Pipelines list and in the Data Collector log.

Viewing Pipeline Statistics

Record count, record and batch throughput, batch processing statistics, and heap memory usage are displayed for the pipeline as shown in the below diagram:

For a pipeline started with runtime parameters, the values of the parameters currently in use are also displayed, as shown in the below diagram:

Viewing Stage Statistics

Record and batch throughput, batch processing statistics, and heap memory usage are displayed for a stage as shown in the below diagram:

Conclusion

In this blog, we discussed configuring JDBC Query Consumer, performing JDBC lookup with more than one table, creating a data flow pipeline, and monitoring the stage statistics and pipeline statistics.


Published at DZone with permission of Rathnadevi Manivannan. See the original article here.

Opinions expressed by DZone contributors are their own.
