ETL Pipeline to Analyze Healthcare Data With Spark SQL, JSON, and MapR-DB
Learn how to ETL Open Payments CSV file data to JSON, explore with SQL, and store in a document database using Spark Datasets and MapR-DB.
This post is based on a recent workshop I helped develop and deliver at a large health services and innovation company’s analytics conference. This company is doing a lot of interesting analytics and machine learning on top of the MapR Converged Data Platform, including an internal “Data Science University.” In this post, we will:
- Extract Medicare Open Payments data from a CSV file and load it into an Apache Spark Dataset.
- Analyze the data with Spark SQL.
- Transform the data into JSON format and save it to the MapR-DB document database.
- Query and load the JSON data from MapR-DB back into Spark.
A large health payment dataset, JSON, Apache Spark, and MapR-DB are an interesting combination for a health analytics workshop because:
- JSON is an open-standard and efficient format that uses human-readable text to represent, transmit, and interpret data objects consisting of attribute-value pairs. Because JSON is easy for computer languages to manipulate, it has largely supplanted XML for web and mobile applications.
- Newer standards for exchanging healthcare information such as FHIR are easier to implement because they use a modern web-based suite of API technology, including REST and JSON.
- Apache Spark SQL, DataFrames, and datasets make it easy to load, process, transform, and analyze JSON data.
- MapR-DB, a high-performance NoSQL database, supports JSON documents as a native data store. MapR-DB makes it easy to store, query, and build applications with JSON documents.
Apache Spark and MapR-DB
One of the challenges that comes up when you are processing lots of data is where you want to store it. With MapR-DB (HBase API or JSON API), a table is automatically partitioned into tablets across a cluster by key range, providing for scalable and fast reads and writes by row key.
The MapR-DB OJAI Connector for Apache Spark makes it easier to build real-time or batch pipelines between your JSON data and MapR-DB and to leverage Spark within the pipeline. It includes a set of APIs that enable MapR users to write applications that consume MapR-DB JSON tables and use them in Spark.
The Spark MapR-DB Connector leverages the Spark DataSource API. The connector architecture has a connection object in every Spark Executor, allowing for distributed parallel writes, reads, or scans with MapR-DB tablets.
Example Use Case Dataset
Since 2013, Open Payments has been a federal program that collects information about the payments drug and device companies make to physicians and teaching hospitals for things like travel, research, gifts, speaking fees, and meals.
Below is an example of one line from an Open Payments CSV file:
"NEW","Covered Recipient Physician",,,,"132655","GREGG","D","ALZATE",,"8745 AERO DRIVE","STE 200","SAN DIEGO","CA","92123","United States",,,"Medical Doctor","Allopathic & Osteopathic Physicians|Radiology|Diagnostic Radiology","CA",,,,,"DFINE, Inc","100000000326","DFINE, Inc","CA","United States",90.87,"02/12/2016","1","In-kind items and services","Food and Beverage",,,,"No","No Third Party Payment",,,,,"No","346039438","No","Yes","Covered","Device","Radiology","StabiliT",,"Covered","Device","Radiology","STAR Tumor Ablation System",,,,,,,,,,,,,,,,,"2016","06/30/2017"
There are a lot of fields in this file that we will not use; we will select only the physician ID, payment date, record ID, payer, amount, physician specialty, and nature of payment fields, and transform them into the following JSON object:
{
"_id":"317150_08/26/2016_346122858",
"physician_id":"317150",
"date_payment":"08/26/2016",
"record_id":"346122858",
"payer":"Mission Pharmacal Company",
"amount":9.23,
"Physician_Specialty":"Obstetrics & Gynecology",
"Nature_of_payment":"Food and Beverage"
}
Apache Spark SQL, Datasets, and DataFrames
A Spark Dataset is a distributed collection of data. Dataset is a newer interface that provides the benefits of the older RDD interface (strong typing, the ability to use powerful lambda functions) combined with the benefits of Spark SQL’s optimized execution engine. Datasets also provide faster performance than RDDs through more efficient object serialization and deserialization.
A DataFrame is a Dataset organized into named columns: Dataset[Row]. (In Spark 2.0, the DataFrame API was merged with the Dataset API.)
Read the Data From a CSV File Into a DataFrame
In the following code:
- The SparkSession read method loads a CSV file and returns the result as a DataFrame.
- A user-defined method is used to convert the amount column from a string to a double.
- A local temporary view is created in order to easily use SQL.
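Here is a minimal sketch of what that code might look like; the file path and the Total_Amount_of_Payment_USDollars column name are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("payments").getOrCreate()

// Load the CSV file; the first line holds the column headers.
// (Hypothetical path; adjust to where the Open Payments CSV lives.)
val df = spark.read
  .option("header", "true")
  .csv("/user/user01/data/OP_DTL_GNRL_PGYR2016.csv")

// User-defined function to convert the amount column from a string to a double.
val toDouble = udf((s: String) => if (s == null) 0.0 else s.toDouble)
val dfPayments = df.withColumn(
  "amount",
  toDouble(df("Total_Amount_of_Payment_USDollars"))
)

// Create a local temporary view so the data can be queried with SQL.
dfPayments.createOrReplaceTempView("payments")

// Display one row of the DataFrame.
dfPayments.show(1)
```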
The show(1) call above displays one row of the DataFrame.
Transform Into a Dataset of Payment Objects
Next, we want to select only the fields that we are interested in and transform them into a Dataset of payment objects. First, we define the payment object schema with a Scala case class:
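A sketch of the case class, with field names mirroring the JSON document shown above:

```scala
// Payment schema; field names mirror the JSON document shown earlier.
case class Payment(
  physician_id: String,
  date_payment: String,
  record_id: String,
  payer: String,
  amount: Double,
  physician_specialty: String,
  nature_of_payment: String
)
```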
Next, we use Spark SQL to select the fields we want from the DataFrame and convert the result to a Dataset[Payment] by providing the Payment class. Then, we replace the temporary view with this Dataset.
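A sketch of that step follows; the CSV column names in the SELECT are assumptions based on the Open Payments file layout:

```scala
import spark.implicits._

// Select the fields of interest and convert to a strongly typed Dataset[Payment].
val ds = spark.sql("""
  SELECT physician_profile_id AS physician_id,
         date_of_payment AS date_payment,
         record_id,
         applicable_manufacturer_or_applicable_gpo_making_payment_name AS payer,
         amount,
         physician_specialty,
         nature_of_payment_or_transfer_of_value AS nature_of_payment
  FROM payments
""").as[Payment]

// Replace the temporary view with the typed Dataset.
ds.createOrReplaceTempView("payments")

// Display one row.
ds.show(1)
```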
Again, show(1) displays one row of the resulting Dataset[Payment].
Explore and Query the Open Payment Data With Spark Dataset
Datasets provide a domain-specific language for structured data manipulation in Scala and Java; below are some examples. The Dataset show() action displays the top 20 rows in tabular form, and printSchema() prints the schema to the console in a tree format:
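For example, with the ds Dataset sketched above, the schema output would look something like this:

```scala
// Display the top 20 rows in tabular form.
ds.show()

// Print the schema in tree format.
ds.printSchema()
// root
//  |-- physician_id: string (nullable = true)
//  |-- date_payment: string (nullable = true)
//  |-- record_id: string (nullable = true)
//  |-- payer: string (nullable = true)
//  |-- amount: double (nullable = false)
//  |-- physician_specialty: string (nullable = true)
//  |-- nature_of_payment: string (nullable = true)
```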
Here are some example queries using the Scala Dataset API on the payments Dataset.
What is the Nature of Payments with reimbursement amounts greater than $1,000 ordered by count?
What are the top five Nature of Payments by count?
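Sketches of these two queries with the Dataset API, using the ds Dataset defined above:

```scala
import org.apache.spark.sql.functions.desc
import spark.implicits._

// Nature of payments with reimbursement amounts greater than $1,000,
// ordered by count.
ds.filter($"amount" > 1000)
  .groupBy("nature_of_payment")
  .count()
  .orderBy(desc("count"))
  .show()

// Top five nature of payments by count.
ds.groupBy("nature_of_payment")
  .count()
  .orderBy(desc("count"))
  .show(5)
```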
You can register a Dataset as a temporary table using a given name and then run Spark SQL. With the Zeppelin Notebook, you can display query results in table or chart formats. Here are some example Spark SQL queries on the payments dataset.
What are the top ten Nature of Payments by count?
What are the top ten Nature of Payments by total amount?
What are the top five physician specialties by total amount?
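Hedged sketches of these three queries against the payments view registered earlier:

```scala
// Top ten nature of payments by count.
spark.sql("""SELECT nature_of_payment, count(*) AS cnt
             FROM payments
             GROUP BY nature_of_payment
             ORDER BY cnt DESC LIMIT 10""").show()

// Top ten nature of payments by total amount.
spark.sql("""SELECT nature_of_payment, sum(amount) AS total
             FROM payments
             GROUP BY nature_of_payment
             ORDER BY total DESC LIMIT 10""").show()

// Top five physician specialties by total amount.
spark.sql("""SELECT physician_specialty, sum(amount) AS total
             FROM payments
             GROUP BY physician_specialty
             ORDER BY total DESC LIMIT 5""").show()
```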
In the Zeppelin notebook, the result of the last query can also be displayed as a pie chart.
Saving JSON Documents in a MapR-DB JSON Table
In order to save the JSON objects to MapR-DB, the first thing we need to do is define the _id field, which is the row key and primary index for MapR-DB. In the function below, we create an object with the _id equal to a combination of the physician ID, the date, and the record ID, so that payments are grouped by physician and date. Next, we use a map operation with the createPaymentwId function to convert the Dataset[Payment] to a Dataset[PaymentwId], and then we convert this to an RDD of JSON documents. (Note that with MapR-DB v6, the Spark connector supports Datasets directly.)
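A sketch of this transformation, assuming the Payment class and ds Dataset defined above:

```scala
import spark.implicits._

// Payment object with an _id row key for MapR-DB.
case class PaymentwId(
  _id: String,
  physician_id: String,
  date_payment: String,
  record_id: String,
  payer: String,
  amount: Double,
  physician_specialty: String,
  nature_of_payment: String
)

// Build the _id from the physician ID, the date, and the record ID so that
// payments are grouped by physician and date in the table.
def createPaymentwId(p: Payment): PaymentwId = PaymentwId(
  p.physician_id + "_" + p.date_payment + "_" + p.record_id,
  p.physician_id, p.date_payment, p.record_id,
  p.payer, p.amount, p.physician_specialty, p.nature_of_payment
)

// Convert to a Dataset[PaymentwId], then to an RDD of JSON document strings.
val paymentsWithId = ds.map(createPaymentwId)
val jsonRDD = paymentsWithId.toJSON.rdd
```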
Each element of the resulting RDD is a JSON document like the one shown earlier.
In the code below, we save the RDD of JSON objects into a MapR-DB JSON table:
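A sketch using the connector's saveToMapRDB API; the exact imports and document-construction call follow the MapR-DB OJAI Connector documentation and should be treated as illustrative:

```scala
import com.mapr.db.spark._

// Convert each JSON string to an OJAI document and save it to the
// MapR-DB JSON table; the table must already exist (see below).
jsonRDD
  .map(str => MapRDBSpark.newDocument(str))
  .saveToMapRDB("/user/user01/testable")
```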
Note that in this example, the table was already created. To create a table using the shell, execute the following at the Linux command line:
mapr dbshell
After starting the shell, run the create command, for example: create /user/user01/testable. See the mapr dbshell documentation for details.
Loading Data From a MapR-DB JSON Table
The code below loads the documents from the /user/user01/testable table into an RDD and prints out two rows:
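A sketch of that step, assuming sc is the SparkContext (as in a Zeppelin notebook):

```scala
import com.mapr.db.spark._

// Load the MapR-DB JSON table into an RDD of OJAI documents.
val loadRDD = sc.loadFromMapRDB("/user/user01/testable")

// Print two documents.
loadRDD.take(2).foreach(println)
```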
Projection Pushdown and Predicate Pushdown for the Load API
The connector’s “load” API also supports select and where clauses. These can be used for projection pushdown of a subset of fields and/or to filter out documents by using a condition.
Here is an example of how to use the where clause to restrict the rows:
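A sketch following the connector documentation's condition style; the field helper and the 100-dollar threshold are illustrative assumptions:

```scala
import com.mapr.db.spark._

// Predicate pushdown: MapR-DB applies the amount condition, so only
// matching documents are returned to Spark.
val filteredRDD = sc.loadFromMapRDB("/user/user01/testable")
  .where(field("amount") > 100)

filteredRDD.take(3).foreach(println)
```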
Similarly, if one wants to project only the nature_of_payment and payer fields and to use the where clause to restrict the rows by amount, the following code will generate the required output:
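A sketch combining both pushdowns, under the same assumptions as above:

```scala
import com.mapr.db.spark._

// Projection pushdown plus a predicate: only the nature_of_payment and
// payer fields (plus _id) are returned, and only for documents whose
// amount exceeds the threshold.
val projectedRDD = sc.loadFromMapRDB("/user/user01/testable")
  .where(field("amount") > 100)
  .select("nature_of_payment", "payer")

projectedRDD.take(3).foreach(println)
```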
Code
- You can download the code and data to run this example from here
- Zeppelin Notebook code
- MapR Sandbox download
Summary
In this blog post, you’ve learned how to ETL Open Payments CSV file data to JSON, explore with SQL, and store in a document database using Spark Datasets and MapR-DB.
Published at DZone with permission of Carol McDonald, DZone MVB.