
Machine Learning with InsightEdge: Part II

Here is Part II of Oleksiy Dyagilev's introduction to using machine learning algorithms with InsightEdge.



Did you miss Part I? Quick! Go read it now!

A Simple Algorithm

Now that we have the training and test datasets sampled, initially preprocessed, and available in the data grid, we can close the Web Notebook and start experimenting with different techniques and algorithms by submitting Spark applications.

For our first baseline approach, let's take a single feature, device_conn_type, and the logistic regression algorithm:

import com.gigaspaces.spark.context.GigaSpacesConfig
import com.gigaspaces.spark.implicits._
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.insightedge._
import org.apache.spark.{SparkConf, SparkContext}

object CtrDemo1 {

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: CtrDemo1 <spark master url> <grid locator> <train collection>")
      System.exit(1)
    }

    val Array(master, gridLocator, trainCollection) = args

    // Configure InsightEdge settings
    val gsConfig = GigaSpacesConfig("insightedge-space", None, Some(gridLocator))
    val sc = new SparkContext(new SparkConf().setAppName("CtrDemo1").setMaster(master).setGigaSpaceConfig(gsConfig))
    val sqlContext = new SQLContext(sc)

    // load training collection from data grid
    val trainDf = sqlContext.read.grid.load(trainCollection)
    trainDf.cache()

    // use one-hot-encoder to convert 'device_conn_type' categorical feature into a vector
    val indexed = new StringIndexer()
      .setInputCol("device_conn_type")
      .setOutputCol("device_conn_type_index")
      .fit(trainDf)
      .transform(trainDf)

    val encodedDf = new OneHotEncoder()
      .setDropLast(false)
      .setInputCol("device_conn_type_index")
      .setOutputCol("device_conn_type_vector")
      .transform(indexed)

    // convert the dataframe to an RDD of labeled points
    val encodedRdd = encodedDf.map { row =>
      val label = row.getAs[Double]("click")
      val features = row.getAs[Vector]("device_conn_type_vector")
      LabeledPoint(label, features)
    }

    // Split data into training (60%) and test (40%)
    val Array(trainingRdd, testRdd) = encodedRdd.randomSplit(Array(0.6, 0.4), seed = 11L)
    trainingRdd.cache()

    // Run training algorithm to build the model
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(2)
      .run(trainingRdd)

    // Clear the prediction threshold so the model will return probabilities
    model.clearThreshold

    // Compute raw scores on the test set
    val predictionAndLabels = testRdd.map { case LabeledPoint(label, features) =>
      val prediction = model.predict(features)
      (prediction, label)
    }

    // Instantiate metrics object
    val metrics = new BinaryClassificationMetrics(predictionAndLabels)

    val auROC = metrics.areaUnderROC
    println("Area under ROC = " + auROC)
  }
}

Let's walk through what happens here.

First, we load the training dataset from the data grid, which we prepared and saved earlier with the Web Notebook.

Then we use StringIndexer and OneHotEncoder to map a column of categories to a column of binary vectors. For example, with 4 categories of device_conn_type, an input value of the second category would map to an output vector of [0.0, 1.0, 0.0, 0.0].
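To make the mapping concrete, here is a tiny standalone sketch that reuses the sqlContext from the listing above; the connection-type values are made up for illustration and are not the real Avazu values:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// Toy data with four distinct (hypothetical) connection types
val toyDf = sqlContext.createDataFrame(Seq(
  (0, "type_a"), (1, "type_b"), (2, "type_c"), (3, "type_d"), (4, "type_b")
)).toDF("id", "device_conn_type")

// StringIndexer assigns each category a numeric index (most frequent category gets 0)
val indexedToy = new StringIndexer()
  .setInputCol("device_conn_type")
  .setOutputCol("device_conn_type_index")
  .fit(toyDf)
  .transform(toyDf)

// With dropLast=false and 4 categories, each value becomes a 4-element binary vector,
// e.g. the second category maps to [0.0, 1.0, 0.0, 0.0]
new OneHotEncoder()
  .setDropLast(false)
  .setInputCol("device_conn_type_index")
  .setOutputCol("device_conn_type_vector")
  .transform(indexedToy)
  .show(false)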

Then we convert the DataFrame to an RDD[LabeledPoint], since LogisticRegressionWithLBFGS expects an RDD as its training input. We train the logistic regression model and use it to predict clicks for the test dataset. Finally, we compute the classifier's metrics by comparing the predicted labels with the actual ones.

To build this application and submit it to the InsightEdge cluster:

sbt clean assembly
./bin/insightedge-submit --class io.insightedge.demo.ctr.CtrDemo1 --master spark://10.8.1.115:7077 --executor-memory 16G  ~/avazu_ctr/insightedge-ctr-demo-assembly-1.0.0.jar spark://10.8.1.115:7077 10.8.1.115:4174 day_21
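If you are assembling the project yourself, a minimal build.sbt could look roughly like the sketch below. The Scala/Spark versions and the InsightEdge dependency are assumptions that depend on your distribution; only the "provided" Spark dependencies and the sbt-assembly fat jar are essential to the submit command above:

// build.sbt (sketch; adjust versions and coordinates to your InsightEdge distribution)
name := "insightedge-ctr-demo"
version := "1.0.0"
scalaVersion := "2.10.6" // assumption; match the Scala version of your Spark/InsightEdge build

libraryDependencies ++= Seq(
  // Spark is provided by the InsightEdge runtime at submit time
  "org.apache.spark" %% "spark-core"  % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-sql"   % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.6.1" % "provided"
  // plus the InsightEdge core dependency shipped with your distribution (placeholder)
)

// sbt-assembly (configured in project/plugins.sbt) builds the fat jar passed to insightedge-submit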

It takes about 2 minutes for the application to complete and output the following:

Area under ROC = 0.5177127622153417

We get an AUROC slightly better than a random guess (AUROC = 0.5), which is not bad for a first approach, but we can definitely do better.

Experimenting with More Features

Let’s try to select more features and see how it affects our metrics.

For this, we created a new version of our app, CtrDemo2, in which we can easily select the features we want to include. We use VectorAssembler to assemble multiple feature vectors into a single features column:

val assembledDf = new VectorAssembler()
  .setInputCols(categoricalColumnsVectors.toArray)
  .setOutputCol("features")
  .transform(encodedDf)
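The categoricalColumnsVectors value is not shown in this snippet. One way it could be produced (a sketch; the actual CtrDemo2 code may differ, and the column selection below is hypothetical) is to run the same StringIndexer/OneHotEncoder pair over every selected column:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

// Hypothetical selection of categorical columns to include
val categoricalColumns = Seq("device_conn_type", "device_type", "time_day", "time_hour")
val categoricalColumnsVectors = categoricalColumns.map(_ + "_vector")

// Index and one-hot-encode each selected column, threading the DataFrame through
val encodedDf = categoricalColumns.foldLeft(trainDf) { (df, col) =>
  val indexed = new StringIndexer()
    .setInputCol(col)
    .setOutputCol(col + "_index")
    .fit(df)
    .transform(df)

  new OneHotEncoder()
    .setDropLast(false)
    .setInputCol(col + "_index")
    .setOutputCol(col + "_vector")
    .transform(indexed)
}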

The results are the following:

  • with device_type additionally included: AUROC = 0.531015564807053
  • adding time_day and time_hour: AUROC = 0.5555488992624483
  • adding C15, C16, C17, C18, C19, C20, C21: AUROC = 0.7000630113145946

Notice how the AUROC improves as we add more and more features. This comes at the cost of longer training time:

(Figure: application training time)

We didn't include high-cardinality features such as device_ip and device_id, as they would blow up the feature vector size. One might consider applying techniques such as feature hashing to reduce the dimensionality; a detailed treatment is out of this blog post's scope.
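Although it is out of scope here, the idea behind feature hashing can be sketched in a few lines: hash every "column=value" pair into one of a fixed number of buckets and build a sparse vector, so that even columns like device_ip fit into a bounded dimension. The helper below is a rough illustration, not part of the demo code; hashFeatures and numBuckets are made-up names:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Hash each "column=value" string into one of `numBuckets` slots of a sparse binary vector
def hashFeatures(row: Map[String, String], numBuckets: Int): Vector = {
  val indices = row.map { case (col, value) =>
    math.abs((col + "=" + value).hashCode) % numBuckets
  }.toSeq.distinct.sorted

  Vectors.sparse(numBuckets, indices.map(i => (i, 1.0)))
}

// e.g. hashFeatures(Map("device_ip" -> "ddd2926e", "device_id" -> "a99f214a"), 1 << 18)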

Tuning Algorithm Parameters

Tuning algorithm parameters is a search problem. We will use the Spark Pipeline API with a grid search technique.

Grid search evaluates a model for each combination of algorithm parameters specified in a grid (not to be confused with the data grid).

The Pipeline API supports model selection using cross-validation. For each set of parameters, it trains the given Estimator and evaluates it using the given Evaluator.

We will use BinaryClassificationEvaluator, which uses AUROC as its metric by default.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val lr = new LogisticRegression().setLabelCol("click")
val pipeline = new Pipeline().setStages(Array(assembler, lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01 , 0.1 /*, 1.0 */))
  .addGrid(lr.elasticNetParam, Array(0.0 /*, 0.5 , 1.0 */))
  .addGrid(lr.fitIntercept, Array(false /*, true */))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator().setLabelCol("click"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(encodedTrainDf)

We included two regularization parameters, 0.01 and 0.1, in our search grid; the other parameter values are commented out for now.

Output the best set of parameters:

println("Grid search results:")
cvModel.getEstimatorParamMaps.zip(cvModel.avgMetrics).foreach(println)

println("Best set of parameters found:" + cvModel.getEstimatorParamMaps
  .zip(cvModel.avgMetrics)
  .maxBy(_._2)
  ._1)

Use the best model to predict on the test dataset loaded from the data grid:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
import sqlContext.implicits._ // needed for toDF
val predictionDf = cvModel.transform(encodedTestDf).select("id", "probability").map {
  case Row(id: String, probability: Vector) => (id, probability(1))
}.toDF("id", "click")

Then the results are saved back to a CSV file on HDFS so we can submit them to Kaggle; see the complete listing in CtrDemo3.
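The save step itself might look something like the sketch below; it assumes the com.databricks:spark-csv package is on the classpath (the usual choice with Spark 1.x) and uses a placeholder HDFS path, so the exact code in CtrDemo3 may differ:

// Sketch: write the (id, click) predictions as CSV on HDFS for a Kaggle submission
predictionDf
  .coalesce(1) // a single part file is easier to download and upload to Kaggle
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("hdfs:///ctr/submission") // placeholder output path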

It takes about 27 minutes to train and compare models for the two regularization parameters, 0.01 and 0.1. The results are:

Grid search results:
({
    logreg_2b70e3edf1f0-elasticNetParam: 0.0,
    logreg_2b70e3edf1f0-fitIntercept: false,
    logreg_2b70e3edf1f0-regParam: 0.01
},0.7000655195682837)
({
    logreg_2b70e3edf1f0-elasticNetParam: 0.0,
    logreg_2b70e3edf1f0-fitIntercept: false,
    logreg_2b70e3edf1f0-regParam: 0.1
},0.697680175815199)

Best params found:{
    logreg_2b70e3edf1f0-elasticNetParam: 0.0,
    logreg_2b70e3edf1f0-fitIntercept: false,
    logreg_2b70e3edf1f0-regParam: 0.01
}

This simple logistic regression model would rank 1109th out of 1603 competitors in the Kaggle competition.

Future improvements are limited only by data science skills and creativity. One might consider the following:

  • implementing a Logarithmic Loss function as an Evaluator, since that is the metric Kaggle uses to score the model, whereas our example used AUROC (see the sketch after this list)
  • including other features that we didn't select
  • generating additional features, such as the click history of a user
  • using the hashing trick to reduce the feature vector dimension
  • trying other machine learning algorithms; the winner of the competition used Field-aware Factorization Machines
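To illustrate the first item, the logarithmic loss itself is straightforward to compute from the predicted click probabilities and the actual labels. The sketch below is a plain helper function (logLoss is a hypothetical name); wrapping it into a custom Spark Evaluator for use with CrossValidator is left out:

import org.apache.spark.rdd.RDD

// Logarithmic loss over (predicted probability of click, actual label 0/1) pairs:
// logLoss = -1/N * sum( y*ln(p) + (1-y)*ln(1-p) )
def logLoss(probabilityAndLabel: RDD[(Double, Double)]): Double = {
  val eps = 1e-15 // clamp probabilities to avoid ln(0)
  val n = probabilityAndLabel.count()
  val sum = probabilityAndLabel.map { case (p, y) =>
    val clamped = math.max(eps, math.min(1 - eps, p))
    y * math.log(clamped) + (1 - y) * math.log(1 - clamped)
  }.sum()
  -sum / n
}

// e.g. logLoss(predictionAndLabels) for the (probability, label) pairs computed earlier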

Architecture

The following diagram demonstrates the design of a machine learning application with InsightEdge.

(Figure: InsightEdge machine learning architecture)

The key design advantages are:

  • a single platform converges analytical processing (machine learning) powered by Spark with transactional processing powered by custom real-time applications;
  • real-time applications can execute any OLTP query (read, insert, update, delete) on training data that is immediately available to Spark analytical queries and machine learning routines. There is no need to build a complex ETL pipeline that extracts training data from an OLTP database with Kafka/Flume/HDFS. Besides the complexity, an ETL pipeline introduces unwanted latency that can be a show-stopper for reactive machine learning apps. With InsightEdge, Spark applications can see the live data;
  • the training data lives in the memory of the data grid, which acts as an extension of Spark memory, so we can load the data faster;
  • the in-memory data grid is general-purpose, highly available, and fault-tolerant storage. With support for ACID transactions and SQL queries, it can serve as the primary storage for the application;
  • the InsightEdge stack is scalable in both the computation (Spark) and storage (data grid) tiers, which makes it attractive for large-scale machine learning.

Summary

In this blog post, we demonstrated how to use machine learning algorithms with InsightEdge. We went through typical stages:

  • interactive data exploration with Zeppelin
  • feature selection and transformation
  • training predictive models
  • calculating model metrics
  • tuning parameters

Our goal was not to build a perfect predictive model, so there is plenty of room for improvement.

In the architecture section, we discussed what a typical design might look like and what the benefits of using InsightEdge for machine learning are.

The Zeppelin notebook can be found here and submittable Spark apps here.


Topics:
search engine ,machine learning ,spark ,big data ,regression

Published at DZone with permission of Oleksiy Dyagilev, DZone MVB. See the original article here.
