Composite Source in Mule
Learn about how and when to use the composite source scope in Mule flows to listen to multiple channels for incoming messages.
Composite source is a scope in Mule ESB that listens on multiple channels for incoming messages.
It wraps a collection of inbound endpoints that together act as the message source of a flow. Each endpoint listens for requests on its own channel, and when any one of them receives a message, it triggers the flow.
A composite source can hold any number of endpoints, and all of them are ready to accept messages.
How It Works
To accept incoming messages from multiple input channels, place two or more message sources (also known as receivers) into a composite source. A message entering the composite source on any supported channel triggers the processing flow.
A business often needs to receive the same kind of message from several sources. For example, you may receive business orders from various customers, each sending through a different channel such as a file share, a web service or REST call, JMS, FTP, or SFTP. If the handling or business process is the same for every customer, a composite source lets a single flow serve all of those channels.
<composite-source>
    <file:inbound-endpoint ... />
    <http:inbound-endpoint ... />
    <jms:inbound-endpoint ... />
</composite-source>
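The order-intake scenario above can be sketched as a complete configuration. This is only a minimal illustration, assuming Mule 3 with the file and JMS transports available; the flow names, the `/orders/in` directory, the `orders` queue, and the `Orders_MQ` connector are hypothetical and not part of the original application.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:file="http://www.mulesoft.org/schema/mule/file"
      xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
        http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd">

    <!-- Hypothetical ActiveMQ connector; the broker URL is an assumption -->
    <jms:activemq-connector name="Orders_MQ" brokerURL="tcp://localhost:61616" />

    <flow name="receiveOrdersFlow">
        <!-- Either source can trigger the flow -->
        <composite-source>
            <!-- Hypothetical drop directory for customers who send files -->
            <file:inbound-endpoint path="/orders/in" />
            <!-- Hypothetical queue for customers who send JMS messages -->
            <jms:inbound-endpoint queue="orders" connector-ref="Orders_MQ" />
        </composite-source>
        <!-- Every order goes through the same business process -->
        <flow-ref name="processOrder" />
    </flow>

    <sub-flow name="processOrder">
        <logger level="INFO" message="Processing order: #[message.id]" />
    </sub-flow>
</mule>
```

Keeping the shared logic in a sub-flow is one possible design choice: a new channel then only needs one more endpoint inside the composite source.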
Let's walk through how to use a composite source with Mule Flow.
First, search for "composite source" in the Mule Palette and drag and drop it into the source region of the flow.
Now, drag and drop the file connector into the message processor region. Whenever any listener receives an incoming message, the message is passed to the next processor. In this case, it is passed to the file connector in the message processor region.
Now we can drag and drop multiple connectors into the composite source (i.e. wrap multiple connectors with the composite source). Here, we use an HTTP Listener, a File inbound endpoint, and a JMS (ActiveMQ) inbound endpoint to receive messages from different sources. The application uses a choice flow control to route each message dynamically, based on the inbound properties set by the endpoint that received it.
Below are the flow diagram and code for the application.
Choice flow control dynamically routes messages based on message payload or properties.
<mule xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
      xmlns:file="http://www.mulesoft.org/schema/mule/file"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
        http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <!-- Global connector configurations shared by the endpoints below -->
    <file:connector name="File" autoDelete="true" doc:name="File" />
    <http:listener-config name="HTTP_Listener_Configuration" host="localhost" port="8081" basePath="/composite" doc:name="HTTP Listener Configuration" />
    <jms:activemq-connector name="Active_MQ" brokerURL="tcp://localhost:61616" validateConnections="true" doc:name="Active MQ">
        <receiver-threading-profile maxThreadsActive="200" maxBufferSize="100" />
    </jms:activemq-connector>

    <flow name="compositeSourceFlow">
        <!-- A message arriving on any of these three sources triggers the flow -->
        <composite-source doc:name="Composite Source">
            <!-- Picks up *.xml files and moves them to the processed directory -->
            <file:inbound-endpoint connector-ref="File" path="/file/in/processing/" moveToDirectory="/file/data/processed" doc:name="File">
                <file:filename-wildcard-filter pattern="*.xml" caseSensitive="false" />
            </file:inbound-endpoint>
            <!-- basePath and path combine, so the URL is http://localhost:8081/composite/composite -->
            <http:listener config-ref="HTTP_Listener_Configuration" path="/composite" doc:name="HTTP">
            </http:listener>
            <!-- Consumes only messages whose JMSPriority is 4 -->
            <jms:inbound-endpoint queue="myqueue1" connector-ref="Active_MQ" doc:name="JMS">
                <jms:selector expression="JMSPriority = 4" />
            </jms:inbound-endpoint>
        </composite-source>

        <logger level="INFO" message="#[message]" doc:name="Logger"></logger>

        <!-- Branches are evaluated in order; the first matching one handles the message -->
        <choice doc:name="Choice">
            <when expression="#[message.inboundProperties['http.query.params']]">
                <logger level="INFO" message="Message is triggered from the Http Source" doc:name="Logger"></logger>
                <byte-array-to-string-transformer doc:name="Byte Array to String" />
            </when>
            <when expression="#[message.inboundProperties['originalDirectory']]">
                <logger level="INFO" message="Message is triggered from the File Channel" doc:name="Logger"></logger>
                <file:file-to-string-transformer doc:name="File to String" />
            </when>
            <when expression="#[message.inboundProperties['JMSPriority']]">
                <logger level="INFO" message="Message is triggered from the JMS call" doc:name="Logger"></logger>
                <byte-array-to-string-transformer doc:name="Byte Array to String" />
            </when>
            <otherwise>
                <logger level="INFO" message="Message is triggered from the Unknown Source" doc:name="Logger"></logger>
            </otherwise>
        </choice>
    </flow>
</mule>
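The `when` expressions in the application return the inbound property itself rather than a boolean. A more explicit variant, sketched here on the assumption that the flow runs on Mule 3 with MEL, compares each property against `null`. It is a fragment meant to stand in for the `choice` element inside `compositeSourceFlow`, not a replacement the original author prescribes, and the log messages are shortened for illustration.

```xml
<!-- Sketch only: same routing idea, with explicit null checks in each condition -->
<choice doc:name="Choice">
    <!-- Property set by the HTTP listener -->
    <when expression="#[message.inboundProperties['http.query.params'] != null]">
        <logger level="INFO" message="Message came from the HTTP source" doc:name="Logger" />
    </when>
    <!-- Property set by the file inbound endpoint -->
    <when expression="#[message.inboundProperties['originalDirectory'] != null]">
        <logger level="INFO" message="Message came from the file channel" doc:name="Logger" />
    </when>
    <!-- Standard JMS header exposed as an inbound property -->
    <when expression="#[message.inboundProperties['JMSPriority'] != null]">
        <logger level="INFO" message="Message came from the JMS queue" doc:name="Logger" />
    </when>
    <otherwise>
        <logger level="INFO" message="Message came from an unknown source" doc:name="Logger" />
    </otherwise>
</choice>
```

Because each inbound property is set by only one kind of endpoint, at most one condition should match for a given message, and the `otherwise` branch catches anything else.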
Hope this helps.