Streaming Data Pipeline to Transform, Store, and Explore With Kafka, Spark, and Drill
Learn how to consume streaming Open Payments CSV data, transform it to JSON, store it in a document database, and explore it with SQL using Apache Spark, MapR-ES, MapR-DB, OJAI, and Apache Drill.
In the past, big data was processed in batches, often only once a day. Now, data is dynamic, and data-driven businesses need instant results from continuously changing data. Data pipelines, which combine real-time stream processing with the collection, analysis, and storage of large amounts of data, enable modern, real-time applications, analytics, and reporting.
This post is based on a recent workshop I helped develop and deliver at a large health service and innovation company's analytics conference. This company is combining streaming data pipelines with data science on top of the MapR Converged Data Platform to improve healthcare outcomes, improve access to appropriate care, better manage cost, and reduce fraud, waste, and abuse.
In this post we will:
- Use Apache Spark Streaming to consume Medicare Open Payments data via the Apache Kafka API.
- Transform the streaming data into JSON format and save to the MapR-DB document database.
- Query the MapR-DB JSON table with Apache Spark SQL, Apache Drill, and the Open JSON API (OJAI) from Java.
Example Use Case Dataset
Since 2013, Open Payments has been a federal program that collects information about the payments drug and device companies make to physicians and teaching hospitals for things like travel, research, gifts, speaking fees, and meals.
For a large health payment dataset, JSON, Apache Spark, MapR-ES, and MapR-DB are an interesting combination for a health analytics workshop because:
- JSON is an open-standard and efficient format that is easy for computer languages to manipulate. Newer standards for exchanging healthcare information such as FHIR are easier to implement because they use a modern suite of API technology, including JSON.
- Apache Spark SQL, DataFrames, and Datasets make it easy to load, process, transform, and analyze JSON data. MapR-ES is a distributed messaging system for streaming event data at scale. MapR-ES integrates with Spark Streaming via the Kafka API.
- MapR-DB, a high-performance NoSQL database, supports JSON documents as a native data store. MapR-DB makes it easy to store, query, and build applications with JSON documents. The Spark connector makes it easy to build real-time or batch pipelines between your JSON data and MapR-DB and leverage Spark within the pipeline.
How Do You Build a Data Pipeline That Handles Millions of Events in Real-Time at Scale?
A common data pipeline architecture pattern is event-sourcing using an append-only publish-subscribe event stream such as MapR Event Streams (which provides a Kafka API). MapR-ES Topics are logical collections of events that organize events into categories and decouple producers from consumers, making it easy to add new producers and consumers. Topics are partitioned for throughput and scalability, producers are load-balanced, and consumers can be grouped to read in parallel. MapR-ES can scale to very high throughput levels, easily delivering millions of messages per second using very modest hardware.
Processing Streaming Data With Spark
Apache Spark Streaming is an extension of the core Spark API that enables continuous data stream processing. Data streams can be processed with Spark's core, SQL, GraphX, or machine learning APIs, and can be persisted to a file system, HDFS, MapR-XD, MapR-DB, HBase, or any data source offering a Hadoop OutputFormat or Spark connector. Stream processing of events is useful for filtering, transforming, creating counters and aggregations, correlating values, joining streams together, machine learning, and publishing to a different topic for pipelines.
MapR Event Streams integrates with Spark Streaming via the Kafka direct approach. The MapR-DB OJAI Connector for Apache Spark enables you to use MapR-DB as a sink for Apache Spark Data Streams.
The incoming data is in CSV format. An example is shown below:
"NEW","Covered Recipient Physician",,,,"132655","GREGG","D","ALZATE",,"8745 AERO DRIVE","STE 200","SAN DIEGO","CA","92123","United States",,,"Medical Doctor","Allopathic & Osteopathic Physicians|Radiology|Diagnostic Radiology","CA",,,,,"DFINE, Inc","100000000326","DFINE, Inc","CA","United States",90.87,"02/12/2016","1","In-kind items and services","Food and Beverage",,,,"No","No Third Party Payment",,,,,"No","346039438","No","Yes","Covered","Device","Radiology","StabiliT",,"Covered","Device","Radiology","STAR Tumor Ablation System",,,,,,,,,,,,,,,,,"2016","06/30/2017"
There are a lot of fields in this data that we will not use; we will parse only the physician ID, payment date, record ID, payer, amount, physician specialty, and nature of payment, and transform them into the following JSON document for storing in MapR-DB:
{
  "_id": "317150_08/26/2016_346122858",
  "physician_id": "317150",
  "date_payment": "08/26/2016",
  "record_id": "346122858",
  "payer": "Mission Pharmacal Company",
  "amount": 9.23,
  "Physician_Specialty": "Obstetrics & Gynecology",
  "Nature_of_payment": "Food and Beverage"
}
Spark Kafka Consumer-Producer Code
Note: Code snippets are shown here; you can download the complete code and instructions from the GitHub link at the end of this post.
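As a minimal sketch of the producer side, the following Scala program publishes each line of the Open Payments CSV file to a MapR-ES topic using the standard Kafka producer API. The topic path and file location are assumptions for illustration; the GitHub project has the real producer.

import java.util.Properties
import scala.io.Source
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object PaymentProducer extends App {
  // MapR-ES topic path and CSV file location are assumptions for this sketch
  val topic = "/user/mapr/paymentsstream:payments"

  val props = new Properties()
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  // With plain Apache Kafka you would also set bootstrap.servers here

  val producer = new KafkaProducer[String, String](props)

  // Publish each CSV line as one message value
  Source.fromFile("/path/to/payments.csv").getLines().foreach { line =>
    producer.send(new ProducerRecord[String, String](topic, line))
  }
  producer.close()
}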
Parsing the Dataset Records
A Scala Payment case class defines the schema corresponding to the CSV fields that we are interested in. The parsePayment function parses a line of comma-separated values into the Payment case class.
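A minimal sketch of the case class and parser is shown below. The field names and column positions are illustrative (the positions follow the sample CSV row above); the downloadable project contains the authoritative version.

// Schema for the CSV fields we keep
case class Payment(physician_id: String, date_payment: String, record_id: String,
                   payer: String, amount: Double, physician_specialty: String,
                   nature_of_payment: String)

// Parse one line of comma-separated values into a Payment.
// The split regex ignores commas inside quoted fields (e.g. "DFINE, Inc").
def parsePayment(line: String): Payment = {
  val p = line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1).map(_.replaceAll("\"", ""))
  // Column positions follow the sample row above and are illustrative
  Payment(p(5), p(31), p(45), p(27), p(30).toDouble, p(19), p(34))
}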
A PaymentwId class defines the JSON document schema for MapR-DB.
In order to save the JSON objects to MapR-DB, we need to define the _id field, which is the row key and primary index for MapR-DB. The parsePaymentwID function creates an object with an _id equal to a combination of the physician ID, the date, and the record ID. Since MapR-DB tables are partitioned and sorted by row key, the payment documents will be grouped by physician and date in MapR-DB, which gives fast queries, aggregations, and sorting by physician ID and date. We will also look at secondary indexes later in this post.
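A sketch of the document class and the _id construction, building on the Payment parser above (field names are assumptions matching the JSON document shown earlier):

// JSON document schema for MapR-DB; _id is the row key
case class PaymentwId(_id: String, physician_id: String, date_payment: String,
                      record_id: String, payer: String, amount: Double,
                      Physician_Specialty: String, Nature_of_payment: String)

// Build the composite row key physicianId_date_recordId
def parsePaymentwID(line: String): PaymentwId = {
  val p = parsePayment(line)
  val id = s"${p.physician_id}_${p.date_payment}_${p.record_id}"
  PaymentwId(id, p.physician_id, p.date_payment, p.record_id, p.payer,
             p.amount, p.physician_specialty, p.nature_of_payment)
}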
Spark Streaming Code
We use the KafkaUtils createDirectStream method with Kafka configuration parameters to create an input stream from a MapR-ES topic. This creates a DStream that represents the stream of incoming data, where each message is a key-value pair. We use the DStream map transformation to create a DStream with the message values, and then another map transformation with the parsePaymentwID function to create a DStream of PaymentwId objects.
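A sketch of that consumer code follows. The package names here are those of the stock Spark/Kafka 0.10 integration, and the topic path and group ID are assumptions; the MapR build ships its own Kafka streaming package, so check the project for the exact imports.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.serialization.StringDeserializer

val sparkConf = new SparkConf().setAppName("PaymentStream")
val ssc = new StreamingContext(sparkConf, Seconds(2))

val kafkaParams = Map[String, Object](
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "payment-consumer-group",
  "auto.offset.reset" -> "earliest"
)

// Subscribe to the MapR-ES topic (path is an assumption)
val topics = Array("/user/mapr/paymentsstream:payments")

val messagesDStream = KafkaUtils.createDirectStream[String, String](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

// Keep only the message values, then parse each CSV line into a PaymentwId
val valuesDStream = messagesDStream.map(record => record.value())
val paymentDStream = valuesDStream.map(parsePaymentwID)

paymentDStream.print(3)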
The output of the paymentDStream.print(3) is shown below:
For storing lots of streaming data, we need a data store that supports fast writes and scales. The MapR-DB Spark Connector's DStream saveToMapRDB method performs a parallel, partitioned bulk insert of the JSON PaymentwId objects into MapR-DB:
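A sketch of the sink side, continuing the streaming code above; the table path is an assumption and the exact method options may differ by connector version.

import com.mapr.db.spark.streaming._   // brings saveToMapRDB into scope for DStreams

// Bulk-insert the PaymentwId documents into the MapR-DB JSON table
paymentDStream.saveToMapRDB("/user/mapr/payments", createTable = true)

ssc.start()
ssc.awaitTermination()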
MapR-DB
One of the challenges when you are processing lots of data is where to store it. With MapR-DB (HBase API or JSON API), a table is automatically partitioned into tablets across a cluster by key range, providing scalable, fast reads and writes by row key.
The Spark MapR-DB Connector leverages the Spark DataSource API. The connector architecture has a connection object in every Spark Executor, allowing for distributed parallel writes, reads, and scans against the MapR-DB tablets.
Querying MapR-DB JSON With Spark SQL
The Spark MapR-DB Connector enables users to perform complex SQL queries and updates on top of MapR-DB using a Spark Dataset while applying critical techniques such as projection and filter pushdown, custom partitioning, and data locality.
A Spark Dataset is a distributed collection of data. The Dataset is a newer interface that provides the benefits of strong typing, the ability to use powerful lambda functions, and efficient object serialization/deserialization, combined with the benefits of Spark SQL's optimized execution engine. A DataFrame is a Dataset organized into named columns (Dataset[Row]). In Spark 2.0, the DataFrame and Dataset APIs were merged.
Loading Data From MapR-DB Into a Spark Dataset
To load data from a MapR-DB JSON table into an Apache Spark Dataset, we first define the Scala class and Spark StructType matching the structure of the JSON objects in the MapR-DB table. Next, we invoke the loadFromMapRDB method on a SparkSession object, providing the tableName, schema, and case class. This returns a Dataset of PaymentwId objects:
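A sketch of the load, reusing the PaymentwId case class from earlier; the table path is an assumption and the loadFromMapRDB signature may vary slightly by connector version.

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types._
import com.mapr.db.spark.sql._   // brings loadFromMapRDB into scope for SparkSession

val spark = SparkSession.builder().appName("PaymentQuery").getOrCreate()
import spark.implicits._

// StructType matching the JSON documents in the table
val schema = StructType(Array(
  StructField("_id", StringType),
  StructField("physician_id", StringType),
  StructField("date_payment", StringType),
  StructField("record_id", StringType),
  StructField("payer", StringType),
  StructField("amount", DoubleType),
  StructField("Physician_Specialty", StringType),
  StructField("Nature_of_payment", StringType)
))

val paymentDataset: Dataset[PaymentwId] =
  spark.loadFromMapRDB[PaymentwId]("/user/mapr/payments", schema).as[PaymentwId]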
Explore and Query the Payment Data With Spark SQL
Datasets provide a domain-specific language for structured data manipulation in Scala, Java, and Python. Below are some examples in Scala. The Dataset show() action displays the top 20 rows in tabular form.
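For example, with the paymentDataset loaded above:

// Print the inferred schema and the first 20 documents in tabular form
paymentDataset.printSchema()
paymentDataset.show()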
What are the top five natures of payments by count?
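A sketch of one way to answer this with the Dataset API (column names follow the JSON document shown earlier):

import org.apache.spark.sql.functions.desc

// Count payments per nature of payment and keep the five largest groups
paymentDataset.groupBy("Nature_of_payment")
  .count()
  .orderBy(desc("count"))
  .show(5)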
What is the count of Payers with payment amounts > $1,000?
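One way to read this question is a per-payer count of payments over $1,000; a sketch (uses spark.implicits._ from the load step above):

import org.apache.spark.sql.functions.desc

// Filter to payments over $1,000, then count them per payer
paymentDataset.filter($"amount" > 1000)
  .groupBy("payer")
  .count()
  .orderBy(desc("count"))
  .show()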
You can register a Dataset as a temporary table using a given name and then run Spark SQL. Here are some example Spark SQL queries on the payments dataset.
What are the top five physician specialties by amount with count?
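A sketch of registering the temporary view and running the query; the view name is arbitrary:

paymentDataset.createOrReplaceTempView("payments")

spark.sql("""
  SELECT Physician_Specialty, count(*) AS cnt, sum(amount) AS total
  FROM payments
  GROUP BY Physician_Specialty
  ORDER BY total DESC
  LIMIT 5
""").show()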
Querying the Data With Apache Drill
Apache Drill is an open-source, low-latency query engine for big data that delivers interactive SQL analytics at petabyte scale. Drill provides a massively parallel processing execution engine, built to perform distributed query processing across the various nodes in a cluster.
With Drill, you can use SQL to interactively query and join data from files in JSON, Parquet, or CSV format, Hive, and NoSQL stores, including HBase, MapR-DB, and Mongo, without defining schemas. MapR provides a Drill JDBC driver that you can use to connect Java applications and BI tools such as SquirreL and Spotfire to Drill. Below is a snippet of Java code for querying MapR-DB using Drill and JDBC:
Partial output for this query is shown below:
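The original Java snippet and its output are not reproduced here; a rough sketch of the same JDBC approach, written in Scala like the other examples, is shown below. The driver class, connection URL, and table path are assumptions for a single-node setup.

import java.sql.DriverManager

// Load the Drill JDBC driver and connect to a local drillbit (URL is an assumption)
Class.forName("org.apache.drill.jdbc.Driver")
val connection = DriverManager.getConnection("jdbc:drill:drillbit=localhost:31010")
val statement = connection.createStatement()

// Query the MapR-DB JSON table through the dfs storage plugin (path is an assumption)
val rs = statement.executeQuery(
  "SELECT payer, sum(amount) AS total " +
  "FROM dfs.`/user/mapr/payments` " +
  "GROUP BY payer ORDER BY total DESC LIMIT 5")

while (rs.next()) {
  println(rs.getString("payer") + " " + rs.getDouble("total"))
}
connection.close()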
Below are some example SQL queries using the Drill shell.
What are the top five physicians by total amount?
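For example, something along these lines (the table path under the dfs storage plugin is an assumption):

SELECT physician_id, sum(amount) AS total
FROM dfs.`/user/mapr/payments`
GROUP BY physician_id
ORDER BY total DESC
LIMIT 5;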
What are the distinct payers in the Payments table?
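Along the same lines, with the same assumed table path:

SELECT DISTINCT payer
FROM dfs.`/user/mapr/payments`;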
Follow the instructions in the GitHub code README to add a secondary index to MapR-DB and try more queries.
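For reference, creating a secondary index with maprcli looks roughly like the following; the table path, index name, and indexed field are examples, so use the exact command from the README:

maprcli table index add -path /user/mapr/payments -index idx_payer -indexedfields payer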
Querying With the Open JSON API (OJAI)
Below is a Java example of using the OJAI Query interface to query documents in a MapR-DB JSON table:
Partial output for this query is shown below:
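The original example and its output are not reproduced here; a rough Scala sketch of the same idea with the OJAI 2.0 query API (the table path and filter value are assumptions) might look like this:

import org.ojai.store.DriverManager
import scala.collection.JavaConverters._

// Connect to MapR-DB through OJAI and open the JSON table (path is an assumption)
val connection = DriverManager.getConnection("ojai:mapr:")
val store = connection.getStore("/user/mapr/payments")

// Find payment documents for one physician, returning a few fields
val query = connection.newQuery()
  .select("_id", "payer", "amount")
  .where("""{"$eq": {"physician_id": "317150"}}""")
  .build()

store.find(query).asScala.foreach(doc => println(doc.asJsonString()))

connection.close()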
Summary
In this post, you've learned how to consume streaming Open Payments CSV data, transform it to JSON, store it in a document database, and explore it with SQL using Apache Spark, MapR-ES, MapR-DB, OJAI, and Apache Drill.
Code
- You can download the code and data to run these examples from here (refer to the README for complete instructions to run).
Running the Code
All of the components of the use case architecture we just discussed can run on the same cluster with the MapR Converged Data Platform.
This example was developed using the MapR 6.0 container for developers, a Docker container that enables you to create a single node MapR cluster. The container is lightweight and designed to run on your laptop. (Refer to the code README for instructions on running the code.)
Published at DZone with permission of Carol McDonald, DZone MVB. See the original article here.