Deep Learning on Qubole Using BigDL for Apache Spark (Part 2)


Learn how to write a deep learning application on Qubole that uses BigDL to identify handwritten digits (0 to 9).


In Part 1, you learned how to install the distributed deep learning library BigDL on Qubole.

In Part 2 of this two-part series, you will learn how to write a deep learning application on Qubole that uses BigDL to identify handwritten digits (0 to 9).

In this application, you will:

  1. Train and validate a LeNet-5 model (a convolutional neural network) using the MNIST database, and then save the model.
  2. Load the saved model and identify (“recognize”) digits in a given set of arbitrary images with >= 98% accuracy.

To write the application, you will use Qubole Notebooks, which are great for developing applications in Scala, Python, and R, running ETL jobs in Spark, and visualizing SQL results in a single collaborative environment. Qubole Notebooks give data scientists and data analysts an easy way to interact with data stored in cloud data stores such as Amazon S3, Microsoft Azure Blob Storage, and Oracle BMC Object Storage.

Steps

Start the Spark cluster that you configured in Part 1 and switch over to the Notebooks interface. Create a new notebook, fill in the required fields, and select the Spark cluster you configured in Part 1.

Set spark.executor.instances and spark.qubole.max.executors to 2. (Note: At the time of writing, BigDL required these two variables to have the same value.) To do this, click the Interpreter link at the top right of the notebook you just created. Then click Edit at the top right of the Interpreter interface and scroll down to the spark interpreter. After setting the two parameters above to 2, click Save. Finally, click the Back to Note button at the top of the Interpreter interface.
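
To confirm that the settings took effect, you can read them back from the Spark context in a notebook paragraph (a quick, optional sanity check; sc is the SparkContext the notebook provides):

// Optional sanity check: confirm the interpreter settings were picked up.
println(sc.getConf.getOption("spark.executor.instances"))   // expect: Some(2)
println(sc.getConf.getOption("spark.qubole.max.executors")) // expect: Some(2)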

In the first paragraph, copy-and-paste the following Scala code. When executed, it will train, validate, and save the trained model.

import com.intel.analytics.bigdl.utils.Engine
import com.intel.analytics.bigdl.dataset._
import com.intel.analytics.bigdl.dataset.image._
import com.intel.analytics.bigdl.models.lenet.Utils
import com.intel.analytics.bigdl.DataSet
import com.intel.analytics.bigdl.models.lenet._
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.utils._

import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}

// The batch size should be a multiple of (nodes * cores per node)
// so that every partition gets an equal share: 4 * 8 * 64 = 2048.
val numberOfMinNodes = 4
val numberOfCoresPerNode = 8
val mult = 64
val batchSize = numberOfMinNodes * numberOfCoresPerNode * mult

Engine.init

// Read MNIST IDX-format image and label files into an Array[ByteRecord].
def load(featureFile: Path, labelFile: Path): Array[ByteRecord] = {
  val labelBuffer = ByteBuffer.wrap(Files.readAllBytes(labelFile))
  val featureBuffer = ByteBuffer.wrap(Files.readAllBytes(featureFile))

  val labelMagicNumber = labelBuffer.getInt()
  require(labelMagicNumber == 2049) // IDX magic number for MNIST label files

  val featureMagicNumber = featureBuffer.getInt()
  require(featureMagicNumber == 2051) // IDX magic number for MNIST image files

  val labelCount = labelBuffer.getInt()
  val featureCount = featureBuffer.getInt()
  require(labelCount == featureCount)

  val rowNum = featureBuffer.getInt()
  val colNum = featureBuffer.getInt()

  val result = new Array[ByteRecord](featureCount)
  var i = 0
  while (i < featureCount) {
    val img = new Array[Byte](rowNum * colNum)
    var y = 0
    while (y < rowNum) {
      var x = 0
      while (x < colNum) {
        img(x + y * colNum) = featureBuffer.get()
        x += 1
      }
      y += 1
    }
    // BigDL class labels are 1-based, so shift the 0-9 digit labels to 1-10.
    result(i) = ByteRecord(img, labelBuffer.get().toFloat + 1.0f)
    i += 1
  }
  result
}

// MNIST files downloaded by the bootstrap script in Part 1
val baseDir = "/media/ephemeral0/bigdl/mnist/data/"
val trainDataFile = "train-images-idx3-ubyte"
val trainLabelFile = "train-labels-idx1-ubyte"
val validationDataFile = "t10k-images-idx3-ubyte"
val validationLabelFile = "t10k-labels-idx1-ubyte"

val trainData = Paths.get(baseDir, trainDataFile)
val trainLabel = Paths.get(baseDir, trainLabelFile)
val validationData = Paths.get(baseDir, validationDataFile)
val validationLabel = Paths.get(baseDir, validationLabelFile)

// Mean and standard deviation of the MNIST training images (for normalization)
val trainMean = 0.13066047740239506
val trainStd = 0.3081078
val trainSet = DataSet.array(load(trainData, trainLabel), sc) -> BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(batchSize)

// Mean and standard deviation of the MNIST test images
val testMean = 0.13251460696903547
val testStd = 0.31048024
val validationSet = DataSet.array(load(validationData, validationLabel), sc) -> BytesToGreyImg(28, 28) -> GreyImgNormalizer(testMean, testStd) -> GreyImgToBatch(batchSize)

// Hyperparameters: scale the base learning rate with the batch-size multiplier
val state = T("learningRate" -> 0.05 / 4 * mult)
val maxEpoch = 15

val initialModel = LeNet5(10) // 10 digit classes
val optimizer = Optimizer(
  model = initialModel, // the model to train
  dataset = trainSet, // training dataset
  criterion = ClassNLLCriterion[Float]()) // loss function (negative log-likelihood)

val trainedModel = optimizer.setValidation(
  trigger = Trigger.everyEpoch, // validate after every training epoch
  dataset = validationSet,
  vMethods = Array(new Top1Accuracy)).setState(state).setEndWhen(Trigger.maxEpoch(maxEpoch)).optimize()

val validator = Validator(trainedModel, validationSet)
val result = validator.test(Array(new Top1Accuracy[Float]))
result.foreach(r => {
  println(s"${r._2} is ${r._1}")
})

// Save the trained model; the second argument enables overwriting an existing file.
trainedModel.save("/media/ephemeral0/bigdl/mnist/model/trainedModel", true)

Here’s what’s happening in the code:

  • Import BigDL and other Java libraries needed for the application.
  • Call Engine.init.
    • Engine is part of BigDL and it appropriately initializes executor environment variables and Spark properties in order to get the best performance for deep learning applications on Spark clusters.
  • Define the load function.
    • This function helps prepare our MNIST dataset. Recall that we downloaded the MNIST dataset into the folder /media/ephemeral0/bigdl/mnist/data/ on the cluster via a bootstrap script in Part 1. The function accepts the image and label file paths and returns an Array[ByteRecord].
  • Load the MNIST data by creating a BigDL DataSet, then apply a series of transformers to preprocess the data into mini-batches.
    • The transformers convert the byte records into grey images, normalize them using the dataset's mean and standard deviation, and group the labeled grey images into mini-batches. (A sketch after this list shows how the normalization constants can be derived.)
  • Train the model.
    • Before training, we set the hyperparameters that determine how the network is trained. For more details on hyperparameters, see here.
    • We use LeNet-5 as our training model. LeNet-5 is a classical CNN model used in digit classification. For detailed information, see here.
    • We then create the optimizer by specifying the dataset, the model, and the criterion, which, given input and target, computes the gradient per the given loss function.
  • Validate the model.
    • BigDL provides a set of metrics to evaluate the model via the Validator and ValidationMethod classes. We use Top1Accuracy as the ValidationMethod, which calculates top-1 accuracy and should yield an accuracy of >= 98%.
  • Save the model.
    • Lastly, we save the trained and validated model so we can use it for our image recognition application.
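
Incidentally, the hardcoded trainMean/trainStd and testMean/testStd values are simply the pixel mean and standard deviation of the corresponding MNIST images. Here is a minimal sketch of how such constants could be derived, assuming ByteRecord exposes its pixel bytes via its data field (it loads the whole training set into driver memory, so treat it as illustrative only):

// Illustrative sketch: derive the normalization constants from the training set.
// Pixel bytes are unsigned (0-255), so mask with 0xff and scale to [0, 1].
val pixels = load(trainData, trainLabel).flatMap(_.data).map(b => (b & 0xff) / 255.0)
val mean = pixels.sum / pixels.length
val std = math.sqrt(pixels.map(p => (p - mean) * (p - mean)).sum / pixels.length)
println(s"mean = $mean, std = $std") // expect values near 0.1307 and 0.3081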

Run the above paragraph. It will take a few minutes to train, validate, and save the model. Once completed, proceed to the next step.
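
Optionally, you can verify that the model file was written before moving on (a simple sanity check):

// Optional: confirm the trained model exists on the cluster's local disk.
import java.nio.file.{Files, Paths}
println(Files.exists(Paths.get("/media/ephemeral0/bigdl/mnist/model/trainedModel"))) // expect: true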

In a new paragraph, copy-and-paste the following Scala code. When executed, it will load the trained model saved in the previous step and use it to identify (“recognize”) digits in a given set of arbitrary images.

import com.intel.analytics.bigdl.utils.Engine
import com.intel.analytics.bigdl.dataset._
import com.intel.analytics.bigdl.dataset.image._
import com.intel.analytics.bigdl.models.lenet.Utils
import com.intel.analytics.bigdl.DataSet
import com.intel.analytics.bigdl.models.lenet._
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.utils._

import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}
import javax.imageio.ImageIO
import java.io.File
import java.awt.image.{BufferedImage, DataBufferByte}
import java.awt.Color

Engine.init

// Read an image file, scale it to 28x28 greyscale, and pack it into a byte
// array with the image width and height as an 8-byte header.
def image_to_byte_array(imageName: String): Array[Byte] = {
  val imgPath = new File(imageName)
  val bufferedImage = ImageIO.read(imgPath)
  val scaledImage = bufferedImage.getScaledInstance(28, 28, java.awt.Image.SCALE_SMOOTH)
  val scaledBufferedImage = new BufferedImage(28, 28, BufferedImage.TYPE_BYTE_GRAY)
  scaledBufferedImage.getGraphics.drawImage(scaledImage, 0, 0, new Color(0, 0, 0), null)

  // get DataBufferBytes from Raster
  val raster = scaledBufferedImage.getRaster
  val data = raster.getDataBuffer.asInstanceOf[DataBufferByte].getData
  val bytes = new Array[Byte](8 + data.length)
  val byteBuffer = ByteBuffer.wrap(bytes)
  byteBuffer.putInt(scaledBufferedImage.getWidth)
  byteBuffer.putInt(scaledBufferedImage.getHeight)
  System.arraycopy(data, 0, bytes, 8, data.length)

  bytes
}

// Wrap a single image into an Array[ByteRecord]. The label value (1.1f) is a
// placeholder, since the record is only used for prediction.
def load_image(imageName: String): Array[ByteRecord] = {
  val byteArray = image_to_byte_array(imageName)
  val featureBuffer = ByteBuffer.wrap(byteArray)
  val rowNum = featureBuffer.getInt()
  val colNum = featureBuffer.getInt()
  val result = new Array[ByteRecord](1)
  val img = new Array[Byte]((rowNum * colNum))
  var y = 0
  while (y < rowNum) {
    var x = 0
    while (x < colNum) {
      img(x + y * colNum) = featureBuffer.get()
      x += 1
    }
    y += 1
  }
  result(0) = ByteRecord(img, 1.1f)
  result
}

// Same normalization constants used for the validation set during training
val testMean = 0.13251460696903547
val testStd = 0.31048024

// Load the trained model saved in the previous step.
val trainedModel = Module.load[Float]("/media/ephemeral0/bigdl/mnist/model/trainedModel")

val dir = "/media/ephemeral0/bigdl/"
val images = Seq("zero_28x28.png", "two_28x28.png", "three.png", "four_28x28.png", "seven.png", "nine_28x28.png")

for (img <- images) {
  val load_result = load_image(dir + img)
  val rddData = sc.parallelize(load_result)
  val transformer = BytesToGreyImg(28, 28) -> GreyImgNormalizer(testMean, testStd) -> GreyImgToSample()
  val predictDataSet = transformer(rddData)
  val predictRDD = trainedModel.predictClass(predictDataSet)
  val predictedDigit = predictRDD.take(1)(0) - 1 // subtract 1 because training labels were shifted to 1-10
  println("\r\n")
  println("***** Digit in file " + img + " is: " + predictedDigit)
}

The code above assumes the test images are available in /media/ephemeral0/bigdl/ on the cluster. If you uploaded them to an S3 bucket in your AWS account, copy them down to that directory first, replacing YOUR_S3_BUCKET with your bucket's name.
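
For example, here is a minimal sketch of copying the test images from S3 to the cluster's local disk with the Hadoop FileSystem API (YOUR_S3_BUCKET and the flat key layout are placeholder assumptions; it also assumes the cluster's Hadoop configuration already carries your S3 credentials, as is typical on Qubole, and the URI scheme may be s3a:// or s3n:// depending on your cluster):

// Sketch: copy the test images from S3 into the directory the code reads from.
import org.apache.hadoop.fs.{FileSystem, Path => HPath}
import java.net.URI

val s3 = FileSystem.get(new URI("s3://YOUR_S3_BUCKET"), sc.hadoopConfiguration)
for (img <- images) {
  s3.copyToLocalFile(new HPath(s"s3://YOUR_S3_BUCKET/$img"), new HPath(s"file://$dir$img"))
}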

Run the above paragraph. Once completed, you should see output along these lines (assuming each digit is predicted correctly, per the println format in the code):
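
***** Digit in file zero_28x28.png is: 0
***** Digit in file two_28x28.png is: 2
***** Digit in file three.png is: 3
***** Digit in file four_28x28.png is: 4
***** Digit in file seven.png is: 7
***** Digit in file nine_28x28.png is: 9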

As you can see from the results, the trained model was able to identify digits (0, 2, 3, 4, 7, 9) in our test image files.

In Summary

BigDL is a great open-source deep learning library built to run natively on Apache Spark, and Qubole's autonomous big data platform provides an excellent deployment platform for BigDL as well as other deep learning and machine learning libraries.

To learn how you can use Qubole for various workload types, click here.
