Introduction to Reactive Extensions In Scala for Stream Processing
Want to learn more about Reactive Extensions in Scala for stream processing of data? Check out this post to learn more!
Today, I will talk about a very powerful API in Scala. Reactive Extensions, also known as ReactiveX, try to standardize an API around processing a stream of data asynchronously.
The official website describes it as follows:
"ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming."
As mentioned, we will look at the Scala implementation, what kind of problems it can solve, and what is so cool about it.
What Are Reactive Extensions?
ReactiveX is simply a set of APIs that allow us to build programs that process streams of data asynchronously. They allow applications to react to data as it becomes available. In many cases, the data in the stream is events. So, we will talk more about event streams.
In a Nutshell
With Reactive Extensions, we can write programs that follow:
Event-driven programming
Reactive programming
Hello World Example
Let's start with a basic build.sbt:
name := "reactive"
version := "1.0"
scalaVersion := "2.12.6"
libraryDependencies += "io.reactivex" %% "rxscala" % "0.26.5"
Now, let's take a look at the code. The following program will print Hello World in the console.
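package org.caciquecode.reactive

import rx.lang.scala._

object IamReactive extends App {
  // An Observable over a fixed, bounded sequence of two strings
  val words = Observable.from(Seq("Hello ", "World"))
  // The function passed to subscribe is called once per element
  words.subscribe(print(_))
}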
In that small chunk of code, we are looking already at the main API of ReactiveX. This is the Observable. An Observable is an object capable of producing a stream of events that other objects (called Observers) can subscribe to, in order to somehow consume those events.
In our example, we created an Observable with a static bounded stream of two strings. And, we subscribed to the stream using the subscribe method and passing a function to it. The function is then called for every event in the stream.
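Besides the per-event function, an Observer can also be told when the stream fails or ends. The following is a minimal sketch, assuming RxScala 0.26.x, that uses the three-argument form of subscribe; the object name SubscribeCallbacks and the log buffer are made up for illustration.

```scala
import rx.lang.scala._
import scala.collection.mutable.ListBuffer

object SubscribeCallbacks {
  // Subscribes with all three callbacks and returns what was signalled, in order.
  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    val words = Observable.from(Seq("Hello", "World"))

    words.subscribe(
      (word: String) => { log += s"next: $word"; () },                      // one call per event
      (error: Throwable) => { log += s"error: ${error.getMessage}"; () },   // at most once, on failure
      () => { log += "completed"; () }                                      // once, when the stream ends
    )

    // Observable.from over a Seq emits on the calling thread,
    // so the log is already complete when subscribe returns.
    log.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Because the source is a bounded sequence, the completion callback fires right after the last element; an endless stream would never reach it.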
The example is obviously nothing special, and it is not even asynchronous. The real strength of ReactiveX comes when you don't know when events will actually happen, and you subscribe to the Observable to get notified when they actually do happen.
For example, let's create an Observable that continuously updates its subscribers with news from the BBC every five seconds (we won't parse the responses).
Note: Please don't run the following programs too often, so as not to overwhelm the BBC or Sky!
package org.caciquecode.reactive

import rx.lang.scala._
import scala.concurrent.Future
import scala.io.Source
import scala.concurrent.ExecutionContext.Implicits.global

object IamReactive extends App {
  val news = Observable[String] { observer =>
    // Poll on a background thread so that subscribing returns immediately
    Future {
      while (true) {
        val newsFeed = Source.fromURL("http://feeds.bbci.co.uk/news/rss.xml").mkString
        observer.onNext(newsFeed) // push the raw feed to the subscriber
        Thread.sleep(5000)        // wait five seconds before the next poll
      }
    }
    Subscription()
  }

  news.subscribe(print(_))
  Thread.sleep(60000) // keep the main thread alive for one minute
  println("End")
}
We can see how we created a Future that asynchronously loops forever, and in every loop iteration, the Observable retrieves the news feed from the BBC and notifies any registered Observer with the content of the feed.
The Subscription object returned by subscribe allows us to unsubscribe from the Observable. Note that the loop in this example runs forever, so it keeps polling even after its subscriber has unsubscribed.
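To make unsubscribing actually stop the polling, the loop can check whether the subscriber is still interested. The sketch below assumes that, as in RxScala 0.26.x, the block passed to Observable(...) receives a Subscriber exposing isUnsubscribed. The helper polling, its parameters fetch and intervalMillis, and the counter-based feed (used instead of a real URL) are hypothetical names introduced only for this illustration.

```scala
import rx.lang.scala._
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal

object PollingDemo {
  // Hypothetical helper: emits fetch() every intervalMillis until the subscriber unsubscribes.
  def polling(fetch: () => String, intervalMillis: Long): Observable[String] =
    Observable[String] { subscriber =>
      Future {
        try {
          while (!subscriber.isUnsubscribed) {
            subscriber.onNext(fetch())
            Thread.sleep(intervalMillis)
          }
        } catch {
          // Report failures to the subscriber instead of losing them inside the Future
          case NonFatal(e) => subscriber.onError(e)
        }
      }
      () // the block only has to start the work
    }

  // Returns (number of fetches at unsubscribe time, number of fetches a while later).
  def run(): (Int, Int) = {
    val fetches = new AtomicInteger(0)
    val feed = polling(() => s"feed #${fetches.incrementAndGet()}", 50)

    val subscription = feed.subscribe((item: String) => println(item))
    Thread.sleep(300)
    subscription.unsubscribe() // the loop sees this on its next check and exits
    val before = fetches.get()
    Thread.sleep(300)
    (before, fetches.get())
  }

  def main(args: Array[String]): Unit = println(run())
}
```

After unsubscribe() the fetch count stops growing, apart from at most one fetch that may already have been in flight.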
Composing Observables
You might already be excited about Reactive Extensions, Observables, and Observers. The really cool thing is that we can think of Observables either as a special type of Scala collection (a collection of objects that can arrive asynchronously) or as a special type of Future object (a Future that can be "completed" more than once). We can combine and compose Observable objects to obtain more sophisticated Observables.
Let's take the BBC news example, but this time we want to combine the BBC feed with the one from Sky News. We split each feed into items, keep only the items that contain the word "teenager", convert them to upper case, and take just the first five items, in order of arrival.
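Both views can be tried on small, bounded inputs. This is a minimal sketch, assuming RxScala 0.26.x; the object name ObservableViews and its two methods are made up for illustration, and toBlocking is used only to pull the results back into ordinary values.

```scala
import rx.lang.scala._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ObservableViews {
  // Collection view: the familiar filter / map / take combinators work on an Observable.
  def asCollection(): List[Int] =
    Observable.from(1 to 10)
      .filter(_ % 2 == 0) // 2, 4, 6, 8, 10
      .map(_ * 10)        // 20, 40, 60, 80, 100
      .take(3)            // 20, 40, 60
      .toBlocking
      .toList

  // Future view: a Future becomes an Observable that emits one value and then completes.
  def asFuture(): Int = {
    val answer: Future[Int] = Future(21 * 2)
    Observable.from(answer).map(_ + 1).toBlocking.single
  }

  def main(args: Array[String]): Unit = {
    println(asCollection())
    println(asFuture())
  }
}
```

Blocking is convenient for a demonstration like this; in a reactive program you would normally stay inside the Observable and subscribe instead.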
package org.caciquecode.reactive

import rx.lang.scala._
import scala.concurrent.Future
import scala.io.Source
import scala.concurrent.ExecutionContext.Implicits.global

object IamReactive extends App {
  // Polls the BBC feed every five seconds on a background thread
  val news = Observable[String] { observer =>
    Future {
      while (true) {
        val newsFeed = Source.fromURL("http://feeds.bbci.co.uk/news/rss.xml").mkString
        observer.onNext(newsFeed)
        Thread.sleep(5000)
      }
    }
    Subscription()
  }

  // Same polling loop for the Sky News feed
  val skyNews = Observable[String] { observer =>
    Future {
      while (true) {
        val newsFeed = Source.fromURL("http://feeds.skynews.com/feeds/rss/uk.xml").mkString
        observer.onNext(newsFeed)
        Thread.sleep(5000)
      }
    }
    Subscription()
  }

  // merge interleaves both streams; `news ++ skyNews` would wait for `news`
  // to complete, which never happens, so Sky News would never be polled
  news.merge(skyNews)
    .flatMapIterable(_.split("/item"))          // split each feed into items
    .filter(_.toLowerCase.contains("teenager")) // keep items mentioning "teenager"
    .map(_.toUpperCase)
    .take(5)                                    // stop after five matching items
    .subscribe(println(_))

  Thread.sleep(60000)
  println("End")
}
Let's ignore the code repetition in the example (as it is not the focus of the post). In my current run, I got the following results every five seconds.
This takes a look at one article from BBC and one from Sky. How cool is that? Imagine trying to do something similar with callbacks. You would quickly arrive at a callback nightmare.
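One detail worth knowing when combining endless streams is the difference between concatenating and merging. In RxScala, ++ subscribes to the second Observable only after the first one has completed, while merge subscribes to both right away. The sketch below, assuming RxScala 0.26.x, replaces the news feeds with two stand-ins so that the effect is deterministic: endless never emits or completes, and sky emits a single item. All names here are made up for illustration.

```scala
import rx.lang.scala._
import scala.collection.mutable.ListBuffer

object ConcatVersusMerge {
  // Stand-in for a stream that never completes (like a polling loop)
  val endless: Observable[String] = Observable.never
  // Stand-in for the second source
  val sky: Observable[String] = Observable.just("sky item")

  // Subscribes, records whatever arrives during subscription, then unsubscribes.
  // Both stand-ins act on the calling thread, so no waiting is needed.
  def collect(source: Observable[String]): List[String] = {
    val seen = ListBuffer.empty[String]
    val subscription = source.subscribe((item: String) => { seen += item; () })
    subscription.unsubscribe()
    seen.toList
  }

  // ++ waits for `endless` to complete, so `sky` is never subscribed to
  def concatenated(): List[String] = collect(endless ++ sky)

  // merge subscribes to both sources, so the item from `sky` arrives
  def merged(): List[String] = collect(endless.merge(sky))

  def main(args: Array[String]): Unit = {
    println(s"++    : ${concatenated()}")
    println(s"merge : ${merged()}")
  }
}
```

With two sources that poll forever, merge is therefore the operator that lets items from both of them reach the subscriber.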
This is just an introduction to Reactive Extensions, and there is a lot more you can do. I recommend you take a more formal look at the main project here and the Scala implementation here.