Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Sensor Data Quality Management Using PySpark and Seaborn

DZone's Guide to

Sensor Data Quality Management Using PySpark and Seaborn

Learn how to check data for required values, validate data types, and detect integrity violation using data quality management (DQM).

· Big Data Zone ·
Free Resource

The open source HPCC Systems platform is a proven, easy to use solution for managing data at scale. Visit our Easy Guide to learn more about this completely free platform, test drive some code in the online Playground, and get started today.

Data quality management (DQM) is the process of analyzing, defining, monitoring, and improving the quality of data continuously. A few data quality dimensions widely used by the data practitioners are accuracy, completeness, consistency, timeliness, and validity. Various DQM rules are configured to apply DQM to the existing data. These DQM rules are applied to clean up, repair, and standardize incoming data and to identify and correct invalid data.

In this blog, let's check data for required values, validate data types, and detect integrity violation. DQM is applied to correct the data by providing default values, formatting numbers and dates, and removing missing values, null values, non-relevant values, duplicates, out of bounds, referential integrity violations, and value integrity violations.

Prerequisites

Install the following Python packages:

  • PySpark
  • XGBoost
  • Pandas
  • Matplotlib
  • Seaborn
  • NumPy
  • sklearn

Data Description

Sensor data from the pub-nub source is used as the source file.

  • Total record count: 6K
  • File types: JSON and CSV
  • # of columns: 11
  • # of records: 600K
  • # of duplicate records: 3.5K
  • # of NA Values:
    • Ambient temperature: 3370
    • Humidity: 345
    • Sensor IDs: 12

Sample dataset:select

Use Case

Perform data quality management on sensor data using the Python API PySpark.

Data quality management process:

select







Synopsis:

  • Data integrity

  • Data profiling

  • Data cleansing

  • Data transformation

Data Integrity

Data integrity is the process of guaranteeing the quality of the data in the database.

  • Analyzed input sensor data with:
    • 11 columns
    • 6K records
  • Validated source metadata
  • Populated relationships for an entity

Data Profiling

Data profiling is the process of discovering and analyzing enterprise metadata to discover patterns, entity relationships, data structure, and business rules. It provides statistics or informative summaries of the data to assess data issues and quality.

Few data profiling analyses include:

  • Completeness analysis: Analyze frequency of attribute population versus blank or null values.
  • Uniqueness analysis: Analyze and find unique or distinct values and duplicate values for a given attribute across all records.
  • Values distribution analysis: Analyze and find the distribution of records across different values of a given attribute.
  • Range analysis: Analyze and find minimum, maximum, median, and average values of a given attribute.
  • Pattern analysis: Analyze and find character patterns and pattern frequency.

Generating Profile Reports

To generate profile reports, use either Pandas profiling or PySpark data profiling using the below commands:

Pandas profiling:

import pandas as pd
import pandas_profiling
import numpy as np

#Read the source file that contains sensor data details
df= pd.read_json('E:\sensor_data.json', lines=True)

#Preprocessing on data
df = df.replace(r'\s+', np.nan, regex=True)
df['ambient_temperature']= df['ambient_temperature'].astype(float)
df['humidity'] = df['humidity'].astype(float)

#Generate profile report using pandas_profiling
report = pandas_profiling.ProfileReport(df)

#covert profile report as html file
report.to_file("E:\sensor_data.html")

PySpark profiling:

import pandas as pd
import spark_df_profiling
import numpy as np

#Initializing PySpark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

#Spark Config
conf = SparkConf().setAppName("sample_app")
sc = SparkContext(conf=conf)
sql = SQLContext(sc)

# Loading transaction Data
sensor_data_df = sql.read.format("com.databricks.spark.csv").option("header", "true").load("E:\spireon\Data\ganga\sensor_data.csv")
report = spark_df_profiling.ProfileReport(sensor_data_df)
report.to_file("E:\spireon\Data\ganga\pyspark_sensor_data_profiling_v2.html")

The profile report provides the following details:

  • Essentials: Type, unique values, missing values
  • Quantile statistics: Minimum value, Q1, median, Q3, maximum, range, interquartile range
  • Descriptive statistics: Mean, mode, standard deviation, sum, median absolute deviation, coefficient of variation, kurtosis, and skewness
  • Most frequent values
  • Histogram

Profile report overview:

select

The sample profile report for a single attribute (ambient temperature) is as follows:

Ambient temperature statistics:

select

Ambient temperature histogram:

select

Ambient temperature extreme values:

select

To view the complete profile report, see the References section.

Data Cleansing

Data cleansing is the process of identifying incomplete, incorrect, inaccurate, duplicate, or irrelevant data and modifying, replacing, or deleting dirty data.

select

  • Analyzed the number of null (NaN) values in the dataset using the command df.isnull().sum().

The number of null values is as follows:select

  • Deleted NaN values in String type columns using the below command:

df_v1 = df.dropna(subset=['sensor_id', 'sensor_name','sensor_uuid'], how='all')
df_v1.isnull().sum()
  • Imputed missing values using one of the below methods.

Method 1: Impute Package

Imputation is defined as the process of replacing the missing data with substituted values using any of the following options:

  • most_frequent: Columns of the dtype object (String) are imputed with the most frequent values in the column as mean or median cannot be found for this data type.
  • Mean: Ratio of the sum of elements to the number of elements in the list.
  • Median: Ratio of the sum of middle two numbers to two.

Note: If the missing values in the records are negligible, ignore those records.

In our use case, the most_frequent strategy is used for substituting the missing values using the below command:

imputer=Imputer(missing_values='NaN',strategy='most_frequent', axis=0)
imputer=imputer.fit(df_v1.ix[:,[2,3,4,5,6]])
df_v1.ix[:,[2,3,4,5,6]] =imputer.transform(df_v1.ix[:,[2,3,4,5,6]])

Method 2: Linear Regression Model

To replace the missing data with the substituted values using the linear regression model, use the below commands:

from sklearn.linear_model import LinearRegression,LogisticRegression

# Split values into sets with known and unknown ambient_temperature values
df_v2 = df_v1[["ambient_temperature","humidity","photosensor","radiation_level"]]
knownTemperature = df_v2.loc[(df_v1.ambient_temperature.notnull())]
unknownTemperature = df_v2.loc[(df_v1.ambient_temperature.isnull())]

# All ambient_temperature values stored in a target array
Y = knownTemperature.values[:, 0]

# All the other values stored in the feature array
X = knownTemperature.values[:,1::]

# Create and fit a linear regression model
linear_regression = LinearRegression()
linear_regression.fit(X, Y)

# Use the fitted regression model to predict the missing values
predictedTemperature = linear_regression.predict(unknownTemperature.values[:, 1::])

# Assign those predicted values to the full data set
df_v1.loc[ (df_v1.ambient_temperature.isnull()), 'ambient_temperature' ] = predictedTemperature

Data Transformation

Data transformation deals with converting data from the source format into the required destination format.

select

  • Converted attributes such as ambient_temperature and humidity from object type to float type using the below command:

#Preprocessing on data transformation
df = df.replace(r'\s+', np.nan, regex=True)
df['ambient_temperature']= df['ambient_temperature'].astype(float)
df['humidity'] = df['humidity'].astype(float)
  • Converted a non_numeric value of sensor_name into numeric data using the below command:
labelencoder_X=LabelEncoder()
labelencoder_X.fit(df_v1.ix[:,6])
list(labelencoder_X.classes_)
df_v1.ix[:,6] = labelencoder_X.transform(df_v1.ix[:,6])
  • Converted a non_numeric sensor name into numeric data using the below command:
labelencoder_y=LabelEncoder()
labelencoder_y.fit(df_v1.ix[:,4])
list(labelencoder_y.classes_)
df_v1.ix[:,4] = labelencoder_y.transform(df_v1.ix[:,4])
  • Converted a non_numeric value of sensor ID into numeric data using the below command:
labelencoder_z=LabelEncoder() 
labelencoder_z.fit(df_v1.ix[:,5])
list(labelencoder_z.classes_) 
df_v1.ix[:,5] = labelencoder_z.transform(df_v1.ix[:,5])
  • Based on the above transformation, find feature importance using built-in function using the below commands:
# plot feature importance using built-in function<br> from numpy import loadtxt<br> from xgboost import XGBClassifier<br> from xgboost import plot_importance<br> from matplotlib import pyplot

# split data into X and y
X = df_v1.ix[:,[0,1,2,3,4,5,6,7,10]]
Y = df_v1.ix[:,[10]]
plt.clf()

# fit model no training data
model = XGBClassifier()
model.fit(X, Y)

# plot feature importance
plot_importance(model)
plt.gcf().subplots_adjust(bottom=0.15)
plt.tight_layout()
plt.show()

Feature importance chart:

select

From the above diagram, it is evident that photosensor feature has the highest importance and the latitude feature has the lowest importance.

Correlation Analysis

Performed correlation analysis to explore data relationships and data correlations to highlight weak data relationships and find potential incorrect relationships. The correlation analysis between the sensor data variables is shown in the below diagram:select

From the above diagram, it is evident that the ambient_temperature is highly correlated with the dewpoint and humidity and the latitude and longitude are negatively correlated as per the correlation analysis.

Reference

Managing data at scale doesn’t have to be hard. Find out how the completely free, open source HPCC Systems platform makes it easier to update, easier to program, easier to integrate data, and easier to manage clusters. Download and get started today.

Topics:
data quality ,big data ,pyspark ,seaborn ,data management ,tutorial ,lienar regression ,data profiling ,data integrity ,sensor data

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}