Streaming Data Pipeline to Transform, Store, and Explore With Kafka, Spark, and Drill


Learn how to consume streaming Open Payments CSV data, transform it to JSON, store it in a document database, and explore it with SQL using Apache Spark, MapR-ES, MapR-DB, OJAI, and Apache Drill.

In the past, big data was typically processed in batch, often just once a day. Now, data is dynamic, and data-driven businesses need instant results from continuously changing data. Data pipelines, which combine real-time stream processing with the collection, analysis, and storage of large amounts of data, enable modern, real-time applications, analytics, and reporting.

This post is based on a recent workshop I helped develop and deliver at a large health service and innovation company's analytics conference. This company is combining streaming data pipelines with data science on top of the MapR Converged Data Platform to improve healthcare outcomes, improve access to appropriate care, better manage cost, and reduce fraud, waste, and abuse.

In this post we will:

  • Use Apache Spark Streaming to consume Medicare Open Payments data using the Apache Kafka API.
  • Transform the streaming data into JSON format and save it to the MapR-DB document database.
  • Query the MapR-DB JSON table with Apache Spark SQL, Apache Drill, and the Open JSON API (OJAI) with Java.

[Image: Example Streamline Processing Pipeline]

Example Use Case Dataset

Open Payments is a federal program that, since 2013, has collected information about the payments that drug and device companies make to physicians and teaching hospitals for things like travel, research, gifts, speaking fees, and meals.

[Image: Facts About Open Payments Data]

For a large health payment dataset, JSON, Apache Spark, MapR-ES, and MapR-DB are an interesting combination for a health analytics workshop because:

  • JSON is an efficient, open-standard format that is easy for programming languages to manipulate. Newer standards for exchanging healthcare information, such as FHIR, are easier to implement because they use a modern suite of API technology, including JSON.
  • Apache Spark SQL, DataFrames, and Datasets make it easy to load, process, transform, and analyze JSON data. MapR-ES is a distributed messaging system for streaming event data at scale. MapR-ES integrates with Spark Streaming via the Kafka API.
  • MapR-DB, a high-performance NoSQL database, supports JSON documents as a native data store. MapR-DB makes it easy to store, query, and build applications with JSON documents. The Spark connector makes it easy to build real-time or batch pipelines between your JSON data and MapR-DB and leverage Spark within the pipeline.

[Image: MapR-DB table]

How Do You Build a Data Pipeline That Handles Millions of Events in Real-Time at Scale?

A common data pipeline architecture pattern is event sourcing on top of an append-only publish-subscribe event stream such as MapR Event Streams (MapR-ES), which provides a Kafka API. MapR-ES topics are logical collections of events that organize events into categories and decouple producers from consumers, making it easy to add new producers and consumers. Topics are partitioned for throughput and scalability, producers are load-balanced, and consumers can be grouped to read in parallel. MapR-ES can scale to very high throughput levels, easily delivering millions of messages per second using very modest hardware.

[Image: Kafka API]
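
To make the ideas of partitioned topics and grouped consumers concrete, here is a minimal sketch in plain Java. It is not MapR-ES or Kafka client code: TopicPartitioning, partitionFor, and assign are hypothetical names, String.hashCode stands in for whatever hash a real partitioner uses, and the round-robin assignment is only one possible strategy.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: models key-based partitioning and consumer-group assignment. */
final class TopicPartitioning {

    /** Maps a message key to a partition; the same key always lands on the same partition. */
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        // Assumption: String.hashCode replaces the hash a real partitioner would use.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** Spreads partitions round-robin over the consumers of one group. */
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numConsumers).add(p);
        }
        return assignment;
    }
}
```

With six partitions and three consumers in a group, assign(6, 3) gives each consumer two partitions to read in parallel, and adding a consumer to the group only changes the assignment, not the producers.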

Processing Streaming Data With Spark

Apache Spark Streaming is an extension of the core Spark API that enables continuous data stream processing. Data streams can be processed with Spark's core, SQL, GraphX, or machine learning APIs, and can be persisted to a file system, HDFS, MapR-XD, MapR-DB, HBase, or any data source offering a Hadoop OutputFormat or Spark connector. Stream processing of events is useful for filtering, transforming, creating counters and aggregations, correlating values, joining streams together, machine learning, and publishing to a different topic for pipelines.

MapR Event Streams integrates with Spark Streaming via the Kafka direct approach. The MapR-DB OJAI Connector for Apache Spark enables you to use MapR-DB as a sink for Spark DStreams.

[Image: Application]

The incoming data is in CSV format. An example is shown below:

"NEW","Covered Recipient Physician",,,,"132655","GREGG","D","ALZATE",,"8745 AERO DRIVE","STE 200","SAN DIEGO","CA","92123","United States",,,"Medical Doctor","Allopathic & Osteopathic Physicians|Radiology|Diagnostic Radiology","CA",,,,,"DFINE, Inc","100000000326","DFINE, Inc","CA","United States",90.87,"02/12/2016","1","In-kind items and services","Food and Beverage",,,,"No","No Third Party Payment",,,,,"No","346039438","No","Yes","Covered","Device","Radiology","StabiliT",,"Covered","Device","Radiology","STAR Tumor Ablation System",,,,,,,,,,,,,,,,,"2016","06/30/2017"

There are a lot of fields in this data that we will not use; we will parse the following fields:

[Image: Parse Fields]

And transform them into the following JSON document for storing in MapR-DB:

{

    "_id" :"317150_08/26/2016_346122858",
    "physician_id" :"317150",
    "date_payment" :"08/26/2016",
    "record_id" :"346122858",
    "payer" :"Mission Pharmacal Company",
    "amount" :9.23,
    "Physician_Specialty" :"Obstetrics & Gynecology",
    "Nature_of_payment" :"Food and Beverage"
}
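
As a rough illustration of the target document, the following plain-Java sketch assembles the same fields and renders them as a JSON string. PaymentJson and its methods are hypothetical names, not part of the workshop code or of any MapR library, and the hand-written encoder only escapes backslashes and double quotes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: builds the payment document shown above as a JSON string. */
final class PaymentJson {

    /** Quotes a string, escaping backslashes and double quotes (not a full JSON encoder). */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Collects the fields in document order; _id combines physician ID, date, and record ID. */
    static Map<String, Object> paymentDocument(String physicianId, String datePayment,
                                               String recordId, String payer, double amount,
                                               String specialty, String nature) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", physicianId + "_" + datePayment + "_" + recordId);
        doc.put("physician_id", physicianId);
        doc.put("date_payment", datePayment);
        doc.put("record_id", recordId);
        doc.put("payer", payer);
        doc.put("amount", amount);
        doc.put("Physician_Specialty", specialty);
        doc.put("Nature_of_payment", nature);
        return doc;
    }

    /** Renders a flat document: numbers are written as-is, everything else as a quoted string. */
    static String toJson(Map<String, Object> doc) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            if (!first) {
                sb.append(",");
            }
            first = false;
            Object v = e.getValue();
            sb.append(quote(e.getKey())).append(":");
            sb.append(v instanceof Number ? v.toString() : quote(String.valueOf(v)));
        }
        return sb.append("}").toString();
    }
}
```

Calling PaymentJson.toJson(PaymentJson.paymentDocument(...)) with the values from the example document yields a compact, single-line version of it.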

[Image: Transform]

Spark Kafka Consumer-Producer Code

Note: Code snippets are shown here; you can download the complete code and instructions from the GitHub link at the end of this post.

Parsing the Dataset Records

A Scala Payment case class defines the schema corresponding to the CSV data that we are interested in. The parsePayment function parses a line of comma-separated values into the Payment case class.

[Image: Parsing the Data Set Records]
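
The parsing step can be sketched in plain Java as follows. This is not the workshop's Scala code: Payment and PaymentCsvParser are hypothetical stand-ins, and the column positions are assumptions obtained by counting fields in the sample record shown earlier. The split is quote-aware because values such as "DFINE, Inc" contain commas.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical Java stand-in for the Payment case class. */
final class Payment {
    final String physicianId, datePayment, payer, physicianSpecialty, natureOfPayment, recordId;
    final double amount;

    Payment(String physicianId, String datePayment, String payer, double amount,
            String physicianSpecialty, String natureOfPayment, String recordId) {
        this.physicianId = physicianId;
        this.datePayment = datePayment;
        this.payer = payer;
        this.amount = amount;
        this.physicianSpecialty = physicianSpecialty;
        this.natureOfPayment = natureOfPayment;
        this.recordId = recordId;
    }
}

final class PaymentCsvParser {
    // Assumed 0-based column positions, counted from the sample record in this post.
    static final int PHYSICIAN_ID = 5, SPECIALTY = 19, PAYER = 27, AMOUNT = 30,
                     DATE = 31, NATURE = 34, RECORD_ID = 45;

    /** Splits on commas outside double quotes; quotes are dropped, "" escapes are not handled. */
    static List<String> splitCsv(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                inQuotes = !inQuotes;
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    /** Picks the fields of interest; throws if the line is too short or the amount is not numeric. */
    static Payment parse(String line) {
        List<String> f = splitCsv(line);
        return new Payment(f.get(PHYSICIAN_ID), f.get(DATE), f.get(PAYER),
                Double.parseDouble(f.get(AMOUNT)), f.get(SPECIALTY), f.get(NATURE),
                f.get(RECORD_ID));
    }
}
```

Note that this sketch keeps the specialty column as it appears in the CSV (a pipe-separated string); any further trimming of that value is left out.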

The PaymentwId class defines the JSON document schema for MapR-DB.

In order to save the JSON objects to MapR-DB, we need to define the _id field, which is the row key and primary index for MapR-DB. The parsePaymentwID function creates an object with an ID equal to a combination of the physician ID, the date, and the record ID. Since MapR-DB tables are partitioned and sorted by row key, the payment documents will be grouped by physician and date in MapR-DB. This allows fast queries, aggregations, and sorting by physician ID and date. We will also look at secondary indexes later in this post.

[Image: Parsing the Data Set Records]
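
The effect of the composite row key can be sketched without a database. In the plain-Java example below, a sorted set stands in for a table sorted by row key; RowKeys, rowKey, and scanPhysician are hypothetical names, and the prefix scan only imitates what a range read by key would return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

/** Illustrative only: composite row keys and a prefix scan over sorted keys. */
final class RowKeys {

    /** Builds the _id as physician ID, payment date, and record ID joined by underscores. */
    static String rowKey(String physicianId, String datePayment, String recordId) {
        return physicianId + "_" + datePayment + "_" + recordId;
    }

    /** Returns all keys of one physician, in sorted order, from a set sorted by key. */
    static List<String> scanPhysician(NavigableSet<String> sortedKeys, String physicianId) {
        String start = physicianId + "_";
        // Upper bound just past every key that begins with the prefix.
        String end = start + Character.MAX_VALUE;
        return new ArrayList<>(sortedKeys.subSet(start, true, end, false));
    }
}
```

Because all keys for a physician share a prefix, they sit next to each other in sorted order and can be read with one contiguous range. One caveat of this sketch: dates stored as MM/dd/yyyy text sort by month before year, so the order within a physician is lexicographic rather than strictly chronological.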

Spark Streaming Code

We use the KafkaUtils createDirectStream method with Kafka configuration parameters to create an input stream from a MapR-ES topic. This creates a DStream that represents the stream of incoming data, where each message is a key-value pair. We use the DStream map transformation to create a DStream with the message values, and then another map transformation with the parsePaymentwID function to create a DStream of PaymentwID objects.

[Two images: Spark Streaming Code]

The output of the paymentDStream.print(3) is shown below:

[Image: output of the paymentDStream.print(3)]

For storing lots of streaming data, we need a data store that supports fast writes and scales. The MapR-DB Spark Connector DStream saveToMapRDB method performs a parallel partitioned bulk insert of JSON PaymentwID objects into MapR-DB:

[Image: MapR-DB Spark Connector DStream saveToMapRDB method]

[Image: Save to MapR-DB JSON]

MapR-DB

One of the challenges when you are processing lots of data is deciding where to store it. With MapR-DB (HBase API or JSON API), a table is automatically partitioned into tablets across a cluster by key range, providing scalable and fast reads and writes by row key.

[Image: Scalable and Fast Reads and Writes by Row Key]
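
Partitioning by key range can be pictured as a lookup in a sorted map of tablet start keys. The sketch below is plain Java and purely illustrative: TabletRouter, its methods, and the tablet names are hypothetical, and it says nothing about how MapR-DB actually splits or locates tablets.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: routes a row key to the tablet whose key range contains it. */
final class TabletRouter {
    // Each tablet is registered under the first key of its range.
    private final TreeMap<String, String> tabletsByStartKey = new TreeMap<>();

    void addTablet(String startKey, String tabletName) {
        tabletsByStartKey.put(startKey, tabletName);
    }

    /** The owning tablet is the one with the greatest start key that is <= the row key. */
    String tabletFor(String rowKey) {
        Map.Entry<String, String> owner = tabletsByStartKey.floorEntry(rowKey);
        if (owner == null) {
            throw new IllegalArgumentException("No tablet covers key: " + rowKey);
        }
        return owner.getValue();
    }
}
```

Because the lookup depends only on the key, a read or write by row key touches a single tablet, while a scan over a key range touches only the tablets whose ranges overlap it.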

The Spark MapR-DB Connector leverages the Spark DataSource API. The connector architecture has a connection object in every Spark Executor, allowing for distributed parallel writes, reads, or scans with MapR-DB tablets.

[Image: Connection in Every Spark Executor]

Querying MapR-DB JSON With Spark SQL

The Spark MapR-DB Connector enables users to perform complex SQL queries and updates on top of MapR-DB using a Spark Dataset while applying critical techniques such as projection and filter pushdown, custom partitioning, and data locality.

[Image: Querying Application]

A Spark Dataset is a distributed collection of data. Dataset is a newer interface that provides the benefits of strong typing, the ability to use powerful lambda functions, and efficient object serialization/deserialization, combined with the benefits of Spark SQL's optimized execution engine.

[Image: Dataset]

A DataFrame is a Dataset organized into named columns, that is, a Dataset[Row]. (In Spark 2.0, the DataFrame API was merged with the Dataset API.)

[Image: Unified Apache Spark]

Loading Data From MapR-DB Into a Spark Dataset

To load data from a MapR-DB JSON table into an Apache Spark Dataset, we first define the Scala class and Spark StructType matching the structure of the JSON objects in the MapR-DB table.

[Image: load data from a MapR-DB JSON]

Next, we invoke the loadFromMapRDB method on a SparkSession object, providing the tableName, schema, and case class. This will return a Dataset of PaymentwId objects:

[Image: Dataset of PaymentwId objects]

Explore and Query the Payment Data With Spark SQL

Datasets provide a domain-specific language for structured data manipulation in Scala and Java (Python offers the same operations through DataFrames). Below are some examples in Scala. The Dataset show() action displays the first 20 rows, by default, in tabular form.

[Image: 20 rows in a tabular form]

What are the top five natures of payments by count?

[Image: What are the top 5 nature of payments by count]

What is the count of Payers with payment amounts > $1,000?

[Image: Payers with payment amounts > $1000]

You can register a Dataset as a temporary view under a given name and then run Spark SQL queries against it. Here are some example Spark SQL queries on the payments dataset.

What are the top five physician specialties by amount with count?

[Image: 5 physician specialties by amount with count]
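
To show what the questions above compute, independent of Spark, here is a minimal sketch over an in-memory list using plain Java streams. It is not Spark code: PaymentQueries, Pay, and the method names are hypothetical, and "count of payers with payment amounts > $1,000" is interpreted here as the number of such payments per payer, which is an assumption.

```java
import java.util.Comparator;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative only: the three questions expressed as in-memory aggregations. */
final class PaymentQueries {

    /** Hypothetical, trimmed-down payment record. */
    static final class Pay {
        final String payer, specialty, nature;
        final double amount;

        Pay(String payer, double amount, String specialty, String nature) {
            this.payer = payer;
            this.amount = amount;
            this.specialty = specialty;
            this.nature = nature;
        }
    }

    /** Top n natures of payment by number of payments; ties are broken by name. */
    static List<Map.Entry<String, Long>> topNaturesByCount(List<Pay> payments, int n) {
        return payments.stream()
                .collect(Collectors.groupingBy(p -> p.nature, Collectors.counting()))
                .entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(n)
                .collect(Collectors.toList());
    }

    /** Number of payments above the threshold, per payer. */
    static Map<String, Long> countOverByPayer(List<Pay> payments, double threshold) {
        return payments.stream()
                .filter(p -> p.amount > threshold)
                .collect(Collectors.groupingBy(p -> p.payer, Collectors.counting()));
    }

    /** Top n specialties by total amount; each entry carries both the sum and the count. */
    static List<Map.Entry<String, DoubleSummaryStatistics>> topSpecialtiesByAmount(
            List<Pay> payments, int n) {
        return payments.stream()
                .collect(Collectors.groupingBy(p -> p.specialty,
                        Collectors.summarizingDouble(p -> p.amount)))
                .entrySet().stream()
                .sorted(Comparator.comparingDouble(
                        (Map.Entry<String, DoubleSummaryStatistics> e) -> e.getValue().getSum())
                        .reversed())
                .limit(n)
                .collect(Collectors.toList());
    }
}
```

Each method follows the same shape as the corresponding query: group by a column, aggregate, order descending, and keep the first n rows.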

Querying the Data With Apache Drill

Apache Drill is an open-source, low-latency query engine for big data that delivers interactive SQL analytics at petabyte scale. Drill provides a massively parallel processing execution engine, built to perform distributed query processing across the various nodes in a cluster.

[Image: Apache Drill]

With Drill, you can use SQL to interactively query and join data from files in JSON, Parquet, or CSV format, from Hive, and from NoSQL stores, including HBase, MapR-DB, and MongoDB, without defining schemas. MapR provides a Drill JDBC driver that you can use to connect Java applications and BI tools such as SQuirreL and Spotfire to Drill. Below is a snippet of Java code for querying MapR-DB using Drill and JDBC:

[Image: Java code for querying MapR-DB using Drill and JDBC]
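
A minimal sketch of what such a JDBC client might look like is given here; it is not the code from the workshop. The connection URL, the table path /apps/payments, and the class and method names are all assumptions, and the queries mirror two questions asked in the Drill shell later in this post. Running it requires a Drill JDBC driver on the classpath and a reachable Drillbit.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Illustrative only: querying a MapR-DB JSON table through Drill with plain JDBC. */
final class DrillPaymentsQuery {
    // Assumption: direct connection to a Drillbit on this machine.
    static final String JDBC_URL = "jdbc:drill:drillbit=localhost";
    // Assumption: hypothetical table path, exposed through Drill's dfs storage plugin.
    static final String TABLE = "dfs.`/apps/payments`";

    /** SQL for the top physicians by total payment amount. */
    static String topPhysiciansSql(int limit) {
        return "SELECT physician_id, SUM(amount) AS total FROM " + TABLE
                + " GROUP BY physician_id ORDER BY total DESC LIMIT " + limit;
    }

    /** SQL for the distinct payers in the table. */
    static String distinctPayersSql() {
        return "SELECT DISTINCT payer FROM " + TABLE;
    }

    /** Runs the top-physicians query and prints one line per physician. */
    static void printTopPhysicians(int limit) throws SQLException {
        try (Connection connection = DriverManager.getConnection(JDBC_URL);
             Statement statement = connection.createStatement();
             ResultSet rows = statement.executeQuery(topPhysiciansSql(limit))) {
            while (rows.next()) {
                System.out.println(rows.getString("physician_id") + "\t" + rows.getDouble("total"));
            }
        }
    }
}
```

The try-with-resources block closes the result set, statement, and connection in reverse order even if the query fails.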

Partial output for this query is shown below:

[Image: Partial output for this query]

Below are some example SQL queries using the Drill shell.

What are the top five physicians by total amount?

[Image: top 5 physicians by total amount]

What are the distinct payers in the Payments table?

[Image: distinct payers in the Payments table]

Follow the instructions in the GitHub code README to add a secondary index to MapR-DB and try more queries.

Querying With the Open JSON API (OJAI)

Below is a Java example of using the OJAI Query interface to query documents in a MapR-DB JSON table:

[Image: Java example]

Partial output for this query is shown below:

[Image: Partial output]

Summary

In this post, you've learned how to consume streaming Open Payments CSV data, transform it to JSON, store it in a document database, and explore it with SQL using Apache Spark, MapR-ES, MapR-DB, OJAI, and Apache Drill.

Code

  • You can download the code and data to run these examples from here (refer to the README for complete instructions to run).

Running the Code

All of the components of the use case architecture we just discussed can run on the same cluster with the MapR Converged Data Platform.

[Image: MapR-CDP]

This example was developed using the MapR 6.0 container for developers, a Docker container that enables you to create a single-node MapR cluster. The container is lightweight and designed to run on your laptop. (Refer to the code README for instructions on running the code.)


Published at DZone with permission of
