JMS Clustering by Example

by Felipe Gutierrez · May. 26, 10 · Interview

It's amazing how the JBoss team put together an easy way to do JMS clustering, out of the box!

I'll start with an easy example: creating a Queue named "MyClusteredQueue".

In this example I'm using JBoss AS 5.1 and two computers connected to the same network, with these IPs:

- Computer A: 192.168.0.143
- Computer B: 192.168.0.210

So, here are the steps:

1) Install JBoss on both computers. We are going to use the "all" configuration on both of them.

2) Create the Queue on both servers.

Go to $JBOSS_HOME/server/all/deploy/messaging/ and edit the destinations-service.xml file. Add the MyClusteredQueue definition just before the closing </server> tag. It looks like this:

<!-- Cluster JMS -->
<mbean code="org.jboss.jms.server.destination.QueueService"
       name="jboss.messaging.destination:service=Queue,name=MyClusteredQueue"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
    <attribute name="Clustered">true</attribute>
</mbean>

 

This is how you add a Queue to JBoss. For people who are already familiar with this, the only new thing is the "Clustered" attribute. This step must be done on both computers. At the end of the article you can find the files.
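Since the same descriptor has to be edited by hand on both machines, it is easy to forget the attribute on one of them. The following is only a minimal sketch of a sanity check, not something that ships with JBoss: the class ClusteredQueueCheck and its method are names I made up for illustration, and it assumes the descriptor has the mbean/attribute layout shown above.

```java
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper (not part of JBoss): reports whether a destination
// declared in destinations-service.xml has its "Clustered" attribute set to true.
final class ClusteredQueueCheck {

    static boolean isClustered(String descriptorXml, String queueName) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(descriptorXml)));
            NodeList mbeans = doc.getElementsByTagName("mbean");
            for (int i = 0; i < mbeans.getLength(); i++) {
                Element mbean = (Element) mbeans.item(i);
                // In the descriptor above the mbean name ends with "name=<queue>".
                if (!mbean.getAttribute("name").endsWith("name=" + queueName)) {
                    continue;
                }
                NodeList attributes = mbean.getElementsByTagName("attribute");
                for (int j = 0; j < attributes.getLength(); j++) {
                    Element attribute = (Element) attributes.item(j);
                    if ("Clustered".equals(attribute.getAttribute("name"))) {
                        return Boolean.parseBoolean(attribute.getTextContent().trim());
                    }
                }
            }
            // Destination not found, or found without a Clustered attribute.
            return false;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse descriptor", e);
        }
    }

    // Usage: java ClusteredQueueCheck <path to destinations-service.xml> <queue name>
    public static void main(String[] args) throws Exception {
        String xml = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        System.out.println(args[1] + " clustered: " + isClustered(xml, args[1]));
    }
}
```

Running it against the file on Computer A and again on Computer B should report the same result for MyClusteredQueue on both.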

3) Write the MDB that consumes the messages and deploy it on both computers. (I'm using an EJB 3 style MDB.)

 

import java.net.InetAddress;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.log4j.Logger;

/**
 * Consumes messages from the clustered queue and logs which host processed them.
 *
 * @author felipeg
 */
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/MyClusteredQueue")
})
public class JMSClusterClientHandler implements MessageListener {
    private static final Logger log = Logger.getLogger(JMSClusterClientHandler.class);

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                // Log the local host name so the server log shows which node consumed the message.
                InetAddress addr = InetAddress.getLocalHost();
                log.info("########## Processing Host: " + addr.getHostName() + " ##########");

                ObjectMessage objMessage = (ObjectMessage) message;
                Object obj = objMessage.getObject();

                log.info("Object received:" + obj);
            }
        } catch (Exception e) {
            // Send the failure to the server log instead of stderr.
            log.error("Failed to process message", e);
        }
    }
}

 

4) Start JBoss with the following options:

Computer A:
$ cd $JBOSS_HOME/bin
$ ./run.sh -c all -b 192.168.0.143 -Djboss.messaging.ServerPeerID=1

Computer B:
$ cd $JBOSS_HOME/bin
$ ./run.sh -c all -b 192.168.0.210 -Djboss.messaging.ServerPeerID=2


Each server needs its own unique ID, which is set with this option:
-Djboss.messaging.ServerPeerID

When you start JBoss on computer A, the log (server.log) should tell you that there is one node ready and listening. Once you start JBoss on computer B, the log will show both nodes, with their two IPs, ready to consume messages.
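If you add more nodes later, giving two of them the same ServerPeerID is an easy mistake to make. Here is a small sketch, under my own assumptions, that builds the start line for each node in the same form as the commands above and refuses duplicate IDs. NodeStartCommands is a hypothetical helper name, not a JBoss tool.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: generates the run.sh line for each node and
// rejects a layout in which two nodes share a ServerPeerID.
final class NodeStartCommands {

    static String startCommand(String bindAddress, int serverPeerId) {
        return "./run.sh -c all -b " + bindAddress
                + " -Djboss.messaging.ServerPeerID=" + serverPeerId;
    }

    static List<String> startCommands(Map<String, Integer> peerIdByAddress) {
        Set<Integer> seenIds = new HashSet<>();
        List<String> commands = new ArrayList<>();
        for (Map.Entry<String, Integer> node : peerIdByAddress.entrySet()) {
            // Set.add returns false when the ID was already used by another node.
            if (!seenIds.add(node.getValue())) {
                throw new IllegalArgumentException("Duplicate ServerPeerID: " + node.getValue());
            }
            commands.add(startCommand(node.getKey(), node.getValue()));
        }
        return commands;
    }

    public static void main(String[] args) {
        Map<String, Integer> nodes = new LinkedHashMap<>();
        nodes.put("192.168.0.143", 1);
        nodes.put("192.168.0.210", 2);
        for (String command : startCommands(nodes)) {
            System.out.println(command);
        }
    }
}
```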

5) Now it's time to send a message to the Queue. To do this, the client has to look up the "ClusteredConnectionFactory" connection factory instead of the default one (JMSDispatcher.java - see the code below).

Also, in the jndi.properties file (if you are using the default InitialContext), you need to add both computers' IPs, separated by a comma, to the java.naming.provider.url property. (In my case I create a Properties variable and set all the necessary properties in code; JMSDispatcher.java - see the code below.)

java.naming.provider.url=192.168.0.143:1099,192.168.0.210:1099
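If you keep the node addresses in a list rather than a hand-written string, the provider URL can be assembled in code. This is a minimal sketch: ClusterJndiProperties is a name I made up, and the factory values are the same ones used in JMSDispatcher.java further down. It only builds the Properties object; creating the InitialContext from it still needs the JBoss client libraries on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: builds the JNDI properties for a list of cluster nodes.
final class ClusterJndiProperties {

    static Properties forNodes(List<String> hostPorts) {
        if (hostPorts == null || hostPorts.isEmpty()) {
            throw new IllegalArgumentException("At least one host:port is required");
        }
        Properties props = new Properties();
        props.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
        props.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
        // All nodes go into one comma-separated provider URL.
        props.setProperty("java.naming.provider.url", String.join(",", hostPorts));
        return props;
    }

    public static void main(String[] args) {
        Properties props = forNodes(Arrays.asList("192.168.0.143:1099", "192.168.0.210:1099"));
        System.out.println(props.getProperty("java.naming.provider.url"));
    }
}
```

The returned object can be passed to new InitialContext(props) in place of a jndi.properties file.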


The client that I wrote is a web application. It consists of one index.jsp page with a form that prompts you for the server IPs and ports, the type of messaging (Queue or Topic), the name of the queue, how many times to send the message, and the actual message to be sent. The web application also has a Servlet (JMSClusteredClient.java - see code below) that receives the form post, and a helper class (JMSDispatcher.java - see code below) that sends the message to the JBoss servers.
 
You can deploy it on any computer. In my case I deployed it on Computer A, so I can access it through this URL: http://192.168.0.143:8080/JMSWeb/ (just change the IP to the machine where the client WAR was deployed).
As you can see in index.jsp (code below), I've already put in some default values that reflect the name of the Queue and the IPs of my two computers. Now, if you increase the number of times the message will be sent (to 10, say), fill out the message box, and click "Send", you should see the MDB on each of the two servers consuming some of the messages.
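To see how the messages were spread across the nodes, you can count the "Processing Host" lines that the MDB writes to each server.log. The sketch below is one way to do that, under some assumptions: ProcessingHostTally is a hypothetical class name, the host names hostA and hostB are placeholders, and the only thing it relies on is the text logged by the MDB in step 3, regardless of the log4j prefix in front of it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: counts how many messages each host reported processing.
final class ProcessingHostTally {

    // Matches the text logged by the MDB: "Processing Host: <hostname> ####..."
    private static final Pattern HOST = Pattern.compile("Processing Host: (\\S+) #");

    static Map<String, Integer> tally(List<String> logLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : logLines) {
            Matcher matcher = HOST.matcher(line);
            if (matcher.find()) {
                counts.merge(matcher.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample lines with placeholder host names; in practice, read the lines of both server.log files.
        List<String> lines = Arrays.asList(
                "INFO ########## Processing Host: hostA ##########",
                "INFO Object received:1 of 3 hello",
                "INFO ########## Processing Host: hostB ##########",
                "INFO ########## Processing Host: hostA ##########");
        System.out.println(tally(lines));
    }
}
```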
 
Here are the files to create the client:
index.jsp
 

<html>
<body>
<div>
    <%-- Build the action from the context path so the form works from both /JMSWeb/ and /JMSWeb/index.jsp --%>
    <form method="POST" action='<%= request.getContextPath() + "/JMSClusteredClient" %>'>
        <fieldset>
            <legend>JMS Clustered - Test Client</legend>
            <table>
                <tr>
                    <td>Server:</td><td><input type="text" name="server" value="192.168.0.143:1099,192.168.0.210:1099" /></td>
                </tr>
                <tr>
                    <td>
                        <select name="messageType">
                            <option value="QUEUE" selected="selected">Queue</option>
                            <option value="TOPIC">Topic</option>
                        </select>
                    </td>
                    <td><input type="text" name="topicqueue" value="queue/MyClusteredQueue" /></td>
                </tr>
                <tr>
                    <td>Times:</td><td><input type="text" name="times" value="3" /></td>
                </tr>
                <tr>
                    <td>Message:</td><td><textarea rows="3" cols="20" name="message"></textarea></td>
                </tr>
            </table>
            <input type="submit" value="Send">
        </fieldset>
    </form>
</div>
</body>
</html>
 
Servlet:  JMSClusteredClient.java
 
import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.JMSException;
import javax.naming.NamingException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class JMSClusteredClient extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#service(HttpServletRequest request, HttpServletResponse response)
     */
    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        PrintWriter out = response.getWriter();

        // Values posted by the form in index.jsp.
        String topicqueue = request.getParameter("topicqueue");
        String message = request.getParameter("message");
        String server = request.getParameter("server");
        String messageType = request.getParameter("messageType");
        String times = request.getParameter("times");

        int intTimes = Integer.parseInt(times);

        JMSDispatcher dispatcher = new JMSDispatcher();
        dispatcher.setTopicQueueName(topicqueue);
        dispatcher.setServer(server);
        dispatcher.setMessageType(messageType);

        try {
            // Send the same message intTimes times, prefixed with its sequence number.
            for (int count = 1; count <= intTimes; count++) {
                dispatcher.sendMessage(count + " of " + times + " " + message);
            }
            out.println("Message [" + message + "] sent successfully to [" + topicqueue + "] on the [" + server + "] server " + times + " times.");
        } catch (JMSException e) {
            e.printStackTrace();
            out.println("Error:" + e.getMessage());
        } catch (NamingException e) {
            e.printStackTrace();
            out.println("Error:" + e.getMessage());
        } finally {
            out.close();
        }
    }
}
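The servlet above passes the "times" field straight to Integer.parseInt, so an empty or non-numeric value ends in a NumberFormatException. If you want the form to be more forgiving, something along these lines could be used instead. It is only a sketch: TimesParameter, the default value and the upper limit are my own assumptions, not part of the original client.

```java
// Hypothetical helper: turns the raw "times" form value into a safe repeat count.
final class TimesParameter {

    static int parse(String raw, int defaultValue, int max) {
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            int value = Integer.parseInt(raw.trim());
            if (value < 1) {
                return defaultValue;
            }
            // Cap the count so a typo does not flood the queue.
            return Math.min(value, max);
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("3", 1, 100));
        System.out.println(parse("abc", 1, 100));
    }
}
```

In the servlet, the call would replace the Integer.parseInt(times) line, for example TimesParameter.parse(times, 1, 100).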
 
A utility to send the messages: JMSDispatcher.java
 
import java.io.Serializable;
import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.log4j.Logger;

public class JMSDispatcher {

    private static final Logger log = Logger.getLogger(JMSDispatcher.class);

    private static final String CONNECTION_FACTORY_CLUSTERED = "ClusteredConnectionFactory";
    private static final String CONNECTION_FACTORY = "ConnectionFactory";

    private static final String TOPIC = "TOPIC";
    private static final String QUEUE = "QUEUE";

    private String topicQueueName;
    private String server;
    private String messageType;

    public void setTopicQueueName(String value) {
        this.topicQueueName = value;
    }

    public void setServer(String value) {
        this.server = value;
    }

    public void setMessageType(String value) {
        this.messageType = value;
    }

    public void sendMessage(Object objectMessage) throws JMSException, NamingException {
        log.debug("##### Setting up a Queue/Topic Message: #####");
        if (TOPIC.equals(messageType)) {
            sendTopicMessage(objectMessage);
        } else if (QUEUE.equals(messageType)) {
            sendQueueMessage(objectMessage);
        }
        log.debug("##### Publishing Message: Done #####");
    }

    private void sendQueueMessage(Object objectMessage) throws JMSException, NamingException {
        QueueConnection queueConn = null;
        try {
            InitialContext initialContext = getInitialContext();

            // Look up the clustered connection factory, not the default one.
            QueueConnectionFactory qcf = (QueueConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_CLUSTERED);
            queueConn = qcf.createQueueConnection();
            Queue queue = (Queue) initialContext.lookup(topicQueueName);
            QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queueConn.start();

            QueueSender send = queueSession.createSender(queue);
            ObjectMessage om = queueSession.createObjectMessage((Serializable) objectMessage);
            setMessageProperties(om);
            log.debug("##### Publishing Message to a Queue: " + topicQueueName + " #####");
            send.send(om);
        } catch (MessageFormatException ex) {
            log.error("##### The MESSAGE is not Serializable ####");
            throw ex;
        } catch (MessageNotWriteableException ex) {
            log.error("##### The MESSAGE is not Writeable ####");
            throw ex;
        } catch (JMSException ex) {
            log.error("##### JMS provider fails to set the object due to some internal error. ####");
            throw ex;
        } finally {
            if (queueConn != null) {
                // Closing the connection also closes its sessions and senders.
                queueConn.close();
            }
        }
    }

    private void sendTopicMessage(Object objectMessage) throws JMSException, NamingException {
        TopicConnection topicConn = null;
        try {
            InitialContext initialContext = getInitialContext();

            TopicConnectionFactory tcf = (TopicConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_CLUSTERED);
            topicConn = tcf.createTopicConnection();
            Topic topic = (Topic) initialContext.lookup(topicQueueName);
            TopicSession topicSession = topicConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topicConn.start();

            TopicPublisher send = topicSession.createPublisher(topic);

            ObjectMessage om = topicSession.createObjectMessage();
            om.setObject((Serializable) objectMessage);
            setMessageProperties(om);
            log.debug("##### Publishing Message to a Topic: " + topicQueueName + " #####");
            send.publish(om);
        } catch (MessageFormatException ex) {
            log.error("##### The MESSAGE is not Serializable ####");
            throw ex;
        } catch (MessageNotWriteableException ex) {
            log.error("##### The MESSAGE is not Writeable ####");
            throw ex;
        } catch (JMSException ex) {
            log.error("##### JMS provider fails to set the object due to some internal error. ####");
            throw ex;
        } finally {
            if (topicConn != null) {
                // Closing the connection also closes its sessions and publishers.
                topicConn.close();
            }
        }
    }

    /**
     * Hook for setting custom JMS properties on the outgoing message.
     * The body of this method is not shown in this listing, so it is
     * left here as a no-op placeholder.
     */
    private void setMessageProperties(ObjectMessage om) throws JMSException {
        // Placeholder: add calls such as om.setStringProperty(...) if you need them.
    }

    private InitialContext getInitialContext() throws NamingException {
        // "server" holds the comma-separated list of host:port pairs from the form.
        Properties jboss = new Properties();
        jboss.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
        jboss.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
        jboss.put("java.naming.provider.url", server);
        return new InitialContext(jboss);
    }
}
 
And the web.xml
 
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5">
    <display-name>JMSWeb</display-name>
    <welcome-file-list>
        <welcome-file>index.jsp</welcome-file>
    </welcome-file-list>

    <servlet>
        <description></description>
        <display-name>JMSClusteredClient</display-name>
        <servlet-name>JMSClusteredClient</servlet-name>
        <servlet-class>com.blogspot.felipeg48.jms.web.JMSClusteredClient</servlet-class>
    </servlet>
    <servlet-mapping>
        <servlet-name>JMSClusteredClient</servlet-name>
        <url-pattern>/JMSClusteredClient</url-pattern>
    </servlet-mapping>
</web-app>
 
Happy Clustering!