
Refreshing Mule Cache Using Oracle Database Change Notification


How to refresh the Mule Cache using Oracle's Database Change Notification feature.


Objective

This article explains how to refresh the Mule Cache using Oracle Database Change Notification. It also shows Mule ESB users how to designate the open source caching framework Ehcache as the data store for the Mule Cache.


Prerequisites

To run the Mule application, install the following software on your machine.

  1. Oracle Database XE 11g Release 2 (11.2): http://www.oracle.com/technetwork/database/database-technologies/express-edition/downloads/index.html
  2. Java SE 8: http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
  3. Mule Anypoint Studio 5.4 with 3.7.3 Runtime: https://www.mulesoft.com/lp/dl/studio
  4. Mule ESB 3.7.3 (Optional): https://developer.mulesoft.com/download-mule-esb-runtime
  5. Ehcache 3.0 Milestone 3: https://github.com/ehcache/ehcache3/releases/download/v3.0.0.m3/ehcache-3.0.0.m3.jar
  6. Mule Application Properties (mule-app.properties): Use the following properties or define them as per your system.

db.host=localhost
db.port=1521
db.instance=XE
db.user=hr
db.password=hr
http.host=localhost
http.port=8081
tcp.host=localhost
tcp.port=60505

Database Change Notification Concept

As the title suggests, the main objective of this Mule application is to use the Mule Cache as a middle-tier data cache and to refresh that cache through Oracle’s Database Change Notification feature, which avoids redundant queries to the database. It also shows how the open source caching framework Ehcache can serve as the data store for the cache.

Let us first look at Oracle’s Database Change Notification. Starting from 11g Release 1 (11.1), Oracle JDBC drivers support the Database Change Notification feature of Oracle Database. With this support, multitier systems can keep a data cache as up to date as possible by receiving invalidation events from the JDBC drivers. The following diagram depicts the Database Change Notification process at a high level.

Image title


How to Code a Database Change Notification Client

The steps below show how to process database change notifications and code the client.

  1. Grant the CHANGE NOTIFICATION privilege to the Oracle database user.

GRANT CHANGE NOTIFICATION TO <user-name>


  2. Registration: Create a registration of SQL queries with the JDBC drivers for Database Change Notification.
    1. Establish the Oracle Database connection

final String USERNAME = "hr";
final String PASSWORD = "hr";
final String URL = "jdbc:oracle:thin:@localhost:1521:XE";

final oracle.jdbc.OracleDriver dr = new oracle.jdbc.OracleDriver();
final java.util.Properties prop = new java.util.Properties();

prop.setProperty("user", USERNAME);
prop.setProperty("password", PASSWORD);

// connect() is declared to return java.sql.Connection, so cast to the Oracle type
final oracle.jdbc.OracleConnection conn = (oracle.jdbc.OracleConnection) dr.connect(URL, prop);


    2. Set the registration options in a Properties object and create the registration

final java.util.Properties prop = new java.util.Properties();

// Include the ROWID of each changed row in the notification
prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
// Register at query granularity rather than object (table) granularity
prop.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true");

oracle.jdbc.dcn.DatabaseChangeRegistration dcr = conn.registerDatabaseChangeNotification(prop);


  3. Associating the query with the registration: After you have created a registration, you have to associate a query with it. Like creating the registration, this is a one-time process and is done outside of the currently used registration. The query stays associated even if the local transaction is rolled back.

final Statement stmt = conn.createStatement();

((oracle.jdbc.OracleStatement) stmt).setDatabaseChangeRegistration(dcr);

final ResultSet rs = stmt.executeQuery(
"select EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from employees");

// Drain the result set; the rows themselves are not needed here
while (rs.next()) {
}


  4. Notification or Notifying Database Change Events: To receive database change events, attach a listener to the registration. When a database change event occurs, the database server notifies the JDBC driver. The driver then constructs a new Java event, identifies the registration to be notified, and notifies the listeners attached to that registration. The event contains the object ID of the database object that has changed and the type of operation that caused the change. Depending on the registration options (in this case, OracleConnection.DCN_NOTIFY_ROWIDS and OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION), the event may also contain row-level detail. The listener code can then use the event to make decisions about the data cache.

    1. Define the listener class: The listener class must implement the oracle.jdbc.dcn.DatabaseChangeListener interface and must override the method public void onDatabaseChangeNotification(oracle.jdbc.dcn.DatabaseChangeEvent e).

class DatabaseChangeNotificationListener implements DatabaseChangeListener {

//declarations

public void onDatabaseChangeNotification(DatabaseChangeEvent e) {

//To-do code

}

}


    2. Attach the listener to the registration: You can attach the listener using the addListener method of the registration.


  5. Deleting a Registration: You need to explicitly unregister a registration to delete it from the server and release the resources in the driver. You can unregister a registration using a connection different from the one that was used to create it. To unregister a registration, use the unregisterDatabaseChangeNotification method defined in oracle.jdbc.OracleConnection.

You must pass the DatabaseChangeRegistration object as a parameter to this method. This method deletes the registration from the server and the driver and closes the listener socket.
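
The lifecycle described in the steps above (register, attach a listener, receive events, unregister) can be sketched without a database. The following is a minimal, self-contained illustration and not Oracle's API: Registration and ChangeListener are hypothetical stand-ins for DatabaseChangeRegistration and DatabaseChangeListener, and close() plays the role of unregisterDatabaseChangeNotification. It shows one possible way to make sure the explicit unregister step always runs, by tying it to try-with-resources.

```java
import java.util.ArrayList;
import java.util.List;

public class RegistrationLifecycleSketch {

    /** Hypothetical stand-in for a change listener; not the Oracle interface. */
    interface ChangeListener {
        void onChange(String rowId);
    }

    /** Hypothetical stand-in for a registration that must be released explicitly. */
    static class Registration implements AutoCloseable {
        private final List<ChangeListener> listeners = new ArrayList<>();
        private boolean active = true;

        void addListener(ChangeListener listener) {
            listeners.add(listener);
        }

        /** Simulates the driver delivering an event to every attached listener. */
        void fire(String rowId) {
            if (!active) {
                return; // an unregistered registration delivers nothing
            }
            for (ChangeListener listener : listeners) {
                listener.onChange(rowId);
            }
        }

        boolean isActive() {
            return active;
        }

        /** Plays the role of unregistering: stops delivery and drops listeners. */
        @Override
        public void close() {
            active = false;
            listeners.clear();
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Registration reg = new Registration();
        try (Registration r = reg) {
            r.addListener(received::add);
            r.fire("AAAE5cAAEAAAADNAAA");
        }
        reg.fire("AAAE5cAAEAAAADNAAB"); // ignored: already unregistered
        System.out.println(received + " active=" + reg.isActive());
    }
}
```

In the real client the registration has to outlive the method that creates it, so the same idea would more likely be applied at application shutdown than around a single block.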

So far you have learnt the JDBC API needed to write the listener, or client. Remember that the listener is a standalone Java application, which also needs to communicate with the Mule TCP connector using org.mule.module.client.MuleClient to send the database change notification events as the payload. Here, the payload consists of the ROWIDs associated with the query change event, each prefixed with a flag for the DML operation that caused it (I for INSERT, U for UPDATE, D for DELETE). In this case, the database object associated with the registered SQL query is HR.EMPLOYEES, as shown in step 3 above. The complete listener code is shown below.

package com.db.dcnclient;



import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.Properties;



import org.mule.api.MuleContext;

import org.mule.module.client.MuleClient;

import org.mule.DefaultMuleMessage;



import oracle.jdbc.OracleConnection;

import oracle.jdbc.OracleDriver;

import oracle.jdbc.OracleStatement;

import oracle.jdbc.dcn.DatabaseChangeEvent;

import oracle.jdbc.dcn.DatabaseChangeListener;

import oracle.jdbc.dcn.DatabaseChangeRegistration;

import oracle.jdbc.dcn.QueryChangeDescription;

import oracle.jdbc.dcn.RowChangeDescription;

import oracle.jdbc.dcn.RowChangeDescription.RowOperation;

import oracle.jdbc.dcn.TableChangeDescription;



public class DatabaseChangeNotificationClient {



public MuleContext context;

public DefaultMuleMessage message;

public String payload;

DatabaseChangeRegistration dcr;

OracleConnection conn;



public static void main(String[] args) throws SQLException {

DatabaseChangeNotificationClient dcn = new DatabaseChangeNotificationClient();



try {

dcn.run();

} catch (final SQLException mainSQLException) {

mainSQLException.printStackTrace();

} catch (java.lang.RuntimeException e) {

if (dcn.conn != null)

dcn.conn.unregisterDatabaseChangeNotification(dcn.dcr);

throw e;

}

}



void run() throws SQLException {

conn = connect();

final Properties prop = new Properties();



prop.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");

prop.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true");



dcr = conn.registerDatabaseChangeNotification(prop);



try {



final DatabaseChangeNotificationListener list = new DatabaseChangeNotificationListener(this);

dcr.addListener(list);

final Statement stmt = conn.createStatement();

((OracleStatement) stmt).setDatabaseChangeRegistration(dcr);

final ResultSet rs = stmt

.executeQuery("select EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from employees");

while (rs.next()) {

}

final String[] tableNames = dcr.getTables();

for (int i = 0; i < tableNames.length; i++)

System.out.println(tableNames[i]

+ " is part of the registration.");

rs.close();

stmt.close();

} catch (final SQLException ex) {

if (conn != null)

conn.unregisterDatabaseChangeNotification(dcr);

throw ex;

} finally {

try {

conn.close();

} catch (final Exception innerex) {

innerex.printStackTrace();

}

}

}



/**

* Creates a connection to the database.

*/

public OracleConnection connect() throws SQLException {



final String USERNAME = "hr";

final String PASSWORD = "hr";

final String URL = "jdbc:oracle:thin:@localhost:1521:XE";

final OracleDriver dr = new OracleDriver();

final Properties prop = new Properties();



prop.setProperty("user", USERNAME);

prop.setProperty("password", PASSWORD);



return (OracleConnection) dr.connect(URL, prop);

}



protected void finalize() throws SQLException {

if (conn != null)

conn.unregisterDatabaseChangeNotification(dcr);

}

}



/**

* DCN listener

*/

class DatabaseChangeNotificationListener implements DatabaseChangeListener {

DatabaseChangeNotificationClient dcn;



DatabaseChangeNotificationListener(

final DatabaseChangeNotificationClient dem) {

dcn = dem;

}



public void onDatabaseChangeNotification(final DatabaseChangeEvent e) {

MuleClient client = null;

if (e.getEventType() == DatabaseChangeEvent.EventType.QUERYCHANGE

&& e.getRegId() == dcn.dcr.getRegId()) {

try {



final QueryChangeDescription[] q = e

.getQueryChangeDescription();

TableChangeDescription[] t = null;

RowChangeDescription[] r = null;



final StringBuilder buf = new StringBuilder();

char oper = 0;

for (int i = 0; i < q.length; i++) {

t = q[i].getTableChangeDescription();

for (int j = 0; j < t.length; j++) {

r = t[j].getRowChangeDescription();

for (int k = 0; k < r.length; k++) {

if (r[k].getRowOperation() == RowOperation.INSERT) {

oper = 'I';

} else if (r[k].getRowOperation() == RowOperation.UPDATE) {

oper = 'U';

} else {

oper = 'D';

}

final String rowId = r[k].getRowid().stringValue();

buf.append(oper + rowId.concat(","));

}

}

}



client = new MuleClient(true);

dcn.payload = buf.substring(0, buf.lastIndexOf(","));

dcn.message = new DefaultMuleMessage("",

client.getMuleContext());

dcn.message.setPayload(dcn.payload);

client.dispatch("tcp://localhost:60505", dcn.message, null);

client.dispose();



} catch (final Exception e1) {

e1.printStackTrace();

}

}

}

}
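
The payload format that the listener above sends (an operation flag followed by the ROWID, with entries separated by commas) can be tried out in isolation. The sketch below is an assumption-laden illustration rather than part of the original client: RowChange and encode are hypothetical names, and StringJoiner is swapped in for the StringBuffer-plus-substring approach so that an empty list of changes yields an empty string instead of an index error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

public class ChangePayloadSketch {

    /** One changed row: the flag 'I', 'U' or 'D' plus the ROWID string. */
    static final class RowChange {
        final char operation;
        final String rowId;

        RowChange(char operation, String rowId) {
            this.operation = operation;
            this.rowId = rowId;
        }
    }

    /** Joins the changes as flag+ROWID tokens separated by commas. */
    static String encode(List<RowChange> changes) {
        StringJoiner joiner = new StringJoiner(",");
        for (RowChange change : changes) {
            // char + String is string concatenation, e.g. 'I' + "AAA" gives "IAAA"
            joiner.add(change.operation + change.rowId);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<RowChange> changes = new ArrayList<>();
        changes.add(new RowChange('I', "AAAE5cAAEAAAADNAAA"));
        changes.add(new RowChange('U', "AAAE5cAAEAAAADNAAB"));
        changes.add(new RowChange('D', "AAAE5cAAEAAAADNAAC"));
        System.out.println(encode(changes));
    }
}
```

The ROWID values in main are made-up sample strings; real values come from RowChangeDescription.getRowid() as in the listener.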


Note: Create a Java project named DCNClient in Anypoint Studio and add the class com.db.dcnclient.DatabaseChangeNotificationClient, as defined above. Include the Mule Server 3.7.3 EE libraries and ojdbc6.jar in the Java Build Path of the project. Run this class as a Java Application.

DatabaseChangeNotification Mule Application

Now that you have learnt the fundamentals of Oracle Database Change Notification and how to handle database change events with the listener, let us proceed to the Mule application that refreshes the cache in response to the database change notifications. The Mule application consists of the following flows and sub flows. (Note: Include ojdbc6.jar and ehcache-3.0.0.m3.jar in your Mule project.)

  1. DatabaseChangeNotificationFlow

This flow receives the comma separated ROWIDs of the database table (HR.EMPLOYEES) as a result of SQL INSERT, SQL UPDATE, or SQL DELETE through the TCP inbound endpoint and refreshes the cache accordingly. The listener communicates with the TCP endpoint and sends the comma separated ROWIDs for further processing.

Image title


Flow Elements:

Each entry lists the serial number, the element type, the display name (in parentheses), and the configurations.

  1. TCP Connector (DCN Receiver)
     Exchange Patterns: One Way
     Host: ${tcp.host}
     Port: ${tcp.port}
     Global Elements: Name: TCP

  2. Set Payload (Set Payload)
     Value: #[message.payload]

  3. Splitter (Splitter)
     Expression: #[StringUtils.split(payload, ',')]
     Global Configurations:

<configuration doc:name="Configuration">
<expression-language autoResolveVariables="true">
<import class="org.mule.util.StringUtils" />
</expression-language>
</configuration>

  4. Variable (Insert Update Flag)
     Name: flag
     Value: #[payload.charAt(0)]

  5. Variable (Rowid)
     Name: rowid
     Value: #[payload.substring(1)]

  6. Choice (Choice)
     When #[flowVars.flag == "I"], route message to: Async-Insert Cache
     When #[flowVars.flag == "U"], route message to: Async-Update Cache
     When #[flowVars.flag == "D"], route message to: Delete Cache (processor chain)

  7. Async (Async-Insert Cache)
     Wrapped Element: Flow Reference
     Flow name: Insert_Update_Cache_Sub_Flow

  8. Async (Async-Update Cache)
     Wrapped Element: Flow Reference
     Flow name: Insert_Update_Cache_Sub_Flow

  9. Flow Reference (Insert_Update_Cache_Sub_Flow)
     Flow name: Insert_Update_Cache_Sub_Flow

  10. Processor Chain (Delete Cache)
     Wrapped Element: Expression
     Expression: #[app.registry.Caching_Strategy.getStore().remove(flowVars.rowid)]
     Wrapped Element: Logger
     Message: Employee record with ROWID #[flowVars.rowid] is removed from cache


Flow XML:

<flow name="DatabaseChangeNotificationFlow">

<tcp:inbound-endpoint exchange-pattern="one-way"

host="${tcp.host}" port="${tcp.port}" connector-ref="TCP"

responseTimeout="10000" transformer-refs="Object_to_String" doc:name="DCN Receiver" />

<set-payload value="#[message.payload]" doc:name="Set Payload" />

<splitter expression="#[StringUtils.split(payload, ',')]"

doc:name="Splitter" />

<set-variable variableName="flag" value="#[payload.charAt(0)]"

doc:name="Insert Update Flag" />

<set-variable variableName="rowid" value="#[payload.substring(1)]"

doc:name="Rowid" />

<choice doc:name="Choice">

<when expression="#[flowVars.flag == &quot;I&quot;]">

<async doc:name="Async-Insert Cache">

<flow-ref name="Insert_Update_Cache_Sub_Flow" doc:name="Insert_Update_Cache_Sub_Flow" />

</async>

</when>

<when expression="#[flowVars.flag == &quot;U&quot;]">

<async doc:name="Async-Update Cache">

<flow-ref name="Insert_Update_Cache_Sub_Flow" doc:name="Insert_Update_Cache_Sub_Flow" />

</async>

</when>

<when expression="#[flowVars.flag == &quot;D&quot;]">

<processor-chain doc:name="Delete Cache">

<expression-transformer

expression="#[app.registry.Caching_Strategy.getStore().remove(flowVars.rowid)]"

doc:name="Expression" />

<logger

message="Employee record with ROWID #[flowVars.rowid] is removed from cache"

level="INFO" doc:name="Logger" />

</processor-chain>

</when>

</choice>

</flow>
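
The routing that the flow above performs (split on commas, read the flag, then load or remove the cache entry) can be sketched in plain Java. This is only an illustration under stated assumptions: the cache is a simple Map, and loader is a hypothetical stand-in for the database lookup that Insert_Update_Cache_Sub_Flow performs by ROWID. None of the names below come from Mule.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class CacheRefreshSketch {

    /**
     * Applies a payload such as "IAAA1,UAAA2,DAAA3" to the cache.
     * The loader argument stands in for the lookup of a row by ROWID.
     */
    static void apply(String payload, Map<String, String> cache,
                      Function<String, String> loader) {
        if (payload == null || payload.isEmpty()) {
            return; // nothing to refresh
        }
        for (String token : payload.split(",")) {
            if (token.length() < 2) {
                continue; // skip tokens that carry no ROWID
            }
            char flag = token.charAt(0);
            String rowId = token.substring(1);
            switch (flag) {
                case 'I':
                case 'U':
                    // inserts and updates both reload the row into the cache
                    cache.put(rowId, loader.apply(rowId));
                    break;
                case 'D':
                    cache.remove(rowId);
                    break;
                default:
                    break; // unknown flag: ignore the token
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("AAA3", "old row");
        apply("IAAA1,UAAA2,DAAA3", cache, rowId -> "row for " + rowId);
        System.out.println(cache.keySet());
    }
}
```

As in the flow, inserts and updates share one code path, which is why both Async scopes reference the same sub flow.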


2. Insert_Update_Cache_Sub_Flow

This sub flow obtains the employee records pertaining to the ROWIDs through the Database connector, transforms them into Java objects and subsequently refreshes the cache.

Image title


The Transform Message component uses com.db.cache.Employee POJO as the transformed Java object. The complete code is shown below:

package com.db.cache;

@SuppressWarnings("serial")

public class Employee implements java.io.Serializable {

private int employeeId;

private int departmentId;

private String firstName;

private String lastName;

private float salary;

private String rowid;



public void setEmployeeId(int employeeId) {

this.employeeId = employeeId;

}



public int getEmployeeId() {

return employeeId;

}



public void setDepartmentId(int departmentId) {

this.departmentId = departmentId;

}



public int getDepartmentId() {

return departmentId;

}



public void setFirstName(String firstName) {

this.firstName = firstName;

}



public String getFirstName() {

return firstName;

}



public void setLastName(String lastName) {

this.lastName = lastName;

}



public String getLastName() {

return lastName;

}



public void setSalary(float salary) {

this.salary = salary;

}



public float getSalary() {

return salary;

}

public void setRowid(String rowid) {

this.rowid = rowid;

}

public String getRowid() {

return rowid;

}

}


Sub Flow Elements:

Each entry lists the serial number, the element type, the display name (in parentheses), and the configurations.

  1. Database Connector (Database)
     Connector configurations: Oracle Configuration
     Name: Oracle_Configuration
     Host: ${db.host}
     Port: ${db.port}
     Instance: ${db.instance}
     User: ${db.user}
     Password: ${db.password}
     Enable DataSense: Checked
     Oracle Driver: ojdbc6.jar
     Operation: Select
     Query Type: Parameterized
     Parameterized query: select ''||ROWID, EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from EMPLOYEES where ROWID = #[payload.substring(1)]

  2. Transform Message (Transform Message)

%dw 1.0
%output application/java
---
payload map ((payload01 , indexOfPayload01) -> {
departmentId: payload01.DEPARTMENT_ID as :number default 0,
employeeId: payload01.EMPLOYEE_ID as :number,
firstName: payload01.FIRST_NAME default "",
lastName: payload01.LAST_NAME default "",
rowid: flowVars.flag ++ payload01."''||ROWID",
salary: payload01.SALARY as :number default 0
} as :object {class: "com.db.cache.Employee"})

  3. For Each (For Each)
     Wrapped Element: Variable
     Display Name: Cache Key
     Name: key
     Value: #[payload.getRowid()]
     Wrapped Element: Flow Reference
     Display Name: Ehcache_Sub_Flow
     Flow name: Ehcache_Sub_Flow

Sub Flow XML:

<sub-flow name="Insert_Update_Cache_Sub_Flow">

<db:select config-ref="Oracle_Configuration" doc:name="Database">

<db:parameterized-query><![CDATA[select ''||ROWID, EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from EMPLOYEES where ROWID = #[payload.substring(1)]]]></db:parameterized-query>

</db:select>

<dw:transform-message metadata:id="c5016f4b-6468-4137-b96a-601feaca71f9"

doc:name="Transform Message">

<dw:set-payload><![CDATA[%dw 1.0

%output application/java

---

payload map ((payload01 , indexOfPayload01) -> {

departmentId: payload01.DEPARTMENT_ID as :number default 0,



employeeId: payload01.EMPLOYEE_ID as :number,



firstName: payload01.FIRST_NAME default "",



lastName: payload01.LAST_NAME default "",



rowid: flowVars.flag ++ payload01."''||ROWID",



salary: payload01.SALARY as :number default 0

} as :object {class: "com.db.cache.Employee"})]]></dw:set-payload>

</dw:transform-message>

<foreach doc:name="For Each">

<set-variable variableName="key" value="#[payload.getRowid()]"

doc:name="Cache Key" />

<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />

</foreach>

</sub-flow>


  3. Ehcache_Sub_Flow

This sub flow wraps a Mule Cache scope that uses Ehcache as the data store. You need to configure Mule to use Ehcache to handle the caching. Follow these steps:

Image title

    1. Define cache manager and cache factory bean

<spring:beans>



<spring:bean id="cacheManager" name="cacheManager"

class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"

scope="singleton" />



<spring:bean id="cache" name="cache"

class="org.springframework.cache.ehcache.EhCacheFactoryBean" scope="singleton">

<spring:property name="cacheManager" ref="cacheManager" />

<spring:property name="cacheName" value="dbCache" />

<spring:property name="maxElementsInMemory" value="10000" />

<spring:property name="eternal" value="true" />

<spring:property name="timeToIdle" value="120" />

<spring:property name="timeToLive" value="120" />

<spring:property name="overflowToDisk" value="true" />

<spring:property name="maxElementsOnDisk" value="10000000" />

<spring:property name="diskPersistent" value="false" />

<spring:property name="diskExpiryThreadIntervalSeconds"

value="120" />

<spring:property name="memoryStoreEvictionPolicy"

value="LRU" />

</spring:bean>

</spring:beans>


    2. Create a custom object store

Once the cache manager and cache factory bean are configured, the next step is to define the custom object store that uses Ehcache to store and retrieve the data. We just need to create a new class that implements the org.mule.api.store.ObjectStore interface and delegates the operations to Ehcache.

package com.db.cache;



import java.io.Serializable;

import net.sf.ehcache.Ehcache;

import net.sf.ehcache.Element;



import org.mule.api.store.ObjectStore;

import org.mule.api.store.ObjectStoreException;



public class EmployeeEhcacheObjectStore<T extends Serializable> implements

ObjectStore<T> {



private Ehcache cache;



@Override

public synchronized boolean contains(Serializable key)

throws ObjectStoreException {



return cache.isKeyInCache(key);

}



@Override

public synchronized void store(Serializable key, T value)

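throws ObjectStoreException {

// Keys arrive with a leading 'I' (insert) or 'U' (update) flag; strip it
// so that the entry is stored under the bare ROWID.
Serializable k = key;

if (k.toString().charAt(0) == 'U' || k.toString().charAt(0) == 'I') {

k = (Serializable) k.toString().substring(1);

}

Element element = new Element(k, value);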

cache.put(element);

}



@SuppressWarnings("unchecked")

@Override

public synchronized T retrieve(Serializable key)

throws ObjectStoreException {



Element element = cache.get(key);

if (element == null) {

return null;

}

return (T) element.getValue();

}



@Override

public synchronized T remove(Serializable key) throws ObjectStoreException {

T value = retrieve(key);

cache.remove(key);

return value;

}



@Override

public boolean isPersistent() {

return false;

}



public Ehcache getCache() {

return cache;

}



public void setCache(Ehcache cache) {

this.cache = cache;

}



@Override

public void clear() throws ObjectStoreException {

// Drop every entry from the underlying Ehcache instance
cache.removeAll();

}



}


    3. Define the Mule caching strategy

In order to use the newly created object store, the next step is to configure a caching strategy, as shown below.

<ee:object-store-caching-strategy name="Caching_Strategy"
keyGenerationExpression="#[flowVars.key]" doc:name="Caching Strategy">

<custom-object-store class="com.db.cache.EmployeeEhcacheObjectStore">

<spring:property name="cache" ref="cache" />

</custom-object-store>

</ee:object-store-caching-strategy>
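
The key handling in the custom object store is easy to miss, so here is a small stand-alone sketch of it. It is an interpretation of the flows rather than anything stated by Mule: because store() strips the leading I or U flag while contains() and retrieve() use the key as given, a flagged key never appears to be present, so the wrapped processors always run and the fresh value replaces the entry under the bare ROWID. PrefixedKeyStoreSketch and normalize are hypothetical names, and a ConcurrentHashMap stands in for Ehcache.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PrefixedKeyStoreSketch {

    private final Map<String, Object> entries = new ConcurrentHashMap<>();

    /** Drops a leading 'I' or 'U' flag, mirroring the check in store() above. */
    static String normalize(String key) {
        if (!key.isEmpty() && (key.charAt(0) == 'I' || key.charAt(0) == 'U')) {
            return key.substring(1);
        }
        return key;
    }

    /** Looks the key up exactly as given, without stripping a flag. */
    boolean contains(String key) {
        return entries.containsKey(key);
    }

    /** Stores the value under the key with any leading flag removed. */
    void store(String key, Object value) {
        entries.put(normalize(key), value);
    }

    /** Retrieves by the key exactly as given, for example a bare ROWID. */
    Object retrieve(String key) {
        return entries.get(key);
    }

    public static void main(String[] args) {
        PrefixedKeyStoreSketch store = new PrefixedKeyStoreSketch();
        store.store("IAAA1", "first version");
        // A flagged key is never found, so an update is treated as a cache miss
        System.out.println(store.contains("UAAA1"));
        store.store("UAAA1", "second version");
        // A search by the bare ROWID sees the refreshed value
        System.out.println(store.retrieve("AAA1"));
    }
}
```

One caveat of this design, in the sketch and presumably in the original class as well: it relies on store() only ever receiving flagged keys, since a bare key that happens to begin with I or U would lose its first character.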


Sub Flow Elements:

Each entry lists the serial number, the element type, the display name (in parentheses), and the configurations.

  1. Cache (Cache)
     Caching strategy reference: Reference to a strategy
     Name: Caching_Strategy
     Object Store: core:custom-object-store class=com.db.cache.EmployeeEhcacheObjectStore
     Spring property: Name: cache, Reference: cache
     Key Expression: #[flowVars.key]
     Wrapped Element: Variable
     Name: msg
     Value: #['ROWID: '+payload.getRowid().substring(1)+', EMPLOYEE_ID: '+payload.getEmployeeId()+', FIRSTNAME: '+payload.getFirstName()+', LASTNAME: '+payload.getLastName()+', DEPARTMENT_ID: '+payload.getDepartmentId()+', SALARY: '+payload.getSalary()]
     Wrapped Element: Logger
     Message: #[payload.getRowid().charAt(0) == 'I' ? flowVars.msg +' loaded in cache' : flowVars.msg+' modified in cache']

Sub Flow XML:

<sub-flow name="Ehcache_Sub_Flow">

<ee:cache cachingStrategy-ref="Caching_Strategy" doc:name="Cache">

<set-variable variableName="msg"

value="#['ROWID: '+payload.getRowid().substring(1)+', EMPLOYEE_ID: '+payload.getEmployeeId()+', FIRSTNAME: '+payload.getFirstName()+', LASTNAME: '+payload.getLastName()+', DEPARTMENT_ID: '+payload.getDepartmentId()+', SALARY: '+payload.getSalary()]"

doc:name="Variable" />

<logger

message="#[payload.getRowid().charAt(0) == 'I' ? flowVars.msg +' loaded in cache' : flowVars.msg+' modified in cache']"

level="INFO" doc:name="Logger" />

</ee:cache>

</sub-flow>


  4. Initialize_Cache_Flow

This flow is used to initialize the cache with all the employee records of HR.EMPLOYEES database table as POJOs. The ROWID is considered as the object’s key to store them in the caching strategy.

Image title


The Transform Message component uses the same com.db.cache.Employee POJO that is listed in full under Insert_Update_Cache_Sub_Flow above.


Parse Template File: InitCacheResult.html

Place this HTML template file in the src/main/resources project folder.

<html>

<head>

<meta http-equiv="Content-Type"

content="text/html; charset=WINDOWS-1252">

<meta name="generator" content="SQL*Plus 11.2.0">

<style type='text/css'>

body {

font: 10pt Arial, Helvetica, sans-serif;

color: black;

background: White;

}



p {

font: 10pt Arial, Helvetica, sans-serif;

color: black;

background: White;

}



table, tr, td {

font: 10pt Arial, Helvetica, sans-serif;

color: Black;

background: #f7f7e7;

padding: 0px 0px 0px 0px;

margin: 0px 0px 0px 0px;

}



th {

font: bold 10pt Arial, Helvetica, sans-serif;

color: #336699;

background: #cccc99;

padding: 0px 0px 0px 0px;

}



h1 {

font: 16pt Arial, Helvetica, Geneva, sans-serif;

color: #336699;

background-color: White;

border-bottom: 1px solid #cccc99;

margin-top: 0pt;

margin-bottom: 0pt;

padding: 0px 0px 0px 0px;


}



h2 {

font: bold 10pt Arial, Helvetica, Geneva, sans-serif;

color: #336699;

background-color: White;

margin-top: 4pt;

margin-bottom: 0pt;

}



a {

font: 9pt Arial, Helvetica, sans-serif;

color: #663300;

background: #ffffff;

margin-top: 0pt;

margin-bottom: 0pt;

vertical-align: top;

}

</style>

<title>Cache Initialization Result</title>

</head>

<body>

<h1>Cache initialized with the following employee records:</h1>

<p>

<table BORDER=2>

<tr>

<th scope="col">ROWID</th>

<th scope="col">DEPARTMENT_ID</th>

<th scope="col">EMPLOYEE_ID</th>

<th scope="col">FIRST_NAME</th>

<th scope="col">LAST_NAME</th>

<th scope="col">SALARY</th>

</tr>

#[payload]

</table>

<p>

</body>

</html>


Use the URL http://localhost:8081/Init to initialize the cache. After the initialization is done, the flow returns all the initialized employee records as an HTML report with the help of a Parse Template, as shown below.

Image title


Flow Elements:

Each entry lists the serial number, the element type, the display name (in parentheses), and the configurations.

  1. HTTP Inbound endpoint (HTTP)
     Connector Configuration:
     Name: HTTP_Listener_Configuration
     Protocol: HTTP
     Host: Default
     Port: 8081
     Path: /Init

  2. Database Connector (Database)
     Connector configurations: Oracle Configuration
     Name: Oracle_Configuration
     Host: ${db.host}
     Port: ${db.port}
     Instance: ${db.instance}
     User: ${db.user}
     Password: ${db.password}
     Enable DataSense: Checked
     Oracle Driver: ojdbc6.jar
     Operation: Select
     Query Type: Parameterized
     Parameterized query: select 'I'||ROWID, EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from EMPLOYEES

  3. Transform Message (Transform Message)

%dw 1.0
%output application/java
---
payload map ((payload01 , indexOfPayload01) -> {
departmentId: payload01.DEPARTMENT_ID as :number default 0,
employeeId: payload01.EMPLOYEE_ID as :number,
firstName: payload01.FIRST_NAME default '',
lastName: payload01.LAST_NAME default '',
rowid: payload01."'I'||ROWID",
salary: payload01.SALARY as :number default 0
} as :object {class: "com.db.cache.Employee"})

  4. For Each (For Each)
     Collection: #[payload]
     Counter Variable Name: Default
     Batch Size: Default
     Root Message Variable Name: Default
     Wrapped Element: Variable
     Display Name: HTML
     Name: html
     Value: #[flowVars['html'] + '<tr><td align="right">' + payload.getRowid().substring(1) + '</td><td align="right">' + payload.getDepartmentId() + '</td><td align="right">' + payload.getEmployeeId() + '</td><td>' + payload.getFirstName() + '</td><td>' + payload.getLastName() + '</td><td align="right">' + payload.getSalary() + '</td></tr>']
     Wrapped Element: Variable
     Display Name: Search Key
     Name: key
     Value: #[payload.getRowid()]
     Wrapped Element: Flow Reference
     Display Name: Ehcache_Sub_Flow
     Flow name: Ehcache_Sub_Flow

  5. Set Payload (Set Payload)
     Value: #[flowVars.html.substring(4)]

  6. Parse Template (Parse Template)
     Location: src\main\resources\InitCacheResult.html

Flow XML:

<flow name="Initialize_Cache_Flow">

<http:listener config-ref="HTTP_Listener_Configuration"

path="/Init" doc:name="HTTP" />

<db:select config-ref="Oracle_Configuration" doc:name="Database">

<db:parameterized-query><![CDATA[select 'I'||ROWID,EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from EMPLOYEES]]></db:parameterized-query>

</db:select>

<dw:transform-message metadata:id="c21542f5-3971-4fc6-a5c1-26324e0b0b00"

doc:name="Transform Message">

<dw:set-payload><![CDATA[%dw 1.0

%output application/java

---

payload map ((payload01 , indexOfPayload01) -> {



departmentId: payload01.DEPARTMENT_ID as :number default 0,



employeeId: payload01.EMPLOYEE_ID as :number,



firstName: payload01.FIRST_NAME default '',

lastName: payload01.LAST_NAME default '',



rowid: payload01."'I'||ROWID",



salary: payload01.SALARY as :number default 0

} as :object {class: "com.db.cache.Employee"})]]></dw:set-payload>

</dw:transform-message>

<foreach doc:name="For Each">

<set-variable variableName="html"

value="#[flowVars['html'] + '&lt;tr&gt;&lt;td align=&quot;right&quot;&gt;' + payload.getRowid().substring(1) + '&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;'+ payload.getDepartmentId() + '&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;' +payload.getEmployeeId() + '&lt;/td&gt;&lt;td&gt;'+payload.getFirstName()+'&lt;/td&gt;&lt;td&gt;'+payload.getLastName()+'&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;'+payload.getSalary()+'&lt;/td&gt;&lt;/tr&gt;']"

doc:name="HTML" />

<set-variable variableName="key" value="#[payload.getRowid()]"

doc:name="Search Key" />

<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />

</foreach>

<set-payload value="#[flowVars.html.substring(4)]"

doc:name="Set Payload" mimeType="text/html" />

<parse-template

location="src\main\resources\InitCacheResult.html"

doc:name="Parse Template" />

</flow>
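
The For Each scope in the flow above accumulates one table row per employee in the html flow variable. Because that variable starts out unset, the first concatenation presumably yields a string that begins with "null", which would explain the substring(4) in the Set Payload step. The plain-Java sketch below illustrates the same row building with an accumulator that starts empty, so no trimming is needed. HtmlRowsSketch and rows are hypothetical names, and the records are sample values, not data from HR.EMPLOYEES.

```java
import java.util.Arrays;
import java.util.List;

public class HtmlRowsSketch {

    /** Builds one <tr> per record; each record is a list of cell values. */
    static String rows(List<List<String>> records) {
        StringBuilder html = new StringBuilder(); // starts empty, not null
        for (List<String> record : records) {
            html.append("<tr>");
            for (String cell : record) {
                html.append("<td>").append(cell).append("</td>");
            }
            html.append("</tr>");
        }
        return html.toString();
    }

    public static void main(String[] args) {
        List<List<String>> records = Arrays.asList(
                Arrays.asList("AAA1", "90", "100", "Sample", "Person", "1000.0"),
                Arrays.asList("AAA2", "60", "103", "Other", "Person", "2000.0"));
        // The result could be substituted for #[payload] in the template
        System.out.println(rows(records));
    }
}
```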


  5. Search_Cache_Flow

This flow is used to search the employee records from the cache with ROWID as the search key. Use the URL http://localhost:8081/cache?key=ROWID.

Image title


Parse Template File: SearchCacheResult.html

Place this HTML template file in the src/main/resources project folder.

<html>

<head>

<meta http-equiv="Content-Type"

content="text/html; charset=WINDOWS-1252">

<meta name="generator" content="SQL*Plus 11.2.0">

<style type='text/css'>

body {

font: 10pt Arial, Helvetica, sans-serif;

color: black;

background: White;

}



p {

font: 10pt Arial, Helvetica, sans-serif;

color: black;

background: White;

}



table, tr, td {

font: 10pt Arial, Helvetica, sans-serif;

color: Black;

background: #f7f7e7;

padding: 0px 0px 0px 0px;

margin: 0px 0px 0px 0px;

}



th {

font: bold 12pt Arial, Helvetica, sans-serif;

color: #336699;

background: #cccc99;

padding: 0px 0px 0px 0px;

}



h1 {

font: 16pt Arial, Helvetica, Geneva, sans-serif;

color: #336699;

background-color: White;

border-bottom: 1px solid #cccc99;

margin-top: 0pt;

margin-bottom: 0pt;

padding: 0px 0px 0px 0px;


}



h2 {

font: bold 10pt Arial, Helvetica, Geneva, sans-serif;

color: #336699;

background-color: White;

margin-top: 4pt;

margin-bottom: 0pt;

}



a {

font: 9pt Arial, Helvetica, sans-serif;

color: #663300;

background: #ffffff;

margin-top: 0pt;

margin-bottom: 0pt;

vertical-align: top;

}

</style>

<title>Search Cache Result</title>

</head>

<body>

<h1>The following is the cache search result based on ROWID:</h1>

<p>

<table BORDER=2>

<tr>

<th scope="col">ROWID</th>

<th scope="col">DEPARTMENT_ID</th>

<th scope="col">EMPLOYEE_ID</th>

<th scope="col">FIRST_NAME</th>

<th scope="col">LAST_NAME</th>

<th scope="col">SALARY</th>

</tr>

#[payload]

</table>

<p>

</body>

</html>


The search result is rendered by a Parse Template component, which substitutes the #[payload] placeholder in the template above with the generated table rows.


Flow Elements:

| Serial No. | Element Type | Display Name | Configurations |
|---|---|---|---|
| 1 | HTTP Inbound endpoint | HTTP | Connector Configuration: Name: HTTP_Listener_Configuration, Protocol: HTTP, Host: Default, Port: 8081; Path: /cache |
| 2 | Variable | Variable | Name: key; Value: #[message.inboundProperties.'http.query.params'.key] |
| 3 | Flow Reference | Ehcache_Sub_Flow | Flow name: Ehcache_Sub_Flow |
| 4 | For Each | For Each | Collection: #[payload]; Counter Variable Name: Default; Batch Size: Default; Root Message Variable Name: Default |
| 5 | Set Payload | Set Payload | Value: #[flowVars.finalString.substring(4)] |
| 6 | Parse Template | Parse Template | Location: src\main\resources\SearchCacheResult.html |

Element wrapped by For Each (row 4):

| Wrapped Element | Configuration |
|---|---|
| Variable | Display Name: HTML; Name: finalString; Value: #[flowVars['finalString'] + '<tr><td align="right">' +payload.getRowid().substring(1) +'</td><td align="right">' +payload.getDepartmentId() + '</td><td align="right">' +payload.getEmployeeId() + '</td><td>'+payload.getFirstName()+'</td><td>'+payload.getLastName()+'</td><td align="right">'+payload.getSalary()+'</td></tr>'] |

Flow XML:

<flow name="Search_Cache_Flow">

<http:listener config-ref="HTTP_Listener_Configuration"

path="/cache" doc:name="HTTP" />

<set-variable variableName="key"

value="#[message.inboundProperties.'http.query.params'.key]"

doc:name="Variable" />

<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />

<foreach collection="#[payload]" doc:name="For Each">

<set-variable variableName="finalString"

value="#[flowVars['finalString'] + '&lt;tr&gt;&lt;td align=&quot;right&quot;&gt;' + payload.getRowid().substring(1) +'&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;' + payload.getDepartmentId() + '&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;' +payload.getEmployeeId() + '&lt;/td&gt;&lt;td&gt;'+payload.getFirstName()+'&lt;/td&gt;&lt;td&gt;'+payload.getLastName()+'&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;'+payload.getSalary()+'&lt;/td&gt;&lt;/tr&gt;']"

doc:name="Variable" />

</foreach>

<set-payload value="#[flowVars.finalString.substring(4)]"

doc:name="Set Payload" />

<parse-template

location="src\main\resources\SearchCacheResult.html"

doc:name="Parse Template" />

</flow>
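The substring(4) call in the Set Payload step appears to strip the literal "null" that is produced when the first row is concatenated onto the not-yet-defined finalString variable. The plain-Java analogue below is only a sketch of that idiom, assuming MEL string concatenation behaves like Java's here; the class name `RowAccumulator` and both method names are hypothetical.

```java
// Plain-Java analogue of the row-accumulation idiom used in the flow.
// Hypothetical class, not part of the article's application.
class RowAccumulator {

    // Concatenates rows onto an initially null string, then strips the
    // leading "null" literal, mirroring substring(4) in the flow.
    static String accumulateWithNullStart(String[] rows) {
        String finalString = null;
        for (String row : rows) {
            finalString = finalString + row; // first pass yields "null" + row
        }
        // Throws NullPointerException when rows is empty, because the
        // variable was never assigned.
        return finalString.substring(4);
    }

    // Alternative: start from an empty builder so nothing has to be stripped.
    static String accumulateWithBuilder(String[] rows) {
        StringBuilder sb = new StringBuilder();
        for (String row : rows) {
            sb.append(row);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String[] rows = {"<tr><td>1</td></tr>", "<tr><td>2</td></tr>"};
        System.out.println(accumulateWithNullStart(rows));
        System.out.println(accumulateWithBuilder(rows));
    }
}
```

Both methods return the same string for a non-empty input. The first one fails when there are no rows at all, which is worth keeping in mind when the cache lookup returns nothing.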


Complete Mule Application XML Code (DatabaseChangeNotification.xml):

<?xml version="1.0" encoding="UTF-8"?>



<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"

xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw" xmlns:metadata="http://www.mulesoft.org/schema/mule/metadata"

xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"

xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"

xmlns:tcp="http://www.mulesoft.org/schema/mule/tcp" xmlns="http://www.mulesoft.org/schema/mule/core"

xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"

xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd

http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd

http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd

http://www.mulesoft.org/schema/mule/tcp http://www.mulesoft.org/schema/mule/tcp/current/mule-tcp.xsd

http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd

http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd

http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd

http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">



<configuration doc:name="Configuration">

<expression-language autoResolveVariables="true">

<import class="org.mule.util.StringUtils" />

</expression-language>

</configuration>



<spring:beans>



<spring:bean id="cacheManager" name="cacheManager"

class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"

scope="singleton" />



<spring:bean id="cache" name="cache"

class="org.springframework.cache.ehcache.EhCacheFactoryBean" scope="singleton">

<spring:property name="cacheManager" ref="cacheManager" />

<spring:property name="cacheName" value="dbCache" />

<spring:property name="maxElementsInMemory" value="10000" />

<spring:property name="eternal" value="true" />

<spring:property name="timeToIdle" value="120" />

<spring:property name="timeToLive" value="120" />

<spring:property name="overflowToDisk" value="true" />

<spring:property name="maxElementsOnDisk" value="10000000" />

<spring:property name="diskPersistent" value="false" />

<spring:property name="diskExpiryThreadIntervalSeconds"

value="120" />

<spring:property name="memoryStoreEvictionPolicy"

value="LRU" />

</spring:bean>

</spring:beans>



<ee:object-store-caching-strategy

name="Caching_Strategy" keyGenerationExpression="#[flowVars.key]"

doc:name="Caching Strategy">

<custom-object-store class="com.db.cache.EmployeeEhcacheObjectStore">

<spring:property name="cache" ref="cache" />

</custom-object-store>

</ee:object-store-caching-strategy>



<tcp:connector name="TCP" validateConnections="true"

sendBufferSize="0" receiveBufferSize="0" receiveBacklog="0"

clientSoTimeout="10000" serverSoTimeout="10000" socketSoLinger="0"

doc:name="TCP">

</tcp:connector>

<object-to-string-transformer name="Object_to_String"

doc:name="Object to String" />

<db:oracle-config name="Oracle_Configuration" host="${db.host}"

port="${db.port}" instance="${db.instance}" user="${db.user}"

password="${db.password}" doc:name="Oracle Configuration" />

<http:listener-config name="HTTP_Listener_Configuration"

host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration" />

<flow name="DatabaseChangeNotificationFlow">

<tcp:inbound-endpoint exchange-pattern="one-way"

host="${tcp.host}" port="${tcp.port}" connector-ref="TCP"

responseTimeout="10000" transformer-refs="Object_to_String" doc:name="DCN Receiver" />

<set-payload value="#[message.payload]" doc:name="Set Payload" />

<splitter expression="#[StringUtils.split(payload, ',')]"

doc:name="Splitter" />

<set-variable variableName="flag" value="#[payload.charAt(0)]"

doc:name="Insert Update Flag" />

<set-variable variableName="rowid" value="#[payload.substring(1)]"

doc:name="Rowid" />

<choice doc:name="Choice">

<when expression="#[flowVars.flag == &quot;I&quot;]">

<async doc:name="Async-Insert Cache">

<flow-ref name="Insert_Update_Cache_Sub_Flow" doc:name="Insert_Update_Cache_Sub_Flow" />

</async>

</when>

<when expression="#[flowVars.flag == &quot;U&quot;]">

<async doc:name="Async-Update Cache">

<flow-ref name="Insert_Update_Cache_Sub_Flow" doc:name="Insert_Update_Cache_Sub_Flow" />

</async>

</when>

<when expression="#[flowVars.flag == &quot;D&quot;]">

<processor-chain doc:name="Delete Cache">

<expression-transformer

expression="#[app.registry.Caching_Strategy.getStore().remove(flowVars.rowid)]"

doc:name="Expression" />

<logger

message="Employee record with ROWID #[flowVars.rowid] is removed from cache"

level="INFO" doc:name="Logger" />

</processor-chain>

</when>

</choice>

</flow>

<sub-flow name="Insert_Update_Cache_Sub_Flow">

<db:select config-ref="Oracle_Configuration" doc:name="Database">

<db:parameterized-query><![CDATA[select ''||ROWID,EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from EMPLOYEES where ROWID = #[payload.substring(1)]]]></db:parameterized-query>

</db:select>

<dw:transform-message metadata:id="c5016f4b-6468-4137-b96a-601feaca71f9"

doc:name="Transform Message">

<dw:set-payload><![CDATA[%dw 1.0

%output application/java

---

payload map ((payload01 , indexOfPayload01) -> {

departmentId: payload01.DEPARTMENT_ID as :number default 0,



employeeId: payload01.EMPLOYEE_ID as :number,





firstName: payload01.FIRST_NAME default "",



lastName: payload01.LAST_NAME default "",



rowid: flowVars.flag ++ payload01."''||ROWID",



salary: payload01.SALARY as :number default 0

} as :object {class: "com.db.cache.Employee"})]]></dw:set-payload>

</dw:transform-message>

<foreach doc:name="For Each">

<set-variable variableName="key" value="#[payload.getRowid()]"

doc:name="Cache Key" />

<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />

</foreach>

</sub-flow>

<sub-flow name="Ehcache_Sub_Flow">

<ee:cache cachingStrategy-ref="Caching_Strategy" doc:name="Cache">

<set-variable variableName="msg"

value="#['ROWID: '+payload.getRowid().substring(1)+', EMPLOYEE_ID: '+payload.getEmployeeId()+', FIRSTNAME: '+payload.getFirstName()+', LASTNAME: '+payload.getLastName()+', DEPARTMENT_ID: '+payload.getDepartmentId()+', SALARY: '+payload.getSalary()]"

doc:name="Variable" />

<logger

message="#[payload.getRowid().charAt(0) == 'I' ? flowVars.msg +' loaded in cache' : flowVars.msg+' modified in cache']"

level="INFO" doc:name="Logger" />

</ee:cache>

</sub-flow>

<flow name="Search_Cache_Flow">

<http:listener config-ref="HTTP_Listener_Configuration"

path="/cache" doc:name="HTTP" />

<set-variable variableName="key"

value="#[message.inboundProperties.'http.query.params'.key]"

doc:name="Variable" />

<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />

<foreach collection="#[payload]" doc:name="For Each">

<set-variable variableName="finalString"

value="#[flowVars['finalString'] + '&lt;tr&gt;&lt;td align=&quot;right&quot;&gt;' + payload.getRowid().substring(1) +'&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;' + payload.getDepartmentId() + '&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;' +payload.getEmployeeId() + '&lt;/td&gt;&lt;td&gt;'+payload.getFirstName()+'&lt;/td&gt;&lt;td&gt;'+payload.getLastName()+'&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;'+payload.getSalary()+'&lt;/td&gt;&lt;/tr&gt;']"

doc:name="Variable" />

</foreach>

<set-payload value="#[flowVars.finalString.substring(4)]"

doc:name="Set Payload" />

<parse-template

location="src\main\resources\SearchCacheResult.html"

doc:name="Parse Template" />

</flow>

<flow name="Initialize_Cache_Flow">

<http:listener config-ref="HTTP_Listener_Configuration"

path="/Init" doc:name="HTTP" />

<db:select config-ref="Oracle_Configuration" doc:name="Database">

<db:parameterized-query><![CDATA[select 'I'||ROWID,EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY, DEPARTMENT_ID from EMPLOYEES]]></db:parameterized-query>

</db:select>

<dw:transform-message metadata:id="c21542f5-3971-4fc6-a5c1-26324e0b0b00"

doc:name="Transform Message">

<dw:set-payload><![CDATA[%dw 1.0

%output application/java

---

payload map ((payload01 , indexOfPayload01) -> {



departmentId: payload01.DEPARTMENT_ID as :number default 0,



employeeId: payload01.EMPLOYEE_ID as :number,



firstName: payload01.FIRST_NAME default '',

lastName: payload01.LAST_NAME default '',



rowid: payload01."'I'||ROWID",



salary: payload01.SALARY as :number default 0

} as :object {class: "com.db.cache.Employee"})]]></dw:set-payload>

</dw:transform-message>

<foreach doc:name="For Each" collection="#[payload]">

<set-variable variableName="html"

value="#[flowVars['html'] + '&lt;tr&gt;&lt;td align=&quot;right&quot;&gt;' + payload.getRowid().substring(1) + '&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;'+ payload.getDepartmentId() + '&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;' +payload.getEmployeeId() + '&lt;/td&gt;&lt;td&gt;'+payload.getFirstName()+'&lt;/td&gt;&lt;td&gt;'+payload.getLastName()+'&lt;/td&gt;&lt;td align=&quot;right&quot;&gt;'+payload.getSalary()+'&lt;/td&gt;&lt;/tr&gt;']"

doc:name="HTML" />

<set-variable variableName="key" value="#[payload.getRowid()]"

doc:name="Search Key" />

<flow-ref name="Ehcache_Sub_Flow" doc:name="Ehcache_Sub_Flow" />

</foreach>

<set-payload value="#[flowVars.html.substring(4)]"

doc:name="Set Payload" mimeType="text/html" />

<parse-template

location="src\main\resources\InitCacheResult.html"

doc:name="Parse Template" />

</flow>

</mule>
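The DatabaseChangeNotificationFlow above splits the incoming TCP payload on commas and treats the first character of each token as the operation flag (I, U or D) and the remainder as the ROWID. The sketch below restates that logic in plain Java under a few assumptions: a `Map` stands in for the Mule cache, the plain ROWID is used as the map key, and a `loader` function stands in for the database select. The names `DcnMessageSketch`, `Change`, `parse` and `apply` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the routing logic of the flow; not the author's code.
class DcnMessageSketch {

    // One change entry: operation flag ('I', 'U' or 'D') plus the ROWID.
    static final class Change {
        final char flag;
        final String rowid;

        Change(char flag, String rowid) {
            this.flag = flag;
            this.rowid = rowid;
        }
    }

    // Splits a message such as "I<rowid>,U<rowid>,D<rowid>" into change entries.
    static List<Change> parse(String message) {
        List<Change> changes = new ArrayList<>();
        for (String token : message.split(",")) {
            if (token.length() < 2) {
                continue; // skip empty or flag-only tokens
            }
            changes.add(new Change(token.charAt(0), token.substring(1)));
        }
        return changes;
    }

    // Applies the changes to a map that stands in for the cache.
    // The loader stands in for the database select by ROWID.
    static void apply(List<Change> changes, Map<String, String> cache,
                      Function<String, String> loader) {
        for (Change c : changes) {
            switch (c.flag) {
                case 'I':
                case 'U':
                    cache.put(c.rowid, loader.apply(c.rowid)); // load or reload the row
                    break;
                case 'D':
                    cache.remove(c.rowid); // evict the deleted row
                    break;
                default:
                    break; // unknown flags are ignored in this sketch
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        List<Change> changes = parse("IAAAEAbAAEAAAADPAAG,UAAAEAbAAEAAAADMAAA,DAAAEAbAAEAAAADPAAG");
        apply(changes, cache, rowid -> "employee row for " + rowid);
        System.out.println(cache.keySet());
    }
}
```

In this run the inserted row is evicted again by the delete entry, so only the updated ROWID remains in the map.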


Conclusion

Finally, run DatabaseChangeNotificationClient.java and the Mule application, then use the log output and the search flow to verify that the cache is refreshed whenever DML operations are performed on HR.EMPLOYEES, the table referenced by the SQL query registered for database change notifications.

  1. Run DatabaseChangeNotificationClient.java as a Java application.

  2. Run DatabaseChangeNotification.xml as a Mule application.

  3. Perform a SQL INSERT on the HR.EMPLOYEES table.

  4. The Mule logger reports that the SQL INSERT has refreshed the Mule cache with the two new records. You can also open http://localhost:8081/cache?key=ROWID to view a record served from the cache. (Note: Substitute ROWID in the URL with an actual ROWID from the log below.)

INFO 2016-02-17 00:00:24,065 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.1.03] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADPAAG, EMPLOYEE_ID: 223, FIRSTNAME: Christopher, LASTNAME: Lee, DEPARTMENT_ID: 0, SALARY: 1200.0 loaded in cache

INFO 2016-02-17 00:00:24,127 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.1.04] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADPAAH, EMPLOYEE_ID: 224, FIRSTNAME: Prashant, LASTNAME: Jha, DEPARTMENT_ID: 0, SALARY: 1200.0 loaded in cache

  5. Perform a SQL UPDATE on the HR.EMPLOYEES table.
  6. The Mule logger reports that the SQL UPDATE has refreshed the Mule cache with the four modified records. You can also open http://localhost:8081/cache?key=ROWID to view a record served from the cache. (Note: Substitute ROWID in the URL with an actual ROWID from the log below.)

INFO 2016-02-17 00:11:30,514 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.2.02] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADMAAA, EMPLOYEE_ID: 207, FIRSTNAME: Peck, LASTNAME: Gregory, DEPARTMENT_ID: 90, SALARY: 7700.0 modified in cache

INFO 2016-02-17 00:11:30,650 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.2.03] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADNAAA, EMPLOYEE_ID: 100, FIRSTNAME: Steven, LASTNAME: King, DEPARTMENT_ID: 90, SALARY: 7700.0 modified in cache

INFO 2016-02-17 00:11:30,716 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.2.05] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADNAAC, EMPLOYEE_ID: 102, FIRSTNAME: Lex, LASTNAME: De Haan, DEPARTMENT_ID: 90, SALARY: 7700.0 modified in cache

INFO 2016-02-17 00:11:30,799 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.2.04] org.mule.api.processor.LoggerMessageProcessor: ROWID: AAAEAbAAEAAAADNAAB, EMPLOYEE_ID: 101, FIRSTNAME: Neena, LASTNAME: Kochhar, DEPARTMENT_ID: 90, SALARY: 7700.0 modified in cache

  7. Perform a SQL DELETE on the HR.EMPLOYEES table.
  8. The Mule logger reports that the SQL DELETE has removed the two deleted records from the Mule cache. You can also open http://localhost:8081/cache?key=ROWID to check the result; in this case the request fails with an exception whose message mentions NullPayload, since the record is no longer in the cache. (Note: Substitute ROWID in the URL with an actual ROWID from the log below.)

INFO 2016-02-17 00:19:01,898 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.stage1.05] org.mule.api.processor.LoggerMessageProcessor: Employee record with ROWID AAAEAbAAEAAAADPAAG is removed from cache

INFO 2016-02-17 00:19:01,906 [[DatabaseChangeNotification].DatabaseChangeNotificationFlow.stage1.05] org.mule.api.processor.LoggerMessageProcessor: Employee record with ROWID AAAEAbAAEAAAADPAAH is removed from cache

Email: pbghosh67@gmail.com


