
Kundera, an Object Mapper for Big Flat Files (CSV/JSON)


Dive into Kundera, a tool that aims to make working with NoSQL as simple as possible.


Kundera is a "Polyglot Object Mapper" with a JPA interface. It currently supports Cassandra, MongoDB, HBase, Redis, OracleNoSQL, Neo4j, CouchDB, Kudu, relational databases, and Apache Spark.

New folks? Here is how you can get started in 5 minutes.

Who Should Read This:

There are many articles and docs covering Kundera with NoSQL datastores. This one is for those who store data in CSV/JSON (structured) files and want a JPA interface for querying those files.

Why Choose Kundera:

In addition to the JPA interface, one can take advantage of Kundera's Polyglot Persistence. This feature is backed by Apache Spark under the hood.

How to Use:

Spark Core Module

This module includes support for HDFS and the local file system (FS). Users can read/write data from/to files on HDFS or the local file system as CSV or JSON, and SQL queries can be run over the data.

To use it, add the following dependency to pom.xml:

<dependency>
    <groupId>com.impetus.kundera.client</groupId>
    <artifactId>kundera-spark</artifactId>
    <version>${kundera.version}</version>
</dependency>
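
The ${kundera.version} placeholder resolves to a Maven property. A minimal sketch of pinning it in the POM's <properties> section (the version shown is illustrative; use the latest Kundera release):

<properties>
    <kundera.version>3.13</kundera.version>
</properties>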


Persistence Unit Configuration

<persistence-unit name="spark_hdfs_pu">
    <provider>com.impetus.kundera.KunderaPersistence</provider>
    <properties>
        <property name="kundera.nodes" value="localhost" />
        <property name="kundera.port" value="7077" />
        <property name="kundera.keyspace" value="sparktest" />
        <property name="kundera.dialect" value="spark" />
        <property name="kundera.client" value="hdfs" />
        <property name="kundera.client.lookup.class" value="com.impetus.spark.client.SparkClientFactory" />
        <property name="kundera.client.property" value="KunderaSparkTest.xml" />
    </properties>
</persistence-unit>


Spark-Related Properties

Spark-related properties are configured in an XML file, which is passed to Kundera via the kundera.client.property setting in persistence.xml (KunderaSparkTest.xml in the persistence unit above).

Sample Property File:

<?xml version="1.0" encoding="UTF-8"?>
<clientProperties>
   <datastores>
      <dataStore>
         <name>hdfs</name>
         <connection>
            <properties>
               <property name="spark.master" value="local" />
               <property name="spark.app.name" value="sparkhdfs" />
               <property name="spark.executor.memory" value="1g" />
               <property name="spark.driver.allowMultipleContexts" value="true" />
            </properties>
         </connection>
      </dataStore>
   </datastores>
</clientProperties>


Here "spark.master" and "spark.app.name" properties are mandatory. We can add more spark related properties as per the requirement.

Entity

import java.io.Serializable;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "spark_person")
public class Person implements Serializable
{

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The person id. */
    @Id
    private String personId;

    /** The person name. */
    private String personName;

    /** The age. */
    private int age;

    /** The salary. */
    private Double salary;

   // setters and getters. 
}


Basic Configuration

We can set the path and format of the data file (on FS or HDFS) from which to read and to which to write. These parameters are set as EntityManager-level properties, as shown below.

For HDFS:

em.setProperty("kundera.hdfs.inputfile.path", "hdfs://localhost:9000/sparkInputTest/input");
em.setProperty("kundera.hdfs.outputfile.path", "hdfs://localhost:9000/sparkOutputTest/output");


For FS:

em.setProperty("kundera.fs.inputfile.path", "src/test/resources/csv_input/")
em.setProperty("kundera.fs.outputfile.path", "src/test/resources/csv_output/")


Format:

em.setProperty("format", "json");


Note: Currently, only CSV and JSON formats are supported.
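
Putting these pieces together: the properties must be set on the EntityManager before any read or write. A minimal sketch for JSON on HDFS, using the spark_hdfs_pu unit defined earlier (host, port, and paths are illustrative):

EntityManager em = Persistence.createEntityManagerFactory("spark_hdfs_pu").createEntityManager();

// point Kundera at the HDFS locations to read from and write to
em.setProperty("kundera.hdfs.inputfile.path", "hdfs://localhost:9000/sparkInputTest/input");
em.setProperty("kundera.hdfs.outputfile.path", "hdfs://localhost:9000/sparkOutputTest/output");

// treat the files as JSON
em.setProperty("format", "json");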

Read-Write Operation

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

EntityManagerFactory emf = Persistence.createEntityManagerFactory("spark_hdfs_pu");
EntityManager em = emf.createEntityManager();

Person person = new Person();
person.setAge(23);
person.setPersonId("1");
person.setPersonName("Dev");
person.setSalary(100000.0);

// save data
em.persist(person);

// detach managed entities so the find below reads from the store
em.clear();

// read data back by id
Person personFound = em.find(Person.class, "1");

em.close();
emf.close();


Query Operation

Select all:

String query = "select * from spark_person"; 
List results = em.createNativeQuery(query).getResultList();


Select with WHERE:

String query = "select * from spark_person where salary > 35000";
List results = em.createNativeQuery(query).getResultList();


Select with LIKE:

String query = "select * from spark_person where personName like 'De%'";
List results = em.createNativeQuery(query).getResultList();


Sum (Aggregation):

String query = "select sum(salary) from spark_person";
List results = em.createNativeQuery(query).getResultList();


Saving Data After Querying

We can save query results to FS/HDFS (as CSV or JSON) or to Cassandra.

General Format:

INSERT INTO <source>.<path-to-table/file> [AS <file-type>] FROM <sql-query>

  • source: Its value can be fs or hdfs; to save to a database, its value is cassandra (currently the only supported database; see the Cassandra example after the FS examples below)
  • path-to-table/file: For FS/HDFS, this is the path to a directory; for a database, it is dbname.tablename
  • file-type: Required only for FS/HDFS. Its value can be CSV or JSON
  • sql-query: The result of this SQL query is saved according to the other parameters above

Examples:

To save in FS as CSV:

String query = "INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_person)";
Query q = em.createNativeQuery(query, Person.class);
q.executeUpdate();


To save in FS as JSON:

query = "INSERT INTO fs.[src/test/resources/testspark_json] AS JSON FROM (select * from spark_person)";
q = em.createNativeQuery(query, Person.class);
q.executeUpdate();
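
To save to Cassandra, a sketch following the same general format (the keyspace and table names here are assumptions based on the persistence-unit configuration above; no AS clause is needed for a database target):

query = "INSERT INTO cassandra.sparktest.spark_person FROM (select * from spark_person)";
q = em.createNativeQuery(query, Person.class);
q.executeUpdate();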


For more details, you can check the project's test cases and detailed documentation.

Conclusion

Using this feature of Kundera, one can query CSV/JSON files with familiar SQL. Reading, querying, and writing these files becomes very simple.
