Data Quality Checks With StreamSets Using Drift Rules
Learn how to create a dataflow pipeline to check the quality of source data and load the data into HDFS using StreamSets.
In the world of big data, data drift has emerged as a critical technical challenge for data scientists and engineers in unleashing the power of data. It delays businesses from gaining real-time, actionable insights and making more informed decisions.
StreamSets is used not only for big data ingestion but also for analyzing real-time streaming data. It can identify null or bad values in source data and filter them out to produce precise results, helping businesses make quick and accurate decisions.
In this blog, let's discuss checking the quality of data using data rules and data drift rules in StreamSets.
Prerequisites
- Install Java 1.8
- Install streamsets-datacollector-2.6.0.1
Use Case
Create a dataflow pipeline to check the quality of source data and load the data into HDFS using StreamSets.
Data Description
Network data from outdoor field sensors is used as the source file. Additional fields, dummy data, empty values, and duplicate records were added to the source file.
The dataset has a total record count of 600K.
Sample Data
{"ambient_temperature":"16.70","datetime":"Wed Aug 30 18:42:45 IST 2017","humidity":"76.4517","lat":36.17,"lng":-119.7462,"photo_sensor":"1003.3","radiation_level":"201","sensor_id":"c6698873b4f14b995c9e66ad0d8f29e3","sensor_name":"California","sensor_uuid":"probe-2a2515fc","timestamp":1504098765}
Synopsis
- Read data from local file system
- Configure data drift rules and alerts
- Convert data types
- Configure data rules and alerts
- Derive fields
- Load data into HDFS
- Get alerts during data quality checks
- Visualize data in motion
Reading Data From Local File System
To read data from the local file system, perform the following:
- Create a new pipeline.
- Configure Directory origin to read files from a directory.
- Set Batch Size (recs) to 1 to read records one by one, making it easier to analyze the data and get accurate results.
- Set Data Format as JSON.
- Select JSON content as Multiple JSON objects, as illustrated in the sketch below.
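For reference, here is a minimal Python sketch (not StreamSets code) of what reading the Multiple JSON objects format implies, assuming one JSON object per line as in the sample data:

import json

def read_records(path):
    with open(path) as source:
        for line in source:
            line = line.strip()
            if line:
                yield json.loads(line)   # one sensor record per line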
Configuring Data Drift Rules and Alerts
To configure data drift rules and alerts, perform the following:
- Gather details about data drift as and when data passes between two stages.
- Provide meters and alerts.
- Create data drift rules to indicate data structure changes.
- Click Add to add the conditions in the links between the stages. A few of the conditions applied are listed below; a plain-Python sketch of what these drift functions detect follows these steps.
- Alert when field names vary between two subsequent JSON records.
- Function: drift:names(<field path>, <ignore when missing>)
- For example: ${drift:names('/', false)}
- Alert when the number of fields varies between two subsequent JSON records.
- Function: drift:size(<field path>, <ignore when missing>)
- For example: ${drift:size('/', false)}
- Alert when the data type of a specified field changes or the specified field is missing (e.g., Double to String or String to Integer).
- Function: drift:type(<field path>, <ignore when missing>)
- For example: ${drift:type('/photo_sensor', false)}
- Alert when the order of fields varies between two subsequent JSON records.
- Function: drift:order(<field path>, <ignore when missing>)
- For example: ${drift:order('/', false)}
- Alert when a string field is empty.
- For example: ${record:value('/photo_sensor') == ""}
- Click Activate to activate all the rules.
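For illustration only, here is a minimal Python sketch (not StreamSets code) of the kind of structural drift these functions flag between two consecutive records; comparing Python types field by field is an assumption of this sketch:

def detect_drift(prev, curr):
    # Compare two consecutive records (dicts) and report structural drift.
    alerts = []
    if set(prev) != set(curr):
        alerts.append("field names changed")              # cf. drift:names
    elif list(prev) != list(curr):
        alerts.append("field order changed")              # cf. drift:order
    if len(prev) != len(curr):
        alerts.append("number of fields changed")         # cf. drift:size
    for field in set(prev) & set(curr):
        if type(prev[field]) is not type(curr[field]):
            alerts.append("type of %s changed" % field)   # cf. drift:type
    return alerts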
Converting Data Types
To analyze the data and apply data rules, convert fields with a String data type to Decimal or Integer types.
For example, convert the "humidity" field in the source data from String ("humidity":"76.4517") to Double ("humidity":76.4517):
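StreamSets performs this conversion through processor configuration rather than code; as a rough Python sketch of the effect (the field list here is an assumption based on the sample record):

def convert_types(record):
    # Convert numeric fields that arrive as strings, e.g. "76.4517" -> 76.4517.
    for field in ("humidity", "ambient_temperature", "photo_sensor"):
        value = record.get(field)
        if isinstance(value, str) and value:
            try:
                record[field] = float(value)
            except ValueError:
                pass   # leave bad values in place for the data rules to flag
    return record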
Configuring Data Rules and Alerts
To configure data rules and alerts, perform the following:
- Click Add to add the conditions for data rules and data drift rules in the links between the stages.
- Apply data rules to attributes. For example, to alert when humidity falls outside the expected range:
${record:value('/humidity') < 66.2353 or record:value('/humidity') > 92.4165}
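In plain Python, this rule amounts to a simple out-of-range check (a sketch, not StreamSets code; the thresholds are the ones above):

def humidity_out_of_range(record):
    humidity = record["humidity"]
    # Alert when the reading falls outside the expected band.
    return humidity < 66.2353 or humidity > 92.4165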
Deriving Fields
To derive a new field using the Expression Evaluator processor, implement the following logic in the Field Expression:
if ambient_temperature < 20 and humidity > 90:
    return 'Anomaly'
else if ambient_temperature > 20 and ambient_temperature < 30 and humidity > 80 and humidity < 90:
    return 'Suspicious'
else:
    return 'Normal'
For example, if the derived field is /prediction, the expression is:
${record:value('/ambient_temperature') < 20 and record:value('/humidity') > 90? "Anomaly": (record:value('/ambient_temperature') > 20 and record:value('/ambient_temperature') < 30 and record:value('/humidity') > 80 and record:value('/humidity') < 90? "Suspicious": "Normal")}
Use the Stream Selector processor to split records into separate streams with conditions such as:
${record:value('/prediction')=="Suspicious"}
${record:value('/prediction')=="Anomaly"}
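The same derivation can be sketched in Python to make the nested conditional easier to read (illustration only, not StreamSets code):

def predict(record):
    temp, humidity = record["ambient_temperature"], record["humidity"]
    if temp < 20 and humidity > 90:
        return "Anomaly"
    if 20 < temp < 30 and 80 < humidity < 90:
        return "Suspicious"
    return "Normal"

predict({"ambient_temperature": 18.2, "humidity": 93.0})   # -> "Anomaly"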
Loading Data Into HDFS
To load data into HDFS, perform the following:
- Configure the Hadoop FS destination processor.
- Select data format as JSON.
Note: The Hadoop configuration directory (/var/lib/sdc-resources/hadoop-conf) contains the core-site.xml and hdfs-site.xml files. The sdc-resources directory is created while installing StreamSets.
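Outside StreamSets, an equivalent write can be sketched with the WebHDFS-based hdfs Python package; the namenode URL, port, user, and output path below are assumptions:

import json
from hdfs import InsecureClient

client = InsecureClient("http://namenode-host:9870", user="sdc")
records = [{"humidity": 76.4517, "prediction": "Normal"}]

# Write one JSON object per line, mirroring the pipeline's JSON data format.
client.write("/data/sensors/output.json",
             data="\n".join(json.dumps(r) for r in records),
             encoding="utf-8",
             overwrite=True)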
Getting Alerts During Data Quality Checks
We can get alerts while data is in motion and an alert summary when data anomalies are detected.
Alerts while data is in motion:
Alert summary on detecting data anomalies:
Visualizing Data in Motion
We can get record summary statistics and record count in/out statistics.
Record summary statistics:
Record count in/out statistics: