How-to Predict Telco Churn With Apache Spark MLlib


Spark MLlib is growing in popularity for machine-learning model development due to its elegance and usability. In this post, you’ll learn why.


Spark MLlib is a library for performing machine learning and associated tasks on massive datasets. With MLlib, fitting a machine-learning model to a billion observations can take a couple of lines of code and leverage hundreds of machines. MLlib greatly simplifies the model development process.

In this post, we’ll use MLlib to fit a machine-learning model that can predict which customers of a telecommunications company are likely to stop using their service. Churn prediction is one of the most common applications of machine learning in the telecommunications industry, as well as in many other subscription-based industries.

We’ll carry out our analysis and modeling using the Python programming language, and we’ll apply a variety of connected tools for the task. To load and manipulate the data, we’ll make use of Spark’s DataFrames API. To perform feature engineering, model fitting, and model evaluation, we’ll use Spark’s ML Pipelines API. (The core of MLlib is shipped in CDH 5.5 and supported in Cloudera Enterprise 5.5, but ML Pipelines will not be supported until a future release.)

This post is based on the material we presented at our “Data Science for Telecom” tutorial at Strata + Hadoop World Singapore 2015. The full source code, with outputs, is available in an IPython notebook. The repository also contains a script showing how one might launch an IPython notebook with the required dependencies on a CDH cluster.

Loading the Data With Spark DataFrames

We’ll fit our model to a churn dataset provided by the UC Irvine machine-learning repository hosted by SGI. In this dataset, each record contains information corresponding to a single subscriber, as well as whether that subscriber went on to stop using the service.

The dataset contains only 5,000 observations (i.e., subscribers), many orders of magnitude smaller than what Spark can handle, but playing with data of this size makes it easy to try out the tools on a laptop.

The full set of fields, from the dataset’s description, is:

state
account length
area code
phone number
international plan
voice mail plan
number vmail messages
total day minutes
total day calls
total day charge
total eve minutes
total eve calls
total eve charge
total night minutes
total night calls
total night charge
total intl minutes
total intl calls
total intl charge
number customer service calls
churned

The last field, “churned”, is a categorical variable that can take the values “true” or “false”, and is the label we would like to predict. The rest of the fields are fair game for use in creating independent variables, which are used in combination with a model to generate predictions.

To load this data into a Spark DataFrame, we just need to tell Spark the type of each field. We use the spark-csv package, which lives outside of the main Spark project, to interpret CSV-formatted data:

from pyspark.sql import SQLContext
from pyspark.sql.types import *

sqlContext = SQLContext(sc)
schema = StructType([
    StructField("state", StringType(), True),
    StructField("account_length", DoubleType(), True),
    StructField("area_code", StringType(), True),
    StructField("phone_number", StringType(), True),
    StructField("intl_plan", StringType(), True),
    StructField("voice_mail_plan", StringType(), True),
    StructField("number_vmail_messages", DoubleType(), True),
    StructField("total_day_minutes", DoubleType(), True),
    StructField("total_day_calls", DoubleType(), True),
    StructField("total_day_charge", DoubleType(), True),
    StructField("total_eve_minutes", DoubleType(), True),
    StructField("total_eve_calls", DoubleType(), True),
    StructField("total_eve_charge", DoubleType(), True),
    StructField("total_night_minutes", DoubleType(), True),
    StructField("total_night_calls", DoubleType(), True),
    StructField("total_night_charge", DoubleType(), True),
    StructField("total_intl_minutes", DoubleType(), True),
    StructField("total_intl_calls", DoubleType(), True),
    StructField("total_intl_charge", DoubleType(), True),
    StructField("number_customer_service_calls", DoubleType(), True),
    StructField("churned", StringType(), True)])

churn_data = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .load('churn.all', schema = schema)

Fitting a Machine-Learning Model

MLlib provides a set of algorithms for fitting machine-learning models to large data sets and performing related statistical processing. In particular, here we’ll make use of the ML Pipelines API, which is a framework for taking data in DataFrames, applying transformations to extract features, and feeding the extracted data into machine learning algorithms. We will use MLlib to train and evaluate a Random Forest model that can predict whether a user is likely to churn.

The broad flow of supervised machine-learning model development and evaluation proceeds as follows:


The flow starts with a dataset, composed of columns with possibly a variety of types. In our case, this is the churn_data we created in the section above. We then perform feature extraction on this data to transform it into a set of feature vectors and labels. A feature vector is an array of floating point values representing the independent variables our model can use to make a prediction. A label is a single floating point value representing the dependent variable that our machine learning algorithm is trying to predict. In binary classification problems such as ours, we use 0.0 and 1.0 to represent the two possible predictions. In our case, 0.0 means “will not churn” and 1.0 means “will churn.”
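To make the terminology concrete, here is a toy pure-Python sketch, with made-up values, of turning one subscriber record into a feature vector and a label; the indexing convention for the categorical field mirrors what a string indexer would do.

```python
# Hypothetical values for one subscriber, reduced to three fields:
row = {"intl_plan": "no", "account_length": 128.0, "total_day_calls": 110.0}

# Index the categorical variable ("no" -> 0.0, "yes" -> 1.0), then
# assemble everything into a single feature vector of floats.
intl_plan_indexed = {"no": 0.0, "yes": 1.0}[row["intl_plan"]]
features = [intl_plan_indexed, row["account_length"], row["total_day_calls"]]
label = 1.0  # this subscriber churned

print(features, label)  # [0.0, 128.0, 110.0] 1.0
```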

Feature extraction refers to the wide set of possible transformations we might conduct to produce feature vectors and labels from the input data. In our case, we’d like to take categorical variables that are represented in the input data as strings, like intl_plan, and index them to turn them into numbers.

We’d also like to select a subset of the columns. For example, we don’t expect phone_number to be a very useful feature, so we can leave it out of our model, but total_day_calls likely is, so we’d like to include it. We incorporate these transformation steps into our pipeline by defining two stages: StringIndexer and VectorAssembler.

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

label_indexer = StringIndexer(inputCol = 'churned', outputCol = 'label')
plan_indexer = StringIndexer(inputCol = 'intl_plan', outputCol = 'intl_plan_indexed')

reduced_numeric_cols = ["account_length", "number_vmail_messages", "total_day_calls",
                        "total_day_charge", "total_eve_calls", "total_eve_charge",
                        "total_night_calls", "total_intl_calls", "total_intl_charge"]

assembler = VectorAssembler(
    inputCols = ['intl_plan_indexed'] + reduced_numeric_cols,
    outputCol = 'features')

Having extracted features, our next step is to split up our dataset into train and test sets. The train set will be used by the machine learning algorithm to fit the model. The test set will be used to evaluate the model:

(train, test) = churn_data.randomSplit([0.7, 0.3])
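Note that randomSplit assigns each row to a split independently at random with the given weights, so the resulting sizes are only approximately 70/30 (a seed argument makes the split reproducible). A pure-Python sketch of that behavior:

```python
import random

def random_split(rows, weights, seed=42):
    # Normalize the weights into cumulative thresholds, then assign each
    # row independently -- mirroring how DataFrame.randomSplit behaves.
    rng = random.Random(seed)
    total = sum(weights)
    thresholds, acc = [], 0.0
    for w in weights:
        acc += w / total
        thresholds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, t in enumerate(thresholds):
            if r <= t:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(list(range(5000)), [0.7, 0.3])
# The sizes come out close to, but not exactly, 3500 and 1500.
print(len(train_rows), len(test_rows))
```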

Now we can assemble our pipeline and finally fit the model. An advantage of defining a pipeline is that the exact same feature-extraction code is applied at both training and prediction time. With MLlib, this is a few short lines of code!

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

classifier = RandomForestClassifier(labelCol = 'label', featuresCol = 'features')
pipeline = Pipeline(stages=[plan_indexer, label_indexer, assembler, classifier])
model = pipeline.fit(train)

Validating the Model

How do we know whether the model we’ve trained is a good one? Can we show that the predictions it produces are better than random guessing? For binary classification models, a useful evaluation metric is the area under the ROC curve. An ROC curve is created by taking a binary classifier that assigns labels by applying a threshold to a predicted continuous value. As you vary the threshold, you move between two extremes: at one, the true positive rate (TPR) and the false positive rate (FPR) are both 0 because everything is labeled “not churned”; at the other, both are 1 because everything is labeled “churned.”

A random predictor that labels a customer as churned half the time and not churned the other half would have an ROC curve that is a straight diagonal line. This line cuts the unit square into two equally sized triangles, so the area under the curve is 0.5. An AUROC value of 0.5 means that your predictor is no better at discriminating between the two classes than random guessing. The closer the value is to 1.0, the better its predictions are. A value below 0.5 indicates that we could actually produce better predictions by reversing the answers the model gives us.
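To see why a random predictor scores about 0.5, here is a small pure-Python sketch (not part of MLlib) that sweeps the threshold over sorted scores and integrates the resulting ROC curve with the trapezoid rule:

```python
import random

def roc_auc(labels, scores):
    # Sort by descending score and sweep the threshold downward,
    # accumulating TPR/FPR and integrating with the trapezoid rule.
    pairs = sorted(zip(scores, labels), reverse=True)
    pos = sum(labels)
    neg = len(labels) - pos
    tpr = fpr = auc = 0.0
    tp = fp = 0
    for _, label in pairs:
        if label == 1:
            tp += 1
        else:
            fp += 1
        new_tpr, new_fpr = tp / pos, fp / neg
        auc += (new_fpr - fpr) * (new_tpr + tpr) / 2
        tpr, fpr = new_tpr, new_fpr
    return auc

rng = random.Random(0)
labels = [rng.randint(0, 1) for _ in range(10000)]
random_scores = [rng.random() for _ in labels]
print(roc_auc(labels, random_scores))  # close to 0.5 for random scores
```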

MLlib also makes computing the AUROC exceedingly easy. If we were to compute the ROC curve based on all of our data, our classification evaluation metrics would be overly optimistic because we would be evaluating a model with the same data we trained on. We perform model evaluation only on our test set to avoid overly optimistic model evaluation metrics (like AUROC) as well as to help us avoid overfitting.

from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator()
auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

In this case, we produce an area under the curve greater than 0.8, indicating that the model’s results are reasonably good and definitely better than random guessing.


This post provides just one example of a possible use case for MLlib. For more examples of how-tos around machine learning and Spark in general, see this list.

Juliet Hougland is a Data Scientist at Cloudera, and contributor/committer/maintainer for the Sparkling Pandas project.

Sandy Ryza is a Data Scientist at Cloudera, and a committer to the Apache Spark and Apache Hadoop projects. He is a co-author of Advanced Analytics with Spark, from O’Reilly Media.


Published at DZone with permission of Jeffrey Shmain, DZone MVB. See the original article here.

