
Predictive Analytics With Spark ML


Whether you're running Spark on a large cluster or embedded within a single node app, Spark makes it easy to create predictive analytics with just a few lines of code.


Spark is a fantastic cluster computing platform for big data processing. The relative ease of using Spark to mine data from a data lake or other large sources makes it a great choice for big data endeavors. While each piece of technology has its own learning curve, I have found Spark to be very easy for my engineering teams to adopt, even when making the jump from Java to Scala as the language of choice. I was delighted to see how easily our developers could learn Spark and how quickly they were able to build applications with it. 

When I started looking into big data platforms a few years ago, Hadoop and MapReduce were king. But there was this up-and-coming technology, Apache Spark, that promised to be an easier alternative to classic MapReduce. Our developers embraced Spark, and we continue to reap the benefits of this superb platform. What really excites me, though, are the machine learning capabilities Spark offers through its ML libraries. Spark makes it easy for a programmer to apply machine learning algorithms while taking full advantage of its big data capabilities. You quickly come to the realization that your organization can do incredible things with predictive analytics and ultimately create new growth opportunities.

In this article, I wanted to share a simple machine learning example using Spark ML. The example will demonstrate how to set up a classifier to predict “bad” documents via Spark ML. We’ll first talk about some basic but important concepts necessary for building a reliable prediction model, and then demonstrate how to do it in Spark.

Note: For our purposes here, we are going to predict if a document is good or bad. We will not go into the details of what makes a good or bad document, but instead, just know that a document can be labeled as good or bad.

While we will provide an example of how to leverage Spark ML for predictive analytics, I do want to add the disclaimer that your mileage may vary. Every problem domain can and will be different. Our example uses a single source of data to make predictions; in real life, you may need to tap into several sources to have your predictive models maintain high accuracy. Also, we are going to use the entire content of a document represented in JSON to train our classifier. This may or may not work for you in your problem domain, but it is sufficient for our example. 

When talking with people about predictive analytics, I describe it like this: predictive analytics answers a question, but to answer that question accurately, you must have reliable historical data. Why is this important? Because computers learn from historical data to make their predictions. Think of how we learned things ourselves (how we learned that red is red, blue is blue, a dog is a dog, and a cat is a cat): someone taught us by example. Machine learning works the same way. We have to provide accurate examples via historical data so that the machine learning process can learn from them and train a classification model.

For our classifier, we would like to predict with very good accuracy, based on historical data, what future documents will be good or bad. That is my question: Given a new document, will it be good or bad? We have a history of documents and know whether or not they were good, so let’s see if we can train a classifier to answer the question.

Making Use of Semi-Structured Data

The documents we will use are semi-structured data in JSON format. In this example, I am going to make use of the entire content of the document, not just selected sections of it. Yes, we could select specific information from the document, but for this exercise, I am choosing to incorporate the entire content to see if Spark can discriminate what new documents will be good or bad. 

Consider what a document might look like in JSON:

{
        "field1" : "Field 1 information",
        "field2": "Field 2 information",
        "field3": "Field 3 information"
                       .
                       .
                       .
        "items" : [ {
                       "item" : "1",
                       "iteminfo" : "item information"
               },      .
                       .
        ],
                       .
                       .
                       .
        "notes" : "some interesting note information"
}

For our purposes, we have good documents and bad documents stored in separate folders in HDFS; for example, a “gooddocs” folder and a “baddocs” folder. The great thing about Spark is that it natively supports JSON data: we can load it directly from HDFS and create a DataFrame like so:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// a single-column schema that will hold the full JSON content of each document
val theStruct = new StructType(Array(StructField("content", StringType)))

// parse the JSON documents from HDFS, then re-serialize each one back to a JSON string
// (toJSON) so that the entire content can be treated as text by the feature transformers later
val goodDocsRdd = sqlContext
                      .read
                      .json(sc.textFile("hdfs://…/gooddocs"))
                      .toJSON
                      .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
val badDocsRdd = sqlContext
                      .read
                      .json(sc.textFile("hdfs://…/baddocs"))
                      .toJSON
                      .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

// create a data frame for good documents with two columns, "content" and "good"
val goodDocsDf = sqlContext
                      .createDataFrame(goodDocsRdd.map(jsonItem => Row(jsonItem)),
                                         theStruct)
                      .withColumn("good", lit("yes"))
// create a data frame for bad documents with two columns, "content" and "good"
val badDocsDf = sqlContext
                      .createDataFrame(badDocsRdd.map(jsonItem => Row(jsonItem)),
                                         theStruct)
                      .withColumn("good", lit("no"))

Above, we created two DataFrames (goodDocsDf and badDocsDf) containing the JSON data from HDFS. Note the addition of a label via the DataFrame's withColumn method: each good document receives a “good” column with the value “yes”, and each bad document receives the value “no”. In my data set, there are hundreds of thousands of documents; the more historical data that is available, the better our model should be.

Before we go any further, you might be asking why we would create separate DataFrames for good documents and bad documents. Why not have one DataFrame that contains both? For purposes of this article, I wanted to separate them out to shed light on an important subject: having a good, balanced mix of training and test data.

There are many articles and books on Spark ML, but many do not clarify an important point: you need a good, representative historical mix of data for all labels. For example, what if bad documents represent less than 1% of the total? Randomly splitting a combined data set might not retain the proper ratios. You need to ensure that your training and test data have every label represented in the right proportion; your model will work best if the ratio of labels in the historical data matches what you expect to see in production.
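If you do combine the data into a single DataFrame, it is worth checking the label mix before splitting. A minimal sketch, using the goodDocsDf and badDocsDf DataFrames defined above:

// quick sanity check on the label balance; a combined DataFrame can hide a badly skewed mix
val allDocsDf = goodDocsDf.unionAll(badDocsDf)
allDocsDf.groupBy("good").count().show()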

Now that we have our DataFrames created with the labeled historical data, let's split them into training and test sets with a good balance of good and bad documents like so:

val goodDocsDataSetDf = goodDocsDf.randomSplit(Array(0.8, 0.2))
val badDocsDataSetDf = badDocsDf.randomSplit(Array(0.8, 0.2))

val trainingDataDf = goodDocsDataSetDf(0).unionAll(badDocsDataSetDf(0))
val testDataDf = goodDocsDataSetDf(1).unionAll(badDocsDataSetDf(1))

Above, we separate the labeled historical data into a training set (80% of the historical data) and a test set (20%), while also ensuring a proper balance of good and bad documents in each set, since we expect the overall number of bad documents to be very small.

It is also very important to have separate sets of training and test data. There is a big difference between validating your model based on training data and testing your model with data that has not been used in training. That is the only way you can fully gauge how well your model will perform. Without separate test data, your model could perform well against the training data, but then fall short on accuracy when being used on new data.
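One simple sanity check, once the model is built, is to run the same validation against both sets and compare the results; a large gap between training and test accuracy is a classic sign of overfitting. A minimal sketch, using the validateModel helper and the model that are defined later in this article:

// compare training vs. test precision; a large gap suggests overfitting
// (validateModel and model are defined later in this article)
val trainMetrics = validateModel(model, trainingDataDf)
val testMetrics  = validateModel(model, testDataDf)
println(s"training precision: ${trainMetrics.precision}, test precision: ${testMetrics.precision}")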

Creating Data Features

We are going to use Spark ML to create our classification model. Spark ML nicely supports the DataFrame APIs, so we don't have to work with the lower-level RDD APIs in the MLlib libraries.

Now that we have loaded and labeled training and test data, the next step is to “featurize” our labeled training data so that it can be used by a classifier and cross validator. I am handling the creation of feature data in a separate function instead of placing it into a pipeline, although you could add this into a pipeline if desired. Refer to the following featurize function to see how the data can be transformed into features:

import org.apache.spark.ml.feature.{HashingTF, Normalizer, StringIndexer, StringIndexerModel, Tokenizer}
import org.apache.spark.sql.DataFrame

// the fitted indexer and its label array live outside the function so the same
// label-to-index mapping is reused for training data, test data, and new documents
var stringIndexerModel : StringIndexerModel = null
var labelMapping : Array[String] = null

// the parameter df is the DataFrame content we want to create features for
def featurize (df : DataFrame) : DataFrame = {

    // tokenize the content via a Tokenizer, then apply hashing term frequency to create 200 features;
    // for purposes of our content, 200 features appears to work well, although you could test
    // varying numbers in cross validation
    val transformedDf = new HashingTF()
                          .setInputCol("contentWRD")
                          .setOutputCol("contentFTR")
                          .setNumFeatures(200)
                          .transform(new Tokenizer()
                                           .setInputCol("content")
                                           .setOutputCol("contentWRD")
                                           .transform(df))

    // apply an indexer to the label column called "good"
    // but place the index in a new column called "label"
    if (stringIndexerModel == null) {
        stringIndexerModel = new StringIndexer()
                                     .setInputCol("good")
                                     .setOutputCol("label")
                                     .fit(transformedDf)
        // caching our label array so we can do reverse lookup
        labelMapping = stringIndexerModel.labels
    }

    // let's index our featurized dataframe
    val indexedDf = stringIndexerModel.transform(transformedDf)

    // apply a normalizer to the hashed term frequencies to get some accuracy "uplift"
    val normalizedDf = new Normalizer()
                                 .setInputCol("contentFTR")
                                 .setOutputCol("normFeatures")
                                 .setP(1.0)
                                 .transform(indexedDf)

    // return a data frame of label/feature rows
    normalizedDf.selectExpr("label", "normFeatures as features")
}

Now that we can generate features for our documents, we can train a classification model. Random forest classifiers perform extremely well in many diverse classification problems, although it may take some time to tune the hyperparameters to get the model to higher levels of accuracy.

In the featurize method above, I am using 200 features in the hashing term frequency function. I elected to use that number after testing a range of different values; 200 features produced the best results for me, although your results may vary with your own data. The number of features is something you could tune within a pipeline, but for our purposes here, we will keep it separate from the pipeline generation (a sketch of the pipeline approach follows below).
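If you did want the feature count tuned automatically, one option (a sketch only, not the code used in this article) is to move featurization into the pipeline itself, so that numFeatures becomes just another grid parameter for cross validation:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{HashingTF, StringIndexer, Tokenizer}
import org.apache.spark.ml.tuning.ParamGridBuilder

// hypothetical pipeline where label indexing and featurization are pipeline stages
val labelIndexer = new StringIndexer().setInputCol("good").setOutputCol("label")
val tokenizer    = new Tokenizer().setInputCol("content").setOutputCol("contentWRD")
val hashingTF    = new HashingTF().setInputCol("contentWRD").setOutputCol("features")
val rf           = new RandomForestClassifier().setLabelCol("label")

val fullPipeline = new Pipeline().setStages(Array(labelIndexer, tokenizer, hashingTF, rf))

// the grid can now vary the feature count alongside the RF hyperparameters
val fullParamGrid = new ParamGridBuilder()
        .addGrid(hashingTF.numFeatures, Array(100, 200, 400))
        .addGrid(rf.maxDepth, Array(10, 20))
        .build()

Such a pipeline would be fit against the raw labeled DataFrame (with its "good" and "content" columns) by the same CrossValidator shown in the next section.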

To build and train the classification model, we have a createPipelineModel method below. Comments provided in the method describe what is being done, but essentially, we are “featurizing” our training data, creating a single stage pipeline to run multiple random forest hyperparameter combinations, and running cross validation to find the best predictive model:

import org.apache.spark.ml.{Pipeline, PipelineModel, Transformer}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

def createPipelineModel (trainingDataDf : DataFrame) : (PipelineModel, Transformer) = {

  // featurize our training data
  val featurizedTrainingDataDf = featurize(trainingDataDf)

  // create an RF classifier with a few defaults and designate its label column;
  // the actual training happens inside the cross validator below
  val classifier = new RandomForestClassifier()
           .setLabelCol("label")
           .setImpurity("gini")
           .setFeatureSubsetStrategy("auto")
           .setSeed(5043)

  // create a multi-class evaluator and designate its label column name
  val evaluator = new MulticlassClassificationEvaluator()
           .setLabelCol("label")

  // create a param grid to try varying RF hyperparameters to find best model
  val paramGrid = new ParamGridBuilder()
           .addGrid(classifier.maxBins, Array(10, 30))     /*(10, 30, 35, ...)*/
           .addGrid(classifier.maxDepth, Array(10, 20))    /*(10, 20, 25, ...)*/
           .addGrid(classifier.numTrees, Array(1, 35))     /*(1, 35, 38, ...)*/
           .build()

  // we will have a single stage pipeline that will be used for cross validation
  val pipeline = new Pipeline()
           .setStages(Array(classifier))

  // create a cross validator to tie together the pipeline, evaluator and param grid using 3 folds
  val cv = new CrossValidator()
           .setEstimator(pipeline)
           .setEvaluator(evaluator)
           .setEstimatorParamMaps(paramGrid)
           .setNumFolds(3)

  // fit the featurized training data (label/features columns) and select the best model from cross validation
  val pipelineFittedModel = cv.fit(featurizedTrainingDataDf).bestModel

  // get the pipeline model instance and RF model
  val bestModel: Option[PipelineModel] = pipelineFittedModel match {
      case p: PipelineModel => Some(p)
      case _ => None
  }     

  val treeModel: Option[RandomForestClassificationModel] = bestModel.flatMap (x =>
        x.stages.collect {
          case t: RandomForestClassificationModel => t
        }.headOption
  )

  // this will show you what the RF tree looks like
  treeModel.foreach(x => println(x.toDebugString))

  // finally, return the pipeline model instance and RF model tuple  
  (pipelineFittedModel.asInstanceOf[PipelineModel], treeModel.getOrElse(null))
}

After the call to createPipelineModel, you will be able to generate some predictions on data with the model (the second item in the returned tuple). For example:

val (pipelineModel, model) = createPipelineModel(trainingDataDf)

model.transform(featurize(testDataDf.sample(false, 0.01))).show(1)

The call to show(1) will show something similar to the following result, where we see the prediction results. In our case below, our prediction matched the actual label for the sample test data:

+-----+-----------------+-------------+-----------+----------+
|label|         features|rawPrediction|probability|prediction|
+-----+-----------------+-------------+-----------+----------+
|  1.0|(200,[119],[1.0])|    [0.0,1.0]|  [0.0,1.0]|       1.0|
+-----+-----------------+-------------+-----------+----------+

If we take the prediction value, convert it to an integer index, and look it up in the labelMapping array that we cached, we see that the predicted label is no:

println("Good document? (yes/no): " + labelMapping(1)) 

Good document? (yes/no): no 

This prediction is correct, as the test data selected was indeed a bad document. Note: When using a StringIndexer to index labels on training data, the most prevalent label will receive the zero index. In our example, a label of 0.0 equates to yes and 1.0 equates to no. 
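If you prefer not to do the reverse lookup by hand, Spark ML's IndexToString transformer can map the numeric prediction column back to the original string labels. A minimal sketch, reusing the labelMapping array and the model from above:

import org.apache.spark.ml.feature.IndexToString

// map the numeric prediction column back to the original "yes"/"no" labels
val labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predictedGood")
        .setLabels(labelMapping)

labelConverter.transform(model.transform(featurize(testDataDf.sample(false, 0.01))))
              .select("prediction", "predictedGood")
              .show(5)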

Validating and Tuning Your Classifier

Now that we have a classifier model, we need to validate how well it performs. Since the classifier was trained with the training data, we would expect it to be very accurate when predicting the label of a data point drawn from that training data; the real measure is how it does on the reserved test data. We can check the accuracy of either set by running it through the classifier and then creating a multiclass metrics instance on the results to see a confusion matrix, the overall precision, recall, and so on. The multiclass metrics class used here is from MLlib, but it works very nicely for our purposes, and printing the confusion matrix helps show the number of false positives/negatives, if any occur.

Here is how that looks in the code:

import org.apache.spark.mllib.evaluation.MulticlassMetrics

// accepts a model (our RF model) and a data frame, which can be our test data
def validateModel (model : Transformer, data : DataFrame) : MulticlassMetrics = {

  // MulticlassMetrics expects (prediction, label) pairs
  val predsAndLabs = model.transform(featurize(data))
                            .map { row => (row.getDouble(row.fieldIndex("prediction")),
                                           row.getDouble(row.fieldIndex("label"))) }

  // create and return the metrics
  new MulticlassMetrics(predsAndLabs)
}

// run the test data now
val metrics = validateModel(model, testDataDf)

// print out the precision and confusion matrix
// could also print recall, FMeasure, true/false positive rates if desired
println("Precision on data: " + metrics.precision)
println("========================== Confusion Matrix ===========================")
println(metrics.confusionMatrix)
println("=======================================================================")

Validating the classifier we created above with the reserved test data resulted in the following:

Precision on data: 0.992756588639712
========================== Confusion Matrix ===========================
101323.0  684.0
201.0     19972.0
=======================================================================

Thus, the classifier provided high accuracy on the test data — 99.28% — and this classifier appears to be ready to do its job!
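With a label mix as imbalanced as ours, the overall precision can mask weak performance on the rare "bad" class, so it is also worth printing the per-label numbers. A short sketch using the same MulticlassMetrics instance and the cached labelMapping array:

// per-label precision and recall; poor recall on the rare label would be
// hidden by a high overall precision
metrics.labels.foreach { label =>
  println(s"label $label (${labelMapping(label.toInt)}): " +
          s"precision=${metrics.precision(label)}, recall=${metrics.recall(label)}")
}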

While you may evaluate your results in your own way, I have found the following grading of model accuracy to be a good scorecard for gauging classifier performance:

  • >= 95% is excellent
  • 95% > x >= 85% is good
  • 85% > x >= 75% is fair
  • 75% > x >= 70% is poor
  • < 70% is questionable to unreliable

On the training data, I would expect to see very high accuracy, but if accuracy falls short, then there are two potential issues:

  1. The classifier needs to be tuned to perform better (but watch out for overfitting).

  2. The training data does not contain the right features, or enough of them, to make a prediction model accurate.

For the latter issue, I cannot stress enough the importance of pertinent and accurate training data. The data must be pertinent to the problem domain; you cannot predict something about apples using training data that is about oranges. Make sure your training (and test) data is appropriate for what you want to predict. Also, watch out for assuming too much about your training data: while you might think your data is pertinent, there may be external factors that the training/test data do not reflect, and your model will not be able to learn how to deal with those factors that drive variance in results.

For the former issue, tuning your classifier to improve accuracy is likely to be a common task unless you are using cross validation. In the example above, the RandomForestClassifier is given some tuning parameters up front to optimize accuracy; this was done with trial and error. However, you can opt to use a machine learning pipeline, as available in Spark ML, to determine a fitted model with the optimal accuracy based on the range of options you provide.

Key Points to Remember

Apache Spark has very nice support for predictive analytics via Spark ML. The Spark ML APIs are easy to understand and place a lot of power into the hands of developers to create superb classification models. Whether you are running Spark on a large cluster or embedded within a single node application, Spark makes it easy to create predictive analytics with just a few lines of code.

While Spark ML makes the programmatic aspect of predictive analytics much easier to accomplish, the key to successful predictive analytics is accurately labeled historical data. Often, that is the most substantial effort required: the gathering, validation, and labeling of historical data. Keep in mind the garbage-in, garbage-out principle; if your historical data isn't labeled correctly, don't expect your model to perform with good accuracy.


