
Websockets, AngularJS, RabbitMQ and C++ in Spring 4


In this post, I will walk you through a Spring Framework 4-based application that makes the most of WebSockets, RabbitMQ and OpenCV to implement a simple computer vision application. It counts coins that an iOS application sends in a video stream over a binary WebSocket.

The Result

Perhaps strangely, I will begin by showing you the completed result. You are seeing the iOS application talking to the Spring webapp over binary WebSocket; decoding the video, processing each frame in the computer vision code behind RabbitMQ, then receiving the responses and pushing the results to an AngularJS application.

SpringOne2GX from Cake Solutions Ltd. on Vimeo.


Getting Started

Let’s begin with the RabbitMQ / computer vision components. This is a typical Spring AMQP task. At the highest level, we’ll be creating the RecogService and RecogServiceActivator. The RecogService is the entry point into our system. We submit chunks of the video stream (or full frames) and drive them through Spring Integration, then Spring AMQP, then RabbitMQ and the C++ application, and then back to Spring Integration; finally, the response is received in the RecogServiceActivator implementation.


Here is the code we arrive at:

class RecogService(recogChannel: MessageChannel) {

  // Wraps the chunk in a Message, tags it with the content type and
  // the correlation ID, and sends it down the recogChannel
  private def sendWithContentType(contentType: String,
                                  correlationId: CorrelationId,
                                  chunk: ChunkData): Unit = {
    val message = MessageBuilder.
      withPayload(chunk).
      setHeader("content-type", contentType).
      setHeader("correlationId", correlationId.value).
      build()

    recogChannel.send(message)
  }

  def imageChunk(correlationId: CorrelationId)(chunk: ChunkData) =
    sendWithContentType(ContentTypes.`image/*`, correlationId, chunk)

  def mjpegChunk(correlationId: CorrelationId)(chunk: ChunkData) =
    sendWithContentType(ContentTypes.`video/mjpeg`, correlationId, chunk)

}


Spring Integration

I have already given away part of the core of the unicorn: the RecogService depends on the MessageChannel, which it uses to send the chunks of the input data. The channel sends the messages down a *chain*:


In the first step, we decode the chunk, potentially resulting in multiple frames. In Scala code, we turn a single ChunkData into a Collection[ImageData].

class ChunkDecoder(mjpegDecoder: MJPEGDecoder) {

  // Dispatches on the content-type header: an MJPEG chunk may yield
  // zero or more frames, a single image is decoded on its own
  def decodeFrame(@Header correlationId: CorrelationId,
                  @Header("content-type") contentType: String,
                  @Payload chunk: ChunkData): util.Collection[ImageData] =
    contentType match {
      case `video/mjpeg` => decodeMJPEGFrames(correlationId, chunk)
      case `image/*`     => decodeSingleImage(correlationId, chunk)
    }

  private def decodeSingleImage(correlationId: CorrelationId,
                                chunk: ChunkData): util.Collection[ImageData] =
    ...

  private def decodeMJPEGFrames(correlationId: CorrelationId,
                                chunk: ChunkData): util.Collection[ImageData] =
    mjpegDecoder.decodeFrames(correlationId, chunk)

}
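
The MJPEGDecoder itself is not shown in this post. As a rough illustration of why a single chunk can yield zero, one, or several frames, here is a JavaScript sketch that buffers the incoming bytes and cuts out every complete JPEG image it finds. It relies only on the JPEG start-of-image (0xFF 0xD8) and end-of-image (0xFF 0xD9) markers. The FrameExtractor class and its method names are made up for this example, and the sketch does not cope with JPEGs that embed thumbnails.

```javascript
// Hypothetical sketch, not the MJPEGDecoder used by the application.
// Returns the index of the two-byte marker 0xFF <second> at or after `from`, or -1.
function indexOfMarker(bytes, second, from) {
  for (let i = from; i + 1 < bytes.length; i++) {
    if (bytes[i] === 0xff && bytes[i + 1] === second) return i;
  }
  return -1;
}

class FrameExtractor {
  constructor() {
    this.pending = []; // bytes received so far that do not yet form a complete frame
  }

  // Accepts one chunk (an array of byte values) and returns every complete
  // frame that can be cut out of the buffered data.
  push(chunk) {
    this.pending = this.pending.concat(Array.from(chunk));
    const frames = [];
    for (;;) {
      const start = indexOfMarker(this.pending, 0xd8, 0); // start of image
      if (start < 0) {
        // No frame start yet; keep a trailing 0xFF in case the marker
        // is split across two chunks.
        const last = this.pending[this.pending.length - 1];
        this.pending = last === 0xff ? [0xff] : [];
        break;
      }
      const end = indexOfMarker(this.pending, 0xd9, start + 2); // end of image
      if (end < 0) {
        this.pending = this.pending.slice(start); // wait for the rest of the frame
        break;
      }
      frames.push(this.pending.slice(start, end + 2));
      this.pending = this.pending.slice(end + 2);
    }
    return frames;
  }
}

const extractor = new FrameExtractor();
// The first chunk ends in the middle of a frame, so nothing is emitted yet.
console.log(extractor.push([0xff, 0xd8, 0x01]).length); // 0
// The second chunk completes that frame and carries a whole second one.
console.log(extractor.push([0x02, 0xff, 0xd9, 0xff, 0xd8, 0x03, 0xff, 0xd9]).length); // 2
```

The buffer is what makes the decoder stateful: the bytes left over from one chunk are prepended to the next one.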


By now, we’ve decoded as many frames as we could, given the new chunk of data. However, the following components do not operate on collections of frames, but on a single frame. Therefore, we must split the Message of Collection[ImageData] into as many Messages of ImageData as there are elements in the collection. By default, Spring Integration’s *splitter* splits a Message with a Collection[A]-type payload into multiple Messages, each containing one element of the original collection. We then take each decoded frame and pass it on to the AMQP outbound endpoint, remembering to map all outgoing and incoming headers. We also set the maximum timeout.
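
To make the splitting step concrete, here is a small JavaScript sketch of the idea. It is not Spring Integration code: the split function and the message shape (a headers object plus a payload) are assumptions made for illustration only.

```javascript
// Hypothetical sketch of a splitter: one message whose payload is an array
// becomes one message per element. Headers are copied, so every frame keeps
// the correlation ID and content type of the chunk it was decoded from.
function split(message) {
  return message.payload.map((element) => ({
    headers: { ...message.headers },
    payload: element,
  }));
}

const decoded = {
  headers: { correlationId: "session-1", "content-type": "video/mjpeg" },
  payload: ["frame-0", "frame-1", "frame-2"], // stand-ins for ImageData values
};

for (const frame of split(decoded)) {
  console.log(frame.headers.correlationId, frame.payload);
}
```

Because every resulting message carries its own copy of the headers, the components further down the chain can still tell which session a frame belongs to.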

When we receive a response from the native code at the end of the RabbitMQ queues, we have an Array[Byte] that needs to be converted to a String. Finally, we execute the onCoinResponse method of the recogServiceActivator bean. We represent the chain in XML as:

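<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
  xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

  <int:chain input-channel="recogRequest">
      <int:service-activator method="decodeFrame" ref="chunkDecoder"/>
      <int:splitter apply-sequence="false"/>
      <int-amqp:outbound-gateway exchange-name="sogx.exchange"
          mapped-reply-headers="*" mapped-request-headers="*"
          routing-key="sogx.recog.key" amqp-template="amqpTemplate"/>
      <int:object-to-string-transformer/>
      <int:service-activator ref="recogServiceActivator"
          method="onCoinResponse"/>
  </int:chain>

  <rabbit:connection-factory id="connectionFactory"
    host="localhost" channel-cache-size="10" />
  <rabbit:template id="amqpTemplate"
    connection-factory="connectionFactory" />

  <rabbit:admin id="rabbitAdmin"
    connection-factory="connectionFactory" />

  <rabbit:queue name="sogx.recog.queue" declared-by="rabbitAdmin"/>

  <rabbit:direct-exchange name="sogx.exchange" declared-by="rabbitAdmin">
      <rabbit:bindings>
          <rabbit:binding queue="sogx.recog.queue" key="sogx.recog.key" />
      </rabbit:bindings>
  </rabbit:direct-exchange>

</beans>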

I have included the minimal RabbitMQ configuration, assuming that the RabbitMQ server runs on localhost and does not require authentication.
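
If the relationship between the exchange, the binding key and the queue is new to you, the following JavaScript sketch models what a direct exchange does with a message. It is an in-memory toy, not a RabbitMQ client; the DirectExchange class is made up for this example, and only the names sogx.exchange, sogx.recog.queue and sogx.recog.key come from the configuration above.

```javascript
// Hypothetical in-memory model of a direct exchange; not a RabbitMQ client.
class DirectExchange {
  constructor(name) {
    this.name = name;
    this.bindings = []; // { key, queue } pairs
  }

  // Bind a queue (a plain array here) to this exchange under a binding key.
  bind(queue, key) {
    this.bindings.push({ key, queue });
  }

  // Deliver the body to every queue whose binding key equals the routing key.
  // Returns the number of queues that received the message.
  publish(routingKey, body) {
    const matches = this.bindings.filter((binding) => binding.key === routingKey);
    matches.forEach((binding) => binding.queue.push(body));
    return matches.length;
  }
}

const recogQueue = []; // stands in for sogx.recog.queue
const exchange = new DirectExchange("sogx.exchange");
exchange.bind(recogQueue, "sogx.recog.key");

console.log(exchange.publish("sogx.recog.key", "frame bytes")); // 1
console.log(exchange.publish("some.other.key", "frame bytes")); // 0, no binding matches
console.log(recogQueue.length); // 1
```

A direct exchange compares the routing key with the binding key for exact equality, which is why the gateway and the binding have to agree on sogx.recog.key.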

Command-line Application

Now it's time to wire together a command-line application. To do so, we need to provide configuration for the Spring Framework to create all the beans that make up our application. The task is made slightly more difficult by the fact that we will want to use the same structure for the web application. We shall define the core beans in the CoreConfig trait, which provides default implementations of most of the @Beans and leaves asyncExecutor() and recogServiceActivator() abstract.

trait CoreConfig {
  // "boring" beans
  @Bean def mjpegDecoder() = new MJPEGDecoder()
  @Bean def chunkDecoder() = new ChunkDecoder(mjpegDecoder())
  @Bean def recogService(): RecogService = new RecogService(recogRequest())

  // abstract beans
  @Bean def asyncExecutor(): Executor
  @Bean def recogServiceActivator(): RecogServiceActivator

  // SI channel
  @Bean def recogRequest() = new DirectChannel()

  // the message converter for the payloads
  @Bean def messageConverter() = new DelegatingJsonMessageConverter(
                                   new MappingJackson2MessageConverter())

  // the channel that connects to the WS clients
  @Bean def dispatchChannel() = new ExecutorSubscribableChannel(
                                  asyncExecutor())

  // MessagingTemplate (and MessageChannel) to dispatch messages to
  // for further processing
  // All MessageHandler beans above subscribe to this channel
  @Bean def dispatchMessagingTemplate(): SimpMessageSendingOperations = {
    val template = new SimpMessagingTemplate(dispatchChannel())
    template.setMessageConverter(messageConverter())
    template
  }

}


Unfortunately, the CoreConfig trait is not sufficient to boot a Spring application. We need to create a class that mixes in this trait and implements the asyncExecutor() and recogServiceActivator() methods. We then sprinkle it with a few more annotations.

@Configuration
class App extends CoreConfig {

  @Bean def asyncExecutor(): Executor = new SyncTaskExecutor

  @Bean def recogServiceActivator() = new RecogServiceActivator {
    def onCoinResponse(@Header correlationId: CorrelationId,
                       @Payload coins: CoinResponse): Unit =
      println(">>> " + correlationId + ": " + coins)
  }

}

class App is sufficient to use as configuration for the AnnotationConfigApplicationContext, and doing so gives us a full Spring application.

// Create the Spring ApplicationContext implementation,
// register the @Configuration class and load it
val ctx = new AnnotationConfigApplicationContext()
ctx.register(classOf[App])
ctx.refresh()

// Grab the created RecogService implementation
val recogService = ctx.getBean(classOf[RecogService])

// clean up
ctx.close()

This is the core of the CLI object, which contains a convenient command loop.

object Cli extends App {
  import Commands._
  import Utils.reader._

  class App extends CoreConfig { ... }

  def commandLoop(): Unit = {
    Console.readLine() match {
      case QuitCommand                 =>
        return
      case ImageCommand(fileName)      =>
        ...
      case MJPEGCommand(fileName, fps) =>
        readChunks(fileName, fps)(
          recogService.mjpegChunk(UUID.randomUUID().toString))

      case null                        =>
        // do nothing
      case _                           =>
        ...
    }

    // in tail position
    commandLoop()
  }

  // Create the Spring ApplicationContext implementation,
  // register the @Configuration class and load it
  val ctx = new AnnotationConfigApplicationContext()
  ctx.register(classOf[App])
  ctx.refresh()

  // Grab the created RecogService implementation
  val recogService = ctx.getBean(classOf[RecogService])

  // start processing the user input
  commandLoop()

  // clean up
  ctx.close()

}

Native Components

Assuming you have the RabbitMQ broker running, you can now execute this application and issue the mjpeg:/coins2.mjpeg command. Unfortunately, this will end with a message indicating an error in chain handler. It’s because we do not have the native components running.

The native component is implemented in C++ using OpenCV. We need to implement its RabbitMQ RPC server, in which we call the computer vision code (in coins.cpp).

class Main : public RabbitRpcServer {
    CoinCounter coinCounter;
    virtual std::string handleMessage(
        const AmqpClient::BasicMessage::ptr_t message,
        const AmqpClient::Channel::ptr_t channel);
public:
    Main(const std::string queue, const std::string exchange,
         const std::string routingKey);
};

Main::Main(const std::string queue, const std::string exchange,
           const std::string routingKey) :
RabbitRpcServer::RabbitRpcServer(queue, exchange, routingKey) {
}

std::string Main::handleMessage(const AmqpClient::BasicMessage::ptr_t message,
                                const AmqpClient::Channel::ptr_t channel) {
    // return a std::string that represents the result

    return "{\"succeeded\":false}";
}

int main(int argc, char** argv) {
    Main main("sogx.recog.queue", "sogx.exchange", "sogx.recog.key");
    // ...
    return 0;
}

The code shows the core concepts: we inherit from RabbitRpcServer and implement the handleMessage method. The minimal implementation above always reports a failure, which is insufficient. The full implementation uses the CoinCounter to run the computer vision code.

std::string Main::handleMessage(const AmqpClient::BasicMessage::ptr_t message,
                                const AmqpClient::Channel::ptr_t channel) {
	Jzon::Object responseJson;
	try {
		// get the message, read the image
		ImageMessage imageMessage(message);
		auto imageData = imageMessage.headImage();
		auto imageMat = cv::imdecode(cv::Mat(imageData), 1);

		// ponies & unicorns
		Jzon::Array coinsJson;
		auto result = coinCounter.count(imageMat);
		for (auto i = result.coins.begin(); i != result.coins.end(); ++i) {
			Jzon::Object coinJson;
			Jzon::Object centerJson;
			centerJson.Add("x", i->center.x);
			centerJson.Add("y", i->center.y);
			coinJson.Add("center", centerJson);
			coinJson.Add("radius", i->radius);
			// collect the coin into the response array
			coinsJson.Add(coinJson);
		}
		responseJson.Add("hasRing", result.hasRing);
		responseJson.Add("coins", coinsJson);
		responseJson.Add("succeeded", true);
	} catch (std::exception &e) {
		// bantha poodoo!
		std::cerr << e.what() << std::endl;
		responseJson.Add("succeeded", false);
	} catch (...) {
		// more bantha fodder!
		responseJson.Add("succeeded", false);
	}

	// serialize the response without any formatting
	Jzon::Writer writer(responseJson, Jzon::NoFormat);
	writer.Write();

	return writer.GetResult();
}

Now, if you run the native/recog application and issue the same mjpeg:/coins2.mjpeg command, the application will succeed and print out the coin responses.
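
To give you a feel for the shape of those responses, here is a JavaScript sketch that reads the JSON string built by handleMessage. The field names (succeeded, hasRing, coins, center, radius) are the ones used in the C++ code; the parseCoinResponse helper and the sample values are made up for this example.

```javascript
// Hypothetical helper: turns the JSON string returned by the native code
// into a flat structure. The sample values further down are invented.
function parseCoinResponse(json) {
  const response = JSON.parse(json);
  if (!response.succeeded) {
    // On failure, the native code only reports "succeeded": false.
    return { succeeded: false, hasRing: false, coins: [] };
  }
  return {
    succeeded: true,
    hasRing: Boolean(response.hasRing),
    coins: (response.coins || []).map((coin) => ({
      x: coin.center.x,
      y: coin.center.y,
      radius: coin.radius,
    })),
  };
}

const sample =
  '{"hasRing":false,"coins":[{"center":{"x":120,"y":80},"radius":35}],"succeeded":true}';
const parsed = parseCoinResponse(sample);
console.log(parsed.coins.length, parsed.coins[0].radius); // 1 35
console.log(parseCoinResponse('{"succeeded":false}').coins.length); // 0
```

Checking the succeeded flag first matters, because a failed response carries neither the coins array nor the hasRing field.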

Web Application

Unfortunately, seeing some text on standard output is not quite what the users expect in 2013. (When did they get so soft? I remember when I was 20 years old… Oh, never mind!) We would like to provide a nice responsive web application that allows the code to *push* messages to it rather than polling for changes. WebSocket is a perfect solution for this. To make it even easier for us to handle, we are going to use the SockJS library and use the STOMP protocol over WebSocket!

We are simply going to “front” our RecogService by using a RecogController, and we are going to provide a special implementation of the RecogServiceActivator.  This implementation is going to remember the responses for all recog sessions. And, as we’ll see later, each change to the sessions is going to push a message out over the WebSocket.


The implementation of the RecogController uses, in addition to the familiar Spring MVC annotations, the new Spring messaging annotations.

@Controller
class RecogController @Autowired()(recogService: RecogService,
                                   recogSessions: RecogSessions) {

  def image(@SessionId sessionId: RecogSessionId,
            @MessageBody body: ChunkData): Unit = {
    ...
  }

  def mjpeg(@SessionId sessionId: RecogSessionId,
            @MessageBody body: ChunkData): Unit = {
    ...
  }

  def foo(): String = {
    val id = UUID.randomUUID().toString
    ...
    id
  }

  def bar(@RequestParam(defaultValue = "10") fps: Int): String = {
    val id = UUID.randomUUID().toString
    Utils.reader.readChunks("/coins2.mjpeg", fps)(recogService.mjpegChunk(id))
    id
  }

}


The image and mjpeg methods will be executed when we receive the message over the WebSocket at the given URL from our iOS application. We then pass the body of the message to the RecogService and then down the chain we have already explored. The implementation of the RecogSessions depends on the SimpMessageSendingOperations, which is part of the Spring messaging core.  It provides an entry point to the underlying message bus.

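class RecogSessions(messageSender: SimpMessageSendingOperations) {
  val sessions = new util.HashMap[RecogSessionId, CoinResponse]()

  def onCoinResponse(correlationId: CorrelationId,
                     coins: CoinResponse): Unit = {
    sessions.put(RecogSessionId(correlationId.value), coins)
    sendSessions()
  }

  def sessionEnded(sessionId: RecogSessionId): Unit = {
    sessions.remove(sessionId)
    sendSessions()
  }

  // Pushes the current state of all sessions to the subscribed clients
  private def sendSessions(): Unit =
    messageSender.convertAndSend("/topic/recog/sessions", sessions.values())

}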
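
The idea behind RecogSessions (remember the latest response for every session, and push the whole picture out on every change) fits in a few lines. The following JavaScript sketch is only an illustration: the SessionRegistry class and the send callback are made up, and sending the responses as an array is my assumption. Only the /topic/recog/sessions destination comes from the application.

```javascript
// Hypothetical sketch of the "remember and push" idea behind RecogSessions.
// `send` stands in for the message-sending operations; it is any function
// that takes a destination and a payload.
class SessionRegistry {
  constructor(send) {
    this.send = send;
    this.sessions = new Map(); // session ID -> latest coin response
  }

  onCoinResponse(sessionId, coins) {
    this.sessions.set(sessionId, coins);
    this.publish();
  }

  sessionEnded(sessionId) {
    this.sessions.delete(sessionId);
    this.publish();
  }

  // Assumption: clients receive the latest response of every session as an array.
  publish() {
    this.send("/topic/recog/sessions", Array.from(this.sessions.values()));
  }
}

const pushed = [];
const registry = new SessionRegistry((destination, payload) =>
  pushed.push({ destination, payload }));
registry.onCoinResponse("a", { coins: [], succeeded: true });
registry.onCoinResponse("b", { coins: [], succeeded: true });
registry.sessionEnded("a");
console.log(pushed.length);            // 3, one push per change
console.log(pushed[2].payload.length); // 1, only session "b" remains
```

Pushing the complete state on every change keeps the browser code trivial: it can simply replace what it displays with the latest message.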


This brings us to the messaging buses. Our application is going to use two main buses: one specifically for the WebSocket messages, and one for general messages.


Let’s explore the life of an incoming MJPEG chunk from the iOS client. The message first hits the MessagingWebSocketHandler through its SimpleUrlHandlerMapping. The MessagingWebSocketHandler builds a message from the incoming WebSocket frame and sends it to the dispatchChannel. From there, both subscribers see the message, but only the AnnotationMethodMessageHandler can process it. The AnnotationMethodMessageHandler is aware of all @Controller-annotated beans and can execute the methods in these controllers.

As a result, our RecogController#mjpeg method executes, invoking the mjpegChunk method of the RecogService. This activates our chain through RabbitMQ, our OpenCV application and back to the RecogServiceActivator, which then calls the onCoinResponse method of the RecogSessions bean. This bean updates its sessions map and puts a message on the dispatchChannel.

The message goes to both the SimpleBrokerMessageHandler and the AnnotationMethodMessageHandler subscribers, but this time, only the SimpleBrokerMessageHandler can process it. The processing involves placing it on the webSocketHandlerChannel, where it gets picked up by the SubProtocolWebSocketHandler and heads over to our application running in the browser.
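
The "both subscribers see every message, but only one of them can process it" behaviour is easy to model. The JavaScript sketch below is a toy, not Spring code: the Channel class and the two handler functions are made up, and the /app/recog/mjpeg destination is a hypothetical inbound destination. Only the /topic/ and /queue/ prefixes and the /topic/recog/sessions destination come from the application.

```javascript
// Hypothetical model of a subscribable channel with two handlers.
class Channel {
  constructor() {
    this.handlers = [];
  }
  subscribe(handler) {
    this.handlers.push(handler);
  }
  // Every subscriber sees every message; each decides whether to act on it.
  send(message) {
    this.handlers.forEach((handler) => handler(message));
  }
}

const log = [];
const brokerPrefixes = ["/topic/", "/queue/"];
const isBrokerDestination = (destination) =>
  brokerPrefixes.some((prefix) => destination.startsWith(prefix));

const dispatch = new Channel();
// Stand-in for the controller-invoking handler: skips broker destinations.
dispatch.subscribe((message) => {
  if (!isBrokerDestination(message.destination)) log.push("controller: " + message.destination);
});
// Stand-in for the simple broker: only handles broker destinations.
dispatch.subscribe((message) => {
  if (isBrokerDestination(message.destination)) log.push("broker: " + message.destination);
});

dispatch.send({ destination: "/app/recog/mjpeg" });      // made-up inbound destination
dispatch.send({ destination: "/topic/recog/sessions" }); // outbound update
console.log(log.join("\n"));
```

The channel itself does no routing at all; the decision is left to the handlers, which is what lets inbound and outbound messages share one channel.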

In the code, we will need the WebConfig trait, which defines a self-type dependency on the CoreConfig. This expresses that the concrete implementations of WebConfig must also mix in CoreConfig.

trait WebConfig {
  // require instances to be mixed in with CoreConfig
  this: CoreConfig =>

  // Channel for sending STOMP messages to connected WebSocket
  // sessions (mostly for internal use)
  @Bean def webSocketHandlerChannel(): SubscribableChannel =
    new ExecutorSubscribableChannel(asyncExecutor())

  @Bean def taskScheduler(): TaskScheduler = {
    val taskScheduler = new ThreadPoolTaskScheduler()
    taskScheduler
  }

  // MessageHandler that acts as a "simple" message broker
  @Bean def simpleBrokerMessageHandler(): SimpleBrokerMessageHandler = {
    val handler = new SimpleBrokerMessageHandler(
      webSocketHandlerChannel(), util.Arrays.asList("/topic/", "/queue/"))
    dispatchChannel().subscribe(handler)
    handler
  }

  // WS -[SockJS]-> /sockjs/** ~> sockJsSocketHandler

  // SockJS WS handler mapping
  @Bean def sockJsHandlerMapping(): SimpleUrlHandlerMapping = {
    val handler = new SubProtocolWebSocketHandler(dispatchChannel())
    handler.setDefaultProtocolHandler(new StompProtocolHandler())
    webSocketHandlerChannel().subscribe(handler)

    val sockJsService = new DefaultSockJsService(taskScheduler())
    val requestHandler = new SockJsHttpRequestHandler(sockJsService, handler)

    val hm = new SimpleUrlHandlerMapping()
    hm.setUrlMap(Collections.singletonMap("/sockjs/**", requestHandler))
    hm
  }

  // WS -[Raw]-> /websocket/** ~> websocketSocketHandler

  // Raw WS handler mapping
  @Bean def webSocketHandlerMapping(): SimpleUrlHandlerMapping = {
    val handler = new MessagingWebSocketHandler(dispatchChannel()) {
      override def afterConnectionClosed(session: WebSocketSession,
                                         closeStatus: CloseStatus) {
        ...
      }
    }

    val requestHandler = new WebSocketHttpRequestHandler(handler)

    val hm = new SimpleUrlHandlerMapping()
    hm.setUrlMap(Collections.singletonMap("/websocket/**", requestHandler))
    hm
  }

  // MessageHandler for processing messages by delegating to
  // @Controller annotated methods
  @Bean def annotationMethodMessageHandler(): AnnotationMethodMessageHandler = {
    val handler = new AnnotationMethodMessageHandler(
      dispatchMessagingTemplate(), webSocketHandlerChannel())
    handler.setCustomArgumentResolvers(
      util.Arrays.asList(new SessionIdMehtodArgumentResolver))
    dispatchChannel().subscribe(handler)
    handler
  }

}


The only thing that remains is to provide the implementation that mixes in the WebConfig and CoreConfig traits, along with some configuration for the XML-less web application. Let’s begin with the Webapp class.

class Webapp extends WebMvcConfigurerAdapter with WebConfig with CoreConfig {

  @Bean def asyncExecutor() = {
    val executor = new ThreadPoolTaskExecutor
    executor
  }

  @Bean def recogServiceActivator() = new RecogServiceActivator {
    def onCoinResponse(@Header correlationId: CorrelationId,
                       @Payload coins: CoinResponse): Unit =
      recogSessions().onCoinResponse(correlationId, coins)
  }

  // Allow serving HTML files through the default Servlet
  override def configureDefaultServletHandling(
    configurer: DefaultServletHandlerConfigurer) = {
    configurer.enable()
  }

}


To complete the picture, we implement the DispatcherServletInitializer for the XML-less web application that we shall deploy into Jetty.

class DispatcherServletInitializer extends AbstractAnnotationConfigDispatcherServletInitializer {

  protected def getRootConfigClasses: Array[Class[_]] = {
    ...
  }

  protected def getServletConfigClasses: Array[Class[_]] = {
    ...
  }

  protected def getServletMappings: Array[String] = {
    ...
  }

  protected override def customizeRegistration(
    registration: ServletRegistration.Dynamic): Unit = {
    registration.setInitParameter("dispatchOptionsRequest", "true")
  }

}


AngularJS Application

To complete the picture, we provide the user interface in an AngularJS application. The main component, SessionsCtrl, uses the SockJS library to establish a STOMP-based connection over a WebSocket.

function SessionsCtrl($scope) {
    // initialization
    $scope.sessions = [];

    // Connect to the server on path /sockjs and then create
    // the STOMP protocol client
    var socket = new SockJS('/sockjs');
    var stompClient = Stomp.over(socket);
    stompClient.connect('', '',
        function(frame) {
            // receive notifications on the recog/sessions topic
            stompClient.subscribe("/topic/recog/sessions", function(message) {
                // update the scope inside $apply so that AngularJS notices the change
                $scope.$apply(function() {
                    $scope.sessions = angular.fromJson(message.body);
                });
            });
        },
        function(error) {
            console.log("STOMP protocol error " + error);
        }
    );
}
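
The STOMP client hides the wire format from us, but it helps to know what a frame looks like: a command line, header lines, a blank line, the body, and a terminating NUL character. The following sketch parses such a frame. It is a simplification written for this post, not the code of the stomp.js library: the parseStompFrame function is made up, the sample body is invented, and the sketch ignores header escaping, the content-length header and carriage returns.

```javascript
// Simplified STOMP frame parser: COMMAND, header lines, blank line, body, NUL.
// Ignores header escaping, content-length, and carriage returns.
function parseStompFrame(raw) {
  const nul = raw.indexOf("\0");
  const text = nul >= 0 ? raw.slice(0, nul) : raw;
  const divider = text.indexOf("\n\n");
  const head = divider >= 0 ? text.slice(0, divider) : text;
  const body = divider >= 0 ? text.slice(divider + 2) : "";
  const [command, ...headerLines] = head.split("\n");
  const headers = new Map();
  for (const line of headerLines) {
    const colon = line.indexOf(":");
    if (colon <= 0) continue; // not a name:value line
    const name = line.slice(0, colon);
    // When a header repeats, the first occurrence wins.
    if (!headers.has(name)) headers.set(name, line.slice(colon + 1));
  }
  return { command, headers, body };
}

const frame = parseStompFrame(
  "MESSAGE\n" +
  "destination:/topic/recog/sessions\n" +
  "content-type:application/json\n" +
  "\n" +
  '[{"succeeded":false}]' +
  "\0"
);
console.log(frame.command);                       // MESSAGE
console.log(frame.headers.get("destination"));    // /topic/recog/sessions
console.log(JSON.parse(frame.body)[0].succeeded); // false
```

The body of a MESSAGE frame is what arrives as message.body in the subscribe callback of SessionsCtrl.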


The actual HTML is about as close as it gets to the example AngularJS application.

<!doctype html>
<html ng-app="coins">
<head>
    <title>Coin counter</title>
    <meta http-equiv="Cache-Control"
             content="no-store, no-cache, must-revalidate, max-age=0"/>
    <!-- jQuery -->
    <script src="assets/js/jquery-2.0.3.min.js"></script>
    <!-- Bootstrap -->
    <link href="assets/css/bootstrap.min.css" rel="stylesheet"/>
    <script src="assets/js/bootstrap.min.js"></script>
    <!-- WS -->
    <script src="assets/js/sockjs-0.3.4.js"></script>
    <script src="assets/js/stomp.js"></script>
    <!-- Application & AngularJS -->
    <link href="assets/css/coins.css" rel="stylesheet"/>
    <script src="assets/js/angular.min.js"></script>
    <script src="assets/js/sessions.js"></script>
    <script src="assets/js/components.js"></script>
</head>
<body>
<div ng-controller="SessionsCtrl">
    <tabs>
        <pane title="Raw">
            <h3>Raw data</h3>
        </pane>
        <pane title="Canvas">
            <h3>Visual representation</h3>
            <div ng-repeat="coins in sessions">
                <canvas display="{{coins}}" fill="red" scale="0.4"
                           width="500" height="386"></canvas>
            </div>
        </pane>
    </tabs>
</div>
</body>
</html>

This completes the application. You can use the iOS application to send images over binary WebSockets to the Spring-based application. The Spring code accepts the messages and routes them to the AMQP broker to be processed by our native (CPU or GPU) computer vision code. The responses are then routed back to the applications running in the browsers using WebSockets.

Messaging Simulation

Before I let you see the source code, let me show you the RabbitMQ simulator showing the flow of the messages. The video shows that the Spring application produces the messages carrying the bytes that make up the frames, sending them to the sogx.exchange using the sogx.recog.key routing key. This means that they arrive on the sogx.recog.queue and are consumed by the consumers (the video shows 2 consumers, but we actually have 8). Because the Spring application needs to receive the responses, it creates a temporary queue with a generated name, and it expects the responses to arrive on that queue. And so, when the recog application sends the replies, they arrive on the temporary queue where they are consumed by the Spring AMQP receivers.

RabbitMQ Simulator from Cake Solutions Ltd. on Vimeo.
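
The request/reply flow over a temporary queue is a general messaging pattern, and it can be modelled without a broker. The JavaScript sketch below is such a model, not Spring AMQP code: the RpcClient class, the serve function and the reply queue name are made up for illustration, and a real broker generates the queue name itself.

```javascript
// Hypothetical in-memory model of request/reply over a private reply queue.
class RpcClient {
  constructor(publish) {
    this.publish = publish;            // function that hands a request to the broker
    this.replyQueue = "reply-queue-1"; // a real broker would generate this name
    this.pending = new Map();          // correlation ID -> callback
    this.nextId = 0;
  }

  // Sends a request and remembers who is waiting for the answer.
  call(body, onReply) {
    const correlationId = String(++this.nextId);
    this.pending.set(correlationId, onReply);
    this.publish({ replyTo: this.replyQueue, correlationId, body });
  }

  // Invoked for every message that arrives on the reply queue.
  handleReply(message) {
    const callback = this.pending.get(message.correlationId);
    if (!callback) return; // unknown or already answered request
    this.pending.delete(message.correlationId);
    callback(message.body);
  }
}

// The server copies the correlation ID into its reply and addresses it to replyTo.
function serve(request, handler) {
  return {
    queue: request.replyTo,
    correlationId: request.correlationId,
    body: handler(request.body),
  };
}

const requests = [];
const client = new RpcClient((request) => requests.push(request));
const results = [];
client.call("frame-1", (reply) => results.push("first: " + reply));
client.call("frame-2", (reply) => results.push("second: " + reply));

// Replies may come back in any order; here the second request is answered first.
requests.reverse().forEach((request) =>
  client.handleReply(serve(request, (body) => body + " counted")));
console.log(results.join("; ")); // second: frame-2 counted; first: frame-1 counted
```

The correlation ID is what allows several consumers to answer in any order without the replies getting mixed up.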

Summary and Code

You can follow the code by browsing through the commits in https://github.com/eigengo/springone2gx2013. The README.md contains useful information for building the application. The most important thing to remember (as of 14th September 2013) is that you will need the nightly build of the Spring Framework.


Published at DZone with permission of Jan Machacek, DZone MVB.
