
Using the Kinetica Spark Connector


Learn how to use an open-source Spark 2.x connector — the Kinetica Spark Connector — for massive and rapid parallel data ingestion into Kinetica.


Recently, I engineered the new Spark 1.x and 2.x connector for massively parallel ingest into Kinetica. The connector is open source and intended as an accelerator to get you started rapidly ingesting data from Spark into Kinetica; you can extend it as you see fit. This article focuses on using Spark 2.x to ingest into Kinetica.

Features of the Kinetica Spark Connector include:

  • Open source and extensible
  • Accepts a Spark DataFrame
  • Creates a Kinetica table from the Spark DataFrame
    • Optimizes datatype conversions between Spark and Kinetica
    • Automatically right-sizes string and numeric fields
  • Massively parallel ingestion
    • Clocked at 3+ billion inserts per minute
  • Easy-to-use API interface

Architecture

Each Spark DataFrame partition instantiates a Kinetica Bulk Inserter, which is a native API for MPP ingest.

The connector accepts a Spark DataFrame and a LoaderParams object. LoaderParams objects hold all necessary data to interface with Kinetica.
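To make the MPP pattern concrete, here is a minimal sketch of per-partition bulk insertion, assuming the server URL and table name used in the examples below. The Kinetica classes (GPUdb, Type, BulkInserter, GenericRecord) come from Kinetica's public Java API, but treat the exact signatures as assumptions and confirm them against the Java doc:

      import org.apache.spark.sql.Row
      import com.gpudb.{BulkInserter, GPUdb, GenericRecord, Type}

      // Conceptual sketch, not the connector's actual source: each partition
      // opens its own connection and BulkInserter on the executor, so inserts
      // fan out across the cluster in parallel.
      df.foreachPartition { (rows: Iterator[Row]) =>
        val gpudb = new GPUdb("http://myserver:9191")     // one connection per partition
        val tableType = Type.fromTable(gpudb, "airline")  // fetch the target table's schema
        val inserter = new BulkInserter[GenericRecord](
          gpudb, "airline", tableType, 10000, null)       // flush to Kinetica every 10,000 records

        rows.foreach { row =>
          val record = new GenericRecord(tableType)
          // ... copy DataFrame fields into the record (column names are case-sensitive) ...
          inserter.insert(record)
        }
        inserter.flush()  // push whatever is still queued for this partition
      }

The connector automates all of this (plus type conversion and table creation); the sketch only illustrates where the parallelism comes from.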

For this tutorial, the following will be used:

  • A subset of the 2008 airline dataset for ingestion, available here for download.
  • The Spark 2.x Kinetica connector JAR, available here for download.
  • The connector Java doc, available here.

How to Use the Kinetica Spark Connector

  1. Launch Spark Shell using the connector JAR:
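     For example (the JAR file name is illustrative; point --jars at the JAR you downloaded):

      spark-shell --jars /path/to/kinetica-spark-connector.jar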

  2. Import com.kinetica.spark._
  3. Create a Spark DataFrame from the airline dataset:
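     For example, assuming the airline subset was saved as a CSV file (the path is illustrative):

      val df = spark.read
        .option("header", "true")       // the airline CSV includes a header row
        .option("inferSchema", "true")  // let Spark derive column types
        .csv("/path/to/2008_airline.csv")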

  4. Create a LoaderParams object. Refer to the Java doc for the constructor signature.

    • Each parameter within LoaderParams (e.g. the table name) may be fetched and set via getter/setter methods.

    • The connector will create a Kinetica table from the DataFrame if the LoaderParams attribute createTable is set to true.

    • Fields are mapped (case-sensitive) from the DataFrame to the Kinetica table columns.

      • Setting the LoaderParams attribute mapToSchema to false matches columns by position (ordinal order) between the Spark DataFrame and the Kinetica table instead of by name.

      • Example LoaderParams (Scala):

      val lp = new LoaderParams("http:myserver:9191", "jdbc:simba://myserver:9292;ParentSet=MASTER", "airline", false,"",10000,true,true,"username","password", 4, true, true, 1);
      
      #or set individual parameters
      
      val lp = new LoaderParams()
      
      lp.setKineticaURL("myserver:9191")
      lp.setJdbcURL("jdbc:simba://myserver:9292;ParentSet=MASTER")
      lp.setTablename("airline")
      lp.setTableReplicated(false)
      ...
  5. Set primary key (not required):
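     A hypothetical sketch following the LoaderParams setter pattern (the setter name and column choice are assumptions; check the Java doc for the exact method):

      lp.setPrimaryKey("FlightNum")  // hypothetical setter marking FlightNum as the primary key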

  6. Set field AirTime to be compressed using Snappy, lz4, or lz4hc:
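     Again hypothetical (the method name is an assumption; snappy, lz4, and lz4hc are Kinetica's column compression codecs):

      lp.setCompressionType("AirTime", "snappy")  // hypothetical setter: compress the AirTime column with Snappy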

  7. Call SparkKineticaLoader.KineticaWriter with the DataFrame and LoaderParams:
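     Using the DataFrame and LoaderParams built above (the argument order is an assumption; see the Java doc):

      SparkKineticaLoader.KineticaWriter(df, lp)  // kicks off parallel ingest across all partitions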

That’s it! The table should now appear in Kinetica. For further information, visit Kinetica's connector page. I will be publishing a video tutorial shortly.
