Using Vertica With Spark-Kafka: Writing
A developer gives a tutorial on how to get started using Spark and Vertica to bring in data to your big data environment for analysis.
In the previous post in this series, we took a glance over the basic definitions of Spark and Vertica. We also did a code overview for reading data from Vertica using Spark as DataFrame and saving the data into Kafka. In this post, we will be doing the reverse flow, i.e. working on reading the data from Kafka as a DataFrame and writing that DataFrame into Vertica.
Looks pretty simple, right? That's exactly what I thought when I was getting started with this. So let's get right into it.
In Vertica, there are two types of storage:
- WOS: write optimized store
- ROS: read optimized store
Usually, when data is loaded, it first goes into WOS (unless you use the COPY DIRECT statement). Records in WOS are stored without compression or indexing to support faster loading. Because the data in WOS is sorted only when queried, it is not optimized for reading. Vertica then moves data from WOS to ROS with the Tuple Mover, which performs two operations: moveout, which moves data from WOS into ROS, and mergeout, which consolidates ROS containers and purges deleted data.
The above method is used when you have a load of 100 MB or less (trickle data load). For heavier loads than this, there is the COPY DIRECT command, which copies data directly to ROS. In this post, we are considering big data movements, so we'll be focusing on the COPY DIRECT command.
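The rule of thumb above can be written down as a small decision helper. This is only a sketch: the object and value names are hypothetical, and treating "100 MB" as 100 * 1024 * 1024 bytes is an assumption rather than something Vertica defines.

```scala
// Hypothetical helper illustrating the 100 MB rule of thumb from the text.
object LoadPath {
  // Assumption: "100 MB" is interpreted as 100 * 1024 * 1024 bytes.
  val TrickleLimitBytes: Long = 100L * 1024 * 1024

  sealed trait Target
  case object Wos extends Target // regular load: data lands in WOS first
  case object Ros extends Target // COPY DIRECT: data goes straight to ROS

  // Loads at or below the limit count as trickle loads; anything larger goes direct.
  def targetFor(loadSizeBytes: Long): Target =
    if (loadSizeBytes <= TrickleLimitBytes) Wos else Ros
}
```

For example, `LoadPath.targetFor(500L * 1024 * 1024)` picks `Ros`, which is the case this post is concerned with.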
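So, let's move on to connecting Spark with Vertica. With Spark, there are two ways to use the COPY DIRECT command to save data to Vertica:
- The direct method, which copies data directly from Spark to Vertica.
- The indirect method, which first writes the data from Spark to HDFS and then has Vertica load it from HDFS.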
Although the first approach, the "direct method," may seem better because it has no intermediate dependency and the data travels straight to Vertica, potentially causing less latency, believe me, it is not. We'll be using the indirect method to connect Spark with Vertica.
Problems With the Direct Method:
- This method uses Spark's JDBC connector to save DataFrames into Vertica. The JDBC connector doesn't handle Spark task failures well and can introduce duplicate data in Vertica as a result.
- Also, this method is slow in copying data to Vertica.
Why Use the Indirect Method?
- This method uses a dedicated Spark connector instead of a JDBC connector, so it can cope with Spark task failures while preserving atomicity in Vertica.
- In this method, Spark first copies data to HDFS as ORC files, and then Vertica natively runs its built-in COPY command to fetch the data from HDFS, which is supposed to be very fast. Hence, this method is fast compared to the other one.
So let's get on with it.
The first thing we need to do is set up the project. The setup is the same as in the setup section of the previous post in this series. In addition, we will need HDFS, as it will be used as the intermediate storage.
With the project in place, we create a SparkSession to connect with Spark and Spark SQL:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .config(new SparkConf().setAll(config))
  .appName(appName)
  .master(master)
  .getOrCreate()
appName is the name you want to give your Spark application, and master is the master URL for Spark. Here, we are running Spark in local mode:
val master = "local[4]" // 4 is the number of cores to be used
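For reference, Spark's local-mode master URL comes in a few forms: "local" runs with a single worker thread, "local[K]" runs with K worker threads, and "local[*]" uses as many worker threads as there are logical cores. A small sketch of a helper that builds these strings follows; the object and method names are hypothetical and not part of Spark.

```scala
// Hypothetical helper for building a local-mode master URL.
object SparkMaster {
  // Some(n) yields "local[n]"; None yields "local[*]" (one thread per logical core).
  def local(threads: Option[Int]): String = threads match {
    case Some(n) =>
      require(n > 0, "thread count must be positive")
      s"local[$n]"
    case None => "local[*]"
  }
}
```

With this, `SparkMaster.local(Some(4))` gives the value used in this post.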
For reading data from Kafka we need the following:
def kafkaOptions: Map[String, String] = Map(
  "kafka.bootstrap.servers" -> brokers, // address of the Kafka brokers
  "group.id" -> groupId,                // group id for Kafka consumers
  "startingOffsets" -> "earliest",      // starting offsets to start picking data
  "endingOffsets" -> "latest",          // the limit up to which data is picked to create the DataFrame
  "subscribe" -> sourceTopic            // the topic from which the data will be consumed
)

val kafkaSource: String = "kafka"

val dataFrame = sparkSession.read.format(kafkaSource).options(kafkaOptions).load()
The above code will read the data and create the DataFrame. Now, we have to start the preparations for writing this DataFrame to Vertica.
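One detail worth knowing before writing: a DataFrame read from Spark's Kafka source carries the record key and value as binary columns, alongside metadata columns such as topic, partition, offset, and timestamp. Whether you need to reshape it depends on your target table, which this post does not specify. As a sketch, assuming you want the key and value as strings and nothing else, it could look like this (the object and method names are hypothetical, and Spark SQL is assumed to be on the classpath):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: keeps only key and value, cast from binary to string.
object KafkaDecoding {
  def withStringPayload(kafkaFrame: DataFrame): DataFrame =
    kafkaFrame.select(
      col("key").cast("string").as("key"),
      col("value").cast("string").as("value")
    )
}
```

Calling `KafkaDecoding.withStringPayload(dataFrame)` returns a two-column DataFrame that is easier to map onto a table with text columns.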
To save the data to Vertica, we first need some credentials to connect to the database, along with a few other properties:
val verticaProperties: Map[String, String] = Map(
  "db" -> "db",                     // database name
  "user" -> "user",                 // database username
  "password" -> "password",         // password
  "table" -> "table",               // Vertica table name
  "dbschema" -> "dbschema",         // Vertica schema in which the table resides
  "host" -> "host",                 // host on which Vertica is currently running
  "hdfs_url" -> "hdfs_url",         // HDFS directory URL in which the intermediate ORC files persist before being sent to Vertica
  "web_hdfs_url" -> "web_hdfs_url"  // FileSystem interface for HDFS over the web
)
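In a real deployment you would probably not hardcode the password. As one possible approach, the same option map could be assembled from environment variables and checked for completeness before the write is attempted. This is a sketch under assumptions: the `VERTICA_` prefix and the helper names are hypothetical, and treating every key as required is a choice made here, not a statement about the connector.

```scala
// Hypothetical helper: builds the Vertica option map from environment-style key/value pairs.
object VerticaOptions {
  // The keys used in this post's property map, all treated as required (an assumption).
  val RequiredKeys: Seq[String] =
    Seq("db", "user", "password", "table", "dbschema", "host", "hdfs_url", "web_hdfs_url")

  // Hypothetical naming scheme: option "hdfs_url" is read from VERTICA_HDFS_URL.
  def envName(key: String): String = "VERTICA_" + key.toUpperCase

  // Returns the option map, or the names of the environment variables that are missing.
  def fromEnv(env: Map[String, String]): Either[Seq[String], Map[String, String]] = {
    val missing = RequiredKeys.filterNot(k => env.contains(envName(k)))
    if (missing.nonEmpty) Left(missing.map(envName))
    else Right(RequiredKeys.map(k => k -> env(envName(k))).toMap)
  }
}
```

Calling `VerticaOptions.fromEnv(sys.env)` then yields either a map that can be passed as the write options, or a list of variables to set, so a missing credential is reported before Spark starts the job.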
One last thing: we need to name the data source that Spark will use to write to Vertica:
val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"
Now for the actual writing part, the following code will do the trick:
dataFrame.write.format(verticaDataSource).options(verticaProperties).mode("append").save()
Here the mode is set to "append" (it can be "append", "overwrite", or "ignore") so that the new data is appended to the existing data. The above code will save the data into Vertica.
One last thing is to stop the SparkSession after all the operations:
sparkSession.stop()
And, yeah, this was pretty much it for writing data to Vertica. Easy enough, huh?!
In this post, we reviewed the types of data storage in Vertica and how data flows through them while saving. We also discussed the ways of saving data into Vertica using Spark and used a Spark-Vertica connector to save the data into Vertica. The one thing to note here is that we saved the data in batch mode, not in streaming mode. That's for another post. Hence, in our next installment, we'll be taking a look at how we can do the same thing using Spark Structured Streaming.
For more, you can refer to this repository which implements the same thing in the KafkaToVerticaApplication object.
Like, comment, share, and stay tuned.
Published at DZone with permission of Anuj Saxena, DZone MVB. See the original article here.