Apache Ignite: RDBMS Integration
Join the DZone community and get the full member experience.
Join For FreeScenario
In the last article, we created a partitioned cache and developed a basic framework of grid caching. In this article, we will integrate with the existing database and read/write data using our cache.
Some applications will already have an existing database, data access mechanisms, and data objects before using grid caching. In this article, we will
- Use an existing database (MySQL) with single table,
Product
. - Use the product data object class to read and write data.
- Build configuration and develop code for Ignite to read and write data to the database
- Develop client code to integrate with Ignite cache. Hence, the client code will have no knowledge of the underlying database.
This kind of scenario might be useful in large applications, where all components are not yet migrated to Ignite or can’t be migrated to ignite due to technical challenges or different strategies for some components of an application.
You may also like: Getting Started With Apache Ignite (Part 1).
Existing Application Details
This section will provide details on the existing application in which we will use grid caching.
Database
The following is the database table structure:
Id |
Name |
Classification |
1 |
Orange |
Fruits |
2 |
Apple |
Fruits |
3 |
Grapes |
Fruits |
CREATE TABLE `product` (
`id` varchar(100) NOT NULL,
`name` varchar(100) DEFAULT NULL,
`classification` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
)
Data Object
The following is the data class of the product table; each instance of this class represents a row in the product table.
/**
* This is data object class. This will be value in the cache.
*/
package com.learning.ignite.dataobject;
import java.io.Serializable;
/**
* @author Chintan
*
*/
public class Product implements Serializable {
private static final long serialVersionUID = -3506766972673961325L;
private String id;
private String name;
private String classification;
public Product(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getClassification() {
return classification;
}
public void setClassification(String classification) {
this.classification = classification;
}
public String getId() {
return id;
}
}
Cache Integration With Existing Database
In this section, we will develop various configuration and code related to Ignite. Then, we will integrate the cache with the RDBMS.
Server Cache Configuration
The following is the configuration of the cache. I have described the details of the configuration after the XML code.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Configuration of Cache -->
<bean id="in-memory-grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="cacheConfiguration">
<list>
<bean
class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="ProductsCache" />
<property name="cacheMode" value="PARTITIONED" />
<property name="backups" value="1" />
<property name="atomicityMode" value="ATOMIC" />
<property name="writeSynchronizationMode"
value="FULL_SYNC" />
<property name="groupName" value="Group1" />
<property name="readThrough" value="true"></property>
<property name="writeThrough" value="true"></property>
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder"
factory-method="factoryOf">
<constructor-arg
value="com.learning.ignite.store.jdbc.ProductStore"></constructor-arg>
</bean>
</property>
<property name="queryEntities">
<list>
<bean class="org.apache.ignite.cache.QueryEntity">
<property name="keyType" value="java.lang.String"></property>
<property name="valueType"
value="com.learning.ignite.dataobject.Product"></property>
<property name="fields">
<map>
<entry key="id" value="java.lang.String"></entry>
<entry key="name" value="java.lang.String"></entry>
<entry key="classification" value="java.lang.String"></entry>
</map>
</property>
</bean>
</list>
</property>
</bean>
</list>
</property>
</bean>
<bean
class="org.springframework.jdbc.datasource.DriverManagerDataSource"
id="dataSource">
<property name="driverClassName"
value="com.mysql.jdbc.Driver"></property>
<property name="url"
value="jdbc:mysql://localhost:3306/sakila" />
<property name="username" value="user1" />
<property name="password" value="somepassword" />
</bean>
</beans>
- Database Configuration: Database Configurations are added in the
dataSource
bean. This will provide details for the database, like database name, user id, password, JDBC URL, etc. This will be used by Ignite to read and write objects in the database. Ignite will use theProductStore
class (Java code to read/write to/from the database). I have explainedProdsuctStore
in detail in the following section. - Readthrough and Write through properties: These are two properties of cache. We need to set them to true to ensure that Cache is able to read and write data.
- <property name="readThrough" value="true"></property>
- <property name="writeThrough" value="true"></property>
- Cache Store Factory: This configuration will provide details of the
Database Store
Java class. - Query Entities: This configuration will provide details of various objects stored in cache. This configuration will provide Key and Values.
Product Store
This section will provide details on the store class. Ignite will use this store class to read and write in the database. This store class has various methods, including load
, write
, and writeAll
. A few key points:
- The
load
method will be invoked when an object is not present in cache, and Ignite needs to load it from the database. - The
write
method will be used to write data to the database. - In this case, I have assumed that there will be only inserts, but ideally, we need to modify the written code to find out if the row is already present in database or not and if it's present in the database, then it should be updated.
- If there is a possibility that the same row can be updated by other non-ignite components directly in the database, then we need to ensure that data only latest record is kept and this code throws an exception back to the client.
package com.learning.ignite.store.jdbc;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.sql.DataSource;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.SpringResource;
import com.learning.ignite.dataobject.Product;
public class ProductStore implements CacheStore<String, Product> {
@SpringResource(resourceName = "dataSource")
private DataSource dataSource;
/**
* This will load the cache object from Cache object
*/
public Product load(String key) throws CacheLoaderException {
System.out.println("Load method is invoked...");
Product product = null;
try {
String sql = "SELECT * FROM PRODUCT WHERE ID = ?";
Connection conn = dataSource.getConnection();
PreparedStatement pStmt = conn.prepareStatement(sql);
pStmt.setInt(1, Integer.parseInt(key));
ResultSet rs = pStmt.executeQuery();
System.out.println("Executed Query.");
while (rs.next()) {
String id = rs.getString(1);
System.out.println("Got result set for " + id);
product = new Product(id);
String name = rs.getString(2);
String classification = rs.getString(3);
product.setName(name);
product.setClassification(classification);
}
} catch (SQLException exp) {
exp.printStackTrace();
} catch (Exception exp) {
exp.printStackTrace();
}
System.out.println("returning result");
return product;
}
public Map<String, Product> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
// TODO Auto-generated method stub
return null;
}
/**
* This will write rows in tables from Cache
*/
public void write(Entry<? extends String, ? extends Product> entry) throws CacheWriterException {
System.out.println("Enterring in write method. ");
try {
String sql = "INSERT INTO PRODUCT VALUES (?, ?, ?)";
Connection conn = dataSource.getConnection();
PreparedStatement pStmt = conn.prepareStatement(sql);
System.out.println("Opened database connection..");
String id = entry.getKey();
String name = ((Product)entry.getValue()).getName();
String classification = ((Product)entry.getValue()).getClassification();
System.out.println("Got values from passed object.");
pStmt.setString(1, id);
pStmt.setString(2, name);
pStmt.setString(3, classification);
pStmt.execute();
System.out.println("Inserted objects.");
} catch (SQLException exp) {
exp.printStackTrace();
} catch (Exception exp) {
System.out.println("Exception while writing data.");
exp.printStackTrace();
}
}
/**
* This will write objects in bulk in database.
*/
public void writeAll(Collection<Entry<? extends String, ? extends Product>> entries) throws CacheWriterException {
try {
String sql = "INSERT INTO PRODUCT VALUES (?, ?, ?)";
Connection conn = dataSource.getConnection();
PreparedStatement pStmt = conn.prepareStatement(sql);
Iterator<Entry<? extends String, ? extends Product>> itr = entries.iterator();
while (itr.hasNext()) {
Product entry = (Product) itr.next();
String id = ((Product) entry).getId();
String name = ((Product) entry).getName();
String classification = ((Product) entry).getClassification();
pStmt.setString(1, id);
pStmt.setString(2, name);
pStmt.setString(3, classification);
pStmt.execute();
}
} catch (SQLException exp) {
exp.printStackTrace();
} catch (Exception exp) {
exp.printStackTrace();
}
}
public void delete(Object key) throws CacheWriterException {
// TODO Auto-generated method stub
}
public void deleteAll(Collection<?> keys) throws CacheWriterException {
// TODO Auto-generated method stub
}
/**
* This will load all objects from database to cache
*/
public void loadCache(IgniteBiInClosure<String, Product> clo, Object... args) throws CacheLoaderException {
System.out.println("Load method is invoked...");
Product product = null;
try {
String sql = "SELECT * FROM PRODUCT";
Connection conn = dataSource.getConnection();
PreparedStatement pStmt = conn.prepareStatement(sql);
ResultSet rs = pStmt.executeQuery();
while (rs.next()) {
String id = rs.getString(1);
product = new Product(id);
String name = rs.getString(2);
String classification = rs.getString(3);
product.setName(name);
product.setClassification(classification);
clo.apply(String.valueOf(id), product);
}
} catch (SQLException exp) {
exp.printStackTrace();
} catch (Exception exp) {
exp.printStackTrace();
}
}
public void sessionEnd(boolean commit) throws CacheWriterException {
// TODO Auto-generated method stub
}
}
Client Ignite Configuration
The following is XML configuration at the client-side. I have kept client configuration separate as a client may not be part of the same application. The client can be an altogether different application.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Configuration of Cache -->
<bean id="client-memory-grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
</bean>
<bean
class="org.springframework.jdbc.datasource.DriverManagerDataSource"
id="dataSource">
<property name="driverClassName"
value="com.mysql.jdbc.Driver"></property>
<property name="url"
value="jdbc:mysql://localhost:3306/sakila" />
<property name="username" value="user" />
<property name="password" value="password" />
</bean>
</beans>
Client Code
This is sample code from the client. This code will access Product Cache
and will put some objects and get some objects.
package com.learning.ignite.node;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import com.learning.ignite.dataobject.Product;
/**
* This is Ignite Cache Client. This class will put and get objects from Cache
* Server.
*
* @author chintan
*
*/
public class CacheClient {
public static String cacheName = "ProductsCache";
/**
* @param args
*/
public static void main(String[] args) {
System.out.println("Starting Cache Client");
/*
* Set the mode of the client. This can also be done by creating XML
* configuration.
*
* In this case, my server and clients are running on same machine. Hence, I
* have not provided network discovery related configuration. In most cases, the
* client node will run on different machine. in this case, we need to pass the
* network configuration like static IP address or multicast IP address. in
* general, I prefer to use multicast IP address to make sure that multiple
* clients & servers are able to connect and there is no dependency on actual IP
* address. s
*
*/
Ignition.setClientMode(true);
Ignite ignite = Ignition.start("F:\\Installed\\apache-ignite-2.7.6-bin\\config\\client-config.xml");
System.out.println("Client Node Started..");
// The Cache name
IgniteCache<String, Product> cache = ignite.cache(CacheClient.cacheName);
System.out.println("Got instance of Cache " + CacheClient.cacheName);
// Store keys in cache (values will end up on different cache nodes).
for (int i = 21; i <= 30; i++) {
Product prdObject = new Product(String.valueOf(i));
prdObject.setName("Product Name = " + i);
prdObject.setClassification("classification" + i);
cache.put(String.valueOf(prdObject.getId()), prdObject);
}
System.out.println("Added Objects in Cache");
for (int i = 0; i < 10; i++) {
System.out.println("trying to get object for " + i);
Product prd = cache.get(String.valueOf(i));
if (prd != null)
System.out.println("Received object from Cache " + prd);
else
System.out.println("Object is null");
}
}
}
Console Output
Database Table
As shown in the table below, the database table has additional rows. These rows are inserted by the client.
id | Name | Classification |
1 | Orange | Fruits |
2 | Apple | Fruits |
21 | Product Name = 21 | classification21 |
22 | Product Name = 22 | classification22 |
23 | Product Name = 23 | classification23 |
24 | Product Name = 24 | classification24 |
25 | Product Name = 25 | classification25 |
26 | Product Name = 26 | classification26 |
27 | Product Name = 27 | classification27 |
28 | Product Name = 28 | classification28 |
29 | Product Name = 29 | classification29 |
3 | Grapes | Fruits |
30 | Product Name = 30 | classification30 |
Server Console
As shown in the image below, the server prints various SOPs when cache objects are put or get from cache.
Client Console
The following is output on client console. This shows that it is able to get objects of ID 1, 2, and 3 from the database. These are existing rows in the database.
Further Reading
Opinions expressed by DZone contributors are their own.
Comments