Writing Integration Tests for RabbitMQ-Based Components

DZone 's Guide to

Writing Integration Tests for RabbitMQ-Based Components

This post is the result of days of misery, reverse-engineering, GitHub browsing, and finding (sort of) clever ways to get yet one step closer to the goal mentioned in the title. As the topics described in this post are not very well documented, this write-up may turn out to be useful.

· Integration Zone ·
Free Resource

This post is the result of days of misery, reverse-engineering, GitHub browsing, and finding (sort of) clever ways to get yet one step closer to the goal mentioned in the title. As the topics described in this post are not very well documented, this write-up may turn out to be useful.

Before we dive into the topic, let me clarify a few things. RabbitMQ, an AMQP (advanced message queueing protocol) compatible message-oriented middleware, (in my understanding) has no in-memory message broker implementation. ActiveMQ, another AMQP-compatible implementation, on the other hand does provide such a component—easy to configure and use. The problem is that ActiveMQ implements version 1.0 of the protocol, while RabbitMQ is on version 0.9.1, and the two versions, of course, are not compatible. That is the main reason one might need QPID, a third MOM implementation that comes with an in-memory message broker and is able to "speak" multiple versions of the protocol.

What I'm going to show here, is a way of covering RabbitMQ (and I guess any other 0.9.1 compatible implementation) based applications with integration tests. In this post, I am going to use the freshest QPID (6.0.2) in-memory message broker, and RabbitMQ client (3.6.1) for receiving (and sending) messages. Beware, this process is a bit complex, but if you need to get those integration tests in place, the following tips can help.

Note: There's already a similar post about this topic here, but it only works with QPID 0.28. Also, it goes with the approach of plain password files, which is, according to QPID's documentation, a deprecated authentication method. If that is ok with you, then you should definitely check the link out, as it is somewhat easier to implement.

Let's see how we can solve this integration puzzle.

Application Under Test

First of all, let's see a tiny application I am going to use for demonstration purposes. The base situation is the following: we have a message receiver component, that is listening to a message queue, waiting for text messages. Every time a new text message comes along, we save it into a really simple cache together with an ever-increasing message number. As an integration test, we would like to send three different text messages, process them and verify whether they are indeed present in the cache (of course, with the right IDs). The task is to get this whole scenario tested with a JUnit-based test, and without any kind of external message brokers running. For diagram lovers (like me), this looks like the one below.

Image title

Step 1: The Message Receiver

This is the easiest step, as chances are high you already have a component that receives messages (it is part of the system you want to test). Still, for the sake of completeness, I will put here the source of a very dumb message receiver (you have a better one already):

public class Receiver {

    private static final String BROKER_URI = "amqp://<username>:<password>@<host>:<port>";
    private final static String QUEUE_NAME = "jms/queue";
    private SimpleCache cache;

    public Receiver(SimpleCache cache) {
        this.cache = cache;

    public void receive() throws Exception {
        // let's setup evrything and start listening
        ConnectionFactory factory = createConnectionFactory();

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicConsume(QUEUE_NAME, true, newConsumer(channel));

    protected ConnectionFactory createConnectionFactory() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.useSslProtocol(); // Note this, we'll get back to it soon...
        return factory;

    private DefaultConsumer newConsumer(Channel channel) {
        return new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                cache.update(new String(body));  // put each message into the cache

You might have noticed the line comment in the previous snippet next to useSslProtocol. That is there for a good reason: the QPID broker is unable to offer PLAIN user and password authentication mode (the one we want to use) unless SSL is enabled.

(If you're using a message receiver without SSL in production, you can make ConnectionFactory a dependency of your receiver so it's easy to go with different implementations for tests and production. Also, a fake implementation could override the ConnectionFactory creator method, call super(), and add the usage of SSL).

Step 2: The Message Sender

Now, QPID has a message client, capable of sending messages, but well, I was unable to make it work in this scenario. It required me (maybe I was just not persistent enough) to generate trust-stores and certificates, a process that seemed very tiring for a simple integration test. RabbitMQ's client, on the other hand, was much easier to use: it only made me add the SSL enabler line of code, and I was set.

The message sender, looks like this:

public class Sender {

    private static final String QUEUE_NAME = "jms/queue";
    private static final String BROKER_URL = "amqp://<username>:<password>@<host>:<port>";

    private ConnectionFactory factory = new ConnectionFactory();

    public void sendMessage(String text) throws Exception {

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicPublish("", QUEUE_NAME,  null, text.getBytes());

So far so good, sender and receiver are ready. Now we need the message broker.

Step 3: The Message Broker

This is going to be the most complex step—there are plenty of things to watch out for. Don't get scared, it all comes together in the end.

First of all, this is the Maven dependency for QPID:


Now, a JSON configuration is needed for the QPID in-memory broker to define all the details it needs for startup. Let's see what it looks like:

 "name" : "broker",
 "modelVersion" : "6.0",
 "authenticationproviders" : [ {
     "name" : "plain",
     "type" : "Plain",
     "users" : [ {
         "name" : "guest",
         "type" : "managed",
         "password" : "guest"
     } ]
 } ],
 "ports" : [ {
     "name" : "AMQP",
     "port" : "${qpid.amqp_port}",
     "authenticationProvider" : "plain",
     "keyStore" : "default",
     "transports" : [ "SSL" ],
     "virtualhostaliases" : [ {
         "name" : "defaultAlias",
         "type" : "defaultAlias"
     }, {
         "name" : "hostnameAlias",
         "type" : "hostnameAlias"
     }, {
         "name" : "nameAlias",
         "type" : "nameAlias"
     } ]
 } ],
 "virtualhostnodes" : [ {
     "name" : "default",
     "type" : "JSON",
     "defaultVirtualHostNode" : "true",
     "virtualHostInitialConfiguration" : "{\"type\" : \"Memory\",\"name\" : \"default\",\"modelVersion\" : \"6.0\"}"
 } ],
 "keystores" : [ {
     "name" : "default",
     "password" : "password",
     "storeUrl": "/path/to/the/keystore"

Ok, let's take a look at what we've done. The configuration file specifies the authentication method (PLAIN), declares the username and the password ("guest" and "guest"), defines the port and security for AMQP (note the placeholder ${qpid.amqp_port}; we are going to supply that param from the source code), declares a virtual host with the name default, and specifies a keystore for security. The good news is that the keystore does not have to be a very secure one, you can create it for yourself, as described here. The bad news is that you actually need a keystore :-(

Now that the metadata is in place, let's start the Broker. The code for that is presented next:

public class BrokerManager {

    private static final String INITIAL_CONFIG_PATH = "<your_path_to_the_above_json_file>";
    private static final String PORT = "<your_port>";
    private final Broker broker = new Broker();

    public void startBroker() throws Exception {
        final BrokerOptions brokerOptions = new BrokerOptions();
        brokerOptions.setConfigProperty("qpid.amqp_port", PORT);


    public void stopBroker() {

Step 4: The Integration Test

And now, let's put everything together. Before we can write the integration test itself, there are two more classes to be presented.

SimpleCache (used by the message listener):

public class SimpleCache {

    private List<CacheEntry> cache = new CopyOnWriteArrayList<CacheEntry>();
    private AtomicInteger counter = new AtomicInteger(0);

    public void update(String text) {
        cache.add(new CacheEntry(text, counter.getAndIncrement()));

    public List<CacheEntry> getContent() {
        return Collections.unmodifiableList(cache);

and CacheEntry (used by SimpleCache):

public class CacheEntry {
    private String text;
    private int sequenceNr;

    public CacheEntry(String text, int sequenceNr) {
        this.text = text;
        this.sequenceNr = sequenceNr;

    public String getText() {
        return text;

    public int getSequenceNr() {
        return sequenceNr;

    // ... hashCode and equals

As everything is done, now, as a final step we are ready to create a JUnit test to see whether this whole scenario works or not. Here is the source for the test:

public class IntegrationTest {

    private static final String FIRST = "first";
    private static final String SECOND = "second";
    private static final String THIRD = "third";

    private static BrokerManager brokerStarter;

    public static void startup() throws Exception {
        brokerStarter = new BrokerManager();

    public static void tearDown() throws Exception {

    private SimpleCache cache = new SimpleCache();

    public void cacheShouldContainThreeEntries_afterThreeReceivedMessages() throws Exception {
        Sender sender = new Sender();
        new Receiver(cache).receive();

        Thread.sleep(500); // This, of course can and should be replaced with something smarter
        List<CacheEntry> cacheContent = cache.getContent();

        assertEquals(3, cacheContent.size());
        assertEquals(new CacheEntry(FIRST, 0), cacheContent.get(0));
        assertEquals(new CacheEntry(SECOND, 1), cacheContent.get(1));
        assertEquals(new CacheEntry(THIRD, 2), cacheContent.get(2));


That's it, folks. This now should work, and be up to date, without any kind of deprecation. Hope it helps.

activemq, integration test, qpid, rabbitmq

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}