A JMS Connection Leak Scenario in Camel
I came across this interesting problem today and want to capture it.
Consider this simple Camel route, which consumes from ActiveMQ and logs the message:
<camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
  <route id="jms-consumer4">
    <from uri="amq:queue:Test" />
    <to uri="log:JMSConsumer?level=INFO&amp;showBody=true" />
  </route>
</camelContext>

<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amq">
  <property name="connectionFactory" ref="singleCF" />
  <property name="useSingleConnection" value="true" />
  <property name="usePooledConnection" value="false" />
  <property name="preserveMessageQos" value="true" />
</bean>

<bean class="org.springframework.jms.connection.SingleConnectionFactory" id="singleCF">
  <property name="targetConnectionFactory" ref="AMQCF" />
  <property name="reconnectOnException" value="true" />
</bean>

<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AMQCF">
  <property name="userName" value="admin" />
  <property name="password" value="admin" />
  <property name="brokerURL" value="tcp://localhost:61616" />
  <property name="copyMessageOnSend" value="false" />
  <property name="useAsyncSend" value="true" />
</bean>
After deploying this to ServiceMix (JBoss Fuse 6.0 in my test), it works just fine.
The problem occurs when stopping this Camel route (via an osgi:stop of
the corresponding bundle): the connection to the ActiveMQ broker
does not get closed. The broker's JMX view still shows the
connection as registered.
Even worse, each restart and stop of the Camel route leaks another
connection.
I did a bit of root cause analysis and found the following.
When the Camel route is stopped, it calls into
SingleConnectionFactory.destroy(). This cleans up the internally held
ActiveMQConnection. At this stage the connection is properly removed
from the broker's JMX view, which you will probably only notice when
debugging through the code.
However, the Spring JMS listener used by the Camel JMS consumer is
still running. It detects that the connection is down and tries to
transparently reconnect, which calls into
SingleConnectionFactory.createConnection() again (full stack trace below
[1]). The SingleConnectionFactory happily opens a new connection
to the broker, which then remains open and registered in JMX even though the route has been stopped.
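The sequence above can be modelled with a small, self-contained toy. This is only a sketch of the ordering problem, not Spring's actual code: `ReopeningFactory`, `brokerConnections` and `simulateRestarts` are hypothetical names invented for illustration, and the "broker" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;

public class LeakSketch {

    /** Hypothetical stand-in for the broker's view of open connections. */
    static final List<String> brokerConnections = new ArrayList<>();
    static int nextId = 1;

    /** Toy factory: shares one connection and lazily recreates it, even after destroy(). */
    static class ReopeningFactory {
        private String connection;

        String createConnection() {
            if (connection == null) {
                connection = "conn-" + nextId++;
                brokerConnections.add(connection); // broker registers the connection
            }
            return connection;
        }

        void destroy() {
            if (connection != null) {
                brokerConnections.remove(connection); // broker unregisters the connection
                connection = null;
            }
        }
    }

    /** Runs start/stop cycles in which a listener reconnects after destroy(). */
    static int simulateRestarts(int cycles) {
        brokerConnections.clear();
        for (int i = 0; i < cycles; i++) {
            ReopeningFactory factory = new ReopeningFactory();
            factory.createConnection(); // route starts and consumes
            factory.destroy();          // bundle stop destroys the factory
            factory.createConnection(); // listener recovery reconnects afterwards
        }
        return brokerConnections.size(); // connections nobody will ever close
    }

    public static void main(String[] args) {
        System.out.println("Leaked connections after 3 restarts: " + simulateRestarts(3));
    }
}
```

In this model every start/stop cycle leaves exactly one connection behind, which matches the behaviour observed in the broker's JMX view.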
So how do you resolve this?
Instead of Spring's SingleConnectionFactory, I recommend using ActiveMQ's PooledConnectionFactory. The bean configuration above then becomes:

<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amq">
  <property name="connectionFactory" ref="pooledCF" />
  <property name="useSingleConnection" value="true" />
  <property name="usePooledConnection" value="false" />
  <property name="preserveMessageQos" value="true" />
</bean>

<bean class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop" id="pooledCF" init-method="start">
  <property name="connectionFactory" ref="AMQCF" />
  <property name="maxConnections" value="1" />
</bean>

<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AMQCF">
  <property name="userName" value="admin" />
  <property name="password" value="admin" />
  <property name="brokerURL" value="tcp://localhost:61616" />
  <property name="copyMessageOnSend" value="false" />
  <property name="useAsyncSend" value="true" />
</bean>
With ActiveMQ's PooledConnectionFactory, things behave
pretty much the same, with one subtle but important difference.
As above, stopping the Camel bundle calls into
PooledConnectionFactory.stop(). This internally closes all
ActiveMQConnections (only one in this example, but potentially
more), which also unregisters the connection from the broker's JMX view.
Spring's JMS listener used by the Camel JMS consumer is still
running, detects the connection closure and tries to transparently
reconnect. This calls into PooledConnectionFactory.createConnection().
This implementation, however, contains the following check:
if (stopped.get()) {
    LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
    return null;
}
Because stop() has already set the AtomicBoolean stopped to true, no new connection is established.
Spring's SingleConnectionFactory does not have this logic. It happily
reopens a new connection after it has been destroyed.
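To make the effect of that guard concrete, here is a minimal toy model. It is not the ActiveMQ implementation: `GuardedFactory` and `connectionsOpenedAcrossStop` are hypothetical names, and the only thing borrowed from the real class is the idea of checking a `stopped` flag before creating a connection.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class StoppedGuardSketch {

    /** Toy factory with a stopped flag, loosely modelled on the check quoted above. */
    static class GuardedFactory {
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        private int opened = 0;

        String createConnection() {
            if (stopped.get()) {
                return null; // refuse to open anything once stopped
            }
            opened++;
            return "conn-" + opened;
        }

        void stop() {
            stopped.set(true);
        }

        int openedCount() {
            return opened;
        }
    }

    /** Returns how many connections were opened when a reconnect attempt follows stop(). */
    static int connectionsOpenedAcrossStop() {
        GuardedFactory factory = new GuardedFactory();
        factory.createConnection();                 // route starts and consumes
        factory.stop();                             // bundle stop
        String reconnect = factory.createConnection(); // listener recovery attempt
        if (reconnect != null) {
            throw new IllegalStateException("guard should have rejected the reconnect");
        }
        return factory.openedCount();
    }

    public static void main(String[] args) {
        System.out.println("Connections opened: " + connectionsOpenedAcrossStop());
    }
}
```

The late reconnect attempt gets `null` back instead of a fresh connection, so nothing new is registered with the broker after the stop.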
Please note that the attributes init-method="start" and destroy-method="stop" on
the PooledConnectionFactory bean definition are important, as otherwise
you may also leak connections when shutting down your bundles.
[1]
Daemon Thread [Camel (camelContext) thread #21 - JmsConsumer[EwdTest1]] (Suspended (breakpoint at line 280 in org.springframework.jms.connection.SingleConnectionFactory))
  owns: java.lang.Object (id=9254)
  owns: java.lang.Object (id=9255)
  owns: java.lang.Object (id=9286)
  org.springframework.jms.connection.SingleConnectionFactory.initConnection() line: 280
  org.springframework.jms.connection.SingleConnectionFactory.createConnection() line: 225
  org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.support.JmsAccessor).createConnection() line: 184
  org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.AbstractJmsListeningContainer).createSharedConnection() line: 404
  org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.AbstractJmsListeningContainer).refreshSharedConnection() line: 389
  org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.DefaultMessageListenerContainer).refreshConnectionUntilSuccessful() line: 869
  org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.DefaultMessageListenerContainer).recoverAfterListenerSetupFailure() line: 851
  org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run() line: 982
  java.util.concurrent.ThreadPoolExecutor$Worker.runTask(java.lang.Runnable) line: 895
  java.util.concurrent.ThreadPoolExecutor$Worker.run() line: 918
  java.lang.Thread.run() line: 680
Published at DZone with permission of Torsten Mielke, DZone MVB. See the original article here.