
Machine Learning with InsightEdge: Part I


Here is an introduction to using machine learning algorithms with InsightEdge, along with an exercise in predicting mobile advertisement click-through rate using Avazu’s dataset.


Machine learning has become quite the popular buzzword these days. Within the field of data analytics, machine learning is a method used to devise complex models and algorithms that lend themselves to prediction – in commercial use, this is known as predictive analytics.

In this blog post, I will provide an introduction to using machine learning algorithms with InsightEdge. We will also go through an exercise to predict mobile advertisement click-through rate with Avazu’s dataset to better understand machine learning algorithms.

Scalable Machine Learning with InsightEdge: Mobile Ad Click Prediction

There are several compensation models in the online advertising industry; probably the most notable is CPC (Cost Per Click), in which an advertiser pays a publisher each time the ad is clicked.

Search engine advertising is one of the most popular forms of CPC. It allows advertisers to bid for ad placement in a search engine’s sponsored links when someone searches on a keyword that is related to their business offering.

For search engines like Google, advertising is one of the main sources of revenue. The challenge for the advertising system is to determine what ad should be displayed for each query that the search engine receives.

The expected revenue from displaying an ad can be quantified as:

revenue = bid * probability_of_click

The goal is to maximize the revenue for every search engine query. While the bid is a known value, the probability_of_click is not. Thus, predicting the probability of click becomes the key task.
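As a toy illustration (a sketch, not from the original article; the numbers are made up), here is how an ad server might rank candidate ads by expected revenue once a model has predicted the click probabilities:

// hypothetical candidate ads with known bids and model-predicted click probabilities
case class Ad(id: String, bid: Double, pClick: Double)

val candidates = Seq(Ad("a", 0.50, 0.02), Ad("b", 0.30, 0.05), Ad("c", 1.20, 0.01))

// choose the ad that maximizes expected revenue = bid * probability_of_click
val best = candidates.maxBy(ad => ad.bid * ad.pClick)
// best is Ad("b", 0.30, 0.05): expected revenue 0.015 per impression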

Working on a machine learning problem involves a lot of experimentation with feature selection, feature transformation, training different models, and tuning parameters.
While there are a few excellent machine learning libraries for Python and R, such as scikit-learn, their capabilities are typically limited to relatively small datasets that fit on a single machine.

With large datasets and/or CPU-intensive workloads, you may want to scale out beyond a single machine. This is one of the key benefits of InsightEdge, since it can scale the computation and data storage layers across many machines in one unified cluster.

Loading the Avazu Dataset

The dataset consists of:

  • train (5.9G) – Training set. 10 days of click-through data, ordered chronologically. Non-clicks and clicks are subsampled according to different strategies.
  • test (674M) – Test set. 1 day of ads to test model predictions.

First, we need to launch InsightEdge.

To explore the data quickly, one can launch InsightEdge on a single node, though for big datasets or compute-intensive tasks the resources of a single node might not be enough.

For this problem, we will set up a cluster with four workers and place the downloaded files on HDFS.

[Figure: InsightEdge cluster with four workers]

Let’s open the interactive Web Notebook and start exploring our dataset.

The dataset is in CSV format, so we will use the Databricks spark-csv library to load it from HDFS into a Spark DataFrame:

%dep
z.load("com.databricks:spark-csv_2.10:1.3.0")

Load the DataFrame and cache it in Spark memory:

// read the CSV files from HDFS; with inferSchema disabled, all columns are strings
val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "false")
      .load("hdfs://10.8.1.116/data/avazu_ctr/train")

// caching is lazy: the data is materialized in memory on the first action
df.cache()

Exploring the Data

Now that we have the dataset in Spark memory, we can read the first rows with df.show(10):

+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+
|                  id|click|    hour|  C1|banner_pos| site_id|site_domain|site_category|  app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|  C14|C15|C16| C17|C18|C19|   C20|C21|
+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+
| 1000009418151094273|    0|14102100|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| ddd2926e|    44956a24|          1|               2|15706|320| 50|1722|  0| 35|    -1| 79|
|10000169349117863715|    0|14102100|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| 96809ac8|    711ee120|          1|               0|15704|320| 50|1722|  0| 35|100084| 79|
|10000371904215119486|    0|14102100|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| b3cf8def|    8a4875bd|          1|               0|15704|320| 50|1722|  0| 35|100084| 79|
|10000640724480838376|    0|14102100|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| e8275b8f|    6332421a|          1|               0|15706|320| 50|1722|  0| 35|100084| 79|
|10000679056417042096|    0|14102100|1005|         1|fe8cc448|   9166c161|     0569f928|ecad2386|  7801e8d9|    07d7df22| a99f214a| 9644d0bf|    779d90c2|          1|               0|18993|320| 50|2161|  0| 35|    -1|157|
|10000720757801103869|    0|14102100|1005|         0|d6137915|   bb1ef334|     f028772b|ecad2386|  7801e8d9|    07d7df22| a99f214a| 05241af0|    8a4875bd|          1|               0|16920|320| 50|1899|  0|431|100077|117|
|10000724729988544911|    0|14102100|1005|         0|8fda644b|   25d4cfcd|     f028772b|ecad2386|  7801e8d9|    07d7df22| a99f214a| b264c159|    be6db1d7|          1|               0|20362|320| 50|2333|  0| 39|    -1|157|
|10000918755742328737|    0|14102100|1005|         1|e151e245|   7e091613|     f028772b|ecad2386|  7801e8d9|    07d7df22| a99f214a| e6f67278|    be74e6fe|          1|               0|20632|320| 50|2374|  3| 39|    -1| 23|
|10000949271186029916|    1|14102100|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| 37e8da74|    5db079b5|          1|               2|15707|320| 50|1722|  0| 35|    -1| 79|
|10001264480619467364|    0|14102100|1002|         0|84c7ba46|   c4e18dd6|     50e219e0|ecad2386|  7801e8d9|    07d7df22| c357dbff| f1ac7184|    373ecbe6|          0|               0|21689|320| 50|2496|  3|167|100191| 23|
+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+
only showing top 10 rows

The data fields are:

  • id: ad identifier
  • click: 0/1 for non-click/click
  • hour: format is YYMMDDHH
  • C1: anonymized categorical variable
  • banner_pos
  • site_id
  • site_domain
  • site_category
  • app_id
  • app_domain
  • app_category
  • device_id
  • device_ip
  • device_model
  • device_type
  • device_conn_type
  • C14-C21 — anonymized categorical variables

Let’s see how many rows are in the training dataset:

val totalCount = df.count()

totalCount: Long = 40428967

There are over 40 million rows in the dataset.

Let’s now calculate the CTR (click-through rate) of the dataset. The click-through rate is the number of times a click is made on the advertisement divided by the total impressions (the number of times an advertisement was served):

val clicks = df.filter("click = 1").count()
val ctr = clicks.toFloat / totalCount

clicks: Long = 6865066
ctr: Float = 0.16980562

The CTR is 0.169 (16.9%), which is quite high; a common value in the industry is about 0.2-0.3%. The high value is likely because non-clicks and clicks were subsampled according to different strategies, as stated by Avazu.

Now, the question is which features should we use to create a predictive model? This is a difficult question that requires a deep knowledge of the problem domain. Let’s try to learn it from the dataset we have.

For example, let’s explore the device_conn_type feature. Our assumption might be that this is a categorical variable like Wi-Fi, 2G, 3G or LTE. This might be a relevant feature since clicking on an ad with a slow connection is not something common.

First, we register the DataFrame as a SQL table:

df.registerTempTable("training")

and run the SQL query:

%sql
SELECT device_conn_type, SUM(click) as clicks_num, COUNT(click) as impression, SUM(click)/COUNT(click) as ctr
FROM training
GROUP BY device_conn_type

[Charts: clicks, impressions, and CTR broken down by device_conn_type]

We see that there are four connection type categories. Two of them have CTRs of 18% and 13%, and the first accounts for almost 90% of the whole dataset. The other two categories have significantly lower CTRs.

Another observation is that features C15 and C16 look like the ad size:

%sql
SELECT C15, C16, COUNT(click) as impression, SUM(click)/COUNT(click) as ctr
FROM training
GROUP BY C15, C16
ORDER BY ctr DESC

[Table: impressions and CTR by ad size (C15 x C16)]

We can see some correlation between ad size and performance. The most common size appears to be 320x50 px, known as a “mobile leaderboard” in Google AdSense.

What about the other features? All of them represent categorical values; how many unique categories does each feature have?

// count the distinct values of every column (this runs a Spark job per column)
df.columns.map(c => (c, df.select(c).distinct().count()))

res14: Array[(String, Long)] = Array((id,40428967), (click,2), (hour,240), (C1,7), (banner_pos,7), (site_id,4737), (site_domain,7745), (site_category,26), (app_id,8552), (app_domain,559), (app_category,36), (device_id,2686408), (device_ip,6729486), (device_model,8251), (device_type,5), (device_conn_type,4), (C14,2626), (C15,8), (C16,9), (C17,435), (C18,4), (C19,68), (C20,172), (C21,60))

We see that some features have a lot of unique values; for example, device_ip has 6M+ distinct values.

Machine learning algorithms are typically defined in terms of numerical vectors rather than categorical values. Naively converting such high-cardinality categorical features (for example, with one-hot encoding) would produce extremely high-dimensional vectors, which can be very expensive.

We will need to deal with this later.
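One common way to handle this, shown here only as a sketch (the 2^15 dimension and the name=value encoding are illustrative assumptions, not taken from the notebook), is the hashing trick, available in Spark MLlib as HashingTF:

import org.apache.spark.mllib.feature.HashingTF

// hash categorical values into a fixed-size sparse vector; 2^15 buckets is an
// illustrative choice that trades dimensionality against hash collisions
val hashingTF = new HashingTF(1 << 15)

// encode features as "name=value" strings so that equal values in different
// columns hash to different buckets
val features = Seq("device_ip=ddd2926e", "site_id=1fbe01fe", "app_id=ecad2386")
val vector = hashingTF.transform(features) // sparse vector with at most 3 non-zero entries

This keeps the vector dimension fixed no matter how many distinct values a feature has, at the cost of occasional collisions.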

Processing and Transforming the Data

Looking further at the dataset, we can see that the hour feature is in YYMMDDHH format. To let the predictive model learn from this feature effectively, it makes sense to split it into four features: year, month, day, and hour.

Let’s develop the function to transform the data-frame:

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

object DateUtils {
  // SimpleDateFormat is not thread-safe, so keep a separate instance per thread
  val dateFormat = new ThreadLocal[SimpleDateFormat]() {
    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyMMddHH")
  }

  // parse a yyMMddHH string and extract the requested Calendar field
  def parse(s: String, field: Int): Int = {
    val date = dateFormat.get().parse(s)
    val cal = Calendar.getInstance()
    cal.setTime(date)
    cal.get(field)
  }
}

def transformHour(df: DataFrame): DataFrame = {
  // note: Calendar.MONTH is zero-based, so October comes out as 9
  val toYear = udf[Int, String](s => DateUtils.parse(s, Calendar.YEAR))
  val toMonth = udf[Int, String](s => DateUtils.parse(s, Calendar.MONTH))
  val toDay = udf[Int, String](s => DateUtils.parse(s, Calendar.DAY_OF_MONTH))
  val toHour = udf[Int, String](s => DateUtils.parse(s, Calendar.HOUR_OF_DAY))

  df.withColumn("time_year", toYear(df("hour")))
    .withColumn("time_month", toMonth(df("hour")))
    .withColumn("time_day", toDay(df("hour")))
    .withColumn("time_hour", toHour(df("hour")))
    .drop("hour")
}

We can now apply this transformation to our DataFrame and see the result:

val hourDecoded = transformHour(df)
hourDecoded.cache()
hourDecoded.show(10)
+--------------------+-----+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+---------+----------+--------+---------+
|                  id|click|  C1|banner_pos| site_id|site_domain|site_category|  app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|  C14|C15|C16| C17|C18|C19|   C20|C21|time_year|time_month|time_day|time_hour|
+--------------------+-----+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+---------+----------+--------+---------+
| 1000009418151094273|    0|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| ddd2926e|    44956a24|          1|               2|15706|320| 50|1722|  0| 35|    -1| 79|     2014|         9|      21|        0|
|10000169349117863715|    0|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| 96809ac8|    711ee120|          1|               0|15704|320| 50|1722|  0| 35|100084| 79|     2014|         9|      21|        0|
|10000371904215119486|    0|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| b3cf8def|    8a4875bd|          1|               0|15704|320| 50|1722|  0| 35|100084| 79|     2014|         9|      21|        0|
|10000640724480838376|    0|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| e8275b8f|    6332421a|          1|               0|15706|320| 50|1722|  0| 35|100084| 79|     2014|         9|      21|        0|
|10000679056417042096|    0|1005|         1|fe8cc448|   9166c161|     0569f928|ecad2386|  7801e8d9|    07d7df22| a99f214a| 9644d0bf|    779d90c2|          1|               0|18993|320| 50|2161|  0| 35|    -1|157|     2014|         9|      21|        0|
|10000720757801103869|    0|1005|         0|d6137915|   bb1ef334|     f028772b|ecad2386|  7801e8d9|    07d7df22| a99f214a| 05241af0|    8a4875bd|          1|               0|16920|320| 50|1899|  0|431|100077|117|     2014|         9|      21|        0|
|10000724729988544911|    0|1005|         0|8fda644b|   25d4cfcd|     f028772b|ecad2386|  7801e8d9|    07d7df22| a99f214a| b264c159|    be6db1d7|          1|               0|20362|320| 50|2333|  0| 39|    -1|157|     2014|         9|      21|        0|
|10000918755742328737|    0|1005|         1|e151e245|   7e091613|     f028772b|ecad2386|  7801e8d9|    07d7df22| a99f214a| e6f67278|    be74e6fe|          1|               0|20632|320| 50|2374|  3| 39|    -1| 23|     2014|         9|      21|        0|
|10000949271186029916|    1|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| 37e8da74|    5db079b5|          1|               2|15707|320| 50|1722|  0| 35|    -1| 79|     2014|         9|      21|        0|
|10001264480619467364|    0|1002|         0|84c7ba46|   c4e18dd6|     50e219e0|ecad2386|  7801e8d9|    07d7df22| c357dbff| f1ac7184|    373ecbe6|          0|               0|21689|320| 50|2496|  3|167|100191| 23|     2014|         9|      21|        0|
+--------------------+-----+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+---------+----------+--------+---------+

It looks like time_year and time_month each have only one value; let’s verify:

hourDecoded.select("time_month").distinct.count()
hourDecoded.select("time_year").distinct.count()

res20: Long = 1
res21: Long = 1

We can safely drop these columns, as they don’t add any information to our model:

val hourDecoded2 = hourDecoded.drop("time_month").drop("time_year")

Let’s also convert click from String to Double type.

import org.apache.spark.sql.types.DoubleType

val prepared = hourDecoded2
    .withColumn("clickTmp", hourDecoded2("click").cast(DoubleType))
    .drop("click")
    .withColumnRenamed("clickTmp", "click")
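
As a quick sanity check (this step is not in the original notebook), printing the schema confirms the new column type:

prepared.printSchema()
// among other columns:
// |-- click: double (nullable = true)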

Saving Pre-processed Data to the Data Grid

The entire training dataset contains 40M+ rows, so it takes quite a long time to experiment with different algorithms and approaches, even in a clustered environment. We want to sample the dataset and checkpoint it to the in-memory data grid that runs collocated with Spark.

This way we can:

  • quickly iterate through different approaches
  • restart the Zeppelin session or launch other Spark applications and pick up the dataset more quickly from memory

Since the training dataset contains data for 10 days, we can pick any single day and sample it:

prepared.filter("time_day = 21").count()

res51: Long = 4122995

There are 4M+ rows for this day, which is about 10% of the entire dataset.

Now let’s save it to the data grid. This can be done with two lines of code:

import org.apache.spark.sql.insightedge._
prepared.filter("time_day = 21").write.grid.save("day_21")

Any time later, in another Spark context, we can bring the collection back into Spark memory with:

val df = sqlContext.read.grid.load("day_21")

We also want to transform the test dataset, which we will use for prediction, in the same way:

// load the raw test set from HDFS with the same options as the training set
val testDf = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "false")
      .load("hdfs://10.8.1.116/data/avazu_ctr/test")

// apply the same hour transformation and save to the grid under "test"
transformHour(testDf)
    .drop("time_month")
    .drop("time_year")
    .write.grid.save("test")

The complete notebook listing can be found on GitHub. You can import it into Zeppelin and play with it on your own.

Stay tuned for Part II.


Topics:
search engine, machine learning, regression, spark, big data

Published at DZone with permission of Oleksiy Dyagilev, DZone MVB. See the original article here.

