DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Partner Zones AWS Cloud
by AWS Developer Relations
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Partner Zones
AWS Cloud
by AWS Developer Relations
  1. DZone
  2. Coding
  3. Languages
  4. Introduction to Reactive Extensions In Scala for Stream Processing

Introduction to Reactive Extensions In Scala for Stream Processing

Want to learn more about Reactive Extensions in Scala for stream processing of data? Check out this post to learn more!

Carlo Scarioni user avatar by
Carlo Scarioni
·
Aug. 15, 18 · Presentation
Like (2)
Save
Tweet
Share
8.56K Views

Join the DZone community and get the full member experience.

Join For Free

Today, I will talk about a very powerful API in Scala. Reactive Extensions, also known as ReactiveX, try to standardize an API around processing a stream of data asynchronously.

From the official website, we got the following quote:

"ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming."

As mentioned, we will look at the Scala implementation, what kind of problems it can solve, and what is so cool about it.

What Are Reactive Extensions?

ReactiveX is simply a set of APIs that allow us to build programs that process streams of data asynchronously. They allow applications to react to data as it becomes available. In many cases, the data in the stream is events. So, we will talk more about event streams.

In a Nutshell

With Reactive Extensions, we can do programs that follow:

Event-Driven programming

Reactive programming

Hello World Example

Let's start with a basic build.sbt:

name := "reactive" 
version := "1.0" 
scalaVersion := "2.12.6" 

libraryDependencies += "io.reactivex" %% "rxscala" % "0.26.5"


Now, let's take a look at the code. The following program will print Hello World in the console.

package org.caciquecode.reactive

import rx.lang.scala._

object IamReactive extends App {
  val words = Observable.from(Seq("Hello ", "World"))
  words.subscribe(print(_))
}


In that small chunk of code, we are looking already at the main API of ReactiveX. This is the Observable. An Observable is an object capable of producing a stream of events that other objects (called Observers) can subscribe to, in order to somehow consume those events.

In our example, we created an Observable with a static bounded stream of two strings. And, we subscribed to the stream using the subscribe method and passing a function to it. The function is then called for every event in the stream.

The example is obviously nothing special, and it is not even asynchronous. The real strength of ReactiveX comes when you don't know when events will actually happen, and you subscribe to the Observable to get notified when they actually do happen.

For example, let's create an Observable that continuously updates its subscribers with news from the BBC every five seconds (we won't parse the responses).

Note: Please don't run the following programs too often to not overwhelm BBC or Sky!

package org.caciquecode.reactive

import rx.lang.scala._

import scala.concurrent.Future
import scala.io.Source
import scala.concurrent.ExecutionContext.Implicits.global

object IamReactive extends App {
  val news = Observable[String] { observer =>
    Future {
      while (true) {
        val newsFeed = Source.fromURL("http://feeds.bbci.co.uk/news/rss.xml").mkString
        observer.onNext(newsFeed)
        Thread.sleep(5000)
      }
    }
    Subscription()
  }
  news.subscribe(print(_))
  Thread.sleep(60000)
  println("End")
}


We can see how we created a Future that asynchronously loops forever, and in every loop iteration, the Observable retrieves the news feed from the BBC and notifies any registered Observer with the content of the feed.

The Subscription object returned by the subscription mechanism allows us to unsubscribe from the Observable.

Composing Observables

You might already be excited about Reactive Extensions, Observables, and Observers.  The really cool thing is that we can think of Observableseither as a special type of Scala collection (a collection of objects that can arrive asynchronously) or as a special type of Future object (a Future that can be "completed" more than once). We can combine and compose Observable objects to obtain more sophisticated Observables.

Let's take the previous code, but this time we want to combine the BBC feeds with the ones from Sky News. Then, we only need to subscribe to the feeds that contain the word teenager on it, and we want to capitalize the whole feed and take just the top five feeds (as in order)

package org.caciquecode.reactive

import rx.lang.scala._

import scala.concurrent.Future
import scala.io.Source
import scala.concurrent.ExecutionContext.Implicits.global

object IamReactive extends App {
  val news = Observable[String] { observer =>
    Future {
      while (true) {
        val newsFeed = Source.fromURL("http://feeds.bbci.co.uk/news/rss.xml").mkString
        observer.onNext(newsFeed)
        Thread.sleep(5000)
      }
    }
    Subscription()
  }

  val skyNews = Observable[String] { observer =>
    Future {
      while (true) {
        val newsFeed = Source.fromURL("http://feeds.skynews.com/feeds/rss/uk.xml").mkString
        observer.onNext(newsFeed)
        Thread.sleep(5000)
      }
    }
    Subscription()
  }

  (news ++ skyNews)
    .flatMapIterable(_.split("/item"))
    .filter(_.toLowerCase.contains("teenager"))
    .map(_.toUpperCase)
    .take(5)
    .subscribe(println(_))

  Thread.sleep(60000)
  println("End")
}


Let's ignore the code repetition from the example (as is not the focus of the post). In my current run, I got the following results every five seconds.

Screen Shot 2018-08-06 at 22.03.35.png

This takes a look at one article from BBC and one from Sky. How cool is that? Imagine trying to do something similar with callbacks. You would quickly arrive at a callback nightmare.

This is just an introduction to Reactive Extensions, and there is a lot more you can do. I recommend you take a more formal look at the main project here and the Scala implementation here.

Stream processing Scala (programming language)

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • 5 Common Firewall Misconfigurations and How to Address Them
  • How Elasticsearch Works
  • How To Select Multiple Checkboxes in Selenium WebDriver Using Java
  • Building the Next-Generation Data Lakehouse: 10X Performance

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends: