JMS Throttling With Camunda Application
In this article, take an in-depth look at how to perform JMS throttling with Camunda, which is running as a Spring boot application in Eclipse IDE.
Join the DZone community and get the full member experience.
Join For FreeIBM MQ exposes the JMS interface to connect with Camunda Spring boot using the mq-jms-spring-boot-starter
library as a dependency in the POM.xml file. The message being delivered to Camunda is modeled as XML- SOAP format and posted on queue through MQJ explorer. Here, MQJ explorer acts as a client to IBM MQ. We will have methods to start and stop the JMS listener, as well as methods to know the status and setting maxConnectionSize.
Prerequisites
- Eclipse (any version) with Maven capabilities
- Java 8+
- IBM MQ and MQJ Explorer
- Camunda
Installing Eclipse-IDE on Windows
- Click on the link: https://www.eclipse.org/downloads/download.php?file=/oomph/epp/2020-09/R/eclipse-inst-jre-win64.exe.
- Download the eclipse-inst-jre-win64.exe file and run the eclipse installer.
- Select Eclipse IDE for Eclipse committers and install.
Creating a Maven Project in Eclipse IDE
- Open the Eclipse IDE.
- Go to File > New > Project.
- Go to Maven -> Maven Project and click Next.
- Select your workspace location and click Next.
- Select quick start maven archetype and click Next.
- Enter Group Id, Artifact Id, and package name.
Group Id: Fill in a groupId for the project of your choice.
Artifact Id: Fill artifactId for the project of your choice
Package: Java package structure of your choice
The above process will create a project structure like below:
7. Create a package like com.example.demo.delegate
under src/main/java
folder and create a source folder src/main/resources
folder.
8. Place the CamundaApplication.java
file in the com.example.demo
package.
package com.example.demo;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.ProcessEngines;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.variable.Variables;
import org.camunda.bpm.engine.variable.value.ObjectValue;
import org.camunda.bpm.spring.boot.starter.annotation.EnableProcessApplication;
import org.camunda.bpm.spring.boot.starter.event.PostDeployEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.JmsAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
@ComponentScan("com.example.demo")
@SpringBootApplication(scanBasePackages = "com.example.demo.*",exclude={JmsAutoConfiguration.class})
@EnableProcessApplication()
@EnableJms
public class CamundaApplication {
Object lock = new Object();
boolean firstTimeP = true;
@Autowired
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
public static void main(String[] args) {
SpringApplication.run(CamundaApplication.class, args);
}
String xmlRequestPublishBilling = null;
@EventListener
public void onPostDeploy(PostDeployEvent event) {
synchronized (lock) {
lock.notifyAll();
firstTimeP = false;
}
}
@JmsListener(destination = "${ibm.mq.queue.publishBilling}")
public void listenerPublish(Object message) {
if (firstTimeP) {
synchronized (lock) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
try {
if (message instanceof com.ibm.jms.JMSBytesMessage) {
com.ibm.jms.JMSBytesMessage mess = (com.ibm.jms.JMSBytesMessage) message;
mess.acknowledge();
byte[] payload = new byte[(int) mess.getBodyLength()];
mess.readBytes(payload);
xmlRequestPublishBilling = new String(payload);
} else {
com.ibm.jms.JMSTextMessage mess = (com.ibm.jms.JMSTextMessage) message;
xmlRequestPublishBilling = mess.getText();
mess.acknowledge();
}
String transactionId = null;
byte[] byteArray;
byteArray = xmlRequestPublishBilling.getBytes("UTF-8");
ByteArrayInputStream inputStream = new ByteArrayInputStream(byteArray);
XMLInputFactory inputFactory = XMLInputFactory.newInstance();
XMLStreamReader streamReader = inputFactory.createXMLStreamReader(inputStream);
while (streamReader.hasNext()) {
// Move to next event
streamReader.next();
// Check if its 'START_ELEMENT'
if (streamReader.getEventType() == XMLStreamReader.START_ELEMENT) {
String tagName = streamReader.getLocalName();
if (tagName.equalsIgnoreCase("transactionId")) {
transactionId = streamReader.getElementText();
}
}
}
RuntimeService runtimeService = engine.getRuntimeService();
Map<String, Object> processVariableMap = new HashMap<String, Object>();
ObjectValue xmlValue = Variables.objectValue(xmlRequestPublishBilling)
.serializationDataFormat("application/xml").create();
processVariableMap.put(Constant.REQUEST, xmlValue);
runtimeService.createMessageCorrelation("RecpSrvc_Billing_Initiator_Message")
.processInstanceBusinessKey(transactionId).setVariables(processVariableMap).correlateStartMessage();
} catch (Exception e) {
e.printStackTrace();
}
}
}
9. Add the Constant.java
file in the com.example.demo
package.
package com.example.demo;
public class Constant {
// ****** Error State Constants*****
public static final String REQUEST = "request";
}
10. Add O2JMSController.java
file in com.example.demo
package.
package com.example.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQSimpleConnectionManager;
@RestController
@RequestMapping("/jms")
public class O2JMSController {
private static final Logger LOGGER = LoggerFactory.getLogger(O2JMSController.class);
@Autowired
JmsListenerEndpointRegistry jmsListenerEndpointRegistry;
static MQSimpleConnectionManager connectionMgr = null;
static int size = 40;
@RequestMapping(value = "/publishBilling/stop", method = RequestMethod.POST)
String stopPublishBilling(@RequestBody String command) {
MessageListenerContainer container = jmsListenerEndpointRegistry.getListenerContainer("PublishListener");
container.stop();
return "PublishBilling JMS Listener Stopped";
}
@RequestMapping(value = "/publishBilling/start", method = RequestMethod.POST)
String startPublishBilling(@RequestBody String command) {
MessageListenerContainer container = jmsListenerEndpointRegistry.getListenerContainer("PublishListener");
container.start();
return "PublishBilling JMS Listener Started";
}
@RequestMapping(value = "/publishBilling/status", method = RequestMethod.POST)
String getStatusPublishBilling(@RequestBody String command) {
MessageListenerContainer container = jmsListenerEndpointRegistry.getListenerContainer("PublishListener");
boolean status = container.isRunning();
if (status) {
return "PublishBilling JMS Listener Running";
} else {
return "PublishBilling JMS Listener not Running";
}
}
@RequestMapping(value = "/maxConnectionSize", method = RequestMethod.POST)
String setMaxConnections(@RequestBody String connectionSize) {
size = Integer.parseInt(connectionSize);
stopPublishBilling(null);
if (connectionMgr == null) {
connectionMgr = new MQSimpleConnectionManager();
}
connectionMgr.setActive(MQSimpleConnectionManager.MODE_AUTO);
connectionMgr.setMaxConnections(size);
MQEnvironment.setDefaultConnectionManager(connectionMgr);
startPublishBilling(null);
return "maxConnections is set :: " + size;
}
@RequestMapping(value = "/maxConnectionSize", method = RequestMethod.GET)
String getMaxConnections() {
// int connectionSize = 0;
return "maxConnection size is :: " + size;
}
}
11. Add the BillingDelegate.java
file in com.example.demo.delegate
package.
package com.example.dem.delegate;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
import org.camunda.bpm.engine.variable.value.ObjectValue;
import org.springframework.stereotype.Component;
import com.example.demo.Constant;
@Component("BillingDelegate")
public class BillingDelegate implements JavaDelegate {
public void execute(DelegateExecution execution) {
ObjectValue xmlRequestObj = (ObjectValue) execution.getVariableTyped(Constant.REQUEST);
String xmlRequest = (String) xmlRequestObj.getValue().toString();
System.out.println("Request :" + xmlRequest);
}
}
12. Add application.properties
, application.yaml
, and RcsBillingEventInitiator.bpmn
in /src/main/resources
folder.
application.properties
ibm.mq.queueManager=M0DCRMT3
ibm.mq.channel=BOSS.SVRCONN
ibm.mq.connName=iv4239.uname.telecom.co.nz(1434)
ibm.mq.user=
ibm.mq.password=
ibm.mq.pool.enabled=true
ibm.mq.pool.maxConnections=40
ibm.mq.pool.maxSessionsPerConnection=500
ibm.mq.queue.publishBilling=DEV/BOSS/PUBLISH_BILLING_EVENT
server.port=8080
application.yaml
spring.h2.console.enabled: true
spring.datasource:
url: jdbc:h2:./camunda-h2-database;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;AUTO_SERVER=TRUE;
username: sa
password: sa
camunda.bpm:
admin-user:
id: demo
password: demo
firstName: Demo
lastName: Demo
filter:
create: All Tasks
RcsBillingEventInitiator.bpmn
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" id="Definitions_0xx4klu" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="4.4.0">
<bpmn:collaboration id="Collaboration_1g0hs7s">
<bpmn:participant id="RCS_BillingEvent_Initiator" name="Receptor Service Billing Event Initiator" processRef="RCS_BillingEvent_Initiator_Group" />
</bpmn:collaboration>
<bpmn:process id="RCS_BillingEvent_Initiator_Group" isExecutable="true">
<bpmn:startEvent id="RecpSrvc_Billing_Initiator_Start" name="RecpSrvc Billing Initiator Start">
<bpmn:outgoing>Flow_1jexk4p</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_0qqrwpx" messageRef="Message_1suodic" />
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1jexk4p" sourceRef="RecpSrvc_Billing_Initiator_Start" targetRef="RecpSrvc_Billing_Initiator_DuplicateEventValidation" />
<bpmn:endEvent id="Event_1tqof64" name="End Eent">
<bpmn:incoming>Flow_06bsz4a</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_06bsz4a" sourceRef="RecpSrvc_Billing_Initiator_DuplicateEventValidation" targetRef="Event_1tqof64" />
<bpmn:serviceTask id="RecpSrvc_Billing_Initiator_DuplicateEventValidation" name="Validating the Received message" camunda:delegateExpression="#{BillingDelegate}">
<bpmn:incoming>Flow_1jexk4p</bpmn:incoming>
<bpmn:outgoing>Flow_06bsz4a</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:textAnnotation id="TextAnnotation_0kpt6hp">
<bpmn:text>Receives publish billing event from MQ series</bpmn:text>
</bpmn:textAnnotation>
<bpmn:association id="Association_0ee69pk" sourceRef="RecpSrvc_Billing_Initiator_Start" targetRef="TextAnnotation_0kpt6hp" />
<bpmn:textAnnotation id="TextAnnotation_1k38r2c">
<bpmn:text>Validating the received XML</bpmn:text>
</bpmn:textAnnotation>
<bpmn:association id="Association_1kyku2l" sourceRef="RecpSrvc_Billing_Initiator_DuplicateEventValidation" targetRef="TextAnnotation_1k38r2c" />
<bpmn:textAnnotation id="TextAnnotation_1qzlsj7">
<bpmn:text>Ending the flow</bpmn:text>
</bpmn:textAnnotation>
<bpmn:association id="Association_1my84xb" sourceRef="Event_1tqof64" targetRef="TextAnnotation_1qzlsj7" />
</bpmn:process>
<bpmn:message id="Message_1suodic" name="RecpSrvc_Billing_Initiator_Message" />
<bpmn:message id="Message_1vnhkcs" name="RecpSrvc_Billing_Pre_Processor_Msg" />
<bpmn:error id="Error_14k81br" name="BPMN_ERROR" errorCode="BPMN_ERROR" />
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Collaboration_1g0hs7s">
<bpmndi:BPMNShape id="Participant_07448el_di" bpmnElement="RCS_BillingEvent_Initiator" isHorizontal="true">
<dc:Bounds x="160" y="80" width="670" height="350" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="TextAnnotation_1k38r2c_di" bpmnElement="TextAnnotation_1k38r2c">
<dc:Bounds x="530" y="150" width="100" height="40" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="TextAnnotation_1qzlsj7_di" bpmnElement="TextAnnotation_1qzlsj7">
<dc:Bounds x="710" y="220" width="100" height="30" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1jexk4p_di" bpmnElement="Flow_1jexk4p">
<di:waypoint x="246" y="327" />
<di:waypoint x="410" y="327" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_06bsz4a_di" bpmnElement="Flow_06bsz4a">
<di:waypoint x="510" y="327" />
<di:waypoint x="672" y="327" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Event_1btznrj_di" bpmnElement="RecpSrvc_Billing_Initiator_Start">
<dc:Bounds x="210" y="309" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="190" y="352" width="81" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="TextAnnotation_0kpt6hp_di" bpmnElement="TextAnnotation_0kpt6hp">
<dc:Bounds x="230" y="150" width="100" height="68" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1tqof64_di" bpmnElement="Event_1tqof64">
<dc:Bounds x="672" y="309" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="667" y="352" width="47" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1efnk0s_di" bpmnElement="RecpSrvc_Billing_Initiator_DuplicateEventValidation">
<dc:Bounds x="410" y="287" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Association_0ee69pk_di" bpmnElement="Association_0ee69pk">
<di:waypoint x="234" y="310" />
<di:waypoint x="263" y="218" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Association_1kyku2l_di" bpmnElement="Association_1kyku2l">
<di:waypoint x="484" y="287" />
<di:waypoint x="541" y="190" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Association_1my84xb_di" bpmnElement="Association_1my84xb">
<di:waypoint x="701" y="313" />
<di:waypoint x="749" y="250" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
13. Replace the pom.xml
with the below content.
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>JMSCamundaThrottleDemo</groupId>
<artifactId>JMSCamundaThrottleDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>JMSCamundaThrottleDemo</name>
<description>JMSCamundaThrottleDemo</description>
<properties>
<camunda.version>7.14.0</camunda.version>
<cxf.version>3.3.6</cxf.version>
<camundaSpringBoot.version>7.14.0</camundaSpringBoot.version>
<springBoot.version>2.2.5.RELEASE</springBoot.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<version.java>1.8</version.java>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<failOnMissingWebXml>false</failOnMissingWebXml>
</properties>
<dependencies>
<dependency>
<groupId>org.camunda.bpm.springboot</groupId>
<artifactId>camunda-bpm-spring-boot-starter-webapp</artifactId>
<version>7.16.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.camunda.bpm.springboot</groupId>
<artifactId>camunda-bpm-spring-boot-starter-rest</artifactId>
<version>7.16.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.camunda.spin</groupId>
<artifactId>camunda-spin-dataformat-all</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.camunda.bpm</groupId>
<artifactId>camunda-engine-plugin-spin</artifactId>
<version>7.16.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.camunda.spin</groupId>
<artifactId>camunda-spin-core</artifactId>
<version>1.10.1</version>
</dependency>
<!-- PostgreSQL -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.22</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>mq-jms-spring-boot-starter</artifactId>
<version>2.4.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.camunda.bpm/camunda-engine -->
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-client</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-spring-jaeger-cloud-starter</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.6</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.3.8</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>camunda-bpm-nexus</id>
<name>Camunda Maven Repository</name>
<url>https://app.camunda.com/nexus/content/groups/public</url>
</repository>
<!-- enable this for EE dependencies (requires credentials in ~/.m2/settings.xml)-->
<!-- <repository>
<id>camunda-bpm-nexus-ee</id>
<name>Camunda Enterprise Maven Repository</name>
<url>https://app.camunda.com/nexus/content/repositories/camunda-bpm</url>
</repository> -->
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.5.1</version>
</plugin>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>0.4.0</version>
</plugin>
</plugins>
</build>
</project>
Testing
- Run the Camunda Application.
- Starting the JMS listener:
3. Stopping the JMS listener:
4. Knowing the status:
5. Setting maxConnectionSize:
This concludes what we have learned about JMS throttling with Camunda Application.
Opinions expressed by DZone contributors are their own.
Comments