Embedded ActiveMQ Broker with Mule

In this post I will show how an embedded ActiveMQ broker is automatically started when using the broker URL “vm://localhost” in a Mule JMS connector, without any additional configuration, and how to disable this behaviour. I will also look at how to explicitly configure an embedded ActiveMQ broker in a Mule configuration file.

As described in the Mule documentation under ActiveMQ Integration, Mule automatically starts an embedded instance of ActiveMQ if an ActiveMQ JMS connector is defined with the broker URL “vm://localhost”. The same holds if a regular JMS connector references an ActiveMQ connection factory configured with that URL.
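For reference, the ActiveMQ-specific connector variant mentioned above might look like the following sketch. The connector name is only an example; the rest of this post uses the generic JMS connector with a connection factory instead.

```xml
<!-- Sketch: ActiveMQ-specific connector from Mule's JMS namespace.
     The name "my-amq-connector" is an arbitrary example. -->
<jms:activemq-connector name="my-amq-connector"
    brokerURL="vm://localhost"
    specification="1.1"/>
```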
This behaviour can be verified using the following steps:

  • Create a Mule project in an IDE with the Mule Studio plug-in installed.
  • Add the ActiveMQ-all JAR-file to the library path of the new project.
  • In src/main/resources of the new project, create a file named “log4j.xml” with the following contents:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration>
    <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
        <param name="Target" value="System.out" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d %-5p %-30C - %m%n" />
        </layout>
    </appender>
    
    <logger name="org.mule">
        <level value="INFO"/>
    </logger>
    
    <logger name="org.apache.activemq">
        <level value="INFO"/>
    </logger>
    
    <root>
        <priority value="WARN" />
        <appender-ref ref="stdout" />
    </root>
</log4j:configuration>

  • Paste the following into the Mule configuration file of the new project:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:spring="http://www.springframework.org/schema/beans"
    xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:test="http://www.mulesoft.org/schema/mule/test"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd" version="CE-3.4.0">

    <spring:beans>
        <spring:bean name="amq-connection-factory"
            class="org.apache.activemq.ActiveMQConnectionFactory">
            <spring:property name="brokerURL" value="vm://localhost"/>
        </spring:bean>
    </spring:beans>
    
    <jms:connector name="my-jms-connector"
        connectionFactory-ref="amq-connection-factory"
        specification="1.1">
    </jms:connector>
</mule>
  • Right-click the Mule configuration file and select Run As -> Mule Application.
  • Stop the Mule application.

Looking at the log output in the console, the following can be observed:

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ Starting app 'testembeddedjms'                           +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2014-06-26 14:53:46,385 INFO  org.mule.module.logging.DispatchingLogger - Starting ResourceManager
2014-06-26 14:53:46,387 INFO  org.mule.module.logging.DispatchingLogger - Started ResourceManager
2014-06-26 14:53:46,496 INFO  org.mule.module.logging.DispatchingLogger - Loaded the Bouncy Castle security provider.
2014-06-26 14:53:46,611 INFO  org.mule.module.logging.DispatchingLogger - JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
2014-06-26 14:53:46,657 INFO  org.mule.module.logging.DispatchingLogger - PListStore:[/Volumes/BigHD/Development/Workspace/testembeddedjms/activemq-data/localhost/tmp_storage] started
2014-06-26 14:53:46,661 INFO  org.mule.module.logging.DispatchingLogger - Using Persistence Adapter: KahaDBPersistenceAdapter[/Volumes/BigHD/Development/Workspace/testembeddedjms/activemq-data/localhost/KahaDB]
2014-06-26 14:53:46,868 INFO  org.mule.module.logging.DispatchingLogger - KahaDB is version 5
2014-06-26 14:53:46,879 INFO  org.mule.module.logging.DispatchingLogger - Recovering from the journal ...
2014-06-26 14:53:46,882 INFO  org.mule.module.logging.DispatchingLogger - Recovery replayed 29 operations from the journal in 0.011 seconds.
2014-06-26 14:53:47,031 INFO  org.mule.module.logging.DispatchingLogger - Apache ActiveMQ 5.9.1 (localhost, ID:computer.lan-12345-123456789-0:1) is starting
2014-06-26 14:53:47,036 INFO  org.mule.module.logging.DispatchingLogger - Apache ActiveMQ 5.9.1 (localhost, ID:computer.lan-12345-123456789-0:1) started
2014-06-26 14:53:47,036 INFO  org.mule.module.logging.DispatchingLogger - For help or more information please see: http://activemq.apache.org
2014-06-26 14:53:47,062 INFO  org.mule.module.logging.DispatchingLogger - Connector vm://localhost started
2014-06-26 14:53:47,070 INFO  org.mule.module.logging.DispatchingLogger - Connected: JmsConnector

As the two “Apache ActiveMQ 5.9.1” entries (lines 13 and 14 of the log) show, an instance of ActiveMQ is started. This behaviour is implemented in the ActiveMQ class org.apache.activemq.transport.vm.VMTransportFactory and is thus not something Mule controls.
In some cases it is desirable not to start an embedded ActiveMQ broker automatically, for instance if this is taken care of by another part of the application. This can be accomplished by adding the “create=false” parameter to the broker URL, which tells the VM transport to connect only to a broker that already exists.
The modified Mule configuration file looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:spring="http://www.springframework.org/schema/beans"
    xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:test="http://www.mulesoft.org/schema/mule/test"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd" version="CE-3.4.0">

    <spring:beans>
        <spring:bean name="amq-connection-factory"
            class="org.apache.activemq.ActiveMQConnectionFactory">
            <spring:property name="brokerURL" value="vm://localhost?create=false"/>
        </spring:bean>
    </spring:beans>
    
    <jms:connector name="my-jms-connector"
        connectionFactory-ref="amq-connection-factory"
        specification="1.1">
    </jms:connector>
</mule>

If we now run the Mule application again, the result will be an exception saying that there is no broker named “localhost”:

Exception in thread "main" org.mule.module.launcher.DeploymentStartException: IOException: Broker named 'localhost' does not exist.
    at org.mule.module.launcher.application.DefaultMuleApplication.start(DefaultMuleApplication.java:170)
    at org.mule.module.launcher.application.ApplicationWrapper.start(ApplicationWrapper.java:107)
    at org.mule.module.launcher.DefaultMuleDeployer.deploy(DefaultMuleDeployer.java:48)
    at org.mule.tooling.server.application.ApplicationDeployer.run(ApplicationDeployer.java:58)
    at org.mule.tooling.server.application.ApplicationDeployer.main(ApplicationDeployer.java:91)

To fix this error, and assuming that more control over the embedded ActiveMQ instance is desired, the Mule configuration file is modified once more to define an embedded ActiveMQ broker explicitly. The result looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:spring="http://www.springframework.org/schema/beans"
    xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:test="http://www.mulesoft.org/schema/mule/test"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd" version="CE-3.4.0">

    <spring:beans>
        <amq:broker id="amq-broker"
            useJmx="false"
            persistent="false"
            restartAllowed="false"
            useShutdownHook="true"
            startAsync="true">
            <amq:destinationPolicy>
                <amq:policyMap>
                    <amq:policyEntries>
                        <amq:policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
                            <amq:deadLetterStrategy>
                                <amq:individualDeadLetterStrategy
                                    queuePrefix="DLQ." useQueueForQueueMessages="true" />
                            </amq:deadLetterStrategy>
                        </amq:policyEntry>
                    </amq:policyEntries>
                </amq:policyMap>
            </amq:destinationPolicy>
            <amq:transportConnectors>
                <amq:transportConnector uri="vm://localhost"/>
            </amq:transportConnectors>
        </amq:broker>
    
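        <!-- Note: this redelivery policy is defined but not referenced by the
             connection factory below. To apply it, set the factory's
             redeliveryPolicy property to this bean. -->
        <spring:bean name="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">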
            <spring:property name="maximumRedeliveries" value="-1"/>
            <spring:property name="initialRedeliveryDelay" value="500"/>
            <spring:property name="useExponentialBackOff" value="false"/>
            <spring:property name="backOffMultiplier" value="5"/>
            <spring:property name="maximumRedeliveryDelay" value="-1"/>
        </spring:bean>
        
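        <!-- depends-on makes Spring create the broker bean before the connection
             factory, since create=false will not start a broker on demand. -->
        <spring:bean name="amq-connection-factory"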
            class="org.apache.activemq.ActiveMQConnectionFactory"
            depends-on="amq-broker">
            <spring:property name="brokerURL" value="vm://localhost?create=false"/>
        </spring:bean>
    </spring:beans>
    
    <jms:connector name="my-jms-connector"
        connectionFactory-ref="amq-connection-factory"
        specification="1.1">
    </jms:connector>
</mule>

Since the <amq:broker> element is used to define the embedded ActiveMQ broker, we also need to add the xbean-spring library to the classpath of the project. An appropriate version of this library can be downloaded using this link.
For additional information on how to configure ActiveMQ using XML configuration, please refer to the ActiveMQ Documentation.
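
If the project happens to be built with Maven (an assumption; the steps above add JAR files to the library path by hand), the xbean-spring library could instead be declared as a dependency, roughly as sketched below. The version property is a placeholder and has to be set to a release compatible with the ActiveMQ version in use.

```xml
<!-- Sketch for a Maven pom.xml; ${xbean.version} is a placeholder property
     that must be defined to match the ActiveMQ version on the classpath. -->
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>${xbean.version}</version>
</dependency>
```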

If the Mule application is run again, it should start up without errors and the following can be seen in the console:

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ Starting app 'testembeddedjms'                           +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2014-06-26 15:17:28,392 INFO  org.mule.module.logging.DispatchingLogger - Starting ResourceManager
2014-06-26 15:17:28,394 INFO  org.mule.module.logging.DispatchingLogger - Started ResourceManager
2014-06-26 15:17:28,457 INFO  org.mule.module.logging.DispatchingLogger - Connector vm://localhost started
2014-06-26 15:17:28,459 INFO  org.mule.module.logging.DispatchingLogger - Apache ActiveMQ 5.9.1 (localhost, ID:computer.lan-12345-123456789-0:2) is starting
2014-06-26 15:17:28,471 INFO  org.mule.module.logging.DispatchingLogger - Connected: JmsConnector
{
  name=my-jms-connector
  lifecycle=initialise
  this=12345678
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[jms]
  serviceOverrides=<none>
}

Row 7 of the above log fragment (the “Apache ActiveMQ 5.9.1” entry) shows that an embedded instance of ActiveMQ is (again) started.
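
To check that the embedded broker actually carries messages, a small flow along the following lines could be added inside the <mule> element of the configuration file. This is only a sketch: the flow name and the queue names are made up for illustration.

```xml
<!-- Sketch: consumes from one queue on the embedded broker, logs the
     payload and forwards it to another queue. Flow and queue names are
     hypothetical. -->
<flow name="embedded-broker-smoke-test">
    <jms:inbound-endpoint queue="test.in" connector-ref="my-jms-connector"/>
    <logger level="INFO" message="Received: #[payload]"/>
    <jms:outbound-endpoint queue="test.out" connector-ref="my-jms-connector"/>
</flow>
```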

Finally, note that the “create=false” parameter is not necessary if you only want to customize the embedded ActiveMQ broker within a Mule application: the VM transport does not start a second broker instance if one with the same name (here “localhost”) already exists in the same JVM.
Also, the automatic starting of an embedded broker instance is specific to ActiveMQ and will not happen when using other JMS brokers with Mule unless implemented by the broker.
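
For small adjustments, a full <amq:broker> definition may not be needed. The ActiveMQ VM transport accepts options prefixed with “broker.” in the URL and applies them to the broker it creates automatically. A sketch, assuming only persistence and JMX should be switched off:

```xml
<!-- Sketch: let the VM transport create the broker, but non-persistent
     and without JMX. Note that "&" must be written as "&amp;" in XML. -->
<spring:bean name="amq-connection-factory"
    class="org.apache.activemq.ActiveMQConnectionFactory">
    <spring:property name="brokerURL"
        value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
</spring:bean>
```

These options only take effect when the VM transport creates the broker itself; they do not reconfigure a broker that is already running.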

Published at DZone with permission of Ivan K. See the original article here.