Mule Caching Strategy With Redis Cache
You can customize a Mule caching strategy so that the Mule cache scope actions can operate on the Redis data store. You can also try other third-party caching frameworks.
Join the DZone community and get the full member experience.
Join For FreeThe objective of this tutorial is to demonstrate the implementation of Mule caching strategy with REDIS cache using Spring Data Redis module. The Mule caching strategy is associated with Mule Cache scope and it is used to define the actions a cache scope takes when a message enters its subflow. In this tutorial, we will be using a simple use case to show the steps required to cache the query results of an Oracle database table into Redis cache using Spring Data Redis module.
What Is REDIS?
REDIS stands for Remote Dictionary Server. It is an open-source key-value data store and is often referred to as a data structure server because keys contain strings, hashes, lists, sets, and sorted sets. REDIS can also be used as a cache and message broker. REDIS can be installed on various platforms such as Linux, and Windows. We will be using the windows version of REDIS, which can be downloaded from GitHub here. You can download the pre-built binaries available in 32-bit and 64-bit from the repository and install it accordingly.
REDIS Command Line Interface
The REDIS installation folder contains a programed called redis-cli.exe that allows us to send commands to REDIS Server and read the replies send by the server directly from the terminal. The below figure depicts the REDIS terminal.
The default port where REDIS server listens is 6379, which is shown in the above figure. We will use the REDIS terminal to verify whether the custom Mule Caching Strategy is correctly working with the REDIS server.
Prerequisites
- Anypoint Studio 6+ with Mule ESB 3.8 runtime.
- JDK 1.8.
- Maven 3.3.9.
- Oracle 11.2g XE.
- Redis 3.2 on Windows.
- Spring Data Redis Module 1.7.1.
- Jedis 2.6.1.
Creating a Mavenized Mule Project
Click File > New Mule Project and name your project (in the tutorial, it was called redis-cache-example) and check the option Use Maven.
Add Maven Dependencies
Add the following Maven dependencies in pom.xml file:
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1,7.1.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.12</version>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>com.hynnet</groupId>
<artifactId>oracle-driver-ojdbc6</artifactId>
<version>12.1.0.1</version>
</dependency>
We need to define a model for this application so that we can map the result of SQL query with the model and store this model into the Redis object store via customized Mule caching strategy. This model should be defined in src/main/java folder.
Define the Model (POJO)
package com.examples.bean;
import java.io.Serializable;
public class Person implements Serializable {
private static final long serialVersionUID = -8243145429438016231L;
private int id;
private String fname;
private String lname;
private int deptid;
private float salary;
public Person() {
}
public int getId() {
return id;
}
public String getFname() {
return fname;
}
public void setFname(String fname) {
this.fname = fname;
}
public String getLname() {
return lname;
}
public void setLname(String lname) {
this.lname = lname;
}
public int getDeptid() {
return deptid;
}
public void setDeptid(int deptid) {
this.deptid = deptid;
}
public float getSalary() {
return salary;
}
public void setSalary(float salary) {
this.salary = salary;
}
public void setId(int id) {
this.id = id;
}
@Override
public String toString() {
return "Person [id=" + id + ", name=" + fname + " " + lname + ", deptid=" + deptid + ", salary=" + salary + "]";
}
}
Bean Configuration
We need bean configurations for connecting Redis server using Jedis driver (JedisConnectionFactory) to carry out the cache actions such as read, write, and delete on the Redis server using RedisTemplate and to manage the cache using RedisCacheManager. Remember, RedisTemplate depends on JedisConnectionFactory and RedisCacheManager depends on RedisTemplate respectively.
JedisConnectionFactory Configuration
<spring:bean id="jedisConnFactory" name="jedisConnFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
</spring:bean>
RedisTemplate Configuration
<spring:bean id="redisTemplate" name="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate" lazy-init="false">
<spring:property name="connectionFactory" ref="jedisConnFactory" />
<spring:property name="valueSerializer">
<spring:bean
class="org.springframework.data.redis.serializer.JacksonJsonRedisSerializer">
<spring:constructor-arg type="java.lang.Class"
value="java.lang.Object" />
</spring:bean>
</spring:property>
</spring:bean>
RedisCacheManager Configuration
<spring:bean id="cacheManager" name="cacheManager"
class="org.springframework.data.redis.cache.RedisCacheManager">
<spring:constructor-arg ref="redisTemplate" />
</spring:bean>
Caching Strategy Configuration
We need to customize the Mule caching strategy so that the cache actions can be applied in a custom object store such as Redis instead of InMemoryObjectStore. Follow these steps to customize the Mule caching strategy.
1. Create a Java Class in src/main/java Folder
This Java class must implement the org.mule.api.store.ObjectStore<T extends Serializable> interface. In the tutorial, the Java class was named RedisObjectStore.java. The Java class must implement the following methods of ObjectStore interface:
Modifier and Type |
Method and Description |
void |
clear () Removes all items in this store without disposing of them, meaning that after performing a clear(), you should still be able to perform other operations. |
boolean |
contains (Serializable key) Check whether the given Object is already registered with this store. |
boolean |
isPersistent () Is this store persistent? |
T |
remove (Serializable key) Remove the object with the key. |
T |
retrieve (Serializable key) Retrieve the given Object. |
void |
store (Serializable key, T value) Store the given Object. |
package com.examples.cache;
import java.io.Serializable;
import java.util.List;
import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.context.MuleContextAware;
import org.mule.api.registry.RegistrationException;
import org.mule.api.store.ObjectStore;
import org.mule.api.store.ObjectStoreException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.cache.RedisCache;
import org.springframework.data.redis.cache.RedisCacheManager;
public class RedisObjectStore<T extends Serializable> implements ObjectStore<T>, MuleContextAware {
@Autowired
private RedisCacheManager cacheManager;
private org.springframework.data.redis.cache.RedisCache cache;
private MuleContext context;
private DefaultMuleMessage message;
@Autowired
public void setCache() {
this.cache = (RedisCache) this.cacheManager.getCache("PERSON");
}
public RedisCache getCache() {
return this.cache;
}
@Override
public synchronized boolean contains(Serializable key) throws ObjectStoreException {
System.out.println("Inside contains Method");
if (cache.get(key.toString(), Object.class) == null)
return false;
else
return true;
}
@Override
public synchronized void store(Serializable key, T value) throws ObjectStoreException {
System.out.println("Inside Store Method");
MuleEvent event = (MuleEvent) value;
@SuppressWarnings("unchecked")
List<Object> person = (List<Object>) event.getMessage().getPayload();
cache.put(key.toString(), person);
}
@SuppressWarnings("unchecked")
@Override
public synchronized T retrieve(Serializable key) throws ObjectStoreException {
System.out.println("Inside Retrieve Method");
List<Object> person = (List<Object>) cache.get(key.toString(), Object.class);
DefaultMuleEvent event = null;
String operation = null;
operation = context.getRegistry().get("operation");
if (operation.equalsIgnoreCase("store")) {
return null;
}
else if (person == null && operation.equalsIgnoreCase("remove")) {
return null;
} else if (person == null && operation.equalsIgnoreCase("retrieve")) {
message = new DefaultMuleMessage("Key " + key.toString() + " not found", context);
FlowConstruct flow = context.getRegistry().lookupFlowConstruct("cacheFlow");
event = new DefaultMuleEvent(message, org.mule.MessageExchangePattern.ONE_WAY, flow);
} else {
message = new DefaultMuleMessage(person, context);
FlowConstruct flow = context.getRegistry().lookupFlowConstruct("cacheFlow");
event = new DefaultMuleEvent(message, org.mule.MessageExchangePattern.ONE_WAY, flow);
}
return (T) event;
}
public synchronized T remove(Serializable key) throws ObjectStoreException {
T value = retrieve(key);
if (value != null) {
cache.evict(key);
try {
context.getRegistry().registerObject("evict", "Key " + key.toString() + " evicted from cache");
} catch (RegistrationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
try {
context.getRegistry().registerObject("evict", "Key " + key.toString() + " not found");
} catch (RegistrationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return value;
}
@Override
public boolean isPersistent() {
return true;
}
@Override
public synchronized void clear() throws ObjectStoreException {
System.out.println("Inside clear method");
}
@Override
public void setMuleContext(MuleContext context) {
this.context = context;
}
}
2. Create Global Elements Caching Strategy Configuration
- Select the Global Elements tab and click the Create button.
- Expand the Caching Strategies Global Type, select Caching Strategy, and click OK.
- Type name for the Caching Strategy. In this tutorial, it was named Caching_Strategy.
- Click the + sign associated with Object Store Reference and then select core:custom-object-store and click Next.
- Type the Java class name com.examples.cache.RedisObjectStore in the Class: textbox area and click Finish.
- Select the Key Expression option under the Event Key section and then type the MEL #[message.inboundProperties.'http.query.params'.key] and click OK to complete the the caching strategy configuration.
- The following XML code is generated after completing the above steps:
<ee:object-store-caching-strategy
name="Caching_Strategy" doc:name="Caching Strategy"
keyGenerationExpression="#[message.inboundProperties.'http.
query.params'.key]" >
<custom-object-store
class="com.examples.cache.RedisObjectStore"/>
</ee:object-store-caching-strategy>
HTTP listener configuration:
<http:listener-config name="HTTP_Listener_Configuration"
host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration" />
Oracle configuration:
<db:oracle-config name="Oracle_Configuration" host="localhost"
port="1521" instance="XE" user="hr" password="hr"
doc:name="Oracle Configuration" />
Creating Flows for redis-cache-example
The application is factored into three flows, as described below:
redisCacheExampleFlow
This flow provides the main access point to initiate the application so that the users can send their requests for cache actions (store, retrieve, remove) through the HTTP endpoint listening on port 8081. The HTTP requests can be done with any one of the following URLs at a time.
- http://localhost:8081/redis?operation=store&key=deptno.
- http://localhost:8081/redis?operation=retrieve&key=deptno.
- http://localhost:8081/redis?operation=remove&key=deptno.
The HTTP request is comprised of two query parameters such as operation and key where the first denotes the cache action (store, retrieve, remove) and the second denotes the key (employee’s department number) for the object to be stored in the cache.
Once the HTTP request is consumed by the HTTP inbound endpoint, the Expression component stores the HTTP query parameters in Mule registry for later usage. Next, the flow reference component refers to processFlow flow for further message processing. Once the processFlow completes its process, the final payload is set using the Set Payload transformer and returned to the user’s browser.
redisCacheExampleFlow XML code:
<flow name="redisCacheExampleFlow">
<http:listener config-ref="HTTP_Listener_Configuration"
path="/redis" doc:name="HTTP">
<http:response-builder>
<http:header headerName="Content-Type" value="text/plain" />
</http:response-builder>
</http:listener>
<expression-component doc:name="Store cache actions in registry">
<![CDATA[app.registry.put('operation',message.inboundProperties.'http.query.params'.operation)]]>
</expression-component>
<flow-ref name="processFlow" doc:name="processFlow" />
<set-payload value="#[payload]" doc:name="Set Payload" />
</flow>
processFlow
The processFlow flow is used to route the request to the appropriate processor chain associated with the Choice router to trigger the cache actions such as store, retrieve, or remove.
The Choice router routes to Processor Chain: Store/Retrieve to/from cache when the query parameter operation has a value either store or retrieve. In this case, the cacheFlow flow is triggered via a flow reference and on completion of the cacheFlow flow the inner Choice router routes to the Transform Message component based on the payload type, i.e. List.
List holds the output of the SQL query fired on the employee table. This List is transformed into a POJO (com.examples.bean.Person) and subsequently, the POJO is transformed into String with the Object to String transformer and returned back to the parent flow redisCacheExampleFlow. The default processor is considered when the payload type is String.
The Choice router routes to the processor chain Processor Chain: Remove key from cache when the query parameter operation has a value remove. In this case, the invalidate-key message processor is triggered first which in turn removes the object from the cache with the specified key by invoking the remove method of the com.examples.cache.RedisObjectStore Java class and subsequently triggers the Expression component, which in turn sets the payload with the value of the evict key set by the remove method and finally removes the evict key from the Mule registry and returns back to the parent flow.
The Choice router routes to the default processor chain when the query parameter operation contains an invalid cache action. In this case, the Set Payload transformer sets the payload with Invalid Operation message and returns back to the parent flow.
processFlow XML code:
<flow name="processFlow">
<choice doc:name="Choice">
<when
expression="#[message.inboundProperties.'http.query.params'.operation == 'retrieve' || message.inboundProperties.'http.query.params'.operation == 'store']">
<processor-chain doc:name="Processor Chain: Store/Retrieve to/from cache">
<flow-ref name="cacheFlow" doc:name="cacheFlow" />
<choice doc:name="Choice">
<when expression="#[payload is List]">
<dw:transform-message metadata:id="0a00cbc3-684f-40d9-9301-2c7c24922ec8"
doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload map ((payload01 , indexOfPayload01) -> {
deptid: payload01.DEPARTMENT_ID as :number,
fname: payload01.FIRST_NAME,
id: payload01.EMPLOYEE_ID as :number,
lname: payload01.LAST_NAME,
salary: payload01.SALARY as :number
} as :object {class: "com.examples.bean.Person"} )]]></dw:set-payload>
</dw:transform-message>
<object-to-string-transformer
doc:name="Object to String" />
</when>
<otherwise>
<set-payload value="#[payload]" doc:name="Set Payload" />
</otherwise>
</choice>
</processor-chain>
</when>
<when
expression="#[message.inboundProperties.'http.query.params'.operation == 'remove']">
<processor-chain doc:name="Processor Chain: Remove key from cache">
<ee:invalidate-key doc:name="invalidate-key"
cachingStrategy-ref="Caching_Strategy"
keyGenerationExpression="#[message.inboundProperties.'http.query.params'.key]"></ee:invalidate-key>
<expression-component doc:name="Expression"><![CDATA[payload=app.registry.get("evict");
app.registry.remove("evict");]]></expression-component>
</processor-chain>
</when>
<otherwise>
<set-payload value="#['Invalid Operation']" doc:name="Set Payload" />
</otherwise>
</choice>
</flow>
cacheFlow
The cacheFlow flow is used to carry out the Mule cache actions with the help of Mule Cache Scope and sets the cache result with the Set Payload transformer. The Mule Cache Scope will be using the customized caching strategy instead of default caching strategy. Follow the steps to configure the Mule Cache Scope with customized caching strategy:
- Drag Cache Scope from the palette into the process area of cacheFlow flow.
- Provide the display name as Cache-RedisCache.
- Select the Reference to a strategy option and select Caching_Strategy from the drop-down.
- Save the configuration.
The cache scope is encapsulated with a database connector and a logger component. The database connector establishes a connection with the Oracle database and executes SQL query on the HR.EMPLOYEES table. The resultset is processed as per the cache action and the caching strategy and subsequently, the resultset is logged with the logger component. The cache scope is followed by the Set Payload transformer to set the cache result as the payload. The database connector must be configured according to the details depicted in the screenshot below:
cacheFlow XML code:
<flow name="cacheFlow">
<ee:cache cachingStrategy-ref="Caching_Strategy" doc:name="Cache-RedisCache">
<db:select config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[select EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from employees where DEPARTMENT_ID = #[ message.inboundProperties.'http.query.params'.key]]]></db:parameterized-query>
</db:select>
<logger message="Payload: #[payload.toString()]" level="INFO"
doc:name="Logger" />
</ee:cache>
<set-payload value="#[message.payload]" doc:name="Set Payload" />
</flow>
Run and Test the Application
- Start the Redis server/service.
- Start Oracle database server and listener service.
- Run the Mule application redis-cache-example. Click Run As > Mule Application with Maven.
- Provide the HTTP request with the URL http://localhost:8081/redis?operation=store&key=90 to trigger the cache store action.
- The previous HTTP request should store the employee details belonging to the department code 90 into the Redis data store and produces the same as HTTP response. The following screenshots prove the same.
- Provide the HTTP request with the URL http://localhost:8081/redis?operation=retrieve&key=90 to trigger the cache retrieve action.
The previous HTTP request should retrieve the employee details belonging to the department code 90 and produces the same as HTTP response. The following screen shot proves the same.
- Provide the HTTP request with the URL http://localhost:8081/redis?operation=remove&key=90 to trigger the cache remove action.
- The previous HTTP request should remove the employee details belonging to the department code 90 and produces acknowledgment message as HTTP response. The following screenshot proves the same.
- Let’s cross-verify whether the cache removal was successful by retrieving the employee details again with department code 90 by issuing the HTTP request http://localhost:8081/redis?operation=retrieve&key=90.
Let’s verify the same with Redis client command line. The below screenshot proves that the employee detail with department code 90 has been removed from Redis data store.
Conclusion
In this tutorial, I have tried to explain how you can customize the Mule caching strategy so that the Mule cache scope actions can be operated on the Redis data store. Similarly, you can try other third-party caching frameworks with Mule cache scope.
Opinions expressed by DZone contributors are their own.
Comments