Java-Based Fraud Detection With Spark MLlib

Learn how to develop an algorithm with Java and Spark MLlib that can detect fraud based on a dataset with seven million records.

In this post, we are going to develop an algorithm in Java using Spark MLlib. The full working code can be downloaded from GitHub. You can run the code with several different configurations and try your own experiments without deep Java knowledge, simply by editing the configuration file.

In a previous post, we implemented the same anomaly detection algorithm using Octave. We filtered out 500,000 records (only of type TRANSFER) from seven million to investigate and get insight into the available data. Also, several graphs were plotted to show what the data and anomalies (frauds) look like. Since Octave loads all the data in-memory, it has limitations for large data. For this reason, we'll use Spark to run anomaly detection on a larger dataset of seven million.

Gaussian Distribution

This section provides a brief description of how the Gaussian function is used for anomaly detection. For a more detailed view, please refer to my previous post. The Gaussian density function has a bell-shaped curve, as seen below:

Regular data, which are the majority of the data, tend to be in the center of the bell-shaped curve, while anomalies on the edge are rarer. At the same time, we can see that points on the edge have lower function values (or probability, less than 0.1) compared to those in the center (close to 0.4).

Following this example, we can say that every incoming example whose probability density value is lower than 0.05 is an anomaly. Of course, we can control the threshold value depending on our needs. Big values mean that more examples are flagged as anomalies, and most of them are probably not anomalies. On the other hand, small values mean we may miss anomalies as the algorithm becomes more tolerant. There are several ways to calculate an optimal value, and one of them is described in my previous post.

The above example is one-dimensional, with data having only one feature. In reality, we have data with a lot more features and dimensions. To plot our data into the graph, we reduce the dimension of data using principal component analysis (PCA) to two-dimensional (2D) or even three-dimensional (3D). Here's an example with two dimensions:

Notice how normal data tend to stay together in the middle of the first and second circles and anomalies are on the edges of the third circle. Circles on the graph show how the Gaussian bell curve is distributed among data (normally, it will be bell-shaped in 3D, but for simplicity, it is shown in 2D).

To place an example at a certain position in the bell-shaped graph, we need to calculate two components: µ (mean) and σ² (variance). Once we have calculated the mean and variance, we can apply a fairly simple formula to get the probability density for new incoming examples. If the probability is lower than a certain threshold (epsilon), we flag the example as an anomaly; otherwise, it is normal. Find the detailed explanation in my previous post.
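The calculation described above can be sketched in plain Java. This is only a minimal illustration, not the code from the repository: the class and method names are made up, and it assumes the features are treated as independent, so the overall density is the product of one-dimensional Gaussian densities.

```java
import java.util.Arrays;

/** Illustrative sketch of Gaussian-based anomaly scoring; all names are hypothetical. */
final class GaussianSketch {

    /** Per-feature mean of the training rows. */
    static double[] mean(double[][] rows) {
        int n = rows[0].length;
        double[] mu = new double[n];
        for (double[] row : rows) {
            for (int j = 0; j < n; j++) {
                mu[j] += row[j] / rows.length;
            }
        }
        return mu;
    }

    /** Per-feature population variance of the training rows. */
    static double[] variance(double[][] rows, double[] mu) {
        double[] sigma2 = new double[mu.length];
        for (double[] row : rows) {
            for (int j = 0; j < mu.length; j++) {
                double d = row[j] - mu[j];
                sigma2[j] += d * d / rows.length;
            }
        }
        return sigma2;
    }

    /** Product of the one-dimensional Gaussian densities (assumes independent features). */
    static double density(double[] x, double[] mu, double[] sigma2) {
        double p = 1.0;
        for (int j = 0; j < x.length; j++) {
            double d = x[j] - mu[j];
            p *= Math.exp(-d * d / (2 * sigma2[j])) / Math.sqrt(2 * Math.PI * sigma2[j]);
        }
        return p;
    }

    /** An example is flagged when its density falls below the threshold epsilon. */
    static boolean isAnomaly(double[] x, double[] mu, double[] sigma2, double epsilon) {
        return density(x, mu, sigma2) < epsilon;
    }

    public static void main(String[] args) {
        double[][] train = {{1.0, 10.0}, {2.0, 12.0}, {3.0, 14.0}}; // toy data
        double[] mu = mean(train);
        double[] sigma2 = variance(train, mu);
        System.out.println(Arrays.toString(mu));
        System.out.println(isAnomaly(new double[]{2.0, 12.0}, mu, sigma2, 0.01)); // false
        System.out.println(isAnomaly(new double[]{9.0, 30.0}, mu, sigma2, 0.01)); // true
    }
}
```

A point at the center of the toy data gets a comparatively high density and is treated as normal, while a point far from the mean gets a density close to zero and is flagged.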

Spark and MLlib

This section provides a brief description of both Spark and MLlib. For a more detailed explanation and tutorials, check out the official website.

Spark

Apache Spark is a cluster computing framework. Spark helps us execute jobs in parallel across different nodes in a cluster and then combine those results into one single result/response. It transforms our collection of data into a collection of elements distributed across the nodes of the cluster, called an RDD (resilient distributed dataset). For example, in a Java program, we can transform a collection into an RDD capable of parallel operations like this:

JavaRDD<LabeledPoint> paralleledTestData = sc.parallelize(collection);

Parallel collections are cut into partitions and Spark executes one task per partition, so we want to have two to four partitions per CPU. We can control the number of partitions Spark creates by passing a second argument to the method: sc.parallelize(collection, partitionNumber). Besides collections coming from the application, Spark is also capable of transforming data from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, and Amazon S3.

After our data is transformed to an RDD, we can perform two kinds of parallel operations on cluster nodes. Transformations, like map, take an RDD as input and return a new RDD. Actions, like reduce and count, take an RDD and return a single result. Transformations are lazy, similar to Java 8 streams, in that they do not run when defined but only when an action requests a result. As a consequence, a transformation can be recalculated every time an action requests it; to avoid that, we can persist or cache the RDD in memory.
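Since the laziness is compared to Java 8, here is a small sketch of the same idea using plain Java streams rather than Spark (the class name is made up for illustration). The map step plays the role of a transformation and reduce plays the role of an action; this is only an analogy, not Spark code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

/** Analogy only: shows lazy evaluation with Java 8 streams, not with Spark RDDs. */
final class LazyEvaluationSketch {

    /** Returns a log of events so the order of execution can be inspected. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Defining the intermediate operation does not execute it.
        Stream<Integer> doubled = Arrays.asList(1, 2, 3).stream()
                .map(x -> {
                    log.add("map " + x);
                    return x * 2;
                });
        log.add("defined");
        // The terminal operation triggers the actual work.
        int sum = doubled.reduce(0, Integer::sum);
        log.add("sum " + sum);
        return log;
    }

    public static void main(String[] args) {
        // Prints: defined, map 1, map 2, map 3, sum 12 (one entry per line)
        run().forEach(System.out::println);
    }
}
```

The "defined" entry is logged before any "map" entry, which shows that the mapping function only runs once the result is requested.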

MLlib

Spark supports APIs in Java, Scala, Python, and R. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

MLlib is Spark’s machine learning (ML) library. It provides several ready-to-use ML tools like:

  • ML algorithms
    • Classification
    • Regression
    • Clustering
    • Collaborative filtering
  • Featurization
    • Feature extraction
    • Transformation
    • Dimensionality reduction
    • Selection
  • Utilities
    • Linear algebra
    • Statistics
    • Data handling

Data Preparation

To get some insight on the data and how anomalies are distributed across regular data, see here. Similar to the previous post, we need to prepare the data for the algorithm execution. Here's what the data looks like:

We need to convert everything into numbers. Fortunately, most of the data are numbers; only nameOrig and nameDest start with a character like C, D, or M. We simply replace each leading letter with a digit (the code below maps C to 1 and M to 2). Also, we convert the transaction types from strings to numbers like below:

  • PAYMENT = 1
  • TRANSFER = 2
  • CASH_OUT = 3
  • DEBIT = 4
  • CASH_IN = 5

All the preparation is done with Java code using the Spark transformation operation map:

        File file = new File(algorithmConfiguration.getFileName());

        return sc.textFile(file.getPath())
                .map(line -> {
                    // Replace the transaction types first: CASH_OUT and CASH_IN contain a "C"
                    // and PAYMENT contains an "M", which the prefix replacements would corrupt.
                    line = line.replace(TransactionType.PAYMENT.name(), "1")
                            .replace(TransactionType.TRANSFER.name(), "2")
                            .replace(TransactionType.CASH_OUT.name(), "3")
                            .replace(TransactionType.DEBIT.name(), "4")
                            .replace(TransactionType.CASH_IN.name(), "5")
                            // account name prefixes in nameOrig and nameDest
                            .replace("C", "1")
                            .replace("M", "2");
                    String[] split = line.split(",");
                    // skip the header row (the resulting null has to be filtered out downstream)
                    if (split[0].equalsIgnoreCase("step")) {
                        return null;
                    }
                    double[] featureValues = Stream.of(split)
                            .mapToDouble(Double::parseDouble).toArray();
                    if (algorithmConfiguration.isMakeFeaturesMoreGaussian()) {
                        FraudDetectionAlgorithmSpark.this.makeFeaturesMoreGaussian(featureValues);
                    }
                    // columns 9 and 10 are fraud labels, not features:
                    // keep column 9 as the label and columns 0 to 8 as the features
                    double label = featureValues[9];
                    featureValues = Arrays.copyOfRange(featureValues, 0, 9);
                    return new LabeledPoint(label, Vectors.dense(featureValues));
                }).cache();
    }
}

After that, the file should look like this:
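As a rough illustration of the conversion on a single line, here is a self-contained sketch without Spark. The class name and the sample line are made up; only the replacement rules mirror the code above.

```java
import java.util.Arrays;
import java.util.stream.Stream;

/** Illustrative sketch: converts one CSV line to numbers without Spark. */
final class LineConversionSketch {

    /** Applies the same replacements as the map step above and parses every column. */
    static double[] toNumbers(String line) {
        String numeric = line
                .replace("PAYMENT", "1")
                .replace("TRANSFER", "2")
                .replace("CASH_OUT", "3")
                .replace("DEBIT", "4")
                .replace("CASH_IN", "5")
                // Account prefixes must be replaced after the type names,
                // because CASH_OUT, CASH_IN, and PAYMENT contain "C" or "M".
                .replace("C", "1")
                .replace("M", "2");
        return Stream.of(numeric.split(","))
                .mapToDouble(Double::parseDouble)
                .toArray();
    }

    public static void main(String[] args) {
        // Hypothetical record with 11 columns (indexes 0 to 10); not taken from the dataset.
        String line = "1,CASH_OUT,181.0,C1000001,181.0,0.0,M2000002,0.0,0.0,1,0";
        double[] values = toNumbers(line);
        double label = values[9];                              // fraud label
        double[] features = Arrays.copyOfRange(values, 0, 9);  // columns 0 to 8
        System.out.println(label + " " + Arrays.toString(features));
    }
}
```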

Because of the large file size and GitHub's file size limitation, the data are not provided with the code. You can download the file from here, rename it allData.csv (or change the constant FILE_NAME to use a different name), and copy it into the folder data/.

Executing the Algorithm

Let's see step-by-step how we can execute the anomaly detection algorithm.

  1. From all the data (seven million records), we need to randomly choose a percentage for training, cross-validation, and test data. The code that randomly picks regular and fraudulent data for a dataset looks like this:
    Collections.shuffle(regularData);//randomly re order data
    Collections.shuffle(anomalies);
    List<LabeledPoint> regular = regularData.stream().parallel().limit(normalSize).collect(toList());
    List<LabeledPoint> fraud = anomalies.stream().parallel().limit(fraudSize).collect(toList());
    We run this code two times to get training and cross-validation data. What is left is the test data. We will see several percentage choices later on.
  2. Next, we need to calculate µ (mean) and σ² (variance), as they are crucial to getting the probability of new examples. The code looks like this:
    @Override
    protected MultivariateStatisticalSummary getMultivariateSummary(GeneratedData<JavaRDD<LabeledPoint>> trainData) {
        return Statistics.colStats(trainData.regularAndAnomalyData.map(e -> e.features()).rdd());
    }
  3. As mentioned earlier, once we have the mean and variance, we can calculate the probability value using the Gaussian formula. Based on the probability value, we decide if an example is an anomaly or a regular example. We compare the value with a threshold (epsilon); if it is lower, we mark the example as an anomaly, and if it is greater, we mark it as regular. Choosing epsilon is crucial, as a large value can cause the algorithm to flag a lot of false fraud. On the other hand, with small values, we can miss fraud. We use cross-validation data with precision and recall to choose the best epsilon.
    Double bestEpsilon = findBestEpsilon(sc, crossData, summary);
    resultsSummary.setEpsilon(bestEpsilon);
  4. Now, we are ready to evaluate our algorithm on test data (we also do an optional evaluation on cross-validation data).
    TestResult testResultFromTestData = testAlgorithmWithData(sc, getTestData(crossData), summary, bestEpsilon);
    fillTestDataResults(resultsSummary, testResultFromTestData);
    
    TestResult testResultFromCrossData = testAlgorithmWithData(sc, crossData.regularAndAnomalyData, summary, bestEpsilon);
    fillCrossDataResults(resultsSummary, testResultFromCrossData);
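The epsilon selection from step 3 can be sketched as follows. This is not the findBestEpsilon method from the repository: the class, the signatures, and the candidate search are hypothetical, and combining precision and recall into an F1 score is an assumption about how the two are weighed.

```java
/** Illustrative sketch of choosing epsilon on labeled cross-validation data. */
final class EpsilonSearchSketch {

    /** F1 score for the rule "probability below epsilon means fraud". */
    static double f1(double[] probabilities, boolean[] isFraud, double epsilon) {
        int tp = 0, fp = 0, fn = 0;
        for (int i = 0; i < probabilities.length; i++) {
            boolean flagged = probabilities[i] < epsilon;
            if (flagged && isFraud[i]) {
                tp++;          // fraud correctly flagged
            } else if (flagged) {
                fp++;          // regular example wrongly flagged
            } else if (isFraud[i]) {
                fn++;          // fraud that was missed
            }
        }
        if (tp == 0) {
            return 0.0;
        }
        double precision = (double) tp / (tp + fp);
        double recall = (double) tp / (tp + fn);
        return 2 * precision * recall / (precision + recall);
    }

    /** Tries evenly spaced candidates between the smallest and largest probability. */
    static double findBestEpsilon(double[] probabilities, boolean[] isFraud, int steps) {
        double min = Double.MAX_VALUE;
        double max = -Double.MAX_VALUE;
        for (double p : probabilities) {
            min = Math.min(min, p);
            max = Math.max(max, p);
        }
        double best = min;
        double bestF1 = -1.0;
        for (int s = 1; s <= steps; s++) {
            double candidate = min + (max - min) * s / steps;
            double score = f1(probabilities, isFraud, candidate);
            if (score > bestF1) {
                bestF1 = score;
                best = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Toy cross-validation set: the two lowest probabilities are the frauds.
        double[] probabilities = {0.40, 0.35, 0.30, 0.02, 0.01, 0.25};
        boolean[] isFraud = {false, false, false, true, true, false};
        double epsilon = findBestEpsilon(probabilities, isFraud, 10);
        System.out.println(epsilon + " -> F1 " + f1(probabilities, isFraud, epsilon));
    }
}
```

On the toy data, any epsilon between the frauds and the regular examples separates them perfectly; on real data the best score is lower, and the chosen epsilon reflects the trade-off between false alarms and missed fraud.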

Before executing the algorithm, we need to download the data (it's not packed with the code because of GitHub's file size limitation), extract it, and copy it to data/allData.csv. The file location is configurable, as well as the file name. The algorithm can be tested with the data and various options through the configuration file at config/algorithm.properties, as shown below:

#60% of regular data used for training
trainDataNormalPercentage=60

#0% of fraud data used for training
trainDataFraudPercentage=0

#30% of frauds used as test data
testDataFraudPercentage=30
#20% of regular data used as test data
testDataNormalPercentage=20

#70% of frauds used as cross data
crossDataFraudPercentage=70
#20% of regular data used as cross data
crossDataNormalPercentage=20

#We can skip 11 features indexed from 0 to 10 ex 1,2,6,7
skipFeatures=1,2,3,6,7,8

#Possible values :
#0->ALL
#1->PAYMENT
#2->TRANSFER
#3->CASH_OUT
#4->DEBIT
#5->CASH_IN
transactionTypes=CASH_OUT

#Possible values SPARK and JAVA_STREAM
runsWith=SPARK

#How many times you want the algorithm to run
runsTime=1

#make features more gaussian by powering current values
makeFeaturesMoreGaussian=true

fileName=data/allData.csv

hadoopApplicationPath=winutils-master/hadoop-2.8.1
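To make the skipFeatures option more concrete, here is a small sketch of how such a property could be read and applied. It is an assumption about the mechanics, not the repository's code: the class and method names are invented, and the real implementation may ignore the skipped columns in a different way.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Illustrative sketch: reads skipFeatures and drops those columns from a row. */
final class SkipFeaturesSketch {

    /** Parses a comma-separated index list such as "1,2,3,6,7,8". */
    static List<Integer> parseSkipFeatures(Properties props) {
        String raw = props.getProperty("skipFeatures", "");
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .map(Integer::valueOf)
                .collect(Collectors.toList());
    }

    /** Returns a copy of the row without the skipped column indexes. */
    static double[] dropColumns(double[] row, List<Integer> skip) {
        return IntStream.range(0, row.length)
                .filter(i -> !skip.contains(i))
                .mapToDouble(i -> row[i])
                .toArray();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Inline text stands in for config/algorithm.properties.
        props.load(new StringReader("skipFeatures=1,2,3,6,7,8\nrunsWith=SPARK\n"));
        List<Integer> skip = parseSkipFeatures(props);
        double[] row = {0, 1, 2, 3, 4, 5, 6, 7, 8};
        System.out.println(Arrays.toString(dropColumns(row, skip))); // [0.0, 4.0, 5.0]
    }
}
```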

After the configuration is changed, the application can be run from a Java IDE or with Maven by running:

mvn clean install exec:java

Depending on your machine and configuration, it may take some time (for me, it takes two minutes) for the application to finish. Your computer may also freeze a bit, as Spark pushes the CPU to 100% at certain points, and you should expect the application to use a lot of memory (2-3 GB for me). You can see the result printed on the console or by looking in the folder out/, where a *.txt file with the output is generated. As explained in my previous post, the algorithm is based on randomness, so you can configure it to run several times and expect one file per execution.

Experiments and Results

From my experiments, I see that frauds occur only for two types: TRANSFER and CASH_OUT. TRANSFER was investigated in detail in my previous post, where we achieved a pretty high rate: 99.7%.

When run only for the CASH_OUT type and without skipping any columns/features, we get poor results:

ResultsSummary{
, RUN =0
, successPercentage=0.13532555879494654
, failPercentage=0.8646744412050534
trainRegularSize=1340688
, trainFraudSize=0
, trainTotalDataSize=0
, transactionTypes=[CASH_OUT]
, timeInMilliseconds=58914
, testNotFoundFraudSize=1076
, testFoundFraudSize=180
, testFlaggedAsFraud=4873
, testFraudSize=1256
, testRegularSize=446023
, testTotalDataSize=447279....

We are only able to find ~14% of frauds for this type. Previously, we were able to improve a lot by making the features look more like the Gaussian bell curve, but unfortunately, that is not the case this time.

What we can do is look at our features and see if we can add or skip some, since features can introduce confusion and noise rather than benefits. Looking at the data source, we have the following description of fraud, which can help:

"isFraud is the transactions made by the fraudulent agents inside the simulation. In this specific dataset, the fraudulent behavior of the agents aims to profit by taking control of customers' accounts and trying to empty the funds by transferring them to another account and then cashing out of the system."

It looks like fraud here is when a large amount or all of the funds are cashed out from the account. Step by step, we start removing unneeded features; I found good results by removing features [1,2,3,7,8] (type, amount, nameOrig, oldBalanceDest, newBalanceDest). When cashing out, the account from which the money is being taken is more important than the destination, because the destination account may already have money and look pretty normal, but an empty source account may signal fraudulent behavior. We keep the destination account name, as it may help in case of fraudulent account names. The results look like this:

Finish within 70027
ResultsSummary{
, RUN =0
, successPercentage=0.8277453838678328
, failPercentage=0.17225461613216717
trainRegularSize=1340058
, trainFraudSize=0
, trainTotalDataSize=0
, transactionTypes=[CASH_OUT]
, timeInMilliseconds=67386
, testNotFoundFraudSize=218
, testFoundFraudSize=1016
, testFlaggedAsFraud=139467
, testFraudSize=1234
, testRegularSize=446808
, testTotalDataSize=448042

This is a huge improvement: we were able to go from 14% to 82.77%. Running all types together does not bring any better results, even with different skipped features (feel free to experiment, as not all of this is explored). I was able to get some results by skipping only amount (2), but this is still not satisfactory, as a lot of non-fraudulent activity was flagged (1,040,950).

Finish within 128117
ResultsSummary{
, RUN =0
, successPercentage=0.8700840131498844
, failPercentage=0.12991598685011568
trainRegularSize=3811452
, trainFraudSize=0
, trainTotalDataSize=0
, transactionTypes=[ALL]
, timeInMilliseconds=125252
, testNotFoundFraudSize=325
, testFoundFraudSize=2153
, testFlaggedAsFraud=1040950
, testFraudSize=2478
, testRegularSize=1272665
, testTotalDataSize=1275143

In this case, it's probably better to run the algorithm for each type. When a transaction is made, we run the algorithm trained for its type. In this way, we will be able to detect fraudulent activity more appropriately, as TRANSFER has a 99.7% rate and CASH_OUT, 87%. Still, for CASH_OUT, the rate is not that satisfactory, and other approaches may be worth trying. This has to be investigated first, though, since intuition is usually wrong and costs a lot of time. Since it's difficult to get more data in a finance application because of privacy, I would rather go in the direction of applying different algorithms here. When the data for CASH_OUT were plotted, we got a view like below:

Red = normal data,  magenta = not-found fraud, green = found frauds, and blue = wrongly flagged as fraud.

This graph shows that the problem is that the majority of fraud is contained in the center of the normal data and the algorithm struggles to detect them. Still, I believe there could be other ways to mix features or even add more.

Java Stream vs. Spark

We can configure the algorithm (see property runsWith) to run on Spark or Java 8 Streams to manipulate the data. Spark is a great framework if you want to run your code on several remote nodes in a cluster and aggregate the results on the requesting machine. In this post, the algorithm is executed locally, and Spark treats local resources like the number of CPUs as the target cluster resources. On the other hand, Java 8 streams easily provide parallelism with collection.stream().parallel() (of course, only on the local machine). So, as part of the experiment, Java 8 streams were compared to Spark on a single machine.

Results show that Java 8 streams are faster locally, even if not by much: Java = 111,927 milliseconds and Spark = 128,117 milliseconds, so streams are basically 16-25 seconds faster when run with all the data. Please note that the results on your computer may differ; feel free to suggest new results.

Since Spark is optimized for distributed computing, it has some overhead with partitioning, tasks, and so on compared to Java Streams, which only need to consider the local machine and have the luxury of optimizing a lot there. Anyway, I can see the gap closing as the amount of data increases, even locally.

For a small amount of data, Java 8 Streams fit better, but for huge amounts of data, Spark scales and fits better. It may be worth trying Spark configured on a cluster, perhaps running on AWS, rather than locally. For more details, please see the code of the two Java implementations, which handle exactly the same algorithm with small, nonessential differences: FraudDetectionAlgorithmJavaStream and FraudDetectionAlgorithmSpark.
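For readers unfamiliar with the Java 8 side of the comparison, the following sketch shows what local parallelism with stream().parallel() looks like. It is not taken from FraudDetectionAlgorithmJavaStream; the class and method names are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch of local parallelism with Java 8 streams. */
final class ParallelStreamSketch {

    /** Mean of one column, computed with a parallel stream on the local machine. */
    static double columnMean(List<double[]> rows, int column) {
        return rows.stream().parallel()
                .mapToDouble(row -> row[column])
                .average()
                .orElse(Double.NaN);
    }

    /** Number of examples whose probability falls below epsilon. */
    static long countFlagged(List<Double> probabilities, double epsilon) {
        return probabilities.stream().parallel()
                .filter(p -> p < epsilon)
                .count();
    }

    public static void main(String[] args) {
        List<double[]> rows = Arrays.asList(
                new double[]{1.0, 10.0},
                new double[]{2.0, 12.0},
                new double[]{3.0, 14.0});
        System.out.println(columnMean(rows, 1));
        System.out.println(countFlagged(Arrays.asList(0.4, 0.02, 0.3, 0.01), 0.05)); // 2
    }
}
```

The work is split across the cores of the local machine only, which is why this approach avoids Spark's partitioning and task overhead but cannot scale beyond one machine.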

Topics:
ai ,tutorial ,fraud detection ,spark mllib ,machine learning ,algorithm ,gaussian distribution

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.
