{{announcement.body}}
{{announcement.title}}

Data Streaming in OSGi R7 applications With OSGi R7 Push Stream and Server Sent Events

DZone 's Guide to

Data Streaming in OSGi R7 applications With OSGi R7 Push Stream and Server Sent Events

Learn how OSGi R7 Push Stream, OSGi R7 HTTP Whiteboard, and OSGi R7 JAX-RS Whiteboard can be applied to provide data streaming functionality in your web application.

· Web Dev Zone ·
Free Resource

Image title

Providing users of your application with feedback regarding long running operations used to require utilizing pull based solutions such as polling. In modern web applications, however, utilizing push based data streaming solutions to provide such feedback is the norm. But what options do we have if we’re building modern cloud native OSGi applications and we wish to implement such monitoring solutions?

In this article, I will show you how implementations of some of the latest OSGi specifications — i.e. OSGI R7 Push Stream, OSGi R7 HTTP Whiteboard, and OSGi R7 JAX-RS Whiteboard  can be applied, along with Server Sent Events, to implement such push based data streaming solutions. So, instead of having a background JavaScript worker slamming the server with requests for status every few seconds, we’ll have the server push data updates and have them displayed onto our GUI.

All of the examples presented in this article are based on a complete application I put together, available for cloning and deploying yourself at https://github.com/ideas-into-software/automated-linguistic-analysis

To keep our focus and length of this article reasonable, let’s skip the basics such as DAO layer and other mundane parts — we have them working already and we can move on to these higher layers. The ‘service-status-impl’ https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/service-status-impl , ‘rest-status’ https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/rest-status and ‘rest-common’ https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/rest-common modules contain all the code referred to in this article.

1. Starting with the service layer, in the ‘service-status-impl’ (https://github.com/ideas-into-
software/automated-linguistic-analysis/tree/master/service-status-impl
) module you will find the software.into.ala.service.status.impl.StatusUpdatesServiceImpl class responsible for pushing events received onto an OSGi R7 Push Stream instance.

2. Here, we have the StatusUpdatesConsumer private class implemented as Runnable, which uses Camel’s consumer template to receive messages from a RabbitMQ topic pertaining to particular file being processed and pushes these onto event source, i.e.:

try {
  ConsumerTemplate consumerTemplate = getCamelContext().createConsumerTemplate();
  Endpoint endpoint = messagingService.registerStatusUpdatesEndpoint(fileId);
  FileMessageDTO fmDTO = consumerTemplate.receiveBody(endpoint,
  FileMessageDTO.class);
  while (fmDTO != null) {
    simplePushEventSource.publish(fmDTO);
    if (isProcessingFinished(fmDTO)) {
      break;
    }
    fmDTO = consumerTemplate.receiveBody(endpoint, FileMessageDTO.class);
  }
  consumerTemplate.cleanUp();
  simplePushEventSource.endOfStream();
} catch (Exception e) {
  simplePushEventSource.error(e);
}

3. In the same software.into.ala.service.status.impl.StatusUpdatesServiceImpl class we have the   software.into.ala.service.status.impl.StatusUpdatesServiceImpl.getStatusUpdates(String) method which starts a new instance of StatusUpdatesConsumer to receive status updates for a particular file being processed, then creates and returns a push based stream of processing status updates obtained from the push event source, i.e.:

public PushStream<String> getStatusUpdates(String fileId) {
  Objects.requireNonNull(fileId, "File ID must be specified!");
  fileProcessingStatusConsumerExec.execute(new StatusUpdatesConsumer(fileId));
  return pushStreamProvider.createStream(simplePushEventSource).map(fmDTO -> fmDTO.status);
}

4. Now, we move to the ‘rest-status’ module
(https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/rest-status)
which contains  software.into.ala.rest.status.StatusUpdatesRestController . It registers itself as a service with the JAX-RS Whiteboard using the   org.osgi.service.jaxrs.whiteboard.propertytypes.JaxrsResource  annotation, i.e.:

@JaxrsResource
public class StatusUpdatesRestController {
  (...)
}

5. Here, in  the software.into.ala.rest.status.StatusUpdatesRestController.getStatusUpdates(SseEventSink,String)  method, which produces Server Sent Events, we call the  software.into.ala.service.status.StatusUpdatesService.getStatusUpdates(String)  method from the service layer below (‘service-status-impl’ module) to receive status update push notifications — registering method to execute for each event received, as well as two callback methods for failure and resolved scenarios, i.e.:

PushStream<String> fPushStream = statusUpdateService.getStatusUpdates(fileId);
fPushStream.forEach(this::deliverEvent).onFailure(this::failure).onResolve(this::resolved);

6. The action being executed for each event received is handled by the  software.into.ala.rest.status.StatusUpdatesRestController.deliverEvent(String)  method, which simply wraps each event in OutboundSseEvent  and sends it via the  javax.ws.rs.sse.SseEventSink.send(OutboundSseEvent)  method.

7. The two callback methods — software.into.ala.rest.status.StatusUpdatesRestController.failure(Throwable)  for handling failures and software.into.ala.rest.status.StatusUpdatesRestController.resolved()  called when Promise is resolved — are very similar in that they also wrap these different types of events in OutboundSseEvent and send them via the javax.ws.rs.sse.SseEventSink.send(OutboundSseEvent)  method.

8. Finally, our GUI resides in the ‘rest-common’
https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/rest-common
module. Here, the software.into.ala.rest.common.SinglePageApp  class registers static
resources to be served with the HTTP Whiteboard using the  org.osgi.service.http.whiteboard.propertytypes.HttpWhiteboardResource annotation, i.e.:

@HttpWhiteboardResource(pattern = "/spa/*", prefix = "static")

9. The above will result in picking up all of the files which comprise our single page application (i.e. HTML5, CSS3, JavaScript, and SVG files), source residing in the ‘src/main/resources/static’ folder in the ‘rest-common’ module, and serving them via http://IP:PORT/spa/index.html in a running application.

10. It’s also here that we use JavaScript’s EventSource, wrapped in the jQuery Plugin for Server-Sent Events (SSE). Therefore, we include the jQuery and jQuery SSE dependencies as well as our custom JavaScript contained in  ala.js, where we have the monitorStatus(fileId) method responsible for receiving each of these events and passing them to displayStatusMessage method to display each event in GUI.

As perhaps you’ve noticed, even though it’s been over a year since OSGi R7 specifications were released (April 2018), along with implementations for some of those specifications, it is unfortunately still very difficult or impossible to find in public repositories actual applications utilizing these technologies. This article attempted to fill this hole of lack and hopefully it succeeded in doing so.


Topics:
osgi ,data streaming ,server sent events ,javascript ,web application ,web dev ,java web development

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}