
Distributed Deep Learning With Keras on Apache Spark


Learn how easy it is to configure, train, and evaluate any distributed deep learning model described in the Keras framework!


Deep learning has been shown to produce highly effective machine learning models across a diverse range of fields. Some of the most interesting applications include pharmaceutical drug discovery, detection of illegal fishing cargo, mapping dark matter, tracking deforestation in the Amazon, taxi destination prediction, predicting lift and grasp movements from EEG recordings, and cancer diagnosis. Distributed deep learning brings internet-scale dataset sizes within reach, as exemplified by Facebook, Google, Microsoft, and other large enterprises. This blog post demonstrates how an organization of any size can leverage distributed deep learning using the Qubole Data Service (QDS).

This demonstration uses the Keras framework to describe the structure of a deep neural network and then leverages the Dist-Keras framework to achieve data-parallel model training on Apache Spark. Keras was chosen largely because it is the dominant deep learning library as of this writing.

The Goal: Distributed Deep Learning Integrated With Spark ML Pipelines

After reading this post, an enterprising data scientist should be able to extend this demonstration to their own application-specific modeling needs. Within a Qubole notebook, you should be able to cross-validate deep neural networks using the Spark ML Pipeline interface, with an application-specific parameter grid similar to the following:

# process_csv (defined later in this post) requires an explicit num_workers
df_train = process_csv("/fully/qualified/path/to/training/data",
                       num_workers=50)
df_test  = process_csv("/fully/qualified/path/to/test/data",
                       num_workers=50)

param_grid = tuning.ParamGridBuilder() \
                   .baseOn(['regularizer', regularizers.l1_l2]) \
                   .addGrid('activations', [['tanh', 'relu']]) \
                   .addGrid('initializers', [['glorot_normal',
                                              'glorot_uniform']]) \
                   .addGrid('layer_dims', [[input_dim, 2000, 300, 1]]) \
                   .addGrid('metrics', [['mae']]) \
                   .baseOn(['learning_rate', 1e-2]) \
                   .baseOn(['reg_strength', 1e-2]) \
                   .baseOn(['reg_decay', 0.25]) \
                   .baseOn(['lr_decay', 0.90]) \
                   .addGrid('dropout_rate', [0.20, 0.35, 0.50, 0.65, 0.80]) \
                   .addGrid('loss', ['mse', 'msle']) \
                   .build()

# the first grid point supplies the baseline configuration; CrossValidator
# swaps in each parameter map during the search
estimator = DistKeras(trainers.ADAG,
                      {'batch_size': 256,
                       'communication_window': 3,
                       'num_epoch': 10,
                       'num_workers': 50},
                      **param_grid[0])

evaluator = evaluation.RegressionEvaluator(metricName='r2')

cv_estimator = tuning.CrossValidator(estimator=estimator,
                                     estimatorParamMaps=param_grid,
                                     evaluator=evaluator,
                                     numFolds=5)
cv_model = cv_estimator.fit(df_train)

df_pred_train = cv_model.transform(df_train)
df_pred_test  = cv_model.transform(df_test)
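
To quantify fit quality, you can score the predicted frames with the same evaluator. A minimal sketch, assuming the prediction and label columns follow the Spark ML defaults of 'prediction' and 'label':

# R^2 scores on the training and test sets
r2_train = evaluator.evaluate(df_pred_train)
r2_test  = evaluator.evaluate(df_pred_test)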

To make the cross-validation above work, you first need to configure your Qubole cluster and then set up your Qubole notebook. Instructions for both follow in the remainder of this post.

Configuring Your QDS Cluster

Add the following lines to your node_bootstrap script:

# automatically installs latest version of Keras as dependency
pip install dist-keras
# for GPU clusters, swap out the default tensorflow dependency
# for the GPU-enabled tensorflow-gpu build
pip uninstall tensorflow
pip install tensorflow-gpu

Then restart your cluster. Note that the default back-end for Keras is TensorFlow, but Keras supports the CNTK, MXNet, and Theano back-ends as well. To use one of them, pip install it in the node_bootstrap script and then tell Keras which back-end to switch to.
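
For example, a minimal sketch of switching back-ends from within the notebook (Theano is used purely for illustration, and is assumed to have been installed in node_bootstrap):

import os

# Keras reads KERAS_BACKEND at import time, so set it before the first
# import; the back-end can also be set permanently in ~/.keras/keras.json
os.environ['KERAS_BACKEND'] = 'theano'

import keras  # logs "Using Theano backend." on import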

Setting Up Your QDS Notebook

First, import the necessary libraries:

from keras import layers, models, optimizers, regularizers, utils
from pyspark.ml import evaluation, feature, tuning
from distkeras import predictors, trainers
from pyspark.sql import functions, types
from pyspark import ml
import numpy as np 
import matplotlib 
import StringIO  # Python 2; on Python 3, use io.StringIO instead

...after which you should define these wrappers to tightly integrate with Spark ML pipelines. The wrappers are taken directly from an open-source gist.

class DistKeras(ml.Estimator):
    """Wraps a Dist-Keras trainer as a Spark ML Estimator."""

    def __init__(self, *args, **kwargs):
        # args[0] is a Dist-Keras trainer class (e.g. trainers.ADAG) and
        # args[1] its parameter dict; kwargs describe the Keras model
        # architecture and optimizer
        self.__trainer_klass = args[0]
        self.__trainer_params = args[1]
        self.__build_trainer(**kwargs)
        super(DistKeras, self).__init__()

    @classmethod
    def __build_keras_model(klass, *args, **kwargs):
        loss = kwargs['loss']
        metrics = kwargs['metrics']
        layer_dims = kwargs['layer_dims']
        hidden_activation, output_activation = kwargs['activations']
        hidden_init, output_init = kwargs['initializers']
        dropout_rate = kwargs['dropout_rate']
        alpha = kwargs['reg_strength']
        reg_decay = kwargs['reg_decay']
        reg = kwargs['regularizer']
        keras_model = models.Sequential()
        # hidden layers: Dense -> Activation -> Dropout, with the
        # regularization strength decaying by reg_decay at each layer
        for idx in range(1, len(layer_dims)-1, 1):
            keras_model.add(layers.Dense(layer_dims[idx],
                                         input_dim=layer_dims[idx-1],
                                         bias_initializer=hidden_init,
                                         kernel_initializer=hidden_init,
                                         kernel_regularizer=reg(alpha)))
            keras_model.add(layers.Activation(hidden_activation))
            keras_model.add(layers.Dropout(dropout_rate))
            alpha *= reg_decay
        # output layer (no dropout)
        keras_model.add(layers.Dense(layer_dims[-1],
                                     input_dim=layer_dims[-2],
                                     bias_initializer=output_init,
                                     kernel_initializer=output_init,
                                     kernel_regularizer=reg(alpha)))
        keras_model.add(layers.Activation(output_activation))
        return keras_model

    def __build_trainer(self, *args, **kwargs):
        # builds the Keras model and an SGD optimizer, then hands both
        # to the Dist-Keras trainer class
        loss = kwargs['loss']
        learning_rate = kwargs['learning_rate']
        lr_decay = kwargs['lr_decay']
        keras_optimizer = optimizers.SGD(learning_rate, decay=lr_decay)
        keras_model = DistKeras.__build_keras_model(**kwargs)
        self._trainer = self.__trainer_klass(keras_model, keras_optimizer,
                                             loss, **self.__trainer_params)

    def _fit(self, *args, **kwargs):
        # Spark ML entry point: optionally rebuild the trainer with a new
        # parameter map (as CrossValidator does), then train on the frame
        data_frame = args[0]
        if len(args) > 1:
            params = args[1]
            self.__build_trainer(**params)
        keras_model = self._trainer.train(data_frame)
        return DistKerasModel(keras_model)


class DistKerasModel(ml.Model):
    """Wraps a trained Keras model as a Spark ML Model."""

    def __init__(self, *args, **kwargs):
        self._keras_model = args[0]
        self._predictor = predictors.ModelPredictor(self._keras_model)
        super(DistKerasModel, self).__init__()

    def _transform(self, *args, **kwargs):
        # appends a prediction column, cast from a one-element vector to a
        # plain double so Spark ML evaluators can consume it
        data_frame = args[0]
        pred_col = self._predictor.output_column
        preds = self._predictor.predict(data_frame)
        return preds.withColumn(pred_col,
                                cast_to_double(preds[pred_col]))


# extracts the scalar from Dist-Keras's one-element prediction rows
cast_to_double = functions.udf(lambda row: float(row[0]), types.DoubleType())
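
Because the wrappers subclass ml.Estimator and ml.Model, they behave like any other Spark ML stage and compose into pipelines. A minimal sketch, reusing the param_grid and df_train from the earlier example:

pipeline = ml.Pipeline(stages=[DistKeras(trainers.ADAG,
                                         {'batch_size': 256,
                                          'communication_window': 3,
                                          'num_epoch': 10,
                                          'num_workers': 50},
                                         **param_grid[0])])
pipeline_model = pipeline.fit(df_train)
df_scored = pipeline_model.transform(df_train)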

Last but not least, you should define some important helper functions, starting with show(), which displays arbitrary matplotlib figures. This function is adapted from the Qubole blog post on integrating the Plotly library into notebooks.

# must do before importing pyplot or pylab
matplotlib.use('Agg')
from matplotlib import pyplot as plt

def show(fig):
    # render the figure as SVG and emit it through the notebook's
    # %html display hook
    image = StringIO.StringIO()
    fig.savefig(image, format='svg')
    image.seek(0)
    print("%html <div style='width:1200px'>" + image.getvalue() + "</div>")
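
As a quick sanity check, here is a sketch of how show() might be used (the plotted values are purely illustrative):

fig, ax = plt.subplots()
ax.plot([1, 2, 3, 4], [0.9, 0.5, 0.3, 0.2])  # e.g. loss by epoch
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
show(fig)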

Another important helper function is process_csv(), which automates the highly redundant task of creating a data frame from a CSV file in cloud storage, with renamed columns (such as label for the label column) and excluded columns (such as unused ID columns).

def process_csv(fully_qualified_path, columns_renamed=tuple(),
                excluded_columns=tuple(), num_workers=None):
    # force callers to choose an explicit partition count: one partition
    # per Spark worker keeps the distributed trainers balanced
    if num_workers is None:
        raise NotImplementedError

    excluded_columns = frozenset(excluded_columns)
    data_frame = sqlContext.read.format('com.databricks.spark.csv') \
                           .options(header='true', inferSchema='true') \
                           .load(fully_qualified_path)
    for (old_name, new_name) in columns_renamed:
        data_frame = data_frame.withColumnRenamed(old_name, new_name)
    data_frame = data_frame.repartition(num_workers)

    # assemble all non-excluded columns into a single 'features' vector
    # column, then drop the originals and cache the result
    feature_columns = tuple(frozenset(data_frame.columns) \
                            .difference(excluded_columns))
    transformer = feature.VectorAssembler(inputCols=feature_columns,
                                          outputCol='features')
    data_frame = transformer.transform(data_frame) \
                            .drop(*feature_columns).cache()

    return data_frame
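
For example, here is a sketch of how the calls at the top of this post might look in full; the raw column names target and id are hypothetical:

df_train = process_csv("/fully/qualified/path/to/training/data",
                       columns_renamed=(('target', 'label'),),
                       excluded_columns=('id', 'label'),
                       num_workers=50)

Excluding label keeps it out of the assembled features vector while leaving it in the data frame as the label column that Spark ML expects.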

Now you are ready to configure, train, and evaluate any distributed deep learning model described in Keras!

