Message Channels decouple message producers from message consumers. A Message Channel can be either Pollable or Subscribable. In this section we look at the Message Channel types provided out of the box by Spring Integration.
Channels
To create a Channel that broadcasts Messages to all of its subscribers, use the publish-subscribe-channel element from the core Spring Integration XML namespace.
<int:publish-subscribe-channel id="exampleChannel"/>
If you want a Channel from which a consumer can poll for Messages, create a channel with a queue. Even if this channel has multiple consumers, only one of them will receive any given Message sent to it. You can configure a queue channel in the following way.
<int:channel id="queueChannel">
    <int:queue capacity="25"/>
</int:channel>
If you want to have Messages sent based on a priority, you can configure a priority channel.
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>
A priority value in the message header is used for prioritizing messages. However, you can also provide a comparator to customize prioritization.
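A custom comparator can be attached through the comparator attribute of the priority-queue element. The following is a minimal sketch rather than a definitive configuration: the bean name messageComparator and the class com.example.PayloadLengthComparator are hypothetical, and the class is assumed to implement java.util.Comparator for Messages. The surrounding beans element shows the namespace declarations that the fragments in this section assume.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           https://www.springframework.org/schema/integration/spring-integration.xsd">

    <!-- Hypothetical comparator bean; assumed to implement java.util.Comparator -->
    <bean id="messageComparator" class="com.example.PayloadLengthComparator"/>

    <!-- Priority channel that orders messages with the comparator above
         instead of the priority header -->
    <int:channel id="customPriorityChannel">
        <int:priority-queue capacity="20" comparator="messageComparator"/>
    </int:channel>
</beans>
```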
If you want a Channel with a queue, but also want to make sure that each Message is handed off to a consumer before new Messages are accepted, create a rendezvous channel. The sending thread is blocked until a consumer receives the Message. Here is how you may create one.
<int:channel id="rendezvousChannel">
    <int:rendezvous-queue/>
</int:channel>
The default channel provided by Spring Integration is the Direct Channel. It dispatches each Message sent to it to a single subscriber, and it does so in the sender's thread, so the sender is blocked until the subscriber has handled the Message. Creating a Direct Channel is simple.
<int:channel id="directChannel"/>
A Direct Channel can also be configured with a load-balancing strategy (round-robin by default) and supports failover. In some cases, you want a Channel that can be subscribed to but that also scales. For this, Spring Integration allows you to attach a task executor to the channel definition, which essentially turns the Channel into an Executor Channel. Because dispatching is handed off to the executor, the send call typically does not block, and the handler invocation will most likely not occur in the sender's thread. An example is below.
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>
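The load-balancing and failover behavior mentioned above is set on the same dispatcher element. The fragment below is a sketch under stated assumptions: the channel ids are made up for illustration, and someExecutor is assumed to be a task executor bean defined elsewhere in the context.

```xml
<!-- Direct Channel with load balancing turned off and failover disabled:
     subscribers are tried in a fixed order and a failure is not passed on -->
<int:channel id="fixedOrderChannel">
    <int:dispatcher load-balancer="none" failover="false"/>
</int:channel>

<!-- Executor Channel that states the default round-robin strategy explicitly -->
<int:channel id="balancedExecutorChannel">
    <int:dispatcher task-executor="someExecutor" load-balancer="round-robin"/>
</int:channel>
```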
Message Channels can be further customized to restrict Messages based on data type and scope. Here is an example of a Direct Channel that only accepts Messages whose payload is of type java.lang.Number. If a Spring conversion service is registered with the application context, an attempt will be made to convert the payload when an appropriate converter is available.
<int:channel id="numberChannel" datatype="java.lang.Number"/>
Multiple data types can also be listed together, as in the following.
<int:channel id="stringOrNumberChannel"
    datatype="java.lang.String,java.lang.Number"/>
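To illustrate the payload conversion described above, a converter can be registered through the converter element of the integration namespace. This is only a sketch: the bean id stringToInteger and the class com.example.StringToIntegerConverter are hypothetical, and the class is assumed to implement Spring's Converter interface from String to Integer.

```xml
<!-- Hypothetical converter class; assumed to implement
     org.springframework.core.convert.converter.Converter -->
<bean id="stringToInteger" class="com.example.StringToIntegerConverter"/>

<!-- Registers the converter with the conversion service used by Spring Integration -->
<int:converter ref="stringToInteger"/>

<!-- A String payload sent here would be offered to the converter before being rejected -->
<int:channel id="integerOnlyChannel" datatype="java.lang.Integer"/>
```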
Channel Adapter
A Channel Adapter is a Message Endpoint that connects a single sender or receiver to a Message Channel. Spring Integration provides adapters for a number of popular technologies, which we will briefly introduce later. In addition, core Spring Integration provides two method-invoking Channel Adapters: the Inbound Channel Adapter on the inbound side and the Outbound Channel Adapter on the outbound side.
An Inbound Channel Adapter can invoke methods on a Spring-managed bean and send non-null values to a Message Channel as Spring Integration Messages. Here are some examples of configuring an Inbound Channel Adapter.
<int:inbound-channel-adapter ref="source1" method="method1"
    channel="channel1"/>
<int:inbound-channel-adapter ref="source2" method="method2"
    channel="channel2"/>
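An Inbound Channel Adapter is driven by a poller, which decides how often the source method is invoked. The sketch below adds a nested poller to each adapter; the schedule values are illustrative assumptions, not values taken from the text.

```xml
<!-- Invoke source1.method1() every 5 seconds -->
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<!-- Invoke source2.method2() at second 30 of every minute, 09:00-17:59, Monday to Friday -->
<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:inbound-channel-adapter>
```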
An Outbound Channel Adapter takes message payloads sent to a channel and invokes methods on a consumer POJO. Here is an example.
<int:outbound-channel-adapter channel="channel" ref="target"
    method="handle"/>
Any Channel Adapter can be created without a channel attribute, in which case it implicitly creates a Direct Channel whose name is the id of the Channel Adapter. Therefore, either the channel attribute or the id attribute is required on a Channel Adapter.
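As a small sketch of the implicit channel, the adapter below omits the channel attribute. The id loggingInput is a made-up name; target and handle follow the earlier example.

```xml
<!-- No channel attribute: a Direct Channel named "loggingInput" is created implicitly,
     and other endpoints can send to it by using that name as their output channel -->
<int:outbound-channel-adapter id="loggingInput" ref="target" method="handle"/>
```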
Messaging Bridge
Oftentimes it is necessary to connect two types of Message Channels. For example, you might want to connect a PollableChannel to a SubscribableChannel without making the subscribing endpoint deal with a Poller. To achieve this, you use a special type of endpoint called the Messaging Bridge, which provides the Poller. By setting max-messages-per-poll and the other scheduling attributes appropriately, the Messaging Bridge can also be used to throttle inbound Messages. Here is an example of connecting a PollableChannel to a SubscribableChannel using a Messaging Bridge that throttles through its Poller attributes.
<int:bridge input-channel="pollable" output-channel="subscribable">
    <int:poller max-messages-per-poll="10" fixed-rate="5000"/>
</int:bridge>
Routers
Payload Type Router
A Payload Type Router allows you to send messages to a specific channel based on the payload type.
<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String" channel="stringChannel"/>
    <int:mapping type="java.lang.Integer" channel="integerChannel"/>
</int:payload-type-router>
Header Value Router
A Header Value Router sends each Message to a channel chosen by the value of a specific Message header. Each mapping associates a header value with a channel.
<int:header-value-router input-channel="routingChannel"
    header-name="testHeader">
    <int:mapping value="someHeaderValue" channel="channelA"/>
    <int:mapping value="someOtherHeaderValue" channel="channelB"/>
</int:header-value-router>
Recipient List Router
A Recipient List Router sends each message to a list of statically defined Message Channels. Recipients can also have an optional selector-expression, which can be used to determine which recipients will receive the message.
<int:recipient-list-router input-channel="routingChannel">
    <int:recipient channel="channelA"/>
    <int:recipient channel="channelB"/>
</int:recipient-list-router>
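The optional selector-expression mentioned above is set per recipient. In this sketch the router id and the expression are illustrative assumptions; the expression is SpEL evaluated against the Message.

```xml
<int:recipient-list-router id="recipientRouter" input-channel="routingChannel">
    <!-- Always receives the message -->
    <int:recipient channel="channelA"/>
    <!-- Receives the message only when the expression evaluates to true -->
    <int:recipient channel="channelB"
                   selector-expression="headers.containsKey('testHeader')"/>
</int:recipient-list-router>
```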
Generic Router
A generic custom router can be created by extending a Spring Integration class called AbstractMessageRouter. In the following configuration, the ref attribute references the bean name of the custom Router implementation.
<int:router ref="customRouter" input-channel="input1"
    default-output-channel="defaultOutput1"/>
Filter
Filter is another endpoint akin to a Router. However, a Filter will not make any decision as to where the Message should be routed; rather it determines whether the Message is sent to its output channel based on certain criteria. The following illustrates the common way of configuring a Filter.
<int:filter input-channel="input" output-channel="output"
    ref="exampleObject" method="someBooleanReturningMethod"/>
The referenced method must return a Boolean value. If it returns true, the Message is sent to the output channel; otherwise it is dropped. If you would rather have an exception thrown when a Message is rejected, set the throw-exception-on-rejection attribute to true. Additionally, if you want rejected Messages to be routed to a specific channel, provide a reference to that channel using the discard-channel attribute. You can also use SpEL (Spring Expression Language) expressions in a Filter to avoid writing helper methods in a bean, as in the following.
<int:filter input-channel="input" expression="payload.equals('ok')"/>
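The two rejection options described above might be configured as follows. The channel names rejectedMessages and strictInput are assumptions made for this sketch.

```xml
<!-- Rejected messages are routed to a discard channel instead of being dropped -->
<int:filter input-channel="input" output-channel="output"
            ref="exampleObject" method="someBooleanReturningMethod"
            discard-channel="rejectedMessages"/>

<!-- Rejected messages cause an exception to be thrown -->
<int:filter input-channel="strictInput" output-channel="output"
            expression="payload.equals('ok')"
            throw-exception-on-rejection="true"/>
```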
Splitter
A Splitter is a component that splits a Message into several parts to be processed independently. In Spring Integration, any POJO can function as a Splitter, provided it has a method that takes a single input argument and returns a single object, a collection, or an array of Message (or non-Message) objects. Although you can use the Spring Integration API to accomplish splitting, the POJO approach is recommended because it decouples the application from any Spring Integration knowledge. Here is how you may configure a Splitter using XML.
<int:splitter id="splitter" ref="messageSplitterBean"
    method="splitMessage"
    input-channel="inputChannel"
    output-channel="outputChannel"/>
In this configuration, the splitMessage method of the object referenced by messageSplitterBean splits the incoming Message into parts. If no ref or method attribute is provided, the incoming payload is expected to already be a Collection.
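A Splitter that relies on the payload already being a Collection can be sketched by leaving out ref and method. The id collectionSplitter is a made-up name.

```xml
<!-- No ref or method: the payload is expected to already be a Collection,
     and each element is sent on as a separate message -->
<int:splitter id="collectionSplitter"
              input-channel="inputChannel"
              output-channel="outputChannel"/>
```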
Aggregator
An Aggregator is the reverse of a Splitter, but it is more complex than a Splitter as it needs to maintain state and know when Messages can be combined to form a single Message. Any POJO can act like an Aggregator as long as it has a method that can accept a single java.util.List. Below is an example of configuring an Aggregator using a Spring bean.
<int:aggregator
    input-channel="inputChannel"
    output-channel="outputChannel"
    discard-channel="throwAwayChannel"
    message-store="messageStore"
    ref="aggregatorBean"
    method="aggregate"
    expire-groups-upon-completion="false"/>
The message-store attribute references the store that holds Messages until aggregation is complete. By default, a volatile in-memory store is used. When a ref/method combination is used, the referenced method must implement the aggregation logic. If no bean is provided to implement the aggregation logic, the aggregated Messages are, by default, combined into the output payload. If the expire-groups-upon-completion attribute is set to true, completed groups are removed from the MessageStore.
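The following sketch wires an explicit in-memory message store and leaves out ref and method so that the default aggregation applies. The bean id messageStore matches the earlier example; choosing SimpleMessageStore here is an assumption for illustration.

```xml
<!-- In-memory store; its contents are lost when the application stops -->
<bean id="messageStore"
      class="org.springframework.integration.store.SimpleMessageStore"/>

<!-- No ref/method: default aggregation is used, and completed groups
     are removed from the store -->
<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="messageStore"
                expire-groups-upon-completion="true"/>
```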
Correlation and release strategies can be used with an Aggregator to combine Messages. With the default correlation strategy, Messages with the same CORRELATION_ID in the header are combined. You can override this behavior by implementing the CorrelationStrategy interface. The following is an example of using a correlation strategy with the provided method. For brevity, all other attributes are omitted.
<int:aggregator correlation-strategy="correlationStrategyBean"
    correlation-strategy-method="correlate"/>
The following illustrates a correlation strategy defined through a SpEL expression. Remember that you can use one or the other, not both.
<int:aggregator correlation-strategy-expression="headers['foo']"/>
Similar to the correlation strategy, a release strategy can be configured with either a bean reference or a SpEL expression, but not both at the same time.
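Both release strategy styles can be sketched as follows. The bean name releaseStrategyBean, the method name canRelease, the channel otherInputChannel and the group size of 3 are all assumptions made for this example.

```xml
<!-- Release decision delegated to a method on a bean -->
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
                release-strategy="releaseStrategyBean"
                release-strategy-method="canRelease"/>

<!-- Release decision as a SpEL expression evaluated against the message group:
     the group is released once it holds three messages -->
<int:aggregator input-channel="otherInputChannel" output-channel="outputChannel"
                release-strategy-expression="size() == 3"/>
```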
You can add further statefulness to the aggregators by using the MessageGroupStore support available in Spring Integration.
Resequencer
A Resequencer is very similar to an Aggregator, but it does not do any processing on the Messages as an Aggregator does. It simply re-sequences the Messages based on the SEQUENCE_NUMBER header value. Both correlation and release strategy semantics are equivalent to those of the Aggregator, and the XML configuration is mostly similar as well. The namespace element used for a Resequencer is <int:resequencer>.
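A Resequencer configuration might look like the sketch below. The channel names are reused from the Aggregator example, and setting release-partial-sequences to true is an illustrative choice that lets Messages be sent on as soon as they are in order rather than after the whole sequence has arrived.

```xml
<int:resequencer id="resequencer"
                 input-channel="inputChannel"
                 output-channel="outputChannel"
                 discard-channel="throwAwayChannel"
                 release-partial-sequences="true"/>
```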
Message Handler Chain
The Message Handler Chain can be treated as a single endpoint. Using a Message handler chain, you can initiate a Spring Integration flow easily. It takes an input channel and if the final component is capable of producing a reply, an output channel can also be provided. Here is an example of a flow defined through a message handler chain. Some of the components used in this example will be explained later.
<int:chain input-channel="input" output-channel="output">
<int:filter ref="filtRef" />
<int:service-activator ref="someService" method="someMethod" />
</int:chain>
As you can see, you can put several components together as a single large endpoint using the Message handler chain support. You can even call other chains from within an outer chain.
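One way to call another chain from within an outer chain is to place a gateway element inside it. This is a sketch under stated assumptions: the ids outerChain and innerChain, the channel innerInput, and the bean innerService with its process method are all hypothetical.

```xml
<int:chain id="outerChain" input-channel="input" output-channel="output">
    <int:filter ref="filtRef"/>
    <!-- Sends the message to the inner chain and waits for its reply -->
    <int:gateway request-channel="innerInput"/>
    <int:service-activator ref="someService" method="someMethod"/>
</int:chain>

<!-- The inner chain has no output-channel, so its reply returns to the calling gateway -->
<int:chain id="innerChain" input-channel="innerInput">
    <int:service-activator ref="innerService" method="process"/>
</int:chain>
```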