Spring Integration with JMS and Map Transformers

In this article I explain how Spring Integration's built-in transformers work when transforming an object message into a map message.
Sometimes messages need to be transformed before they can be consumed to achieve a business purpose. For example, a producer uses plain XML as its payload to produce a message, while a consumer is interested in Java objects or in types such as plain text, name-value pairs, or a JSON model. Spring Integration provides endpoints such as Service Activators, Channel Adapters, Message Bridges, Gateways, Transformers, Filters, and Routers. This example shows how a Transformer endpoint transforms an object message into a map message.


High Level View


spring-mockrunner.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:c="http://www.springframework.org/schema/c"
	xmlns:context="http://www.springframework.org/schema/context"
	default-autowire="default"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">


	  <bean id="destinationManager" class="com.mockrunner.jms.DestinationManager"/>

      <bean id="inBoundQueue" factory-bean="destinationManager" factory-method="createQueue">
      		<constructor-arg index="0" value="MOCKRUNNER-IN-QUEUE" />
      </bean>
      
      <bean id="outBoundQueue" factory-bean="destinationManager" factory-method="createQueue">
      		<constructor-arg index="0" value="MOCKRUNNER-OUT-QUEUE" />
      </bean>
      
      <bean id="configurationManager" class="com.mockrunner.jms.ConfigurationManager"/>
      
      <bean id="jmsQueueConnectionFactory" class="com.mockrunner.mock.jms.MockQueueConnectionFactory">
      		<constructor-arg index="0" ref="destinationManager" />
      		<constructor-arg index="1" ref="configurationManager" />
      </bean>
      
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      	<property name="connectionFactory" ref="jmsQueueConnectionFactory"/>
      </bean>
     
     <jms:listener-container  connection-factory="jmsQueueConnectionFactory" >
			<jms:listener destination="MOCKRUNNER-OUT-QUEUE" ref="mapMessageListener" method="onMessage" />
	  </jms:listener-container>
	  
	  <bean id="mapMessageListener" class="com.spijb.listener.MapMessageListener" />
      
</beans>
In the spring-mockrunner.xml file, I defined a MockQueueConnectionFactory and two mock queues, an inbound queue and an outbound queue, for quick testing. inBoundQueue is where the ObjectToMapTransformerTest.java class publishes the object message. outBoundQueue expects MapMessage objects, and the MapMessageListener.java class listens on it. For more information on how Mockrunner works, please see my previous article, Mockrunner with Spring JMS.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.springframework.samples</groupId>
  <artifactId>spring-int-jms-basic</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <properties>

		<!-- Generic properties -->
		<java.version>1.6</java.version>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

		<!-- Spring -->
		<spring-framework.version>3.2.3.RELEASE</spring-framework.version>
		
		<!-- Logging -->
		<logback.version>1.0.13</logback.version>
		<slf4j.version>1.7.5</slf4j.version>

		<!-- Test -->
		<junit.version>4.11</junit.version>

	</properties>
	
	<dependencies>
		<!-- Spring and Transactions -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${spring-framework.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-tx</artifactId>
			<version>${spring-framework.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-core</artifactId>
			<version>2.2.4.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-jmx</artifactId>
			<version>2.2.4.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-jms</artifactId>
			<version>2.2.4.RELEASE</version>
		</dependency>
		

		<!-- Logging with SLF4J & LogBack -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>${slf4j.version}</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>${logback.version}</version>
			<scope>runtime</scope>
		</dependency>

	
		<!-- Test Artifacts -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>${spring-framework.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.mockrunner</groupId>
			<artifactId>mockrunner-jms</artifactId>
			<version>1.0.3</version>
		</dependency>
		<dependency>
			<groupId>javax.jms</groupId>
			<artifactId>jms</artifactId>
			<version>1.1</version>
		</dependency>
		<dependency>
		    <groupId>org.codehaus.jackson</groupId>
		    <artifactId>jackson-mapper-asl</artifactId>
		    <version>1.9.3</version>
		    <scope>compile</scope>
		</dependency>

	</dependencies>	
</project>
spring-int-jms.xml
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:c="http://www.springframework.org/schema/c"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:context="http://www.springframework.org/schema/context"
	default-autowire="default"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
		http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.2.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
		
	  <import resource="spring-mockrunner.xml"/> 
      
      <int:poller default="true" fixed-delay="50"/>

      <int:channel id="inputChannel">
      	<int:queue  capacity="5"/>
      </int:channel>
      
       <int:channel id="outputChannel">
      	<int:queue  capacity="5"/>
      </int:channel>

	<int:object-to-map-transformer id="objectToMapTransformer" input-channel="inputChannel" output-channel="outputChannel">
	</int:object-to-map-transformer>      
       
	 <int-jms:inbound-channel-adapter  id="inBoundJMSAdapter"  connection-factory="jmsQueueConnectionFactory"  destination="inBoundQueue" channel="inputChannel">
	 	<int:poller  fixed-rate="1000" />
	 </int-jms:inbound-channel-adapter>
	 
	 <int-jms:outbound-channel-adapter id="outBoundJMSAdapter" channel="outputChannel" connection-factory="jmsQueueConnectionFactory" destination="outBoundQueue">
	 	<int:poller  fixed-rate="1000" />
	 </int-jms:outbound-channel-adapter>
	 

</beans>
The inbound channel adapter is configured to connect to a JMS server, fetch messages, and publish them onto a local channel, inputChannel. Its connection-factory and destination attributes refer to the MockQueueConnectionFactory and MockQueue (inBoundQueue) beans defined in the spring-mockrunner.xml file.
  • inputChannel and outputChannel are defined as queue channels.
  • objectToMapTransformer: the object-to-map-transformer element takes the payload from the input channel (here, the object message that originated on MOCKRUNNER-IN-QUEUE) and emits a Map of name-value pairs onto the output channel, outputChannel. The outBoundJMSAdapter bean then fetches this message and publishes it to the queue MOCKRUNNER-OUT-QUEUE.
  • inBoundJMSAdapter: the inbound-channel-adapter bean is responsible for receiving messages from a JMS server. Here it reads from the mock queue named MOCKRUNNER-IN-QUEUE; see the ObjectToMapTransformerTest.java class.
  • outBoundJMSAdapter: the outbound-channel-adapter bean is responsible for fetching messages from a channel (outputChannel) and publishing them to a JMS queue or topic. Here outBoundJMSAdapter reads the Map payload from outputChannel and publishes it as a MapMessage to outBoundQueue (MOCKRUNNER-OUT-QUEUE).
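
To make the transformation step more concrete, here is a minimal plain-Java sketch of turning a bean's readable properties into name-value pairs, which is the kind of result the object-to-map-transformer produces for a simple object. It is only an illustration and not Spring Integration's implementation: BeanToMapSketch, Dept, and toMap are hypothetical names, and the JDK's java.beans.Introspector stands in for whatever conversion the framework performs internally.

```java
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

public class BeanToMapSketch {

    // Hypothetical stand-in for the article's Department class.
    public static class Dept {
        private final Integer deptno = 10;
        private final String name = "SALES";
        private final String location = "TX";

        public Integer getDeptno() { return deptno; }
        public String getName() { return name; }
        public String getLocation() { return location; }
    }

    // Copies every readable bean property into a map keyed by property name.
    public static Map<String, Object> toMap(Object bean) {
        Map<String, Object> result = new TreeMap<>();
        try {
            // Stop at Object.class so the built-in "class" property is skipped.
            BeanInfo info = Introspector.getBeanInfo(bean.getClass(), Object.class);
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                Method getter = pd.getReadMethod();
                if (getter != null) {
                    result.put(pd.getName(), getter.invoke(bean));
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException("Could not read bean properties", e);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints {deptno=10, location=TX, name=SALES} (TreeMap sorts the keys).
        System.out.println(toMap(new Dept()));
    }
}
```

The sketch handles only flat properties; nested objects would need additional handling that is out of scope here.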

MapMessageListener.java

package com.spijb.listener;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SessionAwareMessageListener;

public class MapMessageListener implements SessionAwareMessageListener<MapMessage> {

   private static final Logger log = LoggerFactory.getLogger(MapMessageListener.class);
   @Override
   public void onMessage(MapMessage message, Session session) throws JMSException {
      
      log.info("Message Received \r\n"+message);
      
   }

}

This is a plain MapMessageListener class that logs each message received from the queue.

Department.java

package com.spijb.domain;

import java.io.Serializable;

public class Department implements Serializable{

   private static final long serialVersionUID = 1L;
   private final Integer deptno;
   private final String  name;
   private final String  location;
   
   public Department()
   {
      deptno=10;
      name="SALES";
      location="TX";
   }
   
   public Department(Integer dno,String name,String loc)
   {
      this.deptno=dno;
      this.name=name;
      this.location=loc;
   }

   public Integer getDeptno() {
      return deptno;
   }

   public String getName() {
      return name;
   }

   public String getLocation() {
      return location;
   }
   @Override
   public String toString()
   {
      return this.deptno+"-> "+this.name+"->"+this.location;
   }
   
}

This is the domain object sent as a message. The default constructor assigns deptno 10, name SALES, and location TX; a parameterized constructor is also provided.

Spring JUnit class ObjectToMapTransformerTest.java

package com.spijb.invoker;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.mockrunner.mock.jms.MockQueue;
import com.spijb.domain.Department;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:spring-mockrunner.xml","classpath:spring-int-jms.xml"})
public class ObjectToMapTransformerTest {
   
   @Autowired
   private JmsTemplate jmsTemplate;
   
   @Autowired
   private MockQueue inBoundQueue;
   
   
   @Test
   public void shouldSendMessage() throws InterruptedException
   {
     final Department defaultDepartment = new Department();
     jmsTemplate.send(inBoundQueue,new MessageCreator() {
      
      @Override
      public Message createMessage(Session session) throws JMSException {
        
         ObjectMessage objectMessage = session.createObjectMessage();
         objectMessage.setObject(defaultDepartment);
         return objectMessage;
      }
   });
     Thread.sleep(5000);
   }
}

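This Spring JUnit test class sends an object message to inBoundQueue, the mock queue that feeds inputChannel, using JmsTemplate and Mockrunner. The Thread.sleep call keeps the test alive long enough for the message to pass through the flow and reach the listener.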
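
The test above waits a fixed five seconds and asserts nothing. One common alternative, sketched below in plain Java, is to have the listener count down a java.util.concurrent.CountDownLatch so the test can stop waiting as soon as the message arrives and fail if it never does. This is a sketch under assumptions, not part of the original project: LatchWaitSketch, LatchListener, and sendAndWait are hypothetical names, and a background thread stands in for the asynchronous JMS delivery.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    // Hypothetical listener: records the message and releases any waiting thread.
    public static class LatchListener {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String received;

        public void onMessage(String message) {
            received = message;
            latch.countDown();
        }

        // Returns true if a message arrived before the timeout elapsed.
        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }

        public String getReceived() {
            return received;
        }
    }

    // Simulates asynchronous delivery, then waits at most five seconds for it.
    public static String sendAndWait(final String payload) {
        final LatchListener listener = new LatchListener();
        Thread delivery = new Thread(new Runnable() {
            @Override
            public void run() {
                listener.onMessage(payload);
            }
        });
        delivery.start();
        try {
            if (!listener.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("No message within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return listener.getReceived();
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("10-> SALES->TX"));
    }
}
```

In a real test the latch would live in the message listener bean, and the test method would assert on the received payload after the latch is released.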

Output:

INFO: started inBoundJMSAdapter
Oct 06, 2014 1:24:25 PM org.springframework.integration.endpoint.AbstractEndpoint start
INFO: started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
13:24:26.882 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO  c.spijb.listener.MapMessageListener - Message Received 
com.mockrunner.mock.jms.MockMapMessage: {location=TX, name=SALES, deptno=10}
Oct 06, 2014 1:24:30 PM org.springframework.context.support.AbstractApplicationContext doClose
INFO: Closing org.springframework.context.support.GenericApplicationContext@5840979b: startup date [Mon Oct 06 13:24:25 CDT 2014]; root of context hierarchy
Oct 06, 2014 1:24:30 PM org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup stop
INFO: Stopping beans in phase 2147483647

The MockMapMessage line in the output above shows the payload delivered as a map.
