
Apache Ignite: RDBMS Integration

In this article, we discuss how to integrate an existing MySQL database with Apache Ignite and how to read and write its data through an Ignite cache.

· Big Data Zone ·
Scenario

In the last article, we created a partitioned cache and developed a basic framework of grid caching. In this article, we will integrate with the existing database and read/write data using our cache.

Some applications already have a database, data access mechanisms, and data objects in place before grid caching is introduced. In this article, we will:

  • Use an existing database (MySQL) with a single table, Product.
  • Use the Product data object class to read and write data.
  • Build the configuration and develop the code that Ignite needs to read and write data to the database.
  • Develop client code that integrates with the Ignite cache, so that the client code has no knowledge of the underlying database.

This kind of scenario is useful in large applications where not all components have been migrated to Ignite yet, or where some components can't be migrated to Ignite because of technical challenges or because a different strategy applies to them.

Existing Application Details

This section will provide details on the existing application in which we will use grid caching.

Database

The following is the database table structure:

Id   Name     Classification
1    Orange   Fruits
2    Apple    Fruits
3    Grapes   Fruits


CREATE TABLE `product` (
  `id` varchar(100) NOT NULL,
  `name` varchar(100) DEFAULT NULL,
  `classification` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) 


Data Object

The following is the data class of the product table; each instance of this class represents a row in the product table.

/**
 * Data object class. Instances of this class are the values stored in the cache.
 */
package com.learning.ignite.dataobject;

import java.io.Serializable;

/**
 * @author Chintan
 *
 */
public class Product implements Serializable {

    private static final long serialVersionUID = -3506766972673961325L;

    private String id;

    private String name;

    private String classification;

    public Product(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getClassification() {
        return classification;
    }

    public void setClassification(String classification) {
        this.classification = classification;
    }

    public String getId() {
        return id;
    }

    // Added for readability: printing a Product (as the client code does) shows its fields.
    @Override
    public String toString() {
        return "Product [id=" + id + ", name=" + name + ", classification=" + classification + "]";
    }
}


Cache Integration With Existing Database

In this section, we will develop the Ignite configuration and code, and then integrate the cache with the RDBMS.

Server Cache Configuration

The following is the configuration of the cache. I have described the details of the configuration after the XML code.

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Configuration of Cache -->
    <bean id="in-memory-grid.cfg"
        class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="cacheConfiguration">
            <list>
                <bean
                    class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="ProductsCache" />
                    <property name="cacheMode" value="PARTITIONED" />
                    <property name="backups" value="1" />
                    <property name="atomicityMode" value="ATOMIC" />
                    <property name="writeSynchronizationMode"
                        value="FULL_SYNC" />
                    <property name="groupName" value="Group1" />

                    <property name="readThrough" value="true"></property>
                    <property name="writeThrough" value="true"></property>

                    <property name="cacheStoreFactory">
                        <bean class="javax.cache.configuration.FactoryBuilder"
                            factory-method="factoryOf">
                            <constructor-arg
                                value="com.learning.ignite.store.jdbc.ProductStore"></constructor-arg>
                        </bean>
                    </property>

                    <property name="queryEntities">
                        <list>
                            <bean class="org.apache.ignite.cache.QueryEntity">
                                <property name="keyType" value="java.lang.String"></property>
                                <property name="valueType"
                                    value="com.learning.ignite.dataobject.Product"></property>
                                <property name="fields">
                                    <map>
                                        <entry key="id" value="java.lang.String"></entry>
                                        <entry key="name" value="java.lang.String"></entry>
                                        <entry key="classification" value="java.lang.String"></entry>
                                    </map>
                                </property>
                            </bean>
                        </list>
                    </property>

                </bean>
            </list>
        </property>
    </bean>

    <bean
        class="org.springframework.jdbc.datasource.DriverManagerDataSource"
        id="dataSource">
        <property name="driverClassName"
            value="com.mysql.jdbc.Driver"></property>
        <property name="url"
            value="jdbc:mysql://localhost:3306/sakila" />
        <property name="username" value="user1" />
        <property name="password" value="somepassword" />
    </bean>

</beans>


  • Database configuration: The database settings are defined in the dataSource bean, which provides the JDBC driver, the JDBC URL (including the database name), the user ID, and the password. Ignite uses this data source, through the ProductStore class (the Java code that reads from and writes to the database), to read and write objects in the database. ProductStore is explained in detail in the following section.
  • Read-through and write-through properties: These are two properties of the cache. Both must be set to true so that the cache reads missing entries from the database and writes updates back to it.
    • <property name="readThrough" value="true"></property>
    • <property name="writeThrough" value="true"></property>
  • Cache store factory: This configuration names the Java class that implements the database store.
  • Query entities: This configuration describes the objects stored in the cache: the key type, the value type, and the fields of the value.
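
To make the read-through and write-through behavior concrete, here is a minimal, Ignite-free sketch in plain Java. Everything in it (ReadWriteThroughSketch, Store, ThroughCache, MapStore) is a hypothetical name invented for illustration, and a HashMap stands in for the product table. It only mimics the observable behavior described above and does not reproduce how Ignite implements these properties internally.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, Ignite-free illustration of read-through and write-through caching. */
public class ReadWriteThroughSketch {

    /** Stand-in for a cache store such as ProductStore (illustrative only). */
    interface Store<K, V> {
        V load(K key);
        void write(K key, V value);
    }

    /** Minimal cache that delegates misses and writes to the store. */
    static class ThroughCache<K, V> {
        private final Map<K, V> entries = new HashMap<>();
        private final Store<K, V> store;

        ThroughCache(Store<K, V> store) {
            this.store = store;
        }

        /** Read-through: on a miss, ask the store and keep the result in memory. */
        V get(K key) {
            V value = entries.get(key);
            if (value == null) {
                value = store.load(key);
                if (value != null) {
                    entries.put(key, value);
                }
            }
            return value;
        }

        /** Write-through: every put is also passed on to the store. */
        void put(K key, V value) {
            store.write(key, value);
            entries.put(key, value);
        }
    }

    /** Store backed by a plain map that plays the role of the product table. */
    static class MapStore implements Store<String, String> {
        final Map<String, String> table = new HashMap<>();
        int loads = 0; // counts how often the "database" was queried

        public String load(String key) {
            loads++;
            return table.get(key);
        }

        public void write(String key, String value) {
            table.put(key, value);
        }
    }

    public static void main(String[] args) {
        MapStore db = new MapStore();
        db.table.put("1", "Orange");

        ThroughCache<String, String> cache = new ThroughCache<>(db);
        System.out.println(cache.get("1"));      // miss: loaded from the store
        System.out.println(cache.get("1"));      // hit: the store is not queried again
        cache.put("21", "Product Name = 21");
        System.out.println(db.table.get("21"));  // the put reached the "database"
    }
}
```

The client in this sketch only ever talks to the cache, which is the same separation the article aims for between the client code and MySQL.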

Product Store

This section provides details on the store class. Ignite uses this store class to read from and write to the database. The class has various methods, including load, write, and writeAll. A few key points:

  • The load method is invoked when an object is not present in the cache and Ignite needs to load it from the database.
  • The write method is used to write data to the database.
  • In this case, I have assumed that there will be only inserts. Ideally, the write code should check whether the row is already present in the database and, if it is, update it instead of inserting it.
  • If the same row can be updated directly in the database by other, non-Ignite components, then we need to ensure that only the latest record is kept and that this code throws an exception back to the client.
package com.learning.ignite.store.jdbc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.sql.DataSource;

import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.SpringResource;

import com.learning.ignite.dataobject.Product;

public class ProductStore implements CacheStore<String, Product> {

    @SpringResource(resourceName = "dataSource")
    private DataSource dataSource;

    /**
     * Loads a product from the database when it is not present in the cache.
     */
    public Product load(String key) throws CacheLoaderException {

        System.out.println("Load method is invoked...");
        Product product = null;

        String sql = "SELECT id, name, classification FROM product WHERE id = ?";

        // try-with-resources closes the connection, statement, and result set.
        try (Connection conn = dataSource.getConnection();
                PreparedStatement pStmt = conn.prepareStatement(sql)) {
            // The id column is a VARCHAR, so bind the key as a string.
            pStmt.setString(1, key);

            try (ResultSet rs = pStmt.executeQuery()) {
                System.out.println("Executed Query.");
                // id is the primary key, so at most one row is returned.
                if (rs.next()) {
                    String id = rs.getString(1);
                    System.out.println("Got result set for " + id);

                    product = new Product(id);
                    product.setName(rs.getString(2));
                    product.setClassification(rs.getString(3));
                }
            }
        } catch (SQLException exp) {
            // Report the failure to the caller instead of silently returning null.
            throw new CacheLoaderException("Failed to load product " + key, exp);
        }

        System.out.println("returning result");

        return product;
    }

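    public Map<String, Product> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
        // Simple implementation: load each key individually and skip keys without a row.
        Map<String, Product> result = new java.util.HashMap<>();
        for (String key : keys) {
            Product product = load(key);
            if (product != null) {
                result.put(key, product);
            }
        }
        return result;
    }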

    /**
     * Writes a cache entry to the product table.
     */
    public void write(Entry<? extends String, ? extends Product> entry) throws CacheWriterException {

        System.out.println("Entering write method.");
        // Insert-only, as assumed in this article; an existing id causes a duplicate-key error.
        String sql = "INSERT INTO product (id, name, classification) VALUES (?, ?, ?)";

        // try-with-resources closes the connection and the statement.
        try (Connection conn = dataSource.getConnection();
                PreparedStatement pStmt = conn.prepareStatement(sql)) {
            System.out.println("Opened database connection..");
            Product product = entry.getValue();
            System.out.println("Got values from passed object.");
            pStmt.setString(1, entry.getKey());
            pStmt.setString(2, product.getName());
            pStmt.setString(3, product.getClassification());
            pStmt.executeUpdate();
            System.out.println("Inserted objects.");
        } catch (SQLException exp) {
            System.out.println("Exception while writing data.");
            // Propagate the failure so that the caller learns the write did not succeed.
            throw new CacheWriterException("Failed to write product " + entry.getKey(), exp);
        }
    }

    /**
     * Writes objects to the database in bulk.
     */
    public void writeAll(Collection<Entry<? extends String, ? extends Product>> entries) throws CacheWriterException {
        String sql = "INSERT INTO product (id, name, classification) VALUES (?, ?, ?)";

        try (Connection conn = dataSource.getConnection();
                PreparedStatement pStmt = conn.prepareStatement(sql)) {
            Iterator<Entry<? extends String, ? extends Product>> itr = entries.iterator();

            while (itr.hasNext()) {
                // Each element is a cache entry (key and value), not a Product,
                // so read the Product from the entry instead of casting the entry.
                Entry<? extends String, ? extends Product> entry = itr.next();
                Product product = entry.getValue();
                pStmt.setString(1, entry.getKey());
                pStmt.setString(2, product.getName());
                pStmt.setString(3, product.getClassification());
                pStmt.executeUpdate();
            }
        } catch (SQLException exp) {
            throw new CacheWriterException("Failed to write products", exp);
        }
    }

    public void delete(Object key) throws CacheWriterException {
        // Not implemented: removing an entry from the cache does not delete the database row.
    }

    public void deleteAll(Collection<?> keys) throws CacheWriterException {
        // Not implemented: removing entries from the cache does not delete database rows.
    }

    /**
     * Loads all rows from the database into the cache.
     */
    public void loadCache(IgniteBiInClosure<String, Product> clo, Object... args) throws CacheLoaderException {
        System.out.println("loadCache method is invoked...");

        String sql = "SELECT id, name, classification FROM product";

        // try-with-resources closes the connection, statement, and result set.
        try (Connection conn = dataSource.getConnection();
                PreparedStatement pStmt = conn.prepareStatement(sql);
                ResultSet rs = pStmt.executeQuery()) {

            while (rs.next()) {
                String id = rs.getString(1);
                Product product = new Product(id);
                product.setName(rs.getString(2));
                product.setClassification(rs.getString(3));

                // Hand each row to the closure, which places it in the cache.
                clo.apply(id, product);
            }
        } catch (SQLException exp) {
            throw new CacheLoaderException("Failed to load products", exp);
        }
    }

    public void sessionEnd(boolean commit) throws CacheWriterException {
        // No session handling is needed in this example.
    }
}
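
The write method above is insert-only, while the key points note that an existing row should ideally be updated. One possible way to get that behavior on MySQL is a single INSERT ... ON DUPLICATE KEY UPDATE statement, sketched below. ProductUpsertSketch, UPSERT_SQL, and upsert are hypothetical names that are not part of the original store, and the statement is MySQL-specific; treat this as a starting point under those assumptions rather than the store's actual implementation.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical helper; not part of the original ProductStore. */
public class ProductUpsertSketch {

    /**
     * MySQL-specific upsert: inserts a new row, or updates name and
     * classification when a row with the same primary key already exists.
     */
    static final String UPSERT_SQL =
            "INSERT INTO product (id, name, classification) VALUES (?, ?, ?) "
            + "ON DUPLICATE KEY UPDATE name = VALUES(name), "
            + "classification = VALUES(classification)";

    /** Binds the three columns and runs the upsert on the given connection. */
    static int upsert(Connection conn, String id, String name, String classification)
            throws SQLException {
        try (PreparedStatement stmt = conn.prepareStatement(UPSERT_SQL)) {
            stmt.setString(1, id);
            stmt.setString(2, name);
            stmt.setString(3, classification);
            return stmt.executeUpdate();
        }
    }

    public static void main(String[] args) {
        // Printing the statement only; running it requires a live MySQL connection.
        System.out.println(UPSERT_SQL);
    }
}
```

Inside a store, a helper like this could be called from write with the entry's key and the Product's fields. Note that it does not address the other point raised above: detecting rows that non-Ignite components changed directly in the database.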


Client Ignite Configuration

The following is the XML configuration on the client side. I have kept the client configuration separate because a client may not be part of the same application; it can be an altogether different application.

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Configuration of Cache -->
    <bean id="client-memory-grid.cfg"
        class="org.apache.ignite.configuration.IgniteConfiguration">
    </bean>

    <bean
        class="org.springframework.jdbc.datasource.DriverManagerDataSource"
        id="dataSource">
        <property name="driverClassName"
            value="com.mysql.jdbc.Driver"></property>
        <property name="url"
            value="jdbc:mysql://localhost:3306/sakila" />
        <property name="username" value="user" />
        <property name="password" value="password" />
    </bean>

</beans>


Client Code

The following is sample client code. It accesses the ProductsCache cache, puts some objects into it, and then gets some objects from it.

package com.learning.ignite.node;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;

import com.learning.ignite.dataobject.Product;

/**
 * This is the Ignite cache client. This class puts objects into and gets
 * objects from the cache server.
 * 
 * @author chintan
 *
 */
public class CacheClient {

    public static String cacheName = "ProductsCache";

    /**
     * @param args
     */
    public static void main(String[] args) {

        System.out.println("Starting Cache Client");

        /*
         * Set the mode of the client. This can also be done in the XML
         * configuration.
         * 
         * In this case, my server and clients are running on the same machine. Hence,
         * I have not provided any network discovery configuration. In most cases, the
         * client node will run on a different machine, and we then need to pass the
         * network configuration, such as a static IP address or a multicast IP
         * address. In general, I prefer to use a multicast IP address so that multiple
         * clients and servers are able to connect and there is no dependency on actual
         * IP addresses.
         */
        Ignition.setClientMode(true);
        Ignite ignite = Ignition.start("F:\\Installed\\apache-ignite-2.7.6-bin\\config\\client-config.xml");

        System.out.println("Client Node Started..");

        // Get the cache by its name.
        IgniteCache<String, Product> cache = ignite.cache(CacheClient.cacheName);

        System.out.println("Got instance of Cache " + CacheClient.cacheName);

        // Store keys in cache (values will end up on different cache nodes).
        for (int i = 21; i <= 30; i++) {
            Product prdObject = new Product(String.valueOf(i));
            prdObject.setName("Product Name = " + i);
            prdObject.setClassification("classification" + i);
            cache.put(String.valueOf(prdObject.getId()), prdObject);
        }

        System.out.println("Added Objects in Cache");

        // Read keys 0 to 9; only keys that exist as rows in the database return an object.
        for (int i = 0; i < 10; i++) {
            System.out.println("trying to get object for " + i);
            Product prd = cache.get(String.valueOf(i));
            if (prd != null)
                System.out.println("Received object from Cache " + prd);
            else
                System.out.println("Object is null");
        }
    }
}


Console Output

Database Table

As shown in the table below, the database table now has additional rows (IDs 21 to 30). These rows were inserted by the client.

Id   Name                Classification
1    Orange              Fruits
2    Apple               Fruits
21   Product Name = 21   classification21
22   Product Name = 22   classification22
23   Product Name = 23   classification23
24   Product Name = 24   classification24
25   Product Name = 25   classification25
26   Product Name = 26   classification26
27   Product Name = 27   classification27
28   Product Name = 28   classification28
29   Product Name = 29   classification29
3    Grapes              Fruits
30   Product Name = 30   classification30
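
The row with ID 3 appears between 29 and 30 in this listing. A likely explanation, offered here as an assumption rather than something verified against the database, is that the id column is a VARCHAR primary key, so rows are ordered as text rather than as numbers. The small sketch below (IdOrderingSketch and sortedAsText are hypothetical names) shows the same ordering using Java's natural string ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration of text ordering for numeric-looking ids. */
public class IdOrderingSketch {

    /** Returns a copy of the ids sorted as strings (character by character). */
    static List<String> sortedAsText(List<String> ids) {
        List<String> copy = new ArrayList<>(ids);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("3", "30", "1", "29", "2", "21");
        // prints [1, 2, 21, 29, 3, 30]
        System.out.println(sortedAsText(ids));
    }
}
```

If numeric ordering matters, the IDs would need to be stored or sorted as numbers instead.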


Server Console

As shown in the image below, the server prints various System.out.println (SOP) messages when objects are put into or read from the cache.

[Image: SOPs]

Client Console

The following is the output on the client console. It shows that the client is able to get the objects with IDs 1, 2, and 3, which are existing rows in the database.

[Image: RDBMS client]

