Over a million developers have joined DZone.

Use Reactive Streams API to Combine akka-streams With rxJava

DZone's Guide to

Use Reactive Streams API to Combine akka-streams With rxJava

· Java Zone
Free Resource

Microservices! They are everywhere, or at least, the term is. When should you use a microservice architecture? What factors should be considered when making that decision? Do the benefits outweigh the costs? Why is everyone so excited about them, anyway?  Brought to you in partnership with IBM.

Just a quick article this time, since I'm still experimenting with this stuff. There is a lot of talk around reactive programming. In Java 8 we've got the Stream API, we got rxJava we got ratpack and Akka has got akka-streams.

The main issue with these implementations is that they aren't compatible. You can't connect the subscriber of one implementation to the publisher of another. Luckily an initiative has started to provide a way that these different implementations can work together:

"It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application."

From - http://www.reactive-streams.org/

How does this work

Now how do we do this? Lets look at a quick example based on the akka-stream provided examples (from here). In the following listing

package sample.stream
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{SubscriberSink, PublisherSource, Source}
import com.google.common.collect.{DiscreteDomain, ContiguousSet}
import rx.RxReactiveStreams
import rx.Observable;
import scala.collection.JavaConverters._
object BasicTransformation {
  def main(args: Array[String]): Unit = {
    // define an implicit actorsystem and import the implicit dispatcher
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    // flow materializer determines how the stream is realized.
    // this time as a flow between actors.
    implicit val materializer = FlowMaterializer()
    // input text for the stream.
    val text =
      """|Lorem Ipsum is simply dummy text of the printing and typesetting industry.
         |Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, 
         |when an unknown printer took a galley of type and scrambled it to make a type 
         |specimen book.""".stripMargin
    // create an observable from a simple list (this is in rxjava style)
    val first = Observable.from(text.split("\\s").toList.asJava);
    // convert the rxJava observable to a publisher
    val publisher = RxReactiveStreams.toPublisher(first);
    // based on the publisher create an akka source
    val source = PublisherSource(publisher);
    // now use the akka style syntax to stream the data from the source
    // to the sink (in this case this is println)
      map(_.toUpperCase).                 // executed as actors
      filter(_.length > 3).
      foreach { el =>                     // the sink/consumer
      onComplete(_ => system.shutdown())  // lifecycle event

The code comments in this example explain pretty much what is happening. What we do here is we create a rxJava based Observable. Convert this Observable to a "reactive streams" publisher and use this publisher to create an akka-streams source. For the rest of the code we can use the akka-stream style flow API to model the stream. In this case we just do some filtering and print out the result.

Discover how the Watson team is further developing SDKs in Java, Node.js, Python, iOS, and Android to access these services and make programming easy. Brought to you in partnership with IBM.


Published at DZone with permission of Jos Dirksen, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.


Dev Resources & Solutions Straight to Your Inbox

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.


{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}