Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Load CSV Data into Couchbase using Apache Spark

DZone's Guide to

Load CSV Data into Couchbase using Apache Spark

In this article, we have a look at how to load CSV data into Couchbase via Apache Spark and its Couchbase Spark Connector in just a few steps.

· Big Data Zone
Free Resource

See how the beta release of Kubernetes on DC/OS 1.10 delivers the most robust platform for building & operating data-intensive, containerized apps. Register now for tech preview.

I've been spending a lot of time working with Big Data tools lately, in particular Apache Spark. In case you're unfamiliar, Apache Spark is an incredibly efficient tool for processing massive amounts of data. It performs significantly better than MapReduce, and in reality, it isn't too difficult to use.

Apache Spark works very will in combination with Couchbase through the Couchbase Spark Connector. We're going to see what it takes to load some raw comma separated value (CSV) data into Couchbase using Apache Spark.

The Requirements

There are not too many requirements to get this project up and running. At a mimimum you'll need the following:

Most of the development will happen with the JDK 1.8 and Maven, but when it comes time to running the application, Apache Spark will be needed, whether that is through a local instance or remote instance.

Understanding the Dataset and Data Model

A great way to get your feet wet when it comes to Apache Spark is to get a sample dataset through the data science website, Kaggle. For this example we're going to take a look at the sample dataset called SF Salaries which has information regarding how much money government employees in San Francisco are earning.

From a data perspective there is a single comma separated value (CSV) file called salaries.csv with the following columns in it:

  1. Id
  2. EmployeeName
  3. JobTitle
  4. BasePay
  5. OvertimePay
  6. OtherPay
  7. Benefits
  8. TotalPay
  9. TotalPayBenefits
  10. Year
  11. Notes
  12. Agency
  13. Status

Working with the data in CSV format is near impossible. More so when it is massive amounts of it. Instead, this data is going to be stored as NoSQL data so it can be later processed. We won't get into the number crunching and querying here, but it will come in a future article. Right now we just want to get it into NoSQL format.

When loaded into Couchbase, each row of the CSV will look something like the following:


{
    "Id": "10029",
    "EmployeeName": "FERGAL CLANCY",
    "JobTitle": "BUILDING INSPECTOR",
    "BasePay": "94529.22",
    "OvertimePay": "0",
    "OtherPay": "2502.6",
    "Benefits": "",
    "TotalPay": "97031.82",
    "TotalPayBenefits": "97031.82",
    "Year": "2011",
    "Notes": "",
    "Agency": "San Francisco",
    "Status": ""
}

Yes, the above chunk of data is a JSON document, which is what Couchbase supports. Now that we know the data goals, we can begin loading the CSV data into Couchbase with Apache Spark.

Transforming the Raw Data and Writing to Couchbase

To use Apache Spark in a Java application, a few dependencies must be included. We need to include Spark Core, Spark SQL, Spark CSV, and the Couchbase Spark Connector. Since we're using Maven, all can be included via the Maven pom.xml file. To include Spark Core, include the following dependency in your Maven file:


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
</dependency>

Since the raw data will be in the form of CSV, we can use the convenience package for Spark called Spark CSV. The Maven dependency for Spark CSV can be added like this:


<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

The CSV data will be loaded into an Apache Spark DataFrame. If you're unfamiliar with DataFrames, they can be queried using Spark SQL. This is part of how we'll get the data into Couchbase. To include Spark SQL into your project, add the Maven dependency like so:


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.1</version>
    <scope>provided</scope>
</dependency>

Finally, Apache Spark needs to be connected to Couchbase Server. This can be done through the Couchbase Connector for Spark. To add this dependency into your Maven project, add the following to your pom.xml file:


<dependency>
    <groupId>com.couchbase.client</groupId>
    <artifactId>spark-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

All the project dependencies are good to go!

To start loading CSV data via Java code, Apache Spark must first be configured within our project. This includes defining what Spark instance to use and what Couchbase bucket to store data into.


SparkConf conf = new SparkConf()
        .setAppName("SF Salaries")
        .setMaster("local[*]")
        .set("com.couchbase.bucket.default", "");
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

The application name will be SF Salaries and the master Spark cluster will be the local machine since Spark will be running locally in this example. The Couchbase bucket to be used is once again the default bucket.

To create a Spark DataFrame, a SQLContext must be created from the JavaSparkContext.


SQLContext sqlContext = new SQLContext(javaSparkContext);

Using the SQLContext the CSV data can be read like so:


DataFrame dataFrame = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("PATH_TO_CSV_FILE");

The read process will use the Spark CSV package and preserve the header information that exists at the top of the CSV file. When read into a DataFrame, the CSV data is now something Couchbase can understand.

An adjustment must be made to the id data. Spark will recognize it as an integer or numeric because this dataset only has numeric values as the column. Couchbase expects a string id.


dataFrame = dataFrame.withColumn("Id", df.col("Id").cast("string"));

The DataFrame can now be prepared for saving to Couchbase.


DataFrameWriterFunctions dataFrameWriterFunctions = new DataFrameWriterFunctions(dataFrame.write());
Map

With the DataFrame data piped into the appropriate DataFrameWriterFunctions object, the id value can be mapped to a document id. The data at this point can be saved.


dataFrameWriterFunctions.couchbase(options);

Massive amounts of Couchbase documents will be saved to the bucket.

Running the Project With Apache Spark

Package the project into an executable JAR using Maven. The project can be executed after being packaged by doing something like this:


/path/to/apache/spark/bin/spark-submit --class "com.app.Main" target/project-jar-with-dependencies.jar

Depending on the size of the dataset and the speed of your computer or server, the load process could take a while.

Conclusion

You just got a taste of loading dirty CSV data into Couchbase by using Apache Spark and the Couchbase Spark Connector. Spark was designed to be able to quickly process massive amounts of data in real time. Combine it with Couchbase and its memory-centric architecture and you have a great package of software.

New Mesosphere DC/OS 1.10: Production-proven reliability, security & scalability for fast-data, modern apps. Register now for a live demo.

Topics:
couchbase ,spark ,apache ,csv ,database ,load ,etl

Published at DZone with permission of Nic Raboy, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}