Outlier Detection From Large Scale Categorical Breast Cancer Datasets Using Spark 2.0.0: Part II

In Part II we learn how to develop a large-scale application with Spark Core.

If you missed Part I, you can read it here!

Developing Large-Scale Outlier Application with Spark Core

As mentioned in Part I, each record of the WDBC dataset contains an ID number, a diagnosis (M = malignant, B = benign), and ten real-valued features computed for each cell nucleus: radius, texture, perimeter, area, smoothness, compactness, concavity, concave points, symmetry, and fractal dimension [6]. For a detailed description of the dataset, please refer to the first part of this series here.

These features are computed from a digitized image of a fine needle aspirate (FNA) of a breast mass. They describe characteristics of the cell nuclei present in the corresponding images. That is enough background on the dataset for our purposes.

Now I will show how to develop a breast cancer diagnosis outlier detection pipeline step by step. Please note that I have used Spark 2.0.0 with Java for the demonstration. Some of the concepts are taken from [7] and re-implemented with Spark 2.0.0. The complete source code, including the Maven-friendly pom.xml file and the dataset, can be downloaded from my GitHub repository here.

Step-1: Import necessary packages/libraries/APIs

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import com.example.SparkSession.UtilityForSparkSession;
import scala.Tuple2;

Step-2: Create the Spark entry point by a Spark session

SparkSession spark = UtilityForSparkSession.mySession(); 

Here is the UtilityForSparkSession class that creates and returns a Spark session:

import org.apache.spark.sql.SparkSession;

public class UtilityForSparkSession {
    public static SparkSession mySession() {
        SparkSession spark = SparkSession.builder()
                .appName("OutlierDetection") // Application name is an assumption; choose your own
                .master("local[*]") // You might want to set your driver IP:PORT accordingly
                .config("spark.sql.warehouse.dir", "E:/Exp/") // Set your warehouse directory accordingly
                .getOrCreate(); // Build the session, or reuse an existing one
        return spark;
    }
}

Step-3: Create the first RDD by reading the input

Please note that spark.sparkContext() returns the Scala SparkContext, whose textFile() method yields an RDD of String rather than a JavaRDD. Don't worry, I will convert it to a JavaRDD with toJavaRDD() when required.

Here the number of partitions is 1. You should increase or decrease this number based on your dataset size and your memory/disk capacity.

final String inputPath = "breastcancer/input/wdbc.data"; //Input path
RDD<String> lines_of_patient_records = spark.sparkContext().textFile(inputPath, 1);  
lines_of_patient_records.cache(); // Cache it if you have enough memory, since we will reuse it in the next few steps

Step-4: Perform flatMapToPair() on lines_of_patient_records to create the patient pair RDD

        JavaPairRDD<String,Integer> patient = lines_of_patient_records.toJavaRDD().flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            public Iterator<Tuple2<String,Integer>> call(String row) { // For each line as variable 'row'
                List<Tuple2<String,Integer>> results = new ArrayList<Tuple2<String,Integer>>();
                // A record has the following format: <Patient-id><,><feature_point1><,><feature_point2><,><feature_point3><,>...
                String[] tokens = row.split(","); // i.e. features
                for (int i = 1; i < tokens.length; i++) { // Except the first entry i.e. Patient-ID
                    results.add(new Tuple2<String,Integer>(tokens[i], 1));
                }
                return results.iterator();
            }
        });
        // For one record (i.e. one line), this emits one (feature_point, 1) pair per feature.

Step-5: Count the frequency of the categorical data

Now we need to count the frequency of every feature value, including the label. Because the records contain both numeric and character data, we keep all categorical values as String [7].

        JavaPairRDD<String, Integer> count_frequency_by_reducebykey = patient.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2; // Sum the partial counts for the same feature value
            }
        });
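
To see what Steps 4 and 5 compute without starting Spark, the following is a minimal plain-Java sketch that counts how often each feature value occurs in a handful of records. The class FrequencySketch, the method countFeatureValues(), and the three sample records are hypothetical and only serve as illustration; they are not part of the original source code or the WDBC file.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FrequencySketch {

    // Counts every token except the first one (the patient ID). This mirrors
    // emitting (feature, 1) pairs and then summing them per key.
    static Map<String, Integer> countFeatureValues(List<String> records) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String row : records) {
            String[] tokens = row.split(",");
            for (int i = 1; i < tokens.length; i++) {
                counts.merge(tokens[i], 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical records in the <Patient-ID>,<feature>,... layout.
        List<String> records = Arrays.asList(
                "p1,M,a,b",
                "p2,B,a,c",
                "p3,B,a,b");
        System.out.println(countFeatureValues(records));
    }
}
```

A TreeMap is used here only so that the printed counts appear in a stable key order; the Spark version distributes the same summation across partitions.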

Now you might be interested in saving the RDD to HDFS or your local file system, for example with the saveAsTextFile() method of the RDD.

If you read the saved text file, each line should hold one (feature, frequency) pair.

Step-6: Prepare a buffer Map for storing the Attribute Value Frequency (AVF) scores

Declare the buffer through the Map interface with String keys and Integer values rather than as a concrete HashMap, since collectAsMap() returns a Map. Because reduceByKey() has already merged duplicate keys, each entry holds exactly one (key, value) pair => (feature, frequency).

final Map<String, Integer> AVPMap = count_frequency_by_reducebykey.collectAsMap(); 

Step-7: Compute the AVF scores using the buffer map (i.e. AVPMap)

Here, according to [7], each input record has the form <Patient-ID><,><feature_point1><,><feature_point2><,><feature_point3><,>.... The PairFunction<T, K, V> maps T => Tuple2<K, V>, where T = input record (i.e. line), K = Patient-ID, and V = AVF score.

JavaPairRDD<String,Double> AVF = lines_of_patient_records.toJavaRDD().mapToPair(new PairFunction<String, String, Double>() {
            public Tuple2<String,Double> call(String rec) {
                String[] tokens = rec.split(",");
                String recordID = tokens[0];  // Patient ID who is getting diagnosed
                int sum = 0;
                for (int i = 1; i < tokens.length; i++) {
                    sum += AVPMap.get(tokens[i]); // Frequency of this feature value
                }
                double N = (double) (tokens.length - 1); // We are excluding the first entry i.e. patientID
                double AVFScore = ((double) sum) / N;
                return new Tuple2<String,Double>(recordID, AVFScore);
            }
        });
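
As a worked example of the AVF computation above, here is a small plain-Java sketch, assuming the frequencies have already been collected into a map. The class AvfScoreSketch, the method avfScore(), and the sample frequencies are hypothetical. Unlike the Spark code, the sketch treats an unseen feature value as frequency 0 instead of failing on a null lookup, and it assumes every record has at least one feature.

```java
import java.util.HashMap;
import java.util.Map;

final class AvfScoreSketch {

    // AVF score = (sum of the frequencies of the record's feature values)
    //             / (number of feature values in the record).
    static double avfScore(String record, Map<String, Integer> frequencies) {
        String[] tokens = record.split(",");
        int sum = 0;
        for (int i = 1; i < tokens.length; i++) { // skip tokens[0], the patient ID
            // Assumption: a value missing from the map counts as frequency 0.
            sum += frequencies.getOrDefault(tokens[i], 0);
        }
        return (double) sum / (tokens.length - 1);
    }

    public static void main(String[] args) {
        // Hypothetical frequencies, as a counting step might produce them.
        Map<String, Integer> frequencies = new HashMap<>();
        frequencies.put("a", 3);
        frequencies.put("b", 2);
        frequencies.put("c", 1);

        System.out.println(avfScore("p1,a,b", frequencies)); // (3 + 2) / 2
        System.out.println(avfScore("p2,a,c", frequencies)); // (3 + 1) / 2
    }
}
```

For the record p1,a,b the score is (3 + 2) / 2 = 2.5, and for p2,a,c it is (3 + 1) / 2 = 2.0, so the record carrying the rarer value c receives the lower score.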

Step-8: Compute the top K AVF scores in descending order

The built-in takeOrdered() method of the Java RDD API takes two parameters, K and a comparator instance, and returns the first K elements of the RDD as ordered by that comparator, in sorted order [7]. With the descending comparator below, those are the K highest AVF scores.
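
The effect of the comparator can be illustrated with a local stand-in for takeOrdered() in plain Java. This is only a sketch: the class TakeOrderedSketch and its takeOrdered() helper are hypothetical, Map.Entry replaces Tuple2 so that no Spark or Scala dependency is needed, and the sample scores are made up. The helper simply sorts a copy of the list and keeps the first K elements, which is not necessarily how Spark evaluates the call on a distributed RDD.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

final class TakeOrderedSketch {

    // Local stand-in: order the items with the comparator, keep the first k.
    static <T> List<T> takeOrdered(List<T> items, int k, Comparator<? super T> comparator) {
        List<T> sorted = new ArrayList<>(items);
        sorted.sort(comparator);
        return new ArrayList<>(sorted.subList(0, Math.min(k, sorted.size())));
    }

    public static void main(String[] args) {
        // Hypothetical (PatientID, AVF score) pairs.
        List<Map.Entry<String, Double>> scores = new ArrayList<>();
        scores.add(new SimpleEntry<>("p1", 27.0));
        scores.add(new SimpleEntry<>("p2", 13.5));
        scores.add(new SimpleEntry<>("p3", 28.0));

        Comparator<Map.Entry<String, Double>> ascending = Map.Entry.comparingByValue();
        Comparator<Map.Entry<String, Double>> descending = ascending.reversed();

        // Ascending comparator: the two lowest scores, lowest first.
        System.out.println(takeOrdered(scores, 2, ascending));
        // Descending comparator: the two highest scores, highest first.
        System.out.println(takeOrdered(scores, 2, descending));
    }
}
```

The same K therefore selects opposite ends of the score distribution depending on which comparator is passed in.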

class TupleComparatorDescending implements Comparator<Tuple2<String,Double>>, Serializable {
    final static TupleComparatorDescending INSTANCE = new TupleComparatorDescending();
    public int compare(Tuple2<String,Double> t1, Tuple2<String,Double> t2) {
        return -(t1._2.compareTo(t2._2)); // sort based on AVF Score in descending order
    }
}

The TupleComparatorAscending class is described in the Auxiliary Classes section after Step 10. Used with takeOrdered(), the above comparator should print the following values for K = 30:

            Output: => (PatientID, AVF score)
            Descending AVF Score

If you analyse the above output, you will see that the AVF scores fall into four clusters (i.e. 28, 27, 14 and 13). Patient-ID 91544002 clearly falls into cluster 14, which is a minority cluster, so it can be regarded as an outlier. Based on the data, this suggests that the patient may have some chance of breast cancer.
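
The reasoning above can be sketched in plain Java as follows. This is an illustration under stated assumptions rather than part of the original pipeline: scores are grouped by their integer part, a cluster counts as a minority when it holds at most maxSize records, and the class MinorityClusterSketch, its methods, and the sample scores are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MinorityClusterSketch {

    // Groups patient IDs by the integer part of their AVF score (an assumption).
    static Map<Integer, List<String>> groupByIntegerScore(Map<String, Double> avfScores) {
        Map<Integer, List<String>> clusters = new TreeMap<>();
        for (Map.Entry<String, Double> e : avfScores.entrySet()) {
            int bucket = (int) Math.floor(e.getValue());
            clusters.computeIfAbsent(bucket, b -> new ArrayList<>()).add(e.getKey());
        }
        return clusters;
    }

    // Returns the IDs that sit in clusters with at most maxSize members.
    static List<String> minorityIds(Map<Integer, List<String>> clusters, int maxSize) {
        List<String> ids = new ArrayList<>();
        for (List<String> members : clusters.values()) {
            if (members.size() <= maxSize) {
                ids.addAll(members);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Hypothetical AVF scores; not taken from the WDBC results.
        Map<String, Double> avfScores = new LinkedHashMap<>();
        avfScores.put("p1", 28.1);
        avfScores.put("p2", 28.4);
        avfScores.put("p3", 27.2);
        avfScores.put("p4", 27.9);
        avfScores.put("p5", 14.3);

        Map<Integer, List<String>> clusters = groupByIntegerScore(avfScores);
        System.out.println("Clusters: " + clusters);
        System.out.println("Candidate outliers: " + minorityIds(clusters, 1));
    }
}
```

With these sample values only p5 sits alone in its cluster, so it is the single candidate outlier; both the bucketing rule and the size threshold would need to be chosen for the real data.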

Step-9: Compute the lowest K AVF scores in ascending order

The code segment below prints the lowest K AVF scores in ascending order for K = 30.

List<Tuple2<String,Double>> outliers = AVF.takeOrdered(K, TupleComparatorAscending.INSTANCE); // K = 30 in this example
        System.out.println("Ascending AVF Score");
        System.out.println("Outliers: ");
        for (Iterator<Tuple2<String, Double>> it = outliers.iterator(); it.hasNext();) {
            System.out.println(it.next()); // Prints each tuple as (PatientID, AVF score)
        }

The above code segment should print the AVF scores as follows:

Output: => (PatientID, AVF score)
Ascending AVF Score

Step-10: Stop the Spark session

Stop the Spark session when all the computations have been completed by calling the stop() method on the session created in Step 2, i.e. spark.stop().


Auxiliary Classes

As mentioned earlier, the two classes TupleComparatorAscending and TupleComparatorDescending, re-used from [7], are shown below. I would like to thank Mr. Mahmoud Parsian for making the source code available from his excellent book "Data Algorithms: Recipes for Scaling Up with Hadoop and Spark" (O'Reilly Media, 2015).

The TupleComparatorDescending Class

This class sorts based on the AVF Score in a descending order. 

class TupleComparatorDescending implements Comparator<Tuple2<String,Double>>, Serializable {
    final static TupleComparatorDescending INSTANCE = new TupleComparatorDescending();
    public int compare(Tuple2<String,Double> t1, Tuple2<String,Double> t2) {
        return -(t1._2.compareTo(t2._2)); // sort based on AVF Score in a descending order
    }
}

The TupleComparatorAscending Class

This class sorts based on the AVF Score in an ascending order.

class TupleComparatorAscending implements Comparator<Tuple2<String,Double>>, Serializable {
    final static TupleComparatorAscending INSTANCE = new TupleComparatorAscending();
    public int compare(Tuple2<String,Double> t1, Tuple2<String,Double> t2) {
        return (t1._2.compareTo(t2._2)); // sort based on AVF Score in an ascending order
    }
}

The complete source code, including the Maven-friendly pom.xml and the dataset, is available in my GitHub repository here.


The outlier detection method that I have shown can be extended and applied to other applications such as credit card fraud detection, network anomaly detection, etc. However, handling unstructured and multidimensional datasets requires a non-trivial amount of effort, especially for the pre-processing and feature engineering steps, including feature extraction and feature selection.

Furthermore, the performance of the above algorithm can certainly be improved with proper ML model selection and tuning. Unfortunately, the current implementation of Spark provides hyperparameter tuning only through model selection via CrossValidator and TrainValidationSplit. Interested readers should refer to the Spark website for regular updates.

In my next post, I will show how to implement the same application for the TCGA dataset using Dataset<Row>, which unifies the DataFrame and Dataset abstractions in Spark 2.0.0. The reason is, of course, that I found the Dataset more convenient, more lightweight, and computationally faster than the RDD.
