WSO2 Enterprise Integrator With a Message Broker Profile
Make the WSO2 Enterprise Integrator work for you. Let's make use of message broker profiles as we set up a system to test your endpoints.
WSO2 Enterprise Integrator ships with a separate message broker profile (WSO2 MB). In this post, I will be using the message broker profile in EI 6.3.0.
Setting Up the Message Broker Profile
Copy the following JAR files from the <EI_HOME>/wso2/broker/client-lib/ directory to the <EI_HOME>/lib/ directory.
andes-client-3.2.13.jar
geronimo-jms_1.1_spec-1.1.0.wso2v1.jar
org.wso2.securevault-1.0.0-wso2v2.jar
Open the <EI_HOME>/conf/jndi.properties file and add the following line after the queue.MyQueue = example.MyQueue line:
queue.JMSMS=JMSMS
Open <EI_HOME>/conf/axis2/axis2.xml and uncomment the section that configures JMS transport support for the WSO2 EI Broker Profile. There you will find a transportReceiver and a transportSender.
Add a transport.jms.SessionTransacted parameter to each connection factory in the transportReceiver:
<parameter name="transport.jms.SessionTransacted">true</parameter>
For example:
<parameter name="myQueueConnectionFactory" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">conf/jndi.properties</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
</parameter>
Start the EI and Broker profiles.
Implementation
First, we will use the following API to put a message on a queue in WSO2 MB:
<?xml version="1.0" encoding="UTF-8"?>
<api context="/dl-test" name="dl-test" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST">
<inSequence>
<log level="custom">
<property name="property_name" value="DL-test API is called" />
</log>
<property xmlns="http://ws.apache.org/ns/synapse" name="HEADER" value="VALUE" scope="transport" type="STRING" />
<property description="xml" name="ContentType" scope="axis2" type="STRING" value="application/xml" />
<property name="messageType" scope="axis2" type="STRING" value="application/xml" />
<property name="OUT_ONLY" scope="default" type="STRING" value="true" />
<property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true" />
<send>
<endpoint>
<address uri="jms:/jmsms?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue" />
</endpoint>
</send>
</inSequence>
<outSequence />
<faultSequence>
<log level="custom">
<property name="property_name" value="faultSequence - test API is hit" />
</log>
</faultSequence>
</resource>
</api>
Then create an inbound endpoint for this queue:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint name="abc-inbound-ep" onError="c2b-1-integration-v1-common-fault-sequence" protocol="jms" sequence="test-seq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
<parameters>
<parameter name="interval">1000</parameter>
<parameter name="sequential">true</parameter>
<parameter name="coordination">true</parameter>
<parameter name="transport.jms.Destination">jmsms</parameter>
<parameter name="transport.jms.CacheLevel">3</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
<parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
<parameter name="java.naming.provider.url">conf/jndi.properties</parameter>
<parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.SubscriptionDurable">false</parameter>
<parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
<parameter name="transport.jms.SharedSubscription">false</parameter>
</parameters>
</inboundEndpoint>
Here is a sample Inbound sequence:
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="test-seq" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<log level="custom">
<property name="property_name" value="test-seq ABC Queue is called" />
<property expression="$trp:HEADER" name="property_name" />
</log>
<log level="full" />
</sequence>
Next, extend the sequence so that it sends the XML payload to an endpoint.
If the endpoint fails, the message should be retried and then moved to the Dead Letter Channel.
The Dead Letter Channel (DLC) is a subset of a queue specifically designed to persist messages that are typically marked for deletion, providing you with a choice of whether to delete, retrieve, or reroute the messages from the DLC.
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="test-seq" onError="test-fault-sequence" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<log level="custom">
<property name="property_name" value="test-seq ABC Queue is hit" />
<property expression="$trp:HEADER" name="property_name" />
</log>
<log level="full" />
<log level="custom">
<property name="Message" value="test-proxy is hit" />
</log>
<property name="OUT_ONLY" scope="default" type="STRING" value="true" />
<call blocking="true">
<endpoint key="dl-test-ep" />
</call>
</sequence>
Add a new sequence for errors that sends messages to the DLC.
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="test-fault-sequence" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
<log level="full">
<property name="MESSAGE" value="Executing default &quot;fault&quot; sequence" />
<property expression="get-property('ERROR_CODE')" name="ERROR_CODE" />
<property expression="get-property('ERROR_MESSAGE')" name="ERROR_MESSAGE" />
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true" />
<log level="custom">
<property name="Transaction Action" value="Rolled back" />
</log>
</sequence>
Alternatively, you can have a proxy service listen on the queue (though the inbound endpoint is the much better option):
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="test-proxy" startOnLoad="true" transports="http https jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<log level="custom">
<property value="test-proxy is hit" name="Message" />
</log>
<property name="OUT_ONLY" scope="default" type="STRING" value="true" />
<call blocking="true">
<endpoint key="dl-test-ep" />
</call>
</inSequence>
<outSequence />
<faultSequence>
<log level="full">
<property name="MESSAGE" value="Executing default &quot;fault&quot; sequence" />
<property expression="get-property('ERROR_CODE')" name="ERROR_CODE" />
<property expression="get-property('ERROR_MESSAGE')" name="ERROR_MESSAGE" />
</log>
<property name="SET_ROLLBACK_ONLY" scope="axis2" type="STRING" value="true" />
<log level="custom">
<property name="Transaction Action" value="Rolled back" />
</log>
</faultSequence>
</target>
<parameter name="transport.jms.ContentType">
<rules>
<jmsProperty>contentType</jmsProperty>
<default>application/xml</default>
</rules>
</parameter>
</proxy>
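One thing to watch with the proxy variant: the Axis2 JMS transport listens on a destination named after the service unless told otherwise, so test-proxy would listen on a queue called test-proxy rather than jmsms. Below is a sketch of service-level parameters that should bind the proxy to the queue used in this post. It assumes the myQueueConnectionFactory entry from axis2.xml shown earlier; the parameters would sit next to transport.jms.ContentType inside the proxy element.

```xml
<!-- Assumption: myQueueConnectionFactory is the factory defined in axis2.xml -->
<parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
<!-- Listen on the jmsms queue instead of a queue named after the proxy -->
<parameter name="transport.jms.Destination">jmsms</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
```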
Test the use case:
Make a small change to the endpoint for testing:
<?xml version="1.0" encoding="UTF-8"?>
<endpoint name="dl-test-ep" xmlns="http://ws.apache.org/ns/synapse">
<http method="get" uri-template="http://bacde.com/175" />
</endpoint>
The URI above is deliberately wrong, so every call to the endpoint fails.
You will then see the message being retried 10 times.
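The retry count most likely comes from the broker's redelivery limit rather than from anything in the Synapse artifacts. As a rough sketch, assuming the broker profile reads this setting from <EI_HOME>/wso2/broker/conf/broker.xml and that your version uses the maximumRedeliveryAttempts element under the AMQP transport (verify against your own file; the surrounding elements are abbreviated here):

```xml
<transports>
    <amqp enabled="true">
        <!-- other AMQP settings omitted -->
        <!-- Delivery attempts before the broker moves a message to the DLC -->
        <maximumRedeliveryAttempts>10</maximumRedeliveryAttempts>
    </amqp>
</transports>
```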
If you go to the MB Dead Letter Channel, you will find the message.
Home -> Manage -> Dead Letter Channel -> List:
From there you can restore, delete, or reroute a message.
Once you have fixed the endpoint URL and restored the message, you will see it delivered successfully.
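Fixing the endpoint amounts to pointing dl-test-ep at a backend that is actually reachable. A minimal sketch follows; the URL in it is a hypothetical placeholder, not part of the original setup.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Hypothetical URL: replace it with a backend that is reachable from EI -->
<endpoint name="dl-test-ep" xmlns="http://ws.apache.org/ns/synapse">
    <http method="get" uri-template="http://localhost:8080/dl-test-backend" />
</endpoint>
```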
Published at DZone with permission of Madhuka Udantha, DZone MVB. See the original article here.