
Distributed Deep Learning With Keras on Apache Spark

Learn how easy it is to configure, train, and evaluate any distributed deep learning model described in the Keras framework!

By Horia Margarit · Nov. 20, 17 · AI Zone · Tutorial


Deep learning has been shown to produce highly effective machine learning models across a wide range of fields. Some of the most interesting applications are pharmaceutical drug discovery, detection of illegal fishing cargo, mapping dark matter, tracking deforestation in the Amazon, taxi destination prediction, predicting lift and grasp movements from EEG recordings, and medical diagnosis for cancer (you can also read more about that here). Distributed deep learning makes it practical to train on internet-scale datasets, as companies like Facebook, Google, and Microsoft have demonstrated. This blog post demonstrates how an organization of any size can leverage distributed deep learning thanks to the Qubole Data Service (QDS).

This demonstration uses the Keras framework to describe the structure of a deep neural network and then leverages the Dist-Keras framework to achieve data-parallel model training on Apache Spark. Keras was chosen in large part because it was the dominant deep learning library at the time of this writing (see here, here, and here).

The Goal: Distributed Deep Learning Integrated With Spark ML Pipelines

By the end of this blog post, an enterprising data scientist should be able to extend this demonstration to their own application-specific modeling needs. Within a Qubole notebook, you should be able to cross-validate your deep neural networks using the Spark ML Pipeline interface, with an application-specific parameter grid similar to the following:

# load and vectorize the training and test sets (process_csv is defined below)
df_train = process_csv("/fully/qualified/path/to/training/data")
df_test  = process_csv("/fully/qualified/path/to/test/data")

# fixed hyperparameters are set with baseOn(); searched hyperparameters with addGrid()
param_grid = tuning.ParamGridBuilder() \
                   .baseOn(['regularizer', regularizers.l1_l2]) \
                   .addGrid('activations', [['tanh', 'relu']]) \
                   .addGrid('initializers', [['glorot_normal',
                                              'glorot_uniform']]) \
                   .addGrid('layer_dims', [[input_dim, 2000, 300, 1]]) \
                   .addGrid('metrics', [['mae']]) \
                   .baseOn(['learning_rate', 1e-2]) \
                   .baseOn(['reg_strength', 1e-2]) \
                   .baseOn(['reg_decay', 0.25]) \
                   .baseOn(['lr_decay', 0.90]) \
                   .addGrid('dropout_rate', [0.20, 0.35, 0.50, 0.65, 0.80]) \
                   .addGrid('loss', ['mse', 'msle']) \
                   .build()

# wrap a Dist-Keras ADAG trainer as a Spark ML Estimator (DistKeras is defined below)
estimator = DistKeras(trainers.ADAG,
                      {'batch_size': 256,
                       'communication_window': 3,
                       'num_epoch': 10,
                       'num_workers': 50},
                      **param_grid[0])

# score candidate models by R^2 and cross-validate over the full grid with 5 folds
evaluator = evaluation.RegressionEvaluator(metricName='r2')

cv_estimator = tuning.CrossValidator(estimator=estimator,
                                     estimatorParamMaps=param_grid,
                                     evaluator=evaluator,
                                     numFolds=5)
cv_model = cv_estimator.fit(df_train)

# generate predictions on the training and held-out test sets
df_pred_train = cv_model.transform(df_train)
df_pred_test  = cv_model.transform(df_test)

To make the cross-validation shown above work, you first need to configure your Qubole cluster and then set up your Qubole notebook. The instructions for both are in the remainder of this post.

Configuring Your QDS Cluster

Add the following lines to your node_bootstrap script:

# automatically installs latest version of Keras as dependency
pip install dist-keras
# for GPU clusters, swap out default dependency tensorflow
# with tensorflow for GPU nodes
pip uninstall tensorflow
pip install tensorflow-gpu

Then restart your cluster. Note that the default back end for Keras is TensorFlow; Keras also supports the CNTK, MXNet, and Theano back ends. To use one of those, pip install it in the node_bootstrap script and then tell Keras which back end to switch to.
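
For example, one common way to switch (a minimal sketch, assuming the alternate back end was already pip installed in node_bootstrap) is to set the KERAS_BACKEND environment variable in the notebook before Keras is imported; the same choice can be made permanent in ~/.keras/keras.json:

import os
os.environ['KERAS_BACKEND'] = 'theano'  # or 'cntk'; must be set before importing keras

from keras import backend as K
print(K.backend())  # confirm which back end Keras picked up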

Setting Up Your QDS Notebook

First, import the necessary libraries:

from keras import layers, models, optimizers, regularizers, utils
from pyspark.ml import evaluation, feature, tuning
from distkeras import predictors, trainers
from pyspark.sql import functions, types
from pyspark import ml
import numpy as np 
import matplotlib 
import StringIO

...after which you should define the following wrappers to integrate Dist-Keras tightly with Spark ML pipelines. The wrappers are taken directly from an open-source gist.

# Spark ML Estimator wrapper around a Dist-Keras trainer
class DistKeras(ml.Estimator):

    def __init__(self, *args, **kwargs):
        self.__trainer_klass = args[0]
        self.__trainer_params = args[1]
        self.__build_trainer(**kwargs)
        super(DistKeras, self).__init__()

    # builds a Keras Sequential model from the hyperparameters passed in kwargs
    @classmethod
    def __build_keras_model(klass, *args, **kwargs):
        loss = kwargs['loss']
        metrics = kwargs['metrics']
        layer_dims = kwargs['layer_dims']
        hidden_activation, output_activation = kwargs['activations']
        hidden_init, output_init = kwargs['initializers']
        dropout_rate = kwargs['dropout_rate']
        alpha = kwargs['reg_strength']
        reg_decay = kwargs['reg_decay']
        reg = kwargs['regularizer']
        keras_model = models.Sequential()
        for idx in range(1, len(layer_dims)-1, 1):
            keras_model.add(layers.Dense(layer_dims[idx],
                                         input_dim=layer_dims[idx-1],
                                         bias_initializer=hidden_init,
                                         kernel_initializer=hidden_init,
                                         kernel_regularizer=reg(alpha)))
            keras_model.add(layers.Activation(hidden_activation))
            keras_model.add(layers.Dropout(dropout_rate))
            alpha *= reg_decay
        keras_model.add(layers.Dense(layer_dims[-1],
                                     input_dim=layer_dims[-2],
                                     bias_initializer=output_init,
                                     kernel_initializer=output_init,
                                     kernel_regularizer=reg(alpha)))
        keras_model.add(layers.Activation(output_activation))
        return keras_model

    # builds the distributed trainer around the Keras model and an SGD optimizer
    def __build_trainer(self, *args, **kwargs):
        loss = kwargs['loss']
        learning_rate = kwargs['learning_rate']
        lr_decay = kwargs['lr_decay']
        keras_optimizer = optimizers.SGD(learning_rate, decay=lr_decay)
        keras_model = DistKeras.__build_keras_model(**kwargs)
        self._trainer = self.__trainer_klass(keras_model, keras_optimizer,
                                             loss, **self.__trainer_params)

    # called by Estimator.fit(); trains on the DataFrame and returns a fitted Model
    def _fit(self, *args, **kwargs):
        data_frame = args[0]
        if len(args) > 1:
            params = args[1]
            self.__build_trainer(**params)
        keras_model = self._trainer.train(data_frame)
        return DistKerasModel(keras_model)


# Spark ML Model wrapper around the trained Keras model
class DistKerasModel(ml.Model):

    def __init__(self, *args, **kwargs):
        self._keras_model = args[0]
        self._predictor = predictors.ModelPredictor(self._keras_model)
        super(DistKerasModel, self).__init__()

    # called by Model.transform(); appends a prediction column cast to DoubleType
    def _transform(self, *args, **kwargs):
        data_frame = args[0]
        pred_col = self._predictor.output_column
        preds = self._predictor.predict(data_frame)
        return preds.withColumn(pred_col,
                                cast_to_double(preds[pred_col]))


# Dist-Keras emits predictions as vectors; extract the first element as a double
cast_to_double = functions.udf(lambda row: float(row[0]), types.DoubleType())

Last but not least, you should define some important helper functions, starting with the show() function, which displays arbitrary matplotlib figures inline. This function is adapted from the Qubole blog post on integrating the Plotly library into Qubole notebooks.

# must do before importing pyplot or pylab
matplotlib.use('Agg')
from matplotlib import pyplot as plt

def show(fig):
    """Render a matplotlib figure inline in the notebook as SVG."""
    image = StringIO.StringIO()
    fig.savefig(image, format='svg')
    image.seek(0)  # flush writes into image.buf so the full SVG is available
    print("%html <div style='width:1200px'>"+ image.buf +"</div>")

Another important helper function is process_csv(), which automates the repetitive task of loading a CSV file from cloud storage into a DataFrame, renaming columns (such as renaming the target column to label), excluding columns from the feature vector (such as unused ID columns), and assembling the remaining columns into a single features vector.

def process_csv(fully_qualified_path, columns_renamed=tuple(),
                excluded_columns=tuple(), num_workers=None):
    if num_workers is None:
        raise NotImplementedError

    excluded_columns = frozenset(excluded_columns)
    # read the CSV from cloud storage, inferring the schema from the header row
    data_frame = sqlContext.read.format('com.databricks.spark.csv') \
                           .options(header='true', inferSchema='true') \
                           .load(fully_qualified_path)
    for (old_name, new_name) in columns_renamed:
        data_frame = data_frame.withColumnRenamed(old_name, new_name)
    # repartition so the data is spread across the Dist-Keras workers
    data_frame = data_frame.repartition(num_workers)

    # assemble every non-excluded column into a single 'features' vector column
    feature_columns = tuple(frozenset(data_frame.columns) \
                            .difference(excluded_columns))
    transformer = feature.VectorAssembler(inputCols=feature_columns,
                                          outputCol='features')
    data_frame = transformer.transform(data_frame) \
                            .drop(*feature_columns).cache()

    return data_frame
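
For example, assuming a hypothetical training CSV with an unused id column and a target column named response, the call might look like the following. Note that the renamed label column is listed in excluded_columns so that it stays out of the assembled features vector and is retained as its own column:

df_train = process_csv("/fully/qualified/path/to/training/data",
                       columns_renamed=(('response', 'label'),),
                       excluded_columns=('id', 'label'),
                       num_workers=50)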

Now you are ready to configure, train, and evaluate any distributed deep learning model described in Keras!
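
For example, reusing the names defined earlier, the same evaluator that drove cross-validation can report the R^2 of the selected model on the training and held-out test sets (a sketch that assumes the prediction and label columns keep Spark ML's default names, as they do above):

print(evaluator.evaluate(df_pred_train))  # R^2 on the training data
print(evaluator.evaluate(df_pred_test))   # R^2 on the held-out test data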


Published at DZone with permission of Horia Margarit, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
