
Mocking RabbitMQ for Java Integration Tests


How to use an embedded Java version of Apache Qpid to mock a RabbitMQ Broker inside integration tests.


In this article I will show how I used an embedded Java version of Apache Qpid to mock a RabbitMQ broker inside integration tests. It was tricky to make everything work and I couldn't find anything related on the web, so it is worth sharing.

The Challenge

My team's product uses Apache Flink to stream events (source) from a RabbitMQ queue and, after processing them, sink the data to an Elasticsearch database.

This product was lacking tests. Our team decided that unit testing would be hard and verbose, as a lot of mocking would be needed just to get the Flink infrastructure out of the way, so we went straight to integration tests. The requirements were that the integration tests should (I) be fast enough and (II) run easily from any IDE.

RabbitMQ is written in Erlang, a programming language designed at Ericsson, so it cannot run inside a JVM. This constraint forced me to spend hours looking for a compatible Java replacement, because running RabbitMQ in a Docker image (or manually inside a VM) would break the second requirement (II).

I found the following candidates: Spring AMQP, ActiveMQ, and Apache Qpid.

The first option was excluded right away, as Spring AMQP is just a client for managing an AMQP broker and we needed an embedded broker. So my first try was the ActiveMQ Java broker.

After writing some setup code, I tried to connect our product to the embedded broker and got an exception:

protocol mismatch: 1.0 / 0.9

The reason for that exception was that ActiveMQ implements version 1.0 of the AMQP protocol, while our product was using the legacy version 0.9, the same as RabbitMQ. As ActiveMQ doesn't support any AMQP version below 1.0, I had to exclude that option as well.
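
To make the mismatch a bit more concrete, here is a minimal sketch of the 8-byte protocol header that an AMQP client sends when it opens a connection. It assumes the client speaks AMQP 0-9-1 (the revision used by RabbitMQ's core protocol) and the broker expects AMQP 1.0. The class and method names are hypothetical and only serve the illustration; this is not code from ActiveMQ or RabbitMQ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, for illustration only.
final class AmqpProtocolHeader {

    // Header sent by an AMQP 0-9-1 client: "AMQP" followed by 0, 0, 9, 1.
    static final byte[] AMQP_0_9_1 = header(0, 0, 9, 1);

    // Header expected by a plain AMQP 1.0 broker: "AMQP" followed by 0, 1, 0, 0.
    static final byte[] AMQP_1_0 = header(0, 1, 0, 0);

    // Builds the 8-byte header: the ASCII letters "AMQP" plus four numeric bytes.
    static byte[] header(int b4, int b5, int b6, int b7) {
        final byte[] result = new byte[8];
        final byte[] magic = "AMQP".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(magic, 0, result, 0, magic.length);
        result[4] = (byte) b4;
        result[5] = (byte) b5;
        result[6] = (byte) b6;
        result[7] = (byte) b7;
        return result;
    }

    // A broker can only continue the handshake if it supports the header it receives.
    static boolean sameProtocol(byte[] clientHeader, byte[] brokerHeader) {
        return Arrays.equals(clientHeader, brokerHeader);
    }

    // Human-readable form of a header, e.g. for a log message.
    static String describe(byte[] header) {
        return new String(header, 0, 4, StandardCharsets.US_ASCII)
                + " " + header[4] + "." + header[5] + "." + header[6] + "." + header[7];
    }

    public static void main(String[] args) {
        System.out.println("client sends:   " + describe(AMQP_0_9_1));
        System.out.println("broker expects: " + describe(AMQP_1_0));
        System.out.println("compatible:     " + sameProtocol(AMQP_0_9_1, AMQP_1_0));
    }
}
```

Because the two headers differ, a broker that only understands one of them has to reject the connection, which is roughly what the exception above reports.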

The Qpid Challenge

At first sight, after reading the Qpid documentation, I thought I could make it work easily, but then I noticed that the documentation explains a lot about the standalone installation and almost nothing about the embedded broker. Moreover, I found some posts on the internet (here and here) mentioning that, depending on the Qpid version, the implementation of the Security Providers has changed radically, making it hard to connect an AMQP client to the Qpid broker.

After a lot of research and tweaking I was able to make a specific version of Qpid work. I share the solution below.

The Code

I was able to make it work only with Qpid version 0.28. In later versions the Security Providers were refactored, and communication with our legacy product code became impossible because we couldn't enable SSL on our side. Here is the Maven dependency:

<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-broker</artifactId>
    <version>0.28</version>
    <scope>test</scope>
</dependency>

And the code to make the broker start up:

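import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;

import org.apache.qpid.server.Broker;
import org.apache.qpid.server.BrokerOptions;

import com.google.common.io.Files;

public class EmbeddedAMQPBroker {

    public static final int BROKER_PORT = findAvailableTcpPort();

    private final Broker broker = new Broker();

    public EmbeddedAMQPBroker() throws Exception {
        final String configFileName = "qpid-config.json";
        final String passwordFileName = "passwd.properties";
        // prepare options; these keys fill the ${...} placeholders in qpid-config.json
        final BrokerOptions brokerOptions = new BrokerOptions();
        brokerOptions.setConfigProperty("qpid.amqp_port", String.valueOf(BROKER_PORT));
        brokerOptions.setConfigProperty("qpid.pass_file", findResourcePath(passwordFileName));
        brokerOptions.setConfigProperty("qpid.work_dir", Files.createTempDir().getAbsolutePath());
        brokerOptions.setInitialConfigurationLocation(findResourcePath(configFileName));
        // start broker
        broker.startup(brokerOptions);
    }

    // Assumption: the original helper was not shown; this version asks the OS for a free port.
    private static int findAvailableTcpPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Assumption: the original body was elided; this version looks the file up on the
    // classpath and expects it to exist on disk (e.g. under src/test/resources).
    private String findResourcePath(final String file) throws IOException {
        final URL resource = getClass().getClassLoader().getResource(file);
        if (resource == null) {
            throw new FileNotFoundException("Not found on classpath: " + file);
        }
        try {
            return Paths.get(resource.toURI()).toString();
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }
    }
}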

The qpid-config.json file:

{
  "name" : "EmbeddedBroker",
  "defaultVirtualHost" : "default",
  "modelVersion" : "1.0",
  "storeVersion" : 1,
  "authenticationproviders" : [ {
    "name" : "passwordFile",
    "path" : "${qpid.pass_file}",
    "type" : "PlainPasswordFile"
  } ],
  "ports" : [ {
    "name" : "AMQP",
    "port" : "${qpid.amqp_port}",
    "authenticationProvider" : "passwordFile"
  } ],
  "virtualhosts" : [ {
    "name" : "default",
    "storePath" : "${qpid.work_dir}/derbystore/default",
    "storeType" : "DERBY"
  } ]
}
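
The `${...}` entries in this file are placeholders that get their values from the properties set on `BrokerOptions` in the broker class. As a rough illustration of that idea, and not Qpid's actual implementation, a placeholder resolver could be sketched like this. The class name, the method name, and the choice to fail on a missing key are all assumptions of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of ${key} substitution; Qpid's own resolver may behave differently.
final class PlaceholderSketch {

    // Matches ${some.key} and captures the key between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every ${key} in the template with the matching property value.
    static String resolve(String template, Map<String, String> properties) {
        final Matcher matcher = PLACEHOLDER.matcher(template);
        final StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            final String key = matcher.group(1);
            final String value = properties.get(key);
            if (value == null) {
                // Design choice of this sketch: fail loudly instead of leaving the placeholder in place.
                throw new IllegalArgumentException("No value for placeholder: " + key);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        final Map<String, String> properties = new HashMap<>();
        properties.put("qpid.amqp_port", "5672");
        properties.put("qpid.work_dir", "/tmp/qpid-work");
        System.out.println(resolve("\"port\" : \"${qpid.amqp_port}\"", properties));
        System.out.println(resolve("\"storePath\" : \"${qpid.work_dir}/derbystore/default\"", properties));
    }
}
```

The practical consequence is that every placeholder name used in the JSON file needs a matching `setConfigProperty` call with exactly the same key.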

And passwd.properties:

guest:guest

Next, I used Spring AMQP to connect to this mock broker and to create and delete queues and exchanges. Here are the snippets:

Maven dependency:

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.5.2.RELEASE</version>
    <scope>test</scope>
</dependency>

Code to create and delete exchange and queue:

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;

    private void createExchange(final String identifier) {
        // connects to localhost on the port chosen by the embedded broker
        final CachingConnectionFactory cf = new CachingConnectionFactory(EmbeddedAMQPBroker.BROKER_PORT);
        final RabbitAdmin admin = new RabbitAdmin(cf);
        // non-durable queue
        final Queue queue = new Queue("myQueue", false);
        admin.declareQueue(queue);
        final TopicExchange exchange = new TopicExchange("myExchange");
        admin.declareExchange(exchange);
        // the binding key "#" routes every message sent to the exchange into the queue
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("#"));
        cf.destroy();
    }

    private void deleteExchange() {
        final CachingConnectionFactory cf = new CachingConnectionFactory(EmbeddedAMQPBroker.BROKER_PORT);
        final RabbitAdmin admin = new RabbitAdmin(cf);
        admin.deleteExchange("myExchange");
        // also remove the queue so that each test starts from a clean broker
        admin.deleteQueue("myQueue");
        cf.destroy();
    }

Code to send messages:

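import org.springframework.amqp.rabbit.core.RabbitTemplate;

    // EiffelEvent is a type from our product; this simplified snippet sends a fixed text instead
    public void sendMessage(final EiffelEvent event) throws Exception {
        final CachingConnectionFactory cf = new CachingConnectionFactory(EmbeddedAMQPBroker.BROKER_PORT);
        final RabbitTemplate template = new RabbitTemplate(cf);
        final String message = "hello world message!";
        // any routing key works here, because the queue is bound with the "#" wildcard
        template.convertAndSend("myExchange", "#", message);
        cf.destroy();
        // waitForMessageBeConsumed();
    }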
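
The commented-out waitForMessageBeConsumed() call hints at a practical detail: the product consumes the message asynchronously, so the test has to wait before checking its post-condition. The original helper is not shown, so the following is only a minimal sketch of one way to do it, assuming a simple polling approach. The class name, method name, and timeout values are hypothetical.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper; not taken from the original test code.
final class Await {

    // Polls the condition until it becomes true or the timeout expires.
    // Returns true if the condition was met in time, false otherwise.
    static boolean until(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // restore the interrupt flag and give up waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        final long start = System.currentTimeMillis();
        // stand-in condition: becomes true roughly 100 ms after start
        final boolean met = until(() -> System.currentTimeMillis() - start >= 100, 2_000, 20);
        System.out.println("condition met: " + met);
    }
}
```

In a test, the condition would be whatever proves that the message went through, for example a check that the expected document has shown up in the sink.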

Put all of this together in a test class: start the broker in a @BeforeClass method and create the exchange and queue in a @Before method, so that each test can send a message and then check its post-condition.

Hope this helps you!


Topics: rabbitmq, java, mock, spring amqp, apache qpid, integration testing
