Using the Kinetica Spark Connector
Learn how to use an open-source Spark 2.x connector — the Kinetica Spark Connector — for massive and rapid parallel data ingestion into Kinetica.
Recently, I engineered the new Spark 1.x and 2.x connector for massively parallel ingest into Kinetica. The connector is open source and intended as an accelerator to get you started rapidly ingesting data from Spark into Kinetica; you can extend it as you see fit. This article focuses on using Spark 2.x to ingest into Kinetica.
Features of the Kinetica Spark Connector include:
- Open-source; extendable
- Accepts Spark DataFrame
- Creates Kinetica table from Spark DataFrame
- Optimizes datatype conversions between Spark and Kinetica
- Auto right-sizes string and numeric fields
- Massively parallel ingestion
- Clocked at 3+ billion inserts per minute
- Easy-to-use API interface
Architecture
Each Spark DataFrame partition instantiates a Kinetica Bulk Inserter, which is a native API for MPP ingest.
The connector accepts a Spark DataFrame and a LoaderParams object. The LoaderParams object holds all of the data necessary to interface with Kinetica.
This tutorial uses the following:
- A subset of the 2008 airline dataset, available here for download.
- The Spark 2.x Kinetica connector JAR, available here for download.
- The connector Javadoc, available here.
How to Use the Kinetica Spark Connector
Launch Spark Shell using the connector JAR:
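For example (the JAR file name below is illustrative; use the name of the connector JAR you downloaded):

    spark-shell --jars /path/to/kinetica-spark-connector.jar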
Import com.kinetica.spark._:
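From the Spark shell prompt:

    import com.kinetica.spark._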
Create a Spark DataFrame from the airline dataset:
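A minimal sketch, assuming the airline CSV (with a header row) has been downloaded to /tmp/2008.csv; the path and options are illustrative:

    val df = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/tmp/2008.csv")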
Create a LoaderParams object. Refer to the Javadoc for the constructor signature. Each parameter within LoaderParams (e.g., the table name) may be fetched and set via getter/setter methods. The connector will create a Kinetica table from the DataFrame if the LoaderParams attribute createTable is set to true. Fields are mapped (case-sensitively) from the DataFrame to the Kinetica table columns; setting the LoaderParams attribute mapToSchema to false will instead map Spark DataFrame columns to Kinetica table columns by position. Example LoaderParams (Scala):
    val lp = new LoaderParams("http://myserver:9191",
      "jdbc:simba://myserver:9292;ParentSet=MASTER",
      "airline", false, "", 10000, true, true,
      "username", "password", 4, true, true, 1)

    // or set individual parameters
    val lp = new LoaderParams()
    lp.setKineticaURL("myserver:9191")
    lp.setJdbcURL("jdbc:simba://myserver:9292;ParentSet=MASTER")
    lp.setTablename("airline")
    lp.setTableReplicated(false)
    ...
Set a primary key (not required):
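The setter below is a hypothetical sketch (confirm the actual method name in the Javadoc); FlightNum is an illustrative column from the airline dataset:

    // Hypothetical setter name; confirm against the connector Javadoc
    lp.setPrimaryKey("FlightNum")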
Set the field AirTime to be compressed using Snappy, lz4, or lz4hc:
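As with the primary key, the call below is a hypothetical sketch; the actual setter name and signature should be confirmed in the Javadoc:

    // Hypothetical call; snappy, lz4, and lz4hc are the supported codecs
    lp.setCompression("AirTime", "snappy")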
Call SparkKineticaLoader.KineticaWriter with the DataFrame and LoaderParams:
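Assuming the DataFrame and LoaderParams from the steps above (the argument order is an assumption; confirm in the Javadoc):

    // Kicks off the massively parallel write to Kinetica
    SparkKineticaLoader.KineticaWriter(df, lp)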
That’s it! The table should now appear in Kinetica. For further information, visit Kinetica's connector page. I will be publishing a video tutorial shortly.