WSO2 Enterprise Integrator for Distributed Messaging
We take an up close look at various messaging patterns, the architectures that make them up, and how they contribute to a larger piece of software.
HA - High Availability
SAN - Storage Area Network
AMQP - Advanced Message Queuing Protocol
What Is Distributed Messaging?
Message brokering is an enterprise integration pattern that routes messages to the proper destination without the originating application being aware of the message's ultimate destination. A message broker also caters for guaranteed and ordered message delivery requirements.
WSO2 Enterprise Integrator delivers this integration pattern through the “message broker profile” bundled with it. From here onward, this article refers to that profile as the “Message Broker.”
Messaging Architecture Patterns
Publishers and subscribers can connect to a message broker server using different architecture patterns, each with its own pros and cons.
Using a central Message Broker in a system deployment is sometimes referred to as the hub-and-spoke architectural style. In that architecture, a Message Broker server is positioned in the middle and publishers and subscribers are connected to it. The routing of messages to relevant subscribers from the publishers is handled by the broker.
This architecture has its own flaws. If the message broker crashes for some reason, no part of the system can communicate with any other. To solve this problem, the active-passive architecture style can be used.
In this messaging style, all publishers and subscribers are aware of a set of brokers to which they can connect. If the first broker in the list is down, the client connects to the next. In this way, high availability of the system is preserved as long as at least one broker instance is available. Popular message brokers such as ActiveMQ use this mechanism to achieve HA.
An active-active message broker setup provides both HA and scalability. All subscribers and publishers know the set of brokers to which they can connect, and, interestingly, they can select any broker node in the setup. If the connected broker node crashes for some reason, the client connects to another broker in the set.
This way, load is distributed between the brokers. Going further, we can introduce a routing layer on top of the message broker layer and present all outside publishers and subscribers with a single broker endpoint to connect to. This layer also handles broker failures and either reroutes the client or asks it to reconnect. This would be the best architecture style for a project that has high messaging demands or whose messaging demands are likely to grow.
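The failover behavior described above can be sketched in a few lines. This is only an illustration of a client walking a broker list in order; the `connect_with_failover` helper, the `BrokerUnavailable` exception, the broker URLs, and the fake connect function are all hypothetical and do not belong to any real client library.

```python
from typing import Callable, Sequence


class BrokerUnavailable(Exception):
    """Raised by a connect function when a broker cannot be reached."""


def connect_with_failover(brokers: Sequence[str],
                          connect: Callable[[str], object]) -> object:
    """Try each broker in list order and return the first live connection."""
    last_error = None
    for url in brokers:
        try:
            return connect(url)
        except BrokerUnavailable as err:
            last_error = err  # this broker is down, try the next one
    raise BrokerUnavailable(
        f"no broker reachable out of {list(brokers)}") from last_error


# Hypothetical connect function: the first broker in the list is down.
def fake_connect(url: str) -> str:
    if url == "tcp://broker-1:5672":
        raise BrokerUnavailable(url)
    return f"connected:{url}"


connection = connect_with_failover(
    ["tcp://broker-1:5672", "tcp://broker-2:5672"], fake_connect)
```

The system stays available as long as one entry in the list accepts the connection; only when every broker fails does the client see an error.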
Scaling Persistence Layer
Note that the persistence layer also needs to scale along with the message broker cluster. Scaling only the broker software layer, which sits on top of the persistence layer, will not deliver the expected scaling. Below are some of the technologies used for scaling the persistence layer.
- Database mirroring
- Network file systems
- Cloud storage
- Storage-area network (SAN) 
One of the major things to keep in mind when selecting a DB scaling mechanism is reliability. A NoSQL DB like Cassandra or MongoDB is not a good fit here, as it gives up reliability in exchange for scaling and performance.
However, designing an active-active broker system is not trivial. Scalability and high availability on one side and reliability on the other are conflicting non-functional requirements, so designing a scalable broker without giving up reliability is a challenge. The broker needs to cater for node failures, route messages and clients to other nodes on failure, distribute messages to the nodes where the subscribers are, make sure no duplicates occur across the cluster, keep track of published messages, and load balance them between broker nodes.
Let us review how the WSO2 Message Broker team faced this challenge by going through the architecture of the product. This product is now shipped as a profile inside WSO2 Enterprise Integrator.
Distributed Messaging Architecture of WSO2 EI Broker Profile
High-Level Architecture of a Typical Deployment
The WSO2 Message Broker cluster comprises two or more broker nodes. All nodes are capable of handling AMQP messaging traffic. All nodes point to the same configuration store and message store databases so that they can share information. One node, usually the first node started in the cluster, becomes a special node. We call it the “coordinator node,” and it keeps track of information about the other nodes, among other things (described later in this article).
If you log into the console of any broker node, you will see the same view, as all information is shared across nodes in the cluster.
Message Slots and Distribution
When a message is published to any node in the cluster, it is persisted to the database shared across the cluster nodes. That alone is not enough for it to reach the consumer, as the consumer can be connected to a different node. Notifying all the nodes on each message's arrival is too expensive, as it would kill the performance of the broker cluster when delivering messages, and it would not scale as the number of nodes in the cluster grows.
To solve the problem, the WSO2 Message Broker uses the concept of “Message Slots.” Each node tracks the messages published to it and, from time to time, notifies the coordinator node of message slots. This notification consists of the following information.
- Start message id.
- End message id.
- Name of the queue to which the message is addressed.
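As a rough sketch, a slot notification can be modeled as a small immutable record holding just these three fields. The `SlotNotification` class and its field names are assumptions made for illustration and are not taken from the WSO2 code base.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SlotNotification:
    """Boundary information for a batch of messages; it carries no payloads."""
    start_message_id: int
    end_message_id: int
    queue_name: str

    def __post_init__(self) -> None:
        # A slot is a closed ID range, so the start cannot exceed the end.
        if self.start_message_id > self.end_message_id:
            raise ValueError("start_message_id must not exceed end_message_id")

    def covers(self, message_id: int) -> bool:
        """Return True if the given message ID falls inside this slot."""
        return self.start_message_id <= message_id <= self.end_message_id


slot = SlotNotification(start_message_id=1000, end_message_id=1999,
                        queue_name="orders")
```

Because the record only describes a range, it stays small no matter how many messages the slot refers to.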
These slots are kept in memory by the coordinator node, which is responsible for managing them. We will drill into this in the “Coordinating Slots and Routing” section.
We will look into how subscribers are registered in the broker cluster first.
Subscribers can come to the cluster via any node in the broker cluster. When a subscriber subscribes, a TCP connection is made to that node only, and therefore only that node can deliver messages to that subscriber. According to the AMQP specification, subscribers are bound to a message router by a binding key, and each has a unique queue associated with it. These semantics are evaluated at the broker core, and each subscriber is attached to a queue in the cluster. In simple terms, the WSO2 Message Broker will do the following.
- Queue subscribers: the broker will keep one queue for the whole cluster by the name of the queue to which the subscriber is subscribing.
- Topic subscribers: the broker will keep different queues for the different binding key topics that subscribers have, as well as per node. If we keep one temporary queue for each subscriber, in a scenario where there are a lot of topic subscribers, there will be thousands of queues registered in the cluster, creating too much overhead for coordinating message slots and increasing communication between the nodes (hence making it less scalable). Thus, for a topic, all the subscribers for the same topic on the same node will share the same queue.
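The two rules above amount to a mapping from a subscription to the internal queue that backs it. The following sketch assumes a made-up naming scheme (`tmp_<node>_<topic>`) purely for illustration; the real broker's internal queue names may differ.

```python
def storage_queue_for(kind: str, destination: str, node_id: str) -> str:
    """Return the name of the internal queue backing a subscription.

    kind is "queue" or "topic"; the naming scheme is hypothetical.
    """
    if kind == "queue":
        # One queue for the whole cluster, named after the destination.
        return destination
    if kind == "topic":
        # One shared queue per (node, topic) pair, not one per subscriber.
        return f"tmp_{node_id}_{destination}"
    raise ValueError(f"unknown subscription kind: {kind!r}")


# Two topic subscribers on the same node share a single internal queue.
first = storage_queue_for("topic", "sports.cricket", "node-1")
second = storage_queue_for("topic", "sports.cricket", "node-1")
other_node = storage_queue_for("topic", "sports.cricket", "node-2")
```

With this mapping, the number of internal queues grows with the number of nodes and topics rather than with the number of topic subscribers.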
Subscriber information is persisted to the database by the node used for subscription. However, a copy of the subscriber's information is kept in the subscriber cache at each node. Every node in the cluster should have the same view of the subscriber cache. This prevents the broker from going to the database every time it needs to evaluate one of the following:
- To which subscriber a message should be delivered.
- In which node the actual subscribers for a queue reside.
The WSO2 Message Broker is built on top of the Carbon 4 framework. This framework uses Hazelcast for coordinating cluster messages. The same in-memory view of the subscribers on each node is achieved using Hazelcast-based cluster coordination. Whenever a subscription is made, that node sends a notification to all other nodes in the broker cluster so that they update their in-memory caches.
Thus, the subscriber information is kept as an in-memory grid in the WSO2 MB cluster.
There are instances where we need to maintain the view of the subscriber nodes properly and very carefully. Please note the below situations:
- The subscriber closes and leaves a node.
- The client application that the subscriber is serving crashes.
- The TCP connection between the subscriber-client and the broker is broken.
- A node having a set of subscribers is shut down. Here, that node sends notifications that all of its subscribers are closed.
- A node having a set of subscribers crashes. In this scenario, the coordinator of the cluster cleans up the subscriber entries from the DB and notifies all other remaining members of the cluster.
If the coordinator is the node that crashed, the newly elected coordinator does the cleanup.
Now we are in a position to discuss what happens to a message published to the broker cluster and how it is routed to the appropriate subscriber.
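To make the replicated subscriber view concrete, here is a minimal sketch of a per-node cache that is kept in sync by applying cluster notifications. It does not use Hazelcast; the `SubscriberCache` class and the `broadcast` helper are hypothetical stand-ins that simply loop over in-process caches.

```python
from collections import defaultdict


class SubscriberCache:
    """One node's in-memory copy of the cluster-wide subscriber view."""

    def __init__(self) -> None:
        self._by_queue = defaultdict(set)  # queue -> {(node_id, subscriber_id)}

    def apply(self, event: str, queue: str, node_id: str, subscriber_id: str) -> None:
        """Apply an 'added' or 'removed' notification received from the cluster."""
        entry = (node_id, subscriber_id)
        if event == "added":
            self._by_queue[queue].add(entry)
        elif event == "removed":
            self._by_queue[queue].discard(entry)
        else:
            raise ValueError(f"unknown event: {event!r}")

    def drop_node(self, node_id: str) -> None:
        """Remove every subscriber of a crashed node (coordinator-driven cleanup)."""
        for queue in list(self._by_queue):
            remaining = {e for e in self._by_queue[queue] if e[0] != node_id}
            if remaining:
                self._by_queue[queue] = remaining
            else:
                del self._by_queue[queue]

    def nodes_for(self, queue: str) -> set:
        """Return the nodes that currently hold subscribers for a queue."""
        return {node for node, _ in self._by_queue.get(queue, ())}


def broadcast(caches, event, queue, node_id, subscriber_id) -> None:
    """Stand-in for the cluster notification: update every node's cache."""
    for cache in caches:
        cache.apply(event, queue, node_id, subscriber_id)


caches = [SubscriberCache() for _ in range(3)]
broadcast(caches, "added", "orders", "node-1", "sub-a")
broadcast(caches, "added", "orders", "node-2", "sub-b")
```

After a crash of `node-1`, calling `drop_node("node-1")` on every cache leaves only `node-2` listed for the `orders` queue, which mirrors the cleanup the coordinator triggers.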
Keep Track of Published Messages
Whenever a message is published to the broker cluster, the node receiving the message generates a new message ID for it. The message ID is timestamp-based; thus, if messages are ordered by ID, they are ordered first in, first out (FIFO).
The node receiving the published messages persists them in batches (if not transacted) to the persistence store. Meanwhile, it keeps track of message IDs and marks slots. By default, a slot has 1000 messages. If message publishing is slow, a slot can have fewer messages.
This slot information (a slot has no messages in it, just boundary information) is then published asynchronously to the coordinator node in the broker cluster. For this, WSO2 MB uses the “Thrift” protocol. Being a binary protocol, it can pass information fast and with little load. Every node has a Thrift connection pool with connections made to the coordinator node in the cluster. This, in turn, adds more overhead to state sharing across the nodes in the cluster. If the coordinator node shuts down or crashes, all the nodes in the cluster have to:
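The two ideas in this section, timestamp-based IDs and slot marking, can be sketched together. The bit layout of the ID (milliseconds, a 12-bit sequence, an 8-bit node ID) and the class names `MessageIdGenerator` and `SlotTracker` are assumptions for illustration; the actual ID format used by the broker is not described in this article.

```python
import time
from typing import Callable, Optional, Tuple


class MessageIdGenerator:
    """Generates increasing, timestamp-based IDs (hypothetical bit layout)."""

    def __init__(self, node_id: int, clock: Callable[[], float] = time.time) -> None:
        self._node_id = node_id & 0xFF
        self._clock = clock
        self._last_ms = 0
        self._seq = 0

    def next_id(self) -> int:
        now_ms = int(self._clock() * 1000)
        if now_ms > self._last_ms:
            self._last_ms, self._seq = now_ms, 0
        else:
            self._seq += 1
            if self._seq >= 4096:  # sequence exhausted: borrow the next millisecond
                self._last_ms, self._seq = self._last_ms + 1, 0
        return (self._last_ms << 20) | (self._seq << 8) | self._node_id


class SlotTracker:
    """Marks slot boundaries per queue as messages are persisted."""

    def __init__(self, window_size: int = 1000) -> None:
        self._window_size = window_size
        self._open = {}  # queue -> [start_id, end_id, message_count]

    def record(self, queue: str, message_id: int) -> Optional[Tuple[int, int, str]]:
        """Track one message; return a finished slot once the window is full."""
        slot = self._open.setdefault(queue, [message_id, message_id, 0])
        slot[1] = message_id
        slot[2] += 1
        if slot[2] >= self._window_size:
            return self.flush(queue)
        return None

    def flush(self, queue: str) -> Optional[Tuple[int, int, str]]:
        """Close the open slot early, e.g. on a timer when publishing is slow."""
        slot = self._open.pop(queue, None)
        if slot is None:
            return None
        return (slot[0], slot[1], queue)


generator = MessageIdGenerator(node_id=1)
tracker = SlotTracker(window_size=1000)
finished = tracker.record("orders", generator.next_id())  # None until the slot fills
```

The `flush` method is what allows a slot to contain fewer messages than the window size when publishing is slow.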
- Invalidate all the Thrift connections created to the old coordinator node.
- Keep slot information in memory until a new coordinator is elected, and then flush it to the new coordinator. The election can take some time, but message slots must not be lost in the meantime.
- Update the coordinator node flag.
- If the node becomes the new coordinator, having earlier been a normal node, check the subscribers and publishers of the crashed node, remove them from the persistent store, and notify all other members of the cluster.
Thus, sharing state and keeping it consistent through a node failure is not trivial in an active-active broker cluster. It must be designed carefully.
Note that when a message slot reaches the coordinator, all the messages pertaining to the slot are already persisted to the database and available for any node to read.
This needs to be considered from a concurrency perspective, where different publishers are publishing messages to different queues and topics on different nodes at the same time. Message slots belonging to different queues are then reported to the coordinator by different broker nodes. If multiple publishers on different nodes publish to the same queue, the slots they generate can overlap at the coordinator node.
Coordinating Slots and Routing
The coordinator node is responsible for keeping track of all the slots in the cluster, and managing them is the most important thing it does. It already knows (as essentially every node does) who the subscribers are and to which node they are subscribed. It assigns message slots to the nodes that request them.
The following diagram shows the coordinator node and the other nodes.
- The publisher nodes are sending slot information to the coordinator node using Thrift connections.
- The coordinator node queues the message slots queue-wise. If a new slot overlaps with an existing slot for a queue, it marks the existing slot as overlapped if it is already assigned to a node, and creates a new slot for the non-overlapping part.
- It also keeps track of subscribers attached to queues and to which nodes they are subscribed in the cluster.
- Nodes which have a subscriber for a particular queue or a topic (ultimately a queue), keep asking the coordinator for a new message slot of that queue. If the coordinator has slots belonging to that queue received from publishers, it assigns a slot to that node and persists that assignment information on the shared persistence DB. The coordinator needs to do it because if the cluster had to elect a new coordinator, it needs this information to continue with its slot coordination task.
- A slot assigned to a node is never given to any other node unless the first node returns the slot to the coordinator.
- Slots of a particular queue are assigned in the order in which slot poll requests arrive from the different nodes.
- Note that if subscribers for the same queue are connected to multiple nodes, the coordinator assigns slots to all those nodes as their requests come in. As assignments are marked and slots do not overlap, no message is duplicated from the cluster's point of view. Subscribers on those nodes receive messages in parallel.
- There are several situations in which a node returns a slot. Such slots contain messages that have not yet been delivered to any subscriber.
  - The node owning the slot crashed. The coordinator then marks the slot as returned when it receives the notification that the member has left the cluster.
  - The node owning the slot shuts down gracefully. The node should then return all its slots to the coordinator before it leaves the cluster.
  - The last subscriber for a particular queue on the node owning the slot is disconnected (the client crashed, the network connection to the client dropped, or the client closed the connection), and there is no subscriber left to deliver messages to.
- Returned slots and overlapped slots are assigned to the polling nodes before new ones, because they contain messages that arrived earlier at the broker cluster. Also, the coordinator makes sure to assign an overlapped slot to the same node that it was assigned to before (so that it can read and filter the new messages only).
- When a slot needs to be removed, the node owning the slot will send a slot delete request and get confirmation from the coordinator node.
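A simplified sketch of the coordinator's bookkeeping follows. It covers queue-wise slot storage, assignment on request, returned slots taking priority over fresh ones, and delete requests by the owner. Overlap handling and persistence of assignments are deliberately left out, and the `SlotCoordinator` class with its method names is a hypothetical model, not the product's API.

```python
from collections import defaultdict, deque
from typing import Optional, Tuple


class SlotCoordinator:
    """Toy model of the coordinator's in-memory slot management."""

    def __init__(self) -> None:
        self._fresh = defaultdict(deque)     # queue -> slots reported by publisher nodes
        self._returned = defaultdict(deque)  # queue -> slots given back by nodes
        self._assigned = {}                  # (queue, start, end) -> owning node

    def submit(self, queue: str, start_id: int, end_id: int) -> None:
        """A publisher node reports a new slot for a queue."""
        self._fresh[queue].append((start_id, end_id))

    def request_slot(self, node_id: str, queue: str) -> Optional[Tuple[int, int]]:
        """A node with subscribers polls for a slot; returned slots go first."""
        for pool in (self._returned[queue], self._fresh[queue]):
            if pool:
                start_id, end_id = pool.popleft()
                self._assigned[(queue, start_id, end_id)] = node_id
                return (start_id, end_id)
        return None

    def return_slots(self, node_id: str) -> None:
        """Give back every slot owned by a node that crashed or shut down."""
        owned = sorted(k for k, owner in self._assigned.items() if owner == node_id)
        for key in owned:
            del self._assigned[key]
            queue, start_id, end_id = key
            self._returned[queue].append((start_id, end_id))

    def delete_slot(self, node_id: str, queue: str, start_id: int, end_id: int) -> bool:
        """Only the current owner of a slot may delete it."""
        key = (queue, start_id, end_id)
        if self._assigned.get(key) != node_id:
            return False
        del self._assigned[key]
        return True


coordinator = SlotCoordinator()
coordinator.submit("orders", 1, 1000)
coordinator.submit("orders", 1001, 2000)
slot_a = coordinator.request_slot("node-a", "orders")
slot_b = coordinator.request_slot("node-b", "orders")
```

Because each slot has exactly one owner at a time, two nodes serving the same queue never read the same range, which is how duplicates are avoided in this model.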
From the above, it is evident that the coordinator does a critical job in the distributed message broker architecture. To sum up, message routing is done batch-wise at the coordinator, taking all failure scenarios into account.
The following diagram shows how messages of a slot received from the coordinator node are delivered to the target subscriber at a particular broker node.
When to Delete Messages and Message Slots
The WSO2 Message Broker deletes a slot when either all the messages in that slot have been delivered to the subscribers and acknowledged, or the messages of the queue are purged. If even one message belonging to the slot is left, the slot is not deleted. For topics, a reference to the same message is delivered to multiple topic subscribers, so the broker waits until all the subscribers acknowledge that message before removing it from the persistence layer.
When all messages are delivered, the node that received the slot from the coordinator sends a slot delete request to the coordinator. The coordinator checks and verifies that the slot can be removed.
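The acknowledgment rule for topics can be illustrated with a small tracker that remembers which subscribers still owe an acknowledgment for each message in a slot. The `SlotDeliveryTracker` class is a hypothetical sketch, not the broker's internal data structure.

```python
class SlotDeliveryTracker:
    """Tracks outstanding acknowledgments for the messages of one slot."""

    def __init__(self, message_ids, subscriber_ids) -> None:
        # Every message starts out waiting for an ack from every subscriber.
        self._pending = {mid: set(subscriber_ids) for mid in message_ids}

    def acknowledge(self, message_id: int, subscriber_id: str) -> bool:
        """Record an ack; return True once the message can be removed."""
        waiting = self._pending.get(message_id)
        if waiting is None:
            return False  # unknown or already removed message
        waiting.discard(subscriber_id)
        if waiting:
            return False  # some subscribers have not acknowledged yet
        del self._pending[message_id]
        return True

    def slot_is_deletable(self) -> bool:
        """A slot can go only when no message in it is left pending."""
        return not self._pending


tracker = SlotDeliveryTracker(message_ids=[1, 2], subscriber_ids=["s1", "s2"])
first_ack_removes = tracker.acknowledge(1, "s1")
```

In this model, a single unacknowledged message is enough to keep the whole slot alive.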
Why is such a heavy verification needed to delete a slot?
The answer comes from the fact that, when the coordinator assigns a slot to a node, there is no guarantee that no more messages will be added to the range denoted by the slot. This situation can arise when concurrent publishers send to the same queue through several nodes. Thus, the broker cluster needs a mechanism to make sure that no more messages can be added to the slot range before it deletes a slot and moves on.
To achieve that, every node in the cluster keeps sending a heartbeat-like notification over a Thrift connection to the coordinator node, carrying the message ID that the node would generate if a message arrived at that instant. The minimum of those message ID values at a given instant gives an assurance that no node in the cluster will thereafter generate a message ID below it. This value is called the “slot deletion safe zone value.”
If the upper limit of a slot is “safely below” the slot deletion safe zone value, the coordinator responds with “OK” to the node requesting the deletion, and the node executes the delete.
This brings up the criticality of keeping the broker nodes in the cluster time synced, because message IDs are timestamp-based. If the nodes are not in sync, slots whose messages have already been delivered to the subscribers may accumulate in the cluster, making nodes consume more memory and ultimately run out of memory.
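The safe zone check reduces to taking a minimum and comparing against it. In the sketch below, the function names and the `margin` parameter (one possible reading of “safely below”) are assumptions for illustration.

```python
from typing import Mapping


def slot_deletion_safe_zone(latest_ids_by_node: Mapping[str, int]) -> int:
    """Return the smallest message ID reported by any node's heartbeat."""
    if not latest_ids_by_node:
        raise ValueError("no heartbeats received yet")
    return min(latest_ids_by_node.values())


def can_delete_slot(slot_end_id: int,
                    latest_ids_by_node: Mapping[str, int],
                    margin: int = 0) -> bool:
    """A slot may be deleted only if its upper limit is below the safe zone."""
    return slot_end_id + margin < slot_deletion_safe_zone(latest_ids_by_node)


# Hypothetical heartbeat values reported by three nodes.
heartbeats = {"node-1": 5000, "node-2": 4200, "node-3": 6100}
safe_zone = slot_deletion_safe_zone(heartbeats)
```

A node whose clock lags behind keeps reporting low IDs, which pins the safe zone value down and blocks deletion for the whole cluster.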
There are situations where message broker clusters can be affected by connection failures and node crashes.
- A node in the cluster can crash. It can be the coordinator node or some other node.
  - If the coordinator crashed, the remaining nodes should elect a new coordinator, and every remaining node should agree on the elected coordinator. Subscribers connected to the crashed node should be removed, and the rest of the cluster should be notified.
- The network between nodes can break down.
  - A split-brain situation can arise, and the cluster can act as two individual clusters. We have introduced “RDBMS-based cluster coordination” to solve some of these issues; with it, cluster notifications from one node to another are passed through the shared persistence layer itself.
- Thrift connections between nodes can be interrupted.
  - In this scenario, message slot coordination cannot be done. If the Thrift connections cannot be re-established after a few attempts, the broker node gives up and closes all incoming and outgoing messaging traffic.
- DB outages can occur.
  - The DB can be down for maintenance, or there can be temporary outages. When that happens, MB cannot persist messages or their status. The broker node immediately closes all the channels (publishers and subscribers) connected to it and starts trying to reconnect to the DB. Once it succeeds, it reopens the messaging transports so that subscribers and publishers can reconnect.
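The DB outage behavior can be pictured as a small state toggle: close the transports on failure and reopen them only after a successful reconnect. The `BrokerNode` class and the `db_is_up` probe below are hypothetical and only model the sequence of events.

```python
from typing import Callable


class BrokerNode:
    """Toy model of a node that gates messaging traffic on DB availability."""

    def __init__(self, db_is_up: Callable[[], bool]) -> None:
        self._db_is_up = db_is_up    # probe that attempts a DB reconnect
        self.transports_open = True  # publishers and subscribers may connect

    def on_db_failure(self) -> None:
        """Close all channels as soon as the DB becomes unreachable."""
        self.transports_open = False

    def try_recover(self) -> bool:
        """Retry the DB; reopen the transports once it responds again."""
        if not self.transports_open and self._db_is_up():
            self.transports_open = True
        return self.transports_open


db_state = {"up": False}
node = BrokerNode(db_is_up=lambda: db_state["up"])
node.on_db_failure()
still_closed = node.try_recover()  # the DB is still down at this point
```

Refusing traffic while the DB is down keeps the node from accepting messages it could not persist.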
A message broker is used for reliable asynchronous messaging. Making it clustered and highly available without giving up reliability is not trivial. There are different aspects and unhappy paths to take care of to make sure no message is lost or goes unnoticed by the internal algorithms. The necessity to share state between the nodes makes it more complex. Even in a cluster, some kind of cluster-wide coordination has to happen somewhere to prevent duplication and ensure proper routing of messages from publishers to subscribers. However, when you know the building blocks of the product and their responsibilities, it is easier to troubleshoot the product and tune it to suit your requirements. In the next few articles, we will discuss the components of the WSO2 Message Broker architecture and how they are organized.