Load CSV Data into Couchbase using Apache Spark
In this article, we take a look at how to load CSV data into Couchbase with Apache Spark and the Couchbase Spark Connector in just a few steps.
I've been spending a lot of time working with Big Data tools lately, in particular Apache Spark. In case you're unfamiliar, Apache Spark is an incredibly efficient tool for processing massive amounts of data. It performs significantly better than MapReduce, and in reality, it isn't too difficult to use.
Apache Spark works very well in combination with Couchbase through the Couchbase Spark Connector. We're going to see what it takes to load some raw comma-separated value (CSV) data into Couchbase using Apache Spark.
The Requirements
There are not too many requirements to get this project up and running. At a minimum you'll need the following:
- Apache Spark 1.6.1
- JDK 1.8+
- Apache Maven 3.3+
- Couchbase Server 4.1+
Most of the development will happen with JDK 1.8 and Maven, but when it comes time to run the application, Apache Spark will be needed, whether through a local or a remote instance.
Understanding the Dataset and Data Model
A great way to get your feet wet with Apache Spark is to grab a sample dataset from the data science website Kaggle. For this example we're going to look at the SF Salaries dataset, which contains information on how much San Francisco government employees earn.
From a data perspective, there is a single comma-separated value (CSV) file called salaries.csv with the following columns in it:
- Id
- EmployeeName
- JobTitle
- BasePay
- OvertimePay
- OtherPay
- Benefits
- TotalPay
- TotalPayBenefits
- Year
- Notes
- Agency
- Status
Working with the data in raw CSV format is difficult, even more so when there are massive amounts of it. Instead, this data is going to be stored as NoSQL documents so it can be processed later. We won't get into the number crunching and querying here; that will come in a future article. Right now we just want to get the data into NoSQL format.
When loaded into Couchbase, each row of the CSV will look something like the following:
{
    "Id": "10029",
    "EmployeeName": "FERGAL CLANCY",
    "JobTitle": "BUILDING INSPECTOR",
    "BasePay": "94529.22",
    "OvertimePay": "0",
    "OtherPay": "2502.6",
    "Benefits": "",
    "TotalPay": "97031.82",
    "TotalPayBenefits": "97031.82",
    "Year": "2011",
    "Notes": "",
    "Agency": "San Francisco",
    "Status": ""
}
Yes, the above chunk of data is a JSON document, which is the format Couchbase stores. Now that we know what the end result should look like, we can begin loading the CSV data into Couchbase with Apache Spark.
Transforming the Raw Data and Writing to Couchbase
To use Apache Spark in a Java application, a few dependencies must be included: Spark Core, Spark SQL, Spark CSV, and the Couchbase Spark Connector. Since we're using Maven, all of them can be added via the pom.xml file. To bring in Spark Core, add the following dependency to your Maven file:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
</dependency>
Since the raw data will be in the form of CSV, we can use the convenience package for Spark called Spark CSV. The Maven dependency for Spark CSV can be added like this:
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>
The CSV data will be loaded into an Apache Spark DataFrame. If you're unfamiliar with DataFrames, they are distributed collections of data organized into named columns, and they can be queried using Spark SQL. This is part of how we'll get the data into Couchbase. To include Spark SQL in your project, add the Maven dependency like so:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.1</version>
    <scope>provided</scope>
</dependency>
Finally, Apache Spark needs to be connected to Couchbase Server. This can be done through the Couchbase Spark Connector. To add this dependency to your Maven project, add the following to your pom.xml file:
<dependency>
    <groupId>com.couchbase.client</groupId>
    <artifactId>spark-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
All the project dependencies are good to go!
To start loading CSV data via Java code, Apache Spark must first be configured within our project. This includes defining what Spark instance to use and what Couchbase bucket to store data into.
SparkConf conf = new SparkConf()
        .setAppName("SF Salaries")
        .setMaster("local[*]")
        .set("com.couchbase.bucket.default", "");
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
The application name will be SF Salaries, and the master will be the local machine since Spark is running locally in this example. The data will be stored in the default Couchbase bucket, which has no password, hence the empty string.
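By default, the connector assumes Couchbase Server is reachable on the same machine as the Spark driver. If your cluster lives elsewhere, the connector's com.couchbase.nodes property points it at a remote host. A minimal sketch, where couchbase.example.com is a placeholder host name, not something from this project:
// Same configuration as above, but targeting a remote Couchbase node.
// couchbase.example.com is a hypothetical host name for illustration.
SparkConf conf = new SparkConf()
        .setAppName("SF Salaries")
        .setMaster("local[*]")
        .set("com.couchbase.nodes", "couchbase.example.com")
        .set("com.couchbase.bucket.default", "");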
To create a Spark DataFrame, a SQLContext must be created from the JavaSparkContext.
SQLContext sqlContext = new SQLContext(javaSparkContext);
Using the SQLContext, the CSV data can be read like so:
DataFrame dataFrame = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("PATH_TO_CSV_FILE");
The read process will use the Spark CSV package and preserve the header information that exists at the top of the CSV file, using it to name the DataFrame columns. Once read into a DataFrame, the data is in a structured form that can be transformed into the JSON documents Couchbase understands.
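Before transforming anything, it can help to sanity-check what the schema inference decided. This isn't part of the load flow, just a quick debugging aid using the DataFrame's built-in inspection methods:
// Print the inferred column types and preview the first few rows
dataFrame.printSchema();
dataFrame.show(5);
Running this will also reveal the issue addressed next: the Id column comes back as an integer.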
An adjustment must be made to the Id column. Because the dataset contains only numeric values in that column, Spark will infer it as an integer, but Couchbase expects the document id to be a string.
dataFrame = dataFrame.withColumn("Id", dataFrame.col("Id").cast("string"));
The DataFrame can now be prepared for saving to Couchbase.
// DataFrameWriterFunctions comes from the com.couchbase.spark.sql package
DataFrameWriterFunctions dataFrameWriterFunctions = new DataFrameWriterFunctions(dataFrame.write());
Map<String, String> options = new HashMap<String, String>();
options.put("idField", "Id");
With the DataFrame piped into the connector's DataFrameWriterFunctions object, the idField option maps each row's Id value to the document id. At this point the data can be saved.
dataFrameWriterFunctions.couchbase(options);
Each row of the DataFrame will be saved to the bucket as a JSON document.
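To spot-check that the documents actually landed, the connector's Java API can fetch documents back by id. This isn't required for the load; a quick sketch, using the 10029 id from the sample document shown earlier:
// Requires: com.couchbase.spark.japi.CouchbaseSparkContext,
// com.couchbase.client.java.document.JsonDocument, java.util.Arrays, java.util.List
CouchbaseSparkContext csc = CouchbaseSparkContext.couchbaseContext(javaSparkContext);
// Fetch the document with id "10029" and print its JSON content
List<JsonDocument> docs = csc.couchbaseGet(Arrays.asList("10029")).collect();
System.out.println(docs.get(0).content());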
Running the Project With Apache Spark
Package the project into an executable JAR using Maven. Once packaged, the project can be executed with something like this:
/path/to/apache/spark/bin/spark-submit --class "com.app.Main" target/project-jar-with-dependencies.jar
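The jar-with-dependencies suffix in the command above is typically produced by the maven-assembly-plugin. If your pom.xml doesn't already build such a JAR, a minimal sketch for the <build><plugins> section might look like the following, reusing the com.app.Main class from the command above:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <!-- Produces target/<artifact>-jar-with-dependencies.jar -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass>com.app.Main</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>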
Depending on the size of the dataset and the speed of your computer or server, the load process could take a while.
Conclusion
You just got a taste of loading dirty CSV data into Couchbase by using Apache Spark and the Couchbase Spark Connector. Spark was designed to process massive amounts of data quickly. Combine it with Couchbase and its memory-centric architecture, and you have a great combination of software.