
Monitoring Real-Time Uber Data Using Apache APIs, Part 4: Spark Streaming, DataFrames, and HBase


Go over Spark Streaming writing to MapR-DB using the Spark HBase and MapR-DB Binary Connector, and reading from MapR-DB Binary using Spark SQL and DataFrames.


According to Gartner, 20.8 billion connected things will be in use worldwide by 2020. Danny Lange, the head of machine learning at Uber, aims to bring machine learning to every corner of Uber's business. Examples of connected things include connected cars and devices, as well as applications used for healthcare, telecom, manufacturing, retail, and finance. Leveraging the huge amounts of data coming from these devices requires processing events in real time, applying machine learning to add value, and scalable, fast storage. Applications of this type usually have an event-driven microservices architecture.

This is the fourth in a series of blog posts discussing the architecture of an end-to-end application that combines streaming data with machine learning to do real-time analysis and visualization of where and when Uber cars are clustered, in order to predict and visualize the most popular Uber locations.

  1. The first part of this series discusses creating a machine learning model, using the Apache Spark K-means algorithm to cluster Uber data by location.
  2. The second post discusses using the saved K-means model with streaming data to do real-time analysis of where and when Uber cars are clustered.
  3. The third post discusses building a real-time dashboard to visualize the cluster data on a Google map. The following figure depicts the data pipeline:

[Figure: the end-to-end data pipeline.]

In this post, we will go over Spark Streaming writing to MapR-DB using the Spark HBase and MapR-DB Binary connector and reading from MapR-DB Binary using Spark SQL and DataFrames. The following figure depicts the data pipeline:

  • Uber trip data is published to a MapR Streams topic using the Kafka API.
  • A Spark Streaming application subscribed to the first topic enriches the event with the cluster location and publishes the results in JSON format to another topic.
  • A Vert.x web application subscribed to the second topic displays the Uber trip clusters in a heatmap.
  • A Spark Streaming application subscribed to the second topic stores the data in MapR-DB using the Spark HBase and MapR-DB Binary Connector.
  • A Spark batch application queries MapR-DB with Spark SQL using the Spark HBase and MapR-DB Binary Connector.

[Figure: the data pipeline, with Spark Streaming writing to MapR-DB and a Spark batch application reading from it.]

Spark and MapR-DB

One of the challenges that comes up when you are processing lots of streaming data is figuring out where you want to store it. With MapR-DB (HBase API or JSON API), a table is automatically partitioned into tablets across a cluster by key range, providing for scalable and fast reads and writes by row key.

[Figure: a MapR-DB table automatically partitioned into tablets by key range across the cluster.]

The Spark HBase and MapR-DB Binary Connector leverages the Spark DataSource API. The connector architecture has an HConnection object in every Spark executor, allowing for distributed parallel writes, reads, and scans across MapR-DB tablets.

[Figure: the connector architecture, with an HConnection object in each Spark executor.]

Spark Streaming Writing to MapR-DB

[Figure: Spark Streaming writing to MapR-DB.]

You can read about the MapR Streams Spark Streaming code in Part 2 of this series; here, we will focus on writing to MapR-DB. The messages from the MapR Streams topic are in JSON format and contain the following for each Uber trip: the cluster center ID; the datetime, latitude, and longitude of the trip; the base for the trip; and the latitude and longitude of the cluster center. An example is shown below:

{"cid":18, "dt":"2014-08-01 08:51:00", "lat":40.6858, "lon":-73.9923, "base":"B02682", "clat":40.67462874550765, "clon":-73.98667466026531}

In the code below, we create an HBaseContext object with an HBaseConfiguration object. The HBaseContext broadcasts the configuration to the HConnections in the executors.

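Roughly, this step looks like the following minimal sketch, assuming the HBase-Spark module's HBaseContext class and an existing SparkContext sc:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext

// Create the HBase configuration and wrap the SparkContext in an HBaseContext.
// The HBaseContext broadcasts this configuration to the executors, where it is
// used to create the HConnections.
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)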

In the code below, we:

  • Get the message value from the message key-value pair.
  • Call the HBaseContext streamBulkPut method, passing the message value DStream, the TableName to write to, and a function to convert the DStream values to HBase Put records.

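A minimal sketch of those two steps, assuming the messages DStream from Part 2 (Kafka 0.9 consumer API) and the MapR-DB table path used later in this post:

import org.apache.hadoop.hbase.TableName

val tableName = "/user/user01/db/uber"

// Get the message value (the JSON trip string) from each key-value pair
val valuesDStream = messagesDStream.map(_.value())

// Bulk-put each micro-batch of values into the MapR-DB table;
// convertToPut (shown next) turns one JSON string into one HBase Put
hbaseContext.streamBulkPut[String](valuesDStream, TableName.valueOf(tableName), convertToPut)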

The convertToPut function parses the JSON string and creates an HBase Put object.

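A sketch of what convertToPut could look like; the hand-rolled parsing of the flat JSON message and the exact row key layout are assumptions (a real application would use a JSON library):

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

def convertToPut(message: String): Put = {
  // Parse the flat JSON message into a field -> value map
  val fields = message.stripPrefix("{").stripSuffix("}")
    .split(",")
    .map(_.split(":", 2))
    .map { case Array(k, v) =>
      k.trim.stripPrefix("\"").stripSuffix("\"") -> v.trim.stripPrefix("\"").stripSuffix("\"")
    }
    .toMap

  // Composite row key: cluster id _ base _ date _ time
  val rowKey = fields("cid") + "_" + fields("base") + "_" + fields("dt").replace(" ", "_")

  // Store the trip latitude and longitude in the "data" column family
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("lat"), Bytes.toBytes(fields("lat")))
  put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("lon"), Bytes.toBytes(fields("lon")))
  put
}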

The Spark Streaming bulk put enables massively parallel sending of puts to HBase.

[Figure: massively parallel puts from the Spark executors to MapR-DB.]

Spark SQL and DataFrames

The Spark HBase and MapR-DB Binary Connector enables users to perform complex relational SQL queries on top of MapR-DB using a Spark DataFrame while applying critical techniques such as partition pruning, column pruning, predicate pushdown, and data locality.

To use the Spark HBase and MapR-DB Binary Connector, you need to define the Catalog for the schema mapping between the HBase and Spark tables. Below is the schema for storing the Uber trip data:

  • A composite row key contains the cluster ID, the base, the date, and the time, separated by underscores.
  • There is a column family data for storing all the data and a column family stat for statistical rollups.
  • There are two columns, one for the latitude and one for the longitude of each trip.

[Figure: the MapR-DB table schema for the Uber trip data.]

The Catalog defines a mapping between HBase and Spark tables. There are two critical parts of this catalog. One is the row key definition. The other is the mapping between the table columns in Spark and the column family and column names in HBase. The following example defines the Catalog schema for the MapR-DB table named /user/user01/db/uber, with a row key named key and columns lat and lon. Note that the row key also has to be defined as a column (key), which has a specific cf (rowkey).

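A sketch of such a Catalog, following the connector's JSON catalog format (storing lat and lon as strings to match the Put records written above):

def catalog = s"""{
    |"table":{"namespace":"default", "name":"/user/user01/db/uber"},
    |"rowkey":"key",
    |"columns":{
      |"key":{"cf":"rowkey", "col":"key", "type":"string"},
      |"lat":{"cf":"data", "col":"lat", "type":"string"},
      |"lon":{"cf":"data", "col":"lon", "type":"string"}
    |}
  |}""".stripMargin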

Loading Data From MapR-DB Into a Spark DataFrame

In the withCatalog function below:

  • The SQLContext read returns a DataFrameReader that can be used to read data into a DataFrame.
  • The options function adds input options for the underlying data source to the DataFrameReader.
  • The format function specifies the input data source format for the DataFrameReader.
  • The load() function loads the input as a DataFrame. The first 20 rows of the DataFrame df returned by the withCatalog function are output with df.show.

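A sketch of withCatalog and the df.show call, assuming a sqlContext in scope plus the connector's HBaseTableCatalog helper and data source name:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read                                                  // returns a DataFrameReader
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))   // add the catalog as an input option
    .format("org.apache.spark.sql.execution.datasources.hbase") // the connector data source
    .load()                                                // load the table as a DataFrame
}

val df = withCatalog(catalog)
df.show   // print the first 20 rows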

The output of df.show is displayed below:

[Table: the first 20 rows of df, showing the key, lat, and lon columns.]

In the following example, df.filter filters rows using the given SQL expression to keep cluster IDs (the beginning of the row key) >= 9. The select selects a set of columns: key, lat, and lon.

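A sketch of that query; since the row key is a string, the comparison is lexicographic on the key prefix:

df.filter("key >= '9'")         // SQL expression on the row key prefix (the cluster ID)
  .select("key", "lat", "lon")  // project the row key, latitude, and longitude
  .show()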

The results of df.show are shown below:

[Table: the filtered rows, with keys beginning with cluster IDs >= 9.]

All of the components of the use case architecture we just discussed can run on the same cluster with the MapR Converged Data Platform.

