Deep Learning With Intel's BigDL and Apache Spark

When it comes to training convolutional neural networks, Spark and BigDL are two peas in a pod. See how they can be used together to fine-tune a transfer learning strategy.


Cloudera recently published a blog post on how to use Deeplearning4j (DL4J) along with Apache Hadoop and Apache Spark to get state-of-the-art results on an image recognition task. Continuing that stream of work, in this post we discuss a viable alternative that is specifically designed to be used with Spark and with data available in Spark and Hadoop clusters, via a Scala or Python API.

The deep learning landscape is still evolving. On one hand, we have veteran frameworks like Theano and Caffe as well as more popular ones like TensorFlow; on the other, we see the emergence of JVM-based frameworks that can perform distributed deep learning using GPUs and CPUs. These JVM-based tools can leverage existing Spark clusters to parallelize the training of models. This post discusses Intel's BigDL, an open-source distributed deep learning framework for big data platforms built on Apache Spark. It highlights some high-level differences between Deeplearning4j and BigDL, details how to build a convolutional neural network (CNN) while exploring a different approach to transfer learning (fine-tuning pre-trained models), and provides suggestions for troubleshooting and tuning Apache Spark jobs.

Comparing BigDL and Deeplearning4j

Intel's BigDL is a relatively new open-source framework that provides native support for deep learning on Spark using CPUs. On the other hand, Deeplearning4j was created in 2014, is built for the JVM, and allows training deep neural networks on CPUs and GPUs. To compare these frameworks directly may not be fair, given their different life spans and the fact that both are still under development. But comparing features can help us understand when one or both are applicable to a particular problem.

[Image: feature comparison of BigDL and Deeplearning4j]

Classifying Caltech-256 Images With BigDL

To further the comparison, below we present a worked example with BigDL that mirrors the previous DL4J example. That is, it walks through the steps to build a convolutional neural network that can classify images in the Caltech-256 dataset.

The Caltech-256 dataset was downloaded, separated into training and validation sets using stratified sampling, and stored in HDFS (full instructions). We begin by reading the JPEG files from HDFS as binary files and then decode the compressed format into the ByteRecord format required by BigDL. As with any other machine learning workload, we may want to pre-process the data before feeding it into the model. In this case, we chain three transformations using the Transformer APIs.

For computational efficiency, we would like to train and run inference in batches; here, the batch size is 16. Note that the mini-batch size must be a multiple of the total number of cores used by the job. All of this plays a role in how the training process is conducted within BigDL.
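To make the batching constraint concrete, here is a minimal plain-Scala sketch (no BigDL required). The core count is an assumption for illustration; the training-set size comes from the training log shown later in this post:

```scala
// Relate the mini-batch size to the core count and to the number of
// iterations per epoch that appears in the BigDL training logs.
val totalCores = 4            // assumed total core count for this example
val batchSize  = 16           // the mini-batch size used in this post
require(batchSize % totalCores == 0,
  "mini-batch size must be a multiple of the total number of cores")

val trainingImages = 20812    // training-set size reported in the logs
val iterationsPerEpoch = math.ceil(trainingImages.toDouble / batchSize).toInt
println(iterationsPerEpoch)   // 1301
```

With 20,812 training images and a batch size of 16, each epoch is 1,301 iterations, which is why a per-epoch checkpoint can equivalently be expressed as a trigger every 1,301 iterations.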

Transfer Learning

We are aware that convolutional neural networks (CNNs) require large datasets and a lot of computational time to train. So instead of training from scratch, transfer learning reuses a model pre-trained on one dataset for a new dataset. Previously, we extracted features up to the second-to-last layer and trained a new prediction layer. This time, we will not only train a new prediction layer but also allow the earlier layers to be tweaked, if needed.

To elaborate, let's say a CNN has a set of shared parameters θs (i.e. five convolutional layers and two fully connected layers), task-specific parameters for previously learned tasks θo (i.e. the output layer for classification and corresponding weights), and randomly initialized task-specific parameters for new tasks θn (i.e. scene classifiers). It is useful to think of θo and θn as classifiers that operate on features parameterized by θs. Currently, there are three common approaches to learning θn while benefiting from previously learned θs.

  • Feature extraction: Uses a pre-trained CNN to compute the features of an image. θs and θo are unchanged, and the outputs of one or more layers are used as features for the new task in training θn. The extracted features are the activations of one layer (usually the last hidden layer) or of multiple layers, given the image. This is what we accomplished in the previous blog post.
  • Fine-tuning: Modifies the parameters of an existing CNN to train a new task; the output layer is extended with randomly initialized weights for the new task. That is, θs and θn are optimized for the new task, while θo is fixed. We employ this strategy in the current post. It is also possible to use a variation of fine-tuning where part of θs (the convolutional layers) is frozen and only the top fully connected layers are fine-tuned. This can be seen as a compromise between fine-tuning and feature extraction.


Figure 1

Using Pre-Trained Models With BigDL

BigDL supports loading pre-trained models; all it needs is a model checkpoint file (pre-trained model), a definition file, and a BigDL Module (network) that represents the layers of the neural network. The Caffe library has a Model Zoo where people share their checkpoints, also known as network weights. For our learning task, we use the VGG16-layer model checkpoint available here.

// Note that with matchAll = false, only layers with matching names are loaded;
// the rest keep their initialized parameters
val model = Module.loadCaffe[Float](VGG16NetCaltech(257),
    defPath = "./VGG_ILSVRC_16_layers_deploy.prototxt",
    modelPath = "./VGG_ILSVRC_16_layers.caffemodel", matchAll = false)

To reuse the existing weights, we first need to make sure that the layer names in the VGG16NetCaltech module match the layer names in the original network's model definition file.

However, the VGG16-layer model was trained on the ImageNet dataset, which has 1,000 different classes, so we want to change its output for the current application. This can be achieved by simply renaming the last fully connected layer from "fc8" to "fc8_caltech256" and changing its output size to the number of classes in the Caltech-256 dataset, that is, 257. Thus, we have defined a new hybrid model that uses weights from a pre-trained model for all layers except the last one, saving tremendous resources and training time.
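As a sketch of what that change might look like inside the module definition (assuming the network is built with BigDL's Sequential API; the 4096 input size follows the standard VGG16 classifier, and VGG16NetCaltech itself is defined in the accompanying code):

```scala
// Original VGG16 definition ends with:
//   model.add(Linear(4096, 1000).setName("fc8"))
// Renamed and resized for Caltech-256, so loadCaffe(matchAll = false)
// skips this layer and leaves its weights randomly initialized:
model.add(Linear(4096, 257).setName("fc8_caltech256"))
model.add(LogSoftMax())  // pairs with the ClassNLLCriterion used below
```

Because the new name does not appear in the Caffe definition file, only this layer is trained from scratch; every other layer starts from the pre-trained weights.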

Distributed Training

The training process requires you to specify the model, the training dataset, and the loss criterion. In this case, it's the negative log likelihood, which is often used for classification problems.

val optimizer = Optimizer(
 model = model,
 dataset = trainSet,
 criterion = new ClassNLLCriterion[Float]()
)

Set the hyperparameters for training.

val optim = new SGD(
 learningRate = 0.0001,
 learningRateDecay = 0.0,
 weightDecay = 0.0001,
 momentum = 0.8,
 learningRateSchedule = SGD.EpochStep(20, 0.1),
 dampening = 0.0,
 nesterov = true
)

Along with the optimization method and associated hyperparameters, we specify how frequently to test the validation set accuracy, how many passes to make over the training data, when and where to save the model checkpoint, and so on.

optimizer
 // Validation set accuracy to be tested after every epoch
 .setValidation(Trigger.everyEpoch, valSet, Array(new Top1Accuracy[Float], new Top5Accuracy[Float]))
 // Save model checkpoint after every epoch (1,301 iterations)
 .setCheckpoint("./bigdl-models/vgg16-net", Trigger.severalIteration(1301))
 // End training after 15 epochs
 .setEndWhen(Trigger.maxEpoch(15))
 .setOptimMethod(optim)
 .optimize()
Now, when you run the Spark application on the cluster (details on submitting the Spark application and the entire code are available here), you'll notice that it begins training the network in batches of 16 images and outputs the loss and the hyperparameters used.

17/06/10 12:26:49 INFO optim.DistriOptimizer$: [Epoch 1 0/20812][Iteration 1][Wall Clock 0.0s] Train 16 in 10.541978735seconds. Throughput is 1.5177416 records/second. Loss is 5.750294. Current learning rate is 1.0E-4. Current weight decay is 1.0E-4. Current momentum is 0.8. Current nesterov is true.

Visualizing Learning

Training a neural network can be complex and confusing. One may want to keep tabs on how the learning progresses in order to understand, debug, or further optimize it. For example, if in one run we randomize the order in which the images appear within the mini-batches and in another we do not (while keeping the rest of the hyperparameters unchanged), we find that the former approach reduces the training loss. The orange line in Figure 2 below depicts the training loss after randomization.


Figure 2

The process of enabling visualization via TensorBoard is pretty straightforward. After installing TensorBoard, simply specify the location and a name (describing the run) under which to store the training and validation summary results. Note that by default, the TrainSummary displays the "loss" and "throughput" at each iteration; one can also enable "Parameters" or "LearningRate," but keep in mind that this adds overhead and might slow down the training process.
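Wiring this up in code looks roughly like the following sketch, using BigDL's visualization API; the log directory, run name, and the 50-iteration trigger are illustrative choices, not values from the original run:

```scala
import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary}

// Write training and validation summaries where TensorBoard can find them
val trainSummary = TrainSummary(logDir = "./logs", appName = "run1")
val valSummary   = ValidationSummary(logDir = "./logs", appName = "run1")

// Optional: also record parameter summaries every 50 iterations
// (this is the extra overhead mentioned above)
trainSummary.setSummaryTrigger("Parameters", Trigger.severalIteration(50))

optimizer.setTrainSummary(trainSummary)
optimizer.setValidationSummary(valSummary)
```

With the summaries attached to the optimizer, TensorBoard can then be pointed at `./logs/run1` as shown below.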

Finally, in order to view the training, use the command:

$ tensorboard --logdir=./logs/run1
Starting TensorBoard 41 on port 6006

The TensorBoard UI is then accessible at http://<ip>:6006, and it displays the loss, throughput, accuracy, and other statistics.


Figure 3

A Top1 accuracy of ~5% on the validation set is pretty low to begin with, but as shown in Figure 3, by the end of the 15th epoch, the loss goes down and the training Top1 accuracy climbs to 30%.

Model Checkpoints and Performance

We can also independently test the model performance on a test set using any of the trained model snapshots saved at the checkpoint location.

// Load validation set; a final batching transformer (here BGRImgToBatch,
// with the same batch size as in training) completes the chain
val testSet =
 DataSet.rdd(Utils.readJpegs(sc, "hdfs:///user/leon/Caltech-256/test", 224)) ->
 BytesToBGRImg() ->
 BGRImgNormalizer(104, 117, 123, 1, 1, 1) ->
 BGRImgToBatch(16)

// Load model
val model = Module.load[Float]("./bigdl-models/20170729_104304/model.20816")
val validator = Validator(model, testSet)

val result = validator.test(Array(new Top1Accuracy[Float], new Top5Accuracy[Float]))
result.foreach { case (value, metric) => println(s"$metric is $value") }
// Output:
// Top1Accuracy is Accuracy(correct: 1910, count: 6122, accuracy: 0.3119895459000327)
// Top5Accuracy is Accuracy(correct: 3283, count: 6122, accuracy: 0.5362626592616792)

If the model performance improves initially and then starts to flatten or decrease, it might be a good idea to reduce the learning rate at that point and resume training from where it left off. All one needs to do is load the model snapshot from the 15th epoch, which is a minor change to the code above.

val model = Module.load[Float]("./bigdl-models/vgg16-net/20170611_101342/model.20816")

And then specify a new set of hyperparameters:

val optim = new SGD(
 learningRate = 0.00001,
 learningRateDecay = 0.0,
 weightDecay = 0.0001,
 momentum = 0.8,
 learningRateSchedule = SGD.EpochStep(20, 0.1),
 dampening = 0.0,
 nesterov = true
)

You can also choose to retrieve the existing optimization snapshot file:

val optim = OptimMethod.load[Float]("./bigdl-models/20170729_104304/modelOptim.20816")

Troubleshooting and Tuning Spark Jobs

When running BigDL Spark applications, we have occasionally encountered out-of-memory (OOM) errors. Some of these errors occur on the driver side, while others occur on the executor side. For example, at times the job fails while trying to load the VGG16 model, and the Spark web UI logs provide no further details. Increasing the driver memory, controlled by the --driver-memory option when submitting the Spark application, should resolve this.

17/07/25 15:46:16 INFO caffe.CaffeLoader: start loading caffe model from ./VGG_ILSVRC_16_layers.caffemodel
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3181)

Later, when the training process tries to cache the model on the executors, it might fail during initialization; the stack trace below points to the issue. Increasing the executor memory, controlled by the --executor-memory option when submitting the Spark application, does the trick here. The error trace may look like the following:

17/07/25 21:04:02 INFO optim.DistriOptimizer$: caching training rdd ...
[Stage 4:>   (0 + 4) / 4]Exception in thread "dispatcher-event-loop-17" java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)

Furthermore, at some point during the training iterations, such as at the end of an epoch, a trigger may save the model at the specified checkpoint location. The job might fail here too, with messages about the YARN scheduler losing executors. This can be caused by the application fetching the current model from the executors to the driver, or by the driver saving the model (which is now >1 GB in size) to the specified checkpoint directory. Both are also out-of-memory situations and can be fixed by increasing the executor or the driver memory, respectively.
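For reference, all of these memory options are passed at submission time. A sketch of the submit command follows; the class name, jar path, and memory sizes are placeholders for illustration, not values from the original article:

```shell
# Placeholder class name and jar path; memory sizes are illustrative.
# --driver-memory guards against the OOM seen while loading the Caffe model;
# --executor-memory guards against the OOM seen while caching the training RDD.
spark-submit \
  --master yarn \
  --driver-memory 20g \
  --executor-memory 40g \
  --executor-cores 8 \
  --class com.example.VGGCaltechTrainer \
  ./bigdl-caltech-assembly.jar
```

When an OOM appears, check whether the stack trace originates on the driver (model loading, checkpoint saving) or the executors (RDD caching) and raise the corresponding setting.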


Conclusion

In this post, we used Apache Spark and Intel's BigDL to train a convolutional neural network employing the fine-tuning transfer learning strategy. We saw how well they work together and how this framework makes it easier for data scientists and analysts to keep using their existing Apache Hadoop and Apache Spark infrastructure as a unified data analytics platform. Because BigDL is built natively on Spark, its familiar Scala and Python APIs make it easy to adopt and well-suited for deep learning on Cloudera.


Published at DZone with permission of Nisha Muktewar, DZone MVB. See the original article here.
