Transactional Caching for Camel With Infinispan
Some time ago I created
a Redis connector for Camel. Redis is an awesome key-value store (and a
lot more) but then I needed a cache running in the same JVM as Camel and
noticed Infinispan, which has just switched to ASL v2.
There are already other connectors in Camel for caching on the JVM,
like Hazelcast and EHCache, but if you are already using Camel as part
of other Red Hat products or want to see how LIRS eviction outperforms
LRU, Infinispan is worth trying.
Briefly, Infinispan is a transactional in-memory key-value store and data
grid. When used in embedded mode, Infinispan resides in the same JVM as
Camel and allows a Camel consumer to receive cache change notifications:
<route>
  <from uri="infinispan://localhost?cacheContainer=#cacheContainer&amp;cacheName=orders&amp;eventTypes=CACHE_ENTRY_CREATED"/>
  <filter>
    <simple>${out.header.CamelInfinispanIsPre} == true</simple>
    <to uri="log:com.mycompany.order?showHeaders=true"/>
  </filter>
</route>
In the example above, when a cache entry is created, Infinispan will
fire two events - one before and one after the cache entry has been
created. It is also possible to receive the events synchronously,
meaning in the same thread that processes the cache action, or
asynchronously in a separate thread without blocking the cache action.
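To make the pre/post and synchronous/asynchronous distinction concrete, here is a minimal sketch in plain Java. It is a toy model, not Infinispan's listener API: the class `NotifyingCacheSketch` and the `CreatedListener` interface are hypothetical names invented for illustration, and passing `null` as the executor is just this sketch's way of selecting synchronous delivery.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of pre/post "entry created" events; NOT Infinispan's listener API.
class NotifyingCacheSketch {
    // Hypothetical callback: isPre is true before the entry is stored, false afterwards.
    interface CreatedListener {
        void onCreated(String key, boolean isPre);
    }

    private final ConcurrentMap<String, String> data = new ConcurrentHashMap<>();
    private final List<CreatedListener> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService notifier; // null means synchronous delivery

    NotifyingCacheSketch(ExecutorService notifier) {
        this.notifier = notifier;
    }

    void addListener(CreatedListener listener) {
        listeners.add(listener);
    }

    // Fires one event before and one after a new key is stored.
    // The check-then-put is not atomic; concurrent creators are out of scope for this sketch.
    void put(String key, String value) {
        boolean creating = !data.containsKey(key);
        if (creating) fire(key, true);
        data.put(key, value);
        if (creating) fire(key, false);
    }

    private void fire(String key, boolean isPre) {
        for (CreatedListener l : listeners) {
            if (notifier == null) {
                l.onCreated(key, isPre);                        // runs in the caller's thread
            } else {
                notifier.execute(() -> l.onCreated(key, isPre)); // does not block the caller
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CreatedListener printer = (key, isPre) -> System.out.println(
                Thread.currentThread().getName() + " " + key + " isPre=" + isPre);

        NotifyingCacheSketch sync = new NotifyingCacheSketch(null);
        sync.addListener(printer);
        sync.put("order-1", "42.00");

        ExecutorService pool = Executors.newSingleThreadExecutor();
        NotifyingCacheSketch async = new NotifyingCacheSketch(pool);
        async.addListener(printer);
        async.put("order-2", "17.50");
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

In the synchronous case a slow listener delays the `put` itself, which is the trade-off to weigh when choosing between the two modes.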
Using Infinispan as a local cache is simple: it exposes a ConcurrentMap
interface and has the usual expiration, eviction, passivation,
persistent store, and querying features. What also makes Infinispan a
data grid is the ability of its nodes to discover other nodes and
replicate or distribute data among themselves. Replication keeps a copy
of the data on every node in the cluster, whereas distribution uses a
consistent hashing algorithm to spread entries across the nodes and
achieve better scalability.
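The idea behind consistent hashing can be sketched in a few lines of plain Java. This is only an illustration of the general technique, not Infinispan's actual implementation: the class `HashRingSketch`, the node names, the number of ring points per node, and the use of CRC32 as the hash function are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Toy consistent-hash ring; NOT Infinispan's implementation.
class HashRingSketch {
    // Ring position -> node name, sorted so the next position clockwise is easy to find.
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int pointsPerNode;

    HashRingSketch(int pointsPerNode) {
        this.pointsPerNode = pointsPerNode;
    }

    // CRC32 is used only because it ships with the JDK; an arbitrary choice for the sketch.
    static long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Each node is placed on the ring several times to even out the key distribution.
    void addNode(String node) {
        for (int i = 0; i < pointsPerNode; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    void removeNode(String node) {
        ring.values().removeIf(node::equals);
    }

    // The owner of a key is the first node at or after the key's position, wrapping around.
    String ownerOf(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes in the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch(100);
        for (String node : List.of("node-a", "node-b", "node-c")) {
            ring.addNode(node);
        }
        System.out.println("order-42 -> " + ring.ownerOf("order-42"));
        ring.removeNode("node-b");
        System.out.println("order-42 -> " + ring.ownerOf("order-42"));
    }
}
```

The useful property is that when a node leaves, only the keys it owned move to another node; every other key keeps its owner, which is what lets a distributed cache grow or shrink without reshuffling all of its data.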
In client-server mode, Infinispan runs as a standalone application,
and a Camel producer can send messages using Infinispan's Hot Rod client.
Hot Rod is a binary, language-neutral, intelligent protocol that allows
interaction with Infinispan servers in a topology- and
hash-distribution-aware fashion.
The Infinispan producer in Camel currently offers GET, PUT, REMOVE, and CLEAR operations. Here is an example of the producer putting data into the orders cache:
<route>
  <from uri="direct:orderCache"/>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${in.header.orderId}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.orderTotal}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?cacheName=orders"/>
</route>
Let's create a more interesting example. Infinispan is also JTA
compliant and can participate in transactions. We will create a REST API
for registering persons, which will first persist the person in a
relational database using the Camel SQL component and then put the
firstName into an Infinispan cache in the same transaction. We will do
that using a transacted Camel route, so if an error occurs at any stage
during routing, Camel will make sure the transaction (for both the
cache and the database) is rolled back, so that the database and the
cache are always in a consistent state.
<route>
  <from uri="restlet:/persons?restletMethod=POST"/>
  <transacted/>

  <!-- PERSIST TO DB -->
  <to uri="sql:insert into person(firstName, lastName) values(:#firstName,:#lastName)?dataSource=#dataSource"/>

  <!-- DAMN EXCEPTION THROWER-->
  <filter>
    <simple>${in.header.lastName} == "damn"</simple>
    <throwException ref="damn"/>
  </filter>

  <!-- PUT TO CACHE -->
  <to uri="sql:select id from person WHERE id = (select max(id) from person)?dataSource=#dataSource"/>
  <setHeader headerName="personId">
    <simple>${body[0][ID]}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${headerAs(personId, String)}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.firstName}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?cacheContainer=#cacheContainer&amp;cacheName=orders"/>
</route>
As you can see, there is no magic or extra configuration in the route; it is a standard
route. We have a small bit of code that throws an exception when the
person's lastName is "damn", to simulate errors in the middle of the route.
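The all-or-nothing outcome the route relies on can be pictured with a toy model in plain Java. This sketch is not JTA, XA, or two-phase commit: it simply undoes completed steps when a later step fails. The class `AllOrNothingSketch` and its two in-memory maps are hypothetical stand-ins for the person table and the Infinispan cache.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy all-or-nothing update of two in-memory stores; NOT JTA, XA, or two-phase commit.
class AllOrNothingSketch {
    final Map<Integer, String> database = new HashMap<>(); // stands in for the person table
    final Map<String, String> cache = new HashMap<>();     // stands in for the Infinispan cache
    private int nextId = 1;

    // Mirrors the route: insert the person, fail on a "damn" lastName, then cache the firstName.
    void register(String firstName, String lastName) {
        Deque<Runnable> undo = new ArrayDeque<>();
        try {
            int id = nextId;
            database.put(id, firstName + " " + lastName);
            undo.push(() -> database.remove(id));

            if ("damn".equals(lastName)) {
                throw new IllegalStateException("simulated failure in the middle of the route");
            }

            cache.put(String.valueOf(id), firstName);
            undo.push(() -> cache.remove(String.valueOf(id)));
            nextId++;
        } catch (RuntimeException e) {
            // Undo completed steps in reverse order so both stores return to their previous state.
            while (!undo.isEmpty()) {
                undo.pop().run();
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        AllOrNothingSketch app = new AllOrNothingSketch();
        app.register("Ada", "Lovelace");
        try {
            app.register("Bob", "damn");
        } catch (IllegalStateException e) {
            System.out.println("rolled back: " + e.getMessage());
        }
        System.out.println("database=" + app.database + " cache=" + app.cache);
    }
}
```

A real transaction manager achieves the same result differently, by coordinating prepare and commit across the enlisted resources rather than by running compensating actions.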
The application runs in standalone mode with the Atomikos JTA transaction manager. First we create a JtaTransactionManager:
<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/>
<bean id="userTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"/>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
  <constructor-arg ref="userTransaction"/>
  <constructor-arg ref="userTransactionManager"/>
</bean>
Then we wrap our XA DataSource in an AtomikosDataSourceBean so that it can take part in JTA transactions:
public AtomikosDataSourceBean atomikosDataSourceBean() throws Exception {
    // Embedded Derby XA data source; the database is created if it does not exist
    EmbeddedXADataSource ds = new EmbeddedXADataSource();
    ds.setCreateDatabase("create");
    ds.setDatabaseName("target/testdb");
    ds.setUser("");
    ds.setPassword("");

    // Wrap it in the Atomikos bean under a unique resource name
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(ds);
    xaDataSource.setUniqueResourceName("xaDerby");
    return xaDataSource;
}
Finally, using a TransactionManagerLookup, we tell Infinispan to participate in the same transaction:
public BasicCacheContainer basicCacheContainer() throws Throwable {
    GlobalConfiguration glob = new GlobalConfigurationBuilder().nonClusteredDefault().build();
    Configuration loc = new ConfigurationBuilder()
        .transaction().transactionMode(TransactionMode.TRANSACTIONAL)
        // Hand Infinispan the transaction manager held by jtaTransactionManager
        .transactionManagerLookup(new TransactionManagerLookup() {
            @Override
            public TransactionManager getTransactionManager() throws Exception {
                return jtaTransactionManager.getTransactionManager();
            }
        }).build();
    return new DefaultCacheManager(glob, loc, true);
}
After all this boilerplate code, we have our data source, cache, and Camel
route participating in the same transaction. To see the full REST
example with two-phase commit and rollback, get the source code from GitHub and play with it.
BTW, the camel-infinispan component is still not part of Camel trunk, so to run the example you will need that too.
Published at DZone with permission of Bilgin Ibryam, DZone MVB. See the original article here.