Refreshing Mule Cache Using Oracle Database Change Notification
How to use Mule Cache refresh using Oracle's Database Change Notification software.
Join the DZone community and get the full member experience.
Join For FreeObjective
This technical article is all about Mule Cache refresh using Oracle Database Change Notification. The audience of Mule ESB can also understand how we can designate the open source Cache framework Ehcache as the data store for the Mule Cache.
Prerequisites
To run the Mule application, you need to have the following software to be installed on your machine.
- Oracle XE 11.2g: http://www.oracle.com/technetwork/database/database-technologies/express-edition/downloads/index.html
- Java SE 8: http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
- Mule Anypoint Studio 5.4 with 3.7.3 Runtime: https://www.mulesoft.com/lp/dl/studio
- Mule ESB 3.7.3 (Optional): https://developer.mulesoft.com/download-mule-esb-runtime
- Ehcache 3.0 Milestone 3: https://github.com/ehcache/ehcache3/releases/download/v3.0.0.m3/ehcache-3.0.0.m3.jar
- Mule Application Properties (mule-app.properties): Use the following properties or define them as per your system.
db.host=localhost
db.port=1521
db.instance=XE
db.user=hr
db.password=hr
http.host=localhost
http.port=8081
tcp.host=localhost
tcp.port=60505
Database Change Notification Concept
As you can understand from the title of this article, the main objective of this mule application is to designate Mule Cache as a middle-tier data cache and how this data cache can be refreshed using Oracle’s Database Change Notification feature to avoid redundant queries to the database. Also, you can understand how the open source cache framework Ehcache can be used as the data store for the caching purpose.
Let us first focus on the concept of Oracle’s Database Change Notification. Starting from 11g Release 1 (11.1), Oracle JDBC drivers provide support for the Database Change Notification feature of Oracle Database. Using this functionality of the JDBC drivers, multitier systems can take advantage of the Database Change Notification feature to maintain a data cache as up-to-date as possible, by receiving invalidation events from the JDBC drivers. The following diagram depicts a high level Database Change Notification process.
How to Code Database Change Notification Client?
Now, let us see the steps to be considered in order to process database change notification and code the client.
- You need to grant the CHANGE NOTIFICATION privilege to the oracle database user.
GRANT CHANGE NOTIFICATION TO <user-name>
- Registration: You need to create a registration of SQL queries with the JDBC drivers for Database Change Notification.
- Establish Oracle Database connection
final String USERNAME = "hr";
final String PASSWORD = "hr";
final String URL = "jdbc:oracle:thin:@localhost:1521:XE";
final oracle.jdbc.OracleDriver dr = new
oracle.jdbcOracleDriver();
final java.util.Properties prop = new java.util.Properties();
prop.setProperty("user", USERNAME);
prop.setProperty("password", PASSWORD);
oracle.jdbc.OracleConnection conn = dr.connect(URL, prop);
- Set the registration options in a Properties object
final java.util.Properties prop = new java.util.Properties();
prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
prop.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION,"true");
oracle.jdbc.dcn.DatabaseChangeRegistration dcr =
conn.registerDatabaseChangeNotification(prop);
Associating the query with the registration: After you have created a registration, you have to associate a query with it which is a one-time process and is done outside of the currently used registration. The query will be associated even if the local transaction is rolled back.
final Statement stmt = conn.createStatement();
((oracle.jdbc.OracleStatement)
stmt).setDatabaseChangeRegistration(dcr);
final ResultSet rs = stmt
.executeQuery("select EMPLOYEE_ID,
FIRST_NAME,LAST_NAME, SALARY, DEPARTMENT_ID from employees");
while (rs.next()) {
}
Notification or Notifying Database Change Events: In order to receive the database change events or notifications, attach a listener to the registration. When a database change event occurs, the database server notifies the JDBC driver and subsequently the driver then constructs a new Java event, identifies the registration to be notified, and notifies the listeners attached to the registration. The event contains the object ID of the database object that has changed and the type of operation that caused the change. Depending on the registration options (in this case, the options are OracleConnection.DCN_NOTIFY_ROWIDS and OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION), the event may also contain row-level detail information. The listener code can then use the event to make decisions about the data cache.
Define the listener class: The listener class must implement oracle.jdbc.dcn. DatabaseChangeListener interface and must override the method public void onDatabaseChangeNotification (oracle.jdbc.dcn.DatabaseChangeEvent e).
class DatabaseChangeNotificationListener implements DatabaseChangeListener {
//declarations
public void onDatabaseChangeNotification
(DatabaseChangeEvent e) {
//To-do code
}
}
- Attach the listener to the registration: You can attach the listener using the addListener method.
- Deleting a Registration: You need to explicitly unregister a registration to delete it from the server and release the resources in the driver. You can unregister a registration using a connection different from one that was used for creating it. To unregister a registration, you can use the unregisterDatabaseChangeNotification method defined in oracle.jdbc.OracleConnection.
You must pass the DatabaseChangeRegistration object as a parameter to this method. This method deletes the registration from the server and the driver and closes the listener socket.
So far you have learnt the JDBC API related stuff to write the listener or client. Remember, the listener is a standalone Java application which also needs to communicate with the Mule TCP connector usingorg.mule.module.client.MuleClient to send the database change notification events as the payload. Here, the payload is consisting of ROWIDs which were associated with the SQL query change event as a result of the DML like SQL INSERT, SQL UPDATE, and SQL DELETE. In this case, the database object associated with the registered SQL query is HR.EMPLOYEES as depicted in point number 3 above. The complete listener code is shown below.
package com.db.dcnclient;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import org.mule.api.MuleContext;
import org.mule.module.client.MuleClient;
import org.mule.DefaultMuleMessage;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleDriver;
import oracle.jdbc.OracleStatement;
import oracle.jdbc.dcn.DatabaseChangeEvent;
import oracle.jdbc.dcn.DatabaseChangeListener;
import oracle.jdbc.dcn.DatabaseChangeRegistration;
import oracle.jdbc.dcn.QueryChangeDescription;
import oracle.jdbc.dcn.RowChangeDescription;
import oracle.jdbc.dcn.RowChangeDescription.RowOperation;
import oracle.jdbc.dcn.TableChangeDescription;
public class DatabaseChangeNotificationClient {
public MuleContext context;
public DefaultMuleMessage message;
public String payload;
DatabaseChangeRegistration dcr;
OracleConnection conn;
public static void main(String[] args) throws SQLException {
DatabaseChangeNotificationClient dcn = newDatabaseChangeNotificationClient();
try {
dcn.run();
} catch (final SQLException mainSQLException) {
mainSQLException.printStackTrace();
} catch (java.lang.RuntimeException e) {
if (dcn.conn != null)
dcn.conn.unregisterDatabaseChangeNotification(dcn.dcr);
throw e;
}
}
void run() throws SQLException {
conn = connect();
final Properties prop = new Properties();
prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
prop.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true");
dcr = conn.registerDatabaseChangeNotification(prop);
try {
final DatabaseChangeNotificationListener list = newDatabaseChangeNotificationListener(
this);
dcr.addListener(list);
final Statement stmt = conn.createStatement();
((OracleStatement) stmt).setDatabaseChangeRegistration(dcr);
final ResultSet rs = stmt
.executeQuery("select EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from employees");
while (rs.next()) {
}
final String[] tableNames = dcr.getTables();
for (int i = 0; i < tableNames.length; i++)
System.out.println(tableNames[i]
+ " is part of the registration.");
rs.close();
stmt.close();
} catch (final SQLException ex) {
if (conn != null)
conn.unregisterDatabaseChangeNotification(dcr);
throw ex;
} finally {
try {
conn.close();
} catch (final Exception innerex) {
innerex.printStackTrace();
}
}
}
/**
* Creates a connection the database.
*/
public OracleConnection connect() throws SQLException {
final String USERNAME = "hr";
final String PASSWORD = "hr";
final String URL = "jdbc:oracle:thin:@localhost:1521:XE";
final OracleDriver dr = new OracleDriver();
final Properties prop = new Properties();
prop.setProperty("user", USERNAME);
prop.setProperty("password", PASSWORD);
return (OracleConnection) dr.connect(URL, prop);
}
protected void finalize() throws SQLException {
if (conn != null)
conn.unregisterDatabaseChangeNotification(dcr);
}
}
/**
* DCN listener
*/
class DatabaseChangeNotificationListener implements DatabaseChangeListener {
DatabaseChangeNotificationClient dcn;
DatabaseChangeNotificationListener(
final DatabaseChangeNotificationClient dem) {
dcn = dem;
}
public void onDatabaseChangeNotification(final DatabaseChangeEvent e) {
MuleClient client = null;
if (e.getEventType() == DatabaseChangeEvent.EventType.QUERYCHANGE
&& e.getRegId() == dcn.dcr.getRegId()) {
try {
final QueryChangeDescription[] q = e
.getQueryChangeDescription();
TableChangeDescription[] t = null;
RowChangeDescription[] r = null;
StringBuffer buf = null;
buf = new StringBuffer();
char oper=0;
for (int i = 0; i < q.length; i++) {
t = q[i].getTableChangeDescription();
for (int j = 0; j < t.length; j++) {
r = t[j].getRowChangeDescription();
for (int k = 0; k < r.length; k++) {
if (r[k].getRowOperation() == RowOperation.INSERT) {
oper = 'I';
} else if (r[k].getRowOperation() == RowOperation.UPDATE) {
oper = 'U';
} else {
oper = 'D';
}
final String rowId = r[k].getRowid().stringValue();
buf.append(oper + rowId.concat(","));
}
}
}
client = new MuleClient(true);
dcn.payload = buf.substring(0, buf.lastIndexOf(","));
dcn.message = new DefaultMuleMessage("",
client.getMuleContext());
dcn.message.setPayload(dcn.payload);
client.dispatch("tcp://localhost:60505", dcn.message, null);
client.dispose();
} catch (final Exception e1) {
e1.printStackTrace();
}
}
}
}
Note: Create a Java Project as DCNClient in Anypoint Studio and include com.db.dcnclient. DatabaseChangeNotificationClient.java class, as defined above. Include Mule Server 3.7.3 EE libraries and ojdbc6.jar in Java Build Path of the project. Run this class as a Java Application.
DatabaseChangeNotification Mule Application
Now that you have learnt the fundamentals of Oracle Database Change Notification and how to handle the database change events with the listener, let us proceed to the Mule application that is responsible for refreshing the cache with regards to the database change notifications. The Mule application is associated with the following flows and sub flows: (Note: Include ojdbc6.jar and ehcache-3.0.0.m3.jar in your mule project).
DatabaseChangeNotificationFlow
This flow receives the comma separated ROWIDs of the database table (HR.EMPLOYEES) as a result of SQL INSERT, SQL UPDATE, or SQL DELETE through the TCP inbound endpoint and refreshes the cache accordingly. The listener communicates with the TCP endpoint and sends the comma separated ROWIDs for further processing.
Flow Elements:
Serial No. | Element Type | Display Name | Configurations | ||
1 | TCP Connector | DCN Receiver | Exchange Patterns: One Way Host: {tcp.host} Port: {tcp.port} Global Elements Name: TCP | ||
2 | Set Payload | Set Payload | Value: #[message.payload] | ||
3 | Splitter | Splitter | Expression: #[StringUtils.split(payload, ',')] Global Configurations: <configuration doc:name="Configuration"> <expression-language autoResolveVariables="true"> <import class="org.mule.util.StringUtils" /> </expression-language> </configuration> | ||
4 | Variable | Insert Update Flag | Name: flag Value: #[payload.charAt(0)] | ||
5 | Variable | Rowid | Name: rowed Value: #[payload.substring(1)] | ||
6 | Choice | Choice | When | Route Message To | |
#[flowVars.flag == "I"] | Async-Insert Cache | ||||
#[flowVars.flag == "U"] | Async_Update Cache | ||||
#[flowVars.flag == "D"] | Delete Cache Processor Chain | ||||
7 | Async | Async-Insert Cache | Wrapped Element | Configurations | |
Flow Reference | Flow name: Insert_Update_Cache_Sub_Flow | ||||
8 | Async | Async-Update Cache | Wrapped Element | Configurations | |
Flow Reference | Flow name: Insert_Update_Cache_Sub_Flow | ||||
9 | Flow Reference | Insert_Update_Cache_Sub_Flow | Flow name: Insert_Update_Cache_Sub_Flow | ||
10 | Processor Chain | Delete Cache | Wrapped Element | Configurations | |
Expression | Expression: #[app.registry.Caching_Strategy.getStore().remove(flowVars.rowid)] | ||||
Logger | Message: Employee record with ROWID #[flowVars.rowid] is removed from cache |
Flow XML:
<flow name="DatabaseChangeNotificationFlow">
<tcp:inbound-endpoint exchange-pattern="one-way"
host="${tcp.host}" port="${tcp.port}" connector-ref="TCP"
responseTimeout="10000" transformer-refs="Object_to_String"doc:name="DCN Receiver" />
<set-payload value="#[message.payload]" doc:name="Set Payload" />
<splitter expression="#[StringUtils.split(payload, ',')]"
doc:name="Splitter" />
<set-variable variableName="flag" value="#[payload.charAt(0)]"
doc:name="Insert Update Flag" />
<set-variable variableName="rowid" value="#[payload.substring(1)]"
doc:name="Rowid" />
<choice doc:name="Choice">
<when expression="#[flowVars.flag == "I"]">
<async doc:name="Async-Insert Cache">
<flow-ref name="Insert_Update_Cache_Sub_Flow"doc:name="Insert_Update_Cache_Sub_Flow" />
</async>
</when>
<when expression="#[flowVars.flag == "U"]">
<async doc:name="Async-Update Cache">
<flow-ref name="Insert_Update_Cache_Sub_Flow"doc:name="Insert_Update_Cache_Sub_Flow" />
</async>
</when>
<when expression="#[flowVars.flag == "D"]">
<processor-chain doc:name="Delete Cache">
<expression-transformer
expression="#[app.registry.Caching_Strategy.getStore().remove(flowVars.rowid)]"
doc:name="Expression" />
<logger
message="Employee record with ROWID #[flowVars.rowid] is removed from cache"
level="INFO" doc:name="Logger" />
</processor-chain>
</when>
</choice>
</flow>
2. Insert_Update_Cache_Sub_Flow
This sub flow obtains the employee records pertaining to the ROWIDs through the Database connector, transforms them into Java objects and subsequently refreshes the cache.
The Transform Message component uses com.db.cache.Employee POJO as the transformed Java object. The complete code is shown below:
package com.db.cache;
@SuppressWarnings("serial")
public class Employee implements java.io.Serializable {
private int employeeId;
private int departmentId;
private String firstName;
private String lastName;
private float salary;
private String rowid;
public void setEmployeeId(int employeeId) {
this.employeeId = employeeId;
}
public int getEmployeeId() {
return employeeId;
}
public void setDepartmentId(int departmentId) {
this.departmentId = departmentId;
}
public int getDepartmentId() {
return departmentId;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getFirstName() {
return firstName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public String getLastName() {
return lastName;
}
public void setSalary(float salary) {
this.salary = salary;
}
public float getSalary() {
return salary;
}
public void setRowid(String rowid) {
this.rowid = rowid;
}
public String getRowid() {
return rowid;
}
}
Sub Flow Elements:
Serial No. | Element Type | Display Name | Configurations | |
1 | Database Connector | Database | Connector configurations: Oracle Configuration Name: Oracle_Configuration Host: ${db.host} Port: ${db.port} Instance: ${db.instance} User: ${db.user} Password: ${db.password} Enable DataSense: Checked Oracle Driver: ojdbc6.jar Operation: Select Query Type: Parameterized Parameterized query: select''||ROWID,EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_IDfrom EMPLOYEES where ROWID = #[payload.substring(1)] | |
2 | Transform Message | Transform Message | %dw 1.0 %output application/java --- payload map ((payload01 ,indexOfPayload01) -> { departmentId:payload01.DEPARTMENT_ID as :numberdefault 0, employeeId: payload01.EMPLOYEE_IDas :number, firstName: payload01.FIRST_NAMEdefault "", lastName: payload01.LAST_NAMEdefault "", rowid: flowVars.flag ++payload01."''||ROWID", salary: payload01.SALARY as :numberdefault 0 } as :object {class:"com.db.cache.Employee"}) | |
3 | For Each | For Each | Wrapped Element | Configurations |
Variable | Display Name: Cache Key Name: key Value: #[payload.getRowid()] | |||
Flow Reference | Display Name: Ehcache_Sub_Flow Flow name: Ehcache_Sub_Flow |
Sub Flow XML:
<sub-flow name="Insert_Update_Cache_Sub_Flow">
<db:select config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[select ''||ROWID, EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from EMPLOYEES where ROWID =#[payload.substring(1)]]]></db:parameterized-query>
</db:select>
<dw:transform-message metadata:id="c5016f4b-6468-4137-b96a-601feaca71f9"
doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload map ((payload01 , indexOfPayload01) -> {
departmentId: payload01.DEPARTMENT_ID as :number default 0,
employeeId: payload01.EMPLOYEE_ID as :number,
firstName: payload01.FIRST_NAME default "",
lastName: payload01.LAST_NAME default "",
rowid: flowVars.flag ++ payload01."''||ROWID",
salary: payload01.SALARY as :number default 0
} as :object {class: "com.db.cache.Employee"})]]></dw:set-payload>
</dw:transform-message>
<foreach doc:name="For Each">
<set-variable variableName="key" value="#[payload.getRowid()]"
doc:name="Cache Key" />
<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />
</foreach>
</sub-flow>
Ehcache_Sub_Flow
This sub flow is associated with a Mule Cache scope with Ehcache as the data store. You need to configure Mule to use Ehcache to handle the caching. Follow the steps:
Define cache manager and cache factory bean
<spring:beans>
<spring:bean id="cacheManager" name="cacheManager"
class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
scope="singleton" />
<spring:bean id="cache" name="cache"
class="org.springframework.cache.ehcache.EhCacheFactoryBean"scope="singleton">
<spring:property name="cacheManager" ref="cacheManager" />
<spring:property name="cacheName" value="dbCache" />
<spring:property name="maxElementsInMemory" value="10000" />
<spring:property name="eternal" value="true" />
<spring:property name="timeToIdle" value="120" />
<spring:property name="timeToLive" value="120" />
<spring:property name="overflowToDisk" value="true" />
<spring:property name="maxElementsOnDisk" value="10000000" />
<spring:property name="diskPersistent" value="false" />
<spring:property name="diskExpiryThreadIntervalSeconds"
value="120" />
<spring:property name="memoryStoreEvictionPolicy"
value="LRU" />
</spring:bean>
</spring:beans>
Create a custom object store
Once the cache manager and cache factory bean are configured, the next step is to define the custom object store that uses Ehcache to store and retrieve the data. We just need to create a new class that implementsorg.mule.api.store.ObjectStore interface, and use Ehcache to do the operations.
package com.db.cache;
import java.io.Serializable;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import org.mule.api.store.ObjectStore;
import org.mule.api.store.ObjectStoreException;
public class EmployeeEhcacheObjectStore<T extends Serializable> implements
ObjectStore<T> {
private Ehcache cache;
@Override
public synchronized boolean contains(Serializable key)
throws ObjectStoreException {
return cache.isKeyInCache(key);
}
@Override
public synchronized void store(Serializable key, T value)
throws ObjectStoreException {
Serializable k = key;
if (k.toString().charAt(0) == 'U' || k.toString().charAt(0) == 'I') {
k = (Serializable) k.toString().substring(1);
}
Element element = new Element(k, value);
cache.put(element);
}
@SuppressWarnings("unchecked")
@Override
public synchronized T retrieve(Serializable key)
throws ObjectStoreException {
Element element = cache.get(key);
if (element == null) {
return null;
}
return (T) element.getValue();
}
@Override
public synchronized T remove(Serializable key) throws ObjectStoreException {
T value = retrieve(key);
cache.remove(key);
return value;
}
@Override
public boolean isPersistent() {
return false;
}
public Ehcache getCache() {
return cache;
}
public void setCache(Ehcache cache) {
this.cache = cache;
}
@Override
public void clear() throws ObjectStoreException {
// TODO Auto-generated method stub
}
}
Define the Mule caching strategy
In order to use the newly created object store, the next step is to configure a caching strategy, as shown below.
<ee:object-store-caching-strategy name="Caching_Strategy"
keyGenerationExpression="#[flowVars.key]"doc:name="Caching Strategy">
<custom-object-store class="com.db.cache.
EmployeeEhcacheObjectStore">
<spring:property name="cache" ref="cache" />
</custom-object-store>
</ee:object-store-caching-strategy>
Sub Flow Elements:
Serial No. | Element Type | Display Name | Configurations | |
1 | Cache | Cache | Caching strategy reference: Reference to a strategy Name: Caching_Strategy Object Store: core:custom-object-store class=com.db.cache.EmployeeEhcacheObjectStore Spring property: Name: cache Reference: cache Key Expression: #[flowVars.key] | |
Wrapped Element | Configurations | |||
Variable | Name: msg Value: #['ROWID: '+payload.getRowid().substring(1)+', EMPLOYEE_ID: '+payload.getEmployeeId()+', FIRSTNAME: '+payload.getFirstName()+', LASTNAME: '+payload.getLastName()+', DEPARTMENT_ID: '+payload.getDepartmentId()+', SALARY: '+payload.getSalary()] | |||
Logger | Message: #[payload.getRowid().charAt(0) == 'I'? flowVars.msg +' loaded in cache' :flowVars.msg+' modified in cache'] |
Sub Flow XML:
<sub-flow name="Ehcache_Sub_Flow">
<ee:cache cachingStrategy-ref="Caching_Strategy" doc:name="Cache">
<set-variable variableName="msg"
value="#['ROWID: '+payload.getRowid().substring(1)+', EMPLOYEE_ID: '+payload.getEmployeeId()+', FIRSTNAME: '+payload.getFirstName()+', LASTNAME: '+payload.getLastName()+', DEPARTMENT_ID: '+payload.getDepartmentId()+', SALARY: '+payload.getSalary()]"
doc:name="Variable" />
<logger
message="#[payload.getRowid().charAt(0) == 'I' ? flowVars.msg +' loaded in cache' : flowVars.msg+' modified in cache']"
level="INFO" doc:name="Logger" />
</ee:cache>
</sub-flow>
Initialize_Cache_Flow
This flow is used to initialize the cache with all the employee records of HR.EMPLOYEES database table as POJOs. The ROWID is considered as the object’s key to store them in the caching strategy.
The Transform Message component uses com.db.cache.Employee POJO as the transformed Java object. The complete code is shown below:
package com.db.cache;
@SuppressWarnings("serial")
public class Employee implements java.io.Serializable {
private int employeeId;
private int departmentId;
private String firstName;
private String lastName;
private float salary;
private String rowid;
public void setEmployeeId(int employeeId) {
this.employeeId = employeeId;
}
public int getEmployeeId() {
return employeeId;
}
public void setDepartmentId(int departmentId) {
this.departmentId = departmentId;
}
public int getDepartmentId() {
return departmentId;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getFirstName() {
return firstName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public String getLastName() {
return lastName;
}
public void setSalary(float salary) {
this.salary = salary;
}
public float getSalary() {
return salary;
}
public void setRowid(String rowid) {
this.rowid = rowid;
}
public String getRowid() {
return rowid;
}
}
Parse Template File: InitCacheResult.html
Place this HTML template file in src/main/resources project folder.
<html>
<head>
<meta http-equiv="Content-Type"
content="text/html; charset=WINDOWS-1252">
<meta name="generator" content="SQL*Plus 11.2.0">
<style type='text/css'>
body {
font: 10pt Arial, Helvetica, sans-serif;
color: black;
background: White;
}
p {
font: 10pt Arial, Helvetica, sans-serif;
color: black;
background: White;
}
table, tr, td {
font: 10pt Arial, Helvetica, sans-serif;
color: Black;
background: #f7f7e7;
padding: 0px 0px 0px 0px;
margin: 0px 0px 0px 0px;
}
th {
font: bold 10pt Arial, Helvetica, sans-serif;
color: #336699;
background: #cccc99;
padding: 0px 0px 0px 0px;
}
h1 {
font: 16pt Arial, Helvetica, Geneva, sans-serif;
color: #336699;
background-color: White;
border-bottom: 1px solid #cccc99;
margin-top: 0pt;
margin-bottom: 0pt;
padding: 0px 0px 0px 0px;
-
}
h2 {
font: bold 10pt Arial, Helvetica, Geneva, sans-serif;
color: #336699;
background-color: White;
margin-top: 4pt;
margin-bottom: 0pt;
}
a {
font: 9pt Arial, Helvetica, sans-serif;
color: #663300;
background: #ffffff;
margin-top: 0pt;
margin-bottom: 0pt;
vertical-align: top;
}
</style>
<title>Cache Initialization Result</title>
</head>
<body>
<h1>Cache initialized with the following employee records:</h1>
<p>
<table BORDER=2>
<tr>
<th scope="col">ROWID</th>
<th scope="col">DEPARTMENT_ID</th>
<th scope="col">EMPLOYEE_ID</th>
<th scope="col">FIRST_NAME</th>
<th scope="col">LAST_NAME</th>
<th scope="col">SALARY</th>
</tr>
#[payload]
</table>
<p>
</body>
</html >
Use the URL http://localhost:8081/Init to initialize the cache. After the initialization is done, the flow returns all the initialized employee records as an HTML report with the help of a Parse Template, as shown below.
Flow Elements:
Serial No. | Element Type | Display Name | Configurations | |
1 | HTTP Inbound endpoint | HTTP | Connector Configuration: Name: HTTP_Listener_Configuration Protocol: HTTP Host: Default Port: 8081 Path: /Init | |
2 | Database Connector | Database | Connector configurations: Oracle Configuration Name: Oracle_Configuration Host: ${db.host} Port: ${db.port} Instance: ${db.instance} User: ${db.user} Password: ${db.password} Enable DataSense: Checked Oracle Driver: ojdbc6.jar Operation: Select Query Type: Parameterized Parameterized query: select'I'||ROWID,EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID fromEMPLOYEES | |
3 | Transform Message | Transform Message | %dw 1.0 %output application/java --- payload map ((payload01 ,indexOfPayload01) -> { departmentId: payload01.DEPARTMENT_ID as:number default 0, employeeId: payload01.EMPLOYEE_ID as:number, firstName: payload01.FIRST_NAME default'', lastName: payload01.LAST_NAME default '', rowid: payload01."'I'||ROWID", salary: payload01.SALARY as :numberdefault 0 } as :object {class:"com.db.cache.Employee"}) | |
4 | For Each | For Each | Collection: #[payload] Counter Variable Name: Default Batch Size: Default Root Message Variable Name: Default | |
Wrapped Element | Configuration | |||
Variable | Display Name: HTML Name: html Value: #[flowVars['html'] + '<tr><td align="right">' +payload.getRowid().substring(1) + '</td><td align="right">'+payload.getDepartmentId() + '</td><td align="right">' +payload.getEmployeeId() + '</td><td>'+payload.getFirstName()+'</td><td>'+payload.getLastName()+'</td><td align="right">'+payload.getSalary()+'</td></tr>'] | |||
Variable | Display Name: Search Key Name: key Value: #[payload.getRowid()] | |||
Flow Reference | Display Name: Ehcache_Sub_Flow Flow name: Ehcache_Sub_Flow | |||
5 | Set Payload | Set Payload | Value: #[flowVars.html.substring(4)] | |
6 | Parse Template | Parse Template | Location: src\main\resources\InitCacheResult.html |
Flow XML:
<flow name="Initialize_Cache_Flow">
<http:listener config-ref="HTTP_Listener_Configuration"
path="/Init" doc:name="HTTP" />
<db:select config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[select 'I'||ROWID,EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from EMPLOYEES]]></db:parameterized-query>
</db:select>
<dw:transform-message metadata:id="c21542f5-3971-4fc6-a5c1-26324e0b0b00"
doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload map ((payload01 , indexOfPayload01) -> {
departmentId: payload01.DEPARTMENT_ID as :number default 0,
employeeId: payload01.EMPLOYEE_ID as :number,
firstName: payload01.FIRST_NAME default '',
lastName: payload01.LAST_NAME default '',
rowid: payload01."'I'||ROWID",
salary: payload01.SALARY as :number default 0
} as :object {class: "com.db.cache.Employee"})]]></dw:set-payload>
</dw:transform-message>
<foreach doc:name="For Each">
<set-variable variableName="html"
value="#[flowVars['html'] + '<tr><td align="right">' + payload.getRowid().substring(1) + '</td><td align="right">'+ payload.getDepartmentId() + '</td><td align="right">' +payload.getEmployeeId() + '</td><td>'+payload.getFirstName()+'</td><td>'+payload.getLastName()+'</td><td align="right">'+payload.getSalary()+'</td></tr>']"
doc:name="HTML" />
<set-variable variableName="key" value="#[payload.getRowid()]"
doc:name="Search Key" />
<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />
</foreach>
<set-payload value="#[flowVars.html.substring(4)]"
doc:name="Set Payload" mimeType="text/html" />
<parse-template
location="src\main\resources\InitCacheResult.html"
doc:name="Parse Template" />
</flow>
Search_Cache_Flow
This flow is used to search the employee records from the cache with ROWID as the search key. Use the URL http://localhost:8081/cache?key=ROWID.
Parse Template File: SearchCacheResult.html
Place this HTML template file in src/main/resources project folder.
<html>
<head>
<meta http-equiv="Content-Type"
content="text/html; charset=WINDOWS-1252">
<meta name="generator" content="SQL*Plus 11.2.0">
<style type='text/css'>
body {
font: 10pt Arial, Helvetica, sans-serif;
color: black;
background: White;
}
p {
font: 10pt Arial, Helvetica, sans-serif;
color: black;
background: White;
}
table, tr, td {
font: 10pt Arial, Helvetica, sans-serif;
color: Black;
background: #f7f7e7;
padding: 0px 0px 0px 0px;
margin: 0px 0px 0px 0px;
}
th {
font: bold 12pt Arial, Helvetica, sans-serif;
color: #336699;
background: #cccc99;
padding: 0px 0px 0px 0px;
}
h1 {
font: 16pt Arial, Helvetica, Geneva, sans-serif;
color: #336699;
background-color: White;
border-bottom: 1px solid #cccc99;
margin-top: 0pt;
margin-bottom: 0pt;
padding: 0px 0px 0px 0px;
-
}
h2 {
font: bold 10pt Arial, Helvetica, Geneva, sans-serif;
color: #336699;
background-color: White;
margin-top: 4pt;
margin-bottom: 0pt;
}
a {
font: 9pt Arial, Helvetica, sans-serif;
color: #663300;
background: #ffffff;
margin-top: 0pt;
margin-bottom: 0pt;
vertical-align: top;
}
</style>
<title>Search Cache Result</title>
</head>
<body>
<h1>The following are the Cache search result based on Rowid:</h1>
<p>
<table BORDER=2>
<tr>
<th scope="col">ROWID</th>
<th scope="col">DEPARTMENT_ID</th>
<th scope="col">EMPLOYEE_ID</th>
<th scope="col">FIRST_NAME</th>
<th scope="col">LAST_NAME</th>
<th scope="col">SALARY</th>
</tr>
#[payload]
</table>
<p>
</body>
</html>
The search result is obtained with the help of a Parse Template, as shown below:
Flow Elements:
Serial No. | Element Type | Display Name | Configurations | |
1 | HTTP Inbound endpoint | HTTP | Connector Configuration: Name: HTTP_Listener_Configuration Protocol: HTTP Host: Default Port: 8081 Path: /cache | |
2 | Variable | Variable | Name: key Value: #[message.inboundProperties.'http.query.params'.key] | |
3 | Flow Reference | Ehcache_Sub_Flow | Flow name: Ehcache_Sub_Flow | |
4 | For Each | For Each | Collection: #[payload] Counter Variable Name: Default Batch Size: Default Root Message Variable Name: Default | |
Wrapped Element | Configuration | |||
Variable | Display Name: HTML Name: finalString Value: #[flowVars['finalString'] + '<tr><td align="right">' +payload.getRowid().substring(1) +'</td><td align="right">' +payload.getDepartmentId() + '</td><td align="right">' +payload.getEmployeeId() + '</td><td>'+payload.getFirstName()+'</td><td>'+payload.getLastName()+'</td><td align="right">'+payload.getSalary()+'</td></tr>'] | |||
5 | Set Payload | Set Payload | Value: #[flowVars.finalString.substring(4)] | |
6 | Parse Template | Parse Template | Location: src\main\resources\SearchCacheResult.html |
Flow XML:
<flow name="Search_Cache_Flow">
<http:listener config-ref="HTTP_Listener_Configuration"
path="/cache" doc:name="HTTP" />
<set-variable variableName="key"
value="#[message.inboundProperties.'http.query.params'.key]"
doc:name="Variable" />
<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />
<foreach collection="#[payload]" doc:name="For Each">
<set-variable variableName="finalString"
value="#[flowVars['finalString'] + '<tr><td align="right">' + payload.getRowid().substring(1) +'</td><td align="right">' + payload.getDepartmentId() + '</td><td align="right">' +payload.getEmployeeId() + '</td><td>'+payload.getFirstName()+'</td><td>'+payload.getLastName()+'</td><td align="right">'+payload.getSalary()+'</td></tr>']"
doc:name="Variable" />
</foreach>
<set-payload value="#[flowVars.finalString.substring(4)]"
doc:name="Set Payload" />
<parse-template
location="src\main\resources\SearchCacheResult.html"
doc:name="Parse Template" />
</flow>
Complete Mule Application XML Code (DatabaseChangeNotification.xml):
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"
xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns:tcp="http://www.mulesoft.org/schema/mule/tcp" xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/tcp http://www.mulesoft.org/schema/mule/tcp/current/mule-tcp.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<configuration doc:name="Configuration">
<expression-language autoResolveVariables="true">
<import class="org.mule.util.StringUtils" />
</expression-language>
</configuration>
<spring:beans>
<spring:bean id="cacheManager" name="cacheManager"
class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
scope="singleton" />
<spring:bean id="cache" name="cache"
class="org.springframework.cache.ehcache.EhCacheFactoryBean" scope="singleton">
<spring:property name="cacheManager" ref="cacheManager" />
<spring:property name="cacheName" value="dbCache" />
<spring:property name="maxElementsInMemory" value="10000" />
<spring:property name="eternal" value="true" />
<spring:property name="timeToIdle" value="120" />
<spring:property name="timeToLive" value="120" />
<spring:property name="overflowToDisk" value="true" />
<spring:property name="maxElementsOnDisk" value="10000000" />
<spring:property name="diskPersistent" value="false" />
<spring:property name="diskExpiryThreadIntervalSeconds"
value="120" />
<spring:property name="memoryStoreEvictionPolicy"
value="LRU" />
</spring:bean>
</spring:beans>
<ee:object-store-caching-strategy
name="Caching_Strategy" keyGenerationExpression="#[flowVars.key]"
doc:name="Caching Strategy">
<custom-object-store class="com.db.cache.EmployeeEhcacheObjectStore">
<spring:property name="cache" ref="cache" />
</custom-object-store>
</ee:object-store-caching-strategy>
<tcp:connector name="TCP" validateConnections="true"
sendBufferSize="0" receiveBufferSize="0" receiveBacklog="0"
clientSoTimeout="10000" serverSoTimeout="10000" socketSoLinger="0"
doc:name="TCP">
</tcp:connector>
<object-to-string-transformer name="Object_to_String"
doc:name="Object to String" />
<db:oracle-config name="Oracle_Configuration" host="${db.host}"
port="${db.port}" instance="${db.instance}" user="${db.user}"
password="${db.password}" doc:name="Oracle Configuration" />
<http:listener-config name="HTTP_Listener_Configuration"
host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration" />
<flow name="DatabaseChangeNotificationFlow">
<tcp:inbound-endpoint exchange-pattern="one-way"
host="${tcp.host}" port="${tcp.port}" connector-ref="TCP"
responseTimeout="10000" transformer-refs="Object_to_String" doc:name="DCN Receiver" />
<set-payload value="#[message.payload]" doc:name="Set Payload" />
<splitter expression="#[StringUtils.split(payload, ',')]"
doc:name="Splitter" />
<set-variable variableName="flag" value="#[payload.charAt(0)]"
doc:name="Insert Update Flag" />
<set-variable variableName="rowid" value="#[payload.substring(1)]"
doc:name="Rowid" />
<choice doc:name="Choice">
<when expression="#[flowVars.flag == "I"]">
<async doc:name="Async-Insert Cache">
<flow-ref name="Insert_Update_Cache_Sub_Flow" doc:name="Insert_Update_Cache_Sub_Flow" />
</async>
</when>
<when expression="#[flowVars.flag == "U"]">
<async doc:name="Async-Update Cache">
<flow-ref name="Insert_Update_Cache_Sub_Flow" doc:name="Insert_Update_Cache_Sub_Flow" />
</async>
</when>
<when expression="#[flowVars.flag == "D"]">
<processor-chain doc:name="Delete Cache">
<expression-transformer
expression="#[app.registry.Caching_Strategy.getStore().remove(flowVars.rowid)]"
doc:name="Expression" />
<logger
message="Employee record with ROWID #[flowVars.rowid] is removed from cache"
level="INFO" doc:name="Logger" />
</processor-chain>
</when>
</choice>
</flow>
<sub-flow name="Insert_Update_Cache_Sub_Flow">
<db:select config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[select ''||ROWID,EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from EMPLOYEES where ROWID = #[payload.substring(1)]]]></db:parameterized-query>
</db:select>
<dw:transform-message metadata:id="c5016f4b-6468-4137-b96a-601feaca71f9"
doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload map ((payload01 , indexOfPayload01) -> {
departmentId: payload01.DEPARTMENT_ID as :number default 0,
employeeId: payload01.EMPLOYEE_ID as :number,
firstName: payload01.FIRST_NAME default "",
lastName: payload01.LAST_NAME default "",
rowid: flowVars.flag ++ payload01."''||ROWID",
salary: payload01.SALARY as :number default 0
} as :object {class: "com.db.cache.Employee"})]]></dw:set-payload>
</dw:transform-message>
<foreach doc:name="For Each">
<set-variable variableName="key" value="#[payload.getRowid()]"
doc:name="Cache Key" />
<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />
</foreach>
</sub-flow>
<sub-flow name="Ehcache_Sub_Flow">
<ee:cache cachingStrategy-ref="Caching_Strategy" doc:name="Cache">
<set-variable variableName="msg"
value="#['ROWID: '+payload.getRowid().substring(1)+', EMPLOYEE_ID: '+payload.getEmployeeId()+', FIRSTNAME: '+payload.getFirstName()+', LASTNAME: '+payload.getLastName()+', DEPARTMENT_ID: '+payload.getDepartmentId()+', SALARY: '+payload.getSalary()]"
doc:name="Variable" />
<logger
message="#[payload.getRowid().charAt(0) == 'I' ? flowVars.msg +' loaded in cache' : flowVars.msg+' modified in cache']"
level="INFO" doc:name="Logger" />
</ee:cache>
</sub-flow>
<flow name="Search_Cache_Flow">
<http:listener config-ref="HTTP_Listener_Configuration"
path="/cache" doc:name="HTTP" />
<set-variable variableName="key"
value="#[message.inboundProperties.'http.query.params'.key]"
doc:name="Variable" />
<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />
<foreach collection="#[payload]" doc:name="For Each">
<set-variable variableName="finalString"
value="#[flowVars['finalString'] + '<tr><td align="right">' + payload.getRowid().substring(1) +'</td><td align="right">' + payload.getDepartmentId() + '</td><td align="right">' +payload.getEmployeeId() + '</td><td>'+payload.getFirstName()+'</td><td>'+payload.getLastName()+'</td><td align="right">'+payload.getSalary()+'</td></tr>']"
doc:name="Variable" />
</foreach>
<set-payload value="#[flowVars.finalString.substring(4)]"
doc:name="Set Payload" />
<parse-template
location="src\main\resources\SearchCacheResult.html"
doc:name="Parse Template" />
</flow>
<flow name="Initialize_Cache_Flow">
<http:listener config-ref="HTTP_Listener_Configuration"
path="/Init" doc:name="HTTP" />
<db:select config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[select 'I'||ROWID,EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from EMPLOYEES]]></db:parameterized-query>
</db:select>
<dw:transform-message metadata:id="c21542f5-3971-4fc6-a5c1-26324e0b0b00"
doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload map ((payload01 , indexOfPayload01) -> {
departmentId: payload01.DEPARTMENT_ID as :number default 0,
employeeId: payload01.EMPLOYEE_ID as :number,
firstName: payload01.FIRST_NAME default '',
lastName: payload01.LAST_NAME default '',
rowid: payload01."'I'||ROWID",
salary: payload01.SALARY as :number default 0
} as :object {class: "com.db.cache.Employee"})]]></dw:set-payload>
</dw:transform-message>
<foreach doc:name="For Each" collection="#[payload]">
<set-variable variableName="html"
value="#[flowVars['html'] + '<tr><td align="right">' + payload.getRowid().substring(1) + '</td><td align="right">'+ payload.getDepartmentId() + '</td><td align="right">' +payload.getEmployeeId() + '</td><td>'+payload.getFirstName()+'</td><td>'+payload.getLastName()+'</td><td align="right">'+payload.getSalary()+'</td></tr>']"
doc:name="HTML" />
<set-variable variableName="key" value="#[payload.getRowid()]"
doc:name="Search Key" />
<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />
</foreach>
<set-payload value="#[flowVars.html.substring(4)]"
doc:name="Set Payload" mimeType="text/html" />
<parse-template
location="src\main\resources\InitCacheResult.html"
doc:name="Parse Template" />
</flow>
</mule>
Conclusion
Finally, let us run the DatabaseChangeNotificationClient.java and the Mule application and justify with the help of the log reports and search facility that the cache is refreshed when the DML operations are performed at the back end on the database object HR.EMPLOYEES associated with the registered SQL query for database change notifications.
Run DatabaseChangeNotificationClient.java as a java application.
Run DatabaseChangeNotification.xml as Mule application.
SQL Insert DML operation on HR.EMPLOYEES table.
Mule Logger reports that the SQL Insert has refreshed the Mule cache with the two new records. You can also check with the URL http://localhost:8081/cache?key=ROWID to view the record obtained from the cache. (Note: Substitute the ROWID in the URL with the actual ROWID from the log below).
INFO 2016-02-17 00:00:24,065 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.1.03] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADPAAG, EMPLOYEE_ID: 223, FIRSTNAME: Christopher, LASTNAME: Lee, DEPARTMENT_ID: 0, SALARY: 1200.0 loaded in cache
INFO 2016-02-17 00:00:24,127 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.1.04] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADPAAH, EMPLOYEE_ID: 224, FIRSTNAME: Prashant, LASTNAME: Jha, DEPARTMENT_ID: 0, SALARY: 1200.0 loaded in cache
- SQL Update DML operation on HR.EMPLOYEES table.
Mule Logger reports that the SQL update has refreshed the Mule cache with the four modified records. You can also check with the URL http://localhost:8081/cache?key=ROWID to view the record obtained from the cache. (Note: Substitute the ROWID in the URL with the actual ROWID from the log below).
INFO 2016-02-17 00:11:30,514 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.2.02] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADMAAA, EMPLOYEE_ID: 207, FIRSTNAME: Peck, LASTNAME: Gregory, DEPARTMENT_ID: 90, SALARY: 7700.0 modified in cache
INFO 2016-02-17 00:11:30,650 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.2.03] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADNAAA, EMPLOYEE_ID: 100, FIRSTNAME: Steven, LASTNAME: King, DEPARTMENT_ID: 90, SALARY: 7700.0 modified in cache
INFO 2016-02-17 00:11:30,716 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.2.05] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADNAAC, EMPLOYEE_ID: 102, FIRSTNAME: Lex, LASTNAME: De Haan, DEPARTMENT_ID: 90, SALARY: 7700.0 modified in cache
INFO 2016-02-17 00:11:30,799 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.2.04] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADNAAB, EMPLOYEE_ID: 101, FIRSTNAME: Neena, LASTNAME: Kochhar, DEPARTMENT_ID: 90, SALARY: 7700.0 modified in cache
- SQL Delete DML operation on HR.EMPLOYEES table.
Mule Logger reports that the SQL delete has refreshed the Mule cache with the two deleted records. You can also check with the URL http://localhost:8081/cache?key=ROWID to view the result. In this case you will see the result as an exception stating the message as NullPayload (Note: Substitute the ROWID in the URL with the actual ROWID from the log below).
INFO 2016-02-17 00:19:01,898 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.stage1.05] org.mule.api.processor.LoggerMessageProcessor: Employee record with ROWID AAAEAbAAEAAAADPAAG is removed from cache
INFO 2016-02-17 00:19:01,906 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.stage1.05] org.mule.api.processor.LoggerMessageProcessor: Employee record with ROWID AAAEAbAAEAAAADPAAH is removed from cache
Email: pbghosh67@gmail.com
Opinions expressed by DZone contributors are their own.
Comments