Kundera, an Object Mapper for Big Flat Files (CSV/JSON)

Dive into Kundera, a tool that aims to make working with NoSQL as simple as possible.


Kundera is a "Polyglot Object Mapper" with a JPA interface. It currently supports Cassandra, MongoDB, HBase, Redis, OracleNoSQL, Neo4j, CouchDB, Kudu, relational databases, and Apache Spark.

New folks? Here is how you can get started in 5 minutes.

Who Should Read This:

There are many articles and docs for using Kundera with NoSQL datastores. This one is for those who store their data in CSV/JSON (structured) files and want to use a JPA interface to query those files.

Why Choose Kundera:

In addition to the JPA interface, one can take advantage of Kundera's polyglot persistence. This feature is powered by Apache Spark under the hood.

How to Use:

Spark Core Module

This module includes support for HDFS and FS (CSV and JSON). Users can read/write data from/to a file on HDFS or in the local file system as CSV or JSON, and SQL queries can be run over the data.

To use it, add the following dependency to your pom.xml:

<dependency>
    <groupId>com.impetus.kundera.client</groupId>
    <artifactId>kundera-spark</artifactId>
    <version>${kundera.version}</version>
</dependency>


Persistence Unit Configuration

<persistence-unit name="spark_hdfs_pu">
    <provider>com.impetus.kundera.KunderaPersistence</provider>
    <properties>
        <property name="kundera.nodes" value="localhost" />
        <property name="kundera.port" value="7077" />
        <property name="kundera.keyspace" value="sparktest" />
        <property name="kundera.dialect" value="spark" />
        <property name="kundera.client" value="hdfs" />
        <property name="kundera.client.lookup.class" value="com.impetus.spark.client.SparkClientFactory" />
        <property name="kundera.client.property" value="KunderaSparkTest.xml" />
    </properties>
</persistence-unit>


Spark-Related Properties

Spark-related properties are configured using an XML file. This file is passed to Kundera via the kundera.client.property setting in persistence.xml.

Sample Property File:

<?xml version="1.0" encoding="UTF-8"?>
<clientProperties>
   <datastores>
      <dataStore>
         <name>hdfs</name>
         <connection>
            <properties>
               <property name="spark.master" value="local" />
               <property name="spark.app.name" value="sparkhdfs" />
               <property name="spark.executor.memory" value="1g" />
               <property name="spark.driver.allowMultipleContexts" value="true" />
            </properties>
         </connection>
      </dataStore>
   </datastores>
</clientProperties>


Here "spark.master" and "spark.app.name" properties are mandatory. We can add more spark related properties as per the requirement.

Entity

@Entity
@Table(name = "spark_person")
public class Person implements Serializable
{

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The person id. */
    @Id
    private String personId;

    /** The person name. */
    private String personName;

    /** The age. */
    private int age;

    /** The salary. */
    private Double salary;

   // setters and getters. 
}


Basic Configuration

We can set the path and format of the data file on FS/HDFS from which to read or to which to write. These parameters are set via entity-manager-level properties, as shown below.

For HDFS:

em.setProperty("kundera.hdfs.inputfile.path", "hdfs://localhost:9000/sparkInputTest/input");
em.setProperty("kundera.hdfs.outputfile.path", "hdfs://localhost:9000/sparkOutputTest/output");


For FS:

em.setProperty("kundera.fs.inputfile.path", "src/test/resources/csv_input/")
em.setProperty("kundera.fs.outputfile.path", "src/test/resources/csv_output/")


Format:

em.setProperty("format", "json");


Note: Currently, CSV and JSON formats are supported.
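
Putting the pieces together, here is a minimal sketch that configures an entity manager for local JSON files, assuming the spark_hdfs_pu unit defined earlier and illustrative local paths:

EntityManagerFactory emf = Persistence.createEntityManagerFactory("spark_hdfs_pu");
EntityManager em = emf.createEntityManager();

// illustrative paths; adjust to your project layout
em.setProperty("kundera.fs.inputfile.path", "src/test/resources/json_input/");
em.setProperty("kundera.fs.outputfile.path", "src/test/resources/json_output/");
em.setProperty("format", "json");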

Read-Write Operation

EntityManagerFactory emf = Persistence.createEntityManagerFactory("spark_hdfs_pu");
EntityManager em = emf.createEntityManager();
Person person = new Person();
person.setAge(23);
person.setPersonId("1");
person.setPersonName("Dev");
person.setSalary(100000.0);

// save data 
em.persist(person);

em.clear();

// read the record back
Person personFound = em.find(Person.class, "1");

em.close();
emf.close();


Query Operation

Select all:

String query = "select * from spark_person"; 
List results = em.createNativeQuery(query).getResultList();


Select with WHERE:

String query = "select * from spark_person where salary > 35000";
List results = em.createNativeQuery(query).getResultList();


Select with LIKE:

String query = "select * from spark_person where personName like 'De%'";
List results = em.createNativeQuery(query).getResultList();


Sum (Aggregation):

String query = "select sum(salary) from spark_person";
List results = em.createNativeQuery(query).getResultList();
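
These native queries return a raw List; the exact row representation depends on the query's projection, so this sketch simply iterates and prints each row:

List results = em.createNativeQuery("select * from spark_person").getResultList();
for (Object row : results) {
    System.out.println(row);
}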


Saving Data After Querying

We can save the results of the query in HDFS/FS or Cassandra as CSV or JSON.

General Format:

INSERT INTO <source>.<path-to-table/file> [AS <file-type>] FROM <sql-query>

  • source: Its value can be fs or hdfs; to save to a database, its value is cassandra (currently, Cassandra is the only supported database; see the example below)
  • path-to-table/file: For FS/HDFS, this is the path to a directory; for a database, it is dbname.tablename
  • file-type: Required only for FS/HDFS. Its value can be CSV or JSON
  • sql-query: The result of this SQL query is saved according to the parameters above

Examples:

To save in FS as CSV:

String query = "INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_person)";
Query q = em.createNativeQuery(query, Person.class);
q.executeUpdate();


To save in FS as JSON:

query = "INSERT INTO fs.[src/test/resources/testspark_json] AS JSON FROM (select * from spark_person)";
q = em.createNativeQuery(query, Person.class);
q.executeUpdate();
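
To save in Cassandra, follow the same general format with the file type omitted. A sketch, assuming a keyspace sparktest and a table spark_person_copy (both names are illustrative, not from the original example):

query = "INSERT INTO cassandra.sparktest.spark_person_copy FROM (select * from spark_person)";
q = em.createNativeQuery(query, Person.class);
q.executeUpdate();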


For more details, you can check this test case and detailed documentation.

Conclusion

Using this feature of Kundera, one can easily query CSV/JSON files with plain SQL. Reading, querying, and writing these files becomes very simple.

