Websockets, AngularJS, RabbitMQ and C++ in Spring 4
In this post, I will walk you through a Spring Framework 4-based application that makes the most of WebSockets, RabbitMQ and OpenCV to implement a simple computer vision application. It counts the coins in a video stream that an iOS application sends over a binary WebSocket.
The Result
Perhaps strangely, I will begin by showing you the completed result. The video shows the iOS application talking to the Spring webapp over a binary WebSocket. The webapp decodes the video, processes each frame in the computer vision code behind RabbitMQ, then receives the responses and pushes the results to an AngularJS application.
SpringOne2GX from Cake Solutions Ltd. on Vimeo.
Getting Started
Let's begin with the RabbitMQ / computer vision components. This is a typical Spring AMQP task. At the highest level, we'll be creating the RecogService and the RecogServiceActivator. The RecogService is the entry point into our system. We submit chunks of the video stream (or full frames) and drive them through Spring Integration, then Spring AMQP, then RabbitMQ and the C++ application, then back to Spring Integration. Finally, the response is received in the implementation of the RecogServiceActivator.
Here is all the code that the RecogService needs:
    class RecogService(recogChannel: MessageChannel) {

      private def sendWithContentType(contentType: String,
                                      correlationId: CorrelationId,
                                      chunk: ChunkData): Unit = {
        val message = MessageBuilder.
          withPayload(chunk).
          setCorrelationId(correlationId).
          setHeader("content-type", contentType).
          build()

        recogChannel.send(message)
      }

      def imageChunk(correlationId: CorrelationId)(chunk: ChunkData) =
        sendWithContentType(ContentTypes.`image/*`, correlationId, chunk)

      def mjpegChunk(correlationId: CorrelationId)(chunk: ChunkData) =
        sendWithContentType(ContentTypes.`video/mjpeg`, correlationId, chunk)

    }
Spring Integration
I have already given away part of the core of the unicorn. The RecogService depends on the MessageChannel, which it uses to send the chunks of the input data. The channel sends the messages down a *chain*.
In the first step, we decode the chunk, potentially resulting in multiple frames. In Scala code, we turn a single ChunkData into a Collection[ImageData].
    class ChunkDecoder(mjpegDecoder: MJPEGDecoder) {

      def decodeFrame(@Header correlationId: CorrelationId,
                      @Header("content-type") contentType: String,
                      @Payload chunk: ChunkData): util.Collection[ImageData] =
        contentType match {
          case `video/mjpeg` => decodeMJPEGFrames(correlationId, chunk)
          case `image/*`     => decodeSingleImage(correlationId, chunk)
        }

      private def decodeSingleImage(correlationId: CorrelationId,
                                    chunk: ChunkData): util.Collection[ImageData] =
        Collections.singletonList(chunk)

      private def decodeMJPEGFrames(correlationId: CorrelationId,
                                    chunk: ChunkData): util.Collection[ImageData] =
        mjpegDecoder.decodeFrames(correlationId, chunk)

    }
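The article does not show the MJPEGDecoder itself, so the following is only a sketch of how a chunk might turn into zero, one or several frames. It is written in plain JavaScript to stay dependency-free, and it assumes that frames can be found by scanning for the JPEG start-of-image (0xFF 0xD8) and end-of-image (0xFF 0xD9) markers. The MjpegFrameBuffer and indexOfMarker names are hypothetical, and a real decoder would need to cope with embedded thumbnails, which carry their own markers.

```javascript
// Hypothetical sketch (not the article's MJPEGDecoder): buffer incoming
// chunks and cut out whole JPEG frames between the SOI and EOI markers.
const SOI = 0xd8; // second byte of the start-of-image marker 0xFF 0xD8
const EOI = 0xd9; // second byte of the end-of-image marker 0xFF 0xD9

// Index of the two-byte marker 0xFF <second> at or after `from`, or -1.
function indexOfMarker(bytes, second, from) {
  for (let i = from; i + 1 < bytes.length; i++) {
    if (bytes[i] === 0xff && bytes[i + 1] === second) return i;
  }
  return -1;
}

class MjpegFrameBuffer {
  constructor() {
    this.pending = []; // bytes that do not yet form a whole frame
  }

  // Append one chunk (array of byte values); return the frames it completes.
  decodeFrames(chunk) {
    this.pending = this.pending.concat(Array.from(chunk));
    const frames = [];
    for (;;) {
      const start = indexOfMarker(this.pending, SOI, 0);
      if (start < 0) {
        // No frame start yet; keep a trailing 0xFF in case the marker
        // is split across two chunks.
        const last = this.pending[this.pending.length - 1];
        this.pending = last === 0xff ? [0xff] : [];
        return frames;
      }
      const end = indexOfMarker(this.pending, EOI, start + 2);
      if (end < 0) {
        this.pending = this.pending.slice(start); // wait for the rest
        return frames;
      }
      frames.push(this.pending.slice(start, end + 2));
      this.pending = this.pending.slice(end + 2);
    }
  }
}

// Usage: the second frame is split across two chunks.
const decoder = new MjpegFrameBuffer();
console.log(decoder.decodeFrames([0xff, 0xd8, 1, 2, 0xff, 0xd9, 0xff, 0xd8, 3]).length);
console.log(decoder.decodeFrames([0xff, 0xd9]).length);
```

Keeping the unfinished bytes between calls is what allows a single chunk to yield an empty collection, a single frame or many frames, which is why the return type in the Scala code is a collection.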
By now, we've decoded as many frames as we could given the new chunk of data. However, the following components do not operate on collections of frames, but on a single frame. Therefore, we must split the Message of Collection[ImageData] into as many Messages of ImageData as there are elements in the collection. By default, Spring Integration's *splitter* splits a Message with a Collection[A]-type payload into multiple Messages, each containing an element of the original collection. We then take each decoded frame and pass it on to the AMQP outbound endpoint, remembering to map all outgoing and incoming headers. We also set the maximum time to wait for a reply.
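To make the splitting step concrete, here is a small model of it in plain JavaScript. It is not Spring Integration code: the message shape ({ headers, payload }) and the split function are assumptions made for illustration. The optional applySequence flag mimics my understanding of what a splitter's sequence headers would add.

```javascript
// Sketch of a splitter: a message whose payload is a collection becomes one
// message per element, and every new message gets a copy of the headers.
function split(message, applySequence = false) {
  const items = Array.isArray(message.payload) ? message.payload : [message.payload];
  return items.map((item, index) => {
    const headers = { ...message.headers };
    if (applySequence) {
      // Assumed model of sequence headers; left off by default here.
      headers.sequenceNumber = index + 1;
      headers.sequenceSize = items.length;
    }
    return { headers, payload: item };
  });
}

// Usage: three decoded frames turn into three messages with the same headers.
const decoded = {
  headers: { correlationId: "session-1", "content-type": "video/mjpeg" },
  payload: ["frame-1", "frame-2", "frame-3"],
};
console.log(split(decoded).map((m) => m.payload));
```

Because the headers travel with every element, each frame that reaches the outbound endpoint still carries the correlation ID of the chunk it came from.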
When we receive a response from the native code at the end of the RabbitMQ queues, we have an Array[Byte] that needs to be converted to a String. Finally, we execute the onCoinResponse method of the recogServiceActivator bean. We represent the chain in XML as:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:int="http://www.springframework.org/schema/integration"
           xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="...">

      <int:chain input-channel="recogRequest">
        <int:service-activator method="decodeFrame" ref="chunkDecoder"/>
        <int:splitter apply-sequence="false"/>
        <int-amqp:outbound-gateway exchange-name="sogx.exchange"
                                   routing-key="sogx.recog.key"
                                   reply-timeout="250"
                                   mapped-reply-headers="*"
                                   mapped-request-headers="*"
                                   amqp-template="amqpTemplate"/>
        <int:object-to-string-transformer/>
        <int:service-activator ref="recogServiceActivator" method="onCoinResponse"/>
      </int:chain>

      <rabbit:connection-factory id="connectionFactory" host="localhost"
                                 channel-cache-size="10" />
      <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

      <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>

      <rabbit:queue name="sogx.recog.queue" declared-by="rabbitAdmin"/>

      <rabbit:direct-exchange name="sogx.exchange" declared-by="rabbitAdmin">
        <rabbit:bindings>
          <rabbit:binding queue="sogx.recog.queue" key="sogx.recog.key" />
        </rabbit:bindings>
      </rabbit:direct-exchange>

    </beans>
I have included the minimal RabbitMQ configuration, assuming that the RabbitMQ server runs on localhost and does not require authentication.
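The exchange, queue and binding in that configuration can be pictured with a tiny in-memory model. This is only an illustration of how a direct exchange routes by exact key match; the DirectExchange and MessageQueue classes are hypothetical and have nothing to do with the real broker's implementation.

```javascript
// In-memory model of a direct exchange: a message is delivered to every
// queue whose binding key is equal to the message's routing key.
class MessageQueue {
  constructor(name) {
    this.name = name;
    this.messages = [];
  }
}

class DirectExchange {
  constructor(name) {
    this.name = name;
    this.bindings = new Map(); // binding key -> array of bound queues
  }

  bind(queue, key) {
    if (!this.bindings.has(key)) this.bindings.set(key, []);
    this.bindings.get(key).push(queue);
  }

  // Deliver the message and return how many queues received it.
  publish(routingKey, message) {
    const queues = this.bindings.get(routingKey) || [];
    queues.forEach((queue) => queue.messages.push(message));
    return queues.length;
  }
}

// Usage with the names from the XML configuration.
const recogQueue = new MessageQueue("sogx.recog.queue");
const exchange = new DirectExchange("sogx.exchange");
exchange.bind(recogQueue, "sogx.recog.key");
exchange.publish("sogx.recog.key", "frame bytes");
console.log(recogQueue.messages.length);
```

A message published with any other routing key matches no binding in this model and is simply not delivered anywhere, which is why the key in the outbound gateway and the key in the binding have to agree.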
Command-line Application
Now it's time to wire together a command-line application. To do so, we need to provide configuration for the Spring Framework to create all the beans that make up our application. The task is made slightly more difficult by the fact that we will want to use the same structure for the web application. We shall define the core beans in the CoreConfig trait, which will provide default implementations of all the @Beans.
    trait CoreConfig {

      // "boring" beans
      @Bean def mjpegDecoder() = new MJPEGDecoder()
      @Bean def chunkDecoder() = new ChunkDecoder(mjpegDecoder())
      @Bean def recogService(): RecogService = new RecogService(recogRequest())

      // abstract beans
      @Bean def asyncExecutor(): Executor
      @Bean def recogServiceActivator(): RecogServiceActivator

      // SI channel
      @Bean def recogRequest() = new DirectChannel()

      // the message converter for the payloads
      @Bean def messageConverter() = new DelegatingJsonMessageConverter(
        new MappingJackson2MessageConverter())

      // the channel that connects to the WS clients
      @Bean def dispatchChannel() = new ExecutorSubscribableChannel(
        asyncExecutor())

      // MessagingTemplate (and MessageChannel) to dispatch messages to
      // for further processing
      // All MessageHandler beans above subscribe to this channel
      @Bean def dispatchMessagingTemplate(): SimpMessageSendingOperations = {
        val template = new SimpMessagingTemplate(dispatchChannel())
        template.setMessageConverter(messageConverter())
        template
      }

    }
Unfortunately, the CoreConfig trait is not sufficient to boot a Spring application. We need to create a class that mixes in this trait and implements the asyncExecutor() and recogServiceActivator() methods. We then sprinkle it with a few more annotations.
    @Configuration
    @ImportResource(
      Array("classpath:/META-INF/spring/integration/module-context.xml"))
    class App extends CoreConfig {

      @Bean def asyncExecutor(): Executor = new SyncTaskExecutor

      @Bean def recogServiceActivator() = new RecogServiceActivator {
        def onCoinResponse(@Header correlationId: CorrelationId,
                           @Payload coins: CoinResponse): Unit =
          println(">>> " + correlationId + ": " + coins)
      }

    }
The App class is sufficient to use as configuration for the AnnotationConfigApplicationContext, and doing so gives us a full Spring application.
    // Create the Spring ApplicationContext implementation,
    // register the @Configuration class and load it
    val ctx = new AnnotationConfigApplicationContext()
    ctx.register(classOf[App])
    ctx.refresh()

    // Grab the created RecogService implementation
    val recogService = ctx.getBean(classOf[RecogService])
    recogService.mjpegChunk(UUID.randomUUID().toString)(...)

    // clean up
    ctx.close()
This is the core of the CLI object, which contains a convenient command loop.
    object Cli extends App {
      import Commands._
      import Utils.reader._

      @Configuration
      @ImportResource(
        Array("classpath:/META-INF/spring/integration/module-context.xml"))
      class App extends CoreConfig {
        ...
      }

      @tailrec
      def commandLoop(): Unit = {
        Console.readLine() match {
          case QuitCommand => return
          case ImageCommand(fileName) =>
            readAll(fileName)(recogService.imageChunk(UUID.randomUUID().toString))
          case MJPEGCommand(fileName, fps) =>
            readChunks(fileName, fps)(recogService.mjpegChunk(UUID.randomUUID().toString))
          case null => // do nothing
          case _ => println("wtf??")
        }

        // in tail position
        commandLoop()
      }

      // Create the Spring ApplicationContext implementation,
      // register the @Configuration class and load it
      val ctx = new AnnotationConfigApplicationContext()
      ctx.register(classOf[App])
      ctx.refresh()

      // Grab the created RecogService implementation
      val recogService = ctx.getBean(classOf[RecogService])

      // start processing the user input
      commandLoop()

      // clean up
      ctx.close()
    }
Native Components
Assuming you have the RabbitMQ broker running, you can now execute this application and issue the mjpeg:/coins2.mjpeg command. Unfortunately, this will end with a message indicating an error in the chain handler, because we do not have the native components running.
The native component is implemented in C++ using OpenCV. We need to implement its RabbitMQ RPC server, which calls the computer vision code (in coins.cpp).
    class Main : public RabbitRpcServer {
    private:
      CoinCounter coinCounter;
    protected:
      virtual std::string handleMessage(
        const AmqpClient::BasicMessage::ptr_t message,
        const AmqpClient::Channel::ptr_t channel);
    public:
      Main(const std::string queue, const std::string exchange,
           const std::string routingKey);
    };

    Main::Main(const std::string queue, const std::string exchange,
               const std::string routingKey) :
      RabbitRpcServer::RabbitRpcServer(queue, exchange, routingKey) {
    }

    std::string Main::handleMessage(const AmqpClient::BasicMessage::ptr_t message,
                                    const AmqpClient::Channel::ptr_t channel) {
      // return a std::string that represents the result
      return "{\"succeeded\":false}";
    }

    int main(int argc, char** argv) {
      Main main("sogx.recog.queue", "sogx.exchange", "sogx.recog.key");
      main.runAndJoin(8);
      return 0;
    }
The code shows the core concepts: we inherit from RabbitRpcServer and implement the handleMessage method. The minimal implementation above always reports a failure, which is not very useful. The full implementation uses the CoinCounter to run the computer vision code.
    std::string Main::handleMessage(const AmqpClient::BasicMessage::ptr_t message,
                                    const AmqpClient::Channel::ptr_t channel) {
      // the rest of the method uses the Jzon types, so the response does too
      Jzon::Object responseJson;
      try {
        // get the message, read the image
        ImageMessage imageMessage(message);
        auto imageData = imageMessage.headImage();
        auto imageMat = cv::imdecode(cv::Mat(imageData), 1);

        // ponies & unicorns
        Jzon::Array coinsJson;
        auto result = coinCounter.count(imageMat);
        for (auto i = result.coins.begin(); i != result.coins.end(); ++i) {
          Jzon::Object coinJson;
          Jzon::Object centerJson;
          centerJson.Add("x", i->center.x);
          centerJson.Add("y", i->center.y);
          coinJson.Add("center", centerJson);
          coinJson.Add("radius", i->radius);
          coinsJson.Add(coinJson);
        }
    #ifdef WITH_RINGS
        responseJson.Add("hasRing", result.hasRing);
    #endif
        responseJson.Add("coins", coinsJson);
        responseJson.Add("succeeded", true);
      } catch (std::exception &e) {
        // bantha poodoo!
        std::cerr << e.what() << std::endl;
        responseJson.Add("succeeded", false);
      } catch (...) {
        // more bantha fodder!
        responseJson.Add("succeeded", false);
      }

      Jzon::Writer writer(responseJson, Jzon::NoFormat);
      writer.Write();

      return writer.GetResult();
    }
Now, if you run the native/recog application and issue the same mjpeg:/coins2.mjpeg command, the application will succeed and print out the coin responses.
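Going by the fields that handleMessage adds, each reply is a JSON object with a succeeded flag and, on success, a coins array whose elements have a center (x and y) and a radius. As a rough sketch of how a consumer might read such a reply, here is a small JavaScript function. The describeCoinResponse name and the sample numbers are made up for illustration.

```javascript
// Summarise one reply from the native recogniser.
// Expected shape, inferred from the C++ code:
//   {"coins":[{"center":{"x":..,"y":..},"radius":..}, ...],"succeeded":true}
function describeCoinResponse(json) {
  const response = JSON.parse(json);
  if (!response.succeeded) return "recognition failed";
  const coins = response.coins || [];
  const radii = coins.map((coin) => coin.radius);
  return `${coins.length} coin(s), radii: ${radii.join(", ")}`;
}

// Usage with an invented reply; real values depend on the image.
const sampleReply =
  '{"coins":[{"center":{"x":120,"y":80},"radius":35}],"succeeded":true}';
console.log(describeCoinResponse(sampleReply));
console.log(describeCoinResponse('{"succeeded":false}'));
```

Checking succeeded first matters because the failure replies carry no coins field at all.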
Web Application
Unfortunately, seeing some text on standard output is not quite what the users expect in 2013. (When did they get so soft? I remember when I was 20 years old… Oh, never mind!) We would like to provide a nice responsive web application that allows the code to *push* messages to it rather than polling for changes. WebSocket is a perfect solution for this. To make it even easier for us to handle, we are going to use the SockJS library and use the STOMP protocol over WebSocket!
We are simply going to "front" our RecogService with a RecogController, and we are going to provide a special implementation of the RecogServiceActivator. This implementation is going to remember the responses for all recog sessions. And, as we'll see later, each change to the sessions is going to push a message out over the WebSocket.
The implementation of the RecogController uses, in addition to the familiar Spring MVC annotations, the new Spring messaging annotations.
    @Controller
    class RecogController @Autowired()(recogService: RecogService,
                                       recogSessions: RecogSessions) {

      @MessageMapping(Array("/app/recog/image"))
      def image(@SessionId sessionId: RecogSessionId,
                @MessageBody body: ChunkData): Unit = {
        recogService.imageChunk(sessionId.value)(body)
      }

      @MessageMapping(Array("/app/recog/mjpeg"))
      def mjpeg(@SessionId sessionId: RecogSessionId,
                @MessageBody body: ChunkData): Unit = {
        recogService.mjpegChunk(sessionId.value)(body)
      }

      @RequestMapping(Array("/app/predef/image"))
      @ResponseBody
      def foo(): String = {
        val id = UUID.randomUUID().toString
        Utils.reader.readAll("/coins2.png")(recogService.imageChunk(id))
        recogSessions.sessionEnded(RecogSessionId(id))
        "image"
      }

      @RequestMapping(Array("/app/predef/coins"))
      @ResponseBody
      def bar(@RequestParam(defaultValue = "10") fps: Int): String = {
        val id = UUID.randomUUID().toString
        Utils.reader.readChunks("/coins2.mjpeg", fps)(recogService.mjpegChunk(id))
        recogSessions.sessionEnded(RecogSessionId(id))
        "coins"
      }

    }
The image and mjpeg methods will be executed when we receive the message over the WebSocket at the given URL from our iOS application. We then pass the body of the message to the RecogService and then down the chain we have already explored. The implementation of the RecogSessions depends on the SimpMessageSendingOperations, which is part of the Spring messaging core. It provides an entry point to the underlying message bus.
    class RecogSessions(messageSender: SimpMessageSendingOperations) {
      val sessions = new util.HashMap[RecogSessionId, CoinResponse]()

      def onCoinResponse(correlationId: CorrelationId, coins: CoinResponse): Unit = {
        sessions.put(RecogSessionId(correlationId.value), coins)
        sendSessions()
      }

      def sessionEnded(sessionId: RecogSessionId): Unit = {
        sessions.remove(sessionId)
        sendSessions()
      }

      private def sendSessions(): Unit =
        messageSender.convertAndSend("/topic/recog/sessions",
                                     sessions.values().toString)
    }
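The same bookkeeping can be modelled in a few lines of JavaScript. This is a sketch rather than a port: the SessionStore class and its send callback are hypothetical. It also assumes that each stored response is already a JSON string, in which case joining the values as "[a, b]" (the format a Java collection's toString produces) happens to yield a JSON array that a browser can parse.

```javascript
// Sketch of the session bookkeeping: keep the latest response per session
// and broadcast the whole set after every change.
class SessionStore {
  // `send(destination, body)` stands in for the message sender.
  constructor(send) {
    this.send = send;
    this.sessions = new Map(); // session id -> latest response (JSON text)
  }

  onCoinResponse(sessionId, coinsJson) {
    this.sessions.set(sessionId, coinsJson);
    this.publish();
  }

  sessionEnded(sessionId) {
    this.sessions.delete(sessionId);
    this.publish();
  }

  publish() {
    // Mirrors the "[a, b]" layout of a Java collection's toString().
    const body = "[" + [...this.sessions.values()].join(", ") + "]";
    this.send("/topic/recog/sessions", body);
  }
}

// Usage: print every broadcast.
const store = new SessionStore((destination, body) => console.log(destination, body));
store.onCoinResponse("ios-1", '{"coins":[],"succeeded":true}');
store.sessionEnded("ios-1");
```

Sending the full set on every change keeps the browser side simple, because it can replace its state wholesale instead of applying deltas.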
Messaging
This brings us to the messaging buses. Our application is going to use two main buses: one specifically for the WebSocket messages, and one for general messages.
Let's explore the life of an incoming MJPEG chunk from the iOS client. The message hits the MessagingWebSocketHandler first through its SimpleUrlHandlerMapping. The MessagingWebSocketHandler builds a message from the incoming WebSocket frame and sends it to the dispatchChannel. From there, both subscribers see the message, but only the AnnotationMethodMessageHandler can process it. The AnnotationMethodMessageHandler is aware of all @Controller-annotated beans and can execute the methods in these controllers.
As a result, our RecogController#mjpeg method executes, invoking the mjpegChunk method of the RecogService. This activates our chain through RabbitMQ, our OpenCV application and back to the RecogServiceActivator, which then calls the onCoinResponse method of the RecogSessions bean. This bean updates its sessions map and puts a message on the dispatchChannel.
The messages go to both the SimpleBrokerMessageHandler and AnnotationMethodMessageHandler subscribers, but this time, only the SimpleBrokerMessageHandler can process the message. The processing involves placing it on the webSocketHandlerChannel, where it gets picked up by the SubProtocolWebSocketHandler and heads over to our application running in the browser.
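The "both subscribers see it, only one can process it" behaviour can be sketched as a channel with two handlers that each filter on a destination prefix. The prefixes below ("/app/" for controller methods, "/topic/" and "/queue/" for the simple broker) are the ones used in the article's configuration; the SubscribableChannel and prefixHandler names in this JavaScript model are hypothetical, and unlike an executor-backed channel the model dispatches synchronously.

```javascript
// A channel that hands every message to every subscriber.
class SubscribableChannel {
  constructor() {
    this.handlers = [];
  }
  subscribe(handler) {
    this.handlers.push(handler);
  }
  send(message) {
    this.handlers.forEach((handler) => handler(message));
  }
}

// Wrap a handler so that it only acts on destinations with a known prefix.
function prefixHandler(prefixes, onMessage) {
  return (message) => {
    if (prefixes.some((prefix) => message.destination.startsWith(prefix))) {
      onMessage(message);
    }
  };
}

// Usage: one inbound and one outbound message on the same channel.
const dispatchChannel = new SubscribableChannel();
dispatchChannel.subscribe(
  prefixHandler(["/app/"], (m) => console.log("controller handles", m.destination)));
dispatchChannel.subscribe(
  prefixHandler(["/topic/", "/queue/"], (m) => console.log("broker handles", m.destination)));

dispatchChannel.send({ destination: "/app/recog/mjpeg", payload: "chunk" });
dispatchChannel.send({ destination: "/topic/recog/sessions", payload: "[]" });
```

Sharing one channel and filtering by prefix means that neither handler has to know about the other; adding a third kind of destination would only need another subscriber.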
In the code, we will need the WebConfig trait, which defines a self-type dependency on the CoreConfig. This expresses that the concrete implementations of WebConfig must also mix in the CoreConfig.
    trait WebConfig {
      // require instances to be mixed in with CoreConfig
      this: CoreConfig =>

      // Channel for sending STOMP messages to connected WebSocket
      // sessions (mostly for internal use)
      @Bean def webSocketHandlerChannel(): SubscribableChannel =
        new ExecutorSubscribableChannel(asyncExecutor())

      @Bean def taskScheduler(): TaskScheduler = {
        val taskScheduler = new ThreadPoolTaskScheduler()
        taskScheduler.setThreadNamePrefix("SockJS-")
        taskScheduler.setPoolSize(4)
        taskScheduler.afterPropertiesSet()
        taskScheduler
      }

      // MessageHandler that acts as a "simple" message broker
      @Bean def simpleBrokerMessageHandler(): SimpleBrokerMessageHandler = {
        val handler = new SimpleBrokerMessageHandler(
          webSocketHandlerChannel(), util.Arrays.asList("/topic/", "/queue/"))
        dispatchChannel().subscribe(handler)
        handler
      }

      // WS -[SockJS]-> /sockjs/** ~> sockJsSocketHandler

      // SockJS WS handler mapping
      @Bean def sockJsHandlerMapping(): SimpleUrlHandlerMapping = {
        val handler = new SubProtocolWebSocketHandler(dispatchChannel())
        handler.setDefaultProtocolHandler(new StompProtocolHandler())
        webSocketHandlerChannel().subscribe(handler)

        val sockJsService = new DefaultSockJsService(taskScheduler())
        val requestHandler = new SockJsHttpRequestHandler(sockJsService, handler)

        val hm = new SimpleUrlHandlerMapping()
        hm.setOrder(-2)
        hm.setUrlMap(Collections.singletonMap("/sockjs/**", requestHandler))
        hm
      }

      // WS -[Raw]-> /websocket/** ~> websocketSocketHandler

      // Raw WS handler mapping
      @Bean def webSocketHandlerMapping(): SimpleUrlHandlerMapping = {
        val handler = new MessagingWebSocketHandler(dispatchChannel()) {
          override def afterConnectionClosed(session: WebSocketSession,
                                             closeStatus: CloseStatus) {
            recogSessions().sessionEnded(RecogSessionId(session.getId))
          }
        }
        handler.setUriPrefix("/websocket/")

        val requestHandler = new WebSocketHttpRequestHandler(handler)

        val hm = new SimpleUrlHandlerMapping()
        hm.setOrder(-1)
        hm.setUrlMap(Collections.singletonMap("/websocket/**", requestHandler))
        hm
      }

      // MessageHandler for processing messages by delegating to
      // @Controller annotated methods
      @Bean def annotationMethodMessageHandler(): AnnotationMethodMessageHandler = {
        val handler = new AnnotationMethodMessageHandler(
          dispatchMessagingTemplate(), webSocketHandlerChannel())

        handler.setCustomArgumentResolvers(
          util.Arrays.asList(new SessionIdMehtodArgumentResolver))
        handler.setDestinationPrefixes(util.Arrays.asList("/app/"))
        handler.setMessageConverter(messageConverter())
        dispatchChannel().subscribe(handler)
        handler
      }

    }
The only thing that remains is to provide the implementation that mixes in the WebConfig and CoreConfig traits, and also some configuration for the XML-less web application. Let's begin with the Webapp class.
    @Configuration
    @EnableWebMvc
    @ComponentScan(basePackages = Array("org.eigengo.sogx"))
    class Webapp extends WebMvcConfigurerAdapter with WebConfig with CoreConfig {

      @Bean def asyncExecutor() = {
        val executor = new ThreadPoolTaskExecutor
        executor.setCorePoolSize(4)
        // the original set the core pool size twice; the second call was
        // presumably meant to set the maximum
        executor.setMaxPoolSize(8)
        executor.setThreadNamePrefix("MessageChannel-")
        executor
      }

      @Bean def recogServiceActivator() = new RecogServiceActivator {
        def onCoinResponse(@Header correlationId: CorrelationId,
                           @Payload coins: CoinResponse): Unit =
          recogSessions().onCoinResponse(correlationId, coins)
      }

      // Allow serving HTML files through the default Servlet
      override def configureDefaultServletHandling(
          configurer: DefaultServletHandlerConfigurer) = {
        configurer.enable()
      }

    }
To complete the picture, we implement the DispatcherServletInitializer for the XML-less web application that we shall deploy into Jetty.
    class DispatcherServletInitializer
      extends AbstractAnnotationConfigDispatcherServletInitializer {

      protected def getRootConfigClasses: Array[Class[_]] = {
        Array[Class[_]](classOf[Webapp])
      }

      protected def getServletConfigClasses: Array[Class[_]] = {
        Array[Class[_]](classOf[Webapp])
      }

      protected def getServletMappings: Array[String] = {
        Array[String]("/")
      }

      protected override def customizeRegistration(
          registration: ServletRegistration.Dynamic): Unit = {
        registration.setInitParameter("dispatchOptionsRequest", "true")
      }

    }
AngularJS Application
Finally, we provide the user interface in an AngularJS application. The main component, SessionsCtrl, uses the SockJS library to establish a STOMP-based connection over a WebSocket.
    function SessionsCtrl($scope) {

      // initialization
      $scope.sessions = [];

      // Connect to the server on path /sockjs and then create
      // the STOMP protocol client
      var socket = new SockJS('/sockjs');
      var stompClient = Stomp.over(socket);
      stompClient.connect('', '',
        function(frame) {
          // receive notifications on the recog/sessions topic
          stompClient.subscribe("/topic/recog/sessions", function(message) {
            $scope.$apply(function() {
              $scope.sessions = angular.fromJson(message.body);
            });
          });
        },
        function(error) {
          console.log("STOMP protocol error " + error);
        }
      );

    }
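The STOMP client library hides the wire format, but it can help to see what a message frame looks like before message.body reaches the callback. A STOMP frame is a command line, then header lines in name:value form, then a blank line, then the body, terminated by a NUL character. The parser below is a deliberately simplified sketch with a hypothetical name (parseStompFrame): it assumes plain "\n" line endings and ignores header escaping and the content-length header, all of which a real client has to handle.

```javascript
// Simplified STOMP frame parser: COMMAND, headers, blank line, body, NUL.
function parseStompFrame(raw) {
  const end = raw.indexOf("\0");
  const text = end >= 0 ? raw.slice(0, end) : raw;
  const divider = text.indexOf("\n\n");
  const head = (divider >= 0 ? text.slice(0, divider) : text).split("\n");
  const body = divider >= 0 ? text.slice(divider + 2) : "";

  const headers = {};
  for (const line of head.slice(1)) {
    const colon = line.indexOf(":");
    if (colon <= 0) continue; // not a name:value line
    const name = line.slice(0, colon);
    // When a header is repeated, keep the first occurrence.
    if (!(name in headers)) headers[name] = line.slice(colon + 1);
  }
  return { command: head[0], headers, body };
}

// Usage: a frame like the ones the sessions topic might carry.
const frame = parseStompFrame(
  'MESSAGE\ndestination:/topic/recog/sessions\n\n[{"succeeded":true}]\0');
console.log(frame.command, frame.headers.destination, JSON.parse(frame.body).length);
```

In the controller above, the equivalent of frame.body is what gets passed to angular.fromJson.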
The actual HTML is about as close as it gets to the example AngularJS application.
    <!doctype html>
    <html ng-app="coins">
    <head>
      <title>Coin counter</title>
      <meta http-equiv="Cache-Control" content="no-store, no-cache, must-revalidate, max-age=0"/>
      <!-- jQuery -->
      <script src="assets/js/jquery-2.0.3.min.js"></script>
      <!-- Bootstrap -->
      <link href="assets/css/bootstrap.min.css" rel="stylesheet"/>
      <script src="assets/js/bootstrap.min.js"></script>
      <!-- WS -->
      <script src="assets/js/sockjs-0.3.4.js"></script>
      <script src="assets/js/stomp.js"></script>
      <!-- Application & AngularJS -->
      <link href="assets/css/coins.css" rel="stylesheet"/>
      <script src="assets/js/angular.min.js"></script>
      <script src="assets/js/sessions.js"></script>
      <script src="assets/js/components.js"></script>
    </head>
    <body>
    <div ng-controller="SessionsCtrl">
      <tabs>
        <pane title="Raw">
          <h3>Raw data</h3>
          <pre>{{sessions}}</pre>
        </pane>
        <pane title="Canvas">
          <h3>Visual representation</h3>
          <div ng-repeat="coins in sessions">
            <canvas display="{{coins}}" fill="red" scale="0.4" width="500" height="386"></canvas>
          </div>
        </pane>
      </tabs>
    </div>
    </body>
    </html>
This completes the application. You can use the iOS application to send images over binary WebSockets to the Spring-based application. The Spring code accepts the messages and routes them to the AMQP broker to be processed by our native (CPU or GPU) computer vision code. The responses are then routed back to the applications running in the browsers using WebSockets.
Messaging Simulation
Before I let you see the source code, let me show you the RabbitMQ simulator showing the flow of the messages. The video shows that the Spring application produces the messages carrying the bytes that make up the frames, sending them to the sogx.exchange using the sogx.recog.key routing key. This means that they arrive on the sogx.recog.queue and are consumed by the consumers (the video shows 2 consumers, but we actually have 8). Because the Spring application needs to receive the responses, it creates a temporary queue with a generated name, and it expects the responses to arrive on that queue. And so, when the recog application sends the replies, they arrive on the temporary queue where they are consumed by the Spring AMQP receivers.
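The request and reply flow described here is the usual RPC-over-messaging pattern: every request names the queue the reply should go to, and the caller matches replies to outstanding requests. The JavaScript sketch below models that pattern in memory. The RpcClient class, the "reply-queue-1" name and the use of a correlation ID per request are assumptions for illustration; the article does not say whether Spring AMQP correlates replies this way or relies on the temporary queue alone.

```javascript
// In-memory model of RPC over a message broker: each request carries the
// name of the reply queue and a correlation id, and replies are matched
// back to the callback that is waiting for them.
class RpcClient {
  // `publish(request)` stands in for sending to the request exchange.
  constructor(replyQueueName, publish) {
    this.replyQueueName = replyQueueName;
    this.publish = publish;
    this.pending = new Map(); // correlation id -> callback
    this.nextId = 1;
  }

  call(body, onReply) {
    const correlationId = String(this.nextId++);
    this.pending.set(correlationId, onReply);
    this.publish({ body, replyTo: this.replyQueueName, correlationId });
  }

  // Called for every message that arrives on the reply queue.
  // Returns false for replies nobody is waiting for.
  onReplyMessage(reply) {
    const callback = this.pending.get(reply.correlationId);
    if (!callback) return false;
    this.pending.delete(reply.correlationId);
    callback(reply.body);
    return true;
  }
}

// Usage: two requests, answered out of order by a pretend consumer.
const requests = [];
const client = new RpcClient("reply-queue-1", (request) => requests.push(request));
client.call("frame A", (body) => console.log("A ->", body));
client.call("frame B", (body) => console.log("B ->", body));
client.onReplyMessage({ correlationId: requests[1].correlationId, body: "2 coins" });
client.onReplyMessage({ correlationId: requests[0].correlationId, body: "1 coin" });
```

With several consumers working in parallel, replies can come back in a different order from the requests, which is the situation the correlation ID in this model is meant to handle.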
RabbitMQ Simulator from Cake Solutions Ltd. on Vimeo.
Summary and Code
You can follow the code by groll-ing through the commits in https://github.com/eigengo/springone2gx2013. The README.md contains useful information for building the application. The most important thing to remember (as of 14th September 2013) is that you will need the nightly build of the Spring Framework.
Published at DZone with permission of Jan Machacek, DZone MVB. See the original article here.