Using Vertica With Spark-Kafka: Reading

Learn about Vertica, how Spark connects to it, and how to fetch data from Vertica and save it into Kafka.

Vertica is a tool that is really helpful when working with big data. According to LogiAnalytics:

Vertica is a columnar storage platform designed to handle large volumes of data, which enables very fast query performance in traditionally intensive scenarios.

Now, what do we mean by 'columnar storage'? It means that Vertica stores the values of each column together on disk, rather than keeping whole rows together. As a result, Vertica reads only the columns needed to answer a query, which reduces disk I/O and makes it ideal for read-intensive workloads. The following are some of the features provided by Vertica:

  • Column-oriented storage organization.
  • Standard SQL interface with many analytics capabilities built-in.
  • Compression to reduce storage costs.
  • Support for standard programming interfaces.
  • High performance and parallel data transfer.
  • Ability to store machine learning models and use them for database scoring.
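
To make the columnar storage idea described earlier more concrete, here is a toy Scala sketch. It is not how Vertica is implemented; the `Sale` type, the `ColumnarSketch` object, and the sample values are all made up for illustration. It only shows why an aggregate over one column touches less data when each column is stored on its own.

```scala
// Toy illustration only: hypothetical data, not Vertica's actual storage engine.
final case class Sale(id: Int, region: String, amount: Double)

object ColumnarSketch {
  // Row-oriented layout: each record keeps all of its fields together.
  val rows: Vector[Sale] = Vector(
    Sale(1, "EU", 10.0),
    Sale(2, "US", 25.5),
    Sale(3, "EU", 4.5)
  )

  // Column-oriented layout: one vector per column, aligned by position.
  final case class SalesColumns(
    ids: Vector[Int],
    regions: Vector[String],
    amounts: Vector[Double]
  )

  val columns: SalesColumns =
    SalesColumns(rows.map(_.id), rows.map(_.region), rows.map(_.amount))

  // SELECT SUM(amount): the row layout has to walk every full record...
  def totalFromRows(data: Vector[Sale]): Double = data.map(_.amount).sum

  // ...while the column layout only touches the `amounts` vector.
  def totalFromColumns(data: SalesColumns): Double = data.amounts.sum
}
```

Both functions return the same total; the difference is how much unrelated data (`id`, `region`) has to be read along the way.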

To download Vertica and try it out, you can go to the official Vertica site. Installation is straightforward, and the steps can be found in the documentation.

This post focuses on reading data from Vertica using Spark and writing it to Kafka. So let's get started.

Setup

First, add the following dependencies for Spark SQL and Spark-SQL-Kafka to your build.sbt:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.3",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.3"
)

Also, from version 2.4.x onwards, Spark supports Scala 2.12.x. Hence, the Scala version can be set to 2.12.x in this project.
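
As a small sketch of what that looks like in build.sbt: the `%%` operator in the dependencies above appends the Scala binary version to the artifact name (for example `spark-sql_2.12`), so the `scalaVersion` setting decides which Spark build is pulled in. The patch version `2.12.8` below is only an example, not a requirement, and it is worth checking that the Vertica connector jar you pick was built for the same Scala binary version.

```scala
// build.sbt (fragment); the patch version 2.12.8 is an illustrative assumption
scalaVersion := "2.12.8"
```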

To support Vertica, we need the two following jars:

  1. vertica-jdbc driver jar
  2. vertica-spark connector

These jars aren't available on Maven, so we have to add them to our SBT project manually. Luckily, Vertica includes these jars in the package we just installed, at the following path:

/opt/vertica/packages/SparkConnector/lib

There are different connectors for different Spark versions, so choose the one that matches the Spark version you are running. As we are using the latest version of Spark, we'll choose the latest version of the connector.

After deciding which connector to use, copy the jars to the project-root/lib/ folder and add the following lines to your build.sbt, using the file names of the jars you copied, to add the unmanaged jars to the classpath:

unmanagedJars in Compile ++= Seq(
  baseDirectory.value / "lib/.jar",
  baseDirectory.value / "lib/.jar")
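
Since the exact jar file names depend on the Vertica and Spark versions you picked, an alternative is to add every jar found under lib/ instead of listing them one by one. The fragment below is a sketch of that approach using sbt's path finder. Note that sbt already treats jars in the project's lib/ directory as unmanaged dependencies by default, so an explicit setting like this mostly matters if you keep the jars somewhere else.

```scala
// build.sbt (fragment): add every jar under <project-root>/lib to the compile classpath
unmanagedJars in Compile ++= {
  val libDir = baseDirectory.value / "lib"
  (libDir ** "*.jar").classpath
}
```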

That's it. All required dependencies are there and we are now ready to start coding.

Where's the Code?

To read the data from Vertica, we first have to provide some properties and credentials to access Vertica. It needs the following properties:

val properties: Map[String, String] = Map(
  "db" -> "db", // Database name
  "user" -> "user", // Database username
  "password" -> "password", // Password
  "table" -> "source.table", // vertica table name
  "dbschema" -> "source.dbschema", // schema of vertica where the table will be residing
  "host" -> "host", // Host on which vertica is currently running
  "numPartitions" -> "source.numPartitions" // Num of partitions to be created in resulting DataFrame
)
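
Hard-coding credentials in source is rarely what you want outside of a demo. As one possible approach, the sketch below builds the same option map from environment variables. The `VerticaOptions` object, the `VERTICA_*` variable names, and the fallback of `"4"` partitions are all hypothetical choices made for this example; only the option keys come from the map above.

```scala
object VerticaOptions {
  // Option keys from the article, mapped to hypothetical environment variable names.
  private val requiredKeys: Seq[(String, String)] = Seq(
    "db"       -> "VERTICA_DB",
    "user"     -> "VERTICA_USER",
    "password" -> "VERTICA_PASSWORD",
    "table"    -> "VERTICA_TABLE",
    "dbschema" -> "VERTICA_SCHEMA",
    "host"     -> "VERTICA_HOST"
  )

  // Returns Right(options) when every required variable is set,
  // or Left(names of the missing variables) otherwise.
  def fromEnv(env: Map[String, String] = sys.env): Either[Seq[String], Map[String, String]] = {
    val missing = requiredKeys.collect { case (_, name) if !env.contains(name) => name }
    if (missing.nonEmpty) Left(missing)
    else {
      val required = requiredKeys.map { case (key, name) => key -> env(name) }.toMap
      // numPartitions is treated as optional here; "4" is an arbitrary fallback.
      Right(required + ("numPartitions" -> env.getOrElse("VERTICA_NUM_PARTITIONS", "4")))
    }
  }
}
```

The resulting map can be passed to `.options(...)` in the same way as `properties`, and a `Left` tells you up front which settings are missing instead of failing later inside the connector.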

The next thing is to create a Spark Session to connect with Spark and Spark-SQL:

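import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// `config` holds Spark settings as (key, value) pairs and is defined elsewhere
val sparkSession = SparkSession.builder()
  .config(new SparkConf().setAll(config))
  .appName(appName)
  .master(master)
  .getOrCreate()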

Here, appName is the name you want to give your Spark application and master is the master URL for Spark. We are running Spark in local mode, so we set master to the following:

master = "local[*]"

One last thing we need is a data source to provide to Spark for Vertica:

val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"

Now, we can read the data as a DataFrame from Vertica through the following code:

val verticaDF = sparkSession.read.format(verticaDataSource).options(properties).load()

To verify the data, we can use the show method of the DataFrame, which prints the first numRows rows of the DataFrame to the console:

val numRows: Int = 
verticaDF.show(numRows)

Reading the data is now complete. The next step is to save this data to Kafka.

To save data to Kafka we again need to set up some properties for Kafka:

val kafkaSinkProperties = Map(
    "kafka.bootstrap.servers" -> "brokers-host:brokers-port", //Host and port of Kafka broker
    "topic" -> "sinkTopic" //Topic name which needs to be populated with the data
  )

We need the source for Kafka as well to save data into it.

val kafkaDataSource = "kafka"

Now, all we need to do is save the DataFrame which we created from Vertica. The following code saves the DataFrame into Kafka:

verticaDF.write.format(kafkaDataSource).options(kafkaSinkProperties).mode("append").save()

Here the mode is set to "append". For the Kafka data source, Spark accepts only "append" and the default "errorifexists"; "overwrite" and "ignore" are rejected with an AnalysisException. The above code will save the data into Kafka.
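
One thing to watch for: Spark's Kafka sink expects the DataFrame to have a column named value (string or binary), with optional key and topic columns. A DataFrame read from an arbitrary Vertica table usually won't have that shape, so the rows need to be serialized first. The sketch below packs every column of a row into one JSON string. The `KafkaRecords` object and `toKafkaRecords` helper are hypothetical names, and choosing JSON as the payload format is an assumption for this example, not necessarily what the linked repository does.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}

object KafkaRecords {
  // Packs all columns of each row into a single JSON string column named "value",
  // which is the column the Kafka sink writes as the message payload.
  def toKafkaRecords(df: DataFrame): DataFrame =
    df.select(to_json(struct(df.columns.map(col): _*)).as("value"))
}
```

With this helper, the write would become `KafkaRecords.toKafkaRecords(verticaDF).write.format(kafkaDataSource).options(kafkaSinkProperties).save()`.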

One last thing is to stop SparkSession after all the operations are done:

sparkSession.stop()
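
If the read or the write throws, a trailing `sparkSession.stop()` is never reached. One way to guard against that is a small loan-pattern helper, sketched below. `SparkSessions` and `withSparkSession` are hypothetical names introduced for this example; the builder calls are the same ones used earlier in the post.

```scala
import org.apache.spark.sql.SparkSession

object SparkSessions {
  // Runs `body` with a session and always stops the session afterwards, even on failure.
  def withSparkSession[A](appName: String, master: String)(body: SparkSession => A): A = {
    val session = SparkSession.builder().appName(appName).master(master).getOrCreate()
    try body(session)
    finally session.stop()
  }
}
```

The Vertica read and Kafka write would then go inside the `body` function, so the session is cleaned up on every exit path.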

And with this, we are done. Yup, that's all there is to it.

In this post, we learned a bit more about Vertica and how Spark connects to it. We fetched data from Vertica and saved it into Kafka. In future installments, we will do the reverse, i.e., read from Kafka and write the data into Vertica. We will also try out Structured Streaming with some compatible use cases.

For more, you can refer to this repository which implements the same thing in the VerticaToKafkaApplication object.

Like, comment, share and stay tuned!

Thanks!

Topics:
big data, apache kafka, apache spark, vertica, reading data

Published at DZone with permission of
