Building Sales Recommendation Engine With Apache Spark


Recommendations are considered to be collaborative filtering problems, and Apache Spark has a built-in algorithm to implement just that.


Getting started with machine learning can be as simple as Hello World if you begin with a straightforward use case like a recommendation engine.

The most popular choice for machine learning in Java is Apache Spark, which ships with a dedicated ML library (MLlib) containing algorithms from simple to advanced.

Recommendations are considered a collaborative filtering problem, and Apache Spark has a built-in algorithm to implement them.

What Is Collaborative Filtering?

As per the Apache Spark website:

These techniques aim to fill in the missing entries of a user-item association matrix. spark.mllib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.mllib uses the alternating least squares (ALS) algorithm to learn these latent factors. The implementation in spark.mllib has the following parameters:

  • numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).

  • rank is the number of features to use (also referred to as the number of latent factors).

  • iterations is the number of iterations of ALS to run. ALS typically converges to a reasonable solution in 20 iterations or less.

  • lambda specifies the regularization parameter in ALS.

  • implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.

  • alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.
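In other words, the model learns a rank-length vector of latent factors for each user and each product, and a predicted rating is simply the dot product of the two vectors. A minimal plain-Java sketch of that idea (the factor values below are invented for illustration; real factors are produced by ALS training):

```java
public class LatentFactorDemo {
    // Predicted rating = dot product of the user and product latent-factor vectors
    static double predict(double[] userFactors, double[] productFactors) {
        double rating = 0.0;
        for (int i = 0; i < userFactors.length; i++) {
            rating += userFactors[i] * productFactors[i];
        }
        return rating;
    }

    public static void main(String[] args) {
        // Hypothetical rank-3 factor vectors for one user and one product
        double[] user = {1.2, 0.5, 3.0};
        double[] product = {2.0, 1.0, 0.5};
        // 1.2*2.0 + 0.5*1.0 + 3.0*0.5 ≈ 4.4
        System.out.println(predict(user, product));
    }
}
```

ALS alternately fixes the user vectors and solves for the product vectors (and vice versa) until these dot products best reproduce the known ratings.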

Getting Started

Apache Spark MLlib is available as a Maven dependency from the central repository. Add the spark-mllib module to your project to get started.
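For reference, a typical dependency block might look like the following (the version and Scala suffix are illustrative; pick the ones matching your environment):

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
```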


Preparing the Datasets

Now, before getting your hands dirty with some code, you need to build valid datasets.

In our case, we are building a sample sales lead prediction model based on past sales orders.

Here are a few sample records from both datasets.

Sales orders:

UserId | UserName | ProductId | ProductName | Rate | Quantity | Amount
1      | User 1   | 1         | Product 1   | 10   | 5        | 50
1      | User 1   | 2         | Product 2   | 20   | 10       | 200
1      | User 1   | 3         | Product 3   | 10   | 15       | 150
2      | User 2   | 1         | Product 1   | 10   | 5        | 50
2      | User 2   | 2         | Product 2   | 20   | 20       | 400
2      | User 2   | 4         | Product 4   | 10   | 15       | 150

Sales leads:

UserId | UserName | ProductId | ProductName
1      | User 1   | 4         | Product 4
1      | User 1   | 5         | Product 5
2      | User 2   | 3         | Product 3
2      | User 2   | 6         | Product 6

We need to predict/recommend the most relevant product for each user based on their past order history. Here, we can see that both User 1 and User 2 ordered Product 1 and Product 2, and each also ordered one product that the other did not.

Now, we predict each user's rating for the other's product and for one brand-new product.
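Before training, each order line has to be reduced to a (user, product, rating) triple. A plain-Java sketch of that mapping, assuming (hypothetically) a zero-based column layout matching the table above, with the Amount column serving as the implicit rating (the actual index constants are defined in the full source):

```java
public class OrderLineDemo {
    // Hypothetical column indexes for the sales_orders.csv layout shown above
    static final int userIdIndex = 0;    // UserId
    static final int productIdIndex = 2; // ProductId
    static final int ratingIndex = 6;    // Amount, used here as an implicit rating

    // Reduce one CSV order line to a {user, product, rating} triple
    static double[] parse(String order) {
        String[] data = order.split(",");
        return new double[] {
            Integer.parseInt(data[userIdIndex]),
            Integer.parseInt(data[productIdIndex]),
            Double.parseDouble(data[ratingIndex])
        };
    }

    public static void main(String[] args) {
        double[] triple = parse("1,User 1,2,Product 2,20,10,200");
        // → user 1, product 2, rating 200.0
        System.out.println((int) triple[0] + "," + (int) triple[1] + "," + triple[2]);
    }
}
```

This is exactly the shape that Spark's Rating class expects in Step 1.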


Step 1

The first step is to load the training data and convert it to the Rating format using the JavaRDD API.

JavaRDD<String> salesOrdersFile = sc.textFile("target/classes/data/sales_orders.csv");

// Map each CSV line to a Rating(user, item, rating) tuple
JavaRDD<Rating> ratings = salesOrdersFile.map(new Function<String, Rating>() {
  public Rating call(String order) {
    String[] data = order.split(",");
    return new Rating(Integer.parseInt(data[userIdIndex]),
        Integer.parseInt(data[productIdIndex]),
        Double.parseDouble(data[ratingIndex]));
  }
});

Step 2

The next step is to train the matrix factorization model using the ALS algorithm.

int rank = 10;          // number of latent factors (example value)
int numIterations = 10; // example value; ALS typically converges in 20 iterations or less
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations);

Step 3

Now, we load the sales leads file and convert it to tuple format.

// File format: user, product
JavaRDD<String> salesLeadsFile = sc.textFile("target/classes/data/sales_leads.csv");

// Create user-product tuples from the leads
JavaRDD<Tuple2<Object, Object>> userProducts = salesLeadsFile.map(new Function<String, Tuple2<Object, Object>>() {
  public Tuple2<Object, Object> call(String lead) {
    String[] data = lead.split(",");
    return new Tuple2<Object, Object>(Integer.parseInt(data[userIdIndex]),
        Integer.parseInt(data[productIdIndex]));
  }
});

Step 4

Finally, we can predict the future rating using a simple API.

// Predict the ratings of the products not yet rated by the users
JavaRDD<Rating> recommendations = model.predict(userProducts.rdd()).toJavaRDD().distinct();

Step 5

Optionally, you can sort the output by rating in descending order:

// Sort the recommendations by rating in descending order
recommendations = recommendations.sortBy(new Function<Rating, Double>() {
  public Double call(Rating v1) throws Exception {
    return v1.rating();
  }
}, false, 1);

Step 6

Now, you can display your result using the basic JavaRDD API.

// Print the recommendations
recommendations.foreach(new VoidFunction<Rating>() {
  public void call(Rating rating) throws Exception {
    System.out.println("User : " + rating.user() +
        " Product : " + rating.product() +
        " Rating : " + rating.rating());
  }
});

User : 2 Product : 3 Rating : 54.54927015541634
User : 1 Product : 4 Rating : 49.93948224984236


Full source code: vinodmrhcl/ml-starter (Machine Learning Starter).

The output above recommends that User 2 would like to buy Product 3 and that User 1 would go for Product 4.

Note also that no recommendations are produced for the brand-new products (Product 5 and Product 6), as they do not appear in any past orders and therefore match no similarity criteria.


Opinions expressed by DZone contributors are their own.
